1
mirror of git://git.acid.vegas/random.git synced 2025-01-09 23:16:37 +00:00
random/irc/bots/limitserv.py

265 lines
7.8 KiB
Python
Raw Normal View History

#!/usr/bin/env python
# LimitServ IRC Service Bot - Developed by acidvegas in Python (https://acid.vegas/random)
import socket
import threading
import time
# Configuration
# Where and how to connect: 'server'/'port' are passed to socket.connect(),
# 'ssl' wraps the socket in TLS, 'ssl_verify' turns on certificate
# verification, 'ipv6' selects AF_INET6 and 'vhost' (if set) is the local
# address the socket is bound to before connecting.
_connection = {'server':'irc.server.com', 'port':6697, 'ssl':True, 'ssl_verify':False, 'ipv6':False, 'vhost':None}
# Optional client certificate; the three values are handed to
# SSLContext.load_cert_chain() when 'file' is set.
_cert = {'file':None, 'key':None, 'password':None}
# Identity sent at registration: 'nickname' via NICK, 'username' and
# 'realname' via USER.
_ident = {'nickname':'LimitServ', 'username':'services', 'realname':'Channel Limit Service'}
# Optional credentials: 'nickserv' is sent in a NickServ IDENTIFY message,
# 'network' as the PASS line, 'operator' as the OPER password.
_login = {'nickserv':None, 'network':None, 'operator':None}
# Delays in seconds: 'limit' between NAMES sweeps over the joined channels,
# 'queue' between outgoing lines, 'voice' how long a nick must have been in
# a channel before it is given +v.
_throttle = {'limit':300, 'queue':0.5, 'voice':10}
# 'anope' sets the limit through ChanServ MODE LOCK instead of a direct MODE,
# 'honeypot' is the channel passed as the +L parameter, 'limit' is the
# headroom added to the current user count for +l, 'modes' (if set) are the
# user modes applied to the bot after RPL_WELCOME.
_settings = {'anope':False, 'honeypot':'#blackhole', 'limit':10, 'modes':None}
def debug(msg):
    """Print an informational message prefixed with the current time."""
    stamp = get_time()
    print('{0} | [~] - {1}'.format(stamp, msg))
def error(msg, reason):
    """Print an error message and its reason prefixed with the current time."""
    stamp = get_time()
    print('{0} | [!] - {1} ({2})'.format(stamp, msg, reason))
def get_time():
    """Return the current local time as ``HH:MM:SS`` on a 12-hour clock."""
    now = time.localtime()
    return time.strftime('%I:%M:%S', now)
class IRC(object):
    """Connection state and socket handling for the bot.

    The module-level ``Event``, ``Command`` and ``Loop`` helpers read and
    mutate the attributes of the single global instance of this class.
    """

    def __init__(self):
        self._channels = list()  # channels the bot has joined
        self._names = list()     # nicks gathered from RPL_NAMREPLY until RPL_ENDOFNAMES
        self._queue = list()     # outgoing raw lines waiting for the throttled sender
        self._voices = dict()    # chan -> {nick: join timestamp} awaiting +v
        self._sock = None

    def _run(self):
        """Start the background loops, then connect and block in the listener."""
        Loop._loops()
        self._connect()

    def _connect(self):
        """Open the socket, register, and start listening.

        Any socket-level failure is reported and handed to
        ``Event._disconnect()``.
        """
        try:
            self._create_socket()
            self._sock.connect((_connection['server'], _connection['port']))
            self._register()
        except socket.error as ex:
            error('Failed to connect to IRC server.', ex)
            Event._disconnect()
        else:
            self._listen()

    def _create_socket(self):
        """Create the (optionally bound and TLS-wrapped) socket in ``self._sock``."""
        family = socket.AF_INET6 if _connection['ipv6'] else socket.AF_INET
        self._sock = socket.socket(family, socket.SOCK_STREAM)
        if _connection['vhost']:
            self._sock.bind((_connection['vhost'], 0))
        if _connection['ssl']:
            # PROTOCOL_TLS_CLIENT starts out with CERT_REQUIRED and hostname
            # checking enabled; a bare SSLContext() is deprecated.
            ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
            if _cert['file']:
                ctx.load_cert_chain(_cert['file'], _cert['key'], _cert['password'])
            if _connection['ssl_verify']:
                ctx.load_default_certs()
            else:
                # check_hostname must be cleared before verify_mode can be
                # lowered to CERT_NONE.
                ctx.check_hostname = False
                ctx.verify_mode = ssl.CERT_NONE
            # server_hostname enables SNI and is required for hostname checks.
            self._sock = ctx.wrap_socket(self._sock, server_hostname=_connection['server'])

    def _listen(self):
        """Read from the socket and dispatch complete lines until it fails.

        Bytes are buffered so that a line (or a multi-byte character) split
        across two ``recv`` calls is reassembled before it is decoded. When
        the loop ends for any reason ``Event._disconnect()`` is called.
        """
        buffer = b''
        while True:
            try:
                data = self._sock.recv(1024)
                if not data:
                    # recv() returns b'' once the peer has closed the socket;
                    # without this check the loop would spin forever.
                    raise ConnectionError('Connection closed by remote host.')
                buffer += data
                # Everything before the last CRLF is complete; keep the rest.
                *lines, buffer = buffer.split(b'\r\n')
                for raw in lines:
                    try:
                        line = raw.decode('utf-8')
                        if len(line.split()) >= 2:
                            debug(line)
                            Event._handle(line)
                    except (UnicodeDecodeError, UnicodeEncodeError):
                        # Skip only the offending line, not the whole chunk.
                        continue
            except Exception as ex:
                error('Unexpected error occured.', ex)
                break
        Event._disconnect()

    def _register(self):
        """Queue the PASS (optional), USER and NICK registration lines."""
        if _login['network']:
            self._queue.append('PASS ' + _login['network'])
        self._queue.append('USER {0} 0 * :{1}'.format(_ident['username'], _ident['realname']))
        self._queue.append('NICK ' + _ident['nickname'])
class Command:
    """Builders for outgoing IRC commands.

    Everything except ``_raw`` appends a line to ``Bot._queue`` so that it is
    sent by the throttled queue loop; ``_raw`` writes to the socket directly.
    """

    @staticmethod
    def _join(chan, key=None):
        """Queue a JOIN for ``chan``, with ``key`` if one is given."""
        Bot._queue.append(f'JOIN {chan} {key}' if key else f'JOIN {chan}')

    @staticmethod
    def _kick(chan, nick, msg=None):
        """Queue a KICK of ``nick`` from ``chan`` with an optional reason."""
        # The reason is a trailing parameter, so it needs the ':' prefix to
        # survive containing spaces.
        Bot._queue.append(f'KICK {chan} {nick} :{msg}' if msg else f'KICK {chan} {nick}')

    @staticmethod
    def _mode(target, mode):
        """Queue a MODE change on ``target``."""
        Bot._queue.append(f'MODE {target} {mode}')

    @staticmethod
    def _raw(msg):
        """Send ``msg`` (cut to 510 characters) plus CRLF straight to the socket."""
        # sendall() keeps writing until the whole line is out; send() may
        # stop after a partial write.
        Bot._sock.sendall(bytes(msg[:510] + '\r\n', 'utf-8'))

    @staticmethod
    def _sendmsg(target, msg):
        """Queue a PRIVMSG to ``target``."""
        Bot._queue.append(f'PRIVMSG {target} :{msg}')
