tools/discordrelay.py

95 lines
3.3 KiB
Python

#!/usr/bin/env python
# Discord IRC Relay - Developed by acidvegas in Python (https://git.acid.vegas/tools)
import asyncio
import ssl
class IRC:
    """Minimal asyncio IRC client for the IRC side of the relay.

    Channel messages received in ``#superbowl`` are appended to
    ``DiscordBot.queue``; the Discord side sends to IRC through :meth:`raw`.
    """

    # Nicks whose channel messages are never relayed to Discord.
    IGNORED_NICKS = frozenset(('DISCORD', 'EliManning', 'CANCER', 'scroll', 'DickServ', 'AMBERALERT'))

    # Control characters commonly used for IRC text formatting; any message
    # containing one of them is dropped instead of being relayed.
    FORMAT_CODES = ('\x02', '\x03', '\x1D', '\x1F', '\x16')

    # CR / LF would terminate the IRC line early and let the remainder be
    # interpreted as a separate command, so they are replaced by spaces.
    # NUL is removed outright.
    _UNSAFE = str.maketrans({'\r': ' ', '\n': ' ', '\x00': None})

    def __init__(self):
        # Keyword arguments passed straight to asyncio.open_connection().
        # NOTE(review): family 2 is presumably socket.AF_INET (IPv4 only) - confirm.
        self.options = {'host':'irc.supernets.org','port':6697,'limit':1024,'ssl':self.ssl_ctx(),'family':2,'local_addr':None}
        # Both stay None until connect() succeeds and are reset on disconnect.
        self.reader, self.writer = (None, None)

    def ssl_ctx(self):
        """Return the SSL context used for the IRC connection.

        NOTE(review): hostname checking and certificate verification are
        disabled here, so the server's identity is not authenticated.
        """
        ctx = ssl.create_default_context()
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
        #ctx.load_cert_chain(config.cert.file, password=config.cert.password)
        return ctx

    def raw(self, data):
        """Send one IRC line.

        Line breaks inside *data* are neutralised so a single call can never
        emit more than one command, and the payload is capped at 510 bytes
        (plus the trailing CRLF) without splitting a multi-byte character.
        The line is dropped with a notice when there is no connection.
        """
        if self.writer is None:
            print('[!] - Not connected to IRC server, dropping outgoing line!')
            return
        payload = data.translate(self._UNSAFE).encode('utf-8', 'replace')[:510]
        # Cutting at a byte offset may leave a partial UTF-8 sequence at the
        # end; a decode/encode round trip discards it.
        payload = payload.decode('utf-8', 'ignore').encode('utf-8')
        self.writer.write(payload + b'\r\n')

    def _handle_line(self, line):
        """Act on one decoded, non-empty line received from the server."""
        args = line.split()
        if len(args) < 2:
            return
        if args[0] == 'PING':
            token = args[1]
            # Only drop a leading ':' when it is actually present.
            self.raw('PONG ' + (token[1:] if token.startswith(':') else token))
        elif args[1] == '001': #RPL_WELCOME
            self.raw('MODE DISCORD +BD')
            self.raw('PRIVMSG NickServ :IDENTIFY DISCORD REDACTED')
            self.raw('JOIN #superbowl')
        elif args[1] == 'PRIVMSG' and len(args) >= 3:
            nick = args[0].split('!')[0][1:]
            chan = args[2]
            msg = ' '.join(args[3:])[1:]
            if chan != '#superbowl' or nick in self.IGNORED_NICKS:
                return
            if any(code in msg for code in self.FORMAT_CODES):
                return
            DiscordBot.queue.append(f'{nick}: {msg}')

    async def connect(self):
        """Connect, register, and process server lines until disconnected.

        Returns after printing a notice if the connection cannot be
        established. Otherwise it reads lines until EOF or a socket error,
        then closes the writer and resets ``reader`` / ``writer`` to None.
        """
        try:
            self.reader, self.writer = await asyncio.open_connection(**self.options)
            self.raw('USER relay 0 * :Discord Relay Bot')
            self.raw('NICK DISCORD')
        except Exception as ex:
            print(f'[!] - Failed to connect to IRC server! ({ex!s})')
            return
        try:
            while not self.reader.at_eof():
                try:
                    data = await self.reader.readline()
                    line = data.decode('utf-8').strip()
                    if not line:
                        continue
                    print('[IRC] ' + line)
                    self._handle_line(line)
                except (UnicodeDecodeError, UnicodeEncodeError):
                    # Best effort: lines that are not valid UTF-8 are skipped.
                    pass
                except OSError as ex:
                    # A broken socket never reaches EOF and readline() keeps
                    # re-raising without yielding, so the loop must stop here
                    # or it would spin and starve the event loop.
                    print(f'[!] - Lost connection to IRC server! ({ex!s})')
                    break
                except Exception as ex:
                    print(f'[!] - Unknown error has occured! ({ex!s})')
        finally:
            writer, self.reader, self.writer = self.writer, None, None
            if writer is not None:
                writer.close()
