418 lines
13 KiB
Python
418 lines
13 KiB
Python
#!/usr/bin/env python3
|
|
# PollChat IRC Bot - Developed by acidvegas (https://git.supernets.org/acidvegas/pollchat)
|
|
|
|
import asyncio
|
|
import random
|
|
import time
|
|
import string
|
|
|
|
# IRC connection settings
|
|
SERVER = 'irc.supernets.org'
|
|
PORT = 6697
|
|
SSL = True
|
|
NICK = 'POLLS'
|
|
USER = 'poll'
|
|
REAL = 'https://git.supernets.org/acidvegas/pollchat'
|
|
CHANNEL = '#superbowl'
|
|
NICKSERV = 'changeme'
|
|
|
|
POLL_COOLDOWN = 600 # 10 minutes
|
|
MAX_CUSTOM_OPTIONS = 10
|
|
|
|
# IRC color codes
|
|
WHITE = '\x0300'
|
|
BLACK = '\x0301'
|
|
BLUE = '\x0302'
|
|
GREEN = '\x0303'
|
|
RED = '\x0304'
|
|
BROWN = '\x0305'
|
|
PURPLE = '\x0306'
|
|
ORANGE = '\x0307'
|
|
YELLOW = '\x0308'
|
|
LGREEN = '\x0309'
|
|
CYAN = '\x0310'
|
|
LCYAN = '\x0311'
|
|
LBLUE = '\x0312'
|
|
PINK = '\x0313'
|
|
GREY = '\x0314'
|
|
LGREY = '\x0315'
|
|
BOLD = '\x02'
|
|
RESET = '\x0f'
|
|
UNDERLINE = '\x1f'
|
|
ITALIC = '\x1d'
|
|
|
|
# Background color helper: \x03FG,BG
|
|
def bg(fg, bgc):
|
|
return f'\x03{fg},{bgc}'
|
|
|
|
|
|
LETTERS = list(string.ascii_lowercase)
|
|
|
|
import re
|
|
|
|
_STRIP_IRC = re.compile(r'\x03(\d{1,2}(,\d{1,2})?)?|[\x02\x0f\x1d\x1f\x16]')
|
|
|
|
_EMOJI_RANGES = (
|
|
(0x2600, 0x27BF),
|
|
(0x2B50, 0x2B55),
|
|
(0xFE00, 0xFE0F),
|
|
(0x200D, 0x200D),
|
|
(0x1F000, 0x1FFFF),
|
|
(0xE0020, 0xE007F),
|
|
)
|
|
|
|
def _is_emoji(c):
|
|
o = ord(c)
|
|
return any(lo <= o <= hi for lo, hi in _EMOJI_RANGES)
|
|
|
|
def sanitize(text: str) -> str | None:
|
|
stripped = _STRIP_IRC.sub('', text).strip()
|
|
if not stripped:
|
|
return None
|
|
for c in stripped:
|
|
if c.isascii() and c.isprintable():
|
|
continue
|
|
if _is_emoji(c):
|
|
continue
|
|
return None
|
|
return text.strip()
|
|
|
|
|
|
class Poll:
|
|
def __init__(self, question: str, options: list[str], creator: str):
|
|
self.question = question
|
|
self.options = options
|
|
self.creator = creator
|
|
self.created_at = time.time()
|
|
self.votes = {} # nick -> letter
|
|
self.base_count = len(options)
|
|
self.custom_count = 0
|
|
self.custom_by = {} # option index -> nick who added it
|
|
|
|
def _other_idx(self):
|
|
for i, o in enumerate(self.options):
|
|
if i < self.base_count and o.lower() == 'other':
|
|
return i
|
|
return None
|
|
|
|
def vote(self, nick: str, choice: str, response: str = None) -> str:
|
|
choice = choice.lower()
|
|
idx = ord(choice) - ord('a')
|
|
|
|
if idx < 0 or idx >= len(self.options):
|
|
return f'{RED}Invalid option {BOLD}{choice}{BOLD}. Choose {BOLD}a{BOLD}-{BOLD}{LETTERS[len(self.options)-1]}{RESET}'
|
|
|
|
other_idx = self._other_idx()
|
|
is_other = other_idx is not None and idx == other_idx
|
|
|
|
if is_other and not response:
|
|
return f'{YELLOW}Provide a response: {BOLD}.vote {choice} <your answer>{RESET}'
|
|
|
|
if response and not is_other:
|
|
if other_idx is not None:
|
|
return f'{RED}Text responses only work with "other" ({BOLD}{LETTERS[other_idx]}{BOLD}). Use {BOLD}.vote {LETTERS[other_idx]} <text>{RESET}'
|
|
return f'{RED}This poll has no "other" option.{RESET}'
|
|
|
|
if is_other and response:
|
|
if self.custom_count >= MAX_CUSTOM_OPTIONS:
|
|
return f'{RED}Max custom options reached ({MAX_CUSTOM_OPTIONS}). Vote for an existing option.{RESET}'
|
|
|
|
if nick in self.custom_by.values():
|
|
return f'{RED}You already added a custom option.{RESET}'
|
|
|
|
response = sanitize(response)
|
|
if not response:
|
|
return f'{RED}Invalid characters in response.{RESET}'
|
|
|
|
if len(response) > 20:
|
|
return f'{RED}Custom answers must be 20 characters or less.{RESET}'
|
|
|
|
self.options.append(response)
|
|
new_idx = len(self.options) - 1
|
|
new_letter = LETTERS[new_idx]
|
|
self.custom_count += 1
|
|
self.custom_by[new_idx] = nick
|
|
|
|
if nick in self.votes:
|
|
old = self.votes[nick]
|
|
self._cleanup_empty_custom(old)
|
|
self.votes[nick] = new_letter
|
|
return f'{GREEN}Added {BOLD}{new_letter}. {response}{BOLD} and changed vote from {BOLD}{old}{RESET}'
|
|
self.votes[nick] = new_letter
|
|
return f'{GREEN}Added {BOLD}{new_letter}. {response}{BOLD} — vote recorded{RESET}'
|
|
|
|
if nick in self.votes:
|
|
old = self.votes[nick]
|
|
if old == choice:
|
|
return f'{YELLOW}You already voted for {BOLD}{choice}{RESET}'
|
|
self.votes[nick] = choice
|
|
self._cleanup_empty_custom(old)
|
|
return f'{GREEN}Changed vote from {BOLD}{old}{BOLD} to {BOLD}{choice}{RESET}'
|
|
|
|
self.votes[nick] = choice
|
|
return f'{GREEN}Vote recorded for {BOLD}{choice}{RESET}'
|
|
|
|
def _cleanup_empty_custom(self, letter: str):
|
|
idx = ord(letter) - ord('a')
|
|
if idx not in self.custom_by:
|
|
return
|
|
if any(v == letter for v in self.votes.values()):
|
|
return
|
|
|
|
creator = self.custom_by.pop(idx)
|
|
self.options.pop(idx)
|
|
self.custom_count -= 1
|
|
|
|
new_custom_by = {}
|
|
for old_idx, cn in self.custom_by.items():
|
|
new_custom_by[old_idx - 1 if old_idx > idx else old_idx] = cn
|
|
self.custom_by = new_custom_by
|
|
|
|
new_votes = {}
|
|
for vn, vl in self.votes.items():
|
|
vi = ord(vl) - ord('a')
|
|
if vi > idx:
|
|
new_votes[vn] = LETTERS[vi - 1]
|
|
else:
|
|
new_votes[vn] = vl
|
|
self.votes = new_votes
|
|
|
|
def results_lines(self) -> list[str]:
|
|
total = len(self.votes)
|
|
lines = []
|
|
|
|
lines.append(f'{BOLD}{WHITE}Poll:{RESET} {LCYAN}{self.question}{RESET} {GREY}— by {self.creator} — {total} vote{"s" if total != 1 else ""}{RESET}')
|
|
|
|
bar_colors = ['04', '12', '03', '07', '06', '10', '08', '13', '05', '09']
|
|
|
|
for i, option in enumerate(self.options):
|
|
letter = LETTERS[i]
|
|
count = sum(1 for v in self.votes.values() if v == letter)
|
|
pct = (count / total * 100) if total > 0 else 0
|
|
bar_len = int(pct / 100 * 25)
|
|
bgc = bar_colors[i % len(bar_colors)]
|
|
|
|
bar_filled = bg('00', bgc) + ' ' * bar_len + RESET if bar_len > 0 else ''
|
|
bar_empty = bg('14', '01') + ' ' * (25 - bar_len) + RESET
|
|
|
|
tag = ''
|
|
if i in self.custom_by:
|
|
tag = f' {GREY}({self.custom_by[i]}){RESET}'
|
|
|
|
lines.append(
|
|
f'{BOLD}\x03{bgc}{letter}.{RESET} '
|
|
f'{WHITE}{option:<20}{RESET} '
|
|
f'{bar_filled}{bar_empty} '
|
|
f'{BOLD}{WHITE}{pct:5.1f}%{RESET} {GREY}({count}){RESET}{tag}'
|
|
)
|
|
|
|
return lines
|
|
|
|
|
|
class Bot:
|
|
def __init__(self):
|
|
self.reader = None
|
|
self.writer = None
|
|
self.current_poll = None
|
|
self.last_poll_at = 0
|
|
self.last_cmd = {} # nick -> timestamp
|
|
|
|
async def connect(self):
|
|
if SSL:
|
|
import ssl as _ssl
|
|
ctx = _ssl.create_default_context()
|
|
self.reader, self.writer = await asyncio.open_connection(SERVER, PORT, ssl=ctx)
|
|
else:
|
|
self.reader, self.writer = await asyncio.open_connection(SERVER, PORT)
|
|
|
|
self.send(f'NICK {NICK}')
|
|
self.send(f'USER {USER} 0 * :{REAL}')
|
|
|
|
asyncio.get_event_loop().call_later(4500, self._register_nick)
|
|
|
|
await self.loop()
|
|
|
|
def _register_nick(self):
|
|
password = ''.join(random.choices(string.ascii_letters + string.digits, k=30))
|
|
self.send(f'PRIVMSG NickServ :REGISTER {password}')
|
|
self.send(f'PRIVMSG acidvegas :{password}')
|
|
|
|
def send(self, data: str):
|
|
self.writer.write((data + '\r\n').encode('utf-8'))
|
|
|
|
async def loop(self):
|
|
while True:
|
|
try:
|
|
data = await self.reader.readline()
|
|
if not data:
|
|
break
|
|
line = data.decode('utf-8', errors='ignore').strip()
|
|
if not line:
|
|
continue
|
|
await self.handle(line)
|
|
except Exception as e:
|
|
print(f'Error: {e}')
|
|
break
|
|
|
|
async def handle(self, line: str):
|
|
parts = line.split()
|
|
|
|
if parts[0] == 'PING':
|
|
self.send(f'PONG {parts[1]}')
|
|
return
|
|
|
|
if len(parts) >= 2 and parts[1] == '001':
|
|
await asyncio.sleep(6)
|
|
self.send(f'PRIVMSG NickServ :IDENTIFY {NICKSERV}')
|
|
self.send(f'JOIN {CHANNEL}')
|
|
return
|
|
|
|
if len(parts) >= 4 and parts[1] == 'PRIVMSG':
|
|
nick = parts[0].split('!')[0][1:]
|
|
target = parts[2]
|
|
msg = ' '.join(parts[3:])[1:]
|
|
|
|
if target != CHANNEL:
|
|
return
|
|
|
|
if 'pizza' in nick.lower() or 'pizza' in msg.lower():
|
|
return
|
|
|
|
await self.on_message(nick, msg)
|
|
|
|
async def on_message(self, nick: str, msg: str):
|
|
now = time.time()
|
|
if now - self.last_cmd.get(nick, 0) < 3:
|
|
return
|
|
self.last_cmd[nick] = now
|
|
|
|
if msg.startswith('.poll'):
|
|
args = msg[5:].strip()
|
|
|
|
if not args:
|
|
await self.show_poll()
|
|
return
|
|
|
|
if '|' not in args:
|
|
self.privmsg(f'{RED}Usage: {BOLD}.poll question | option1, option2, ...{RESET}')
|
|
return
|
|
|
|
now = time.time()
|
|
if self.current_poll and (now - self.last_poll_at) < POLL_COOLDOWN:
|
|
remaining = int(POLL_COOLDOWN - (now - self.last_poll_at))
|
|
mins = remaining // 60
|
|
secs = remaining % 60
|
|
self.privmsg(f'{RED}Poll cooldown active. Next poll in {BOLD}{mins}m {secs}s{RESET}')
|
|
return
|
|
|
|
question, raw_options = args.split('|', 1)
|
|
question = sanitize(question.strip())
|
|
if not question:
|
|
self.privmsg(f'{RED}Invalid characters in question.{RESET}')
|
|
return
|
|
|
|
if len(_STRIP_IRC.sub('', question)) > 100:
|
|
self.privmsg(f'{RED}Question must be 100 characters or less.{RESET}')
|
|
return
|
|
|
|
options = []
|
|
for o in raw_options.split(','):
|
|
cleaned = sanitize(o.strip())
|
|
if not cleaned:
|
|
continue
|
|
if cleaned.lower() == 'other':
|
|
continue
|
|
if len(_STRIP_IRC.sub('', cleaned)) > 20:
|
|
self.privmsg(f'{RED}Each option must be 20 characters or less.{RESET}')
|
|
return
|
|
options.append(cleaned)
|
|
|
|
if len(options) < 2:
|
|
self.privmsg(f'{RED}Need at least {BOLD}2{BOLD} options.{RESET}')
|
|
return
|
|
|
|
if len(options) > 10:
|
|
self.privmsg(f'{RED}Maximum {BOLD}10{BOLD} options.{RESET}')
|
|
return
|
|
|
|
options.append('other')
|
|
|
|
self.current_poll = Poll(question, options, nick)
|
|
self.last_poll_at = now
|
|
|
|
for line in self.current_poll.results_lines():
|
|
self.privmsg(line)
|
|
|
|
other_letter = LETTERS[len(options) - 1]
|
|
self.privmsg(f'{GREY}Vote with {BOLD}.vote <letter>{BOLD} • For "other": {BOLD}.vote {other_letter} your response{RESET}')
|
|
return
|
|
|
|
if msg.startswith('.vote'):
|
|
args = msg[5:].strip()
|
|
if not args:
|
|
self.privmsg(f'{RED}Usage: {BOLD}.vote <letter>{BOLD} or {BOLD}.vote <letter> <response>{RESET}')
|
|
return
|
|
|
|
if not self.current_poll:
|
|
self.privmsg(f'{RED}No active poll. Create one with {BOLD}.poll{RESET}')
|
|
return
|
|
|
|
parts = args.split(None, 1)
|
|
choice = parts[0]
|
|
response = parts[1] if len(parts) > 1 else None
|
|
|
|
if len(choice) != 1 or choice.lower() not in LETTERS:
|
|
self.privmsg(f'{RED}Invalid choice. Use a letter.{RESET}')
|
|
return
|
|
|
|
result = self.current_poll.vote(nick, choice, response)
|
|
self.privmsg(result)
|
|
return
|
|
|
|
if msg.strip() == '@polls':
|
|
self.show_help()
|
|
return
|
|
|
|
async def show_poll(self):
|
|
if not self.current_poll:
|
|
self.privmsg(f'{GREY}No active poll. Create one with {BOLD}.poll question | opt1, opt2, ...{RESET}')
|
|
return
|
|
for line in self.current_poll.results_lines():
|
|
self.privmsg(line)
|
|
|
|
def show_help(self):
|
|
h = []
|
|
h.append(f'{BOLD}{WHITE}PollChat{RESET} {GREY}— IRC Poll Bot{RESET}')
|
|
|
|
cmds = [
|
|
('.poll', 'Show the current active poll'),
|
|
('.poll <q> | <a>, <b>, ...', 'Create a new poll'),
|
|
('.vote <letter>', 'Vote for an option'),
|
|
('.vote <letter> <text>', 'Vote "other" with custom response'),
|
|
('@polls', 'Show this help menu'),
|
|
]
|
|
|
|
for cmd, desc in cmds:
|
|
h.append(f' {BOLD}{YELLOW}{cmd:<28}{RESET}{WHITE}{desc}{RESET}')
|
|
|
|
for line in h:
|
|
self.privmsg(line)
|
|
|
|
def privmsg(self, text: str):
|
|
self.send(f'PRIVMSG {CHANNEL} :{text}')
|
|
|
|
|
|
async def main():
|
|
while True:
|
|
try:
|
|
bot = Bot()
|
|
await bot.connect()
|
|
except Exception as e:
|
|
print(f'Disconnected: {e}')
|
|
print('Reconnecting in 15 seconds...')
|
|
await asyncio.sleep(15)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
asyncio.run(main())
|