142 lines
5.1 KiB
Python
142 lines
5.1 KiB
Python
|
#!/usr/bin/env python
|
||
|
# MassRDAP - developed by acidvegas (https://git.acid.vegas/massrdap)
|
||
|
|
||
|
import argparse
|
||
|
import asyncio
|
||
|
import logging
|
||
|
import json
|
||
|
import re
|
||
|
|
||
|
try:
|
||
|
import aiofiles
|
||
|
except ImportError:
|
||
|
raise ImportError('missing required aiofiles library (pip install aiofiles)')
|
||
|
|
||
|
try:
|
||
|
import aiohttp
|
||
|
except ImportError:
|
||
|
raise ImportError('missing required aiohttp library (pip install aiohttp)')
|
||
|
|
||
|
# ANSI escape sequences used to colorize the console output
BLUE = '\033[1;34m'
CYAN = '\033[1;36m'
GREEN = '\033[1;32m'
GREY = '\033[1;90m'
PINK = '\033[1;95m'
PURPLE = '\033[0;35m'
RED = '\033[1;31m'
YELLOW = '\033[1;33m'
RESET = '\033[0m' # Switches every color/attribute back off

# Root logger: DEBUG and above, each record prefixed with timestamp and level
logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s', level=logging.DEBUG)

# Maps a TLD to its RDAP base URL; starts empty and is filled by fetch_rdap_servers()
RDAP_SERVERS = dict()
|
||
|
|
||
|
async def fetch_rdap_servers():
    '''
    Fetches RDAP servers from IANA's RDAP Bootstrap file.

    Fills the module-level RDAP_SERVERS mapping in place: every TLD listed in a
    service entry is mapped to the first base URL of that entry.

    :raises aiohttp.ClientResponseError: If the bootstrap request returns an HTTP error status.
    '''

    async with aiohttp.ClientSession() as session:
        async with session.get('https://data.iana.org/rdap/dns.json') as response:
            # Fail loudly on an HTTP error rather than trying to parse an error page as JSON
            response.raise_for_status()
            data = await response.json()

    for entry in data['services']:
        # Each service entry is read as [[tld, ...], [base_url, ...]]
        tlds, urls = entry[0], entry[1]

        if not urls:
            continue # No base URL published for these TLDs, nothing to map

        for tld in tlds:
            RDAP_SERVERS[tld.lower()] = urls[0]
|
||
|
|
||
|
|
||
|
def get_tld(domain: str):
    '''
    Extracts the top-level domain from a domain name.

    The TLD is the right-most label, so "www.example.com" and "example.com" both
    yield "com". The result is lowercased and a trailing root dot is ignored.

    :param domain: The domain name to inspect.
    :return: The right-most label in lowercase (the whole input when it contains no dot).
    '''

    # A fully-qualified name may end with the root dot; drop it so the last label is the TLD
    labels = domain.strip().rstrip('.').lower().split('.')

    return labels[-1]
|
||
|
|
||
|
|
||
|
async def lookup_domain(domain: str, proxy_url: str, semaphore: asyncio.Semaphore, success_file, failure_file):
    '''
    Looks up a domain using the RDAP protocol.

    A 200 response is written to success_file as one JSON document per line. Any other
    status, or an exception raised while querying, records the domain in failure_file.
    Domains whose TLD has no entry in RDAP_SERVERS are skipped without any output.

    :param domain: The domain to look up.
    :param proxy_url: The proxy URL to use for the request.
    :param semaphore: The semaphore to use for concurrency limiting.
    :param success_file: Async file handle that receives the RDAP JSON of successful lookups.
    :param failure_file: Async file handle that receives the domains that failed.
    '''

    async with semaphore:
        tld = get_tld(domain)
        rdap_url = RDAP_SERVERS.get(tld)

        if not rdap_url:
            return

        query_url = f'{rdap_url}domain/{domain}'

        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(query_url, proxy=proxy_url or None) as response:
                    if response.status == 200:
                        # content_type=None disables aiohttp's mimetype check, so an
                        # "application/rdap+json" body is decoded regardless of the aiohttp version
                        data = await response.json(content_type=None)
                        await success_file.write(json.dumps(data) + '\n')
                        print(f'{GREEN}SUCCESS {GREY}| {BLUE}{response.status} {GREY}| {PURPLE}{rdap_url.ljust(50)} {GREY}| {CYAN}{domain}{RESET}')
                    else:
                        await failure_file.write(domain + '\n')
                        print(f'{RED}FAILED {GREY}| {YELLOW}{response.status} {GREY}| {PURPLE}{rdap_url.ljust(50)} {GREY}| {CYAN}{domain}{RESET}')

        except Exception as e:
            # Connection errors, timeouts and undecodable bodies are failures too, so record them
            await failure_file.write(domain + '\n')
            print(f'{RED}FAILED {GREY}| --- | {PURPLE}{rdap_url.ljust(50)} {GREY}| {CYAN}{domain} {RED}| {e}{RESET}')
