Compare commits

..

12 Commits
v2.0.7 ... main

Author SHA1 Message Date
19525aec7d
sup roarie 2025-02-12 03:03:20 -05:00
f1f5a78ae0
sup tommyrot 2025-02-12 02:59:51 -05:00
9a4b7e977a
fixed chunk output 2025-02-12 02:57:44 -05:00
e220648a1a
fixed chunk output 2025-02-12 02:55:31 -05:00
7fe571ddad
fixed chunk output 2025-02-12 00:50:02 -05:00
6dacafeee5
fixed chunk output 2025-02-12 00:35:35 -05:00
41d7e53d30
fixed chunk output 2025-02-12 00:32:28 -05:00
db9590f59d
faster processing 2025-02-12 00:28:46 -05:00
90b5134c25
Added unit test 2025-02-11 22:30:22 -05:00
3d8b2d8e4f
fuck 2025-02-11 21:48:34 -05:00
63517430b7
fuck 2025-02-11 21:44:58 -05:00
311d37108a
fuck 2025-02-11 21:40:49 -05:00
11 changed files with 562 additions and 261 deletions

View File

@ -2,8 +2,8 @@
# HTTPZ Web Scanner - Developed by acidvegas in Python (https://github.com/acidvegas/httpz) # HTTPZ Web Scanner - Developed by acidvegas in Python (https://github.com/acidvegas/httpz)
# httpz_scanner/__init__.py # httpz_scanner/__init__.py
from .scanner import HTTPZScanner
from .colors import Colors from .colors import Colors
from .scanner import HTTPZScanner
__version__ = '2.0.7' __version__ = '2.1.8'

View File

@ -4,8 +4,11 @@
import asyncio import asyncio
import sys import sys
from .cli import main from .cli import main
if __name__ == '__main__': if __name__ == '__main__':
try: try:
asyncio.run(main()) asyncio.run(main())

View File

@ -4,16 +4,19 @@
import argparse import argparse
import asyncio import asyncio
import json
import logging import logging
import os import os
import sys import sys
import json
from datetime import datetime
from .colors import Colors from .colors import Colors
from .formatters import format_console_output
from .parsers import parse_status_codes, parse_shard
from .scanner import HTTPZScanner from .scanner import HTTPZScanner
from .utils import SILENT_MODE, info from .utils import SILENT_MODE, info
from .parsers import parse_status_codes, parse_shard
from .formatters import format_console_output
def setup_logging(level='INFO', log_to_disk=False): def setup_logging(level='INFO', log_to_disk=False):
''' '''
@ -22,16 +25,16 @@ def setup_logging(level='INFO', log_to_disk=False):
:param level: Logging level (INFO or DEBUG) :param level: Logging level (INFO or DEBUG)
:param log_to_disk: Whether to also log to file :param log_to_disk: Whether to also log to file
''' '''
class ColoredFormatter(logging.Formatter): class ColoredFormatter(logging.Formatter):
def formatTime(self, record, datefmt=None): def formatTime(self, record):
# Format: MM-DD HH:MM
from datetime import datetime
dt = datetime.fromtimestamp(record.created) dt = datetime.fromtimestamp(record.created)
return f"{Colors.GRAY}{dt.strftime('%m-%d %H:%M')}{Colors.RESET}" return f'{Colors.GRAY}{dt.strftime("%m-%d %H:%M")}{Colors.RESET}'
def format(self, record): def format(self, record):
return f'{self.formatTime(record)} {record.getMessage()}' return f'{self.formatTime(record)} {record.getMessage()}'
# Setup logging handlers
handlers = [] handlers = []
# Console handler # Console handler
@ -47,48 +50,50 @@ def setup_logging(level='INFO', log_to_disk=False):
handlers.append(file_handler) handlers.append(file_handler)
# Setup logger # Setup logger
logging.basicConfig( logging.basicConfig(level=getattr(logging, level.upper()), handlers=handlers)
level=getattr(logging, level.upper()),
handlers=handlers
)
async def main(): async def main():
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(description=f'{Colors.GREEN}Hyper-fast HTTP Scraping Tool{Colors.RESET}', formatter_class=argparse.RawDescriptionHelpFormatter)
description=f'{Colors.GREEN}Hyper-fast HTTP Scraping Tool{Colors.RESET}',
formatter_class=argparse.RawDescriptionHelpFormatter
)
# Add arguments # Add arguments
parser.add_argument('file', nargs='?', default='-', help='File containing domains to check (one per line), use - for stdin') parser.add_argument('file', nargs='?', default='-', help='File containing domains to check (one per line), use - for stdin')
parser.add_argument('-all', '--all-flags', action='store_true', help='Enable all output flags') parser.add_argument('-all', '--all-flags', action='store_true', help='Enable all output flags')
parser.add_argument('-d', '--debug', action='store_true', help='Show error states and debug information') parser.add_argument('-d', '--debug', action='store_true', help='Show error states and debug information')
parser.add_argument('-c', '--concurrent', type=int, default=100, help='Number of concurrent checks') parser.add_argument('-c', '--concurrent', type=int, default=100, help='Number of concurrent checks')
parser.add_argument('-j', '--jsonl', action='store_true', help='Output JSON Lines format to console') parser.add_argument('-j', '--jsonl', action='store_true', help='Output JSON Lines format to console')
parser.add_argument('-o', '--output', help='Output file path (JSONL format)') parser.add_argument('-o', '--output', help='Output file path (JSONL format)')
# Output field flags # Output field flags
parser.add_argument('-b', '--body', action='store_true', help='Show body preview') parser.add_argument('-b', '--body', action='store_true', help='Show body preview')
parser.add_argument('-cn', '--cname', action='store_true', help='Show CNAME records') parser.add_argument('-cn', '--cname', action='store_true', help='Show CNAME records')
parser.add_argument('-cl', '--content-length', action='store_true', help='Show content length') parser.add_argument('-cl', '--content-length', action='store_true', help='Show content length')
parser.add_argument('-ct', '--content-type', action='store_true', help='Show content type') parser.add_argument('-ct', '--content-type', action='store_true', help='Show content type')
parser.add_argument('-f', '--favicon', action='store_true', help='Show favicon hash') parser.add_argument('-f', '--favicon', action='store_true', help='Show favicon hash')
parser.add_argument('-fr', '--follow-redirects', action='store_true', help='Follow redirects (max 10)') parser.add_argument('-fr', '--follow-redirects', action='store_true', help='Follow redirects (max 10)')
parser.add_argument('-hr', '--headers', action='store_true', help='Show response headers') parser.add_argument('-hr', '--show-headers', action='store_true', help='Show response headers')
parser.add_argument('-i', '--ip', action='store_true', help='Show IP addresses') parser.add_argument('-i', '--ip', action='store_true', help='Show IP addresses')
parser.add_argument('-sc', '--status-code', action='store_true', help='Show status code') parser.add_argument('-sc', '--status-code', action='store_true', help='Show status code')
parser.add_argument('-ti', '--title', action='store_true', help='Show page title') parser.add_argument('-ti', '--title', action='store_true', help='Show page title')
parser.add_argument('-tls', '--tls-info', action='store_true', help='Show TLS certificate information') parser.add_argument('-tls', '--tls-info', action='store_true', help='Show TLS certificate information')
# Other arguments # Other arguments
parser.add_argument('-ax', '--axfr', action='store_true', help='Try AXFR transfer against nameservers') parser.add_argument('-ax', '--axfr', action='store_true', help='Try AXFR transfer against nameservers')
parser.add_argument('-ec', '--exclude-codes', type=parse_status_codes, help='Exclude these status codes (comma-separated, e.g., 404,500)') parser.add_argument('-ec', '--exclude-codes', type=parse_status_codes, help='Exclude these status codes (comma-separated, e.g., 404,500)')
parser.add_argument('-mc', '--match-codes', type=parse_status_codes, help='Only show these status codes (comma-separated, e.g., 200,301,404)') parser.add_argument('-mc', '--match-codes', type=parse_status_codes, help='Only show these status codes (comma-separated, e.g., 200,301,404)')
parser.add_argument('-p', '--progress', action='store_true', help='Show progress counter') parser.add_argument('-p', '--progress', action='store_true', help='Show progress counter')
parser.add_argument('-r', '--resolvers', help='File containing DNS resolvers (one per line)') parser.add_argument('-pd', '--post-data', help='Send POST request with this data')
parser.add_argument('-r', '--resolvers', help='File containing DNS resolvers (one per line)')
parser.add_argument('-to', '--timeout', type=int, default=5, help='Request timeout in seconds') parser.add_argument('-to', '--timeout', type=int, default=5, help='Request timeout in seconds')
# Add shard argument # Add shard argument
parser.add_argument('-sh','--shard', type=parse_shard, help='Shard index and total shards (e.g., 1/3)') parser.add_argument('-sh','--shard', type=parse_shard, help='Shard index and total shards (e.g., 1/3)')
# Add this to the argument parser section
parser.add_argument('-pa', '--paths', help='Additional paths to check (comma-separated, e.g., ".git/config,.env")')
# Add these arguments in the parser section
parser.add_argument('-hd', '--headers', help='Custom headers to send with each request (format: "Header1: value1,Header2: value2")')
# If no arguments provided, print help and exit # If no arguments provided, print help and exit
if len(sys.argv) == 1: if len(sys.argv) == 1:
@ -121,7 +126,7 @@ async def main():
'body' : args.all_flags or args.body, 'body' : args.all_flags or args.body,
'ip' : args.all_flags or args.ip, 'ip' : args.all_flags or args.ip,
'favicon' : args.all_flags or args.favicon, 'favicon' : args.all_flags or args.favicon,
'headers' : args.all_flags or args.headers, 'headers' : args.all_flags or args.show_headers,
'follow_redirects' : args.all_flags or args.follow_redirects, 'follow_redirects' : args.all_flags or args.follow_redirects,
'cname' : args.all_flags or args.cname, 'cname' : args.all_flags or args.cname,
'tls' : args.all_flags or args.tls_info 'tls' : args.all_flags or args.tls_info
@ -145,7 +150,10 @@ async def main():
show_fields=show_fields, show_fields=show_fields,
match_codes=args.match_codes, match_codes=args.match_codes,
exclude_codes=args.exclude_codes, exclude_codes=args.exclude_codes,
shard=args.shard shard=args.shard,
paths=args.paths.split(',') if args.paths else None,
custom_headers=dict(h.split(': ', 1) for h in args.headers.split(',')) if args.headers else None,
post_data=args.post_data
) )
count = 0 count = 0
@ -154,10 +162,11 @@ async def main():
if args.output: if args.output:
with open(args.output, 'a') as f: with open(args.output, 'a') as f:
f.write(json.dumps(result) + '\n') f.write(json.dumps(result) + '\n')
f.flush() # Ensure file output is immediate
# Handle JSON output separately # Handle JSON output separately
if args.jsonl: if args.jsonl:
print(json.dumps(result)) print(json.dumps(result), flush=True) # Force flush
continue continue
# Only output and increment counter if we have content to show for normal output # Only output and increment counter if we have content to show for normal output
@ -166,8 +175,9 @@ async def main():
if args.progress: if args.progress:
count += 1 count += 1
info(f"[{count}] {formatted}") info(f"[{count}] {formatted}")
sys.stdout.flush() # Force flush after each domain
else: else:
print(formatted) print(formatted, flush=True) # Force flush
except KeyboardInterrupt: except KeyboardInterrupt:
logging.warning('Process interrupted by user') logging.warning('Process interrupted by user')
@ -176,9 +186,12 @@ async def main():
logging.error(f'Unexpected error: {str(e)}') logging.error(f'Unexpected error: {str(e)}')
sys.exit(1) sys.exit(1)
def run(): def run():
'''Entry point for the CLI''' '''Entry point for the CLI'''
asyncio.run(main()) asyncio.run(main())
if __name__ == '__main__': if __name__ == '__main__':
run() run()

View File

@ -4,7 +4,8 @@
class Colors: class Colors:
'''ANSI color codes for terminal output''' '''ANSI color codes for terminal output'''
HEADER = '\033[95m' # Light purple
HEADER = '\033[95m'
BLUE = '\033[94m' BLUE = '\033[94m'
GREEN = '\033[92m' GREEN = '\033[92m'
YELLOW = '\033[93m' YELLOW = '\033[93m'
@ -12,9 +13,9 @@ class Colors:
BOLD = '\033[1m' BOLD = '\033[1m'
UNDERLINE = '\033[4m' UNDERLINE = '\033[4m'
RESET = '\033[0m' RESET = '\033[0m'
PURPLE = '\033[35m' # Dark purple PURPLE = '\033[35m'
LIGHT_RED = '\033[38;5;203m' # Light red LIGHT_RED = '\033[38;5;203m'
DARK_GREEN = '\033[38;5;22m' # Dark green DARK_GREEN = '\033[38;5;22m'
PINK = '\033[38;5;198m' # Bright pink PINK = '\033[38;5;198m'
GRAY = '\033[90m' # Gray color GRAY = '\033[90m'
CYAN = '\033[96m' # Cyan color CYAN = '\033[96m'

View File

@ -4,14 +4,23 @@
import asyncio import asyncio
import os import os
import aiohttp
import dns.asyncresolver try:
import dns.query import aiohttp
import dns.resolver except ImportError:
import dns.zone raise ImportError('missing aiohttp library (pip install aiohttp)')
try:
import dns.asyncresolver
import dns.query
import dns.resolver
import dns.zone
except ImportError:
raise ImportError('missing dnspython library (pip install dnspython)')
from .utils import debug, info, SILENT_MODE from .utils import debug, info, SILENT_MODE
async def resolve_all_dns(domain: str, timeout: int = 5, nameserver: str = None, check_axfr: bool = False) -> tuple: async def resolve_all_dns(domain: str, timeout: int = 5, nameserver: str = None, check_axfr: bool = False) -> tuple:
''' '''
Resolve all DNS records for a domain Resolve all DNS records for a domain
@ -21,36 +30,35 @@ async def resolve_all_dns(domain: str, timeout: int = 5, nameserver: str = None,
:param nameserver: Specific nameserver to use :param nameserver: Specific nameserver to use
:param check_axfr: Whether to attempt zone transfer :param check_axfr: Whether to attempt zone transfer
''' '''
# Setup resolver
resolver = dns.asyncresolver.Resolver() resolver = dns.asyncresolver.Resolver()
resolver.lifetime = timeout resolver.lifetime = timeout
if nameserver: if nameserver:
resolver.nameservers = [nameserver] resolver.nameservers = [nameserver]
results = await asyncio.gather(*[resolver.resolve(domain, rtype) # Resolve all DNS records
for rtype in ('NS', 'A', 'AAAA', 'CNAME')], results = await asyncio.gather(*[resolver.resolve(domain, rtype) for rtype in ('NS', 'A', 'AAAA', 'CNAME')], return_exceptions=True)
return_exceptions=True)
# Parse results
nameservers = [str(ns).rstrip('.') for ns in results[0]] if isinstance(results[0], dns.resolver.Answer) else [] nameservers = [str(ns).rstrip('.') for ns in results[0]] if isinstance(results[0], dns.resolver.Answer) else []
ips = ([str(ip) for ip in results[1]] if isinstance(results[1], dns.resolver.Answer) else []) + \ ips = ([str(ip) for ip in results[1]] if isinstance(results[1], dns.resolver.Answer) else []) + ([str(ip) for ip in results[2]] if isinstance(results[2], dns.resolver.Answer) else [])
([str(ip) for ip in results[2]] if isinstance(results[2], dns.resolver.Answer) else []) cname = str(results[3][0].target).rstrip('.') if isinstance(results[3], dns.resolver.Answer) else None
cname = str(results[3][0].target).rstrip('.') if isinstance(results[3], dns.resolver.Answer) else None
# Get NS IPs
ns_ips = {} ns_ips = {}
if nameservers: if nameservers:
ns_results = await asyncio.gather(*[resolver.resolve(ns, rtype) ns_results = await asyncio.gather(*[resolver.resolve(ns, rtype) for ns in nameservers for rtype in ('A', 'AAAA')], return_exceptions=True)
for ns in nameservers
for rtype in ('A', 'AAAA')],
return_exceptions=True)
for i, ns in enumerate(nameservers): for i, ns in enumerate(nameservers):
ns_ips[ns] = [str(ip) for records in ns_results[i*2:i*2+2] ns_ips[ns] = [str(ip) for records in ns_results[i*2:i*2+2] if isinstance(records, dns.resolver.Answer) for ip in records]
if isinstance(records, dns.resolver.Answer)
for ip in records]
# Attempt zone transfer
if check_axfr: if check_axfr:
await attempt_axfr(domain, ns_ips, timeout) await attempt_axfr(domain, ns_ips, timeout)
return sorted(set(ips)), cname, nameservers, ns_ips return sorted(set(ips)), cname, nameservers, ns_ips
async def attempt_axfr(domain: str, ns_ips: dict, timeout: int = 5) -> None: async def attempt_axfr(domain: str, ns_ips: dict, timeout: int = 5) -> None:
''' '''
Attempt zone transfer for a domain Attempt zone transfer for a domain
@ -59,28 +67,37 @@ async def attempt_axfr(domain: str, ns_ips: dict, timeout: int = 5) -> None:
:param ns_ips: Dictionary of nameserver hostnames to their IPs :param ns_ips: Dictionary of nameserver hostnames to their IPs
:param timeout: Timeout in seconds :param timeout: Timeout in seconds
''' '''
try: try:
os.makedirs('axfrout', exist_ok=True) os.makedirs('axfrout', exist_ok=True)
# Loop through each NS
for ns_host, ips in ns_ips.items(): for ns_host, ips in ns_ips.items():
# Loop through each NS IP
for ns_ip in ips: for ns_ip in ips:
try: try:
# Attempt zone transfer
zone = dns.zone.from_xfr(dns.query.xfr(ns_ip, domain, lifetime=timeout)) zone = dns.zone.from_xfr(dns.query.xfr(ns_ip, domain, lifetime=timeout))
# Write zone to file
with open(f'axfrout/{domain}_{ns_ip}.zone', 'w') as f: with open(f'axfrout/{domain}_{ns_ip}.zone', 'w') as f:
zone.to_text(f) zone.to_text(f)
info(f'[AXFR SUCCESS] {domain} from {ns_host} ({ns_ip})') info(f'[AXFR SUCCESS] {domain} from {ns_host} ({ns_ip})')
except Exception as e: except Exception as e:
debug(f'AXFR failed for {domain} from {ns_ip}: {str(e)}') debug(f'AXFR failed for {domain} from {ns_ip}: {str(e)}')
except Exception as e: except Exception as e:
debug(f'Failed AXFR for {domain}: {str(e)}') debug(f'Failed AXFR for {domain}: {str(e)}')
async def load_resolvers(resolver_file: str = None) -> list: async def load_resolvers(resolver_file: str = None) -> list:
''' '''
Load DNS resolvers from file or default source Load DNS resolvers from file or default source
:param resolver_file: Path to file containing resolver IPs :param resolver_file: Path to file containing resolver IPs
:return: List of resolver IPs
''' '''
# Load from file
if resolver_file: if resolver_file:
try: try:
with open(resolver_file) as f: with open(resolver_file) as f:
@ -90,6 +107,7 @@ async def load_resolvers(resolver_file: str = None) -> list:
except Exception as e: except Exception as e:
debug(f'Error loading resolvers from {resolver_file}: {str(e)}') debug(f'Error loading resolvers from {resolver_file}: {str(e)}')
# Load from GitHub
async with aiohttp.ClientSession() as session: async with aiohttp.ClientSession() as session:
async with session.get('https://raw.githubusercontent.com/trickest/resolvers/refs/heads/main/resolvers.txt') as response: async with session.get('https://raw.githubusercontent.com/trickest/resolvers/refs/heads/main/resolvers.txt') as response:
resolvers = await response.text() resolvers = await response.text()

View File

@ -5,6 +5,7 @@
from .colors import Colors from .colors import Colors
from .utils import human_size from .utils import human_size
def format_console_output(result: dict, debug: bool = False, show_fields: dict = None, match_codes: set = None, exclude_codes: set = None) -> str: def format_console_output(result: dict, debug: bool = False, show_fields: dict = None, match_codes: set = None, exclude_codes: set = None) -> str:
''' '''
Format the output with colored sections Format the output with colored sections
@ -37,9 +38,17 @@ def format_console_output(result: dict, debug: bool = False, show_fields: dict =
status = f"{Colors.RED}[{result['status']}]{Colors.RESET}" status = f"{Colors.RED}[{result['status']}]{Colors.RESET}"
parts.append(status) parts.append(status)
# Domain (always shown) # Domain/URL
parts.append(f"[{result['url']}]") parts.append(f"[{result['url']}]")
# Content Type
if show_fields.get('content_type') and result.get('content_type'):
parts.append(f"{Colors.CYAN}[{result['content_type']}]{Colors.RESET}")
# Content Length
if show_fields.get('content_length') and result.get('content_length'):
parts.append(f"{Colors.PINK}[{result['content_length']}]{Colors.RESET}")
# Title # Title
if show_fields.get('title') and result.get('title'): if show_fields.get('title') and result.get('title'):
parts.append(f"{Colors.DARK_GREEN}[{result['title']}]{Colors.RESET}") parts.append(f"{Colors.DARK_GREEN}[{result['title']}]{Colors.RESET}")
@ -59,8 +68,8 @@ def format_console_output(result: dict, debug: bool = False, show_fields: dict =
parts.append(f"{Colors.PURPLE}[{result['favicon_hash']}]{Colors.RESET}") parts.append(f"{Colors.PURPLE}[{result['favicon_hash']}]{Colors.RESET}")
# Headers # Headers
if show_fields.get('headers') and result.get('headers'): if show_fields.get('headers') and result.get('response_headers'):
headers_text = [f"{k}: {v}" for k, v in result['headers'].items()] headers_text = [f"{k}: {v}" for k, v in result['response_headers'].items()]
parts.append(f"{Colors.CYAN}[{', '.join(headers_text)}]{Colors.RESET}") parts.append(f"{Colors.CYAN}[{', '.join(headers_text)}]{Colors.RESET}")
else: else:
if show_fields.get('content_type') and result.get('content_type'): if show_fields.get('content_type') and result.get('content_type'):
@ -72,18 +81,18 @@ def format_console_output(result: dict, debug: bool = False, show_fields: dict =
parts.append(f"{Colors.PINK}[{size}]{Colors.RESET}") parts.append(f"{Colors.PINK}[{size}]{Colors.RESET}")
except (ValueError, TypeError): except (ValueError, TypeError):
parts.append(f"{Colors.PINK}[{result['content_length']}]{Colors.RESET}") parts.append(f"{Colors.PINK}[{result['content_length']}]{Colors.RESET}")
# CNAME
if show_fields.get('cname') and result.get('cname'):
parts.append(f"{Colors.PURPLE}[CNAME: {result['cname']}]{Colors.RESET}")
# Redirect Chain # Redirect Chain
if show_fields.get('follow_redirects') and result.get('redirect_chain'): if show_fields.get('follow_redirects') and result.get('redirect_chain'):
chain = ' -> '.join(result['redirect_chain']) chain = ' -> '.join(result['redirect_chain'])
parts.append(f"{Colors.YELLOW}[Redirects: {chain}]{Colors.RESET}") parts.append(f"{Colors.YELLOW}[Redirects: {chain}]{Colors.RESET}")
# CNAME
if show_fields.get('cname') and result.get('cname'):
parts.append(f"{Colors.PURPLE}[CNAME: {result['cname']}]{Colors.RESET}")
# TLS Certificate Info # TLS Certificate Info
if result.get('tls'): if show_fields.get('tls') and result.get('tls'):
cert = result['tls'] cert = result['tls']
tls_parts = [] tls_parts = []
if cert.get('common_name'): if cert.get('common_name'):

View File

@ -2,6 +2,8 @@
# HTTPZ Web Scanner - Developed by acidvegas in Python (https://github.com/acidvegas/httpz) # HTTPZ Web Scanner - Developed by acidvegas in Python (https://github.com/acidvegas/httpz)
# httpz_scanner/parsers.py # httpz_scanner/parsers.py
import argparse
try: try:
import bs4 import bs4
except ImportError: except ImportError:
@ -20,7 +22,6 @@ except ImportError:
raise ImportError('missing mmh3 module (pip install mmh3)') raise ImportError('missing mmh3 module (pip install mmh3)')
from .utils import debug, error from .utils import debug, error
import argparse
def parse_domain_url(domain: str) -> tuple: def parse_domain_url(domain: str) -> tuple:
@ -41,20 +42,13 @@ def parse_domain_url(domain: str) -> tuple:
try: try:
port = int(port_str.split('/')[0]) port = int(port_str.split('/')[0])
except ValueError: except ValueError:
port = 443 if protocol == 'https://' else 80 port = None
else:
port = 443 if protocol == 'https://' else 80
protocols = [f'{protocol}{base_domain}{":" + str(port) if port else ""}']
else: else:
if ':' in base_domain.split('/')[0]: if ':' in base_domain.split('/')[0]:
base_domain, port_str = base_domain.split(':', 1) base_domain, port_str = base_domain.split(':', 1)
port = int(port_str.split('/')[0]) if port_str.split('/')[0].isdigit() else 443 port = int(port_str.split('/')[0]) if port_str.split('/')[0].isdigit() else None
else:
port = 443 protocols = ['http://', 'https://'] # Always try HTTP first
protocols = [
f'https://{base_domain}{":" + str(port) if port else ""}',
f'http://{base_domain}{":" + str(port) if port else ""}'
]
return base_domain, port, protocols return base_domain, port, protocols
@ -188,7 +182,7 @@ def parse_title(html: str, content_type: str = None) -> str:
:param html: HTML content of the page :param html: HTML content of the page
:param content_type: Content-Type header value :param content_type: Content-Type header value
''' '''
# Only parse title for HTML content # Only parse title for HTML content
if content_type and not any(x in content_type.lower() for x in ['text/html', 'application/xhtml']): if content_type and not any(x in content_type.lower() for x in ['text/html', 'application/xhtml']):
return None return None

View File

@ -3,8 +3,9 @@
# httpz_scanner/scanner.py # httpz_scanner/scanner.py
import asyncio import asyncio
import json
import random import random
import urllib.parse
import json
try: try:
import aiohttp import aiohttp
@ -16,17 +17,15 @@ try:
except ImportError: except ImportError:
raise ImportError('missing bs4 module (pip install beautifulsoup4)') raise ImportError('missing bs4 module (pip install beautifulsoup4)')
from .dns import resolve_all_dns, load_resolvers from .dns import resolve_all_dns, load_resolvers
from .formatters import format_console_output from .parsers import parse_domain_url, get_cert_info, get_favicon_hash
from .colors import Colors from .utils import debug, USER_AGENTS, input_generator
from .parsers import parse_domain_url, get_cert_info, get_favicon_hash, parse_title
from .utils import debug, info, USER_AGENTS, input_generator
class HTTPZScanner: class HTTPZScanner:
'''Core scanner class for HTTP domain checking''' '''Core scanner class for HTTP domain checking'''
def __init__(self, concurrent_limit = 100, timeout = 5, follow_redirects = False, check_axfr = False, resolver_file = None, output_file = None, show_progress = False, debug_mode = False, jsonl_output = False, show_fields = None, match_codes = None, exclude_codes = None, shard = None): def __init__(self, concurrent_limit = 100, timeout = 5, follow_redirects = False, check_axfr = False, resolver_file = None, output_file = None, show_progress = False, debug_mode = False, jsonl_output = False, show_fields = None, match_codes = None, exclude_codes = None, shard = None, paths = None, custom_headers=None, post_data=None):
''' '''
Initialize the HTTPZScanner class Initialize the HTTPZScanner class
@ -43,6 +42,9 @@ class HTTPZScanner:
:param match_codes: Status codes to match :param match_codes: Status codes to match
:param exclude_codes: Status codes to exclude :param exclude_codes: Status codes to exclude
:param shard: Tuple of (shard_index, total_shards) for distributed scanning :param shard: Tuple of (shard_index, total_shards) for distributed scanning
:param paths: List of additional paths to check on each domain
:param custom_headers: Dictionary of custom headers to send with each request
:param post_data: Data to send with POST requests
''' '''
self.concurrent_limit = concurrent_limit self.concurrent_limit = concurrent_limit
@ -55,6 +57,9 @@ class HTTPZScanner:
self.debug_mode = debug_mode self.debug_mode = debug_mode
self.jsonl_output = jsonl_output self.jsonl_output = jsonl_output
self.shard = shard self.shard = shard
self.paths = paths or []
self.custom_headers = custom_headers or {}
self.post_data = post_data
self.show_fields = show_fields or { self.show_fields = show_fields or {
'status_code' : True, 'status_code' : True,
@ -77,106 +82,110 @@ class HTTPZScanner:
self.progress_count = 0 self.progress_count = 0
async def init(self):
'''Initialize resolvers - must be called before scanning'''
self.resolvers = await load_resolvers(self.resolver_file)
async def check_domain(self, session: aiohttp.ClientSession, domain: str): async def check_domain(self, session: aiohttp.ClientSession, domain: str):
'''Check a single domain and return results''' '''Check a single domain and return results'''
nameserver = random.choice(self.resolvers) if self.resolvers else None
base_domain, port, protocols = parse_domain_url(domain) base_domain, port, protocols = parse_domain_url(domain)
result = { for protocol in protocols:
'domain' : base_domain, url = f'{protocol}{base_domain}'
'status' : 0, if port:
'url' : protocols[0], url += f':{port}'
'port' : port,
}
# Try each protocol
for url in protocols:
try:
# Set random user agent for each request
headers = {'User-Agent': random.choice(USER_AGENTS)}
async with session.get(url, timeout=self.timeout, try:
allow_redirects=self.follow_redirects, debug(f'Trying {url}...')
max_redirects=10 if self.follow_redirects else 0, result = await self._check_url(session, url)
headers=headers) as response: debug(f'Got result for {url}: {result}')
if result and (result['status'] != 400 or result.get('redirect_chain')): # Accept redirects
result['status'] = response.status return result
# Early exit if status code doesn't match criteria
if self.match_codes and result['status'] not in self.match_codes:
return result
if self.exclude_codes and result['status'] in self.exclude_codes:
return result
# Continue with full processing only if status code matches criteria
result['url'] = str(response.url)
# Add headers if requested
headers = dict(response.headers)
if headers and (self.show_fields.get('headers') or self.show_fields.get('all_flags')):
result['headers'] = headers
else:
# Only add content type/length if headers aren't included
if content_type := response.headers.get('content-type', '').split(';')[0]:
result['content_type'] = content_type
if content_length := response.headers.get('content-length'):
result['content_length'] = content_length
# Only add redirect chain if it exists
if self.follow_redirects and response.history:
result['redirect_chain'] = [str(h.url) for h in response.history] + [str(response.url)]
# Do DNS lookups only if we're going to use the result
ips, cname, nameservers, _ = await resolve_all_dns(
base_domain, self.timeout, nameserver, self.check_axfr
)
# Only add DNS fields if they have values
if ips:
result['ips'] = ips
if cname:
result['cname'] = cname
if nameservers:
result['nameservers'] = nameservers
# Only add TLS info if available
if response.url.scheme == 'https':
try:
if ssl_object := response._protocol.transport.get_extra_info('ssl_object'):
if tls_info := await get_cert_info(ssl_object, str(response.url)):
# Only add TLS fields that have values
result['tls'] = {k: v for k, v in tls_info.items() if v}
except AttributeError:
debug(f'Failed to get SSL info for {url}')
content_type = response.headers.get('Content-Type', '')
html = await response.text() if any(x in content_type.lower() for x in ['text/html', 'application/xhtml']) else None
# Only add title if it exists
if soup := bs4.BeautifulSoup(html, 'html.parser'):
if soup.title and soup.title.string:
result['title'] = ' '.join(soup.title.string.strip().split()).rstrip('.')[:300]
# Only add body if it exists
if body_text := soup.get_text():
result['body'] = ' '.join(body_text.split()).rstrip('.')[:500]
# Only add favicon hash if it exists
if favicon_hash := await get_favicon_hash(session, url, html):
result['favicon_hash'] = favicon_hash
break
except Exception as e: except Exception as e:
debug(f'Error checking {url}: {str(e)}') debug(f'Error checking {url}: {str(e)}')
result['status'] = -1
continue continue
return None
return result async def _check_url(self, session: aiohttp.ClientSession, url: str):
'''Check a single URL and return results'''
try:
headers = {'User-Agent': random.choice(USER_AGENTS)}
headers.update(self.custom_headers)
debug(f'Making request to {url} with headers: {headers}')
async with session.request('GET', url,
timeout=self.timeout,
allow_redirects=True, # Always follow redirects
max_redirects=10,
ssl=False, # Don't verify SSL
headers=headers) as response:
debug(f'Got response from {url}: status={response.status}, headers={dict(response.headers)}')
result = {
'domain': urllib.parse.urlparse(url).hostname,
'status': response.status,
'url': str(response.url),
'response_headers': dict(response.headers)
}
if response.history:
result['redirect_chain'] = [str(h.url) for h in response.history] + [str(response.url)]
debug(f'Redirect chain for {url}: {result["redirect_chain"]}')
return result
except aiohttp.ClientSSLError as e:
debug(f'SSL Error for {url}: {str(e)}')
return {
'domain': urllib.parse.urlparse(url).hostname,
'status': -1,
'error': f'SSL Error: {str(e)}',
'protocol': 'https' if url.startswith('https://') else 'http',
'error_type': 'SSL'
}
except aiohttp.ClientConnectorCertificateError as e:
debug(f'Certificate Error for {url}: {str(e)}')
return {
'domain': urllib.parse.urlparse(url).hostname,
'status': -1,
'error': f'Certificate Error: {str(e)}',
'protocol': 'https' if url.startswith('https://') else 'http',
'error_type': 'CERT'
}
except aiohttp.ClientConnectorError as e:
debug(f'Connection Error for {url}: {str(e)}')
return {
'domain': urllib.parse.urlparse(url).hostname,
'status': -1,
'error': f'Connection Failed: {str(e)}',
'protocol': 'https' if url.startswith('https://') else 'http',
'error_type': 'CONN'
}
except aiohttp.ClientError as e:
debug(f'HTTP Error for {url}: {e.__class__.__name__}: {str(e)}')
return {
'domain': urllib.parse.urlparse(url).hostname,
'status': -1,
'error': f'HTTP Error: {e.__class__.__name__}: {str(e)}',
'protocol': 'https' if url.startswith('https://') else 'http',
'error_type': 'HTTP'
}
except asyncio.TimeoutError:
debug(f'Timeout for {url}')
return {
'domain': urllib.parse.urlparse(url).hostname,
'status': -1,
'error': f'Connection Timed Out after {self.timeout}s',
'protocol': 'https' if url.startswith('https://') else 'http',
'error_type': 'TIMEOUT'
}
except Exception as e:
debug(f'Unexpected error for {url}: {e.__class__.__name__}: {str(e)}')
return {
'domain': urllib.parse.urlparse(url).hostname,
'status': -1,
'error': f'Error: {e.__class__.__name__}: {str(e)}',
'protocol': 'https' if url.startswith('https://') else 'http',
'error_type': 'UNKNOWN'
}
async def scan(self, input_source): async def scan(self, input_source):
@ -192,75 +201,93 @@ class HTTPZScanner:
''' '''
if not self.resolvers: if not self.resolvers:
await self.init() self.resolvers = await load_resolvers(self.resolver_file)
async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False)) as session: # Just use ssl=False, that's all we need
tasks = set() connector = aiohttp.TCPConnector(ssl=False, enable_cleanup_closed=True)
count = 0 # Move counter here since that's all process_result was doing async with aiohttp.ClientSession(connector=connector) as session:
tasks = {} # Change to dict to track domain for each task
domain_queue = asyncio.Queue()
queue_empty = False
# Handle different input types async def process_domain(domain):
if isinstance(input_source, str): try:
# File or stdin input result = await self.check_domain(session, domain)
gen = input_generator(input_source, self.shard)
async for domain in gen:
if len(tasks) >= self.concurrent_limit:
done, tasks = await asyncio.wait(
tasks, return_when=asyncio.FIRST_COMPLETED
)
for task in done:
result = await task
if self.show_progress:
count += 1 # Increment counter here
yield result
task = asyncio.create_task(self.check_domain(session, domain))
tasks.add(task)
elif isinstance(input_source, (list, tuple)):
# List/tuple input
for line_num, domain in enumerate(input_source):
if domain := str(domain).strip():
if self.shard is None or line_num % self.shard[1] == self.shard[0]:
if len(tasks) >= self.concurrent_limit:
done, tasks = await asyncio.wait(
tasks, return_when=asyncio.FIRST_COMPLETED
)
for task in done:
result = await task
if self.show_progress:
count += 1
yield result
task = asyncio.create_task(self.check_domain(session, domain))
tasks.add(task)
else:
# Async generator input
line_num = 0
async for domain in input_source:
if isinstance(domain, bytes):
domain = domain.decode()
domain = domain.strip()
if domain:
if self.shard is None or line_num % self.shard[1] == self.shard[0]:
if len(tasks) >= self.concurrent_limit:
done, tasks = await asyncio.wait(
tasks, return_when=asyncio.FIRST_COMPLETED
)
for task in done:
result = await task
if self.show_progress:
count += 1
yield result
task = asyncio.create_task(self.check_domain(session, domain))
tasks.add(task)
line_num += 1
# Process remaining tasks
if tasks:
done, _ = await asyncio.wait(tasks)
for task in done:
result = await task
if self.show_progress: if self.show_progress:
count += 1 self.progress_count += 1
yield result if result:
return domain, result
else:
# Create a proper error result if check_domain returns None
return domain, {
'domain': domain,
'status': -1,
'error': 'No successful response from either HTTP or HTTPS',
'protocol': 'unknown',
'error_type': 'NO_RESPONSE'
}
except Exception as e:
debug(f'Error processing {domain}: {e.__class__.__name__}: {str(e)}')
# Return structured error information
return domain, {
'domain': domain,
'status': -1,
'error': f'{e.__class__.__name__}: {str(e)}',
'protocol': 'unknown',
'error_type': 'PROCESS'
}
# Queue processor
async def queue_processor():
async for domain in input_generator(input_source, self.shard):
await domain_queue.put(domain)
self.processed_domains += 1
nonlocal queue_empty
queue_empty = True
# Start queue processor
queue_task = asyncio.create_task(queue_processor())
try:
while not (queue_empty and domain_queue.empty() and not tasks):
# Fill up tasks until we hit concurrent limit
while len(tasks) < self.concurrent_limit and not domain_queue.empty():
domain = await domain_queue.get()
task = asyncio.create_task(process_domain(domain))
tasks[task] = domain
if tasks:
# Wait for at least one task to complete
done, _ = await asyncio.wait(
tasks.keys(),
return_when=asyncio.FIRST_COMPLETED
)
# Process completed tasks
for task in done:
domain = tasks.pop(task)
try:
_, result = await task
if result:
yield result
except Exception as e:
debug(f'Task error for {domain}: {e.__class__.__name__}: {str(e)}')
yield {
'domain': domain,
'status': -1,
'error': f'Task Error: {e.__class__.__name__}: {str(e)}',
'protocol': 'unknown',
'error_type': 'TASK'
}
else:
await asyncio.sleep(0.1) # Prevent CPU spin when no tasks
finally:
# Clean up
for task in tasks:
task.cancel()
queue_task.cancel()
try:
await queue_task
except asyncio.CancelledError:
pass

View File

@ -68,8 +68,9 @@ USER_AGENTS = [
] ]
def debug(msg: str): def debug(msg: str):
if not SILENT_MODE: logging.debug(msg) if not SILENT_MODE:
logging.debug(msg)
def error(msg: str): def error(msg: str):
if not SILENT_MODE: logging.error(msg) if not SILENT_MODE: logging.error(msg)
def info(msg: str): def info(msg: str):
@ -117,7 +118,7 @@ async def input_generator(input_source, shard: tuple = None):
# Handle stdin # Handle stdin
if input_source == '-' or input_source is None: if input_source == '-' or input_source is None:
for line in sys.stdin: for line in sys.stdin:
await asyncio.sleep(0) # Yield control await asyncio.sleep(0)
if line := line.strip(): if line := line.strip():
if shard is None or line_num % shard[1] == shard[0]: if shard is None or line_num % shard[1] == shard[0]:
yield line yield line
@ -127,7 +128,7 @@ async def input_generator(input_source, shard: tuple = None):
elif isinstance(input_source, str) and os.path.exists(input_source): elif isinstance(input_source, str) and os.path.exists(input_source):
with open(input_source, 'r') as f: with open(input_source, 'r') as f:
for line in f: for line in f:
await asyncio.sleep(0) # Yield control await asyncio.sleep(0)
if line := line.strip(): if line := line.strip():
if shard is None or line_num % shard[1] == shard[0]: if shard is None or line_num % shard[1] == shard[0]:
yield line yield line
@ -136,7 +137,7 @@ async def input_generator(input_source, shard: tuple = None):
# Handle iterables (generators, lists, etc) # Handle iterables (generators, lists, etc)
elif hasattr(input_source, '__iter__') and not isinstance(input_source, (str, bytes)): elif hasattr(input_source, '__iter__') and not isinstance(input_source, (str, bytes)):
for line in input_source: for line in input_source:
await asyncio.sleep(0) # Yield control await asyncio.sleep(0)
if isinstance(line, bytes): if isinstance(line, bytes):
line = line.decode() line = line.decode()
if line := line.strip(): if line := line.strip():
@ -149,7 +150,7 @@ async def input_generator(input_source, shard: tuple = None):
if isinstance(input_source, bytes): if isinstance(input_source, bytes):
input_source = input_source.decode() input_source = input_source.decode()
for line in input_source.splitlines(): for line in input_source.splitlines():
await asyncio.sleep(0) # Yield control await asyncio.sleep(0)
if line := line.strip(): if line := line.strip():
if shard is None or line_num % shard[1] == shard[0]: if shard is None or line_num % shard[1] == shard[0]:
yield line yield line

View File

@ -10,7 +10,7 @@ with open('README.md', 'r', encoding='utf-8') as f:
setup( setup(
name='httpz_scanner', name='httpz_scanner',
version='2.0.7', version='2.1.8',
author='acidvegas', author='acidvegas',
author_email='acid.vegas@acid.vegas', author_email='acid.vegas@acid.vegas',
description='Hyper-fast HTTP Scraping Tool', description='Hyper-fast HTTP Scraping Tool',

235
unit_test.py Normal file
View File

@ -0,0 +1,235 @@
#!/usr/bin/env python3
# HTTPZ Web Scanner - Unit Tests
# unit_test.py
import asyncio
import logging
import sys
import time
try:
from httpz_scanner import HTTPZScanner
from httpz_scanner.colors import Colors
except ImportError:
raise ImportError('missing httpz_scanner library (pip install httpz_scanner)')
class ColoredFormatter(logging.Formatter):
'''Custom formatter for colored log output'''
def format(self, record):
if record.levelno == logging.INFO:
color = Colors.GREEN
elif record.levelno == logging.WARNING:
color = Colors.YELLOW
elif record.levelno == logging.ERROR:
color = Colors.RED
else:
color = Colors.RESET
record.msg = f'{color}{record.msg}{Colors.RESET}'
return super().format(record)
# Configure logging with colors
logger = logging.getLogger()
handler = logging.StreamHandler()
handler.setFormatter(ColoredFormatter('%(asctime)s - %(levelname)s - %(message)s'))
logger.setLevel(logging.INFO)
logger.addHandler(handler)
async def get_domains_from_url() -> list:
'''
Fetch domains from SecLists URL
:return: List of domains
'''
try:
import aiohttp
except ImportError:
raise ImportError('missing aiohttp library (pip install aiohttp)')
url = 'https://raw.githubusercontent.com/danielmiessler/SecLists/refs/heads/master/Fuzzing/email-top-100-domains.txt'
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
content = await response.text()
return [line.strip() for line in content.splitlines() if line.strip()]
async def domain_generator(domains: list):
'''
Async generator that yields domains
:param domains: List of domains to yield
'''
for domain in domains:
await asyncio.sleep(0) # Allow other coroutines to run
yield domain
async def run_benchmark(test_type: str, domains: list, concurrency: int) -> tuple:
'''Run a single benchmark test'''
logging.info(f'{Colors.BOLD}Testing {test_type} input with {concurrency} concurrent connections...{Colors.RESET}')
scanner = HTTPZScanner(concurrent_limit=concurrency, timeout=3, show_progress=True, debug_mode=True, follow_redirects=True)
count = 0
got_first = False
start_time = None
if test_type == 'List':
async for result in scanner.scan(domains):
if result:
if not got_first:
got_first = True
start_time = time.time()
count += 1
# More detailed status reporting
status_str = ''
if result['status'] < 0:
error_type = result.get('error_type', 'UNKNOWN')
error_msg = result.get('error', 'Unknown Error')
status_str = f"{Colors.RED}[{result['status']} - {error_type}: {error_msg}]{Colors.RESET}"
elif 200 <= result['status'] < 300:
status_str = f"{Colors.GREEN}[{result['status']}]{Colors.RESET}"
elif 300 <= result['status'] < 400:
status_str = f"{Colors.YELLOW}[{result['status']}]{Colors.RESET}"
else:
status_str = f"{Colors.RED}[{result['status']}]{Colors.RESET}"
# Show protocol and response headers if available
protocol_info = f" {Colors.CYAN}({result.get('protocol', 'unknown')}){Colors.RESET}" if result.get('protocol') else ''
headers_info = ''
if result.get('response_headers'):
important_headers = ['server', 'location', 'content-type']
headers = [f"{k}: {v}" for k, v in result['response_headers'].items() if k.lower() in important_headers]
if headers:
headers_info = f" {Colors.GRAY}[{', '.join(headers)}]{Colors.RESET}"
# Show redirect chain if present
redirect_info = ''
if result.get('redirect_chain'):
redirect_info = f" -> {Colors.YELLOW}Redirects: {' -> '.join(result['redirect_chain'])}{Colors.RESET}"
# Show error details if present
error_info = ''
if result.get('error'):
error_info = f" {Colors.RED}Error: {result['error']}{Colors.RESET}"
# Show final URL if different from original
url_info = ''
if result.get('url') and result['url'] != f"http(s)://{result['domain']}":
url_info = f" {Colors.CYAN}Final URL: {result['url']}{Colors.RESET}"
logging.info(
f"{test_type}-{concurrency} Result {count}: "
f"{status_str}{protocol_info} "
f"{Colors.CYAN}{result['domain']}{Colors.RESET}"
f"{redirect_info}"
f"{url_info}"
f"{headers_info}"
f"{error_info}"
)
else:
# Skip generator test
pass
elapsed = time.time() - start_time if start_time else 0
domains_per_sec = count/elapsed if elapsed > 0 else 0
logging.info(f'{Colors.YELLOW}{test_type} test with {concurrency} concurrent connections completed in {elapsed:.2f} seconds ({domains_per_sec:.2f} domains/sec){Colors.RESET}')
return elapsed, domains_per_sec
async def test_list_input(domains: list):
'''Test scanning using a list input'''
logging.info(f'{Colors.BOLD}Testing list input...{Colors.RESET}')
scanner = HTTPZScanner(concurrent_limit=25, timeout=3, show_progress=True, debug_mode=True, follow_redirects=True)
start_time = time.time()
count = 0
async for result in scanner.scan(domains):
if result:
count += 1
status_color = Colors.GREEN if 200 <= result['status'] < 300 else Colors.RED
title = f" - {Colors.CYAN}{result.get('title', 'No Title')}{Colors.RESET}" if result.get('title') else ''
error = f" - {Colors.RED}{result.get('error', '')}{Colors.RESET}" if result.get('error') else ''
logging.info(f'List-25 Result {count}: {status_color}[{result["status"]}]{Colors.RESET} {Colors.CYAN}{result["domain"]}{Colors.RESET}{title}{error}')
async def test_generator_input(domains: list):
'''Test scanning using an async generator input'''
logging.info(f'{Colors.BOLD}Testing generator input...{Colors.RESET}')
scanner = HTTPZScanner(concurrent_limit=25, timeout=3, show_progress=True, debug_mode=True, follow_redirects=True)
start_time = time.time()
count = 0
async for result in scanner.scan(domain_generator(domains)):
if result:
count += 1
status_color = Colors.GREEN if 200 <= result['status'] < 300 else Colors.RED
title = f" - {Colors.CYAN}{result.get('title', 'No Title')}{Colors.RESET}" if result.get('title') else ''
error = f" - {Colors.RED}{result.get('error', '')}{Colors.RESET}" if result.get('error') else ''
logging.info(f'Generator-25 Result {count}: {status_color}[{result["status"]}]{Colors.RESET} {Colors.CYAN}{result["domain"]}{Colors.RESET}{title}{error}')
async def main() -> None:
'''Main test function'''
try:
# Fetch domains
domains = await get_domains_from_url()
logging.info(f'Loaded {Colors.YELLOW}{len(domains)}{Colors.RESET} domains for testing')
# Store benchmark results
results = []
# Run tests with different concurrency levels
for concurrency in [25, 50, 100]:
# Generator tests
gen_result = await run_benchmark('Generator', domains, concurrency)
results.append(('Generator', concurrency, *gen_result))
# List tests
list_result = await run_benchmark('List', domains, concurrency)
results.append(('List', concurrency, *list_result))
# Print benchmark comparison
logging.info(f'\n{Colors.BOLD}Benchmark Results:{Colors.RESET}')
logging.info('-' * 80)
logging.info(f'{"Test Type":<15} {"Concurrency":<15} {"Time (s)":<15} {"Domains/sec":<15}')
logging.info('-' * 80)
# Sort by domains per second (fastest first)
results.sort(key=lambda x: x[3], reverse=True)
for test_type, concurrency, elapsed, domains_per_sec in results:
logging.info(f'{test_type:<15} {concurrency:<15} {elapsed:.<15.2f} {domains_per_sec:<15.2f}')
# Highlight fastest result
fastest = results[0]
logging.info('-' * 80)
logging.info(f'{Colors.GREEN}Fastest: {fastest[0]} test with {fastest[1]} concurrent connections')
logging.info(f'Time: {fastest[2]:.2f} seconds')
logging.info(f'Speed: {fastest[3]:.2f} domains/sec{Colors.RESET}')
logging.info(f'\n{Colors.GREEN}All tests completed successfully!{Colors.RESET}')
except Exception as e:
logging.error(f'Test failed: {Colors.RED}{str(e)}{Colors.RESET}')
sys.exit(1)
if __name__ == '__main__':
try:
asyncio.run(main())
except KeyboardInterrupt:
logging.warning(f'{Colors.YELLOW}Tests interrupted by user{Colors.RESET}')
sys.exit(1)