httpz/unit_test.py
2025-02-12 00:28:46 -05:00

133 lines
4.0 KiB
Python

#!/usr/bin/env python3
# HTTPZ Web Scanner - Unit Tests
# unit_test.py
import asyncio
import logging
import sys
try:
from httpz_scanner import HTTPZScanner
from httpz_scanner.colors import Colors
except ImportError:
raise ImportError('missing httpz_scanner library (pip install httpz_scanner)')
class ColoredFormatter(logging.Formatter):
'''Custom formatter for colored log output'''
def format(self, record):
if record.levelno == logging.INFO:
color = Colors.GREEN
elif record.levelno == logging.WARNING:
color = Colors.YELLOW
elif record.levelno == logging.ERROR:
color = Colors.RED
else:
color = Colors.RESET
record.msg = f'{color}{record.msg}{Colors.RESET}'
return super().format(record)
# Configure logging with colors
logger = logging.getLogger()
handler = logging.StreamHandler()
handler.setFormatter(ColoredFormatter('%(asctime)s - %(levelname)s - %(message)s'))
logger.setLevel(logging.INFO)
logger.addHandler(handler)
async def get_domains_from_url():
'''
Fetch domains from SecLists URL
:return: List of domains
'''
try:
import aiohttp
except ImportError:
raise ImportError('missing aiohttp library (pip install aiohttp)')
url = 'https://raw.githubusercontent.com/danielmiessler/SecLists/refs/heads/master/Fuzzing/email-top-100-domains.txt'
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
content = await response.text()
return [line.strip() for line in content.splitlines() if line.strip()]
async def domain_generator(domains):
'''
Async generator that yields domains
:param domains: List of domains to yield
'''
for domain in domains:
await asyncio.sleep(0) # Allow other coroutines to run
yield domain
async def test_list_input(domains):
'''
Test scanning using a list input
:param domains: List of domains to scan
'''
logging.info(f'{Colors.BOLD}Testing list input...{Colors.RESET}')
scanner = HTTPZScanner(concurrent_limit=100, timeout=3, show_progress=True, debug_mode=True)
count = 0
async for result in scanner.scan(domains):
if result:
count += 1
status_color = Colors.GREEN if 200 <= result['status'] < 300 else Colors.RED
logging.info(f'List Result {count}: {Colors.CYAN}{result["domain"]}{Colors.RESET} - Status: {status_color}{result["status"]}{Colors.RESET}')
async def test_generator_input(domains):
'''
Test scanning using an async generator input
:param domains: List of domains to generate from
'''
logging.info(f'{Colors.BOLD}Testing generator input...{Colors.RESET}')
scanner = HTTPZScanner(concurrent_limit=100, timeout=3, show_progress=True, debug_mode=True)
count = 0
async for result in scanner.scan(domain_generator(domains)):
if result:
count += 1
status_color = Colors.GREEN if 200 <= result['status'] < 300 else Colors.RED
logging.info(f'Generator Result {count}: {Colors.CYAN}{result["domain"]}{Colors.RESET} - Status: {status_color}{result["status"]}{Colors.RESET}')
async def main() -> None:
'''Main test function'''
try:
# Fetch domains
domains = await get_domains_from_url()
logging.info(f'Loaded {Colors.YELLOW}{len(domains)}{Colors.RESET} domains for testing')
# Run tests
await test_generator_input(domains)
await test_list_input(domains)
logging.info(f'{Colors.GREEN}All tests completed successfully!{Colors.RESET}')
except Exception as e:
logging.error(f'Test failed: {Colors.RED}{str(e)}{Colors.RESET}')
sys.exit(1)
if __name__ == '__main__':
try:
asyncio.run(main())
except KeyboardInterrupt:
logging.warning(f'{Colors.YELLOW}Tests interrupted by user{Colors.RESET}')
sys.exit(1)