# Module-level IRC client shared with the Discord side: Discord.on_message
# sends through Bot_IRC.raw() and Discord.on_ready schedules Bot_IRC.connect().
Bot_IRC = IRC()
# ---------------------------------------------------------------------------------------------------- #
try:
import discord
except ImportError:
raise SystemExit('missing discord modules (https://pypi.org/project/discord.py/)')
class Discord(discord.Client):
    """Discord side of the relay.

    Lines queued by the IRC client are flushed to the configured text channel
    once per second, and messages posted in that channel are forwarded to the
    ``#superbowl`` IRC channel through ``Bot_IRC``.
    """

    # Configured as ID strings; on_ready replaces server_id / channel_id with
    # the resolved guild and channel objects.
    server_id = 'CHANGEME'
    channel_id = 'CHANGEME'
    admin_id = 'CHANGEME'
    # Lines waiting to be sent to Discord ("nick: message"), filled by IRC.
    queue = list()
    # Set once the relay tasks have been started by on_ready.
    _relay_started = False
    # References to the background tasks so they are not garbage-collected.
    _relay_tasks = ()

    async def queue_loop(self):
        """Flush queued IRC lines to the Discord channel once per second."""
        while True: # 5 every 5 minutes?
            if self.queue:
                # Swap the queue out BEFORE awaiting: lines appended while
                # send() is in flight land in the fresh list instead of being
                # discarded afterwards.
                pending, self.queue = self.queue, list()
                # A batch is capped at 2000 characters; anything beyond that
                # is dropped.
                message = '\n'.join(pending)[:2000]
                try:
                    await self.channel_id.send(message)
                except (discord.DiscordException, OSError) as ex:
                    # One failed send must not end the relay loop.
                    print(f'[!] - Failed to relay to Discord! ({ex!s})')
            await asyncio.sleep(1)

    async def on_message(self, message):
        """Forward a message from the relay channel to IRC.

        Messages from the bot itself or from any other channel are ignored.
        Only the first attachment URL is appended to the text.
        """
        if message.author == self.user or message.channel != self.channel_id:
            return
        content = message.clean_content
        if message.attachments:
            content += ' ' + message.attachments[0].url
        print(f'[Discord] {message.author.name}: {content}')
        if Bot_IRC.writer is None:
            print('[!] - IRC is not connected, message not relayed!')
            return
        author = message.author.name.replace('\r', ' ').replace('\n', ' ')
        # One PRIVMSG per line: a line break inside a single raw IRC write
        # would make the remainder be sent as a separate IRC command.
        for line in content.splitlines():
            if line.strip():
                Bot_IRC.raw(f'PRIVMSG #superbowl :{author}: {line}')

    async def on_ready(self):
        """Resolve the configured guild/channel and start the relay tasks.

        Guarded so that a repeated ready event neither repeats the ID lookup
        (the IDs have been replaced by objects by then) nor starts a second
        set of tasks.
        """
        print(f'[Discord] Client: {self.user.name} ({self.user.id!s})')
        if self._relay_started:
            return
        guild = next((x for x in self.guilds if str(x.id) == self.server_id), None)
        if guild is None:
            print(f'[!] - Discord server {self.server_id} not found!')
            return
        channel = next((x for x in guild.channels if str(x.id) == self.channel_id and x.type == discord.ChannelType.text), None)
        if channel is None:
            print(f'[!] - Discord text channel {self.channel_id} not found!')
            return
        self.server_id = guild
        self.channel_id = channel
        print(f'[Discord] Server: {self.server_id} ({self.channel_id!s})')
        self._relay_started = True
        self._relay_tasks = (
            asyncio.create_task(self.queue_loop()),
            asyncio.create_task(Bot_IRC.connect()),
        )
# Module-level Discord client; the IRC class appends relayed lines to
# DiscordBot.queue, so this name must exist before any IRC traffic arrives.
DiscordBot = Discord()
# Start the client; 'CHANGEME' is a placeholder for the real bot token.
DiscordBot.run('CHANGEME')