|
||
|
|
||
|
|
||
|
def _log_task_errors(tasks):
    '''Logs the exception of every finished task that ended with one.'''

    for finished in tasks:
        if not finished.cancelled() and finished.exception():
            logging.error('Lookup task failed: %s', finished.exception())


async def process_domains(args: argparse.Namespace):
    '''
    Processes a list of domains, performing RDAP lookups for each one.

    At most args.concurrency lookups are in flight at any time, and every lookup is
    awaited before the output files are closed.

    :param args: The parsed command-line arguments.
    '''

    import os # Local import: only needed for the null-device fallback below

    await fetch_rdap_servers() # Populate RDAP_SERVERS with TLDs and their RDAP servers

    if not RDAP_SERVERS:
        logging.error('No RDAP servers found.')
        return

    # lookup_domain enters this semaphore itself, so it must not be acquired here as well:
    # holding two permits per lookup halves the concurrency and deadlocks when it is 1
    semaphore = asyncio.Semaphore(args.concurrency)

    # An empty failed-path means "do not save failures": send them to the null device
    failed_path = args.failed or os.devnull

    # Strong references to the running tasks, so they cannot be garbage collected mid-flight
    pending = set()

    async with aiofiles.open(args.output, 'w') as success_file, aiofiles.open(failed_path, 'w') as failure_file:
        async with aiofiles.open(args.input_file) as file:
            async for domain in file:
                domain = domain.strip()

                if not domain:
                    continue

                # Bound the number of live tasks so a huge input file is not loaded as tasks at once
                if len(pending) >= args.concurrency:
                    done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
                    _log_task_errors(done)

                pending.add(asyncio.create_task(lookup_domain(domain, args.proxy, semaphore, success_file, failure_file)))

        # Drain the remaining lookups while the output files are still open
        if pending:
            done, _ = await asyncio.wait(pending)
            _log_task_errors(done)
|
||
|
|
||
|
|
||
|
if __name__ == '__main__':
    # Command-line entry point: parse and validate the options, then run the lookups
    parser = argparse.ArgumentParser(description='Perform RDAP lookups for a list of domains.')
    parser.add_argument('-i', '--input_file', required=True, help='File containing list of domains (one per line).')
    parser.add_argument('-p', '--proxy', help='Proxy in http://[user:pass@]host:port format. If not supplied, none is used.')
    parser.add_argument('-c', '--concurrency', type=int, default=25, help='Number of concurrent requests to make. (default: 25)')
    parser.add_argument('-o', '--output', default='output.json', help='Output file to write successful RDAP data to. (default: output.json)')
    parser.add_argument('-f', '--failed', default='failed.txt', help='Output file to write failed domains to. (optional)')
    args = parser.parse_args()

    # parser.error() prints the usage line and exits with status 2 instead of dumping a traceback
    if not args.input_file:
        parser.error('File path is required.')

    if args.concurrency < 1:
        parser.error('Concurrency must be at least 1.')

    if args.proxy:
        # The scheme is mandatory because the value is handed to aiohttp as a proxy URL;
        # the credentials part is optional so unauthenticated proxies are accepted too
        if not re.match(r'^https?://(?:[^:]+:[^@]+@)?[^:]+:\d+$', args.proxy):
            parser.error('Invalid proxy format. Must be in http://[user:pass@]host:port format.')

    if not args.output:
        parser.error('Output file path is required.')

    if not args.failed:
        print(f'{YELLOW}Failed domains will not be saved.{RESET}')

    asyncio.run(process_domains(args))
|