#!/usr/bin/env python3
# HTTPZ Web Scanner - Unit Tests
# unit_test.py

import asyncio
import logging
import sys
import time

try:
    from httpz_scanner import HTTPZScanner
    from httpz_scanner.colors import Colors
except ImportError:
    raise ImportError('missing httpz_scanner library (pip install httpz_scanner)')
class ColoredFormatter(logging.Formatter):
    '''Logging formatter that wraps each message in an ANSI color chosen by level.'''

    # Level -> display color; anything unlisted falls back to RESET.
    LEVEL_COLORS = {
        logging.INFO:    Colors.GREEN,
        logging.WARNING: Colors.YELLOW,
        logging.ERROR:   Colors.RED,
    }

    def format(self, record):
        '''
        Format a record with its message colorized by severity.

        :param record: LogRecord to format
        :return: Formatted, colorized log line
        '''
        color = self.LEVEL_COLORS.get(record.levelno, Colors.RESET)
        original_msg = record.msg
        try:
            # Colorize only while THIS formatter renders the record...
            record.msg = f'{color}{original_msg}{Colors.RESET}'
            return super().format(record)
        finally:
            # ...then restore it, so other handlers attached to the same
            # logger don't receive a message polluted with ANSI codes.
            record.msg = original_msg
# Wire the root logger to a stream handler with level-colored output
handler = logging.StreamHandler()
handler.setFormatter(ColoredFormatter('%(asctime)s - %(levelname)s - %(message)s'))

logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(handler)
async def get_domains_from_url() -> list:
    '''
    Fetch the test domain list from SecLists.

    :return: List of non-empty domain strings
    :raises ImportError: if aiohttp is not installed
    :raises aiohttp.ClientResponseError: if the download returns a non-2xx status
    '''

    try:
        import aiohttp
    except ImportError:
        raise ImportError('missing aiohttp library (pip install aiohttp)')

    url = 'https://raw.githubusercontent.com/danielmiessler/SecLists/refs/heads/master/Fuzzing/email-top-100-domains.txt'

    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            # Fail loudly on a bad response instead of silently treating an
            # HTML error page as a list of domains.
            response.raise_for_status()
            content = await response.text()
            return [line.strip() for line in content.splitlines() if line.strip()]
async def domain_generator(domains: list):
    '''
    Asynchronously yield every domain from *domains*.

    A zero-second sleep is awaited before each yield so other coroutines
    get a chance to run between items.

    :param domains: List of domains to yield
    '''

    for entry in domains:
        await asyncio.sleep(0)  # cooperative yield to the event loop
        yield entry
def _format_status(result: dict) -> str:
    '''Build the colorized [status] tag for one scan result (error details for failures).'''
    status = result['status']
    if status < 0:
        # Negative statuses are scanner-level failures, not HTTP codes
        error_type = result.get('error_type', 'UNKNOWN')
        error_msg = result.get('error', 'Unknown Error')
        return f"{Colors.RED}[{result['status']} - {error_type}: {error_msg}]{Colors.RESET}"
    if 200 <= status < 300:
        return f"{Colors.GREEN}[{result['status']}]{Colors.RESET}"
    if 300 <= status < 400:
        return f"{Colors.YELLOW}[{result['status']}]{Colors.RESET}"
    return f"{Colors.RED}[{result['status']}]{Colors.RESET}"


def _format_result_line(test_type: str, concurrency: int, count: int, result: dict) -> str:
    '''Assemble the full colorized log line for a single scan result.'''
    status_str = _format_status(result)

    # Protocol tag, e.g. "(https)", when the scanner reports one
    protocol_info = f" {Colors.CYAN}({result.get('protocol', 'unknown')}){Colors.RESET}" if result.get('protocol') else ''

    # Only show a short allowlist of interesting response headers
    headers_info = ''
    if result.get('response_headers'):
        important_headers = ['server', 'location', 'content-type']
        headers = [f"{k}: {v}" for k, v in result['response_headers'].items() if k.lower() in important_headers]
        if headers:
            headers_info = f" {Colors.GRAY}[{', '.join(headers)}]{Colors.RESET}"

    # Redirect chain if present
    redirect_info = ''
    if result.get('redirect_chain'):
        redirect_info = f" -> {Colors.YELLOW}Redirects: {' -> '.join(result['redirect_chain'])}{Colors.RESET}"

    # Error details if present
    error_info = ''
    if result.get('error'):
        error_info = f" {Colors.RED}Error: {result['error']}{Colors.RESET}"

    # Final URL, only when it differs from the bare domain form
    url_info = ''
    if result.get('url') and result['url'] != f"http(s)://{result['domain']}":
        url_info = f" {Colors.CYAN}Final URL: {result['url']}{Colors.RESET}"

    return (
        f"{test_type}-{concurrency} Result {count}: "
        f"{status_str}{protocol_info} "
        f"{Colors.CYAN}{result['domain']}{Colors.RESET}"
        f"{redirect_info}"
        f"{url_info}"
        f"{headers_info}"
        f"{error_info}"
    )


async def run_benchmark(test_type: str, domains: list, concurrency: int) -> tuple:
    '''
    Run a single benchmark test.

    :param test_type: 'List' to pass the list directly to the scanner,
                      anything else (e.g. 'Generator') to feed it through
                      the async generator wrapper
    :param domains: Domains to scan
    :param concurrency: Concurrent connection limit for the scanner
    :return: (elapsed_seconds, domains_per_second)
    '''

    logging.info(f'{Colors.BOLD}Testing {test_type} input with {concurrency} concurrent connections...{Colors.RESET}')
    scanner = HTTPZScanner(concurrent_limit=concurrency, timeout=3, show_progress=True, debug_mode=True, follow_redirects=True)

    count = 0
    start_time = None

    # Exercise both input styles through the same loop. Previously the
    # non-'List' branch was a `pass`, so every 'Generator' benchmark that
    # main() requested silently did nothing and reported 0 results / 0s.
    source = domains if test_type == 'List' else domain_generator(domains)

    async for result in scanner.scan(source):
        if not result:
            continue
        if start_time is None:
            # Timer starts at the first result so setup cost is excluded
            start_time = time.time()
        count += 1
        logging.info(_format_result_line(test_type, concurrency, count, result))

    elapsed = time.time() - start_time if start_time else 0
    domains_per_sec = count/elapsed if elapsed > 0 else 0
    logging.info(f'{Colors.YELLOW}{test_type} test with {concurrency} concurrent connections completed in {elapsed:.2f} seconds ({domains_per_sec:.2f} domains/sec){Colors.RESET}')

    return elapsed, domains_per_sec
async def test_list_input(domains: list):
    '''
    Test scanning using a list input.

    :param domains: Domains to scan
    '''

    logging.info(f'{Colors.BOLD}Testing list input...{Colors.RESET}')
    scanner = HTTPZScanner(concurrent_limit=25, timeout=3, show_progress=True, debug_mode=True, follow_redirects=True)

    # NOTE: the original captured time.time() into an unused start_time local;
    # removed since this test logs results only and never reports timing.
    count = 0
    async for result in scanner.scan(domains):
        if result:
            count += 1
            # Green for 2xx, red for everything else (errors, redirects, 4xx/5xx)
            status_color = Colors.GREEN if 200 <= result['status'] < 300 else Colors.RED
            title = f" - {Colors.CYAN}{result.get('title', 'No Title')}{Colors.RESET}" if result.get('title') else ''
            error = f" - {Colors.RED}{result.get('error', '')}{Colors.RESET}" if result.get('error') else ''
            logging.info(f'List-25 Result {count}: {status_color}[{result["status"]}]{Colors.RESET} {Colors.CYAN}{result["domain"]}{Colors.RESET}{title}{error}')
async def test_generator_input(domains: list):
    '''
    Test scanning using an async generator input.

    :param domains: Domains to scan (wrapped in domain_generator before scanning)
    '''

    logging.info(f'{Colors.BOLD}Testing generator input...{Colors.RESET}')
    scanner = HTTPZScanner(concurrent_limit=25, timeout=3, show_progress=True, debug_mode=True, follow_redirects=True)

    # NOTE: the original captured time.time() into an unused start_time local;
    # removed since this test logs results only and never reports timing.
    count = 0
    async for result in scanner.scan(domain_generator(domains)):
        if result:
            count += 1
            # Green for 2xx, red for everything else (errors, redirects, 4xx/5xx)
            status_color = Colors.GREEN if 200 <= result['status'] < 300 else Colors.RED
            title = f" - {Colors.CYAN}{result.get('title', 'No Title')}{Colors.RESET}" if result.get('title') else ''
            error = f" - {Colors.RED}{result.get('error', '')}{Colors.RESET}" if result.get('error') else ''
            logging.info(f'Generator-25 Result {count}: {status_color}[{result["status"]}]{Colors.RESET} {Colors.CYAN}{result["domain"]}{Colors.RESET}{title}{error}')
async def main() -> None:
    '''Fetch the domain list, benchmark each input type at several concurrency levels, and print a sorted comparison.'''

    try:
        # Pull down the shared domain fixture
        domains = await get_domains_from_url()
        logging.info(f'Loaded {Colors.YELLOW}{len(domains)}{Colors.RESET} domains for testing')

        # Each row: (test_type, concurrency, elapsed, domains_per_sec)
        results = []

        # Benchmark every input style at each concurrency level,
        # generator first, then list, per level.
        for concurrency in (25, 50, 100):
            for test_type in ('Generator', 'List'):
                elapsed, domains_per_sec = await run_benchmark(test_type, domains, concurrency)
                results.append((test_type, concurrency, elapsed, domains_per_sec))

        # Tabulated comparison
        logging.info(f'\n{Colors.BOLD}Benchmark Results:{Colors.RESET}')
        logging.info('-' * 80)
        logging.info(f'{"Test Type":<15} {"Concurrency":<15} {"Time (s)":<15} {"Domains/sec":<15}')
        logging.info('-' * 80)

        # Fastest throughput first
        results.sort(key=lambda row: row[3], reverse=True)

        for test_type, concurrency, elapsed, domains_per_sec in results:
            logging.info(f'{test_type:<15} {concurrency:<15} {elapsed:.<15.2f} {domains_per_sec:<15.2f}')

        # Call out the winner
        fastest = results[0]
        logging.info('-' * 80)
        logging.info(f'{Colors.GREEN}Fastest: {fastest[0]} test with {fastest[1]} concurrent connections')
        logging.info(f'Time: {fastest[2]:.2f} seconds')
        logging.info(f'Speed: {fastest[3]:.2f} domains/sec{Colors.RESET}')

        logging.info(f'\n{Colors.GREEN}All tests completed successfully!{Colors.RESET}')

    except Exception as e:
        # Top-level boundary: report and exit non-zero
        logging.error(f'Test failed: {Colors.RED}{str(e)}{Colors.RESET}')
        sys.exit(1)
if __name__ == '__main__':
    # Script entry point: run the async test suite; exit non-zero on Ctrl+C.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logging.warning(f'{Colors.YELLOW}Tests interrupted by user{Colors.RESET}')
        sys.exit(1)