From a006a1dac4c41fa47a6ddc82ff6813bde774d498 Mon Sep 17 00:00:00 2001
From: acidvegas
Date: Tue, 11 Feb 2025 00:03:28 -0500
Subject: [PATCH] Code cleanup

---
 httpz.py | 178 +++++++++++++++++++++++--------------------------------
 1 file changed, 74 insertions(+), 104 deletions(-)

diff --git a/httpz.py b/httpz.py
index c785bfa..7d4e1d5 100644
--- a/httpz.py
+++ b/httpz.py
@@ -69,15 +69,15 @@ class Colors:
     GRAY  = '\033[90m' # Gray color
     CYAN  = '\033[96m' # Cyan color
 
-
-_SILENT_MODE = False
+# Global for silent mode
+SILENT_MODE = False
 
 def debug(msg: str):
-    if not _SILENT_MODE: logging.debug(msg)
+    if not SILENT_MODE: logging.debug(msg)
 
 def error(msg: str):
-    if not _SILENT_MODE: logging.error(msg)
+    if not SILENT_MODE: logging.error(msg)
 
 def info(msg: str):
-    if not _SILENT_MODE: logging.info(msg)
+    if not SILENT_MODE: logging.info(msg)
 
 async def get_cert_info(ssl_object, url: str) -> dict:
@@ -130,7 +130,7 @@ async def get_cert_info(ssl_object, url: str) -> dict:
             'serial_number' : format(cert.serial_number, 'x'),
         }
     except Exception as e:
-        debug(f'Error getting cert info for {url}: {str(e)}')
+        error(f'Error getting cert info for {url}: {str(e)}')
         return None
 
 
@@ -239,7 +239,7 @@ async def load_resolvers(resolver_file: str = None) -> list:
     async with aiohttp.ClientSession() as session:
         async with session.get('https://raw.githubusercontent.com/trickest/resolvers/refs/heads/main/resolvers.txt') as response:
             resolvers = await response.text()
-            if not _SILENT_MODE:
+            if not SILENT_MODE:
                 info(f'Loaded {len(resolvers.splitlines()):,} resolvers.')
             return [resolver.strip() for resolver in resolvers.splitlines()]
@@ -302,6 +302,43 @@ async def resolve_all_dns(domain: str, timeout: int = 5, nameserver: str = None,
     return sorted(set(ips)), cname, nameservers, ns_ips
 
 
+def parse_domain_url(domain: str) -> tuple:
+    '''
+    Parse domain string into base domain, port, and protocol list
+
+    :param domain: Raw domain string to parse
+    :return: Tuple of (base_domain, port, protocols)
+    '''
+
+    port        = None
+    base_domain = domain.rstrip('/')
+
+    if base_domain.startswith(('http://', 'https://')):
+        protocol    = 'https://' if base_domain.startswith('https://') else 'http://'
+        base_domain = base_domain.split('://', 1)[1]
+        if ':' in base_domain.split('/')[0]:
+            base_domain, port_str = base_domain.split(':', 1)
+            try:
+                port = int(port_str.split('/')[0])
+            except ValueError:
+                port = 443 if protocol == 'https://' else 80
+        else:
+            port = 443 if protocol == 'https://' else 80
+        protocols = [f'{protocol}{base_domain}{":" + str(port) if port else ""}']
+    else:
+        if ':' in base_domain.split('/')[0]:
+            base_domain, port_str = base_domain.split(':', 1)
+            port = int(port_str.split('/')[0]) if port_str.split('/')[0].isdigit() else 443
+        else:
+            port = 443
+        protocols = [
+            f'https://{base_domain}{":" + str(port) if port else ""}',
+            f'http://{base_domain}{":" + str(port) if port else ""}'
+        ]
+
+    return base_domain, port, protocols
+
+
 async def check_domain(session: aiohttp.ClientSession, domain: str, follow_redirects: bool = False, timeout: int = 5, check_axfr: bool = False, resolvers: list = None) -> dict:
     '''
     Check a single domain for its status code, title, and body preview
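For reference, here is an illustrative sanity check of the new parse_domain_url() helper; it is not part of the patch, it assumes httpz.py is importable as a module, and the expected tuples simply follow the logic added above:

    from httpz import parse_domain_url

    # No scheme: default to port 443 and probe both protocols
    assert parse_domain_url('example.com') == ('example.com', 443, ['https://example.com:443', 'http://example.com:443'])

    # Explicit port: keep it for both candidate URLs
    assert parse_domain_url('example.com:8080') == ('example.com', 8080, ['https://example.com:8080', 'http://example.com:8080'])

    # Explicit scheme: only that protocol is probed, with its default port
    assert parse_domain_url('http://example.com') == ('example.com', 80, ['http://example.com:80'])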
@@ -313,44 +350,10 @@ async def check_domain(session: aiohttp.ClientSession, domain: str, follow_redir
     :param check_axfr: whether to check for AXFR
     :param resolvers: list of DNS resolvers to use
     '''
-    # Pick random resolver for this domain
     nameserver = random.choice(resolvers) if resolvers else None
-
-    # Parse domain and port
-    port = None
-    base_domain = domain.rstrip('/')
+    base_domain, port, protocols = parse_domain_url(domain)
 
-    # Handle URLs with existing protocol
-    if base_domain.startswith(('http://', 'https://')):
-        protocol = 'https://' if base_domain.startswith('https://') else 'http://'
-        base_domain = base_domain.split('://', 1)[1]
-        # Try to extract port from domain
-        if ':' in base_domain.split('/')[0]:
-            base_domain, port_str = base_domain.split(':', 1)
-            try:
-                port = int(port_str.split('/')[0])
-            except ValueError:
-                port = 443 if protocol == 'https://' else 80
-        else:
-            port = 443 if protocol == 'https://' else 80
-        protocols = [f'{protocol}{base_domain}{":" + str(port) if port else ""}']
-    else:
-        # No protocol specified - try HTTPS first, then HTTP
-        if ':' in base_domain.split('/')[0]:
-            base_domain, port_str = base_domain.split(':', 1)
-            try:
-                port = int(port_str.split('/')[0])
-            except ValueError:
-                port = 443 # Default to HTTPS port
-        else:
-            port = 443 # Default to HTTPS port when no port specified
-        protocols = [
-            f'https://{base_domain}{":" + str(port) if port else ""}',
-            f'http://{base_domain}{":" + str(port) if port else ""}'
-        ]
-
-    result = {} # Start with empty dict
-    base_result = {
+    result = {
         'domain'          : base_domain,
         'status'          : 0,
         'title'           : None,
@@ -367,37 +370,26 @@ async def check_domain(session: aiohttp.ClientSession, domain: str, follow_redir
         'redirect_chain'  : [],
         'tls'             : None
     }
 
-    result.update(base_result) # Update result with base fields
-
-    # Do all DNS lookups at once
-    ips, cname, nameservers, ns_ips = await resolve_all_dns(base_domain, timeout, nameserver, check_axfr)
-
-    result['ips'] = ips
-    result['cname'] = cname
-    result['nameservers'] = nameservers
+    # Do DNS lookups
+    result['ips'], result['cname'], result['nameservers'], _ = await resolve_all_dns(base_domain, timeout, nameserver, check_axfr)
 
+    # Try each protocol
     for url in protocols:
        try:
-            max_redirects = 10 if follow_redirects else 0
-            async with session.get(url, timeout=timeout, allow_redirects=follow_redirects, max_redirects=max_redirects) as response:
-                result['status'] = response.status
-                result['url'] = str(response.url)
-                result['headers'] = dict(response.headers)
-                result['content_type'] = response.headers.get('content-type', '').split(';')[0]
-                result['content_length'] = response.headers.get('content-length')
-
-                # Track redirect chain
-                if follow_redirects:
-                    result['redirect_chain'] = [str(h.url) for h in response.history]
-                    if result['redirect_chain']:
-                        result['redirect_chain'].append(str(response.url))
+            async with session.get(url, timeout=timeout, allow_redirects=follow_redirects, max_redirects=10 if follow_redirects else 0) as response:
+                result.update({
+                    'status'         : response.status,
+                    'url'            : str(response.url),
+                    'headers'        : dict(response.headers),
+                    'content_type'   : response.headers.get('content-type', '').split(';')[0],
+                    'content_length' : response.headers.get('content-length'),
+                    'redirect_chain' : [str(h.url) for h in response.history] + [str(response.url)] if follow_redirects and response.history else []
+                })
 
-                # Try to get cert info for any successful HTTPS connection
                 if response.url.scheme == 'https':
                     try:
-                        ssl_object = response._protocol.transport.get_extra_info('ssl_object')
-                        if ssl_object: # Only get cert info if we have a valid SSL object
+                        if ssl_object := response._protocol.transport.get_extra_info('ssl_object'):
                             result['tls'] = await get_cert_info(ssl_object, str(response.url))
                     except AttributeError:
                         debug(f'Failed to get SSL info for {url}')
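As a side note, not part of the patch, the request handling above boils down to the following minimal, self-contained sketch; the probe() name is hypothetical and the calls simply mirror what check_domain() does for a single candidate URL:

    import asyncio
    import aiohttp

    async def probe(url: str, timeout: int = 5) -> dict:
        # One GET per candidate URL; redirects and history are read the same way as in check_domain()
        async with aiohttp.ClientSession() as session:
            async with session.get(url, timeout=timeout, allow_redirects=True, max_redirects=10) as response:
                return {
                    'status'         : response.status,
                    'url'            : str(response.url),
                    'content_type'   : response.headers.get('content-type', '').split(';')[0],
                    'redirect_chain' : [str(h.url) for h in response.history] + [str(response.url)] if response.history else []
                }

    print(asyncio.run(probe('https://example.com')))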
@@ -405,27 +397,21 @@ async def check_domain(session: aiohttp.ClientSession, domain: str, follow_redir
             if response.status == 200:
                 html = (await response.text())[:1024*1024]
                 soup = bs4.BeautifulSoup(html, 'html.parser')
-                if soup.title:
-                    title = ' '.join(soup.title.string.strip().split()).rstrip('.') if soup.title.string else ''
-                    result['title'] = title[:300]
-                if soup.get_text():
-                    body = ' '.join(soup.get_text().split()).rstrip('.')
-                    result['body'] = body[:500]
-                result['favicon_hash'] = await get_favicon_hash(session, url, html)
+                result.update({
+                    'title'        : ' '.join(soup.title.string.strip().split()).rstrip('.')[:300] if soup.title and soup.title.string else None,
+                    'body'         : ' '.join(soup.get_text().split()).rstrip('.')[:500] if soup.get_text() else None,
+                    'favicon_hash' : await get_favicon_hash(session, url, html)
+                })
             break
 
         except Exception as e:
             debug(f'Error checking {url}: {str(e)}')
             result['status'] = -1
             continue
 
-    # Make absolutely sure port is in the result before returning
-    if 'port' not in result:
-        result['port'] = port
-
     return result
 
-def format_status_output(result: dict, debug: bool = False, show_fields: dict = None, match_codes: set = None, exclude_codes: set = None) -> str:
+def format_console_output(result: dict, debug: bool = False, show_fields: dict = None, match_codes: set = None, exclude_codes: set = None) -> str:
     '''
     Format the output with colored sections
@@ -568,32 +554,24 @@ async def process_domains(input_source: str = None, debug: bool = False, concurr
     :param resolver_file: Path to file containing DNS resolvers
     '''
 
+    # Check if input file exists
     if input_source and input_source != '-' and not os.path.exists(input_source):
         raise FileNotFoundError(f'Domain file not found: {input_source}')
 
-    # Clear the output file if specified
-    if output_file:
-        open(output_file, 'w').close()
-
+    # Initialize tasks and processed domains
     tasks = set()
-    processed_domains = 0 # Simple counter for all processed domains
+    processed_domains = 0
 
     # Load resolvers - await the coroutine
     resolvers = await load_resolvers(resolver_file)
-
     async def write_result(result: dict):
-        '''Write a single result to the output file'''
+        nonlocal processed_domains
 
        # Create JSON output dict with required fields
-        output_dict = {
-            'url'    : result['url'],
-            'domain' : result['domain'],
-            'status' : result['status'],
-            'port'   : result['port']
-        }
+        output_dict = {'url': result['url'], 'domain': result['domain'], 'status': result['status'], 'port': result['port']}
 
         # Add optional fields if they exist
         if result['title']:
@@ -616,7 +594,8 @@ async def process_domains(input_source: str = None, debug: bool = False, concurr
             output_dict['nameservers'] = result['nameservers']
 
         # Get formatted output based on filters
-        formatted = format_status_output(result, debug, show_fields, match_codes, exclude_codes)
+        formatted = format_console_output(result, debug, show_fields, match_codes, exclude_codes)
+
         if formatted:
             # Write to file if specified
             if output_file:
@@ -665,7 +644,7 @@ async def process_domains(input_source: str = None, debug: bool = False, concurr
 def main():
     '''Main function to handle command line arguments and run the domain checker'''
 
-    global _SILENT_MODE
+    global SILENT_MODE
 
     # Setup argument parser
     parser = argparse.ArgumentParser(description=f'{Colors.GREEN}Hyper-fast HTTP Scraping Tool{Colors.RESET}', formatter_class=argparse.RawDescriptionHelpFormatter)
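For context, and illustrative only with made-up values, the one-line dict built in write_result() serialises to a JSONL record shaped like this; optional keys such as 'title' are appended the same way the code above does for result['title']:

    import json

    result = {'url': 'https://example.com:443', 'domain': 'example.com', 'status': 200, 'port': 443, 'title': 'Example Domain'}

    output_dict = {'url': result['url'], 'domain': result['domain'], 'status': result['status'], 'port': result['port']}
    if result['title']:
        output_dict['title'] = result['title']

    print(json.dumps(output_dict))
    # {"url": "https://example.com:443", "domain": "example.com", "status": 200, "port": 443, "title": "Example Domain"}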
@@ -698,17 +677,11 @@ def main():
     parser.add_argument('-p', '--progress', action='store_true', help='Show progress counter')
     parser.add_argument('-r', '--resolvers', help='File containing DNS resolvers (one per line)')
     parser.add_argument('-to', '--timeout', type=int, default=5, help='Request timeout in seconds')
 
-    # Parse arguments
     args = parser.parse_args()
 
-    # Set silent mode based on jsonl argument
-    _SILENT_MODE = args.jsonl
-
-    # Only setup logging if we're not in silent mode
-    if not _SILENT_MODE:
-
+    if not (SILENT_MODE := args.jsonl):
         # Setup logging
         if args.debug:
             apv.setup_logging(level='DEBUG', log_to_disk=True, log_file_name='havoc', show_details=True)
@@ -716,8 +689,6 @@ def main():
         else:
             apv.setup_logging(level='INFO')
 
-        logging.info('Starting domain checker...')
-
         if args.file == '-':
             logging.info('Reading domains from stdin')
         else:
@@ -743,7 +714,6 @@ def main():
         show_fields = {k: True for k in show_fields}
 
     try:
-        # Run the domain checker
        asyncio.run(process_domains(args.file, args.debug, args.concurrent, show_fields, args.output, args.jsonl, args.timeout, args.match_codes, args.exclude_codes, args.progress, check_axfr=args.axfr, resolver_file=args.resolvers))
     except KeyboardInterrupt:
         logging.warning('Process interrupted by user')
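One construct above that is easy to misread: because main() declares global SILENT_MODE, the walrus assignment in the line if not (SILENT_MODE := args.jsonl): rebinds the module-level flag that the debug()/error()/info() helpers consult. A minimal illustration, not part of the patch:

    SILENT_MODE = False

    def configure(jsonl: bool) -> None:
        global SILENT_MODE
        # The walrus both assigns the global and feeds the condition
        if not (SILENT_MODE := jsonl):
            print('logging would be configured here')

    configure(jsonl=True)
    print(SILENT_MODE)   # True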