mirror of
git://git.acid.vegas/archive.git
synced 2024-11-22 08:06:40 +00:00
191 lines
7.0 KiB
Python
191 lines
7.0 KiB
Python
#!/usr/bin/env python
|
|
# phalanx localhost bot - developed by acidvegas in python (https://git.acid.vegas/archive)
|
|
|
|
import asyncio
|
|
import random
|
|
import re
|
|
import sys
|
|
import time
|
|
|
|
class settings:
|
|
admin = 'acidvegas!~stillfree@most.dangerous.motherfuck' # Can use wildcards (Must be in nick!user@host format)
|
|
access = [admin,]
|
|
oper_user = 'phalanx'
|
|
oper_pass = 'simps0nsfAn420'
|
|
|
|
# Formatting Control Characters / Color Codes
|
|
bold = '\x02'
|
|
reset = '\x0f'
|
|
green = '03'
|
|
red = '04'
|
|
purple = '06'
|
|
orange = '07'
|
|
yellow = '08'
|
|
light_green = '09'
|
|
cyan = '10'
|
|
light_cyan = '11'
|
|
light_blue = '12'
|
|
pink = '13'
|
|
grey = '14'
|
|
|
|
def color(msg, foreground, background=None):
|
|
return f'\x03{foreground},{background}{msg}{reset}' if background else f'\x03{foreground}{msg}{reset}'
|
|
|
|
def is_access(ident):
|
|
result = False
|
|
for item in settings.access:
|
|
if re.compile(item.replace('*','.*')).search(ident):
|
|
result = True
|
|
return result
|
|
|
|
def is_admin(ident):
|
|
return re.compile(settings.admin.replace('*','.*')).search(ident)
|
|
|
|
def rndip():
|
|
host = ''
|
|
for i in range(3):
|
|
host = host + ''.join([random.choice('0123456789ABCDEF') for n in range(8)]) + '.'
|
|
return host + 'IP'
|
|
|
|
def rndnick():
|
|
prefix = random.choice(['st','sn','cr','pl','pr','fr','fl','qu','br','gr','sh','sk','tr','kl','wr','bl']+list('bcdfgklmnprstvwz'))
|
|
midfix = random.choice(('aeiou'))+random.choice(('aeiou'))+random.choice(('bcdfgklmnprstvwz'))
|
|
suffix = random.choice(['ed','est','er','le','ly','y','ies','iest','ian','ion','est','ing','led','inger']+list('abcdfgklmnprstvwz'))
|
|
return prefix+midfix+suffix
|
|
|
|
class clone:
|
|
def __init__(self, identity={'chan':'#phalanx','nick':'phalanx','user':'W','host':'R','real':'\x0304ROMAN WAR PHALANX\x03'}):
|
|
self.identity = identity
|
|
self.clones = dict()
|
|
self.destroy = False
|
|
self.reader = None
|
|
self.write = None
|
|
|
|
async def sendmsg(self, target, msg):
|
|
await self.raw(f'PRIVMSG {target} :{msg}')
|
|
|
|
async def error(self, chan, data, reason=None):
|
|
await self.sendmsg(chan, '[{0}] {1}'.format(color('error', red), data, color(f'({reason})', grey))) if reason else await self.sendmsg(chan, '[{0}] {1}'.format(color('error', red), data))
|
|
|
|
async def run(self):
|
|
await self.connect()
|
|
|
|
async def raw(self, data):
|
|
self.writer.write(data[:510].encode('utf-8') + b'\r\n')
|
|
await self.writer.drain()
|
|
|
|
async def connect(self):
|
|
while not self.destroy:
|
|
try:
|
|
options = {'host':'127.0.0.1', 'port':6667, 'limit':1024, 'ssl':None, 'family':2}
|
|
self.reader, self.writer = await asyncio.wait_for(asyncio.open_connection(**options), 5)
|
|
del options
|
|
await self.raw('USER {0} 0 * :{1}'.format(self.identity['user'], self.identity['real']))
|
|
await self.raw('NICK ' + self.identity['nick'])
|
|
await self.listen()
|
|
except Exception as ex:
|
|
if self.identity['nick'] == 'phalanx':
|
|
for item in self.clones:
|
|
if self.clones[item]:
|
|
self.clones[item].cancel()
|
|
self.clones = dict()
|
|
print('[!] - error occured (' + str(ex) + ')')
|
|
await asyncio.sleep(random.randint(60,300))
|
|
|
|
async def listen(self):
|
|
while True:
|
|
try:
|
|
data = await asyncio.wait_for(self.reader.readuntil(b'\r\n'), 600)
|
|
line = data.decode('utf-8').strip()
|
|
if self.identity['nick'] == 'phalanx':
|
|
print('[~] - ' + line)
|
|
args = line.split()
|
|
if args[0] == 'PING':
|
|
await self.raw('PONG ' + args[1][1:])
|
|
elif args[1] == '001':
|
|
await self.raw('MODE ' + self.identity['nick'] + ' +dD')
|
|
await self.raw(f'OPER {settings.oper_user} {settings.oper_pass}')
|
|
await asyncio.sleep(1)
|
|
await self.raw('SETHOST ' + self.identity['host'])
|
|
await self.raw('JOIN #phalanx')
|
|
if self.identity['chan'] != '#phalanx':
|
|
await self.raw('JOIN ' + self.identity['chan'])
|
|
elif args[1] == '433':
|
|
self.identity['nick'] = rndnick()
|
|
await self.raw('NICK ' + self.identity['nick'])
|
|
elif args[1] == 'KICK' and len(args) >= 4:
|
|
chan = args[2]
|
|
nick = args[3]
|
|
if nick == self.identity['nick']:
|
|
await asyncio.sleep(3)
|
|
await self.raw('JOIN ' + chan)
|
|
elif args[1] == 'NICK' and len(args) == 3:
|
|
nick = args[0][1:]
|
|
new = ' '.join(args[2:])[1:]
|
|
if nick == self.identity['nick']:
|
|
self.identity['nick'] = new
|
|
elif args[1] == 'PRIVMSG' and len(args) >= 4:
|
|
ident = args[0][1:]
|
|
nick = args[0].split('!')[0][1:]
|
|
target = args[2]
|
|
msg = ' '.join(args[3:])[1:]
|
|
args = msg.split()
|
|
if is_admin(ident):
|
|
if args[0] == '.access' and self.identity['nick'] == 'phalanx' and len(args) == 2:
|
|
action = args[1][:1]
|
|
host = args[1][1:]
|
|
if action == '+':
|
|
if host not in settings.access:
|
|
settings.access.append(host)
|
|
await self.sendmsg(target, color('added', green))
|
|
elif action == '-':
|
|
if host in settings.access:
|
|
settings.access.remove(host)
|
|
await self.sendmsg(target, color('removed', red))
|
|
elif action == '?':
|
|
for item in settings.access:
|
|
await self.sendmsg(target, color(item, yellow))
|
|
else:
|
|
await self.error(chan, 'invalid command', 'usage: .access <+/-><host>')
|
|
if is_access(ident):
|
|
if args[0] == '.create' and self.identity['nick'] == 'phalanx' and len(args) >= 2:
|
|
if len(self.clones) < 500:
|
|
chan = args[1]
|
|
if len(args) >= 6:
|
|
nick = rndnick() if args[2] == '%r' else args[2]
|
|
user = rndnick() if args[3] == '%r' else args[3]
|
|
host = rndip() if args[4] == '%r' else args[4]
|
|
real = rndnick() if args[5] == '%r' else ' '.join(args[5:])
|
|
elif len(args) == 2:
|
|
ident = random.choice(idents).split()
|
|
nick = ident[0]
|
|
user = ident[1]
|
|
host = ident[2]
|
|
real = ' '.join(ident[3:])
|
|
if nick != 'phalanx':
|
|
options = {'chan':chan,'nick':nick,'user':user,'host':host,'real':real}
|
|
self.clones[random.randint(1000,9999)] = asyncio.create_task(clone(identity=options).run())
|
|
else:
|
|
self.error(target, 'max clones', 'for now...')
|
|
elif args[0] == '.destroy' and self.identity['nick'] != 'phalanx' and len(args) == 2:
|
|
target = args[1]
|
|
if target == self.identity['nick'] or (target == '*' and is_admin(ident)):
|
|
self.destroy = True
|
|
await self.raw('QUIT :' + random.choice(('*.net *.split', f'Ping timeout: {random.randint(180,300)} seconds', 'Connection closed', 'G-line: User has been permanently banned from this network.')))
|
|
elif args[0] == '.raw' and len(args) >= 3:
|
|
target = args[1]
|
|
if target == 'phalanx' and not is_admin(ident):
|
|
self.error(chan 'yeah right...')
|
|
data = ' '.join(args[2:])
|
|
if '#5000' not in data:
|
|
if target == self.identity['nick'] or (target == '*' and is_admin(ident)):
|
|
if args[1] == '*':
|
|
await asyncio.sleep(float('0.0'+str(random.randint(1111,9999))))
|
|
await self.raw(data)
|
|
except (UnicodeDecodeError, UnicodeEncodeError):
|
|
pass
|
|
|
|
# Main
|
|
idents = open('idents.txt').readlines()
|
|
asyncio.run(clone().run())
|