From 173b7e3cf0f91d91161123ba8558f8200f948ee1 Mon Sep 17 00:00:00 2001 From: acidvegas Date: Mon, 10 Feb 2025 01:01:37 -0500 Subject: [PATCH] Cleanup output and colors --- httpz.py | 110 ++++++++++++++++++++++++++++--------------------------- 1 file changed, 56 insertions(+), 54 deletions(-) diff --git a/httpz.py b/httpz.py index b1d0fb0..846a128 100644 --- a/httpz.py +++ b/httpz.py @@ -66,6 +66,8 @@ class Colors: LIGHT_RED = '\033[38;5;203m' # Light red DARK_GREEN = '\033[38;5;22m' # Dark green PINK = '\033[38;5;198m' # Bright pink + GRAY = '\033[90m' # Gray color + CYAN = '\033[96m' # Cyan color _SILENT_MODE = False @@ -86,15 +88,17 @@ def info(msg: str) -> None: logging.info(msg) -async def resolve_dns(domain: str) -> tuple: +async def resolve_dns(domain: str, timeout: int = 5) -> tuple: ''' Resolve A, AAAA, and CNAME records for a domain :param domain: domain to resolve + :param timeout: timeout in seconds :return: tuple of (ips, cname) ''' resolver = dns.asyncresolver.Resolver() + resolver.lifetime = timeout ips = [] cname = None @@ -183,7 +187,7 @@ async def get_cert_info(session: aiohttp.ClientSession, url: str) -> dict: return None cert_bin = ssl_object.getpeercert(binary_form=True) - cert = x509.load_der_x509_certificate(cert_bin) + cert = x509.load_der_x509_certificate(cert_bin) # Get certificate details cert_info = { @@ -221,10 +225,10 @@ async def check_domain(session: aiohttp.ClientSession, domain: str, follow_redir if not domain.startswith(('http://', 'https://')): protocols = ['https://', 'http://'] - base_domain = domain + base_domain = domain.rstrip('/') else: protocols = [domain] - base_domain = domain.split('://')[-1].split('/')[0] + base_domain = domain.split('://')[-1].split('/')[0].rstrip('/') result = { 'domain' : base_domain, @@ -235,6 +239,7 @@ async def check_domain(session: aiohttp.ClientSession, domain: str, follow_redir 'url' : f"https://{base_domain}" if base_domain else domain, 'ips' : [], 'cname' : None, + 'nameservers' : [], 'favicon_hash' : None, 'headers' : {}, 'content_length' : None, @@ -243,7 +248,16 @@ async def check_domain(session: aiohttp.ClientSession, domain: str, follow_redir } # Resolve DNS records - result['ips'], result['cname'] = await resolve_dns(base_domain) + result['ips'], result['cname'] = await resolve_dns(base_domain, timeout) + + # After DNS resolution, add nameserver lookup: + try: + resolver = dns.asyncresolver.Resolver() + resolver.lifetime = timeout + ns_records = await resolver.resolve(base_domain, 'NS') + result['nameservers'] = [str(ns).rstrip('.') for ns in ns_records] + except Exception as e: + debug(f'Error getting nameservers for {base_domain}: {str(e)}') for protocol in protocols: url = f'{protocol}{base_domain}' @@ -270,10 +284,10 @@ async def check_domain(session: aiohttp.ClientSession, domain: str, follow_redir html = (await response.text())[:1024*1024] soup = bs4.BeautifulSoup(html, 'html.parser') if soup.title: - title = ' '.join(soup.title.string.strip().split()) if soup.title.string else '' + title = ' '.join(soup.title.string.strip().split()).rstrip('.') if soup.title.string else '' result['title'] = title[:300] if soup.get_text(): - body = ' '.join(soup.get_text().split()) + body = ' '.join(soup.get_text().split()).rstrip('.') result['body'] = body[:500] result['favicon_hash'] = await get_favicon_hash(session, url, html) break @@ -283,7 +297,7 @@ async def check_domain(session: aiohttp.ClientSession, domain: str, follow_redir continue if check_axfr: - await try_axfr(base_domain) + await try_axfr(base_domain, timeout) return result @@ -401,7 +415,7 @@ def format_status_output(result: dict, debug: bool = False, show_fields: dict = headers_text = [] for k, v in result['headers'].items(): headers_text.append(f"{k}: {v}") - parts.append(f"{Colors.LIGHT_RED}[{', '.join(headers_text)}]{Colors.RESET}") + parts.append(f"{Colors.CYAN}[{', '.join(headers_text)}]{Colors.RESET}") else: # Only show content-type and content-length if headers aren't shown if show_fields['content_type'] and result['content_type']: @@ -438,20 +452,6 @@ def format_status_output(result: dict, debug: bool = False, show_fields: dict = return ' '.join(parts) -def count_domains(input_source: str = None) -> int: - ''' - Count total number of domains from file or stdin - - :param input_source: path to file containing domains, or None for stdin - ''' - if input_source == '-' or input_source is None: - # Can't count lines from stdin without consuming them - return 0 - else: - with open(input_source, 'r') as f: - return sum(1 for line in f if line.strip()) - - async def process_domains(input_source: str = None, debug: bool = False, concurrent_limit: int = 100, show_fields: dict = None, output_file: str = None, jsonl: bool = None, timeout: int = 5, match_codes: set = None, exclude_codes: set = None, show_progress: bool = False, check_axfr: bool = False): ''' Process domains from a file or stdin with concurrent requests @@ -471,15 +471,12 @@ async def process_domains(input_source: str = None, debug: bool = False, concurr if input_source and input_source != '-' and not Path(input_source).exists(): raise FileNotFoundError(f'Domain file not found: {input_source}') - # Get total domain count if showing progress (only works for files) - total_domains = count_domains(input_source) if show_progress else 0 - processed_domains = 0 - # Clear the output file if specified if output_file: open(output_file, 'w').close() tasks = set() + processed_domains = 0 # Simple counter for all processed domains async def write_result(result: dict): '''Write a single result to the output file''' @@ -505,6 +502,8 @@ async def process_domains(input_source: str = None, debug: bool = False, concurr output_dict['redirect_chain'] = result['redirect_chain'] if result['tls']: output_dict['tls'] = result['tls'] + if result['nameservers']: + output_dict['nameservers'] = result['nameservers'] # Get formatted output based on filters formatted = format_status_output(result, debug, show_fields, match_codes, exclude_codes) @@ -518,22 +517,16 @@ async def process_domains(input_source: str = None, debug: bool = False, concurr # Console output if jsonl: - # Pure JSON Lines output without any logging prefixes print(json.dumps(output_dict)) else: + processed_domains += 1 # Increment counter for each domain processed if show_progress: - processed_domains += 1 - info(f"{Colors.BOLD}[{processed_domains}/{total_domains}]{Colors.RESET} {formatted}") - else: - info(formatted) + info(f"{Colors.GRAY}[{processed_domains}]{Colors.RESET} {formatted}") async with aiohttp.ClientSession() as session: # Start initial batch of tasks for domain in itertools.islice(domain_generator(input_source), concurrent_limit): - task = asyncio.create_task(check_domain(session, domain, - follow_redirects=show_fields['follow_redirects'], - timeout=timeout, - check_axfr=check_axfr)) + task = asyncio.create_task(check_domain(session, domain, follow_redirects=show_fields['follow_redirects'], timeout=timeout, check_axfr=check_axfr)) tasks.add(task) # Process remaining domains, maintaining concurrent_limit active tasks @@ -548,10 +541,7 @@ async def process_domains(input_source: str = None, debug: bool = False, concurr result = await task await write_result(result) - task = asyncio.create_task(check_domain(session, domain, - follow_redirects=show_fields['follow_redirects'], - timeout=timeout, - check_axfr=check_axfr)) + task = asyncio.create_task(check_domain(session, domain, follow_redirects=show_fields['follow_redirects'], timeout=timeout, check_axfr=check_axfr)) tasks.add(task) # Wait for remaining tasks @@ -562,11 +552,12 @@ async def process_domains(input_source: str = None, debug: bool = False, concurr await write_result(result) -async def try_axfr(domain: str) -> None: +async def try_axfr(domain: str, timeout: int = 5) -> None: ''' Try AXFR transfer for a domain against all its nameservers :param domain: Domain to attempt AXFR transfer + :param timeout: timeout in seconds ''' try: @@ -575,25 +566,36 @@ async def try_axfr(domain: str) -> None: # Get nameservers resolver = dns.asyncresolver.Resolver() - nameservers = await resolver.resolve(domain, 'NS') + resolver.lifetime = timeout + ns_records = await resolver.resolve(domain, 'NS') + nameservers = [str(ns).rstrip('.') for ns in ns_records] - # Try AXFR against each nameserver - for ns in nameservers: - ns_host = str(ns).rstrip('.') + # Try AXFR against each nameserver's IPs + for ns_host in nameservers: try: - # Get nameserver IP - ns_ips = await resolver.resolve(ns_host, 'A') - for ns_ip in ns_ips: - ns_ip = str(ns_ip) + # Get A records + a_ips = [] + try: + a_records = await resolver.resolve(ns_host, 'A') + a_ips.extend(str(ip) for ip in a_records) + except Exception as e: + debug(f'Failed to get A records for {ns_host}: {str(e)}') + + # Get AAAA records + try: + aaaa_records = await resolver.resolve(ns_host, 'AAAA') + a_ips.extend(str(ip) for ip in aaaa_records) + except Exception as e: + debug(f'Failed to get AAAA records for {ns_host}: {str(e)}') + + # Try AXFR against each IP + for ns_ip in a_ips: try: - # Attempt zone transfer - zone = dns.zone.from_xfr(dns.query.xfr(ns_ip, domain)) - - # Save successful transfer + zone = dns.zone.from_xfr(dns.query.xfr(ns_ip, domain, lifetime=timeout+10)) filename = f'axfrout/{domain}_{ns_ip}.zone' with open(filename, 'w') as f: zone.to_text(f) - info(f'{Colors.RED}[AXFR SUCCESS] {domain} from {ns_host} ({ns_ip}){Colors.RESET}') + info(f'{Colors.GREEN}[AXFR SUCCESS] {domain} from {ns_host} ({ns_ip}){Colors.RESET}') except Exception as e: debug(f'AXFR failed for {domain} from {ns_ip}: {str(e)}') except Exception as e: