avoidr/avoidr.py

191 lines
6.4 KiB
Python

#!/usr/bin/env python
# avoidr (masscan with exclusive exclusions) - developed by acidvegas in python (https://git.acid.vegas/avoidr)
import hashlib
import ipaddress
import json
import logging
import os
import sys
import time
import urllib.request
from zipfile import ZipFile
# Globals
grand_total = {'4': 0, '6': 0}
results = dict()
# Setup logger
logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')
def calculate_hash(path):
    '''
    Return the hexadecimal SHA1 digest of a file's contents.

    The file is read in 4 KiB blocks so it never has to fit in memory at once.

    :param path: The path to the file to hash.
    '''
    digest = hashlib.sha1()
    with open(path, 'rb') as handle:
        while True:
            block = handle.read(4096)
            if not block:
                # An empty read means end-of-file.
                break
            digest.update(block)
    return digest.hexdigest()
def download_file(url: str, dest_filename: str, chunk_size: int = 1024*1024, timeout: float = 30.0):
    '''
    Download a file from a given URL in chunks and save to a destination filename.

    Progress is written to stdout on a single, continuously rewritten line.

    :param url: The URL of the file to download
    :param dest_filename: The destination filename to save the downloaded file
    :param chunk_size: Size of chunks to download. Default is set to 1MB.
    :param timeout: Socket timeout in seconds for the connection and each read.
    '''
    with urllib.request.urlopen(url, timeout=timeout) as response:
        # Content-Length is optional (e.g. chunked transfer encoding), so fall
        # back to "unknown size" instead of crashing when it is absent or malformed.
        # `headers` is used because it exists on every urllib response type.
        length_header = response.headers.get('Content-Length')
        try:
            total_size = int(length_header) if length_header is not None else 0
        except ValueError:
            total_size = 0
        downloaded_size = 0
        with open(dest_filename, 'wb') as out_file:
            while True:
                # Monotonic clock: immune to system clock adjustments mid-download.
                start_time = time.monotonic()
                chunk = response.read(chunk_size)
                if not chunk:
                    break
                downloaded_size += len(chunk)
                out_file.write(chunk)
                elapsed = time.monotonic() - start_time
                # A chunk can arrive within one clock tick; avoid dividing by zero.
                speed = len(chunk) / elapsed if elapsed > 0 else 0.0
                if total_size > 0:
                    progress = (downloaded_size / total_size) * 100
                    sys.stdout.write(f'\rDownloaded {downloaded_size} of {total_size} bytes ({progress:.2f}%) at {speed/1024:.2f} KB/s\r')
                else:
                    # Total size unknown: report bytes and speed only.
                    sys.stdout.write(f'\rDownloaded {downloaded_size} bytes at {speed/1024:.2f} KB/s\r')
                sys.stdout.flush()
    # Move past the progress line.
    print()
def get_url(url) -> str:
    '''
    Get the contents of a URL, decoded as text.

    The request is sent with a GitHub API `Accept` header and a custom
    User-Agent, and uses a 10 second timeout.

    :param url: The URL to get the contents of.
    '''
    headers = {'Accept': 'application/vnd.github.v3+json', 'User-Agent': 'Avoidr/1.0 (https://git.acid.vegas/avoidr)'}
    request = urllib.request.Request(url, headers=headers)
    # Context manager guarantees the connection is closed even if read/decode fails.
    with urllib.request.urlopen(request, timeout=10) as response:
        return response.read().decode()
def _git_blob_sha1(path):
    '''
    Return the git blob SHA1 of a file (SHA1 over "blob <size>\\0" + contents).

    :param path: The path to the file to hash.
    '''
    digest = hashlib.sha1(b'blob %d\0' % os.path.getsize(path))
    with open(path, 'rb') as f:
        for chunk in iter(lambda: f.read(1024*1024), b''):
            digest.update(chunk)
    return digest.hexdigest()


def update_database():
    '''
    Update the ASN database.

    Downloads databases/fullASN.json.zip when it is missing or when its hash
    differs from the one reported by the GitHub contents API, then extracts
    fullASN.json into the databases directory.
    '''
    logging.info('Checking for database updates...')
    DB = 'databases/fullASN.json.zip'
    extracted = DB[:-4]  # databases/fullASN.json
    os.makedirs('databases', exist_ok=True)
    if os.path.exists(DB):
        # The contents API 'sha' is a git blob SHA, not a plain SHA1 of the
        # file bytes, so the local hash must include the git blob header or
        # the two values can never match.
        new_hash = json.loads(get_url('https://api.github.com/repos/ipapi-is/ipapi/contents/'+DB))['sha']
        update = _git_blob_sha1(DB) != new_hash
    else:
        update = True
    if update:
        logging.info('Updating database...')
        # Download next to the target and swap it in only on success, so a
        # failed or interrupted download never destroys the existing database.
        partial = DB + '.part'
        try:
            download_file('https://github.com/ipapi-is/ipapi/raw/main/'+DB, partial)
            os.replace(partial, DB)
        finally:
            if os.path.exists(partial):
                os.remove(partial)
    else:
        logging.info('Database is up-to-date!')
    # Also extract when the archive is current but the JSON itself is missing;
    # extraction overwrites any stale copy.
    if update or not os.path.exists(extracted):
        with ZipFile(DB) as archive:
            archive.extract(os.path.basename(extracted), 'databases')
