Add pollchat.py
This commit is contained in:
371
pollchat.py
Normal file
371
pollchat.py
Normal file
@@ -0,0 +1,371 @@
|
||||
#!/usr/bin/env python3
|
||||
# PollChat IRC Bot - Developed by acidvegas (https://github.com/acidvegas/pollchat)
|
||||
|
||||
import asyncio
|
||||
import random
|
||||
import time
|
||||
import string
|
||||
|
||||
# IRC connection settings
|
||||
SERVER = 'irc.supernets.org'
|
||||
PORT = 6697
|
||||
SSL = True
|
||||
NICK = 'POLLS'
|
||||
USER = 'poll'
|
||||
REAL = 'https://github.com/acidvegas/pollchat'
|
||||
CHANNEL = '#superbowl'
|
||||
|
||||
POLL_COOLDOWN = 600 # 10 minutes
|
||||
MAX_CUSTOM_OPTIONS = 10
|
||||
|
||||
# IRC color codes
|
||||
WHITE = '\x0300'
|
||||
BLACK = '\x0301'
|
||||
BLUE = '\x0302'
|
||||
GREEN = '\x0303'
|
||||
RED = '\x0304'
|
||||
BROWN = '\x0305'
|
||||
PURPLE = '\x0306'
|
||||
ORANGE = '\x0307'
|
||||
YELLOW = '\x0308'
|
||||
LGREEN = '\x0309'
|
||||
CYAN = '\x0310'
|
||||
LCYAN = '\x0311'
|
||||
LBLUE = '\x0312'
|
||||
PINK = '\x0313'
|
||||
GREY = '\x0314'
|
||||
LGREY = '\x0315'
|
||||
BOLD = '\x02'
|
||||
RESET = '\x0f'
|
||||
UNDERLINE = '\x1f'
|
||||
ITALIC = '\x1d'
|
||||
|
||||
# Background color helper: \x03FG,BG
|
||||
def bg(fg, bgc):
|
||||
return f'\x03{fg},{bgc}'
|
||||
|
||||
|
||||
LETTERS = list(string.ascii_lowercase)
|
||||
|
||||
import re
|
||||
import unicodedata
|
||||
|
||||
_ALLOWED = re.compile(
|
||||
r'^['
|
||||
r'a-zA-Z0-9'
|
||||
r' '
|
||||
r'!@#$%^&*()_+\-=\[\]{};\':"\\|,.<>/?`~'
|
||||
r'\U0001F000-\U0001FFFF'
|
||||
r'\U00002600-\U000027BF'
|
||||
r'\U0000FE00-\U0000FE0F'
|
||||
r'\U0000200D'
|
||||
r']+$'
|
||||
)
|
||||
|
||||
def sanitize(text: str) -> str | None:
|
||||
cleaned = text.strip()
|
||||
if not cleaned:
|
||||
return None
|
||||
if not _ALLOWED.match(cleaned):
|
||||
return None
|
||||
return cleaned
|
||||
|
||||
|
||||
class Poll:
|
||||
def __init__(self, question: str, options: list[str], creator: str):
|
||||
self.question = question
|
||||
self.options = options
|
||||
self.creator = creator
|
||||
self.created_at = time.time()
|
||||
self.votes = {} # nick -> letter
|
||||
self.base_count = len(options)
|
||||
self.custom_count = 0
|
||||
self.custom_by = {} # option index -> nick who added it
|
||||
|
||||
def _other_idx(self):
|
||||
for i, o in enumerate(self.options):
|
||||
if i < self.base_count and o.lower() == 'other':
|
||||
return i
|
||||
return None
|
||||
|
||||
def vote(self, nick: str, choice: str, response: str = None) -> str:
|
||||
choice = choice.lower()
|
||||
idx = ord(choice) - ord('a')
|
||||
|
||||
if idx < 0 or idx >= len(self.options):
|
||||
return f'{RED}Invalid option {BOLD}{choice}{BOLD}. Choose {BOLD}a{BOLD}-{BOLD}{LETTERS[len(self.options)-1]}{RESET}'
|
||||
|
||||
other_idx = self._other_idx()
|
||||
is_other = other_idx is not None and idx == other_idx
|
||||
|
||||
if is_other and not response:
|
||||
return f'{YELLOW}Provide a response: {BOLD}.vote {choice} <your answer>{RESET}'
|
||||
|
||||
if response and not is_other:
|
||||
if other_idx is not None:
|
||||
return f'{RED}Text responses only work with "other" ({BOLD}{LETTERS[other_idx]}{BOLD}). Use {BOLD}.vote {LETTERS[other_idx]} <text>{RESET}'
|
||||
return f'{RED}This poll has no "other" option.{RESET}'
|
||||
|
||||
if is_other and response:
|
||||
if self.custom_count >= MAX_CUSTOM_OPTIONS:
|
||||
return f'{RED}Max custom options reached ({MAX_CUSTOM_OPTIONS}). Vote for an existing option.{RESET}'
|
||||
|
||||
response = sanitize(response)
|
||||
if not response:
|
||||
return f'{RED}Invalid characters in response.{RESET}'
|
||||
|
||||
if len(response) > 20:
|
||||
return f'{RED}Custom answers must be 20 characters or less.{RESET}'
|
||||
|
||||
self.options.append(response)
|
||||
new_idx = len(self.options) - 1
|
||||
new_letter = LETTERS[new_idx]
|
||||
self.custom_count += 1
|
||||
self.custom_by[new_idx] = nick
|
||||
|
||||
if nick in self.votes:
|
||||
old = self.votes[nick]
|
||||
self.votes[nick] = new_letter
|
||||
return f'{GREEN}Added {BOLD}{new_letter}. {response}{BOLD} and changed vote from {BOLD}{old}{RESET}'
|
||||
self.votes[nick] = new_letter
|
||||
return f'{GREEN}Added {BOLD}{new_letter}. {response}{BOLD} — vote recorded{RESET}'
|
||||
|
||||
if nick in self.votes:
|
||||
old = self.votes[nick]
|
||||
if old == choice:
|
||||
return f'{YELLOW}You already voted for {BOLD}{choice}{RESET}'
|
||||
self.votes[nick] = choice
|
||||
return f'{GREEN}Changed vote from {BOLD}{old}{BOLD} to {BOLD}{choice}{RESET}'
|
||||
|
||||
self.votes[nick] = choice
|
||||
return f'{GREEN}Vote recorded for {BOLD}{choice}{RESET}'
|
||||
|
||||
def results_lines(self) -> list[str]:
|
||||
total = len(self.votes)
|
||||
lines = []
|
||||
|
||||
lines.append(f'{BOLD}{WHITE}Poll:{RESET} {LCYAN}{self.question}{RESET} {GREY}— by {self.creator} — {total} vote{"s" if total != 1 else ""}{RESET}')
|
||||
|
||||
bar_colors = ['04', '12', '03', '07', '06', '10', '08', '13', '05', '09']
|
||||
|
||||
for i, option in enumerate(self.options):
|
||||
letter = LETTERS[i]
|
||||
count = sum(1 for v in self.votes.values() if v == letter)
|
||||
pct = (count / total * 100) if total > 0 else 0
|
||||
bar_len = int(pct / 100 * 25)
|
||||
bgc = bar_colors[i % len(bar_colors)]
|
||||
|
||||
bar_filled = bg('00', bgc) + ' ' * bar_len + RESET if bar_len > 0 else ''
|
||||
bar_empty = bg('14', '01') + ' ' * (25 - bar_len) + RESET
|
||||
|
||||
tag = ''
|
||||
if i in self.custom_by:
|
||||
tag = f' {GREY}({self.custom_by[i]}){RESET}'
|
||||
|
||||
lines.append(
|
||||
f'{BOLD}\x03{bgc}{letter}.{RESET} '
|
||||
f'{WHITE}{option:<20}{RESET} '
|
||||
f'{bar_filled}{bar_empty} '
|
||||
f'{BOLD}{WHITE}{pct:5.1f}%{RESET} {GREY}({count}){RESET}{tag}'
|
||||
)
|
||||
|
||||
return lines
|
||||
|
||||
|
||||
class Bot:
|
||||
def __init__(self):
|
||||
self.reader = None
|
||||
self.writer = None
|
||||
self.current_poll = None
|
||||
self.last_poll_at = 0
|
||||
|
||||
async def connect(self):
|
||||
if SSL:
|
||||
import ssl as _ssl
|
||||
ctx = _ssl.create_default_context()
|
||||
self.reader, self.writer = await asyncio.open_connection(SERVER, PORT, ssl=ctx)
|
||||
else:
|
||||
self.reader, self.writer = await asyncio.open_connection(SERVER, PORT)
|
||||
|
||||
self.send(f'NICK {NICK}')
|
||||
self.send(f'USER {USER} 0 * :{REAL}')
|
||||
|
||||
asyncio.get_event_loop().call_later(4500, self._register_nick)
|
||||
|
||||
await self.loop()
|
||||
|
||||
def _register_nick(self):
|
||||
password = ''.join(random.choices(string.ascii_letters + string.digits, k=30))
|
||||
self.send(f'PRIVMSG NickServ :REGISTER {password}')
|
||||
self.send(f'PRIVMSG acidvegas :{password}')
|
||||
|
||||
def send(self, data: str):
|
||||
self.writer.write((data + '\r\n').encode('utf-8'))
|
||||
|
||||
async def loop(self):
|
||||
while True:
|
||||
try:
|
||||
data = await self.reader.readline()
|
||||
if not data:
|
||||
break
|
||||
line = data.decode('utf-8', errors='ignore').strip()
|
||||
if not line:
|
||||
continue
|
||||
await self.handle(line)
|
||||
except Exception as e:
|
||||
print(f'Error: {e}')
|
||||
break
|
||||
|
||||
async def handle(self, line: str):
|
||||
parts = line.split()
|
||||
|
||||
if parts[0] == 'PING':
|
||||
self.send(f'PONG {parts[1]}')
|
||||
return
|
||||
|
||||
if len(parts) >= 2 and parts[1] == '001':
|
||||
self.send(f'JOIN {CHANNEL}')
|
||||
return
|
||||
|
||||
if len(parts) >= 4 and parts[1] == 'PRIVMSG':
|
||||
nick = parts[0].split('!')[0][1:]
|
||||
target = parts[2]
|
||||
msg = ' '.join(parts[3:])[1:]
|
||||
|
||||
if target != CHANNEL:
|
||||
return
|
||||
|
||||
if 'pizza' in nick.lower() or 'pizza' in msg.lower():
|
||||
return
|
||||
|
||||
await self.on_message(nick, msg)
|
||||
|
||||
async def on_message(self, nick: str, msg: str):
|
||||
if msg.startswith('.poll'):
|
||||
args = msg[5:].strip()
|
||||
|
||||
if not args:
|
||||
await self.show_poll()
|
||||
return
|
||||
|
||||
if '|' not in args:
|
||||
self.privmsg(f'{RED}Usage: {BOLD}.poll question | option1, option2, ...{RESET}')
|
||||
return
|
||||
|
||||
now = time.time()
|
||||
if self.current_poll and (now - self.last_poll_at) < POLL_COOLDOWN:
|
||||
remaining = int(POLL_COOLDOWN - (now - self.last_poll_at))
|
||||
mins = remaining // 60
|
||||
secs = remaining % 60
|
||||
self.privmsg(f'{RED}Poll cooldown active. Next poll in {BOLD}{mins}m {secs}s{RESET}')
|
||||
return
|
||||
|
||||
question, raw_options = args.split('|', 1)
|
||||
question = sanitize(question.strip())
|
||||
if not question:
|
||||
self.privmsg(f'{RED}Invalid characters in question.{RESET}')
|
||||
return
|
||||
|
||||
options = []
|
||||
for o in raw_options.split(','):
|
||||
cleaned = sanitize(o.strip())
|
||||
if cleaned:
|
||||
options.append(cleaned)
|
||||
|
||||
if len(options) < 2:
|
||||
self.privmsg(f'{RED}Need at least {BOLD}2{BOLD} options.{RESET}')
|
||||
return
|
||||
|
||||
if len(options) > 10:
|
||||
self.privmsg(f'{RED}Maximum {BOLD}10{BOLD} options.{RESET}')
|
||||
return
|
||||
|
||||
self.current_poll = Poll(question, options, nick)
|
||||
self.last_poll_at = now
|
||||
|
||||
for line in self.current_poll.results_lines():
|
||||
self.privmsg(line)
|
||||
|
||||
vote_hint = f'{GREY}Vote with {BOLD}.vote <letter>{BOLD}'
|
||||
has_other = any(o.lower() == 'other' for o in options)
|
||||
if has_other:
|
||||
vote_hint += f' • For "other": {BOLD}.vote {LETTERS[next(i for i, o in enumerate(options) if o.lower() == "other")]} your response{BOLD}'
|
||||
vote_hint += RESET
|
||||
self.privmsg(vote_hint)
|
||||
return
|
||||
|
||||
if msg.startswith('.vote'):
|
||||
args = msg[5:].strip()
|
||||
if not args:
|
||||
self.privmsg(f'{RED}Usage: {BOLD}.vote <letter>{BOLD} or {BOLD}.vote <letter> <response>{RESET}')
|
||||
return
|
||||
|
||||
if not self.current_poll:
|
||||
self.privmsg(f'{RED}No active poll. Create one with {BOLD}.poll{RESET}')
|
||||
return
|
||||
|
||||
parts = args.split(None, 1)
|
||||
choice = parts[0]
|
||||
response = parts[1] if len(parts) > 1 else None
|
||||
|
||||
if len(choice) != 1 or choice.lower() not in LETTERS:
|
||||
self.privmsg(f'{RED}Invalid choice. Use a letter.{RESET}')
|
||||
return
|
||||
|
||||
result = self.current_poll.vote(nick, choice, response)
|
||||
self.privmsg(result)
|
||||
return
|
||||
|
||||
if msg.startswith('.results'):
|
||||
await self.show_poll()
|
||||
return
|
||||
|
||||
if msg.strip() == '@polls':
|
||||
self.show_help()
|
||||
return
|
||||
|
||||
async def show_poll(self):
|
||||
if not self.current_poll:
|
||||
self.privmsg(f'{GREY}No active poll. Create one with {BOLD}.poll question | opt1, opt2, ...{RESET}')
|
||||
return
|
||||
for line in self.current_poll.results_lines():
|
||||
self.privmsg(line)
|
||||
|
||||
def show_help(self):
|
||||
h = []
|
||||
h.append(f'{BOLD}{WHITE}PollChat{RESET} {GREY}— IRC Poll Bot{RESET}')
|
||||
|
||||
cmds = [
|
||||
('.poll', 'Show the current active poll'),
|
||||
('.poll <q> | <a>, <b>, ...', 'Create a new poll'),
|
||||
('.vote <letter>', 'Vote for an option'),
|
||||
('.vote <letter> <text>', 'Vote "other" with custom response'),
|
||||
('.results', 'Show current poll results'),
|
||||
('@polls', 'Show this help menu'),
|
||||
]
|
||||
|
||||
for cmd, desc in cmds:
|
||||
h.append(f' {BOLD}{YELLOW}{cmd:<28}{RESET}{WHITE}{desc}{RESET}')
|
||||
|
||||
h.append(f' {GREY}Cooldown: 10min between polls • Max 10 custom "other" options{RESET}')
|
||||
|
||||
for line in h:
|
||||
self.privmsg(line)
|
||||
|
||||
def privmsg(self, text: str):
|
||||
self.send(f'PRIVMSG {CHANNEL} :{text}')
|
||||
|
||||
|
||||
async def main():
|
||||
while True:
|
||||
try:
|
||||
bot = Bot()
|
||||
await bot.connect()
|
||||
except Exception as e:
|
||||
print(f'Disconnected: {e}')
|
||||
print('Reconnecting in 15 seconds...')
|
||||
await asyncio.sleep(15)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
asyncio.run(main())
|
||||
Reference in New Issue
Block a user