Compare commits

...

12 Commits
v2.0.7...main

SHA1       Message            Date
19525aec7d sup roarie         2025-02-12 03:03:20 -05:00
f1f5a78ae0 sup tommyrot       2025-02-12 02:59:51 -05:00
9a4b7e977a fixed chunk output 2025-02-12 02:57:44 -05:00
e220648a1a fixed chunk output 2025-02-12 02:55:31 -05:00
7fe571ddad fixed chunk output 2025-02-12 00:50:02 -05:00
6dacafeee5 fixed chunk output 2025-02-12 00:35:35 -05:00
41d7e53d30 fixed chunk output 2025-02-12 00:32:28 -05:00
db9590f59d faster processing  2025-02-12 00:28:46 -05:00
90b5134c25 Added unit test    2025-02-11 22:30:22 -05:00
3d8b2d8e4f fuck               2025-02-11 21:48:34 -05:00
63517430b7 fuck               2025-02-11 21:44:58 -05:00
311d37108a fuck               2025-02-11 21:40:49 -05:00
11 changed files with 562 additions and 261 deletions

httpz_scanner/__init__.py

@@ -2,8 +2,8 @@
# HTTPZ Web Scanner - Developed by acidvegas in Python (https://github.com/acidvegas/httpz)
# httpz_scanner/__init__.py
from .scanner import HTTPZScanner
from .colors import Colors
from .scanner import HTTPZScanner
__version__ = '2.0.7'
__version__ = '2.1.8'

httpz_scanner/__main__.py

@@ -4,8 +4,11 @@
import asyncio
import sys
from .cli import main
if __name__ == '__main__':
try:
asyncio.run(main())

httpz_scanner/cli.py

@@ -4,16 +4,19 @@
import argparse
import asyncio
import json
import logging
import os
import sys
import json
from datetime import datetime
from .colors import Colors
from .formatters import format_console_output
from .parsers import parse_status_codes, parse_shard
from .scanner import HTTPZScanner
from .utils import SILENT_MODE, info
from .parsers import parse_status_codes, parse_shard
from .formatters import format_console_output
def setup_logging(level='INFO', log_to_disk=False):
'''
@@ -22,16 +25,16 @@ def setup_logging(level='INFO', log_to_disk=False):
:param level: Logging level (INFO or DEBUG)
:param log_to_disk: Whether to also log to file
'''
class ColoredFormatter(logging.Formatter):
def formatTime(self, record, datefmt=None):
# Format: MM-DD HH:MM
from datetime import datetime
def formatTime(self, record):
dt = datetime.fromtimestamp(record.created)
return f"{Colors.GRAY}{dt.strftime('%m-%d %H:%M')}{Colors.RESET}"
return f'{Colors.GRAY}{dt.strftime("%m-%d %H:%M")}{Colors.RESET}'
def format(self, record):
return f'{self.formatTime(record)} {record.getMessage()}'
# Setup logging handlers
handlers = []
# Console handler
@@ -47,48 +50,50 @@ def setup_logging(level='INFO', log_to_disk=False):
handlers.append(file_handler)
# Setup logger
logging.basicConfig(
level=getattr(logging, level.upper()),
handlers=handlers
)
logging.basicConfig(level=getattr(logging, level.upper()), handlers=handlers)
async def main():
parser = argparse.ArgumentParser(
description=f'{Colors.GREEN}Hyper-fast HTTP Scraping Tool{Colors.RESET}',
formatter_class=argparse.RawDescriptionHelpFormatter
)
parser = argparse.ArgumentParser(description=f'{Colors.GREEN}Hyper-fast HTTP Scraping Tool{Colors.RESET}', formatter_class=argparse.RawDescriptionHelpFormatter)
# Add arguments
parser.add_argument('file', nargs='?', default='-', help='File containing domains to check (one per line), use - for stdin')
parser.add_argument('-all', '--all-flags', action='store_true', help='Enable all output flags')
parser.add_argument('-d', '--debug', action='store_true', help='Show error states and debug information')
parser.add_argument('-c', '--concurrent', type=int, default=100, help='Number of concurrent checks')
parser.add_argument('-j', '--jsonl', action='store_true', help='Output JSON Lines format to console')
parser.add_argument('-o', '--output', help='Output file path (JSONL format)')
parser.add_argument('-d', '--debug', action='store_true', help='Show error states and debug information')
parser.add_argument('-c', '--concurrent', type=int, default=100, help='Number of concurrent checks')
parser.add_argument('-j', '--jsonl', action='store_true', help='Output JSON Lines format to console')
parser.add_argument('-o', '--output', help='Output file path (JSONL format)')
# Output field flags
parser.add_argument('-b', '--body', action='store_true', help='Show body preview')
parser.add_argument('-cn', '--cname', action='store_true', help='Show CNAME records')
parser.add_argument('-cl', '--content-length', action='store_true', help='Show content length')
parser.add_argument('-ct', '--content-type', action='store_true', help='Show content type')
parser.add_argument('-f', '--favicon', action='store_true', help='Show favicon hash')
parser.add_argument('-fr', '--follow-redirects', action='store_true', help='Follow redirects (max 10)')
parser.add_argument('-hr', '--headers', action='store_true', help='Show response headers')
parser.add_argument('-i', '--ip', action='store_true', help='Show IP addresses')
parser.add_argument('-sc', '--status-code', action='store_true', help='Show status code')
parser.add_argument('-ti', '--title', action='store_true', help='Show page title')
parser.add_argument('-b', '--body', action='store_true', help='Show body preview')
parser.add_argument('-cn', '--cname', action='store_true', help='Show CNAME records')
parser.add_argument('-cl', '--content-length', action='store_true', help='Show content length')
parser.add_argument('-ct', '--content-type', action='store_true', help='Show content type')
parser.add_argument('-f', '--favicon', action='store_true', help='Show favicon hash')
parser.add_argument('-fr', '--follow-redirects', action='store_true', help='Follow redirects (max 10)')
parser.add_argument('-hr', '--show-headers', action='store_true', help='Show response headers')
parser.add_argument('-i', '--ip', action='store_true', help='Show IP addresses')
parser.add_argument('-sc', '--status-code', action='store_true', help='Show status code')
parser.add_argument('-ti', '--title', action='store_true', help='Show page title')
parser.add_argument('-tls', '--tls-info', action='store_true', help='Show TLS certificate information')
# Other arguments
parser.add_argument('-ax', '--axfr', action='store_true', help='Try AXFR transfer against nameservers')
parser.add_argument('-ec', '--exclude-codes', type=parse_status_codes, help='Exclude these status codes (comma-separated, e.g., 404,500)')
parser.add_argument('-mc', '--match-codes', type=parse_status_codes, help='Only show these status codes (comma-separated, e.g., 200,301,404)')
parser.add_argument('-p', '--progress', action='store_true', help='Show progress counter')
parser.add_argument('-r', '--resolvers', help='File containing DNS resolvers (one per line)')
parser.add_argument('-p', '--progress', action='store_true', help='Show progress counter')
parser.add_argument('-pd', '--post-data', help='Send POST request with this data')
parser.add_argument('-r', '--resolvers', help='File containing DNS resolvers (one per line)')
parser.add_argument('-to', '--timeout', type=int, default=5, help='Request timeout in seconds')
# Add shard argument
parser.add_argument('-sh','--shard', type=parse_shard, help='Shard index and total shards (e.g., 1/3)')
# Add this to the argument parser section
parser.add_argument('-pa', '--paths', help='Additional paths to check (comma-separated, e.g., ".git/config,.env")')
# Add these arguments in the parser section
parser.add_argument('-hd', '--headers', help='Custom headers to send with each request (format: "Header1: value1,Header2: value2")')
# If no arguments provided, print help and exit
if len(sys.argv) == 1:
@@ -121,7 +126,7 @@ async def main():
'body' : args.all_flags or args.body,
'ip' : args.all_flags or args.ip,
'favicon' : args.all_flags or args.favicon,
'headers' : args.all_flags or args.headers,
'headers' : args.all_flags or args.show_headers,
'follow_redirects' : args.all_flags or args.follow_redirects,
'cname' : args.all_flags or args.cname,
'tls' : args.all_flags or args.tls_info
@@ -145,7 +150,10 @@ async def main():
show_fields=show_fields,
match_codes=args.match_codes,
exclude_codes=args.exclude_codes,
shard=args.shard
shard=args.shard,
paths=args.paths.split(',') if args.paths else None,
custom_headers=dict(h.split(': ', 1) for h in args.headers.split(',')) if args.headers else None,
post_data=args.post_data
)
count = 0
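A note on the new custom-headers plumbing: the dict(h.split(': ', 1) for h in args.headers.split(',')) one-liner above assumes a strict "Header: value" format with comma separators. A minimal sketch of what it produces (the header names and values here are hypothetical):

    raw = 'X-Api-Key: abc123,Accept: application/json'  # hypothetical -hd value
    custom_headers = dict(h.split(': ', 1) for h in raw.split(','))
    # -> {'X-Api-Key': 'abc123', 'Accept': 'application/json'}
    # Caveat: a comma inside a header value would break this split.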
@@ -154,10 +162,11 @@ async def main():
if args.output:
with open(args.output, 'a') as f:
f.write(json.dumps(result) + '\n')
f.flush() # Ensure file output is immediate
# Handle JSON output separately
if args.jsonl:
print(json.dumps(result))
print(json.dumps(result), flush=True) # Force flush
continue
# Only output and increment counter if we have content to show for normal output
@@ -166,8 +175,9 @@ async def main():
if args.progress:
count += 1
info(f"[{count}] {formatted}")
sys.stdout.flush() # Force flush after each domain
else:
print(formatted)
print(formatted, flush=True) # Force flush
except KeyboardInterrupt:
logging.warning('Process interrupted by user')
@@ -176,9 +186,12 @@ async def main():
logging.error(f'Unexpected error: {str(e)}')
sys.exit(1)
def run():
'''Entry point for the CLI'''
asyncio.run(main())
if __name__ == '__main__':
run()
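Since run() stays the entry point, the new flags can be exercised without an installed console script by driving it directly. A hedged sketch (the 'httpz' argv[0] and the domains.txt input file are placeholders, not from this diff):

    import sys
    from httpz_scanner.cli import run

    # argparse reads sys.argv; -hd and -pa are the flags added in this diff
    sys.argv = ['httpz', 'domains.txt', '-sc', '-ti', '-hd', 'X-Scan: demo', '-pa', '.git/config,.env']
    run()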

httpz_scanner/colors.py

@@ -4,7 +4,8 @@
class Colors:
'''ANSI color codes for terminal output'''
HEADER = '\033[95m' # Light purple
HEADER = '\033[95m'
BLUE = '\033[94m'
GREEN = '\033[92m'
YELLOW = '\033[93m'
@@ -12,9 +13,9 @@ class Colors:
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
RESET = '\033[0m'
PURPLE = '\033[35m' # Dark purple
LIGHT_RED = '\033[38;5;203m' # Light red
DARK_GREEN = '\033[38;5;22m' # Dark green
PINK = '\033[38;5;198m' # Bright pink
GRAY = '\033[90m' # Gray color
CYAN = '\033[96m' # Cyan color
PURPLE = '\033[35m'
LIGHT_RED = '\033[38;5;203m'
DARK_GREEN = '\033[38;5;22m'
PINK = '\033[38;5;198m'
GRAY = '\033[90m'
CYAN = '\033[96m'
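These are plain ANSI escape sequences, so a quick sanity check needs nothing more than an ANSI-capable terminal. A minimal sketch:

    from httpz_scanner.colors import Colors

    # prints 'ok' in green followed by a gray timestamp-style string
    print(f'{Colors.GREEN}ok{Colors.RESET} {Colors.GRAY}02-12 03:03{Colors.RESET}')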

httpz_scanner/dns.py

@@ -4,14 +4,23 @@
import asyncio
import os
import aiohttp
import dns.asyncresolver
import dns.query
import dns.resolver
import dns.zone
try:
import aiohttp
except ImportError:
raise ImportError('missing aiohttp library (pip install aiohttp)')
try:
import dns.asyncresolver
import dns.query
import dns.resolver
import dns.zone
except ImportError:
raise ImportError('missing dnspython library (pip install dnspython)')
from .utils import debug, info, SILENT_MODE
async def resolve_all_dns(domain: str, timeout: int = 5, nameserver: str = None, check_axfr: bool = False) -> tuple:
'''
Resolve all DNS records for a domain
@@ -21,36 +30,35 @@ async def resolve_all_dns(domain: str, timeout: int = 5, nameserver: str = None,
:param nameserver: Specific nameserver to use
:param check_axfr: Whether to attempt zone transfer
'''
# Setup resolver
resolver = dns.asyncresolver.Resolver()
resolver.lifetime = timeout
if nameserver:
resolver.nameservers = [nameserver]
results = await asyncio.gather(*[resolver.resolve(domain, rtype)
for rtype in ('NS', 'A', 'AAAA', 'CNAME')],
return_exceptions=True)
# Resolve all DNS records
results = await asyncio.gather(*[resolver.resolve(domain, rtype) for rtype in ('NS', 'A', 'AAAA', 'CNAME')], return_exceptions=True)
# Parse results
nameservers = [str(ns).rstrip('.') for ns in results[0]] if isinstance(results[0], dns.resolver.Answer) else []
ips = ([str(ip) for ip in results[1]] if isinstance(results[1], dns.resolver.Answer) else []) + \
([str(ip) for ip in results[2]] if isinstance(results[2], dns.resolver.Answer) else [])
cname = str(results[3][0].target).rstrip('.') if isinstance(results[3], dns.resolver.Answer) else None
ips = ([str(ip) for ip in results[1]] if isinstance(results[1], dns.resolver.Answer) else []) + ([str(ip) for ip in results[2]] if isinstance(results[2], dns.resolver.Answer) else [])
cname = str(results[3][0].target).rstrip('.') if isinstance(results[3], dns.resolver.Answer) else None
# Get NS IPs
ns_ips = {}
if nameservers:
ns_results = await asyncio.gather(*[resolver.resolve(ns, rtype)
for ns in nameservers
for rtype in ('A', 'AAAA')],
return_exceptions=True)
ns_results = await asyncio.gather(*[resolver.resolve(ns, rtype) for ns in nameservers for rtype in ('A', 'AAAA')], return_exceptions=True)
for i, ns in enumerate(nameservers):
ns_ips[ns] = [str(ip) for records in ns_results[i*2:i*2+2]
if isinstance(records, dns.resolver.Answer)
for ip in records]
ns_ips[ns] = [str(ip) for records in ns_results[i*2:i*2+2] if isinstance(records, dns.resolver.Answer) for ip in records]
# Attempt zone transfer
if check_axfr:
await attempt_axfr(domain, ns_ips, timeout)
return sorted(set(ips)), cname, nameservers, ns_ips
async def attempt_axfr(domain: str, ns_ips: dict, timeout: int = 5) -> None:
'''
Attempt zone transfer for a domain
@@ -59,28 +67,37 @@ async def attempt_axfr(domain: str, ns_ips: dict, timeout: int = 5) -> None:
:param ns_ips: Dictionary of nameserver hostnames to their IPs
:param timeout: Timeout in seconds
'''
try:
os.makedirs('axfrout', exist_ok=True)
# Loop through each NS
for ns_host, ips in ns_ips.items():
# Loop through each NS IP
for ns_ip in ips:
try:
# Attempt zone transfer
zone = dns.zone.from_xfr(dns.query.xfr(ns_ip, domain, lifetime=timeout))
# Write zone to file
with open(f'axfrout/{domain}_{ns_ip}.zone', 'w') as f:
zone.to_text(f)
info(f'[AXFR SUCCESS] {domain} from {ns_host} ({ns_ip})')
except Exception as e:
debug(f'AXFR failed for {domain} from {ns_ip}: {str(e)}')
except Exception as e:
debug(f'Failed AXFR for {domain}: {str(e)}')
async def load_resolvers(resolver_file: str = None) -> list:
'''
Load DNS resolvers from file or default source
:param resolver_file: Path to file containing resolver IPs
:return: List of resolver IPs
'''
# Load from file
if resolver_file:
try:
with open(resolver_file) as f:
@@ -90,6 +107,7 @@ async def load_resolvers(resolver_file: str = None) -> list:
except Exception as e:
debug(f'Error loading resolvers from {resolver_file}: {str(e)}')
# Load from GitHub
async with aiohttp.ClientSession() as session:
async with session.get('https://raw.githubusercontent.com/trickest/resolvers/refs/heads/main/resolvers.txt') as response:
resolvers = await response.text()
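Putting this module's pieces together, a minimal sketch of the intended call sequence (the domain is a placeholder, and this assumes load_resolvers() returns a list of resolver IPs, as its docstring states):

    import asyncio
    from httpz_scanner.dns import load_resolvers, resolve_all_dns

    async def demo():
        resolvers = await load_resolvers()  # from a file, or the GitHub list fetched above
        nameserver = resolvers[0] if resolvers else None
        ips, cname, nameservers, ns_ips = await resolve_all_dns(
            'example.com', timeout=5, nameserver=nameserver, check_axfr=False)
        print(ips, cname, nameservers, ns_ips)

    asyncio.run(demo())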

httpz_scanner/formatters.py

@@ -5,6 +5,7 @@
from .colors import Colors
from .utils import human_size
def format_console_output(result: dict, debug: bool = False, show_fields: dict = None, match_codes: set = None, exclude_codes: set = None) -> str:
'''
Format the output with colored sections
@@ -37,9 +38,17 @@ def format_console_output(result: dict, debug: bool = False, show_fields: dict =
status = f"{Colors.RED}[{result['status']}]{Colors.RESET}"
parts.append(status)
# Domain (always shown)
# Domain/URL
parts.append(f"[{result['url']}]")
# Content Type
if show_fields.get('content_type') and result.get('content_type'):
parts.append(f"{Colors.CYAN}[{result['content_type']}]{Colors.RESET}")
# Content Length
if show_fields.get('content_length') and result.get('content_length'):
parts.append(f"{Colors.PINK}[{result['content_length']}]{Colors.RESET}")
# Title
if show_fields.get('title') and result.get('title'):
parts.append(f"{Colors.DARK_GREEN}[{result['title']}]{Colors.RESET}")
@@ -59,8 +68,8 @@ def format_console_output(result: dict, debug: bool = False, show_fields: dict =
parts.append(f"{Colors.PURPLE}[{result['favicon_hash']}]{Colors.RESET}")
# Headers
if show_fields.get('headers') and result.get('headers'):
headers_text = [f"{k}: {v}" for k, v in result['headers'].items()]
if show_fields.get('headers') and result.get('response_headers'):
headers_text = [f"{k}: {v}" for k, v in result['response_headers'].items()]
parts.append(f"{Colors.CYAN}[{', '.join(headers_text)}]{Colors.RESET}")
else:
if show_fields.get('content_type') and result.get('content_type'):
@@ -72,18 +81,18 @@ def format_console_output(result: dict, debug: bool = False, show_fields: dict =
parts.append(f"{Colors.PINK}[{size}]{Colors.RESET}")
except (ValueError, TypeError):
parts.append(f"{Colors.PINK}[{result['content_length']}]{Colors.RESET}")
# CNAME
if show_fields.get('cname') and result.get('cname'):
parts.append(f"{Colors.PURPLE}[CNAME: {result['cname']}]{Colors.RESET}")
# Redirect Chain
if show_fields.get('follow_redirects') and result.get('redirect_chain'):
chain = ' -> '.join(result['redirect_chain'])
parts.append(f"{Colors.YELLOW}[Redirects: {chain}]{Colors.RESET}")
# CNAME
if show_fields.get('cname') and result.get('cname'):
parts.append(f"{Colors.PURPLE}[CNAME: {result['cname']}]{Colors.RESET}")
# TLS Certificate Info
if result.get('tls'):
if show_fields.get('tls') and result.get('tls'):
cert = result['tls']
tls_parts = []
if cert.get('common_name'):
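Reading the hunks above, the formatter now expects header data under response_headers and gates TLS output on show_fields. A hedged sketch of calling it with a result dict shaped accordingly (field values are hypothetical, and the function's unshown opening lines may apply further filtering):

    from httpz_scanner.formatters import format_console_output

    result = {'status': 200, 'url': 'https://example.com', 'domain': 'example.com',
              'content_type': 'text/html', 'response_headers': {'server': 'nginx'}}
    show = {'status_code': True, 'content_type': True, 'headers': True}
    print(format_console_output(result, show_fields=show))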

httpz_scanner/parsers.py

@@ -2,6 +2,8 @@
# HTTPZ Web Scanner - Developed by acidvegas in Python (https://github.com/acidvegas/httpz)
# httpz_scanner/parsers.py
import argparse
try:
import bs4
except ImportError:
@@ -20,7 +22,6 @@ except ImportError:
raise ImportError('missing mmh3 module (pip install mmh3)')
from .utils import debug, error
import argparse
def parse_domain_url(domain: str) -> tuple:
@@ -41,20 +42,13 @@ def parse_domain_url(domain: str) -> tuple:
try:
port = int(port_str.split('/')[0])
except ValueError:
port = 443 if protocol == 'https://' else 80
else:
port = 443 if protocol == 'https://' else 80
protocols = [f'{protocol}{base_domain}{":" + str(port) if port else ""}']
port = None
else:
if ':' in base_domain.split('/')[0]:
base_domain, port_str = base_domain.split(':', 1)
port = int(port_str.split('/')[0]) if port_str.split('/')[0].isdigit() else 443
else:
port = 443
protocols = [
f'https://{base_domain}{":" + str(port) if port else ""}',
f'http://{base_domain}{":" + str(port) if port else ""}'
]
port = int(port_str.split('/')[0]) if port_str.split('/')[0].isdigit() else None
protocols = ['http://', 'https://'] # Always try HTTP first
return base_domain, port, protocols
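After this rework both schemes are always attempted, HTTP first. Expected return shapes, inferred from the visible hunk only (the collapsed context may adjust port defaults, so treat this as a sketch):

    from httpz_scanner.parsers import parse_domain_url

    print(parse_domain_url('example.com:8080'))  # expected: ('example.com', 8080, ['http://', 'https://'])
    print(parse_domain_url('example.com'))       # expected: ('example.com', None, ['http://', 'https://'])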
@@ -188,7 +182,7 @@ def parse_title(html: str, content_type: str = None) -> str:
:param html: HTML content of the page
:param content_type: Content-Type header value
'''
# Only parse title for HTML content
if content_type and not any(x in content_type.lower() for x in ['text/html', 'application/xhtml']):
return None

httpz_scanner/scanner.py

@@ -3,8 +3,9 @@
# httpz_scanner/scanner.py
import asyncio
import json
import random
import urllib.parse
import json
try:
import aiohttp
@@ -16,17 +17,15 @@ try:
except ImportError:
raise ImportError('missing bs4 module (pip install beautifulsoup4)')
from .dns import resolve_all_dns, load_resolvers
from .formatters import format_console_output
from .colors import Colors
from .parsers import parse_domain_url, get_cert_info, get_favicon_hash, parse_title
from .utils import debug, info, USER_AGENTS, input_generator
from .dns import resolve_all_dns, load_resolvers
from .parsers import parse_domain_url, get_cert_info, get_favicon_hash
from .utils import debug, USER_AGENTS, input_generator
class HTTPZScanner:
'''Core scanner class for HTTP domain checking'''
def __init__(self, concurrent_limit = 100, timeout = 5, follow_redirects = False, check_axfr = False, resolver_file = None, output_file = None, show_progress = False, debug_mode = False, jsonl_output = False, show_fields = None, match_codes = None, exclude_codes = None, shard = None):
def __init__(self, concurrent_limit = 100, timeout = 5, follow_redirects = False, check_axfr = False, resolver_file = None, output_file = None, show_progress = False, debug_mode = False, jsonl_output = False, show_fields = None, match_codes = None, exclude_codes = None, shard = None, paths = None, custom_headers=None, post_data=None):
'''
Initialize the HTTPZScanner class
@@ -43,6 +42,9 @@ class HTTPZScanner:
:param match_codes: Status codes to match
:param exclude_codes: Status codes to exclude
:param shard: Tuple of (shard_index, total_shards) for distributed scanning
:param paths: List of additional paths to check on each domain
:param custom_headers: Dictionary of custom headers to send with each request
:param post_data: Data to send with POST requests
'''
self.concurrent_limit = concurrent_limit
@@ -55,6 +57,9 @@ class HTTPZScanner:
self.debug_mode = debug_mode
self.jsonl_output = jsonl_output
self.shard = shard
self.paths = paths or []
self.custom_headers = custom_headers or {}
self.post_data = post_data
self.show_fields = show_fields or {
'status_code' : True,
@@ -77,106 +82,110 @@ class HTTPZScanner:
self.progress_count = 0
async def init(self):
'''Initialize resolvers - must be called before scanning'''
self.resolvers = await load_resolvers(self.resolver_file)
async def check_domain(self, session: aiohttp.ClientSession, domain: str):
'''Check a single domain and return results'''
nameserver = random.choice(self.resolvers) if self.resolvers else None
base_domain, port, protocols = parse_domain_url(domain)
result = {
'domain' : base_domain,
'status' : 0,
'url' : protocols[0],
'port' : port,
}
# Try each protocol
for url in protocols:
try:
# Set random user agent for each request
headers = {'User-Agent': random.choice(USER_AGENTS)}
for protocol in protocols:
url = f'{protocol}{base_domain}'
if port:
url += f':{port}'
async with session.get(url, timeout=self.timeout,
allow_redirects=self.follow_redirects,
max_redirects=10 if self.follow_redirects else 0,
headers=headers) as response:
result['status'] = response.status
# Early exit if status code doesn't match criteria
if self.match_codes and result['status'] not in self.match_codes:
return result
if self.exclude_codes and result['status'] in self.exclude_codes:
return result
# Continue with full processing only if status code matches criteria
result['url'] = str(response.url)
# Add headers if requested
headers = dict(response.headers)
if headers and (self.show_fields.get('headers') or self.show_fields.get('all_flags')):
result['headers'] = headers
else:
# Only add content type/length if headers aren't included
if content_type := response.headers.get('content-type', '').split(';')[0]:
result['content_type'] = content_type
if content_length := response.headers.get('content-length'):
result['content_length'] = content_length
# Only add redirect chain if it exists
if self.follow_redirects and response.history:
result['redirect_chain'] = [str(h.url) for h in response.history] + [str(response.url)]
# Do DNS lookups only if we're going to use the result
ips, cname, nameservers, _ = await resolve_all_dns(
base_domain, self.timeout, nameserver, self.check_axfr
)
# Only add DNS fields if they have values
if ips:
result['ips'] = ips
if cname:
result['cname'] = cname
if nameservers:
result['nameservers'] = nameservers
# Only add TLS info if available
if response.url.scheme == 'https':
try:
if ssl_object := response._protocol.transport.get_extra_info('ssl_object'):
if tls_info := await get_cert_info(ssl_object, str(response.url)):
# Only add TLS fields that have values
result['tls'] = {k: v for k, v in tls_info.items() if v}
except AttributeError:
debug(f'Failed to get SSL info for {url}')
content_type = response.headers.get('Content-Type', '')
html = await response.text() if any(x in content_type.lower() for x in ['text/html', 'application/xhtml']) else None
# Only add title if it exists
if soup := bs4.BeautifulSoup(html, 'html.parser'):
if soup.title and soup.title.string:
result['title'] = ' '.join(soup.title.string.strip().split()).rstrip('.')[:300]
# Only add body if it exists
if body_text := soup.get_text():
result['body'] = ' '.join(body_text.split()).rstrip('.')[:500]
# Only add favicon hash if it exists
if favicon_hash := await get_favicon_hash(session, url, html):
result['favicon_hash'] = favicon_hash
break
try:
debug(f'Trying {url}...')
result = await self._check_url(session, url)
debug(f'Got result for {url}: {result}')
if result and (result['status'] != 400 or result.get('redirect_chain')): # Accept redirects
return result
except Exception as e:
debug(f'Error checking {url}: {str(e)}')
result['status'] = -1
continue
return None
return result
async def _check_url(self, session: aiohttp.ClientSession, url: str):
'''Check a single URL and return results'''
try:
headers = {'User-Agent': random.choice(USER_AGENTS)}
headers.update(self.custom_headers)
debug(f'Making request to {url} with headers: {headers}')
async with session.request('GET', url,
timeout=self.timeout,
allow_redirects=True, # Always follow redirects
max_redirects=10,
ssl=False, # Don't verify SSL
headers=headers) as response:
debug(f'Got response from {url}: status={response.status}, headers={dict(response.headers)}')
result = {
'domain': urllib.parse.urlparse(url).hostname,
'status': response.status,
'url': str(response.url),
'response_headers': dict(response.headers)
}
if response.history:
result['redirect_chain'] = [str(h.url) for h in response.history] + [str(response.url)]
debug(f'Redirect chain for {url}: {result["redirect_chain"]}')
return result
except aiohttp.ClientSSLError as e:
debug(f'SSL Error for {url}: {str(e)}')
return {
'domain': urllib.parse.urlparse(url).hostname,
'status': -1,
'error': f'SSL Error: {str(e)}',
'protocol': 'https' if url.startswith('https://') else 'http',
'error_type': 'SSL'
}
except aiohttp.ClientConnectorCertificateError as e:
debug(f'Certificate Error for {url}: {str(e)}')
return {
'domain': urllib.parse.urlparse(url).hostname,
'status': -1,
'error': f'Certificate Error: {str(e)}',
'protocol': 'https' if url.startswith('https://') else 'http',
'error_type': 'CERT'
}
except aiohttp.ClientConnectorError as e:
debug(f'Connection Error for {url}: {str(e)}')
return {
'domain': urllib.parse.urlparse(url).hostname,
'status': -1,
'error': f'Connection Failed: {str(e)}',
'protocol': 'https' if url.startswith('https://') else 'http',
'error_type': 'CONN'
}
except aiohttp.ClientError as e:
debug(f'HTTP Error for {url}: {e.__class__.__name__}: {str(e)}')
return {
'domain': urllib.parse.urlparse(url).hostname,
'status': -1,
'error': f'HTTP Error: {e.__class__.__name__}: {str(e)}',
'protocol': 'https' if url.startswith('https://') else 'http',
'error_type': 'HTTP'
}
except asyncio.TimeoutError:
debug(f'Timeout for {url}')
return {
'domain': urllib.parse.urlparse(url).hostname,
'status': -1,
'error': f'Connection Timed Out after {self.timeout}s',
'protocol': 'https' if url.startswith('https://') else 'http',
'error_type': 'TIMEOUT'
}
except Exception as e:
debug(f'Unexpected error for {url}: {e.__class__.__name__}: {str(e)}')
return {
'domain': urllib.parse.urlparse(url).hostname,
'status': -1,
'error': f'Error: {e.__class__.__name__}: {str(e)}',
'protocol': 'https' if url.startswith('https://') else 'http',
'error_type': 'UNKNOWN'
}
async def scan(self, input_source):
@@ -192,75 +201,93 @@ class HTTPZScanner:
'''
if not self.resolvers:
await self.init()
self.resolvers = await load_resolvers(self.resolver_file)
async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False)) as session:
tasks = set()
count = 0 # Move counter here since that's all process_result was doing
# Just use ssl=False, that's all we need
connector = aiohttp.TCPConnector(ssl=False, enable_cleanup_closed=True)
async with aiohttp.ClientSession(connector=connector) as session:
tasks = {} # Change to dict to track domain for each task
domain_queue = asyncio.Queue()
queue_empty = False
# Handle different input types
if isinstance(input_source, str):
# File or stdin input
gen = input_generator(input_source, self.shard)
async for domain in gen:
if len(tasks) >= self.concurrent_limit:
done, tasks = await asyncio.wait(
tasks, return_when=asyncio.FIRST_COMPLETED
)
for task in done:
result = await task
if self.show_progress:
count += 1 # Increment counter here
yield result
task = asyncio.create_task(self.check_domain(session, domain))
tasks.add(task)
elif isinstance(input_source, (list, tuple)):
# List/tuple input
for line_num, domain in enumerate(input_source):
if domain := str(domain).strip():
if self.shard is None or line_num % self.shard[1] == self.shard[0]:
if len(tasks) >= self.concurrent_limit:
done, tasks = await asyncio.wait(
tasks, return_when=asyncio.FIRST_COMPLETED
)
for task in done:
result = await task
if self.show_progress:
count += 1
yield result
task = asyncio.create_task(self.check_domain(session, domain))
tasks.add(task)
else:
# Async generator input
line_num = 0
async for domain in input_source:
if isinstance(domain, bytes):
domain = domain.decode()
domain = domain.strip()
if domain:
if self.shard is None or line_num % self.shard[1] == self.shard[0]:
if len(tasks) >= self.concurrent_limit:
done, tasks = await asyncio.wait(
tasks, return_when=asyncio.FIRST_COMPLETED
)
for task in done:
result = await task
if self.show_progress:
count += 1
yield result
task = asyncio.create_task(self.check_domain(session, domain))
tasks.add(task)
line_num += 1
# Process remaining tasks
if tasks:
done, _ = await asyncio.wait(tasks)
for task in done:
result = await task
async def process_domain(domain):
try:
result = await self.check_domain(session, domain)
if self.show_progress:
count += 1
yield result
self.progress_count += 1
if result:
return domain, result
else:
# Create a proper error result if check_domain returns None
return domain, {
'domain': domain,
'status': -1,
'error': 'No successful response from either HTTP or HTTPS',
'protocol': 'unknown',
'error_type': 'NO_RESPONSE'
}
except Exception as e:
debug(f'Error processing {domain}: {e.__class__.__name__}: {str(e)}')
# Return structured error information
return domain, {
'domain': domain,
'status': -1,
'error': f'{e.__class__.__name__}: {str(e)}',
'protocol': 'unknown',
'error_type': 'PROCESS'
}
# Queue processor
async def queue_processor():
async for domain in input_generator(input_source, self.shard):
await domain_queue.put(domain)
self.processed_domains += 1
nonlocal queue_empty
queue_empty = True
# Start queue processor
queue_task = asyncio.create_task(queue_processor())
try:
while not (queue_empty and domain_queue.empty() and not tasks):
# Fill up tasks until we hit concurrent limit
while len(tasks) < self.concurrent_limit and not domain_queue.empty():
domain = await domain_queue.get()
task = asyncio.create_task(process_domain(domain))
tasks[task] = domain
if tasks:
# Wait for at least one task to complete
done, _ = await asyncio.wait(
tasks.keys(),
return_when=asyncio.FIRST_COMPLETED
)
# Process completed tasks
for task in done:
domain = tasks.pop(task)
try:
_, result = await task
if result:
yield result
except Exception as e:
debug(f'Task error for {domain}: {e.__class__.__name__}: {str(e)}')
yield {
'domain': domain,
'status': -1,
'error': f'Task Error: {e.__class__.__name__}: {str(e)}',
'protocol': 'unknown',
'error_type': 'TASK'
}
else:
await asyncio.sleep(0.1) # Prevent CPU spin when no tasks
finally:
# Clean up
for task in tasks:
task.cancel()
queue_task.cancel()
try:
await queue_task
except asyncio.CancelledError:
pass
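Pulling the rewritten scanner together: the constructor gains paths, custom_headers, and post_data, and scan() stays an async generator, now fed by the queue above. A minimal end-to-end sketch (the domains and header value are placeholders):

    import asyncio
    from httpz_scanner import HTTPZScanner

    async def demo():
        scanner = HTTPZScanner(
            concurrent_limit=50,
            timeout=5,
            follow_redirects=True,
            paths=['.git/config', '.env'],      # new in this diff
            custom_headers={'X-Scan': 'demo'},  # new in this diff
            post_data=None,                     # new in this diff
        )
        async for result in scanner.scan(['example.com', 'example.org']):
            print(result['domain'], result['status'], result.get('error_type', ''))

    asyncio.run(demo())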

httpz_scanner/utils.py

@@ -68,8 +68,9 @@ USER_AGENTS = [
]
def debug(msg: str):
if not SILENT_MODE: logging.debug(msg)
def debug(msg: str):
if not SILENT_MODE:
logging.debug(msg)
def error(msg: str):
if not SILENT_MODE: logging.error(msg)
def info(msg: str):
@@ -117,7 +118,7 @@ async def input_generator(input_source, shard: tuple = None):
# Handle stdin
if input_source == '-' or input_source is None:
for line in sys.stdin:
await asyncio.sleep(0) # Yield control
await asyncio.sleep(0)
if line := line.strip():
if shard is None or line_num % shard[1] == shard[0]:
yield line
@@ -127,7 +128,7 @@ async def input_generator(input_source, shard: tuple = None):
elif isinstance(input_source, str) and os.path.exists(input_source):
with open(input_source, 'r') as f:
for line in f:
await asyncio.sleep(0) # Yield control
await asyncio.sleep(0)
if line := line.strip():
if shard is None or line_num % shard[1] == shard[0]:
yield line
@@ -136,7 +137,7 @@ async def input_generator(input_source, shard: tuple = None):
# Handle iterables (generators, lists, etc)
elif hasattr(input_source, '__iter__') and not isinstance(input_source, (str, bytes)):
for line in input_source:
await asyncio.sleep(0) # Yield control
await asyncio.sleep(0)
if isinstance(line, bytes):
line = line.decode()
if line := line.strip():
@@ -149,7 +150,7 @@ async def input_generator(input_source, shard: tuple = None):
if isinstance(input_source, bytes):
input_source = input_source.decode()
for line in input_source.splitlines():
await asyncio.sleep(0) # Yield control
await asyncio.sleep(0)
if line := line.strip():
if shard is None or line_num % shard[1] == shard[0]:
yield line
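The shard check repeated in each branch above is just modular striping over the input line number. A standalone sketch of the same filter:

    # shard = (index, total): this worker keeps lines where line_num % total == index
    shard = (0, 3)  # hypothetical: first of three workers
    lines = ['a.com', 'b.com', 'c.com', 'd.com']
    mine = [l for i, l in enumerate(lines) if i % shard[1] == shard[0]]
    print(mine)  # -> ['a.com', 'd.com']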

setup.py

@@ -10,7 +10,7 @@ with open('README.md', 'r', encoding='utf-8') as f:
setup(
name='httpz_scanner',
version='2.0.7',
version='2.1.8',
author='acidvegas',
author_email='acid.vegas@acid.vegas',
description='Hyper-fast HTTP Scraping Tool',

unit_test.py (new file, 235 additions)

@@ -0,0 +1,235 @@
#!/usr/bin/env python3
# HTTPZ Web Scanner - Unit Tests
# unit_test.py
import asyncio
import logging
import sys
import time
try:
from httpz_scanner import HTTPZScanner
from httpz_scanner.colors import Colors
except ImportError:
raise ImportError('missing httpz_scanner library (pip install httpz_scanner)')
class ColoredFormatter(logging.Formatter):
'''Custom formatter for colored log output'''
def format(self, record):
if record.levelno == logging.INFO:
color = Colors.GREEN
elif record.levelno == logging.WARNING:
color = Colors.YELLOW
elif record.levelno == logging.ERROR:
color = Colors.RED
else:
color = Colors.RESET
record.msg = f'{color}{record.msg}{Colors.RESET}'
return super().format(record)
# Configure logging with colors
logger = logging.getLogger()
handler = logging.StreamHandler()
handler.setFormatter(ColoredFormatter('%(asctime)s - %(levelname)s - %(message)s'))
logger.setLevel(logging.INFO)
logger.addHandler(handler)
async def get_domains_from_url() -> list:
'''
Fetch domains from SecLists URL
:return: List of domains
'''
try:
import aiohttp
except ImportError:
raise ImportError('missing aiohttp library (pip install aiohttp)')
url = 'https://raw.githubusercontent.com/danielmiessler/SecLists/refs/heads/master/Fuzzing/email-top-100-domains.txt'
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
content = await response.text()
return [line.strip() for line in content.splitlines() if line.strip()]
async def domain_generator(domains: list):
'''
Async generator that yields domains
:param domains: List of domains to yield
'''
for domain in domains:
await asyncio.sleep(0) # Allow other coroutines to run
yield domain
async def run_benchmark(test_type: str, domains: list, concurrency: int) -> tuple:
'''Run a single benchmark test'''
logging.info(f'{Colors.BOLD}Testing {test_type} input with {concurrency} concurrent connections...{Colors.RESET}')
scanner = HTTPZScanner(concurrent_limit=concurrency, timeout=3, show_progress=True, debug_mode=True, follow_redirects=True)
count = 0
got_first = False
start_time = None
if test_type == 'List':
async for result in scanner.scan(domains):
if result:
if not got_first:
got_first = True
start_time = time.time()
count += 1
# More detailed status reporting
status_str = ''
if result['status'] < 0:
error_type = result.get('error_type', 'UNKNOWN')
error_msg = result.get('error', 'Unknown Error')
status_str = f"{Colors.RED}[{result['status']} - {error_type}: {error_msg}]{Colors.RESET}"
elif 200 <= result['status'] < 300:
status_str = f"{Colors.GREEN}[{result['status']}]{Colors.RESET}"
elif 300 <= result['status'] < 400:
status_str = f"{Colors.YELLOW}[{result['status']}]{Colors.RESET}"
else:
status_str = f"{Colors.RED}[{result['status']}]{Colors.RESET}"
# Show protocol and response headers if available
protocol_info = f" {Colors.CYAN}({result.get('protocol', 'unknown')}){Colors.RESET}" if result.get('protocol') else ''
headers_info = ''
if result.get('response_headers'):
important_headers = ['server', 'location', 'content-type']
headers = [f"{k}: {v}" for k, v in result['response_headers'].items() if k.lower() in important_headers]
if headers:
headers_info = f" {Colors.GRAY}[{', '.join(headers)}]{Colors.RESET}"
# Show redirect chain if present
redirect_info = ''
if result.get('redirect_chain'):
redirect_info = f" -> {Colors.YELLOW}Redirects: {' -> '.join(result['redirect_chain'])}{Colors.RESET}"
# Show error details if present
error_info = ''
if result.get('error'):
error_info = f" {Colors.RED}Error: {result['error']}{Colors.RESET}"
# Show final URL if different from original
url_info = ''
if result.get('url') and result['url'] != f"http(s)://{result['domain']}":
url_info = f" {Colors.CYAN}Final URL: {result['url']}{Colors.RESET}"
logging.info(
f"{test_type}-{concurrency} Result {count}: "
f"{status_str}{protocol_info} "
f"{Colors.CYAN}{result['domain']}{Colors.RESET}"
f"{redirect_info}"
f"{url_info}"
f"{headers_info}"
f"{error_info}"
)
else:
# Skip generator test
pass
elapsed = time.time() - start_time if start_time else 0
domains_per_sec = count/elapsed if elapsed > 0 else 0
logging.info(f'{Colors.YELLOW}{test_type} test with {concurrency} concurrent connections completed in {elapsed:.2f} seconds ({domains_per_sec:.2f} domains/sec){Colors.RESET}')
return elapsed, domains_per_sec
async def test_list_input(domains: list):
'''Test scanning using a list input'''
logging.info(f'{Colors.BOLD}Testing list input...{Colors.RESET}')
scanner = HTTPZScanner(concurrent_limit=25, timeout=3, show_progress=True, debug_mode=True, follow_redirects=True)
start_time = time.time()
count = 0
async for result in scanner.scan(domains):
if result:
count += 1
status_color = Colors.GREEN if 200 <= result['status'] < 300 else Colors.RED
title = f" - {Colors.CYAN}{result.get('title', 'No Title')}{Colors.RESET}" if result.get('title') else ''
error = f" - {Colors.RED}{result.get('error', '')}{Colors.RESET}" if result.get('error') else ''
logging.info(f'List-25 Result {count}: {status_color}[{result["status"]}]{Colors.RESET} {Colors.CYAN}{result["domain"]}{Colors.RESET}{title}{error}')
async def test_generator_input(domains: list):
'''Test scanning using an async generator input'''
logging.info(f'{Colors.BOLD}Testing generator input...{Colors.RESET}')
scanner = HTTPZScanner(concurrent_limit=25, timeout=3, show_progress=True, debug_mode=True, follow_redirects=True)
start_time = time.time()
count = 0
async for result in scanner.scan(domain_generator(domains)):
if result:
count += 1
status_color = Colors.GREEN if 200 <= result['status'] < 300 else Colors.RED
title = f" - {Colors.CYAN}{result.get('title', 'No Title')}{Colors.RESET}" if result.get('title') else ''
error = f" - {Colors.RED}{result.get('error', '')}{Colors.RESET}" if result.get('error') else ''
logging.info(f'Generator-25 Result {count}: {status_color}[{result["status"]}]{Colors.RESET} {Colors.CYAN}{result["domain"]}{Colors.RESET}{title}{error}')
async def main() -> None:
'''Main test function'''
try:
# Fetch domains
domains = await get_domains_from_url()
logging.info(f'Loaded {Colors.YELLOW}{len(domains)}{Colors.RESET} domains for testing')
# Store benchmark results
results = []
# Run tests with different concurrency levels
for concurrency in [25, 50, 100]:
# Generator tests
gen_result = await run_benchmark('Generator', domains, concurrency)
results.append(('Generator', concurrency, *gen_result))
# List tests
list_result = await run_benchmark('List', domains, concurrency)
results.append(('List', concurrency, *list_result))
# Print benchmark comparison
logging.info(f'\n{Colors.BOLD}Benchmark Results:{Colors.RESET}')
logging.info('-' * 80)
logging.info(f'{"Test Type":<15} {"Concurrency":<15} {"Time (s)":<15} {"Domains/sec":<15}')
logging.info('-' * 80)
# Sort by domains per second (fastest first)
results.sort(key=lambda x: x[3], reverse=True)
for test_type, concurrency, elapsed, domains_per_sec in results:
logging.info(f'{test_type:<15} {concurrency:<15} {elapsed:.<15.2f} {domains_per_sec:<15.2f}')
# Highlight fastest result
fastest = results[0]
logging.info('-' * 80)
logging.info(f'{Colors.GREEN}Fastest: {fastest[0]} test with {fastest[1]} concurrent connections')
logging.info(f'Time: {fastest[2]:.2f} seconds')
logging.info(f'Speed: {fastest[3]:.2f} domains/sec{Colors.RESET}')
logging.info(f'\n{Colors.GREEN}All tests completed successfully!{Colors.RESET}')
except Exception as e:
logging.error(f'Test failed: {Colors.RED}{str(e)}{Colors.RESET}')
sys.exit(1)
if __name__ == '__main__':
try:
asyncio.run(main())
except KeyboardInterrupt:
logging.warning(f'{Colors.YELLOW}Tests interrupted by user{Colors.RESET}')
sys.exit(1)