def process_asn(data: dict):
    '''
    Process an ASN and add it to the results.

    Records the ASN's display name and its prefixes in the module-level
    `results`, and adds the address counts to `grand_total`. The IPv4 list is
    skipped when `args.ipv6` is set and the IPv6 list when `args.ipv4` is set.

    :param data: The ASN data to process.
    '''
    # Either field may be absent from a record; join whichever are present
    # ("descr / org" when both exist) instead of assuming 'descr' is there.
    title = ' / '.join(data[field] for field in ('descr', 'org') if field in data)
    asn = data['asn']
    ranges = dict()
    results[asn] = {'name': title, 'ranges': ranges}
    # (IP version, key holding the prefixes, flag that excludes this version)
    families = (
        ('4', 'prefixes', args.ipv6),
        ('6', 'prefixesIPv6', args.ipv4),
    )
    for version, key, excluded in families:
        if excluded or key not in data:
            continue
        prefixes = data[key]
        ranges[version] = prefixes
        total = total_ips(prefixes)
        grand_total[version] += total
        logging.info('Found \033[93mAS{0}\033[0m \033[1;30m({1})\033[0m containing \033[32m{2:,}\033[0m IPv{4} ranges with \033[36m{3:,}\033[0m total IP addresses'.format(asn, title, len(prefixes), total, version))
def total_ips(ranges: list) -> int:
    '''
    Calculate the total number of distinct IP addresses in a list of CIDR ranges.

    Overlapping or duplicated ranges are merged before counting so that no
    address is counted more than once. Ranges with host bits set (such as
    10.0.0.1/24) are treated as the network that contains them.

    :param ranges: The list of CIDR ranges to calculate the total number of IP addresses for.
    '''
    # collapse_addresses() refuses mixed IP versions, so bucket by version first.
    networks = {4: [], 6: []}
    for cidr in ranges:
        network = ipaddress.ip_network(cidr, strict=False)
        networks[network.version].append(network)
    return sum(
        merged.num_addresses
        for same_version in networks.values()
        for merged in ipaddress.collapse_addresses(same_version)
    )
# Main
if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser(description='masscan with exclusive exclusions')
    # -4 and -6 each suppress the other address family, so passing both would
    # silently yield empty results; reject that combination up front.
    family = parser.add_mutually_exclusive_group()
    family.add_argument('-4', '--ipv4', help='process IPv4 addresses only', action='store_true')
    family.add_argument('-6', '--ipv6', help='process IPv6 addresses only', action='store_true')
    parser.add_argument('-x', '--exclude', help='create exclusions for masscan instead of a json output', action='store_true')
    parser.add_argument('-s', '--search', help='comma seperated strings to search (no output file)', type=str)
    parser.add_argument('-u', '--update', help='update the ASN database', action='store_true')
    # NOTE: `args` must stay a module-level name; process_asn() reads it as a global.
    args = parser.parse_args()

    if args.update or not os.path.exists('databases/fullASN.json'):
        update_database()

    with open('databases/fullASN.json', encoding='utf-8') as fp:
        asn_data = json.load(fp)

    # Queries come from --search, or from custom.txt (one per line) otherwise.
    if args.search:
        raw_queries = args.search.split(',')
    else:
        with open('custom.txt') as fp:
            raw_queries = [line.rstrip() for line in fp]
    # An empty string is a substring of everything, so a blank line or a
    # trailing comma would match every ASN; drop blank queries. Lowercase once
    # here rather than on every comparison.
    queries = [query.lower() for query in raw_queries if query.strip()]

    logging.debug(f'Searching {len(queries):,} queries against {len(asn_data):,} ASNs...')

    # Case-insensitive substring match of any query against descr / org.
    for record in asn_data.values():
        for field in ('descr', 'org'):
            if field not in record:
                continue
            text = record[field].lower()
            if any(query in text for query in queries):
                if record['asn'] not in results:
                    process_asn(record)
                # One matching field is enough for this ASN.
                break

    if not args.search:
        os.makedirs('output', exist_ok=True)
        if args.exclude:
            # Plain list of ranges, grouped under a comment line per ASN.
            with open('output/exclude.conf', 'w') as fp:
                for item in results:
                    fp.write(f'# AS{item} - {results[item]["name"]}\n')
                    for version in results[item]['ranges']:
                        for _range in results[item]['ranges'][version]:
                            fp.write(_range+'\n')
                    fp.write('\n')
        else:
            with open('output/out.json', 'w') as fp:
                json.dump(results, fp)
    else:
        logging.info('Add these to your custom.txt file to create output files...')

    # Summary: whole address space versus what remains after removing matches.
    total_v4 = ipaddress.ip_network('0.0.0.0/0').num_addresses
    total_v6 = ipaddress.ip_network('::/0').num_addresses
    print('Total IPv4 Addresses   : {0:,}'.format(total_v4))
    print('Total IPv4 After Clean : {0:,}'.format(total_v4-grand_total['4']))
    print('Total IPv6 Addresses   : {0:,}'.format(total_v6))
    print('Total IPv6 After Clean : {0:,}'.format(total_v6-grand_total['6']))