mirror of
https://github.com/acidvegas/avoidr.git
synced 2024-11-27 10:27:00 +00:00
191 lines
6.4 KiB
Python
191 lines
6.4 KiB
Python
|
#!/usr/bin/env python
|
||
|
# avoidr (masscan with exclusive exclusions) - developed by acidvegas in python (https://git.acid.vegas/avoidr)
|
||
|
|
||
|
import hashlib
|
||
|
import ipaddress
|
||
|
import json
|
||
|
import logging
|
||
|
import os
|
||
|
import sys
|
||
|
import time
|
||
|
import urllib.request
|
||
|
from zipfile import ZipFile
|
||
|
|
||
|
# Shared state filled in by process_asn() while the script runs
grand_total = dict.fromkeys(('4', '6'), 0) # Running count of matched addresses, keyed by IP version
results     = {}                           # Matched ASNs, keyed by ASN number

# Logging configuration (INFO and above, printed as "LEVEL: message")
logging.basicConfig(format='%(levelname)s: %(message)s', level=logging.INFO)
|
||
|
|
||
|
|
||
|
def calculate_hash(path):
    '''
    Return the hexadecimal SHA1 digest of a file's contents.

    The file is read in 4096-byte blocks so it never has to fit in memory.

    :param path: The path to the file to hash.
    '''
    digest = hashlib.sha1()
    with open(path, 'rb') as handle:
        while True:
            block = handle.read(4096)
            if not block:
                break
            digest.update(block)
    return digest.hexdigest()
|
||
|
|
||
|
|
||
|
def download_file(url: str, dest_filename: str, chunk_size: int = 1024*1024):
    '''
    Download a file from a given URL in chunks and save to a destination filename.

    A progress line is rewritten on stdout after every chunk. When the server
    does not report a usable Content-Length the percentage is left out instead
    of failing.

    :param url: The URL of the file to download
    :param dest_filename: The destination filename to save the downloaded file
    :param chunk_size: Size of chunks to download. Default is set to 1MB.
    '''
    with urllib.request.urlopen(url) as response:
        # The header may be missing (e.g. chunked transfer encoding) or malformed
        try:
            total_size = int(response.headers.get('Content-Length'))
        except (TypeError, ValueError):
            total_size = 0

        downloaded_size = 0
        with open(dest_filename, 'wb') as out_file:
            while True:
                start_time = time.perf_counter()
                chunk = response.read(chunk_size)
                if not chunk:
                    break
                downloaded_size += len(chunk)
                out_file.write(chunk)
                elapsed = time.perf_counter() - start_time

                # A chunk served from a buffer can complete in "zero" time
                speed = len(chunk) / elapsed if elapsed > 0 else 0.0

                if total_size > 0:
                    progress = (downloaded_size / total_size) * 100
                    sys.stdout.write(f'\rDownloaded {downloaded_size} of {total_size} bytes ({progress:.2f}%) at {speed/1024:.2f} KB/s\r')
                else:
                    sys.stdout.write(f'\rDownloaded {downloaded_size} bytes at {speed/1024:.2f} KB/s\r')
                sys.stdout.flush()
        print()
|
||
|
|
||
|
|
||
|
def get_url(url) -> str:
    '''
    Get the contents of a URL.

    The request is sent with a GitHub API "Accept" header and the Avoidr
    User-Agent, times out after 10 seconds, and the body is decoded with the
    default codec (UTF-8).

    :param url: The URL to get the contents of.
    '''
    headers = {'Accept': 'application/vnd.github.v3+json', 'User-Agent': 'Avoidr/1.0 (https://git.acid.vegas/avoidr)'}
    req = urllib.request.Request(url, headers=headers)
    # Close the response deterministically instead of leaving it to the garbage collector
    with urllib.request.urlopen(req, timeout=10) as response:
        return response.read().decode()
|
||
|
|
||
|
|
||
|
def calculate_git_blob_hash(path: str) -> str:
    '''
    Calculate the Git blob SHA1 of a file.

    Git hashes a blob as SHA1("blob <size>\\0" + contents), which differs from
    a plain SHA1 of the contents. This is the format of the "sha" value that
    update_database() compares against.

    :param path: The path to the file to hash.
    '''
    blob_hash = hashlib.sha1()
    blob_hash.update(b'blob %d\x00' % os.path.getsize(path))
    with open(path, 'rb') as f:
        for chunk in iter(lambda: f.read(65536), b''):
            blob_hash.update(chunk)
    return blob_hash.hexdigest()


def update_database():
    '''
    Update the ASN database.

    Downloads databases/fullASN.json.zip when it is missing or when its Git
    blob hash differs from the "sha" reported by the GitHub contents API, then
    extracts fullASN.json next to it. The archive is also extracted when it is
    current but the extracted JSON file is missing.
    '''

    logging.info('Checking for database updates...')

    DB        = 'databases/fullASN.json.zip'
    extracted = DB[:-4] # Same path without the ".zip" suffix
    os.makedirs('databases', exist_ok=True)

    if not os.path.exists(DB):
        update = True
    else:
        # The API reports a Git blob hash, so the local file must be hashed the same way
        new_hash = json.loads(get_url('https://api.github.com/repos/ipapi-is/ipapi/contents/'+DB))['sha']
        update = calculate_git_blob_hash(DB) != new_hash

    if update:
        logging.info('Updating database...')
        for OLD_DB in (DB, extracted):
            if os.path.exists(OLD_DB):
                os.remove(OLD_DB)
        download_file('https://github.com/ipapi-is/ipapi/raw/main/'+DB, DB)
    else:
        logging.info('Database is up-to-date!')

    # Extract after a fresh download, or when only the extracted file is missing
    if update or not os.path.exists(extracted):
        with ZipFile(DB) as zObject:
            zObject.extract(os.path.basename(extracted), 'databases')
|
||
|
|
||
|
|
||
|
|
||
|
def process_asn(data: dict):
    '''
    Process an ASN and add it to the results.

    Stores the ASN's name and prefixes in the module-level results dict, adds
    the address counts to grand_total, and logs one summary line per IP
    version. The module-level args decides which versions are kept: IPv4 is
    skipped when args.ipv6 is set and IPv6 is skipped when args.ipv4 is set.

    :param data: The ASN data to process. Must contain 'asn'; 'descr', 'org',
                 'prefixes' and 'prefixesIPv6' are used when present.
    '''

    # Build the name from whichever fields exist, so a record without 'descr' does not raise
    title = ' / '.join(data[key] for key in ('descr', 'org') if key in data)
    ranges = dict()
    results[data['asn']] = {'name': title, 'ranges': ranges}

    # (IP version, key holding the prefixes, args flag that excludes this version)
    families = (('4', 'prefixes', 'ipv6'), ('6', 'prefixesIPv6', 'ipv4'))

    for version, key, excluded_by in families:
        if key not in data or getattr(args, excluded_by):
            continue
        prefixes = data[key]
        ranges[version] = prefixes
        total = total_ips(prefixes)
        grand_total[version] += total
        logging.info('Found \033[93mAS{0}\033[0m \033[1;30m({1})\033[0m containing \033[32m{2:,}\033[0m IPv{4} ranges with \033[36m{3:,}\033[0m total IP addresses'.format(data['asn'], title, len(prefixes), total, version))
|
||
|
|
||
|
|
||
|
def total_ips(ranges: list) -> int:
    '''
    Calculate the total number of IP addresses in a list of CIDR ranges.

    Overlapping or nested ranges are merged first, so every address is counted
    once even when a list contains both an aggregate and a more specific
    prefix inside it.

    :param ranges: The list of CIDR ranges to calculate the total number of IP addresses for.
    '''
    networks = [ipaddress.ip_network(cidr) for cidr in ranges]
    total = 0
    # collapse_addresses() rejects mixed IP versions, so merge each version separately
    for version in (4, 6):
        same_version = [network for network in networks if network.version == version]
        total += sum(network.num_addresses for network in ipaddress.collapse_addresses(same_version))
    return total
|
||
|
|
||
|
|
||
|
|
||
|
# Main
if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser(description='masscan with exclusive exclusions')
    parser.add_argument('-4', '--ipv4', help='process IPv4 addresses only', action='store_true')
    parser.add_argument('-6', '--ipv6', help='process IPv6 addresses only', action='store_true')
    parser.add_argument('-x', '--exclude', help='create exclusions for masscan instead of a json output', action='store_true')
    parser.add_argument('-s', '--search', help='comma seperated strings to search (no output file)', type=str)
    parser.add_argument('-u', '--update', help='update the ASN database', action='store_true')

    # process_asn() reads this module-level name, so it must stay global
    args = parser.parse_args()

    if args.update or not os.path.exists('databases/fullASN.json'):
        update_database()

    with open('databases/fullASN.json') as fp:
        asn_data = json.load(fp)

    if args.search:
        raw_queries = args.search.split(',')
    else:
        try:
            with open('custom.txt') as fp:
                raw_queries = [line.rstrip() for line in fp]
        except FileNotFoundError:
            parser.error('custom.txt not found (create it with one search string per line, or use --search)')

    # Drop blank entries: an empty string is a substring of every name and would match every ASN.
    # Lowercase once here instead of on every comparison.
    queries = [query.lower() for query in raw_queries if query.strip()]

    logging.debug(f'Searching {len(queries):,} queries against {len(asn_data):,} ASNs...')

    for entry in asn_data.values():
        # Lazily yield the searchable fields so matching stops at the first hit
        haystacks = (entry[field].lower() for field in ('descr', 'org') if field in entry)
        if any(query in text for text in haystacks for query in queries):
            if entry['asn'] not in results:
                process_asn(entry)

    if not args.search:
        os.makedirs('output', exist_ok=True)

        if args.exclude:
            # masscan exclusion file: one commented header per ASN followed by its ranges
            with open('output/exclude.conf', 'w') as fp:
                for item in results:
                    fp.write(f'# AS{item} - {results[item]["name"]}\n')
                    for version in results[item]['ranges']:
                        for _range in results[item]['ranges'][version]:
                            fp.write(_range+'\n')
                    fp.write('\n')
        else:
            with open('output/out.json', 'w') as fp:
                json.dump(results, fp)
    else:
        logging.info('Add these to your custom.txt file to create output files...')

    total_v4 = ipaddress.ip_network('0.0.0.0/0').num_addresses
    total_v6 = ipaddress.ip_network('::/0').num_addresses
    print('Total IPv4 Addresses : {0:,}'.format(total_v4))
    print('Total IPv4 After Clean : {0:,}'.format(total_v4-grand_total['4']))
    print('Total IPv6 Addresses : {0:,}'.format(total_v6))
    print('Total IPv6 After Clean : {0:,}'.format(total_v6-grand_total['6']))
|