massrdap/massrdap.py

142 lines
5.1 KiB
Python

#!/usr/bin/env python
# MassRDAP - developed by acidvegas (https://git.acid.vegas/massrdap)
import argparse
import asyncio
import logging
import json
import re
try:
import aiofiles
except ImportError:
raise ImportError('missing required aiofiles library (pip install aiofiles)')
try:
import aiohttp
except ImportError:
raise ImportError('missing required aiohttp library (pip install aiohttp)')
# ANSI escape sequences used to colorize the console output
BLUE   = '\033[1;34m'
CYAN   = '\033[1;36m'
GREEN  = '\033[1;32m'
GREY   = '\033[1;90m'
PINK   = '\033[1;95m'
PURPLE = '\033[0;35m'
RED    = '\033[1;31m'
YELLOW = '\033[1;33m'
RESET  = '\033[0m'

# Mapping of TLD -> RDAP base URL, filled in at runtime by fetch_rdap_servers()
RDAP_SERVERS = {}

# Root logger configuration: timestamped records, everything from DEBUG upwards
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.DEBUG,
)
async def fetch_rdap_servers():
    '''
    Fetches RDAP servers from IANA's RDAP Bootstrap file.

    Downloads https://data.iana.org/rdap/dns.json and fills the module-level
    RDAP_SERVERS mapping in place (TLD -> RDAP base URL). Nothing is returned.

    :raises aiohttp.ClientResponseError: If IANA answers with an HTTP error status.
    '''
    async with aiohttp.ClientSession() as session:
        async with session.get('https://data.iana.org/rdap/dns.json') as response:
            # Fail loudly on 4xx/5xx instead of trying to parse an error page as JSON
            response.raise_for_status()
            data = await response.json()

    for entry in data['services']:
        tlds, urls = entry[0], entry[1]

        # A service without any base URL cannot be queried; skip it rather than crash
        if not urls:
            continue

        # Prefer an HTTPS endpoint when the service lists several base URLs
        rdap_url = next((url for url in urls if url.startswith('https://')), urls[0])

        for tld in tlds:
            # Lower-cased keys so lookups do not depend on the case used in the bootstrap file
            RDAP_SERVERS[tld.lower()] = rdap_url
def get_tld(domain: str):
    '''
    Extracts the top-level domain from a domain name.

    The TLD is the right-most label of the name. Surrounding whitespace and a
    trailing root dot are ignored, and the result is lower-cased so it can be
    used directly as a key into the RDAP bootstrap table.

    :param domain: The domain name (e.g. "www.example.co.uk").
    :return: The top-level domain (e.g. "uk"). A single-label name is returned as-is (lower-cased).
    '''
    labels = domain.strip().rstrip('.').lower().split('.')

    # Only the last label is the TLD; everything to its left is a subdomain
    return labels[-1]
async def lookup_domain(domain: str, proxy_url: str, semaphore: asyncio.Semaphore, success_file, failure_file):
    '''
    Looks up a domain using the RDAP protocol.

    On an HTTP 200 response the JSON body is appended to success_file as a single
    line. In every other case (no known RDAP server for the TLD, a non-200 status
    or a request error) the domain is appended to failure_file.

    :param domain: The domain to look up.
    :param proxy_url: The proxy URL to use for the request.
    :param semaphore: The semaphore to use for concurrency limiting.
    :param success_file: Async file object (with an awaitable write()) receiving successful RDAP data.
    :param failure_file: Async file object (with an awaitable write()) receiving failed domains.
    '''
    async with semaphore:
        tld = get_tld(domain)
        rdap_url = RDAP_SERVERS.get(tld)

        if not rdap_url:
            # Without a bootstrap entry there is nothing to query; record the domain instead of dropping it silently
            reason = 'no RDAP server for TLD'
            await failure_file.write(domain + '\n')
            print(f'{RED}FAILED {GREY}| --- | {PURPLE}{reason.ljust(50)} {GREY}| {CYAN}{domain}{RESET}')
            return

        query_url = f'{rdap_url}domain/{domain}'

        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(query_url, proxy=proxy_url if proxy_url else None) as response:
                    if response.status == 200:
                        # content_type=None disables aiohttp's Content-Type check (RDAP servers answer with application/rdap+json)
                        data = await response.json(content_type=None)
                        await success_file.write(json.dumps(data) + '\n')
                        print(f'{GREEN}SUCCESS {GREY}| {BLUE}{response.status} {GREY}| {PURPLE}{rdap_url.ljust(50)} {GREY}| {CYAN}{domain}{RESET}')
                    else:
                        await failure_file.write(domain + '\n')
                        print(f'{RED}FAILED {GREY}| {YELLOW}{response.status} {GREY}| {PURPLE}{rdap_url.ljust(50)} {GREY}| {CYAN}{domain}{RESET}')
        except Exception as e:
            # Broad on purpose: one bad lookup (timeout, DNS, bad JSON, ...) must not abort the whole batch
            print(f'{RED}FAILED {GREY}| --- | {PURPLE}{rdap_url.ljust(50)} {GREY}| {CYAN}{domain} {RED}| {e}{RESET}')
            await failure_file.write(domain + '\n')
