# tools/bots/hateserv/commands.py

import csv
import io
import json
import urllib.request
import sys
import time

def download_file(url: str, dest_filename: str, chunk_size: int = 1024*1024):
    '''
    Download a file from a given URL in chunks and save it to a destination filename.
    :param url: The URL of the file to download
    :param dest_filename: The destination filename to save the downloaded file
    :param chunk_size: Size of each chunk to download (default 1 MiB)
    '''
    with urllib.request.urlopen(url) as response:
        total_size = int(response.getheader('Content-Length', '0') or 0) # 0 if the server omits Content-Length
        downloaded_size = 0
        with open(dest_filename, 'wb') as out_file:
            while True:
                start_time = time.time()
                chunk = response.read(chunk_size)
                if not chunk:
                    break
                downloaded_size += len(chunk)
                out_file.write(chunk)
                elapsed = time.time() - start_time
                speed = len(chunk) / elapsed if elapsed else 0 # Per-chunk transfer rate in bytes per second
                progress = (downloaded_size / total_size) * 100 if total_size else 0
                sys.stdout.write(f'\rDownloaded {downloaded_size} of {total_size} bytes ({progress:.2f}%) at {speed/1024:.2f} KB/s')
                sys.stdout.flush()
    print()
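
# Example call for download_file (hypothetical URL, shown only to illustrate usage):
#     download_file('https://example.com/archive.tar.gz', 'archive.tar.gz')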

def get_url(url: str, sent_headers: dict = None, reader: bool = True):
    '''
    Retrieve a URL with custom headers.
    :param url: The URL to retrieve
    :param sent_headers: The headers to send with the request
    :param reader: If True, return the decoded response body; otherwise return the raw response object
    '''
    req = urllib.request.Request(url, headers=sent_headers or {}) # Avoids the mutable default argument pitfall
    if reader:
        return urllib.request.urlopen(req, timeout=10).read().decode()
    else:
        return urllib.request.urlopen(req, timeout=10)

def setup_user_agent(user_agent: str = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'):
    '''
    Set up the urllib.request user agent.
    :param user_agent: The user agent to use
    '''
    handler = urllib.request.HTTPHandler()
    opener = urllib.request.build_opener(handler)
    opener.addheaders = [('User-agent', user_agent)]
    urllib.request.install_opener(opener)
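
# Typical usage sketch (an assumption, nothing in this module enforces it): call
# setup_user_agent() once at startup so requests made through the wrappers below
# carry a browser-like User-Agent, e.g.:
#
#     setup_user_agent()
#     data = geoip('1.1.1.1')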
# -------------------------------------------------------------------------------- #

def asn_seach(query: str):
    '''
    Search for an ASN by string.
    :param query: The string to search for
    '''
    return json.loads(get_url('https://api.bgpview.io/search?query_term=' + query))

def cve_search(query: str, limit: str = '25'):
    '''
    Search for a CVE by string.
    :param query: The string to search for
    :param limit: The number of results to return
    '''
    return json.loads(get_url(f'https://services.nvd.nist.gov/rest/json/cves/2.0?keywordSearch={query}&resultsPerPage={limit}'))

def geoip(ip: str):
    '''
    Get the geolocation of an IP address.
    :param ip: The IP address to geolocate
    '''
    return json.loads(get_url('https://api.ipapi.is/?q=' + ip))

def github(option: str, query: str):
    '''
    Search for a GitHub repository or user.
    :param option: The option to search for (search, repo, user)
    :param query: The query to search for
    '''
    header = {'Accept': 'application/vnd.github.v3+json'}
    if option == 'search':
        url = 'https://api.github.com/search/repositories?q=' + query
        data = json.loads(get_url(url, header))
        return data['items'] if data['items'] else False
    elif option == 'repo':
        url = 'https://api.github.com/repos/' + query
        return json.loads(get_url(url, header))
    elif option == 'user':
        url = 'https://api.github.com/users/' + query
        return json.loads(get_url(url, header))

def librex(query: str):
    '''
    Search using the LibreX instance hosted on SuperNETs.
    :param query: The query to search for
    '''
    return json.loads(get_url(f'https://librex.supernets.org/api.php?q={query}&t=0'))

def reddit(option: str, subreddit: str, id: str = None):
    '''
    Search for a Reddit post or subreddit.
    :param option: The option to search for (post, subreddit)
    :param subreddit: The subreddit to search
    :param id: The post ID to search for
    '''
    header = {'Accept': 'application/json'}
    if option == 'post':
        data = json.loads(get_url(f'https://reddit.com/r/{subreddit}/comments/{id}.json', header))
        return data[0]['data']['children'][0]['data'] if 'error' not in data else False
    elif option == 'subreddit':
        data = json.loads(get_url(f'https://reddit.com/r/{subreddit}.json?limit=20', header))
        posts = [item['data'] for item in data['data']['children'] if not item['data']['stickied']]
        return posts if posts else None

def exploitdb(query: str):
    '''
    Search for an exploit or shellcode on ExploitDB.
    :param query: The query to search for
    '''
    exploits = get_url('https://git.supernets.org/mirrors/exploitdb/raw/branch/main/files_exploits.csv')
    shellcodes = get_url('https://git.supernets.org/mirrors/exploitdb/raw/branch/main/files_shellcodes.csv')
    results = []
    for database in (exploits, shellcodes):
        reader = csv.DictReader(io.StringIO(database))
        results += [row for row in reader if query.lower() in row['description'].lower()]
    return results
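

if __name__ == '__main__':
    # Minimal smoke-test sketch (illustrative only; this module is normally imported by the
    # bot, and the example queries below are arbitrary).
    setup_user_agent()
    print(geoip('8.8.8.8'))
    print(cve_search('openssl', limit='3'))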