mirror of
git://git.acid.vegas/archive.git
synced 2024-11-14 12:16:40 +00:00
265 lines
7.8 KiB
Python
265 lines
7.8 KiB
Python
#!/usr/bin/env python
|
|
# LimitServ IRC Service Bot - Developed by acidvegas in Python (https://acid.vegas/random)
|
|
|
|
import socket
|
|
import threading
|
|
import time
|
|
|
|
# Configuration

# Where and how to connect; 'vhost' is an optional local address to bind to.
_connection = {
    'server': 'irc.server.com',
    'port': 6697,
    'ssl': True,
    'ssl_verify': False,
    'ipv6': False,
    'vhost': None,
}

# Optional TLS client certificate; only loaded when 'file' is set.
_cert = {
    'file': None,
    'key': None,
    'password': None,
}

# Identity sent in the NICK / USER registration commands.
_ident = {
    'nickname': 'LimitServ',
    'username': 'services',
    'realname': 'Channel Limit Service',
}

# Passwords: NickServ IDENTIFY, server PASS ('network') and OPER.
_login = {
    'nickserv': 'fartsimpson29',
    'network': None,
    'operator': 'simps0nsfan69',
}

# Timings in seconds: pause between NAMES rounds ('limit'), pause between
# queued lines being sent ('queue'), and how long a nick must have been in a
# channel before it is voiced ('voice').
_throttle = {
    'limit': 300,
    'queue': 0.5,
    'voice': 10,
}

# 'anope'    - set the limit through ChanServ MODE LOCK instead of MODE.
# 'honeypot' - channel passed as the +L argument.
# 'limit'    - head-room added to the current user count to form +l.
# 'modes'    - user modes the bot sets on itself after connecting.
_settings = {
    'anope': False,
    'honeypot': '#blackhole',
    'limit': 10,
    'modes': None,
}
|
|
|
|
def debug(msg):
    """Print *msg* to stdout as a timestamped debug line."""
    stamp = get_time()
    print(f'{stamp} | [~] - {msg}')
|
|
|
|
def error(msg, reason):
    """Print *msg* to stdout as a timestamped error line, with *reason* in parentheses."""
    stamp = get_time()
    print(f'{stamp} | [!] - {msg} ({reason})')
|
|
|
|
def get_time():
    """Return the current local time as ``HH:MM:SS`` on a 12-hour clock."""
    now = time.localtime()
    return time.strftime('%I:%M:%S', now)
|
|
|
|
class IRC(object):
    """Connection state and socket handling for the bot.

    The module creates one instance (``Bot``) which the ``Command``,
    ``Event`` and ``Loop`` helpers read and modify.
    """

    def __init__(self):
        self._channels = list()  # channels the bot has seen itself join
        self._names = list()     # nicks gathered from RPL_NAMREPLY until RPL_ENDOFNAMES
        self._queue = list()     # outgoing raw lines, drained by Loop._queue
        self._voices = dict()    # {channel: {nick: join timestamp}} of nicks waiting for +v
        self._sock = None

    def _run(self):
        """Start the background loops, then connect and block on the read loop."""
        Loop._loops()
        self._connect()

    def _connect(self):
        """Open the connection and register; on success enter the read loop.

        Socket errors are logged and handed to ``Event._disconnect``.
        """
        try:
            self._create_socket()
            self._sock.connect((_connection['server'], _connection['port']))
            self._register()
        except socket.error as ex:
            error('Failed to connect to IRC server.', ex)
            Event._disconnect()
        else:
            self._listen()

    def _create_socket(self):
        """Create ``self._sock`` (IPv4 or IPv6), bind the vhost and wrap it in TLS if configured."""
        family = socket.AF_INET6 if _connection['ipv6'] else socket.AF_INET
        self._sock = socket.socket(family, socket.SOCK_STREAM)
        if _connection['vhost']:
            self._sock.bind((_connection['vhost'], 0))
        if _connection['ssl']:
            # Imported here so this method does not depend on the conditional
            # module-level import having run.
            import ssl
            # PROTOCOL_TLS_CLIENT starts with CERT_REQUIRED and hostname
            # checking enabled, so "verify" only needs the CA store loaded.
            ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
            if _cert['file']:
                ctx.load_cert_chain(_cert['file'], _cert['key'], _cert['password'])
            if _connection['ssl_verify']:
                ctx.load_default_certs()
            else:
                # check_hostname must be cleared before verify_mode can be CERT_NONE.
                ctx.check_hostname = False
                ctx.verify_mode = ssl.CERT_NONE
            # server_hostname enables SNI and, when verifying, the hostname check.
            self._sock = ctx.wrap_socket(self._sock, server_hostname=_connection['server'])

    def _listen(self):
        """Read from the socket, pass each complete line to ``Event._handle``.

        Incoming bytes are buffered so that a line split across two reads is
        handled once, in full. The loop ends on any unexpected error or when
        the server closes the connection, after which ``Event._disconnect``
        is called.
        """
        pending = b''
        while True:
            try:
                chunk = self._sock.recv(1024)
                if not chunk:
                    # recv() returning nothing means the peer closed the socket;
                    # without this check the loop would spin forever.
                    raise ConnectionError('Connection closed by the server.')
                pending += chunk
                # Everything before the last CRLF is complete; keep the tail.
                *complete, pending = pending.split(b'\r\n')
                for raw in complete:
                    try:
                        line = raw.decode('utf-8')
                        if len(line.split()) >= 2:
                            debug(line)
                            Event._handle(line)
                    except (UnicodeDecodeError, UnicodeEncodeError):
                        # Skip only the offending line, not the whole chunk.
                        continue
            except Exception as ex:
                error('Unexpected error occured.', ex)
                break
        Event._disconnect()

    def _register(self):
        """Queue the PASS (if a network password is set), USER and NICK commands."""
        if _login['network']:
            Bot._queue.append('PASS ' + _login['network'])
        Bot._queue.append('USER {0} 0 * :{1}'.format(_ident['username'], _ident['realname']))
        Bot._queue.append('NICK ' + _ident['nickname'])
