Added unit test

This commit is contained in:
Dionysus 2025-02-11 22:30:22 -05:00
parent 3d8b2d8e4f
commit 90b5134c25
Signed by: acidvegas
GPG Key ID: EF4B922DB85DC9DE
11 changed files with 254 additions and 97 deletions

View File

@ -2,8 +2,8 @@
# HTTPZ Web Scanner - Developed by acidvegas in Python (https://github.com/acidvegas/httpz)
# httpz_scanner/__init__.py
from .scanner import HTTPZScanner
from .colors import Colors
from .scanner import HTTPZScanner
__version__ = '2.0.11'
__version__ = '2.1.0'

View File

@ -4,8 +4,11 @@
import asyncio
import sys
from .cli import main
if __name__ == '__main__':
try:
asyncio.run(main())

View File

@ -4,16 +4,19 @@
import argparse
import asyncio
import json
import logging
import os
import sys
import json
from datetime import datetime
from .colors import Colors
from .formatters import format_console_output
from .parsers import parse_status_codes, parse_shard
from .scanner import HTTPZScanner
from .utils import SILENT_MODE, info
from .parsers import parse_status_codes, parse_shard
from .formatters import format_console_output
def setup_logging(level='INFO', log_to_disk=False):
'''
@ -22,16 +25,16 @@ def setup_logging(level='INFO', log_to_disk=False):
:param level: Logging level (INFO or DEBUG)
:param log_to_disk: Whether to also log to file
'''
class ColoredFormatter(logging.Formatter):
def formatTime(self, record, datefmt=None):
# Format: MM-DD HH:MM
from datetime import datetime
def formatTime(self, record):
dt = datetime.fromtimestamp(record.created)
return f"{Colors.GRAY}{dt.strftime('%m-%d %H:%M')}{Colors.RESET}"
return f'{Colors.GRAY}{dt.strftime("%m-%d %H:%M")}{Colors.RESET}'
def format(self, record):
return f'{self.formatTime(record)} {record.getMessage()}'
# Setup logging handlers
handlers = []
# Console handler
@ -47,16 +50,11 @@ def setup_logging(level='INFO', log_to_disk=False):
handlers.append(file_handler)
# Setup logger
logging.basicConfig(
level=getattr(logging, level.upper()),
handlers=handlers
)
logging.basicConfig(level=getattr(logging, level.upper()), handlers=handlers)
async def main():
parser = argparse.ArgumentParser(
description=f'{Colors.GREEN}Hyper-fast HTTP Scraping Tool{Colors.RESET}',
formatter_class=argparse.RawDescriptionHelpFormatter
)
parser = argparse.ArgumentParser(description=f'{Colors.GREEN}Hyper-fast HTTP Scraping Tool{Colors.RESET}', formatter_class=argparse.RawDescriptionHelpFormatter)
# Add arguments
parser.add_argument('file', nargs='?', default='-', help='File containing domains to check (one per line), use - for stdin')
@ -176,9 +174,12 @@ async def main():
logging.error(f'Unexpected error: {str(e)}')
sys.exit(1)
def run():
'''Entry point for the CLI'''
asyncio.run(main())
if __name__ == '__main__':
run()

View File

@ -4,7 +4,8 @@
class Colors:
'''ANSI color codes for terminal output'''
HEADER = '\033[95m' # Light purple
HEADER = '\033[95m'
BLUE = '\033[94m'
GREEN = '\033[92m'
YELLOW = '\033[93m'
@ -12,9 +13,9 @@ class Colors:
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
RESET = '\033[0m'
PURPLE = '\033[35m' # Dark purple
LIGHT_RED = '\033[38;5;203m' # Light red
DARK_GREEN = '\033[38;5;22m' # Dark green
PINK = '\033[38;5;198m' # Bright pink
GRAY = '\033[90m' # Gray color
CYAN = '\033[96m' # Cyan color
PURPLE = '\033[35m'
LIGHT_RED = '\033[38;5;203m'
DARK_GREEN = '\033[38;5;22m'
PINK = '\033[38;5;198m'
GRAY = '\033[90m'
CYAN = '\033[96m'

View File

