colors fixed, script improved

This commit is contained in:
Dionysus 2023-11-23 04:46:49 -05:00
parent 581efb8f6d
commit 6e769ac39e
Signed by: acidvegas
GPG Key ID: EF4B922DB85DC9DE

View File

@ -13,7 +13,6 @@ try:
except ImportError: except ImportError:
raise ImportError('missing required \'aiodns\' library (pip install aiodns)') raise ImportError('missing required \'aiodns\' library (pip install aiodns)')
# Colors # Colors
class colors: class colors:
ip = '\033[35m' ip = '\033[35m'
@ -24,6 +23,7 @@ class colors:
reset = '\033[0m' reset = '\033[0m'
separator = '\033[90m' separator = '\033[90m'
dns_resolvers = []
def get_dns_servers() -> list: def get_dns_servers() -> list:
'''Get a list of DNS servers to use for lookups.''' '''Get a list of DNS servers to use for lookups.'''
@ -31,18 +31,15 @@ def get_dns_servers() -> list:
results = source.read().decode().split('\n') results = source.read().decode().split('\n')
return [server for server in results if ':' not in server] return [server for server in results if ':' not in server]
async def rdns(semaphore: asyncio.Semaphore, ip_address: str, resolver: aiodns.DNSResolver):
async def rdns(semaphore: asyncio.Semaphore, ip_address: str, custom_dns_server: str):
''' '''
Perform a reverse DNS lookup on an IP address. Perform a reverse DNS lookup on an IP address.
:param ip_address: The IP address to lookup. :param ip_address: The IP address to lookup.
:param custom_dns_server: The DNS server to use for lookups. :param resolver: The DNS resolver to use for lookups.
:param semaphore: A semaphore to limit the number of concurrent lookups. :param semaphore: A semaphore to limit the number of concurrent lookups.
:param timeout: The timeout for the lookup.
''' '''
async with semaphore: async with semaphore:
resolver = aiodns.DNSResolver(nameservers=[custom_dns_server], rotate=False, timeout=args.timeout)
reverse_name = ipaddress.ip_address(ip_address).reverse_pointer reverse_name = ipaddress.ip_address(ip_address).reverse_pointer
try: try:
answer = await resolver.query(reverse_name, 'PTR') answer = await resolver.query(reverse_name, 'PTR')
@ -50,7 +47,6 @@ async def rdns(semaphore: asyncio.Semaphore, ip_address: str, custom_dns_server:
except: except:
return ip_address, None return ip_address, None
def rig(seed: int) -> str: def rig(seed: int) -> str:
''' '''
Random IP generator. Random IP generator.
@ -64,33 +60,23 @@ def rig(seed: int) -> str:
ip = ipaddress.ip_address(shuffled_index) ip = ipaddress.ip_address(shuffled_index)
yield str(ip) yield str(ip)
async def main(): async def main():
'''Generate random IPs and perform reverse DNS lookups.''' '''Generate random IPs and perform reverse DNS lookups.'''
semaphore = asyncio.Semaphore(args.concurrency) semaphore = asyncio.Semaphore(args.concurrency)
tasks = [] tasks = []
results_cache = [] results_cache = []
if args.resolvers: global dns_resolvers
with open(args.resolvers) as file: if not dns_resolvers:
dns_servers = [server.strip() for server in file.readlines()] dns_resolvers = [aiodns.DNSResolver(nameservers=[server], rotate=False, timeout=args.timeout) for server in get_dns_servers()[:args.concurrency]]
while True:
if not args.resolvers:
dns_servers = []
while not dns_servers:
try:
dns_servers = get_dns_servers()
except:
time.sleep(300)
seed = random.randint(10**9, 10**10 - 1) seed = random.randint(10**9, 10**10 - 1)
ip_generator = rig(seed) ip_generator = rig(seed)
for ip in ip_generator: for ip in ip_generator:
if len(tasks) < args.concurrency: if len(tasks) < args.concurrency:
dns = random.choice(dns_servers) resolver = random.choice(dns_resolvers)
task = asyncio.create_task(rdns(semaphore, ip, dns)) task = asyncio.create_task(rdns(semaphore, ip, resolver))
tasks.append(task) tasks.append(task)
else: else:
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
@ -113,8 +99,8 @@ async def main():
result = result.replace('.mil', f'{colors.spooky}.gov{colors.reset}') result = result.replace('.mil', f'{colors.spooky}.gov{colors.reset}')
print(f'{colors.ip}{ip.ljust(15)}{colors.reset} {colors.separator}->{colors.reset} {colors.ptr}{result}{colors.reset}') print(f'{colors.ip}{ip.ljust(15)}{colors.reset} {colors.separator}->{colors.reset} {colors.ptr}{result}{colors.reset}')
elif '.gov.' in result or '.mil.' in result: elif '.gov.' in result or '.mil.' in result:
result = result.replace('.gov.', f'{colors.spooky}.gov.{colors.reset}') result = result.replace('.gov.', f'{colors.spooky}.gov.{colors.ptr}')
result = result.replace('.mil.', f'{colors.spooky}.mil.{colors.reset}') result = result.replace('.mil.', f'{colors.spooky}.mil.{colors.ptr}')
print(f'{colors.ip}{ip.ljust(15)}{colors.reset} {colors.separator}->{colors.reset} {colors.ptr}{result}{colors.reset}') print(f'{colors.ip}{ip.ljust(15)}{colors.reset} {colors.separator}->{colors.reset} {colors.ptr}{result}{colors.reset}')
else: else:
print(f'{colors.ip}{ip.ljust(15)}{colors.reset} {colors.separator}->{colors.reset} {colors.ptr}{result}{colors.reset}') print(f'{colors.ip}{ip.ljust(15)}{colors.reset} {colors.separator}->{colors.reset} {colors.ptr}{result}{colors.reset}')