def _log_task_errors(tasks):
    '''Logs exceptions raised by finished lookup tasks so they are not silently dropped.'''
    for task in tasks:
        if not task.cancelled() and task.exception() is not None:
            logging.error('Lookup task failed: %r', task.exception())


async def process_domains(args: argparse.Namespace):
    '''
    Processes a list of domains, performing RDAP lookups for each one.

    Reads args.input_file line by line and schedules one lookup_domain() task per
    non-empty line, keeping at most args.concurrency tasks alive at a time. All
    tasks are awaited before the output files are closed.

    :param args: The parsed command-line arguments (input_file, output, failed, proxy, concurrency).
    '''
    import os  # Local import: only needed for the devnull fallback below

    await fetch_rdap_servers() # Populate RDAP_SERVERS with TLDs and their RDAP servers

    if not RDAP_SERVERS:
        logging.error('No RDAP servers found.')
        return

    # lookup_domain() acquires this semaphore itself; it must not also be acquired here
    semaphore = asyncio.Semaphore(args.concurrency)

    # An empty failure path means "do not save failures": write them to the null device
    failed_path = args.failed or os.devnull

    # Strong references to in-flight tasks (the event loop itself only keeps weak ones)
    pending = set()

    async with aiofiles.open(args.output, 'w') as success_file, aiofiles.open(failed_path, 'w') as failure_file:
        async with aiofiles.open(args.input_file) as file:
            async for domain in file:
                domain = domain.strip()

                if not domain:
                    continue

                # Back-pressure: stop reading until a slot frees up so huge inputs do not create unbounded tasks
                if len(pending) >= args.concurrency:
                    done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
                    _log_task_errors(done)

                pending.add(asyncio.create_task(lookup_domain(domain, args.proxy, semaphore, success_file, failure_file)))

        # Drain the remaining lookups while the output files are still open
        if pending:
            done, _ = await asyncio.wait(pending)
            _log_task_errors(done)
def parse_arguments(argv=None):
    '''
    Parses and validates the command-line arguments.

    Invalid values are reported through parser.error(), which prints the usage
    text plus the message and exits with status 2 instead of showing a traceback.

    :param argv: The argument list to parse. When None, argparse reads sys.argv[1:].
    :return: The validated argparse.Namespace.
    '''
    parser = argparse.ArgumentParser(description='Perform RDAP lookups for a list of domains.')
    parser.add_argument('-i', '--input_file', required=True, help='File containing list of domains (one per line).')
    parser.add_argument('-p', '--proxy', help='Proxy URL in http://user:pass@host:port format (credentials optional). If not supplied, none is used.')
    parser.add_argument('-c', '--concurrency', type=int, default=25, help='Number of concurrent requests to make. (default: 25)')
    parser.add_argument('-o', '--output', default='output.json', help='Output file to write successful RDAP data to. (default: output.json)')
    parser.add_argument('-f', '--failed', default='failed.txt', help='Output file to write failed domains to. (optional)')
    args = parser.parse_args(argv)

    if not args.input_file:
        parser.error('File path is required.')

    if args.concurrency < 1:
        parser.error('Concurrency must be at least 1.')

    # The scheme is mandatory (the proxy value is handed to aiohttp as a URL); the user:pass@ part is optional
    if args.proxy and not re.match(r'^https?://(?:[^:]+:[^@]+@)?[^:]+:\d+$', args.proxy):
        parser.error('Invalid proxy format. Must be in http://user:pass@host:port format.')

    if not args.output:
        parser.error('Output file path is required.')

    return args


if __name__ == '__main__':
    args = parse_arguments()

    if not args.failed:
        print(f'{YELLOW}Failed domains will not be saved.{RESET}')

    asyncio.run(process_domains(args))