|
|
|
|
class Command:
    """Outgoing IRC commands.

    Every method except ``_raw`` appends a line to ``Bot._queue``;
    ``_raw`` writes a line to the socket immediately.
    """

    @staticmethod
    def _join(chan, key=None):
        """Queue a JOIN for *chan*, with the channel *key* when one is given."""
        Bot._queue.append(f'JOIN {chan} {key}' if key else f'JOIN {chan}')

    @staticmethod
    def _kick(chan, nick, msg=None):
        """Queue a KICK of *nick* from *chan*, with an optional reason *msg*."""
        # The reason is a trailing parameter: without the ':' prefix only its
        # first word would be taken as the reason.
        Bot._queue.append(f'KICK {chan} {nick} :{msg}' if msg else f'KICK {chan} {nick}')

    @staticmethod
    def _mode(target, mode):
        """Queue a MODE change of *mode* on *target*."""
        Bot._queue.append(f'MODE {target} {mode}')

    @staticmethod
    def _raw(msg):
        """Send *msg* to the server right away, terminated with CRLF.

        The payload is limited to 510 bytes so the whole line stays within
        512 bytes including the terminator.
        """
        payload = msg.encode('utf-8')[:510]
        # Cutting by bytes may split a multi-byte character; drop the partial
        # tail so the line stays valid UTF-8.
        payload = payload.decode('utf-8', 'ignore').encode('utf-8')
        # sendall() keeps writing until every byte is out; send() may stop early.
        Bot._sock.sendall(payload + b'\r\n')

    @staticmethod
    def _sendmsg(target, msg):
        """Queue a PRIVMSG of *msg* to *target*."""
        Bot._queue.append(f'PRIVMSG {target} :{msg}')
|
|
|
|
class Event:
    """Handlers for incoming IRC events.

    ``_handle`` parses one raw server line and calls the matching handler.
    The handlers keep ``Bot._channels``, ``Bot._names`` and ``Bot._voices``
    up to date. Nicks and channel names arrive lower-cased from ``_handle``.
    """

    @staticmethod
    def _strip_colon(token):
        """Return *token* without a single leading ':' if it has one."""
        return token[1:] if token.startswith(':') else token

    @staticmethod
    def _source_nick(prefix):
        """Return the lower-cased nick from a ``:nick!user@host`` prefix."""
        return Event._strip_colon(prefix.split('!')[0]).lower()

    @staticmethod
    def _forget_channel(chan):
        """Drop *chan* from the joined list and the pending-voice table, if present."""
        if chan in Bot._channels:
            Bot._channels.remove(chan)
        Bot._voices.pop(chan, None)

    @staticmethod
    def _connect():
        """Run after the welcome numeric: set user modes, identify to NickServ, and oper up."""
        if _settings['modes']:
            Command._mode(_ident['nickname'], '+' + _settings['modes'])
        if _login['nickserv']:
            Command._sendmsg('NickServ', 'IDENTIFY {0} {1}'.format(_ident['nickname'], _login['nickserv']))
        if _login['operator']:
            Bot._queue.append('OPER {0} {1}'.format(_ident['username'], _login['operator']))

    @staticmethod
    def _disconnect():
        """Close the socket, clear per-connection state, wait 15 seconds and reconnect."""
        # The socket is still None when its creation itself failed.
        if Bot._sock is not None:
            Bot._sock.close()
        Bot._names = list()
        Bot._queue = list()
        Bot._voices = dict()
        time.sleep(15)
        Bot._connect()

    @staticmethod
    def _end_of_names(chan):
        """Set the limit on *chan* to the collected user count plus the configured head-room.

        The collected name list is cleared afterwards.
        """
        limit = str(len(Bot._names) + _settings['limit'])
        if _settings['anope']:
            Command._sendmsg('ChanServ', 'MODE {0} LOCK ADD +lL {1} {2}'.format(chan, limit, _settings['honeypot']))
        else:
            Command._mode(chan, '+lL {0} {1}'.format(limit, _settings['honeypot']))
        Bot._names = list()

    @staticmethod
    def _join(nick, chan):
        """Record the bot joining *chan*, or time-stamp another *nick* joining a tracked channel."""
        if nick == _ident['nickname'].lower():
            if chan not in Bot._channels:
                Bot._channels.append(chan)
            if chan not in Bot._voices:
                Bot._voices[chan] = dict()
        elif chan in Bot._channels:
            # The voice table is cleared on disconnect while the channel list
            # is not, so the channel entry may be missing here.
            waiting = Bot._voices.setdefault(chan, dict())
            if nick not in waiting:
                waiting[nick] = time.time()

    @staticmethod
    def _kick(nick, chan, kicked):
        """Handle *kicked* being removed from *chan* by *nick*.

        If the bot itself was kicked it forgets the channel, waits 3 seconds
        and queues a rejoin; otherwise the kicked nick is dropped from the
        pending-voice table.
        """
        if kicked == _ident['nickname'].lower():
            Event._forget_channel(chan)
            time.sleep(3)
            Command._join(chan)
        elif chan in Bot._channels:
            Bot._voices.get(chan, {}).pop(kicked, None)

    @staticmethod
    def _names(chan, nicks):
        """Add *nicks* to ``Bot._names``, removing one leading status prefix from each."""
        for name in nicks:
            if name[:1] in '~!@%&+':
                name = name[1:]
            Bot._names.append(name)

    @staticmethod
    def _nick(nick, new_nick):
        """Carry a pending-voice entry over from *nick* to *new_nick* in every channel."""
        for waiting in Bot._voices.values():
            if nick in waiting:
                waiting[new_nick] = waiting.pop(nick)

    @staticmethod
    def _no_such_nick(nick):
        """Drop *nick* from every channel's pending-voice table."""
        for waiting in Bot._voices.values():
            waiting.pop(nick, None)

    @staticmethod
    def _part(nick, chan):
        """Forget *chan* when the bot parts it, otherwise drop *nick* from its pending-voice table."""
        if nick == _ident['nickname'].lower():
            Event._forget_channel(chan)
        elif chan in Bot._channels:
            Bot._voices.get(chan, {}).pop(nick, None)

    @staticmethod
    def _quit(nick):
        """Drop *nick* from every channel's pending-voice table."""
        for waiting in Bot._voices.values():
            waiting.pop(nick, None)

    @staticmethod
    def _handle(data):
        """Parse one raw IRC line and dispatch it to the matching handler.

        Raises:
            Exception: on a server ERROR line (closing link / throttled) or
                when the nickname is already in use.
        """
        args = data.split()
        if data.startswith('ERROR :Closing Link:'):
            raise Exception('Connection has closed.')
        elif data.startswith('ERROR :Reconnecting too fast, throttled.'):
            raise Exception('Connection has closed. (throttled)')
        elif len(args) < 2:
            return
        elif args[0] == 'PING':
            # The token may or may not carry a leading ':'.
            Command._raw('PONG ' + Event._strip_colon(args[1]))
        elif args[1] == '001':  # RPL_WELCOME
            Event._connect()
        elif args[1] == '401' and len(args) >= 4:  # ERR_NOSUCHNICK
            Event._no_such_nick(args[3].lower())
        elif args[1] == '433':  # ERR_NICKNAMEINUSE
            raise Exception('Bot is already running or nick is in use.')
        elif args[1] == '353' and len(args) >= 6:  # RPL_NAMREPLY
            chan = args[4].lower()
            # [1:] removes the ':' that starts the trailing name list.
            names = ' '.join(args[5:]).lower()[1:].split()
            Event._names(chan, names)
        elif args[1] == '366' and len(args) >= 4:  # RPL_ENDOFNAMES
            Event._end_of_names(args[3].lower())
        elif args[1] == 'JOIN' and len(args) == 3:
            Event._join(Event._source_nick(args[0]), Event._strip_colon(args[2]).lower())
        elif args[1] == 'KICK' and len(args) >= 4:
            Event._kick(Event._source_nick(args[0]), args[2].lower(), args[3].lower())
        elif args[1] == 'NICK' and len(args) == 3:
            Event._nick(Event._source_nick(args[0]), Event._strip_colon(args[2]).lower())
        elif args[1] == 'PART' and len(args) >= 3:
            Event._part(Event._source_nick(args[0]), Event._strip_colon(args[2]).lower())
        elif args[1] == 'QUIT':
            Event._quit(Event._source_nick(args[0]))