@ -4,14 +4,23 @@
import asyncio
import os
try:
import aiohttp
except ImportError:
raise ImportError('missing aiohttp library (pip install aiohttp)')
try:
import dns.asyncresolver
import dns.query
import dns.resolver
import dns.zone
except ImportError:
raise ImportError('missing dnspython library (pip install dnspython)')
from .utils import debug, info, SILENT_MODE
async def resolve_all_dns(domain: str, timeout: int = 5, nameserver: str = None, check_axfr: bool = False) -> tuple:
'''
Resolve all DNS records for a domain
@ -21,36 +30,35 @@ async def resolve_all_dns(domain: str, timeout: int = 5, nameserver: str = None,
:param nameserver: Specific nameserver to use
:param check_axfr: Whether to attempt zone transfer
'''
# Setup resolver
resolver = dns.asyncresolver.Resolver()
resolver.lifetime = timeout
if nameserver:
resolver.nameservers = [nameserver]
results = await asyncio.gather(*[resolver.resolve(domain, rtype)
for rtype in ('NS', 'A', 'AAAA', 'CNAME')],
return_exceptions=True)
# Resolve all DNS records
results = await asyncio.gather(*[resolver.resolve(domain, rtype) for rtype in ('NS', 'A', 'AAAA', 'CNAME')], return_exceptions=True)
# Parse results
nameservers = [str(ns).rstrip('.') for ns in results[0]] if isinstance(results[0], dns.resolver.Answer) else []
ips = ([str(ip) for ip in results[1]] if isinstance(results[1], dns.resolver.Answer) else []) + \
([str(ip) for ip in results[2]] if isinstance(results[2], dns.resolver.Answer) else [])
ips = ([str(ip) for ip in results[1]] if isinstance(results[1], dns.resolver.Answer) else []) + ([str(ip) for ip in results[2]] if isinstance(results[2], dns.resolver.Answer) else [])
cname = str(results[3][0].target).rstrip('.') if isinstance(results[3], dns.resolver.Answer) else None
# Get NS IPs
ns_ips = {}
if nameservers:
ns_results = await asyncio.gather(*[resolver.resolve(ns, rtype)
for ns in nameservers
for rtype in ('A', 'AAAA')],
return_exceptions=True)
ns_results = await asyncio.gather(*[resolver.resolve(ns, rtype) for ns in nameservers for rtype in ('A', 'AAAA')], return_exceptions=True)
for i, ns in enumerate(nameservers):
ns_ips[ns] = [str(ip) for records in ns_results[i*2:i*2+2]
if isinstance(records, dns.resolver.Answer)
for ip in records]
ns_ips[ns] = [str(ip) for records in ns_results[i*2:i*2+2] if isinstance(records, dns.resolver.Answer) for ip in records]
# Attempt zone transfer
if check_axfr:
await attempt_axfr(domain, ns_ips, timeout)
return sorted(set(ips)), cname, nameservers, ns_ips
async def attempt_axfr(domain: str, ns_ips: dict, timeout: int = 5) -> None:
'''
Attempt zone transfer for a domain
@ -59,28 +67,37 @@ async def attempt_axfr(domain: str, ns_ips: dict, timeout: int = 5) -> None:
:param ns_ips: Dictionary of nameserver hostnames to their IPs
:param timeout: Timeout in seconds
'''
try:
os.makedirs('axfrout', exist_ok=True)
# Loop through each NS
for ns_host, ips in ns_ips.items():
# Loop through each NS IP
for ns_ip in ips:
try:
# Attempt zone transfer
zone = dns.zone.from_xfr(dns.query.xfr(ns_ip, domain, lifetime=timeout))
# Write zone to file
with open(f'axfrout/{domain}_{ns_ip}.zone', 'w') as f:
zone.to_text(f)
info(f'[AXFR SUCCESS] {domain} from {ns_host} ({ns_ip})')
except Exception as e:
debug(f'AXFR failed for {domain} from {ns_ip}: {str(e)}')
except Exception as e:
debug(f'Failed AXFR for {domain}: {str(e)}')
async def load_resolvers(resolver_file: str = None) -> list:
'''
Load DNS resolvers from file or default source
:param resolver_file: Path to file containing resolver IPs
:return: List of resolver IPs
'''
# Load from file
if resolver_file:
try:
with open(resolver_file) as f:
@ -90,6 +107,7 @@ async def load_resolvers(resolver_file: str = None) -> list:
except Exception as e:
debug(f'Error loading resolvers from {resolver_file}: {str(e)}')
# Load from GitHub
async with aiohttp.ClientSession() as session:
async with session.get('https://raw.githubusercontent.com/trickest/resolvers/refs/heads/main/resolvers.txt') as response:
resolvers = await response.text()

View File

