From a2b8d53a830a657637ea1994d79cc9b99685c039 Mon Sep 17 00:00:00 2001 From: acidvegas Date: Fri, 24 Nov 2023 03:44:24 -0500 Subject: [PATCH] Code clean up, still tinkering here. --- README.md | 2 -- ptrstream.py | 84 +++++++++++++++++++++++++++++++--------------------- 2 files changed, 51 insertions(+), 35 deletions(-) diff --git a/README.md b/README.md index 7aadffd..cd21ff4 100644 --- a/README.md +++ b/README.md @@ -2,8 +2,6 @@ PTRStream is an asynchronous reverse DNS lookup tool developed in Python. It generates random IP addresses and performs reverse DNS lookups using various DNS servers. - - ## Requirements - [python](https://www.python.org/) - [aiodns](https://pypi.org/project/aiodns/) *(pip install aiodns)* diff --git a/ptrstream.py b/ptrstream.py index a58c851..b7f21e3 100644 --- a/ptrstream.py +++ b/ptrstream.py @@ -4,6 +4,7 @@ import argparse import asyncio import ipaddress +import os import random import time import urllib.request @@ -23,7 +24,6 @@ class colors: reset = '\033[0m' separator = '\033[90m' -dns_resolvers = [] def get_dns_servers() -> list: '''Get a list of DNS servers to use for lookups.''' @@ -31,13 +31,13 @@ def get_dns_servers() -> list: results = source.read().decode().split('\n') return [server for server in results if ':' not in server] + async def rdns(semaphore: asyncio.Semaphore, ip_address: str, resolver: aiodns.DNSResolver): ''' Perform a reverse DNS lookup on an IP address. - :param ip_address: The IP address to lookup. - :param resolver: The DNS resolver to use for lookups. - :param semaphore: A semaphore to limit the number of concurrent lookups. + :param semaphore: The semaphore to use for concurrency. + :param ip_address: The IP address to perform a reverse DNS lookup on. ''' async with semaphore: reverse_name = ipaddress.ip_address(ip_address).reverse_pointer @@ -47,6 +47,7 @@ async def rdns(semaphore: asyncio.Semaphore, ip_address: str, resolver: aiodns.D except: return ip_address, None + def rig(seed: int) -> str: ''' Random IP generator. @@ -60,22 +61,58 @@ def rig(seed: int) -> str: ip = ipaddress.ip_address(shuffled_index) yield str(ip) -async def main(): - '''Generate random IPs and perform reverse DNS lookups.''' +def fancy_print(ip: str, result: str): + ''' + Print the IP address and PTR record in a fancy way. + + :param ip: The IP address. + :param result: The PTR record. + ''' + if result in ('127.0.0.1', 'localhost'): + print(f'{colors.ip}{ip.ljust(15)}{colors.reset} {colors.separator}-> {result}{colors.reset}') + else: + if ip in result: + result = result.replace(ip, f'{colors.ip_match}{ip}{colors.ptr}') + elif (daship := ip.replace('.', '-')) in result: + result = result.replace(daship, f'{colors.ip_match}{daship}{colors.ptr}') + elif (revip := '.'.join(ip.split('.')[::-1])) in result: + result = result.replace(revip, f'{colors.ip_match}{revip}{colors.ptr}') + elif result.endswith('.gov') or result.endswith('.mil'): + result = result.replace('.gov', f'{colors.spooky}.gov{colors.reset}') + result = result.replace('.mil', f'{colors.spooky}.mil{colors.reset}') + elif '.gov.' in result or '.mil.' in result: + result = result.replace('.gov.', f'{colors.spooky}.gov.{colors.ptr}') + result = result.replace('.mil.', f'{colors.spooky}.mil.{colors.ptr}') + print(f'{colors.ip}{ip.ljust(15)}{colors.reset} {colors.separator}->{colors.reset} {colors.ptr}{result}{colors.reset}') + + +async def main(args: argparse.Namespace): + ''' + Generate random IPs and perform reverse DNS lookups. + + :param args: The command-line arguments. + ''' + if args.resolvers: + if os.path.exists(args.resolvers): + with open(args.resolvers) as file: + dns_resolvers = [item.strip() for item in file.read().splitlines()] + else: + raise FileNotFoundError(f'could not find file \'{args.resolvers}\'') + else: + dns_resolvers = get_dns_servers() + dns_resolvers = random.shuffle(dns_resolvers) + + resolver = aiodns.DNSResolver(nameservers=dns_resolvers, timeout=args.timeout, tries=args.retries, rotate=True) semaphore = asyncio.Semaphore(args.concurrency) + tasks = [] results_cache = [] - global dns_resolvers - if not dns_resolvers: - dns_resolvers = [aiodns.DNSResolver(nameservers=[server], rotate=False, timeout=args.timeout) for server in get_dns_servers()[:args.concurrency]] - seed = random.randint(10**9, 10**10 - 1) ip_generator = rig(seed) for ip in ip_generator: if len(tasks) < args.concurrency: - resolver = random.choice(dns_resolvers) task = asyncio.create_task(rdns(semaphore, ip, resolver)) tasks.append(task) else: @@ -84,27 +121,7 @@ async def main(): for task in done: ip, result = task.result() if result: - if result in ('127.0.0.1', 'localhost'): - print(f'{colors.ip}{ip.ljust(15)}{colors.reset} {colors.separator}-> {result}{colors.reset}') - elif ip in result: - result = result.replace(ip, f'{colors.ip_match}{ip}{colors.ptr}') - elif (daship := ip.replace('.', '-')) in result: - change = result.replace(daship, f'{colors.ip_match}{daship}{colors.ptr}') - print(f'{colors.ip}{ip.ljust(15)}{colors.reset} {colors.separator}->{colors.reset} {colors.ptr}{change}{colors.reset}') - elif (revip := '.'.join(ip.split('.')[::-1])) in result: - change = result.replace(revip, f'{colors.ip_match}{revip}{colors.ptr}') - print(f'{colors.ip}{ip.ljust(15)}{colors.reset} {colors.separator}->{colors.reset} {colors.ptr}{change}{colors.reset}') - elif result.endswith('.gov') or result.endswith('.mil'): - change = result.replace('.gov', f'{colors.spooky}.gov{colors.reset}') - change = change.replace('.mil', f'{colors.spooky}.mil{colors.reset}') - print(f'{colors.ip}{ip.ljust(15)}{colors.reset} {colors.separator}->{colors.reset} {colors.ptr}{result}{colors.reset}') - elif '.gov.' in result or '.mil.' in result: - change = result.replace('.gov.', f'{colors.spooky}.gov.{colors.ptr}') - change = change.replace('.mil.', f'{colors.spooky}.mil.{colors.ptr}') - print(f'{colors.ip}{ip.ljust(15)}{colors.reset} {colors.separator}->{colors.reset} {colors.ptr}{result}{colors.reset}') - else: - print(f'{colors.ip}{ip.ljust(15)}{colors.reset} {colors.separator}->{colors.reset} {colors.ptr}{result}{colors.reset}') - + fancy_print(ip, result) results_cache.append(f'{ip}:{result}') if len(results_cache) >= 1000: stamp = time.strftime('%Y%m%d') @@ -119,6 +136,7 @@ if __name__ == '__main__': parser.add_argument('-c', '--concurrency', type=int, default=50, help='Control the speed of lookups.') parser.add_argument('-t', '--timeout', type=int, default=5, help='Timeout for DNS lookups.') parser.add_argument('-r', '--resolvers', type=str, help='File containing DNS servers to use for lookups.') + parser.add_argument('-rt', '--retries', type=int, default=3, help='Number of times to retry a DNS lookup.') args = parser.parse_args() - asyncio.run(main()) + asyncio.run(main(args))