class Event:
    """Handlers for incoming IRC events.

    ``_handle`` parses one raw line and dispatches to the other handlers,
    which keep ``Bot._channels``, ``Bot._names`` and ``Bot._voices`` current.
    Nicks and channels are lower-cased by ``_handle`` before dispatch.
    """

    @staticmethod
    def _strip_colon(token):
        """Return ``token`` without a single leading ':' if it has one."""
        return token[1:] if token.startswith(':') else token

    @staticmethod
    def _connect():
        """After RPL_WELCOME: set user modes, identify and oper as configured."""
        if _settings['modes']:
            Command._mode(_ident['nickname'], '+' + _settings['modes'])
        if _login['nickserv']:
            Command._sendmsg('NickServ', 'IDENTIFY {0} {1}'.format(_ident['nickname'], _login['nickserv']))
        if _login['operator']:
            Bot._queue.append('OPER {0} {1}'.format(_ident['username'], _login['operator']))

    @staticmethod
    def _disconnect():
        """Close the socket, clear per-connection state, wait and reconnect."""
        if Bot._sock is not None:
            Bot._sock.close()
        # Channel membership does not survive a reconnect either, so it is
        # cleared together with the state derived from it.
        Bot._channels = list()
        Bot._names = list()
        Bot._queue = list()
        Bot._voices = dict()
        time.sleep(15)
        Bot._connect()

    @staticmethod
    def _end_of_names(chan):
        """Set +l to (collected user count + headroom) and +L to the honeypot."""
        limit = str(len(Bot._names) + _settings['limit'])
        if _settings['anope']:
            Command._sendmsg('ChanServ', 'MODE {0} LOCK ADD +lL {1} {2}'.format(chan, limit, _settings['honeypot']))
        else:
            Command._mode(chan, '+lL {0} {1}'.format(limit, _settings['honeypot']))
        Bot._names = list()

    @staticmethod
    def _join(nick, chan):
        """Track a channel the bot joined, or time-stamp another user's join."""
        if nick == _ident['nickname'].lower():
            if chan not in Bot._channels:
                Bot._channels.append(chan)
            if chan not in Bot._voices:
                Bot._voices[chan] = dict()
        elif chan in Bot._channels:
            # setdefault keeps the first join time and tolerates a channel
            # that has no pending-voice table yet.
            Bot._voices.setdefault(chan, dict()).setdefault(nick, time.time())

    @staticmethod
    def _kick(nick, chan, kicked):
        """Handle ``kicked`` being removed from ``chan`` by ``nick``.

        If the bot itself was kicked it forgets the channel and rejoins after
        a short pause; otherwise the kicked user is dropped from the pending
        voices.
        """
        # The target of the kick decides what to do, not the kicker.
        if kicked == _ident['nickname'].lower():
            if chan in Bot._channels:
                Bot._channels.remove(chan)
            Bot._voices.pop(chan, None)
            time.sleep(3)
            Command._join(chan)
        elif chan in Bot._channels:
            Bot._voices.get(chan, {}).pop(kicked, None)

    @staticmethod
    def _names(chan, nicks):
        """Collect nicks from one RPL_NAMREPLY line, minus status prefixes."""
        for name in nicks:
            # lstrip also covers servers that send several prefixes per nick.
            Bot._names.append(name.lstrip('~!@%&+'))

    @staticmethod
    def _nick(nick, new_nick):
        """Carry a pending voice entry over to the user's new nickname."""
        for chan in Bot._voices:
            if nick in Bot._voices[chan]:
                Bot._voices[chan][new_nick] = Bot._voices[chan].pop(nick)

    @staticmethod
    def _no_such_nick(nick):
        """Forget a pending voice for a nick the server reports as unknown."""
        for chan in Bot._voices:
            Bot._voices[chan].pop(nick, None)

    @staticmethod
    def _part(nick, chan):
        """Forget the channel if the bot left it, else drop the user's entry."""
        if nick == _ident['nickname'].lower():
            if chan in Bot._channels:
                Bot._channels.remove(chan)
            Bot._voices.pop(chan, None)
        elif chan in Bot._channels:
            Bot._voices.get(chan, {}).pop(nick, None)

    @staticmethod
    def _quit(nick):
        """Drop a user who quit from every channel's pending voices."""
        for chan in Bot._voices:
            Bot._voices[chan].pop(nick, None)

    @staticmethod
    def _handle(data):
        """Parse one raw IRC line and dispatch it.

        Raises:
            Exception: on a server ERROR line or ERR_NICKNAMEINUSE.
        """
        args = data.split()
        if data.startswith('ERROR :Closing Link:'):
            raise Exception('Connection has closed.')
        elif data.startswith('ERROR :Reconnecting too fast, throttled.'):
            raise Exception('Connection has closed. (throttled)')
        elif args[0] == 'PING':
            Command._raw('PONG ' + Event._strip_colon(args[1]))
        elif args[1] == '001': # RPL_WELCOME
            Event._connect()
        elif args[1] == '401' and len(args) >= 4: # ERR_NOSUCHNICK
            nick = args[3].lower()
            Event._no_such_nick(nick)
        elif args[1] == '433': # ERR_NICKNAMEINUSE
            raise Exception('Bot is already running or nick is in use.')
        elif args[1] == '353' and len(args) >= 6: # RPL_NAMREPLY
            chan = args[4].lower()
            names = Event._strip_colon(' '.join(args[5:]).lower()).split()
            Event._names(chan, names)
        elif args[1] == '366' and len(args) >= 4: # RPL_ENDOFNAMES
            chan = args[3].lower()
            Event._end_of_names(chan)
        elif args[1] == 'JOIN' and len(args) == 3:
            nick = args[0].split('!')[0][1:].lower()
            chan = Event._strip_colon(args[2]).lower()
            Event._join(nick, chan)
        elif args[1] == 'KICK' and len(args) >= 4:
            nick = args[0].split('!')[0][1:].lower()
            chan = args[2].lower()
            kicked = args[3].lower()
            Event._kick(nick, chan, kicked)
        elif args[1] == 'NICK' and len(args) == 3:
            nick = args[0].split('!')[0][1:].lower()
            new_nick = Event._strip_colon(args[2]).lower()
            Event._nick(nick, new_nick)
        elif args[1] == 'PART' and len(args) >= 3:
            nick = args[0].split('!')[0][1:].lower()
            chan = Event._strip_colon(args[2]).lower()
            Event._part(nick, chan)
        elif args[1] == 'QUIT':
            nick = args[0].split('!')[0][1:].lower()
            Event._quit(nick)
