mirror of
git://git.acid.vegas/archive.git
synced 2024-11-15 04:36:53 +00:00
363 lines
11 KiB
Python
363 lines
11 KiB
Python
|
#!/usr/bin/env python
|
||
|
# IExTrading IRC Bot - Developed by acidvegas in Python (https://acid.vegas/iex)
|
||
|
|
||
|
import http.client
|
||
|
import json
|
||
|
import random
|
||
|
import ssl
|
||
|
import socket
|
||
|
import time
|
||
|
|
||
|
# Connection
|
||
|
server = 'irc.supernets.org'
|
||
|
channel = '#dev'
|
||
|
|
||
|
# Identity
|
||
|
nickname = 'StockMarket'
|
||
|
username = 'iex'
|
||
|
realname = 'acid.vegas/iex'
|
||
|
|
||
|
# Login
|
||
|
nickserv_password = None
|
||
|
network_password = None
|
||
|
|
||
|
# Settings
|
||
|
throttle_cmd = 3
|
||
|
throttle_msg = 0.5
|
||
|
user_modes = None
|
||
|
|
||
|
# Formatting Control Characters / Color Codes
|
||
|
bold = '\x02'
|
||
|
italic = '\x1D'
|
||
|
underline = '\x1F'
|
||
|
reverse = '\x16'
|
||
|
reset = '\x0f'
|
||
|
white = '00'
|
||
|
black = '01'
|
||
|
blue = '02'
|
||
|
green = '03'
|
||
|
red = '04'
|
||
|
brown = '05'
|
||
|
purple = '06'
|
||
|
orange = '07'
|
||
|
yellow = '08'
|
||
|
light_green = '09'
|
||
|
cyan = '10'
|
||
|
light_cyan = '11'
|
||
|
light_blue = '12'
|
||
|
pink = '13'
|
||
|
grey = '14'
|
||
|
light_grey = '15'
|
||
|
|
||
|
def condense_value(value):
|
||
|
value = float(value)
|
||
|
if value < 0.01:
|
||
|
return '${0:,.8f}'.format(value)
|
||
|
elif value < 24.99:
|
||
|
return '${0:,.2f}'.format(value)
|
||
|
else:
|
||
|
return '${:,}'.format(int(value))
|
||
|
|
||
|
def debug(msg):
|
||
|
print(f'{get_time()} | [~] - {msg}')
|
||
|
|
||
|
def error(msg, reason=None):
|
||
|
if reason:
|
||
|
print(f'{get_time()} | [!] - {msg} ({reason})')
|
||
|
else:
|
||
|
print(f'{get_time()} | [!] - {msg}')
|
||
|
|
||
|
def error_exit(msg):
|
||
|
raise SystemExit(f'{get_time()} | [!] - {msg}')
|
||
|
|
||
|
def get_float(data):
|
||
|
try:
|
||
|
float(data)
|
||
|
return True
|
||
|
except ValueError:
|
||
|
return False
|
||
|
|
||
|
def get_time():
|
||
|
return time.strftime('%I:%M:%S')
|
||
|
|
||
|
def percent_color(percent):
|
||
|
percent = float(percent)
|
||
|
if percent == 0.0:
|
||
|
return grey
|
||
|
elif percent < 0.0:
|
||
|
if percent > -10.0:
|
||
|
return brown
|
||
|
else:
|
||
|
return red
|
||
|
else:
|
||
|
if percent < 10.0:
|
||
|
return green
|
||
|
else:
|
||
|
return light_green
|
||
|
|
||
|
def random_int(min, max):
|
||
|
return random.randint(min, max)
|
||
|
|
||
|
class IEX:
|
||
|
def api(api_data):
|
||
|
conn = http.client.HTTPSConnection('api.iextrading.com', timeout=15)
|
||
|
conn.request('GET', '/1.0/' + api_data)
|
||
|
response = conn.getresponse().read().decode('utf-8')
|
||
|
data = json.loads(response)
|
||
|
conn.close()
|
||
|
return data
|
||
|
|
||
|
def company(symbol):
|
||
|
return IEX.api(f'stock/{symbol}/company')
|
||
|
|
||
|
def lists(list_type):
|
||
|
return IEX.api('stock/market/list/' + list_type)
|
||
|
|
||
|
def news(symbol):
|
||
|
return IEX.api(f'stock/{symbol}/news')
|
||
|
|
||
|
def quote(symbols):
|
||
|
data = IEX.api(f'stock/market/batch?symbols={symbols}&types=quote')
|
||
|
if len(data) == 1:
|
||
|
return data[next(iter(data))]['quote']
|
||
|
else:
|
||
|
return [data[item]['quote'] for item in data]
|
||
|
|
||
|
def stats(symbol):
|
||
|
return IEX.api(f'stock/{symbol}/stats')
|
||
|
|
||
|
def symbols():
|
||
|
return IEX.api('ref-data/symbols')
|
||
|
|
||
|
class IRC(object):
|
||
|
def __init__(self):
|
||
|
self.last = 0
|
||
|
self.slow = False
|
||
|
self.sock = None
|
||
|
|
||
|
def stock_info(self, data):
|
||
|
sep = self.color('|', grey)
|
||
|
sep2 = self.color('/', grey)
|
||
|
name = '{0} ({1})'.format(self.color(data['companyName'], white), data['symbol'])
|
||
|
value = condense_value(data['latestPrice'])
|
||
|
percent = self.color('{:,.2f}%'.format(float(data['change'])), percent_color(data['change']))
|
||
|
volume = '{0} {1}'.format(self.color('Volume:', white), '${:,}'.format(data['avgTotalVolume']))
|
||
|
cap = '{0} {1}'.format(self.color('Market Cap:', white), '${:,}'.format(data['marketCap']))
|
||
|
return f'{name} {sep} {value} ({percent}) {sep} {volume} {sep} {cap}'
|
||
|
|
||
|
def stock_matrix(self, data): # very retarded way of calculating the longest strings per-column
|
||
|
results = {'symbol':list(),'value':list(),'percent':list(),'volume':list(),'cap':list()}
|
||
|
for item in data:
|
||
|
results['symbol'].append(item['symbol'])
|
||
|
results['value'].append(condense_value(item['latestPrice']))
|
||
|
results['percent'].append('{:,.2f}%'.format(float(item['change'])))
|
||
|
results['volume'].append('${:,}'.format(item['avgTotalVolume']))
|
||
|
results['cap'].append('${:,}'.format(item['marketCap']))
|
||
|
for item in results:
|
||
|
results[item] = len(max(results[item], key=len))
|
||
|
if results['symbol'] < len('Symbol'):
|
||
|
results['symbol'] = len('Symbol')
|
||
|
if results['value'] < len('Value'):
|
||
|
results['value'] = len('Value')
|
||
|
if results['percent'] < len('Change'):
|
||
|
results['percent'] = len('Change')
|
||
|
if results['volume'] < len('Volume'):
|
||
|
results['volume'] = len('Volume')
|
||
|
if results['cap'] < len('Market Cap'):
|
||
|
results['cap'] = len('Market Cap')
|
||
|
return results
|
||
|
|
||
|
def stock_table(self, data):
|
||
|
matrix = self.stock_matrix(data)
|
||
|
header = self.color(' {0} {1} {2} {3} {4}'.format('Symbol'.center(matrix['symbol']), 'Value'.center(matrix['value']), 'Percent'.center(matrix['percent']), 'Volume'.center(matrix['volume']), 'Market Cap'.center(matrix['cap'])), black, light_grey)
|
||
|
lines = [header,]
|
||
|
for item in data:
|
||
|
symbol = item['symbol'].ljust(matrix['symbol'])
|
||
|
value = condense_value(item['latestPrice']).rjust(matrix['value'])
|
||
|
percent = self.color('{:,.2f}%'.format(float(item['change'])).rjust(matrix['percent']), percent_color(item['change']))
|
||
|
volume = '${:,}'.format(item['avgTotalVolume']).rjust(matrix['volume'])
|
||
|
cap = '${:,}'.format(item['marketCap']).rjust(matrix['cap'])
|
||
|
lines.append(' {0} | {1} | {2} | {3} | {4} '.format(symbol,value,percent,volume,cap))
|
||
|
return lines
|
||
|
|
||
|
def color(self, msg, foreground, background=None):
|
||
|
if background:
|
||
|
return f'\x03{foreground},{background}{msg}{reset}'
|
||
|
else:
|
||
|
return f'\x03{foreground}{msg}{reset}'
|
||
|
|
||
|
def connect(self):
|
||
|
try:
|
||
|
self.sock = ssl.wrap_socket(socket.socket(socket.AF_INET, socket.SOCK_STREAM))
|
||
|
self.sock.connect((server, 6697))
|
||
|
self.raw(f'USER {username} 0 * :{realname}')
|
||
|
self.nick(nickname)
|
||
|
except socket.error as ex:
|
||
|
error('Failed to connect to IRC server.', ex)
|
||
|
self.event_disconnect()
|
||
|
else:
|
||
|
self.listen()
|
||
|
|
||
|
def error(self, chan, msg, reason=None):
|
||
|
if reason:
|
||
|
self.sendmsg(chan, '[{0}] {1} {2}'.format(self.color('!', red), msg, self.color('({0})'.format(reason), grey)))
|
||
|
else:
|
||
|
self.sendmsg(chan, '[{0}] {1}'.format(self.color('!', red), msg))
|
||
|
|
||
|
def event_connect(self):
|
||
|
if user_modes:
|
||
|
self.mode(nickname, '+' + user_modes)
|
||
|
if nickserv_password:
|
||
|
self.identify(nickname, nickserv_password)
|
||
|
self.join_channel(channel)
|
||
|
|
||
|
def event_disconnect(self):
|
||
|
self.sock.close()
|
||
|
time.sleep(10)
|
||
|
self.connect()
|
||
|
|
||
|
def event_kick(self, chan, kicked):
|
||
|
if chan == channel and kicked == nickname:
|
||
|
time.sleep(3)
|
||
|
self.join_channel(channel, key)
|
||
|
|
||
|
def event_message(self, nick, chan, msg):
|
||
|
#try:
|
||
|
if msg[:1] in '@!':
|
||
|
if time.time() - self.last < throttle_cmd:
|
||
|
if not self.slow:
|
||
|
self.error(chan, 'Slow down nerd!')
|
||
|
self.slow = True
|
||
|
else:
|
||
|
args = msg.split()
|
||
|
if msg == '@iex':
|
||
|
self.sendmsg(chan, bold + 'IExTrading IRC Bot - Developed by acidvegas in Python - https://acid.vegas/iex')
|
||
|
elif args[0] == '!stock':
|
||
|
if len(args) == 2:
|
||
|
symbols = args[1].upper()
|
||
|
if ',' in symbols:
|
||
|
symbols = ','.join(list(symbols.split(','))[:10])
|
||
|
data = IEX.quote(symbols)
|
||
|
if type(data) == dict:
|
||
|
self.sendmsg(chan, self.stock_info(data))
|
||
|
elif type(data) == list:
|
||
|
for line in self.stock_table(data):
|
||
|
self.sendmsg(chan, line)
|
||
|
time.sleep(throttle_msg)
|
||
|
else:
|
||
|
self.error(chan, 'Invalid stock names!')
|
||
|
else:
|
||
|
symbol = args[1]
|
||
|
data = IEX.quote(symbol)
|
||
|
if data:
|
||
|
self.sendmsg(chan, self.stock_info(data))
|
||
|
else:
|
||
|
self.error(chan, 'Invalid stock name!')
|
||
|
elif len(args) == 3:
|
||
|
if args[1] == 'company':
|
||
|
symbol = args[2]
|
||
|
data = IEX.company(symbol)
|
||
|
if data:
|
||
|
self.sendmsg(chan, '{0} {1} ({2}) {3} {4} {5} {6} {7} {8}'.format(self.color('Company:', white), data['companyName'], data['symbol'], self.color('|', grey), data['website'], self.color('|', grey), data['industry'], self.color('|', grey), data['CEO']))
|
||
|
self.sendmsg(chan, '{0} {1}'.format(self.color('Description:', white), data['description']))
|
||
|
else:
|
||
|
self.error('Invalid stock name!')
|
||
|
elif args[1] == 'search':
|
||
|
query = args[2].lower()
|
||
|
data = [{'symbol':item['symbol'],'name':item['name']} for item in IEX.symbols() if query in item['name'].lower()]
|
||
|
if data:
|
||
|
count = 1
|
||
|
max_length = len(max([item['name'] for item in data], key=len))
|
||
|
for item in data[:10]:
|
||
|
self.sendmsg(chan, '[{0}] {1} {2} {3}'.format(self.color(str(count), pink), item['name'].ljust(max_length), self.color('|', grey), item['symbol']))
|
||
|
count += 1
|
||
|
time.sleep(throttle_msg)
|
||
|
else:
|
||
|
self.error(chan, 'No results found.')
|
||
|
elif args[1] == 'list':
|
||
|
options = {'active':'mostactive','gainers':'gainers','losers':'losers','volume':'iexvolume','percent':'iexpercent'}
|
||
|
option = args[2]
|
||
|
try:
|
||
|
option = options[option]
|
||
|
except KeyError:
|
||
|
self.error(chan, 'Invalid option!', 'Valid options are active, gainers, losers, volume, & percent')
|
||
|
else:
|
||
|
data = IEX.lists(option)
|
||
|
for line in self.stock_table(data):
|
||
|
self.sendmsg(chan, line)
|
||
|
time.sleep(throttle_msg)
|
||
|
elif args[1] == 'news':
|
||
|
symbol = args[2]
|
||
|
data = IEX.news(symbol)
|
||
|
if data:
|
||
|
count = 1
|
||
|
for item in data:
|
||
|
self.sendmsg(chan, '[{0}] {1}'.format(self.color(str(count), pink), item['headline']))
|
||
|
self.sendmsg(chan, ' - ' + self.color(item['url'], grey))
|
||
|
count += 1
|
||
|
time.sleep(throttle_msg)
|
||
|
else:
|
||
|
self.error(chan, 'Invalid stock name!')
|
||
|
self.last = time.time()
|
||
|
#except Exception as ex:
|
||
|
# self.error(chan, 'Unknown error occured!', ex)
|
||
|
|
||
|
def event_nick_in_use(self):
|
||
|
self.nick('IEX_' + str(random_int(10,99)))
|
||
|
|
||
|
def handle_events(self, data):
|
||
|
args = data.split()
|
||
|
if data.startswith('ERROR :Closing Link:'):
|
||
|
raise Exception('Connection has closed.')
|
||
|
elif args[0] == 'PING':
|
||
|
self.raw('PONG ' + args[1][1:])
|
||
|
elif args[1] == '001':
|
||
|
self.event_connect()
|
||
|
elif args[1] == '433':
|
||
|
self.event_nick_in_use()
|
||
|
elif args[1] == 'KICK':
|
||
|
chan = args[2]
|
||
|
kicked = args[3]
|
||
|
self.event_kick(nick, chan, kicked)
|
||
|
elif args[1] == 'PRIVMSG':
|
||
|
nick = args[0].split('!')[0][1:]
|
||
|
chan = args[2]
|
||
|
msg = ' '.join(args[3:])[1:]
|
||
|
if chan == channel:
|
||
|
self.event_message(nick, chan, msg)
|
||
|
|
||
|
def identify(self, nick, passwd):
|
||
|
self.sendmsg('nickserv', f'identify {nick} {passwd}')
|
||
|
|
||
|
def join_channel(self, chan, key=None):
|
||
|
self.raw(f'JOIN {chan} {key}') if key else self.raw('JOIN ' + chan)
|
||
|
|
||
|
def listen(self):
|
||
|
while True:
|
||
|
try:
|
||
|
data = self.sock.recv(1024).decode('utf-8')
|
||
|
for line in (line for line in data.split('\r\n') if line):
|
||
|
debug(line)
|
||
|
if len(line.split()) >= 2:
|
||
|
self.handle_events(line)
|
||
|
except (UnicodeDecodeError,UnicodeEncodeError):
|
||
|
pass
|
||
|
#except Exception as ex:
|
||
|
# error('Unexpected error occured.', ex)
|
||
|
# break
|
||
|
self.event_disconnect()
|
||
|
|
||
|
def mode(self, target, mode):
|
||
|
self.raw(f'MODE {target} {mode}')
|
||
|
|
||
|
def nick(self, nick):
|
||
|
self.raw('NICK ' + nick)
|
||
|
|
||
|
def raw(self, msg):
|
||
|
self.sock.send(bytes(msg + '\r\n', 'utf-8'))
|
||
|
|
||
|
def sendmsg(self, target, msg):
|
||
|
self.raw(f'PRIVMSG {target} :{msg}')
|
||
|
|
||
|
IRC().connect()
|