|
|
|
|
class Loop:
    """Background worker loops, each run forever in its own thread."""

    @staticmethod
    def _loops():
        """Start the queue, limit and voice loops in separate threads."""
        threading.Thread(target=Loop._queue).start()  # started first so queued outgoing lines get sent
        threading.Thread(target=Loop._limit).start()
        threading.Thread(target=Loop._voice).start()

    @staticmethod
    def _limit():
        """Queue a NAMES request for every joined channel, then sleep ``_throttle['limit']`` seconds."""
        while True:
            try:
                # Iterate a copy: the listener thread adds and removes channels.
                for chan in list(Bot._channels):
                    Bot._queue.append('NAMES ' + chan)
            except Exception as ex:
                error('Error occured in the loop!', ex)
            finally:
                time.sleep(_throttle['limit'])

    @staticmethod
    def _queue():
        """Send the oldest queued line, if any, then sleep ``_throttle['queue']`` seconds."""
        while True:
            try:
                if Bot._queue:
                    Command._raw(Bot._queue.pop(0))
            except Exception as ex:
                error('Error occured in the queue loop!', ex)
            finally:
                time.sleep(_throttle['queue'])

    @staticmethod
    def _voice():
        """Once a second, voice nicks that joined more than ``_throttle['voice']`` seconds ago.

        Nicks are voiced in batches of up to four per MODE line and removed
        from the pending table afterwards.
        """
        batch_size = 4
        while True:
            try:
                now = time.time()
                # Iterate snapshots: the listener thread mutates these dicts,
                # and iterating a dict while it changes size raises RuntimeError.
                for chan, waiting in list(Bot._voices.items()):
                    ready = [nick for nick, joined in list(waiting.items()) if now - joined > _throttle['voice']]
                    for start in range(0, len(ready), batch_size):
                        batch = ready[start:start + batch_size]
                        Command._mode(chan, '+{0} {1}'.format('v' * len(batch), ' '.join(batch)))
                        for nick in batch:
                            # The nick may already have been removed by a PART/QUIT/KICK.
                            waiting.pop(nick, None)
            except Exception as ex:
                error('Error occured in the voice loop!', ex)
            finally:
                time.sleep(1)
|
|
|
|
# Main
if _connection['ssl']:
    import ssl
else:
    # TLS is off, so drop the settings that only apply to TLS. The names must
    # match the definitions above (`_cert`, 'ssl_verify'); the previous
    # `cert` / 'verify' raised NameError as soon as SSL was disabled.
    del _cert, _connection['ssl_verify']

# Single shared bot instance used by Command, Event and Loop.
Bot = IRC()
Bot._run()
|