scroll/scroll.py

375 lines
14 KiB
Python

#!/usr/bin/env python
# Scroll IRC Art Bot - Developed by acidvegas in Python (https://git.acid.vegas/scroll)
import asyncio
import json
import random
import re
import ssl
import sys
import time
import urllib.request
class connection:
server = 'irc.server.com'
port = 6697
ipv6 = False
ssl = True
vhost = None # Must in ('ip', port) format
channel = '#chats'
key = None
modes = 'BdDg'
class identity:
nickname = 'scroll'
username = 'scroll'
realname = 'git.acid.vegas/scroll'
nickserv = None
# Settings
admin = 'acidvegas!*@*' # Can use wildcards (Must be in nick!user@host format)
# Formatting Control Characters / Color Codes
bold = '\x02'
italic = '\x1D'
underline = '\x1F'
reverse = '\x16'
reset = '\x0f'
white = '00'
black = '01'
blue = '02'
green = '03'
red = '04'
brown = '05'
purple = '06'
orange = '07'
yellow = '08'
light_green = '09'
cyan = '10'
light_cyan = '11'
light_blue = '12'
pink = '13'
grey = '14'
light_grey = '15'
def color(msg, foreground, background=None):
return f'\x03{foreground},{background}{msg}{reset}' if background else f'\x03{foreground}{msg}{reset}'
def debug(data):
print('{0} | [~] - {1}'.format(time.strftime('%I:%M:%S'), data))
def error(data, reason=None):
print('{0} | [!] - {1} ({2})'.format(time.strftime('%I:%M:%S'), data, str(reason))) if reason else print('{0} | [!] - {1}'.format(time.strftime('%I:%M:%S'), data))
def get_url(url, git=False):
data = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.1916.47 Safari/537.36'}
if git:
data['Accept'] = 'application/vnd.github.v3+json'
req = urllib.request.Request(url, headers=data)
return urllib.request.urlopen(req, timeout=10)
def is_admin(ident):
return re.compile(admin.replace('*','.*')).search(ident)
def ssl_ctx():
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
return ctx
class Bot():
def __init__(self):
self.db = None
self.last = time.time()
self.loops = dict()
self.host = ''
self.playing = False
self.settings = {
'flood' : 1,
'ignore' : 'big,birds,doc,gorf,hang,nazi,pokemon',
'lines' : 500,
'msg' : 0.03,
'paste' : True,
'png_palette' : 'RGB99',
'png_quantize' : 99,
'png_width' : 80,
'results' : 25}
self.slow = False
self.reader = None
self.writer = None
async def raw(self, data):
self.writer.write(data[:510].encode('utf-8') + b'\r\n')
await self.writer.drain()
async def action(self, chan, msg):
await self.sendmsg(chan, f'\x01ACTION {msg}\x01')
async def sendmsg(self, target, msg):
await self.raw(f'PRIVMSG {target} :{msg}')
async def irc_error(self, chan, msg, reason=None):
await self.sendmsg(chan, '[{0}] {1} {2}'.format(color('ERROR', red), msg, color(f'({reason})', grey))) if reason else await self.sendmsg(chan, '[{0}] {1}'.format(color('ERROR', red), msg))
async def connect(self):
while True:
try:
options = {
'host' : connection.server,
'port' : connection.port,
'limit' : 1024,
'ssl' : ssl_ctx() if connection.ssl else None,
'family' : 10 if connection.ipv6 else 2,
'local_addr' : connection.vhost
}
self.reader, self.writer = await asyncio.wait_for(asyncio.open_connection(**options), 15)
await self.raw(f'USER {identity.username} 0 * :{identity.realname}')
await self.raw('NICK ' + identity.nickname)
except Exception as ex:
error('failed to connect to ' + connection.server, ex)
else:
await self.listen()
finally:
for item in self.loops:
if self.loops[item]:
self.loops[item].cancel()
self.loops = dict()
self.playing = False
self.slow = False
await asyncio.sleep(30)
async def sync(self):
cache = self.db
self.db = {'root':list()}
try:
sha = [item['sha'] for item in json.loads(get_url('https://api.github.com/repos/ircart/ircart/contents', True).read().decode('utf-8')) if item['path'] == 'ircart'][0]
files = json.loads(get_url(f'https://api.github.com/repos/ircart/ircart/git/trees/{sha}?recursive=true', True).read().decode('utf-8'))['tree']
for file in files:
if file['type'] != 'tree':
file['path'] = file['path'][:-4]
if '/' in file['path']:
dir = file['path'].split('/')[0]
name = file['path'].split('/')[1]
self.db[dir] = self.db[dir]+[name,] if dir in self.db else [name,]
else:
self.db['root'].append(file['path'])
except Exception as ex:
try:
await self.irc_error(connection.channel, 'failed to sync database', ex)
except:
error(connection.channel, 'failed to sync database', ex)
finally:
self.db = cache
async def play(self, chan, name, img=False, paste=False):
try:
if img or paste:
ascii = get_url(name)
else:
ascii = get_url(f'https://raw.githubusercontent.com/ircart/ircart/master/ircart/{name}.txt')
if ascii.getcode() == 200:
if img:
ascii = img2irc.convert(ascii.read(), img, int(self.settings['png_width']), self.settings['png_palette'], int(self.settings['png_quantize_colors']))
else:
ascii = ascii.readlines()
if len(ascii) > int(self.settings['lines']) and chan != '#scroll':
await self.irc_error(chan, 'file is too big', f'take those {len(ascii):,} lines to #scroll')
else:
if not img and not paste:
await self.action(chan, 'the ascii gods have chosen... ' + color(name, cyan))
for line in ascii:
if type(line) == bytes:
try:
line = line.decode()
except UnicodeError:
line = line.decode(chardet.detect(line)['encoding']).encode().decode() # TODO: Do we need to re-encode/decode in UTF-8?
line = line.replace('\n','').replace('\r','')
await self.sendmsg(chan, line + reset)
await asyncio.sleep(self.settings['msg'])
else:
await self.irc_error(chan, 'invalid name', name) if not img and not paste else await self.irc_error(chan, 'invalid url', name)
except Exception as ex:
try:
await self.irc_error(chan, 'error in play function', ex)
except:
error('error in play function', ex)
finally:
self.playing = False
async def listen(self):
while True:
try:
if self.reader.at_eof():
break
data = await asyncio.wait_for(self.reader.readuntil(b'\r\n'), 600)
line = data.decode('utf-8').strip()
args = line.split()
debug(line)
if line.startswith('ERROR :Closing Link:'):
raise Exception('Connection has closed.')
elif args[0] == 'PING':
await self.raw('PONG '+args[1][1:])
elif args[1] == '001':
if connection.modes:
await self.raw(f'MODE {identity.nickname} +{connection.modes}')
if identity.nickserv:
await self.sendmsg('NickServ', f'IDENTIFY {identity.nickname} {identity.nickserv}')
await self.raw(f'JOIN {connection.channel} {connection.key}') if connection.key else await self.raw('JOIN ' + connection.channel)
await self.raw('JOIN #scroll')
await self.sync()
elif args[1] == '311' and len(args) >= 6: # RPL_WHOISUSER
nick = args[2]
host = args[5]
if nick == identity.nickname:
self.host = host
elif args[1] == '433':
error('The bot is already running or nick is in use.')
elif args[1] == 'INVITE' and len(args) == 4:
invited = args[2]
chan = args[3][1:]
if invited == identity.nickname and chan in (connection.channel, '#scroll'):
await self.raw(f'JOIN {connection.channel} {connection.key}') if connection.key else await self.raw('JOIN ' + connection.channel)
elif args[1] == 'JOIN' and len(args) >= 3:
nick = args[0].split('!')[0][1:]
host = args[0].split('@')[1]
if nick == identity.nickname:
self.host = host
elif args[1] == 'KICK' and len(args) >= 4:
chan = args[2]
kicked = args[3]
if kicked == identity.nickname and chan in (connection.channel,'#scroll'):
await asyncio.sleep(3)
await self.raw(f'JOIN {connection.channel} {connection.key}') if connection.key else await self.raw('JOIN ' + connection.channel)
elif args[1] == 'PRIVMSG' and len(args) >= 4:
ident = args[0][1:]
nick = args[0].split('!')[0][1:]
chan = args[2]
msg = ' '.join(args[3:])[1:]
if chan in (connection.channel, '#scroll'):
args = msg.split()
if msg == '@scroll':
await self.sendmsg(chan, bold + 'Scroll IRC Art Bot - Developed by acidvegas in Python - https://git.acid.vegas/scroll')
elif args[0] == '.ascii':
if msg == '.ascii stop':
if self.playing:
if chan in self.loops:
self.loops[chan].cancel()
elif time.time() - self.last < self.settings['flood']:
if not self.slow:
if not self.playing:
await self.irc_error(chan, 'slow down nerd')
self.slow = True
elif len(args) >= 2 and not self.playing:
self.slow = False
if msg == '.ascii dirs':
for dir in self.db:
await self.sendmsg(chan, '[{0}] {1}{2}'.format(color(str(list(self.db).index(dir)+1).zfill(2), pink), dir.ljust(10), color('('+str(len(self.db[dir]))+')', grey)))
await asyncio.sleep(self.settings['msg'])
elif args[1] == 'img' and len(args) == 3:
url = args[2]
if url.startswith('https://') or url.startswith('http://'):
self.playing = True
width = 512 - len(line.split(' :')[0])+4
self.loops[chan] = asyncio.create_task(self.play(chan, url, img=width))
elif msg == '.ascii list':
await self.sendmsg(chan, underline + color('https://raw.githubusercontent.com/ircart/ircart/master/ircart/.list', light_blue))
elif args[1] == 'random' and len(args) in (2,3):
if len(args) == 3:
query = args[2]
else:
query = random.choice([item for item in self.db if item not in self.settings['ignore']])
if query in self.db:
ascii = f'{query}/{random.choice(self.db[query])}'
self.playing = True
self.loops[chan] = asyncio.create_task(self.play(chan, ascii))
else:
results = [{'name':ascii,'dir':dir} for dir in self.db for ascii in self.db[dir] if query in ascii]
if results:
ascii = random.choice(results)
ascii = f'{ascii["dir"]}/{ascii["name"]}'
self.playing = True
self.loops[chan] = asyncio.create_task(self.play(chan, ascii))
else:
await self.irc_error(chan, 'invalid directory name or search query', query)
elif msg == '.ascii sync' and is_admin(ident):
await self.sync()
await self.sendmsg(chan, bold + color('database synced', light_green))
elif args[1] == 'play' and len(args) == 3 and self.settings['paste']:
url = args[2]
if url.startswith('https://pastebin.com/raw/') and len(url.split('raw/')) > 1:
self.loops[chan] = asyncio.create_task(self.play(chan, url, paste=True))
else:
await self.irc_error(chan, 'invalid pastebin url', url)
elif args[1] == 'search' and len(args) == 3:
query = args[2]
results = [{'name':ascii,'dir':dir} for dir in self.db for ascii in self.db[dir] if query in ascii]
if results:
for item in results[:int(self.settings['results'])]:
if item['dir'] == 'root':
await self.sendmsg(chan, '[{0}] {1}'.format(color(str(results.index(item)+1).zfill(2), pink), item['name']))
else:
await self.sendmsg(chan, '[{0}] {1} {2}'.format(color(str(results.index(item)+1).zfill(2), pink), item['name'], color('('+item['dir']+')', grey)))
await asyncio.sleep(self.settings['msg'])
else:
await self.irc_error(chan, 'no results found', query)
elif args[1] == 'settings':
if len(args) == 2:
for item in self.settings:
await self.sendmsg(chan, color(item.ljust(13), yellow) + color(str(self.settings[item]), grey))
elif len(args) == 4 and is_admin(ident):
setting = args[2]
option = args[3]
if setting in self.settings:
if setting in ('flood','lines','msg','png_quantize','png_width','results'):
try:
option = float(option)
self.settings[setting] = option
await self.sendmsg(chan, color('OK', light_green))
except ValueError:
await self.irc_error(chan, 'invalid option', 'must be a float or int')
elif setting == 'paste':
if option == 'on':
self.settings[setting] = True
await self.sendmsg(chan, color('OK', light_green))
elif option == 'off':
self.settings[setting] = False
await self.sendmsg(chan, color('OK', light_green))
else:
await self.irc_error(chan, 'invalid option', 'must be on or off')
else:
await self.irc_error(chan, 'invalid setting', setting)
elif len(args) == 2:
query = args[1]
results = [dir+'/'+ascii for dir in self.db for ascii in self.db[dir] if query == ascii]
if results:
results = results[0].replace('root/','')
self.playing = True
self.loops[chan] = asyncio.create_task(self.play(chan, results))
else:
await self.irc_error(chan, 'no results found', query)
except (UnicodeDecodeError, UnicodeEncodeError):
pass
except Exception as ex:
error('fatal error occured', ex)
break
finally:
self.last = time.time()
# Main
print('#'*56)
print('#{:^54}#'.format(''))
print('#{:^54}#'.format('Scroll IRC Art Bot'))
print('#{:^54}#'.format('Developed by acidvegas in Python'))
print('#{:^54}#'.format('https://git.acid.vegas/scroll'))
print('#{:^54}#'.format(''))
print('#'*56)
try:
import chardet
except ImportError:
raise SystemExit('missing required \'chardet\' library (https://pypi.org/project/chardet/)')
try:
import img2irc
except ImportError:
raise SystemExit('missing required \'img2irc\' file (https://github.com/ircart/scroll/blob/master/img2irc.py)')
asyncio.run(Bot().connect())