class Loop:
    """Background worker loops; each one runs forever in its own thread."""

    @staticmethod
    def _loops():
        """Start the queue, limit and voice threads."""
        threading.Thread(target=Loop._queue).start() # start first to handle incoming data
        threading.Thread(target=Loop._limit).start()
        threading.Thread(target=Loop._voice).start()

    @staticmethod
    def _limit():
        """Every ``_throttle['limit']`` seconds queue a NAMES for each channel."""
        while True:
            try:
                # Iterate a copy: the listener thread adds and removes
                # channels while this loop runs.
                for chan in list(Bot._channels):
                    Bot._queue.append('NAMES ' + chan)
            except Exception as ex:
                error('Error occured in the loop!', ex)
            finally:
                time.sleep(_throttle['limit'])

    @staticmethod
    def _queue():
        """Send one queued line every ``_throttle['queue']`` seconds."""
        while True:
            try:
                if Bot._queue:
                    Command._raw(Bot._queue.pop(0))
            except Exception as ex:
                error('Error occured in the queue loop!', ex)
            finally:
                time.sleep(_throttle['queue'])

    @staticmethod
    def _voice():
        """Once a second, +v users who joined over ``_throttle['voice']`` seconds ago.

        Modes are sent in batches of up to four nicks per MODE line.
        """
        while True:
            try:
                # Snapshot both levels: the listener thread mutates these
                # dicts, and iterating them live raises RuntimeError
                # ("dictionary changed size during iteration").
                for chan, pending in list(Bot._voices.items()):
                    now = time.time()
                    nicks = [nick for nick, joined in list(pending.items()) if now - joined > _throttle['voice']]
                    for start in range(0, len(nicks), 4):
                        batch = nicks[start:start + 4]
                        Command._mode(chan, '+{0} {1}'.format('v'*len(batch), ' '.join(batch)))
                        for nick in batch:
                            # The user may have left in the meantime.
                            pending.pop(nick, None)
            except Exception as ex:
                error('Error occured in the voice loop!', ex)
            finally:
                time.sleep(1)
# Main
if _connection['ssl']:
    import ssl  # only needed when the connection is wrapped in TLS
else:
    # The TLS-only settings are unused on a plain connection, so drop them.
    del _cert, _connection['ssl_verify']
# The Event, Command and Loop helpers all operate on this global instance.
Bot = IRC()
Bot._run()