Code clean up, still tinkering here.

This commit is contained in:
Dionysus 2023-11-24 03:44:24 -05:00
parent 1ea867e1cb
commit a2b8d53a83
Signed by: acidvegas
GPG Key ID: EF4B922DB85DC9DE
2 changed files with 51 additions and 35 deletions

View File

@ -2,8 +2,6 @@
PTRStream is an asynchronous reverse DNS lookup tool developed in Python. It generates random IP addresses and performs reverse DNS lookups using various DNS servers. PTRStream is an asynchronous reverse DNS lookup tool developed in Python. It generates random IP addresses and performs reverse DNS lookups using various DNS servers.
## Requirements ## Requirements
- [python](https://www.python.org/) - [python](https://www.python.org/)
- [aiodns](https://pypi.org/project/aiodns/) *(pip install aiodns)* - [aiodns](https://pypi.org/project/aiodns/) *(pip install aiodns)*

View File

@ -4,6 +4,7 @@
import argparse import argparse
import asyncio import asyncio
import ipaddress import ipaddress
import os
import random import random
import time import time
import urllib.request import urllib.request
@ -23,7 +24,6 @@ class colors:
reset = '\033[0m' reset = '\033[0m'
separator = '\033[90m' separator = '\033[90m'
dns_resolvers = []
def get_dns_servers() -> list: def get_dns_servers() -> list:
'''Get a list of DNS servers to use for lookups.''' '''Get a list of DNS servers to use for lookups.'''
@ -31,13 +31,13 @@ def get_dns_servers() -> list:
results = source.read().decode().split('\n') results = source.read().decode().split('\n')
return [server for server in results if ':' not in server] return [server for server in results if ':' not in server]
async def rdns(semaphore: asyncio.Semaphore, ip_address: str, resolver: aiodns.DNSResolver): async def rdns(semaphore: asyncio.Semaphore, ip_address: str, resolver: aiodns.DNSResolver):
''' '''
Perform a reverse DNS lookup on an IP address. Perform a reverse DNS lookup on an IP address.
:param ip_address: The IP address to lookup. :param semaphore: The semaphore to use for concurrency.
:param resolver: The DNS resolver to use for lookups. :param ip_address: The IP address to perform a reverse DNS lookup on.
:param semaphore: A semaphore to limit the number of concurrent lookups.
''' '''
async with semaphore: async with semaphore:
reverse_name = ipaddress.ip_address(ip_address).reverse_pointer reverse_name = ipaddress.ip_address(ip_address).reverse_pointer
@ -47,6 +47,7 @@ async def rdns(semaphore: asyncio.Semaphore, ip_address: str, resolver: aiodns.D
except: except:
return ip_address, None return ip_address, None
def rig(seed: int) -> str: def rig(seed: int) -> str:
''' '''
Random IP generator. Random IP generator.
@ -60,22 +61,58 @@ def rig(seed: int) -> str:
ip = ipaddress.ip_address(shuffled_index) ip = ipaddress.ip_address(shuffled_index)
yield str(ip) yield str(ip)
async def main(): def fancy_print(ip: str, result: str):
'''Generate random IPs and perform reverse DNS lookups.''' '''
Print the IP address and PTR record in a fancy way.
:param ip: The IP address.
:param result: The PTR record.
'''
if result in ('127.0.0.1', 'localhost'):
print(f'{colors.ip}{ip.ljust(15)}{colors.reset} {colors.separator}-> {result}{colors.reset}')
else:
if ip in result:
result = result.replace(ip, f'{colors.ip_match}{ip}{colors.ptr}')
elif (daship := ip.replace('.', '-')) in result:
result = result.replace(daship, f'{colors.ip_match}{daship}{colors.ptr}')
elif (revip := '.'.join(ip.split('.')[::-1])) in result:
result = result.replace(revip, f'{colors.ip_match}{revip}{colors.ptr}')
elif result.endswith('.gov') or result.endswith('.mil'):
result = result.replace('.gov', f'{colors.spooky}.gov{colors.reset}')
result = result.replace('.mil', f'{colors.spooky}.mil{colors.reset}')
elif '.gov.' in result or '.mil.' in result:
result = result.replace('.gov.', f'{colors.spooky}.gov.{colors.ptr}')
result = result.replace('.mil.', f'{colors.spooky}.mil.{colors.ptr}')
print(f'{colors.ip}{ip.ljust(15)}{colors.reset} {colors.separator}->{colors.reset} {colors.ptr}{result}{colors.reset}')
async def main(args: argparse.Namespace):
'''
Generate random IPs and perform reverse DNS lookups.
:param args: The command-line arguments.
'''
if args.resolvers:
if os.path.exists(args.resolvers):
with open(args.resolvers) as file:
dns_resolvers = [item.strip() for item in file.read().splitlines()]
else:
raise FileNotFoundError(f'could not find file \'{args.resolvers}\'')
else:
dns_resolvers = get_dns_servers()
dns_resolvers = random.shuffle(dns_resolvers)
resolver = aiodns.DNSResolver(nameservers=dns_resolvers, timeout=args.timeout, tries=args.retries, rotate=True)
semaphore = asyncio.Semaphore(args.concurrency) semaphore = asyncio.Semaphore(args.concurrency)
tasks = [] tasks = []
results_cache = [] results_cache = []
global dns_resolvers
if not dns_resolvers:
dns_resolvers = [aiodns.DNSResolver(nameservers=[server], rotate=False, timeout=args.timeout) for server in get_dns_servers()[:args.concurrency]]
seed = random.randint(10**9, 10**10 - 1) seed = random.randint(10**9, 10**10 - 1)
ip_generator = rig(seed) ip_generator = rig(seed)
for ip in ip_generator: for ip in ip_generator:
if len(tasks) < args.concurrency: if len(tasks) < args.concurrency:
resolver = random.choice(dns_resolvers)
task = asyncio.create_task(rdns(semaphore, ip, resolver)) task = asyncio.create_task(rdns(semaphore, ip, resolver))
tasks.append(task) tasks.append(task)
else: else:
@ -84,27 +121,7 @@ async def main():
for task in done: for task in done:
ip, result = task.result() ip, result = task.result()
if result: if result:
if result in ('127.0.0.1', 'localhost'): fancy_print(ip, result)
print(f'{colors.ip}{ip.ljust(15)}{colors.reset} {colors.separator}-> {result}{colors.reset}')
elif ip in result:
result = result.replace(ip, f'{colors.ip_match}{ip}{colors.ptr}')
elif (daship := ip.replace('.', '-')) in result:
change = result.replace(daship, f'{colors.ip_match}{daship}{colors.ptr}')
print(f'{colors.ip}{ip.ljust(15)}{colors.reset} {colors.separator}->{colors.reset} {colors.ptr}{change}{colors.reset}')
elif (revip := '.'.join(ip.split('.')[::-1])) in result:
change = result.replace(revip, f'{colors.ip_match}{revip}{colors.ptr}')
print(f'{colors.ip}{ip.ljust(15)}{colors.reset} {colors.separator}->{colors.reset} {colors.ptr}{change}{colors.reset}')
elif result.endswith('.gov') or result.endswith('.mil'):
change = result.replace('.gov', f'{colors.spooky}.gov{colors.reset}')
change = change.replace('.mil', f'{colors.spooky}.mil{colors.reset}')
print(f'{colors.ip}{ip.ljust(15)}{colors.reset} {colors.separator}->{colors.reset} {colors.ptr}{result}{colors.reset}')
elif '.gov.' in result or '.mil.' in result:
change = result.replace('.gov.', f'{colors.spooky}.gov.{colors.ptr}')
change = change.replace('.mil.', f'{colors.spooky}.mil.{colors.ptr}')
print(f'{colors.ip}{ip.ljust(15)}{colors.reset} {colors.separator}->{colors.reset} {colors.ptr}{result}{colors.reset}')
else:
print(f'{colors.ip}{ip.ljust(15)}{colors.reset} {colors.separator}->{colors.reset} {colors.ptr}{result}{colors.reset}')
results_cache.append(f'{ip}:{result}') results_cache.append(f'{ip}:{result}')
if len(results_cache) >= 1000: if len(results_cache) >= 1000:
stamp = time.strftime('%Y%m%d') stamp = time.strftime('%Y%m%d')
@ -119,6 +136,7 @@ if __name__ == '__main__':
parser.add_argument('-c', '--concurrency', type=int, default=50, help='Control the speed of lookups.') parser.add_argument('-c', '--concurrency', type=int, default=50, help='Control the speed of lookups.')
parser.add_argument('-t', '--timeout', type=int, default=5, help='Timeout for DNS lookups.') parser.add_argument('-t', '--timeout', type=int, default=5, help='Timeout for DNS lookups.')
parser.add_argument('-r', '--resolvers', type=str, help='File containing DNS servers to use for lookups.') parser.add_argument('-r', '--resolvers', type=str, help='File containing DNS servers to use for lookups.')
parser.add_argument('-rt', '--retries', type=int, default=3, help='Number of times to retry a DNS lookup.')
args = parser.parse_args() args = parser.parse_args()
asyncio.run(main()) asyncio.run(main(args))