diff --git a/httpz.py b/httpz.py index ef649de..c785bfa 100644 --- a/httpz.py +++ b/httpz.py @@ -11,13 +11,9 @@ import asyncio import itertools import json import logging -from pathlib import Path -import sys import os -import dns.zone -import dns.query -import dns.resolver import random +import sys try: import aiohttp @@ -43,6 +39,9 @@ except ImportError: try: import dns.asyncresolver + import dns.query + import dns.resolver + import dns.zone except ImportError: raise ImportError('missing \'dns\' library (pip install dnspython)') @@ -73,114 +72,66 @@ class Colors: _SILENT_MODE = False -def debug(msg: str) -> None: - '''Print debug message if not in silent mode''' - if not _SILENT_MODE: - logging.debug(msg) - -def error(msg: str) -> None: - '''Print error message if not in silent mode''' - if not _SILENT_MODE: - logging.error(msg) - -def info(msg: str) -> None: - '''Print info message if not in silent mode''' - if not _SILENT_MODE: - logging.info(msg) +def debug(msg: str): + if not _SILENT_MODE: logging.debug(msg) +def error(msg: str): + if not _SILENT_MODE: logging.error(msg) +def info(msg: str): + if not _SILENT_MODE: logging.info(msg) -def load_resolvers(resolver_file: str = None) -> list: +async def get_cert_info(ssl_object, url: str) -> dict: ''' - Load DNS resolvers from file or return default resolvers + Get SSL certificate information for a domain - :param resolver_file: Path to file containing resolver IPs - :return: List of resolver IPs + :param ssl_object: SSL object to get certificate info from + :param url: URL to get certificate info from ''' - if resolver_file: + + try: + # Check if we have a certificate + if not ssl_object: + return None + + # Get the certificate in DER format + if not (cert_der := ssl_object.getpeercert(binary_form=True)): + return None + + # Load the certificate + cert = x509.load_der_x509_certificate(cert_der) + + # Extract all subject alternative names try: - with open(resolver_file) as f: - resolvers = [line.strip() for line in f if line.strip()] - if resolvers: - return resolvers - except Exception as e: - debug(f'Error loading resolvers from {resolver_file}: {str(e)}') - - # Default to Cloudflare, Google, Quad9 if no file or empty file - return ['1.1.1.1', '8.8.8.8', '9.9.9.9'] + san_extension = cert.extensions.get_extension_for_oid(x509.oid.ExtensionOID.SUBJECT_ALTERNATIVE_NAME) + alt_names = [name.value for name in san_extension.value] if san_extension else [] + except x509.extensions.ExtensionNotFound: + alt_names = [] + # Get subject CN + try: + common_name = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value + except IndexError: + common_name = None -async def resolve_all_dns(domain: str, timeout: int = 5, nameserver: str = None) -> tuple: - ''' - Resolve all DNS records (NS, A, AAAA, CNAME) for a domain - - :param domain: Domain to resolve - :param timeout: Timeout in seconds - :param nameserver: Specific nameserver to use - :return: Tuple of (ips, cname, nameservers, ns_ips) - ''' - - resolver = dns.asyncresolver.Resolver() - resolver.lifetime = timeout - if nameserver: - resolver.nameservers = [nameserver] - - # Prepare all DNS queries - tasks = [ - resolver.resolve(domain, 'NS'), - resolver.resolve(domain, 'A'), - resolver.resolve(domain, 'AAAA'), - resolver.resolve(domain, 'CNAME') - ] - - # Run all queries concurrently - results = await asyncio.gather(*tasks, return_exceptions=True) - - # Process results - nameservers = [] - ips = [] - cname = None - ns_ips = {} - - # NS records - if isinstance(results[0], dns.resolver.Answer): - nameservers = [str(ns).rstrip('.') for ns in results[0]] - - # Resolve each nameserver's IPs - ns_tasks = [] - for ns in nameservers: - ns_tasks.extend([ - resolver.resolve(ns, 'A'), - resolver.resolve(ns, 'AAAA') - ]) - - if ns_tasks: - ns_results = await asyncio.gather(*ns_tasks, return_exceptions=True) - - for i in range(0, len(ns_results), 2): - ns = nameservers[i//2] - ns_ips[ns] = [] - - # A records - if isinstance(ns_results[i], dns.resolver.Answer): - ns_ips[ns].extend(str(ip) for ip in ns_results[i]) - - # AAAA records - if isinstance(ns_results[i+1], dns.resolver.Answer): - ns_ips[ns].extend(str(ip) for ip in ns_results[i+1]) - - # A records - if isinstance(results[1], dns.resolver.Answer): - ips.extend(str(ip) for ip in results[1]) - - # AAAA records - if isinstance(results[2], dns.resolver.Answer): - ips.extend(str(ip) for ip in results[2]) - - # CNAME records - if isinstance(results[3], dns.resolver.Answer): - cname = str(results[3][0].target).rstrip('.') - - return sorted(set(ips)), cname, nameservers, ns_ips + # Get issuer CN + try: + issuer = cert.issuer.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value + except IndexError: + issuer = None + + return { + 'fingerprint' : cert.fingerprint(hashes.SHA256()).hex(), + 'common_name' : common_name, + 'issuer' : issuer, + 'alt_names' : alt_names, + 'not_before' : cert.not_valid_before_utc.isoformat(), + 'not_after' : cert.not_valid_after_utc.isoformat(), + 'version' : cert.version.value, + 'serial_number' : format(cert.serial_number, 'x'), + } + except Exception as e: + debug(f'Error getting cert info for {url}: {str(e)}') + return None async def get_favicon_hash(session: aiohttp.ClientSession, base_url: str, html: str) -> str: @@ -214,75 +165,141 @@ async def get_favicon_hash(session: aiohttp.ClientSession, base_url: str, html: elif not favicon_url.startswith(('http://', 'https://')): favicon_url = base_url + '/' + favicon_url + # Get favicon hash async with session.get(favicon_url, timeout=10) as response: if response.status == 200: - content = await response.read() - if len(content) <= 1024*1024: # Check if favicon is <= 1MB - hash_value = mmh3.hash64(content)[0] - # Only return hash if it's not 0 (likely invalid favicon) - if hash_value != 0: - return str(hash_value) + content = (await response.read())[:1024*1024] + hash_value = mmh3.hash64(content)[0] + if hash_value != 0: + return str(hash_value) + except Exception as e: debug(f'Error getting favicon for {base_url}: {str(e)}') return None -async def get_cert_info(session: aiohttp.ClientSession, url: str) -> dict: +def human_size(size_bytes: int) -> str: ''' - Get SSL certificate information for a domain + Convert bytes to human readable string - :param session: aiohttp client session - :param url: URL to get certificate info from + :param size_bytes: Size in bytes ''' - try: - connector = aiohttp.TCPConnector(ssl=False, enable_cleanup_closed=True) - - async with aiohttp.ClientSession(connector=connector) as cert_session: - async with cert_session.get(url) as response: - ssl_object = response._protocol.transport.get_extra_info('ssl_object') - - if not ssl_object: - return None - cert_der = ssl_object.getpeercert(binary_form=True) - if not cert_der: - return None + if not size_bytes: + return '0B' + + units = ('B', 'KB', 'MB', 'GB') + size = float(size_bytes) + unit_index = 0 + + while size >= 1024 and unit_index < len(units) - 1: + size /= 1024 + unit_index += 1 + + return f'{size:.1f}{units[unit_index]}' - cert = x509.load_der_x509_certificate(cert_der) - # Extract all subject alternative names - try: - san_extension = cert.extensions.get_extension_for_oid(x509.oid.ExtensionOID.SUBJECT_ALTERNATIVE_NAME) - alt_names = [name.value for name in san_extension.value] if san_extension else [] - except x509.extensions.ExtensionNotFound: - alt_names = [] +def input_generator(input_source: str): + ''' + Generator function to yield domains from file or stdin + + :param input_source: path to file containing domains, or None for stdin + ''' - # Get subject CN - try: - subject = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value - except IndexError: - subject = None + if input_source == '-' or input_source is None: + for line in sys.stdin: + if line.strip(): + yield line.strip() + else: + with open(input_source, 'r') as f: + for line in f: + if line.strip(): + yield line.strip() - # Get issuer CN - try: - issuer = cert.issuer.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value - except IndexError: - issuer = None - return { - 'fingerprint': cert.fingerprint(hashes.SHA256()).hex(), - 'subject': subject, - 'issuer': issuer, - 'alt_names': alt_names, - 'not_before': cert.not_valid_before_utc.isoformat(), - 'not_after': cert.not_valid_after_utc.isoformat(), - 'version': cert.version.value, - 'serial_number': format(cert.serial_number, 'x'), - } - except Exception as e: - debug(f'Error getting cert info for {url}: {str(e)}') - return None +async def load_resolvers(resolver_file: str = None) -> list: + ''' + Load DNS resolvers from file or return default resolvers + + :param resolver_file: Path to file containing resolver IPs + :return: List of resolver IPs + ''' + + if resolver_file: + try: + with open(resolver_file) as f: + resolvers = [line.strip() for line in f if line.strip()] + if resolvers: + return resolvers + except Exception as e: + debug(f'Error loading resolvers from {resolver_file}: {str(e)}') + + else: + async with aiohttp.ClientSession() as session: + async with session.get('https://raw.githubusercontent.com/trickest/resolvers/refs/heads/main/resolvers.txt') as response: + resolvers = await response.text() + if not _SILENT_MODE: + info(f'Loaded {len(resolvers.splitlines()):,} resolvers.') + return [resolver.strip() for resolver in resolvers.splitlines()] + + +async def resolve_all_dns(domain: str, timeout: int = 5, nameserver: str = None, check_axfr: bool = False) -> tuple: + ''' + Resolve all DNS records (NS, A, AAAA, CNAME) for a domain + + :param domain: Domain to resolve + :param timeout: Timeout in seconds + :param nameserver: Specific nameserver to use + ''' + + # Create the resolver + resolver = dns.asyncresolver.Resolver() + resolver.lifetime = timeout + + # Set the nameserver if provided + if nameserver: + resolver.nameservers = [nameserver] + + # Do all DNS lookups at once + results = await asyncio.gather(*[resolver.resolve(domain, rtype) for rtype in ('NS', 'A', 'AAAA', 'CNAME')], return_exceptions=True) + + # Parse the results + nameservers = [str(ns).rstrip('.') for ns in results[0]] if isinstance(results[0], dns.resolver.Answer) else [] + ips = ([str(ip) for ip in results[1]] if isinstance(results[1], dns.resolver.Answer) else []) + ([str(ip) for ip in results[2]] if isinstance(results[2], dns.resolver.Answer) else []) + cname = str(results[3][0].target).rstrip('.') if isinstance(results[3], dns.resolver.Answer) else None + + # Get NS IPs + ns_ips = {} + if nameservers: + ns_results = await asyncio.gather(*[resolver.resolve(ns, rtype) for ns in nameservers for rtype in ('A', 'AAAA')], return_exceptions=True) + for i, ns in enumerate(nameservers): + ns_ips[ns] = [str(ip) for records in ns_results[i*2:i*2+2] if isinstance(records, dns.resolver.Answer) for ip in records] + + # Try AXFR if enabled (using already resolved nameserver IPs) + if check_axfr: + try: + # Create the axfrout directory if it doesn't exist + os.makedirs('axfrout', exist_ok=True) + + # Iterate over each nameserver and their IPs + for ns_host, ips in ns_ips.items(): + for ns_ip in ips: + try: + # Perform the AXFR transfer + zone = dns.zone.from_xfr(dns.query.xfr(ns_ip, domain, lifetime=timeout)) + + # Write the zone to a file + with open(f'axfrout/{domain}_{ns_ip}.zone', 'w') as f: + zone.to_text(f) + + info(f'{Colors.GREEN}[AXFR SUCCESS] {domain} from {ns_host} ({ns_ip}){Colors.RESET}') + except Exception as e: + debug(f'AXFR failed for {domain} from {ns_ip}: {str(e)}') + except Exception as e: + debug(f'Failed AXFR for {domain}: {str(e)}') + + return sorted(set(ips)), cname, nameservers, ns_ips async def check_domain(session: aiohttp.ClientSession, domain: str, follow_redirects: bool = False, timeout: int = 5, check_axfr: bool = False, resolvers: list = None) -> dict: @@ -340,7 +357,7 @@ async def check_domain(session: aiohttp.ClientSession, domain: str, follow_redir 'body' : None, 'content_type' : None, 'url' : protocols[0], - 'port' : port, # Set port here + 'port' : port, 'ips' : [], 'cname' : None, 'nameservers' : [], @@ -353,24 +370,21 @@ async def check_domain(session: aiohttp.ClientSession, domain: str, follow_redir result.update(base_result) # Update result with base fields # Do all DNS lookups at once - ips, cname, nameservers, ns_ips = await resolve_all_dns(base_domain, timeout, nameserver) + ips, cname, nameservers, ns_ips = await resolve_all_dns(base_domain, timeout, nameserver, check_axfr) - result['ips'] = ips - result['cname'] = cname + result['ips'] = ips + result['cname'] = cname result['nameservers'] = nameservers - - # Try AXFR if enabled (using already resolved nameserver IPs) - if check_axfr: - await try_axfr(base_domain, ns_ips, timeout) + for url in protocols: try: max_redirects = 10 if follow_redirects else 0 async with session.get(url, timeout=timeout, allow_redirects=follow_redirects, max_redirects=max_redirects) as response: - result['status'] = response.status - result['url'] = str(response.url) - result['headers'] = dict(response.headers) - result['content_type'] = response.headers.get('content-type', '').split(';')[0] + result['status'] = response.status + result['url'] = str(response.url) + result['headers'] = dict(response.headers) + result['content_type'] = response.headers.get('content-type', '').split(';')[0] result['content_length'] = response.headers.get('content-length') # Track redirect chain @@ -381,7 +395,12 @@ async def check_domain(session: aiohttp.ClientSession, domain: str, follow_redir # Try to get cert info for any successful HTTPS connection if response.url.scheme == 'https': - result['tls'] = await get_cert_info(session, str(response.url)) + try: + ssl_object = response._protocol.transport.get_extra_info('ssl_object') + if ssl_object: # Only get cert info if we have a valid SSL object + result['tls'] = await get_cert_info(ssl_object, str(response.url)) + except AttributeError: + debug(f'Failed to get SSL info for {url}') if response.status == 200: html = (await response.text())[:1024*1024] @@ -406,58 +425,6 @@ async def check_domain(session: aiohttp.ClientSession, domain: str, follow_redir return result -def domain_generator(input_source: str): - ''' - Generator function to yield domains from file or stdin - - :param input_source: path to file containing domains, or None for stdin - ''' - - if input_source == '-' or input_source is None: - for line in sys.stdin: - if line.strip(): - yield line.strip() - else: - with open(input_source, 'r') as f: - for line in f: - if line.strip(): - yield line.strip() - - -def human_size(size_bytes: int) -> str: - ''' - Convert bytes to human readable string - - :param size_bytes: Size in bytes - ''' - - if not size_bytes: - return '0B' - - units = ('B', 'KB', 'MB', 'GB') - size = float(size_bytes) - unit_index = 0 - - while size >= 1024 and unit_index < len(units) - 1: - size /= 1024 - unit_index += 1 - - return f"{size:.1f}{units[unit_index]}" - - -def parse_status_codes(codes_str: str) -> set: - ''' - Parse comma-separated status codes into a set of integers - - :param codes_str: Comma-separated status codes - ''' - - try: - return {int(code.strip()) for code in codes_str.split(',')} - except ValueError: - raise argparse.ArgumentTypeError('Status codes must be comma-separated numbers (e.g., 200,301,404)') - - def format_status_output(result: dict, debug: bool = False, show_fields: dict = None, match_codes: set = None, exclude_codes: set = None) -> str: ''' Format the output with colored sections @@ -489,7 +456,7 @@ def format_status_output(result: dict, debug: bool = False, show_fields: dict = status = f"{Colors.GREEN}[{result['status']}]{Colors.RESET}" elif 300 <= result['status'] < 400: status = f"{Colors.YELLOW}[{result['status']}]{Colors.RESET}" - else: # 400+ and 500+ codes + else: # 400+ and 500+ codes status = f"{Colors.RED}[{result['status']}]{Colors.RESET}" parts.append(status) @@ -565,6 +532,25 @@ def format_status_output(result: dict, debug: bool = False, show_fields: dict = return ' '.join(parts) +def parse_status_codes(codes_str: str) -> set: + ''' + Parse comma-separated status codes and ranges into a set of integers + + :param codes_str: Comma-separated status codes (e.g., "200,301-399,404,500-503") + ''' + codes = set() + try: + for part in codes_str.split(','): + if '-' in part: + start, end = map(int, part.split('-')) + codes.update(range(start, end + 1)) + else: + codes.add(int(part)) + return codes + except ValueError: + raise argparse.ArgumentTypeError('Invalid status code format. Use comma-separated numbers or ranges (e.g., 200,301-399,404,500-503)') + + async def process_domains(input_source: str = None, debug: bool = False, concurrent_limit: int = 100, show_fields: dict = None, output_file: str = None, jsonl: bool = None, timeout: int = 5, match_codes: set = None, exclude_codes: set = None, show_progress: bool = False, check_axfr: bool = False, resolver_file: str = None): ''' Process domains from a file or stdin with concurrent requests @@ -582,29 +568,31 @@ async def process_domains(input_source: str = None, debug: bool = False, concurr :param resolver_file: Path to file containing DNS resolvers ''' - if input_source and input_source != '-' and not Path(input_source).exists(): + if input_source and input_source != '-' and not os.path.exists(input_source): raise FileNotFoundError(f'Domain file not found: {input_source}') # Clear the output file if specified if output_file: open(output_file, 'w').close() - tasks = set() + tasks = set() processed_domains = 0 # Simple counter for all processed domains - # Load resolvers - resolvers = load_resolvers(resolver_file) - + # Load resolvers - await the coroutine + resolvers = await load_resolvers(resolver_file) + + async def write_result(result: dict): + '''Write a single result to the output file''' nonlocal processed_domains # Create JSON output dict with required fields output_dict = { - 'url': result['url'], - 'domain': result['domain'], - 'status': result['status'], - 'port': result['port'] # Always include port in output + 'url' : result['url'], + 'domain' : result['domain'], + 'status' : result['status'], + 'port' : result['port'] } # Add optional fields if they exist @@ -643,16 +631,16 @@ async def process_domains(input_source: str = None, debug: bool = False, concurr else: processed_domains += 1 # Increment counter for each domain processed if show_progress: - info(f"{Colors.GRAY}[{processed_domains}]{Colors.RESET} {formatted}") + info(f"{Colors.GRAY}[{processed_domains:,}]{Colors.RESET} {formatted}") async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False)) as session: # Start initial batch of tasks - for domain in itertools.islice(domain_generator(input_source), concurrent_limit): + for domain in itertools.islice(input_generator(input_source), concurrent_limit): task = asyncio.create_task(check_domain(session, domain, follow_redirects=show_fields['follow_redirects'], timeout=timeout, check_axfr=check_axfr, resolvers=resolvers)) tasks.add(task) # Process remaining domains, maintaining concurrent_limit active tasks - domains_iter = domain_generator(input_source) + domains_iter = input_generator(input_source) next(itertools.islice(domains_iter, concurrent_limit, concurrent_limit), None) # Skip first concurrent_limit domains for domain in domains_iter: @@ -674,66 +662,45 @@ async def process_domains(input_source: str = None, debug: bool = False, concurr await write_result(result) -async def try_axfr(domain: str, ns_ips: dict, timeout: int = 5) -> None: - ''' - Try AXFR transfer using pre-resolved nameserver IPs - - :param domain: Domain to attempt AXFR transfer - :param ns_ips: Dictionary of nameserver hostnames to their IPs - :param timeout: Timeout in seconds - ''' - try: - os.makedirs('axfrout', exist_ok=True) - - for ns_host, ips in ns_ips.items(): - for ns_ip in ips: - try: - zone = dns.zone.from_xfr(dns.query.xfr(ns_ip, domain, lifetime=timeout)) - filename = f'axfrout/{domain}_{ns_ip}.zone' - with open(filename, 'w') as f: - zone.to_text(f) - info(f'{Colors.RED}[AXFR SUCCESS] {domain} from {ns_host} ({ns_ip}){Colors.RESET}') - except Exception as e: - debug(f'AXFR failed for {domain} from {ns_ip}: {str(e)}') - except Exception as e: - debug(f'Failed AXFR for {domain}: {str(e)}') - - def main(): '''Main function to handle command line arguments and run the domain checker''' + global _SILENT_MODE + # Setup argument parser parser = argparse.ArgumentParser(description=f'{Colors.GREEN}Hyper-fast HTTP Scraping Tool{Colors.RESET}', formatter_class=argparse.RawDescriptionHelpFormatter) + + # Add arguments parser.add_argument('file', nargs='?', default='-', help='File containing domains to check (one per line), use - for stdin') - parser.add_argument('-d', '--debug', action='store_true', help='Show error states and debug information') - parser.add_argument('-c', '--concurrent', type=int, default=100, help='Number of concurrent checks') - parser.add_argument('-o', '--output', help='Output file path (JSONL format)') - parser.add_argument('-j', '--jsonl', action='store_true', help='Output JSON Lines format to console') - - # Add all-flags argument parser.add_argument('-all', '--all-flags', action='store_true', help='Enable all output flags') + parser.add_argument('-d', '--debug', action='store_true', help='Show error states and debug information') + parser.add_argument('-c', '--concurrent', type=int, default=100, help='Number of concurrent checks') + parser.add_argument('-j', '--jsonl', action='store_true', help='Output JSON Lines format to console') + parser.add_argument('-o', '--output', help='Output file path (JSONL format)') # Output field flags - parser.add_argument('-sc', '--status-code', action='store_true', help='Show status code') - parser.add_argument('-ct', '--content-type', action='store_true', help='Show content type') - parser.add_argument('-ti', '--title', action='store_true', help='Show page title') parser.add_argument('-b', '--body', action='store_true', help='Show body preview') - parser.add_argument('-i', '--ip', action='store_true', help='Show IP addresses') - parser.add_argument('-f', '--favicon', action='store_true', help='Show favicon hash') - parser.add_argument('-hr', '--headers', action='store_true', help='Show response headers') - parser.add_argument('-cl', '--content-length', action='store_true', help='Show content length') - parser.add_argument('-fr', '--follow-redirects', action='store_true', help='Follow redirects (max 10)') parser.add_argument('-cn', '--cname', action='store_true', help='Show CNAME records') + parser.add_argument('-cl', '--content-length', action='store_true', help='Show content length') + parser.add_argument('-ct', '--content-type', action='store_true', help='Show content type') + parser.add_argument('-f', '--favicon', action='store_true', help='Show favicon hash') + parser.add_argument('-fr', '--follow-redirects', action='store_true', help='Follow redirects (max 10)') + parser.add_argument('-hr', '--headers', action='store_true', help='Show response headers') + parser.add_argument('-i', '--ip', action='store_true', help='Show IP addresses') + parser.add_argument('-sc', '--status-code', action='store_true', help='Show status code') + parser.add_argument('-ti', '--title', action='store_true', help='Show page title') parser.add_argument('-tls', '--tls-info', action='store_true', help='Show TLS certificate information') # Other arguments - parser.add_argument('-to', '--timeout', type=int, default=5, help='Request timeout in seconds') - parser.add_argument('-mc', '--match-codes', type=parse_status_codes, help='Only show these status codes (comma-separated, e.g., 200,301,404)') - parser.add_argument('-ec', '--exclude-codes', type=parse_status_codes, help='Exclude these status codes (comma-separated, e.g., 404,500)') - parser.add_argument('-p', '--progress', action='store_true', help='Show progress counter') parser.add_argument('-ax', '--axfr', action='store_true', help='Try AXFR transfer against nameservers') - parser.add_argument('-r', '--resolvers', help='File containing DNS resolvers (one per line)') + parser.add_argument('-ec', '--exclude-codes', type=parse_status_codes, help='Exclude these status codes (comma-separated, e.g., 404,500)') + parser.add_argument('-mc', '--match-codes', type=parse_status_codes, help='Only show these status codes (comma-separated, e.g., 200,301,404)') + parser.add_argument('-p', '--progress', action='store_true', help='Show progress counter') + parser.add_argument('-r', '--resolvers', help='File containing DNS resolvers (one per line)') + parser.add_argument('-to', '--timeout', type=int, default=5, help='Request timeout in seconds') + + # Parse arguments args = parser.parse_args() # Set silent mode based on jsonl argument @@ -741,41 +708,51 @@ def main(): # Only setup logging if we're not in silent mode if not _SILENT_MODE: - apv.setup_logging(level='DEBUG' if args.debug else 'INFO') - info(f'{Colors.BOLD}Starting domain checker...{Colors.RESET}') - if args.file == '-': - info('Reading domains from stdin') - else: - info(f'Processing file: {Colors.UNDERLINE}{args.file}{Colors.RESET}') - info(f'Concurrent checks: {args.concurrent}') + # Setup logging + if args.debug: + apv.setup_logging(level='DEBUG', log_to_disk=True, log_file_name='havoc', show_details=True) + logging.debug('Debug logging enabled') + else: + apv.setup_logging(level='INFO') + + logging.info('Starting domain checker...') + + if args.file == '-': + logging.info('Reading domains from stdin') + else: + logging.info(f'Processing file: {args.file}') + + # Setup show_fields show_fields = { 'status_code' : args.all_flags or args.status_code, 'content_type' : args.all_flags or args.content_type, + 'content_length' : args.all_flags or args.content_length, 'title' : args.all_flags or args.title, 'body' : args.all_flags or args.body, 'ip' : args.all_flags or args.ip, 'favicon' : args.all_flags or args.favicon, 'headers' : args.all_flags or args.headers, - 'content_length' : args.all_flags or args.content_length, 'follow_redirects' : args.all_flags or args.follow_redirects, 'cname' : args.all_flags or args.cname, 'tls' : args.all_flags or args.tls_info } - # If no fields specified and no -all flag, show all (maintain existing behavior) + # If no fields specified show all if not any(show_fields.values()): show_fields = {k: True for k in show_fields} try: + # Run the domain checker asyncio.run(process_domains(args.file, args.debug, args.concurrent, show_fields, args.output, args.jsonl, args.timeout, args.match_codes, args.exclude_codes, args.progress, check_axfr=args.axfr, resolver_file=args.resolvers)) except KeyboardInterrupt: - logging.warning(f'{Colors.YELLOW}Process interrupted by user{Colors.RESET}') + logging.warning('Process interrupted by user') sys.exit(1) except Exception as e: - logging.error(f'{Colors.RED}An error occurred: {str(e)}{Colors.RESET}') + logging.error(f'Unexpected error: {str(e)}') sys.exit(1) + if __name__ == '__main__': main() \ No newline at end of file