235 lines
9.4 KiB
235 lines
9.4 KiB
#!/usr/bin/env python3
# HTTPZ Web Scanner - Unit Tests
# unit_test.py
import asyncio
import logging
import sys
import time
from httpz_scanner import HTTPZScanner
from httpz_scanner.colors import Colors
except ImportError:
raise ImportError('missing httpz_scanner library (pip install httpz_scanner)')
class ColoredFormatter(logging.Formatter):
'''Custom formatter for colored log output'''
def format(self, record):
if record.levelno == logging.INFO:
color = Colors.GREEN
elif record.levelno == logging.WARNING:
color = Colors.YELLOW
elif record.levelno == logging.ERROR:
color = Colors.RED
color = Colors.RESET
record.msg = f'{color}{record.msg}{Colors.RESET}'
return super().format(record)
# Configure logging with colors
logger = logging.getLogger()
handler = logging.StreamHandler()
handler.setFormatter(ColoredFormatter('%(asctime)s - %(levelname)s - %(message)s'))
async def get_domains_from_url() -> list:
Fetch domains from SecLists URL
:return: List of domains
import aiohttp
except ImportError:
raise ImportError('missing aiohttp library (pip install aiohttp)')
url = 'https://raw.githubusercontent.com/danielmiessler/SecLists/refs/heads/master/Fuzzing/email-top-100-domains.txt'
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
content = await response.text()
return [line.strip() for line in content.splitlines() if line.strip()]
async def domain_generator(domains: list):
Async generator that yields domains
:param domains: List of domains to yield
for domain in domains:
await asyncio.sleep(0) # Allow other coroutines to run
yield domain
async def run_benchmark(test_type: str, domains: list, concurrency: int) -> tuple:
'''Run a single benchmark test'''
logging.info(f'{Colors.BOLD}Testing {test_type} input with {concurrency} concurrent connections...{Colors.RESET}')
scanner = HTTPZScanner(concurrent_limit=concurrency, timeout=3, show_progress=True, debug_mode=True, follow_redirects=True)
count = 0
got_first = False
start_time = None
if test_type == 'List':
async for result in scanner.scan(domains):
if result:
if not got_first:
got_first = True
start_time = time.time()
count += 1
# More detailed status reporting
status_str = ''
if result['status'] < 0:
error_type = result.get('error_type', 'UNKNOWN')
error_msg = result.get('error', 'Unknown Error')
status_str = f"{Colors.RED}[{result['status']} - {error_type}: {error_msg}]{Colors.RESET}"
elif 200 <= result['status'] < 300:
status_str = f"{Colors.GREEN}[{result['status']}]{Colors.RESET}"
elif 300 <= result['status'] < 400:
status_str = f"{Colors.YELLOW}[{result['status']}]{Colors.RESET}"
status_str = f"{Colors.RED}[{result['status']}]{Colors.RESET}"
# Show protocol and response headers if available
protocol_info = f" {Colors.CYAN}({result.get('protocol', 'unknown')}){Colors.RESET}" if result.get('protocol') else ''
headers_info = ''
if result.get('response_headers'):
important_headers = ['server', 'location', 'content-type']
headers = [f"{k}: {v}" for k, v in result['response_headers'].items() if k.lower() in important_headers]
if headers:
headers_info = f" {Colors.GRAY}[{', '.join(headers)}]{Colors.RESET}"
# Show redirect chain if present
redirect_info = ''
if result.get('redirect_chain'):
redirect_info = f" -> {Colors.YELLOW}Redirects: {' -> '.join(result['redirect_chain'])}{Colors.RESET}"
# Show error details if present
error_info = ''
if result.get('error'):
error_info = f" {Colors.RED}Error: {result['error']}{Colors.RESET}"
# Show final URL if different from original
url_info = ''
if result.get('url') and result['url'] != f"http(s)://{result['domain']}":
url_info = f" {Colors.CYAN}Final URL: {result['url']}{Colors.RESET}"
f"{test_type}-{concurrency} Result {count}: "
f"{status_str}{protocol_info} "
# Skip generator test
elapsed = time.time() - start_time if start_time else 0
domains_per_sec = count/elapsed if elapsed > 0 else 0
logging.info(f'{Colors.YELLOW}{test_type} test with {concurrency} concurrent connections completed in {elapsed:.2f} seconds ({domains_per_sec:.2f} domains/sec){Colors.RESET}')
return elapsed, domains_per_sec
async def test_list_input(domains: list):
'''Test scanning using a list input'''
logging.info(f'{Colors.BOLD}Testing list input...{Colors.RESET}')
scanner = HTTPZScanner(concurrent_limit=25, timeout=3, show_progress=True, debug_mode=True, follow_redirects=True)
start_time = time.time()
count = 0
async for result in scanner.scan(domains):
if result:
count += 1
status_color = Colors.GREEN if 200 <= result['status'] < 300 else Colors.RED
title = f" - {Colors.CYAN}{result.get('title', 'No Title')}{Colors.RESET}" if result.get('title') else ''
error = f" - {Colors.RED}{result.get('error', '')}{Colors.RESET}" if result.get('error') else ''
logging.info(f'List-25 Result {count}: {status_color}[{result["status"]}]{Colors.RESET} {Colors.CYAN}{result["domain"]}{Colors.RESET}{title}{error}')
async def test_generator_input(domains: list):
'''Test scanning using an async generator input'''
logging.info(f'{Colors.BOLD}Testing generator input...{Colors.RESET}')
scanner = HTTPZScanner(concurrent_limit=25, timeout=3, show_progress=True, debug_mode=True, follow_redirects=True)
start_time = time.time()
count = 0
async for result in scanner.scan(domain_generator(domains)):
if result:
count += 1
status_color = Colors.GREEN if 200 <= result['status'] < 300 else Colors.RED
title = f" - {Colors.CYAN}{result.get('title', 'No Title')}{Colors.RESET}" if result.get('title') else ''
error = f" - {Colors.RED}{result.get('error', '')}{Colors.RESET}" if result.get('error') else ''
logging.info(f'Generator-25 Result {count}: {status_color}[{result["status"]}]{Colors.RESET} {Colors.CYAN}{result["domain"]}{Colors.RESET}{title}{error}')
async def main() -> None:
'''Main test function'''
# Fetch domains
domains = await get_domains_from_url()
logging.info(f'Loaded {Colors.YELLOW}{len(domains)}{Colors.RESET} domains for testing')
# Store benchmark results
results = []
# Run tests with different concurrency levels
for concurrency in [25, 50, 100]:
# Generator tests
gen_result = await run_benchmark('Generator', domains, concurrency)
results.append(('Generator', concurrency, *gen_result))
# List tests
list_result = await run_benchmark('List', domains, concurrency)
results.append(('List', concurrency, *list_result))
# Print benchmark comparison
logging.info(f'\n{Colors.BOLD}Benchmark Results:{Colors.RESET}')
logging.info('-' * 80)
logging.info(f'{"Test Type":<15} {"Concurrency":<15} {"Time (s)":<15} {"Domains/sec":<15}')
logging.info('-' * 80)
# Sort by domains per second (fastest first)
results.sort(key=lambda x: x[3], reverse=True)
for test_type, concurrency, elapsed, domains_per_sec in results:
logging.info(f'{test_type:<15} {concurrency:<15} {elapsed:.<15.2f} {domains_per_sec:<15.2f}')
# Highlight fastest result
fastest = results[0]
logging.info('-' * 80)
logging.info(f'{Colors.GREEN}Fastest: {fastest[0]} test with {fastest[1]} concurrent connections')
logging.info(f'Time: {fastest[2]:.2f} seconds')
logging.info(f'Speed: {fastest[3]:.2f} domains/sec{Colors.RESET}')
logging.info(f'\n{Colors.GREEN}All tests completed successfully!{Colors.RESET}')
except Exception as e:
logging.error(f'Test failed: {Colors.RED}{str(e)}{Colors.RESET}')
if __name__ == '__main__':
except KeyboardInterrupt:
logging.warning(f'{Colors.YELLOW}Tests interrupted by user{Colors.RESET}')
sys.exit(1) |