@ -5,6 +5,7 @@
from .colors import Colors
from .utils import human_size
def format_console_output(result: dict, debug: bool = False, show_fields: dict = None, match_codes: set = None, exclude_codes: set = None) -> str:
'''
Format the output with colored sections

View File

@ -2,6 +2,8 @@
# HTTPZ Web Scanner - Developed by acidvegas in Python (https://github.com/acidvegas/httpz)
# httpz_scanner/parsers.py
import argparse
try:
import bs4
except ImportError:
@ -20,7 +22,6 @@ except ImportError:
raise ImportError('missing mmh3 module (pip install mmh3)')
from .utils import debug, error
import argparse
def parse_domain_url(domain: str) -> tuple:

View File

@ -3,7 +3,6 @@
# httpz_scanner/scanner.py
import asyncio
import json
import random
try:
@ -17,10 +16,8 @@ except ImportError:
raise ImportError('missing bs4 module (pip install beautifulsoup4)')
from .dns import resolve_all_dns, load_resolvers
from .formatters import format_console_output
from .colors import Colors
from .parsers import parse_domain_url, get_cert_info, get_favicon_hash, parse_title
from .utils import debug, info, USER_AGENTS, input_generator
from .parsers import parse_domain_url, get_cert_info, get_favicon_hash
from .utils import debug, USER_AGENTS, input_generator
class HTTPZScanner:
@ -77,16 +74,21 @@ class HTTPZScanner:
self.progress_count = 0
async def init(self):
'''Initialize resolvers - must be called before scanning'''
self.resolvers = await load_resolvers(self.resolver_file)
async def check_domain(self, session: aiohttp.ClientSession, domain: str):
'''Check a single domain and return results'''
'''
Check a single domain and return results
:param session: aiohttp.ClientSession
:param domain: str
'''
# Get random nameserver
nameserver = random.choice(self.resolvers) if self.resolvers else None
# Parse domain
base_domain, port, protocols = parse_domain_url(domain)
# Initialize result dictionary
result = {
'domain' : base_domain,
'status' : 0,
@ -100,11 +102,7 @@ class HTTPZScanner:
# Set random user agent for each request
headers = {'User-Agent': random.choice(USER_AGENTS)}
async with session.get(url, timeout=self.timeout,
allow_redirects=self.follow_redirects,
max_redirects=10 if self.follow_redirects else 0,
headers=headers) as response:
async with session.get(url, timeout=self.timeout, allow_redirects=self.follow_redirects, max_redirects=10 if self.follow_redirects else 0, headers=headers) as response:
result['status'] = response.status
# Bail immediately if it's a failed lookup - no point processing further
@ -196,7 +194,7 @@ class HTTPZScanner:
'''
if not self.resolvers:
await self.init()
self.resolvers = await load_resolvers(self.resolver_file)
async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False)) as session:
tasks = set()
@ -212,15 +210,16 @@ class HTTPZScanner:
tasks, return_when=asyncio.FIRST_COMPLETED
)
for task in done:
if result := await task: # Only yield if result is not None
if result := await task:
if self.show_progress:
count += 1
yield result
task = asyncio.create_task(self.check_domain(session, domain))
tasks.add(task)
elif isinstance(input_source, (list, tuple)):
# List/tuple input
elif isinstance(input_source, (list, tuple)):
for line_num, domain in enumerate(input_source):
if domain := str(domain).strip():
if self.shard is None or line_num % self.shard[1] == self.shard[0]:
@ -229,7 +228,7 @@ class HTTPZScanner:
tasks, return_when=asyncio.FIRST_COMPLETED
)
for task in done:
if result := await task: # Only yield if result is not None
if result := await task:
if self.show_progress:
count += 1
yield result
@ -251,7 +250,7 @@ class HTTPZScanner:
tasks, return_when=asyncio.FIRST_COMPLETED
)
for task in done:
if result := await task: # Only yield if result is not None
if result := await task:
if self.show_progress:
count += 1
yield result
@ -264,7 +263,7 @@ class HTTPZScanner:
if tasks:
done, _ = await asyncio.wait(tasks)
for task in done:
if result := await task: # Only yield if result is not None
if result := await task:
if self.show_progress:
count += 1
yield result

View File

@ -117,7 +117,7 @@ async def input_generator(input_source, shard: tuple = None):
# Handle stdin
if input_source == '-' or input_source is None:
for line in sys.stdin:
await asyncio.sleep(0) # Yield control
await asyncio.sleep(0)
if line := line.strip():
if shard is None or line_num % shard[1] == shard[0]:
yield line
@ -127,7 +127,7 @@ async def input_generator(input_source, shard: tuple = None):
elif isinstance(input_source, str) and os.path.exists(input_source):
with open(input_source, 'r') as f:
for line in f:
await asyncio.sleep(0) # Yield control
await asyncio.sleep(0)
if line := line.strip():
if shard is None or line_num % shard[1] == shard[0]:
yield line
@ -136,7 +136,7 @@ async def input_generator(input_source, shard: tuple = None):
# Handle iterables (generators, lists, etc)
elif hasattr(input_source, '__iter__') and not isinstance(input_source, (str, bytes)):
for line in input_source:
await asyncio.sleep(0) # Yield control
await asyncio.sleep(0)
if isinstance(line, bytes):
line = line.decode()
if line := line.strip():
@ -149,7 +149,7 @@ async def input_generator(input_source, shard: tuple = None):
if isinstance(input_source, bytes):
input_source = input_source.decode()
for line in input_source.splitlines():
await asyncio.sleep(0) # Yield control
await asyncio.sleep(0)
if line := line.strip():
if shard is None or line_num % shard[1] == shard[0]:
yield line

View File

@ -10,7 +10,7 @@ with open('README.md', 'r', encoding='utf-8') as f:
setup(
name='httpz_scanner',
version='2.0.11',
version='2.1.0',
author='acidvegas',
author_email='acid.vegas@acid.vegas',
description='Hyper-fast HTTP Scraping Tool',

133
unit_test.py Normal file
View File

@ -0,0 +1,133 @@
#!/usr/bin/env python3
# HTTPZ Web Scanner - Unit Tests
# unit_test.py
import asyncio
import logging
import sys
try:
from httpz_scanner import HTTPZScanner
from httpz_scanner.colors import Colors
except ImportError:
raise ImportError('missing httpz_scanner library (pip install httpz_scanner)')
class ColoredFormatter(logging.Formatter):
'''Custom formatter for colored log output'''
def format(self, record):
if record.levelno == logging.INFO:
color = Colors.GREEN
elif record.levelno == logging.WARNING:
color = Colors.YELLOW
elif record.levelno == logging.ERROR:
color = Colors.RED
else:
color = Colors.RESET
record.msg = f'{color}{record.msg}{Colors.RESET}'
return super().format(record)
# Configure logging with colors
logger = logging.getLogger()
handler = logging.StreamHandler()
handler.setFormatter(ColoredFormatter('%(asctime)s - %(levelname)s - %(message)s'))
logger.setLevel(logging.INFO)
logger.addHandler(handler)
async def get_domains_from_url():
'''
Fetch domains from SecLists URL
:return: List of domains
'''
try:
import aiohttp
except ImportError:
raise ImportError('missing aiohttp library (pip install aiohttp)')
url = 'https://raw.githubusercontent.com/danielmiessler/SecLists/refs/heads/master/Fuzzing/email-top-100-domains.txt'
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
content = await response.text()
return [line.strip() for line in content.splitlines() if line.strip()]
async def domain_generator(domains):
'''
Async generator that yields domains
:param domains: List of domains to yield
'''
for domain in domains:
await asyncio.sleep(0) # Allow other coroutines to run
yield domain
async def test_list_input(domains):
'''
Test scanning using a list input
:param domains: List of domains to scan
'''
logging.info(f'{Colors.BOLD}Testing list input...{Colors.RESET}')
scanner = HTTPZScanner(concurrent_limit=20, timeout=3, show_progress=True, debug_mode=True)
count = 0
async for result in scanner.scan(domains):
if result:
count += 1
status_color = Colors.GREEN if 200 <= result['status'] < 300 else Colors.RED
logging.info(f'List Result {count}: {Colors.CYAN}{result["domain"]}{Colors.RESET} - Status: {status_color}{result["status"]}{Colors.RESET}')
async def test_generator_input(domains):
'''
Test scanning using an async generator input
:param domains: List of domains to generate from
'''
logging.info(f'{Colors.BOLD}Testing generator input...{Colors.RESET}')
scanner = HTTPZScanner(concurrent_limit=20, timeout=3, show_progress=True, debug_mode=True)
count = 0
async for result in scanner.scan(domain_generator(domains)):
if result:
count += 1
status_color = Colors.GREEN if 200 <= result['status'] < 300 else Colors.RED
logging.info(f'Generator Result {count}: {Colors.CYAN}{result["domain"]}{Colors.RESET} - Status: {status_color}{result["status"]}{Colors.RESET}')
async def main() -> None:
'''Main test function'''
try:
# Fetch domains
domains = await get_domains_from_url()
logging.info(f'Loaded {Colors.YELLOW}{len(domains)}{Colors.RESET} domains for testing')
# Run tests
await test_list_input(domains)
await test_generator_input(domains)
logging.info(f'{Colors.GREEN}All tests completed successfully!{Colors.RESET}')
except Exception as e:
logging.error(f'Test failed: {Colors.RED}{str(e)}{Colors.RESET}')
sys.exit(1)
if __name__ == '__main__':
try:
asyncio.run(main())
except KeyboardInterrupt:
logging.warning(f'{Colors.YELLOW}Tests interrupted by user{Colors.RESET}')
sys.exit(1)