This commit is contained in:
2026-05-02 00:06:08 -04:00
parent 19525aec7d
commit 44c035aab6
11 changed files with 1201 additions and 972 deletions

308
README.md
View File

@@ -2,213 +2,247 @@
![](./.screens/preview.gif)
A high-performance concurrent web scanner written in Python. HTTPZ efficiently scans domains for HTTP/HTTPS services, extracting valuable information like status codes, titles, SSL certificates, and more.
A high-performance concurrent HTTP recon tool. HTTPZ checks domains for HTTP/HTTPS services and pulls back status codes, titles, body previews, response headers, favicon hashes, TLS certificate info, and resolved IPs — all configurable per scan.
Designed to run as a library inside distributed workers scanning hundreds of millions of domains.
## Requirements
- [Python](https://www.python.org/downloads/)
- [Python](https://www.python.org/downloads/) 3.8+
- [aiodns](https://pypi.org/project/aiodns/)
- [aiofiles](https://pypi.org/project/aiofiles/)
- [aiohttp](https://pypi.org/project/aiohttp/)
- [beautifulsoup4](https://pypi.org/project/beautifulsoup4/)
- [cryptography](https://pypi.org/project/cryptography/)
- [dnspython](https://pypi.org/project/dnspython/)
- [mmh3](https://pypi.org/project/mmh3/)
- [python-dotenv](https://pypi.org/project/python-dotenv/)
## Installation
### Via pip *(recommended)*
```bash
# Install from PyPI
pip install httpz_scanner
# The 'httpz' command will now be available in your terminal
httpz --help
```
### From source
```bash
# Clone the repository
git clone https://github.com/acidvegas/httpz
cd httpz
pip install -r requirements.txt
```
## Usage
## CLI usage
### Command Line Interface
Basic usage:
Basic:
```bash
python -m httpz_scanner domains.txt
```
Scan with all flags enabled and output to JSONL:
All fields, JSONL output to stdout and a file:
```bash
python -m httpz_scanner domains.txt -all -c 100 -o results.jsonl -j -p
python -m httpz_scanner domains.txt -all -c 100 -j -o results.jsonl
```
Read from stdin:
```bash
cat domains.txt | python -m httpz_scanner - -all -c 100
echo "example.com" | python -m httpz_scanner - -all
cat domains.txt | python -m httpz_scanner - -all
echo example.com | python -m httpz_scanner - -all
```
Filter by status codes and follow redirects:
Filter by status code:
```bash
python -m httpz_scanner domains.txt -mc 200,301-399 -ec 404,500 -fr -p
python -m httpz_scanner domains.txt -mc 200,301-399 -ec 404,500
```
Show specific fields with custom timeout and resolvers:
Specific fields with custom timeout and resolvers:
```bash
python -m httpz_scanner domains.txt -sc -ti -i -tls -to 10 -r resolvers.txt
```
Full scan with all options:
```bash
python -m httpz_scanner domains.txt -c 100 -o output.jsonl -j -all -to 10 -mc 200,301 -ec 404,500 -p -ax -r resolvers.txt
```
### Distributed Scanning
Split scanning across multiple machines using the `--shard` argument:
### Distributed scanning
Built-in shard mode splits a file across N workers (line-modulo):
```bash
# Machine 1
httpz domains.txt --shard 1/3
# Machine 2
httpz domains.txt --shard 2/3
# Machine 3
httpz domains.txt --shard 3/3
```
Workers can also handle their own line offsetting and feed domains directly to the library — see below.
Each machine will process a different subset of domains without overlap. For example, with 3 shards:
- Machine 1 processes lines 0,3,6,9,...
- Machine 2 processes lines 1,4,7,10,...
- Machine 3 processes lines 2,5,8,11,...
## Library usage
This allows efficient distribution of large scans across multiple machines.
### Python Library
```python
import asyncio
import urllib.request
from httpz_scanner import HTTPZScanner
async def scan_from_list() -> list:
with urllib.request.urlopen('https://example.com/domains.txt') as response:
content = response.read().decode()
return [line.strip() for line in content.splitlines() if line.strip()][:20]
async def scan_from_url():
with urllib.request.urlopen('https://example.com/domains.txt') as response:
for line in response:
if line := line.strip():
yield line.decode().strip()
async def scan_from_file():
with open('domains.txt', 'r') as file:
for line in file:
if line := line.strip():
yield line
async def domain_source():
# Any of: list, async generator, sync generator, file path string, '-'
for d in ['example.com', 'github.com', 'cloudflare.com']:
yield d
async def main():
# Initialize scanner with all possible options (showing defaults)
scanner = HTTPZScanner(
concurrent_limit=100, # Number of concurrent requests
timeout=5, # Request timeout in seconds
follow_redirects=False, # Follow redirects (max 10)
check_axfr=False, # Try AXFR transfer against nameservers
resolver_file=None, # Path to custom DNS resolvers file
output_file=None, # Path to JSONL output file
show_progress=False, # Show progress counter
debug_mode=False, # Show error states and debug info
jsonl_output=False, # Output in JSONL format
shard=None, # Tuple of (shard_index, total_shards) for distributed scanning
# Control which fields to show (all False by default unless show_fields is None)
show_fields={
'status_code': True, # Show status code
'content_type': True, # Show content type
'content_length': True, # Show content length
'title': True, # Show page title
'body': True, # Show body preview
'ip': True, # Show IP addresses
'favicon': True, # Show favicon hash
'headers': True, # Show response headers
'follow_redirects': True, # Show redirect chain
'cname': True, # Show CNAME records
'tls': True # Show TLS certificate info
},
# Filter results
match_codes={200,301,302}, # Only show these status codes
exclude_codes={404,500,503} # Exclude these status codes
concurrent_limit = 100,
timeout = 5,
retries = 1,
retry_backoff = 0.5,
follow_redirects = True,
# Feature toggles — all default OFF
fetch_headers = True,
fetch_content_type = True,
fetch_content_length = True,
fetch_title = True,
fetch_body = True,
fetch_favicon = True,
fetch_tls = True,
fetch_ips = True,
fetch_cname = True, # follow CNAME chain (max 3) and scan the final hop
# Optional filters
match_codes = None, # e.g. {200, 301, 302}
exclude_codes = None, # e.g. {404, 500}
# Optional knobs
custom_headers = None, # {'X-Foo': 'bar'}
post_data = None,
shard = None, # (index, total) — workers usually do this themselves
resolvers = None, # ['1.1.1.1', '8.8.8.8'] for A/AAAA lookups
dns_timeout = 2.0,
)
# Example 1: Process file
print('\nProcessing file:')
async for result in scanner.scan(scan_from_file()):
print(f"{result['domain']}: {result['status']}")
async for result in scanner.scan(domain_source()):
print(result['domain'], result['status'])
# Example 2: Stream URLs
print('\nStreaming URLs:')
async for result in scanner.scan(scan_from_url()):
print(f"{result['domain']}: {result['status']}")
# Example 3: Process list
print('\nProcessing list:')
domains = await scan_from_list()
async for result in scanner.scan(domains):
print(f"{result['domain']}: {result['status']}")
if __name__ == '__main__':
asyncio.run(main())
asyncio.run(main())
```
The scanner accepts various input types:
- File paths (string)
- Lists/tuples of domains
- stdin (using '-')
- Async generators that yield domains
The scanner accepts:
- a file path (string)
- `'-'` for stdin
- a list/tuple of domains
- a sync iterator/generator
- an async generator
All inputs support sharding for distributed scanning using the `shard` parameter.
### Graceful shutdown
## Arguments
Workers receiving SIGTERM (or any orchestrator signal) can drain cleanly:
| Argument | Long Form | Description |
|---------------|------------------|-------------------------------------------------------------|
| `file` | | File containing domains *(one per line)*, use `-` for stdin |
| `-d` | `--debug` | Show error states and debug information |
| `-c N` | `--concurrent N` | Number of concurrent checks *(default: 100)* |
| `-o FILE` | `--output FILE` | Output file path *(JSONL format)* |
| `-j` | `--jsonl` | Output JSON Lines format to console |
| `-all` | `--all-flags` | Enable all output flags |
| `-sh` | `--shard N/T` | Process shard N of T total shards *(e.g., 1/3)* |
```python
async def supervisor(scanner, scan_iterator):
async for result in scan_iterator:
...
### Output Field Flags
scanner = HTTPZScanner(...)
scan_task = asyncio.create_task(supervisor(scanner, scanner.scan(domains)))
| Flag | Long Form | Description |
|--------| ---------------------|----------------------------------|
| `-sc` | `--status-code` | Show status code |
| `-ct` | `--content-type` | Show content type |
| `-ti` | `--title` | Show page title |
| `-b` | `--body` | Show body preview |
| `-i` | `--ip` | Show IP addresses |
| `-f` | `--favicon` | Show favicon hash |
| `-hr` | `--headers` | Show response headers |
| `-cl` | `--content-length` | Show content length |
| `-fr` | `--follow-redirects` | Follow redirects *(max 10)* |
| `-cn` | `--cname` | Show CNAME records |
| `-tls` | `--tls-info` | Show TLS certificate information |
# Later, on shutdown signal:
await scanner.stop() # drops queued domains, lets in-flight finish, exits
await scan_task
```
### Other Options
`stop()` is idempotent and async-safe.
| Option | Long Form | Description |
|-------------|-------------------------|-----------------------------------------------------|
| `-to N` | `--timeout N` | Request timeout in seconds *(default: 5)* |
| `-mc CODES` | `--match-codes CODES` | Only show specific status codes *(comma-separated)* |
| `-ec CODES` | `--exclude-codes CODES` | Exclude specific status codes *(comma-separated)* |
| `-p` | `--progress` | Show progress counter |
| `-ax` | `--axfr` | Try AXFR transfer against nameservers |
| `-r FILE` | `--resolvers FILE` | File containing DNS resolvers *(one per line)* |
## Result schema
Each yielded result is a dict. Fields appear only when their feature toggle is on and data is available.
```jsonc
{
"domain": "example.com",
"url": "https://example.com/",
"status": 200, // -1 on error
"protocol": "https", // or "http"
// -- toggleable fields --
"response_headers": {"Server": "...", ...}, // fetch_headers
"content_type": "text/html; charset=utf-8",
"content_length": 1234,
"redirect_chain": ["https://example.com", "https://www.example.com/"],
"cname_chain": ["example.com", "edge.example.net", "akamai.net"], // up to 3 entries
"title": "Example Domain", // single line, max 1024 chars
"body_preview": "<!doctype html>...", // first 1024 raw bytes, normalized
"body_clean": "Example Domain ...", // HTML-stripped, max 1024 chars
"favicon_hash": "1014476666658474844", // mmh3 64-bit, capped at 256 KB
"ips": ["93.184.216.34", "..."],
"tls": {
"fingerprint": "<sha256 hex>",
"subject": "*.example.com",
"issuer": "DigiCert TLS RSA SHA256 2020 CA1",
"email": null,
"alt_names": ["*.example.com", "example.com"],
"not_before": "2026-01-15T00:00:00",
"not_after": "2027-02-14T23:59:59"
},
// -- only on failure --
"error": "Connection timed out",
"error_type": "TIMEOUT" // CONN | SSL | CERT | TIMEOUT | HTTP | UNKNOWN | PROCESS | TASK | NO_RESPONSE
}
```
## Protocol fallback
- `https://x` → tries https, falls back to http on connection failure
- `http://x` → tries http, falls back to https on connection failure
- `x` (no scheme) → tries https, falls back to http
Any HTTP response (including 4xx/5xx) is accepted — only connection-level errors trigger fallback.
## Retries
`retries` is per protocol, applied only to transient errors (TIMEOUT, CONN, HTTP). Cert errors, DNS failures, and HTTP responses do not retry. Backoff is linear: `retry_backoff * (attempt + 1)`.
## Performance notes for distributed use
- `force_close=True` on the connector — keep-alive is disabled (you're scanning unique hosts).
- TLS cert is captured from the *original* request's connection via a connector subclass, no second handshake per https domain.
- DNS uses `aiodns` + 5-minute in-process cache.
- Bounded internal queue (`concurrent_limit * 2`) keeps memory flat regardless of input size.
- Ensure your worker's `ulimit -n` is high enough for `concurrent_limit * 2` sockets.
## CLI arguments
| Argument | Long form | Description |
|------------------|-------------------------|----------------------------------------------|
| `file` | | Domain file (one per line) or `-` for stdin |
| `-c N` | `--concurrent N` | Concurrent in-flight checks (default 100) |
| `-to N` | `--timeout N` | Request timeout in seconds (default 5) |
| `-rt N` | `--retries N` | Retry attempts per protocol (default 1) |
| `-rb N` | `--retry-backoff N` | Linear backoff base seconds (default 0.5) |
| `-dt N` | `--dns-timeout N` | DNS query timeout (default 2.0) |
| `-fr` | `--follow-redirects` | Follow redirects (max 10) |
| `-r FILE` | `--resolvers FILE` | DNS resolver IP list for IP lookups |
| `-hd "k: v,..."` | `--headers "k: v,..."` | Custom request headers |
| `-pd DATA` | `--post-data DATA` | Send POST with this body |
| `-sh N/T` | `--shard N/T` | Shard `N` of `T` (line-modulo) |
| `-mc CODES` | `--match-codes CODES` | Only show these status codes |
| `-ec CODES` | `--exclude-codes CODES` | Exclude these status codes |
| `-o FILE` | `--output FILE` | Append-write JSONL to file |
| `-j` | `--jsonl` | Print JSONL to stdout |
| `-p` | `--progress` | Show numeric counter alongside output |
| `-d` | `--debug` | Show error states and debug logs |
| `-all` | `--all-flags` | Enable every output field |
### Field flags
| Flag | Long form | Description |
|--------|---------------------|------------------------------|
| `-sc` | `--status-code` | Status code |
| `-ct` | `--content-type` | Content-Type header |
| `-cl` | `--content-length` | Content-Length header |
| `-ti` | `--title` | Page title (≤1024 chars) |
| `-b` | `--body` | body_preview + body_clean |
| `-i` | `--ip` | A/AAAA records |
| `-f` | `--favicon` | mmh3 favicon hash |
| `-hr` | `--show-headers` | Full response headers |
| `-tls` | `--tls-info` | TLS certificate fields |
| `-cn` | `--cname` | CNAME chain (max 3) + scan target hostname |
---
###### Mirrors: [SuperNETs](https://git.supernets.org/acidvegas/) • [GitHub](https://github.com/acidvegas/) • [GitLab](https://gitlab.com/acidvegas/) • [Codeberg](https://codeberg.org/acidvegas/)

View File

@@ -6,4 +6,4 @@ from .colors import Colors
from .scanner import HTTPZScanner
__version__ = '2.1.8'
__version__ = '3.1.1'

View File

@@ -11,102 +11,93 @@ import sys
from datetime import datetime
from . import utils
from .colors import Colors
from .formatters import format_console_output
from .parsers import parse_status_codes, parse_shard
from .scanner import HTTPZScanner
from .utils import SILENT_MODE, info
from .utils import info
def setup_logging(level='INFO', log_to_disk=False):
'''
Setup logging configuration
:param level: Logging level (INFO or DEBUG)
:param log_to_disk: Whether to also log to file
Setup logging configuration.
:param level: logging level (INFO or DEBUG)
:param log_to_disk: also log to logs/httpz.log
'''
class ColoredFormatter(logging.Formatter):
def formatTime(self, record):
dt = datetime.fromtimestamp(record.created)
return f'{Colors.GRAY}{dt.strftime("%m-%d %H:%M")}{Colors.RESET}'
def format(self, record):
return f'{self.formatTime(record)} {record.getMessage()}'
# Setup logging handlers
handlers = []
# Console handler
console = logging.StreamHandler()
console.setFormatter(ColoredFormatter())
handlers.append(console)
# File handler
if log_to_disk:
os.makedirs('logs', exist_ok=True)
file_handler = logging.FileHandler(f'logs/httpz.log')
file_handler = logging.FileHandler('logs/httpz.log')
file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
handlers.append(file_handler)
# Setup logger
logging.basicConfig(level=getattr(logging, level.upper()), handlers=handlers)
async def main():
parser = argparse.ArgumentParser(description=f'{Colors.GREEN}Hyper-fast HTTP Scraping Tool{Colors.RESET}', formatter_class=argparse.RawDescriptionHelpFormatter)
# Add arguments
parser.add_argument('file', nargs='?', default='-', help='File containing domains to check (one per line), use - for stdin')
parser.add_argument('-all', '--all-flags', action='store_true', help='Enable all output flags')
parser.add_argument('file', nargs='?', default='-', help='File of domains (one per line), or - for stdin')
parser.add_argument('-all', '--all-flags', action='store_true', help='Enable all output fields')
parser.add_argument('-d', '--debug', action='store_true', help='Show error states and debug information')
parser.add_argument('-c', '--concurrent', type=int, default=100, help='Number of concurrent checks')
parser.add_argument('-j', '--jsonl', action='store_true', help='Output JSON Lines format to console')
parser.add_argument('-o', '--output', help='Output file path (JSONL format)')
parser.add_argument('-c', '--concurrent', type=int, default=100, help='Concurrent in-flight checks')
parser.add_argument('-j', '--jsonl', action='store_true', help='Output JSONL to stdout')
parser.add_argument('-o', '--output', help='Output file path (JSONL)')
# Output field flags
parser.add_argument('-b', '--body', action='store_true', help='Show body preview')
parser.add_argument('-cn', '--cname', action='store_true', help='Show CNAME records')
parser.add_argument('-cl', '--content-length', action='store_true', help='Show content length')
parser.add_argument('-ct', '--content-type', action='store_true', help='Show content type')
parser.add_argument('-f', '--favicon', action='store_true', help='Show favicon hash')
parser.add_argument('-fr', '--follow-redirects', action='store_true', help='Follow redirects (max 10)')
parser.add_argument('-hr', '--show-headers', action='store_true', help='Show response headers')
parser.add_argument('-i', '--ip', action='store_true', help='Show IP addresses')
parser.add_argument('-b', '--body', action='store_true', help='Include body_preview/body_clean')
parser.add_argument('-cl', '--content-length', action='store_true', help='Include content_length')
parser.add_argument('-cn', '--cname', action='store_true', help='Detect CNAME chain (up to 3) and scan the final hop')
parser.add_argument('-ct', '--content-type', action='store_true', help='Include content_type')
parser.add_argument('-f', '--favicon', action='store_true', help='Include favicon hash')
parser.add_argument('-fr', '--follow-redirects', action='store_true', help=f'Follow redirects (max {10})')
parser.add_argument('-hr', '--show-headers', action='store_true', help='Include response headers')
parser.add_argument('-i', '--ip', action='store_true', help='Include resolved A/AAAA IPs')
parser.add_argument('-sc', '--status-code', action='store_true', help='Show status code')
parser.add_argument('-ti', '--title', action='store_true', help='Show page title')
parser.add_argument('-tls', '--tls-info', action='store_true', help='Show TLS certificate information')
# Other arguments
parser.add_argument('-ax', '--axfr', action='store_true', help='Try AXFR transfer against nameservers')
parser.add_argument('-ec', '--exclude-codes', type=parse_status_codes, help='Exclude these status codes (comma-separated, e.g., 404,500)')
parser.add_argument('-mc', '--match-codes', type=parse_status_codes, help='Only show these status codes (comma-separated, e.g., 200,301,404)')
parser.add_argument('-ti', '--title', action='store_true', help='Include page title')
parser.add_argument('-tls', '--tls-info', action='store_true', help='Include TLS certificate info')
# Tunables
parser.add_argument('-rt', '--retries', type=int, default=1, help='Retry attempts per protocol on transient errors')
parser.add_argument('-rb', '--retry-backoff', type=float, default=0.5, help='Linear backoff base seconds between retries')
# Filters / misc
parser.add_argument('-ec', '--exclude-codes', type=parse_status_codes, help='Exclude these status codes (e.g., 404,500)')
parser.add_argument('-mc', '--match-codes', type=parse_status_codes, help='Only show these status codes (e.g., 200,301,404)')
parser.add_argument('-p', '--progress', action='store_true', help='Show progress counter')
parser.add_argument('-pd', '--post-data', help='Send POST request with this data')
parser.add_argument('-r', '--resolvers', help='File containing DNS resolvers (one per line)')
parser.add_argument('-r', '--resolvers', help='File of DNS resolvers (one per line) for IP lookups')
parser.add_argument('-to', '--timeout', type=int, default=5, help='Request timeout in seconds')
# Add shard argument
parser.add_argument('-sh','--shard', type=parse_shard, help='Shard index and total shards (e.g., 1/3)')
parser.add_argument('-dt', '--dns-timeout', type=float, default=2.0, help='DNS query timeout in seconds')
parser.add_argument('-sh', '--shard', type=parse_shard, help='Shard index/total (e.g., 1/3)')
parser.add_argument('-hd', '--headers', help='Custom headers ("H1: v1,H2: v2")')
# Add this to the argument parser section
parser.add_argument('-pa', '--paths', help='Additional paths to check (comma-separated, e.g., ".git/config,.env")')
# Add these arguments in the parser section
parser.add_argument('-hd', '--headers', help='Custom headers to send with each request (format: "Header1: value1,Header2: value2")')
# If no arguments provided, print help and exit
if len(sys.argv) == 1:
parser.print_help()
sys.exit(0)
args = parser.parse_args()
# Setup logging based on arguments
global SILENT_MODE
SILENT_MODE = args.jsonl
# SILENT_MODE controls library log helpers; mutate the module attribute, not a local global.
utils.SILENT_MODE = args.jsonl
if not SILENT_MODE:
if not utils.SILENT_MODE:
if args.debug:
setup_logging(level='DEBUG', log_to_disk=True)
else:
@@ -117,7 +108,6 @@ async def main():
else:
info(f'Processing file: {args.file}')
# Setup show_fields
show_fields = {
'status_code' : args.all_flags or args.status_code,
'content_type' : args.all_flags or args.content_type,
@@ -128,63 +118,78 @@ async def main():
'favicon' : args.all_flags or args.favicon,
'headers' : args.all_flags or args.show_headers,
'follow_redirects' : args.all_flags or args.follow_redirects,
'tls' : args.all_flags or args.tls_info,
'cname' : args.all_flags or args.cname,
'tls' : args.all_flags or args.tls_info
}
# If no fields specified show all
if not any(show_fields.values()):
show_fields = {k: True for k in show_fields}
resolvers = None
if args.resolvers:
try:
with open(args.resolvers) as f:
resolvers = [line.strip() for line in f if line.strip()]
except Exception as e:
logging.error(f'Failed to load resolvers from {args.resolvers}: {e}')
sys.exit(1)
custom_headers = None
if args.headers:
custom_headers = dict(h.split(': ', 1) for h in args.headers.split(','))
out_fh = open(args.output, 'a', buffering=1) if args.output else None
try:
scanner = HTTPZScanner(
concurrent_limit=args.concurrent,
timeout=args.timeout,
follow_redirects=args.all_flags or args.follow_redirects,
check_axfr=args.axfr,
resolver_file=args.resolvers,
output_file=args.output,
show_progress=args.progress,
debug_mode=args.debug,
jsonl_output=args.jsonl,
show_fields=show_fields,
match_codes=args.match_codes,
exclude_codes=args.exclude_codes,
shard=args.shard,
paths=args.paths.split(',') if args.paths else None,
custom_headers=dict(h.split(': ', 1) for h in args.headers.split(',')) if args.headers else None,
post_data=args.post_data
concurrent_limit = args.concurrent,
timeout = args.timeout,
retries = args.retries,
retry_backoff = args.retry_backoff,
follow_redirects = args.all_flags or args.follow_redirects,
fetch_headers = show_fields['headers'],
fetch_content_type = show_fields['content_type'],
fetch_content_length = show_fields['content_length'],
fetch_title = show_fields['title'],
fetch_body = show_fields['body'],
fetch_favicon = show_fields['favicon'],
fetch_tls = show_fields['tls'],
fetch_ips = show_fields['ip'],
fetch_cname = show_fields['cname'],
match_codes = args.match_codes,
exclude_codes = args.exclude_codes,
custom_headers = custom_headers,
post_data = args.post_data,
shard = args.shard,
resolvers = resolvers,
dns_timeout = args.dns_timeout,
)
count = 0
async for result in scanner.scan(args.file):
# Write to output file if specified
if args.output:
with open(args.output, 'a') as f:
f.write(json.dumps(result) + '\n')
f.flush() # Ensure file output is immediate
# Handle JSON output separately
if out_fh is not None:
out_fh.write(json.dumps(result) + '\n')
if args.jsonl:
print(json.dumps(result), flush=True) # Force flush
print(json.dumps(result), flush=True)
continue
# Only output and increment counter if we have content to show for normal output
formatted = format_console_output(result, args.debug, show_fields, args.match_codes, args.exclude_codes)
if formatted:
if args.progress:
count += 1
info(f"[{count}] {formatted}")
sys.stdout.flush() # Force flush after each domain
info(f'[{count}] {formatted}')
sys.stdout.flush()
else:
print(formatted, flush=True) # Force flush
print(formatted, flush=True)
except KeyboardInterrupt:
logging.warning('Process interrupted by user')
sys.exit(1)
except Exception as e:
logging.error(f'Unexpected error: {str(e)}')
logging.error(f'Unexpected error: {e}')
sys.exit(1)
finally:
if out_fh is not None:
out_fh.close()
def run():
@@ -192,6 +197,5 @@ def run():
asyncio.run(main())
if __name__ == '__main__':
run()
run()

View File

@@ -1,116 +0,0 @@
#!/usr/bin/env python3
# HTTPZ Web Scanner - Developed by acidvegas in Python (https://github.com/acidvegas/httpz)
# httpz_scanner/dns.py
import asyncio
import os
try:
import aiohttp
except ImportError:
raise ImportError('missing aiohttp library (pip install aiohttp)')
try:
import dns.asyncresolver
import dns.query
import dns.resolver
import dns.zone
except ImportError:
raise ImportError('missing dnspython library (pip install dnspython)')
from .utils import debug, info, SILENT_MODE
async def resolve_all_dns(domain: str, timeout: int = 5, nameserver: str = None, check_axfr: bool = False) -> tuple:
'''
Resolve all DNS records for a domain
:param domain: Domain to resolve
:param timeout: Timeout in seconds
:param nameserver: Specific nameserver to use
:param check_axfr: Whether to attempt zone transfer
'''
# Setup resolver
resolver = dns.asyncresolver.Resolver()
resolver.lifetime = timeout
if nameserver:
resolver.nameservers = [nameserver]
# Resolve all DNS records
results = await asyncio.gather(*[resolver.resolve(domain, rtype) for rtype in ('NS', 'A', 'AAAA', 'CNAME')], return_exceptions=True)
# Parse results
nameservers = [str(ns).rstrip('.') for ns in results[0]] if isinstance(results[0], dns.resolver.Answer) else []
ips = ([str(ip) for ip in results[1]] if isinstance(results[1], dns.resolver.Answer) else []) + ([str(ip) for ip in results[2]] if isinstance(results[2], dns.resolver.Answer) else [])
cname = str(results[3][0].target).rstrip('.') if isinstance(results[3], dns.resolver.Answer) else None
# Get NS IPs
ns_ips = {}
if nameservers:
ns_results = await asyncio.gather(*[resolver.resolve(ns, rtype) for ns in nameservers for rtype in ('A', 'AAAA')], return_exceptions=True)
for i, ns in enumerate(nameservers):
ns_ips[ns] = [str(ip) for records in ns_results[i*2:i*2+2] if isinstance(records, dns.resolver.Answer) for ip in records]
# Attempt zone transfer
if check_axfr:
await attempt_axfr(domain, ns_ips, timeout)
return sorted(set(ips)), cname, nameservers, ns_ips
async def attempt_axfr(domain: str, ns_ips: dict, timeout: int = 5) -> None:
'''
Attempt zone transfer for a domain
:param domain: Domain to attempt AXFR transfer
:param ns_ips: Dictionary of nameserver hostnames to their IPs
:param timeout: Timeout in seconds
'''
try:
os.makedirs('axfrout', exist_ok=True)
# Loop through each NS
for ns_host, ips in ns_ips.items():
# Loop through each NS IP
for ns_ip in ips:
try:
# Attempt zone transfer
zone = dns.zone.from_xfr(dns.query.xfr(ns_ip, domain, lifetime=timeout))
# Write zone to file
with open(f'axfrout/{domain}_{ns_ip}.zone', 'w') as f:
zone.to_text(f)
info(f'[AXFR SUCCESS] {domain} from {ns_host} ({ns_ip})')
except Exception as e:
debug(f'AXFR failed for {domain} from {ns_ip}: {str(e)}')
except Exception as e:
debug(f'Failed AXFR for {domain}: {str(e)}')
async def load_resolvers(resolver_file: str = None) -> list:
'''
Load DNS resolvers from file or default source
:param resolver_file: Path to file containing resolver IPs
'''
# Load from file
if resolver_file:
try:
with open(resolver_file) as f:
resolvers = [line.strip() for line in f if line.strip()]
if resolvers:
return resolvers
except Exception as e:
debug(f'Error loading resolvers from {resolver_file}: {str(e)}')
# Load from GitHub
async with aiohttp.ClientSession() as session:
async with session.get('https://raw.githubusercontent.com/trickest/resolvers/refs/heads/main/resolvers.txt') as response:
resolvers = await response.text()
if not SILENT_MODE:
info(f'Loaded {len(resolvers.splitlines()):,} resolvers.')
return [resolver.strip() for resolver in resolvers.splitlines()]

View File

@@ -8,24 +8,26 @@ from .utils import human_size
def format_console_output(result: dict, debug: bool = False, show_fields: dict = None, match_codes: set = None, exclude_codes: set = None) -> str:
'''
Format the output with colored sections
:param result: Dictionary containing domain check results
:param debug: Whether to show error states
:param show_fields: Dictionary of fields to show
:param match_codes: Set of status codes to match
:param exclude_codes: Set of status codes to exclude
Format a result dict into a colored single-line console string.
:param result: result dict from HTTPZScanner
:param debug: include error rows when True
:param show_fields: dict toggling which fields to render
:param match_codes: only render rows whose status is in this set
:param exclude_codes: skip rows whose status is in this set
'''
if result['status'] < 0 and not debug:
return ''
if match_codes and result['status'] not in match_codes:
return ''
if exclude_codes and result['status'] in exclude_codes:
return ''
show_fields = show_fields or {}
parts = []
# Status code
if show_fields.get('status_code'):
if result['status'] < 0:
@@ -37,80 +39,68 @@ def format_console_output(result: dict, debug: bool = False, show_fields: dict =
else:
status = f"{Colors.RED}[{result['status']}]{Colors.RESET}"
parts.append(status)
# Domain/URL
parts.append(f"[{result['url']}]")
# Content Type
# URL / domain
parts.append(f"[{result.get('url') or result.get('domain')}]")
# Error (when debug)
if result['status'] < 0 and result.get('error'):
parts.append(f"{Colors.RED}[{result.get('error_type','')}: {result['error']}]{Colors.RESET}")
if show_fields.get('content_type') and result.get('content_type'):
parts.append(f"{Colors.CYAN}[{result['content_type']}]{Colors.RESET}")
# Content Length
if show_fields.get('content_length') and result.get('content_length'):
parts.append(f"{Colors.PINK}[{result['content_length']}]{Colors.RESET}")
# Title
if show_fields.get('content_length') and result.get('content_length') is not None:
try:
size = human_size(int(result['content_length']))
parts.append(f"{Colors.PINK}[{size}]{Colors.RESET}")
except (ValueError, TypeError):
parts.append(f"{Colors.PINK}[{result['content_length']}]{Colors.RESET}")
if show_fields.get('title') and result.get('title'):
parts.append(f"{Colors.DARK_GREEN}[{result['title']}]{Colors.RESET}")
# Body preview
if show_fields.get('body') and result.get('body'):
body = result['body'][:100] + ('...' if len(result['body']) > 100 else '')
parts.append(f"{Colors.BLUE}[{body}]{Colors.RESET}")
# IPs
if show_fields.get('ip') and result.get('ips'):
ips_text = ', '.join(result['ips'])
parts.append(f"{Colors.YELLOW}[{ips_text}]{Colors.RESET}")
# Favicon hash
if show_fields.get('body'):
if result.get('body_clean'):
preview = result['body_clean'][:100] + ('...' if len(result['body_clean']) > 100 else '')
parts.append(f"{Colors.BLUE}[{preview}]{Colors.RESET}")
elif result.get('body_preview'):
preview = result['body_preview'][:100] + ('...' if len(result['body_preview']) > 100 else '')
parts.append(f"{Colors.BLUE}[{preview}]{Colors.RESET}")
if show_fields.get('cname') and result.get('cname_chain'):
parts.append(f"{Colors.PURPLE}[CNAME: {' -> '.join(result['cname_chain'])}]{Colors.RESET}")
if show_fields.get('ip') and result.get('ips'):
parts.append(f"{Colors.YELLOW}[{', '.join(result['ips'])}]{Colors.RESET}")
if show_fields.get('favicon') and result.get('favicon_hash'):
parts.append(f"{Colors.PURPLE}[{result['favicon_hash']}]{Colors.RESET}")
# Headers
if show_fields.get('headers') and result.get('response_headers'):
headers_text = [f"{k}: {v}" for k, v in result['response_headers'].items()]
parts.append(f"{Colors.CYAN}[{', '.join(headers_text)}]{Colors.RESET}")
else:
if show_fields.get('content_type') and result.get('content_type'):
parts.append(f"{Colors.HEADER}[{result['content_type']}]{Colors.RESET}")
if show_fields.get('content_length') and result.get('content_length'):
try:
size = human_size(int(result['content_length']))
parts.append(f"{Colors.PINK}[{size}]{Colors.RESET}")
except (ValueError, TypeError):
parts.append(f"{Colors.PINK}[{result['content_length']}]{Colors.RESET}")
headers_text = ', '.join(f'{k}: {v}' for k, v in result['response_headers'].items())
parts.append(f"{Colors.CYAN}[{headers_text}]{Colors.RESET}")
# Redirect Chain
if show_fields.get('follow_redirects') and result.get('redirect_chain'):
chain = ' -> '.join(result['redirect_chain'])
parts.append(f"{Colors.YELLOW}[Redirects: {chain}]{Colors.RESET}")
# CNAME
if show_fields.get('cname') and result.get('cname'):
parts.append(f"{Colors.PURPLE}[CNAME: {result['cname']}]{Colors.RESET}")
# TLS Certificate Info
if show_fields.get('tls') and result.get('tls'):
cert = result['tls']
tls_parts = []
if cert.get('common_name'):
tls_parts.append(f"Subject: {cert['common_name']}")
if cert.get('subject'):
tls_parts.append(f"Subject: {cert['subject']}")
if cert.get('issuer'):
tls_parts.append(f"Issuer: {cert['issuer']}")
if cert.get('email'):
tls_parts.append(f"Email: {cert['email']}")
if cert.get('fingerprint'):
tls_parts.append(f"Fingerprint: {cert['fingerprint'][:16]}...")
if cert.get('alt_names'):
tls_parts.append(f"SANs: {', '.join(cert['alt_names'][:3])}")
if cert.get('not_before') and cert.get('not_after'):
tls_parts.append(f"Valid: {cert['not_before'].split('T')[0]} to {cert['not_after'].split('T')[0]}")
if cert.get('version'):
tls_parts.append(f"Version: {cert['version']}")
if cert.get('serial_number'):
tls_parts.append(f"Serial: {cert['serial_number'][:16]}...")
if tls_parts: # Only add TLS info if we have any parts
if tls_parts:
parts.append(f"{Colors.GREEN}[{' | '.join(tls_parts)}]{Colors.RESET}")
return ' '.join(parts)
return ' '.join(parts)

View File

@@ -3,6 +3,8 @@
# httpz_scanner/parsers.py
import argparse
import re
import urllib.parse
try:
import bs4
@@ -12,7 +14,7 @@ except ImportError:
try:
from cryptography import x509
from cryptography.hazmat.primitives import hashes
from cryptography.x509.oid import NameOID
from cryptography.x509.oid import NameOID, ExtensionOID
except ImportError:
raise ImportError('missing cryptography module (pip install cryptography)')
@@ -24,128 +26,241 @@ except ImportError:
from .utils import debug, error
_WS_RE = re.compile(r'\s+')
TITLE_MAX_CHARS = 1024
BODY_PREVIEW_BYTES = 1024
BODY_CLEAN_CHARS = 1024
MAX_BODY_BYTES = 1024 * 1024 # hard cap on response body read
FAVICON_MAX_BYTES = 256 * 1024 # hard cap on favicon read
def parse_domain_url(domain: str) -> tuple:
'''
Parse domain string into base domain, port, and protocol list
:param domain: Raw domain string to parse
Parse a raw domain string into (base_domain, port, ordered_protocol_list).
Protocol order:
- explicit https:// → ['https', 'http']
- explicit http:// → ['http', 'https']
- no scheme → ['https', 'http']
:param domain: raw domain string
'''
raw = domain.strip().rstrip('/')
port = None
base_domain = domain.rstrip('/')
if base_domain.startswith(('http://', 'https://')):
protocol = 'https://' if base_domain.startswith('https://') else 'http://'
base_domain = base_domain.split('://', 1)[1]
if ':' in base_domain.split('/')[0]:
base_domain, port_str = base_domain.split(':', 1)
try:
port = int(port_str.split('/')[0])
except ValueError:
port = None
if raw.startswith('https://'):
protocols = ['https', 'http']
rest = raw[len('https://'):]
elif raw.startswith('http://'):
protocols = ['http', 'https']
rest = raw[len('http://'):]
else:
if ':' in base_domain.split('/')[0]:
base_domain, port_str = base_domain.split(':', 1)
port = int(port_str.split('/')[0]) if port_str.split('/')[0].isdigit() else None
protocols = ['http://', 'https://'] # Always try HTTP first
protocols = ['https', 'http']
rest = raw
host_part = rest.split('/', 1)[0]
if ':' in host_part:
host, port_str = host_part.rsplit(':', 1)
if port_str.isdigit():
port = int(port_str)
base_domain = host
else:
base_domain = host_part
else:
base_domain = host_part
return base_domain, port, protocols
async def get_cert_info(ssl_object, url: str) -> dict:
def _normalize_text(text: str) -> str:
'''Collapse all runs of whitespace (including newlines) into single spaces and strip.'''
if not text:
return ''
return _WS_RE.sub(' ', text).strip()
def parse_title(html: str, content_type: str = None) -> str:
'''
Get SSL certificate information for a domain
:param ssl_object: SSL object to get certificate info from
:param url: URL to get certificate info from
Extract the page title as a single line, max TITLE_MAX_CHARS.
:param html: HTML content
:param content_type: Content-Type header value (used to skip non-HTML)
'''
try:
if not ssl_object or not (cert_der := ssl_object.getpeercert(binary_form=True)):
if content_type and not any(x in content_type.lower() for x in ('text/html', 'application/xhtml')):
return None
try:
soup = bs4.BeautifulSoup(html, 'html.parser')
if soup.title and soup.title.string:
title = _normalize_text(soup.title.string)
return title[:TITLE_MAX_CHARS] if title else None
except Exception as e:
debug(f'Error parsing title: {e}')
return None
def body_preview(raw_bytes: bytes, encoding: str = 'utf-8') -> str:
'''
Decode the first BODY_PREVIEW_BYTES bytes of the raw body, normalize whitespace.
:param raw_bytes: raw response body
:param encoding: encoding to attempt for decoding
'''
if not raw_bytes:
return None
chunk = raw_bytes[:BODY_PREVIEW_BYTES]
try:
text = chunk.decode(encoding, errors='replace')
except Exception:
text = chunk.decode('utf-8', errors='replace')
text = _normalize_text(text)
return text or None
def body_clean(html: str) -> str:
'''
Strip HTML/script/style, normalize whitespace, return first BODY_CLEAN_CHARS chars.
:param html: HTML content
'''
if not html:
return None
try:
soup = bs4.BeautifulSoup(html, 'html.parser')
for tag in soup(('script', 'style', 'noscript')):
tag.decompose()
text = soup.get_text(separator=' ')
except Exception as e:
debug(f'Error cleaning body: {e}')
return None
text = _normalize_text(text)
if not text:
return None
return text[:BODY_CLEAN_CHARS]
def parse_cert(ssl_object) -> dict:
'''
Parse a TLS certificate from a live ssl_object captured on the connected socket.
:param ssl_object: SSLObject from the live connection (via TraceConfig hook)
'''
try:
if ssl_object is None:
return None
cert_der = ssl_object.getpeercert(binary_form=True)
if not cert_der:
return None
cert = x509.load_der_x509_certificate(cert_der)
try:
san_extension = cert.extensions.get_extension_for_oid(x509.oid.ExtensionOID.SUBJECT_ALTERNATIVE_NAME)
alt_names = [name.value for name in san_extension.value] if san_extension else []
except x509.extensions.ExtensionNotFound:
san_ext = cert.extensions.get_extension_for_oid(ExtensionOID.SUBJECT_ALTERNATIVE_NAME)
alt_names = [name.value for name in san_ext.value]
except x509.ExtensionNotFound:
alt_names = []
try:
common_name = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value
except IndexError:
common_name = None
def _attr(subject_or_issuer, oid):
attrs = subject_or_issuer.get_attributes_for_oid(oid)
return attrs[0].value if attrs else None
try:
issuer = cert.issuer.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value
except IndexError:
issuer = None
common_name = _attr(cert.subject, NameOID.COMMON_NAME)
issuer = _attr(cert.issuer, NameOID.COMMON_NAME)
# Email: prefer subject EMAIL_ADDRESS attribute, fall back to rfc822Name in SANs.
email = _attr(cert.subject, NameOID.EMAIL_ADDRESS)
if not email:
try:
rfc822 = san_ext.value.get_values_for_type(x509.RFC822Name)
if rfc822:
email = rfc822[0]
except Exception:
pass
not_before = getattr(cert, 'not_valid_before_utc', None) or cert.not_valid_before
not_after = getattr(cert, 'not_valid_after_utc', None) or cert.not_valid_after
return {
'fingerprint' : cert.fingerprint(hashes.SHA256()).hex(),
'common_name' : common_name,
'issuer' : issuer,
'alt_names' : alt_names,
'not_before' : cert.not_valid_before_utc.isoformat(),
'not_after' : cert.not_valid_after_utc.isoformat(),
'version' : cert.version.value,
'serial_number' : format(cert.serial_number, 'x'),
'fingerprint' : cert.fingerprint(hashes.SHA256()).hex(),
'subject' : common_name,
'issuer' : issuer,
'email' : email,
'alt_names' : alt_names,
'not_before' : not_before.isoformat(),
'not_after' : not_after.isoformat(),
}
except Exception as e:
error(f'Error getting cert info for {url}: {str(e)}')
debug(f'Error parsing cert: {e}')
return None
async def get_favicon_hash(session, base_url: str, html: str) -> str:
async def get_favicon_hash(session, base_url: str, html: str, timeout: float = 5.0) -> str:
'''
Get favicon hash from a webpage
:param session: aiohttp client session
:param base_url: base URL of the website
:param html: HTML content of the page
Fetch the favicon (capped to FAVICON_MAX_BYTES) and return its mmh3 64-bit hash as a string.
:param session: aiohttp ClientSession
:param base_url: base URL of the page (scheme + host)
:param html: HTML content (used to discover <link rel="icon">)
:param timeout: request timeout in seconds
'''
try:
soup = bs4.BeautifulSoup(html, 'html.parser')
favicon_url = None
for link in soup.find_all('link'):
if link.get('rel') and any(x.lower() == 'icon' for x in link.get('rel')):
favicon_url = link.get('href')
break
try:
soup = bs4.BeautifulSoup(html, 'html.parser')
for link in soup.find_all('link'):
rels = link.get('rel') or []
if any(r.lower() == 'icon' for r in rels):
favicon_url = link.get('href')
break
except Exception:
pass
if not favicon_url:
favicon_url = '/favicon.ico'
if favicon_url.startswith('//'):
favicon_url = 'https:' + favicon_url
elif favicon_url.startswith('/'):
favicon_url = base_url + favicon_url
elif not favicon_url.startswith(('http://', 'https://')):
favicon_url = base_url + '/' + favicon_url
async with session.get(favicon_url, timeout=10) as response:
if response.status == 200:
content = (await response.read())[:1024*1024]
hash_value = mmh3.hash64(content)[0]
if hash_value != 0:
return str(hash_value)
favicon_url = urllib.parse.urljoin(base_url, favicon_url)
try:
import aiohttp
client_timeout = aiohttp.ClientTimeout(total=timeout)
except ImportError:
client_timeout = timeout
async with session.get(favicon_url, timeout=client_timeout, ssl=False) as response:
if response.status != 200:
return None
content = b''
async for chunk in response.content.iter_chunked(8192):
content += chunk
if len(content) >= FAVICON_MAX_BYTES:
content = content[:FAVICON_MAX_BYTES]
break
if not content:
return None
hash_value = mmh3.hash64(content)[0]
return str(hash_value) if hash_value != 0 else None
except Exception as e:
debug(f'Error getting favicon for {base_url}: {str(e)}')
return None
debug(f'Error getting favicon for {base_url}: {e}')
return None
def parse_status_codes(codes_str: str) -> set:
'''
Parse comma-separated status codes and ranges into a set of integers
:param codes_str: Comma-separated status codes (e.g., "200,301-399,404,500-503")
Parse comma-separated status codes and ranges into a set of ints.
:param codes_str: e.g. "200,301-399,404,500-503"
'''
codes = set()
try:
for part in codes_str.split(','):
@@ -161,37 +276,15 @@ def parse_status_codes(codes_str: str) -> set:
def parse_shard(shard_str: str) -> tuple:
'''
Parse shard argument in format INDEX/TOTAL
:param shard_str: Shard string in format "INDEX/TOTAL"
Parse a shard argument in the form "INDEX/TOTAL" (1-based index).
:param shard_str: shard string "INDEX/TOTAL"
'''
try:
shard_index, total_shards = map(int, shard_str.split('/'))
if shard_index < 1 or total_shards < 1 or shard_index > total_shards:
raise ValueError
return shard_index - 1, total_shards # Convert to 0-based index
return shard_index - 1, total_shards
except (ValueError, TypeError):
raise argparse.ArgumentTypeError('Shard must be in format INDEX/TOTAL where INDEX <= TOTAL')
def parse_title(html: str, content_type: str = None) -> str:
'''
Parse title from HTML content
:param html: HTML content of the page
:param content_type: Content-Type header value
'''
# Only parse title for HTML content
if content_type and not any(x in content_type.lower() for x in ['text/html', 'application/xhtml']):
return None
try:
soup = bs4.BeautifulSoup(html, 'html.parser', from_encoding='utf-8', features='lxml')
if title := soup.title:
return title.string.strip()
except:
pass
return None
raise argparse.ArgumentTypeError('Shard must be in format INDEX/TOTAL where INDEX <= TOTAL')

View File

@@ -3,291 +3,462 @@
# httpz_scanner/scanner.py
import asyncio
import contextvars
import random
import urllib.parse
import json
try:
import aiohttp
except ImportError:
raise ImportError('missing aiohttp module (pip install aiohttp)')
try:
import bs4
except ImportError:
raise ImportError('missing bs4 module (pip install beautifulsoup4)')
from .dns import resolve_all_dns, load_resolvers
from .parsers import parse_domain_url, get_cert_info, get_favicon_hash
from .utils import debug, USER_AGENTS, input_generator
# Per-task bucket for capturing the ssl_object from the live aiohttp connection.
# Set on the request side, populated by the connector subclass on connection create.
_ssl_capture: contextvars.ContextVar = contextvars.ContextVar('_httpz_ssl_capture', default=None)
class _CertCapturingConnector(aiohttp.TCPConnector):
'''
TCPConnector subclass that captures the live ssl_object on every newly-created
connection into the calling task's _ssl_capture bucket. Used to grab the peer
cert without opening a second TLS handshake per https domain.
'''
async def _wrap_create_connection(self, *args, **kwargs):
transport, proto = await super()._wrap_create_connection(*args, **kwargs)
bucket = _ssl_capture.get()
if bucket is not None:
ssl_obj = transport.get_extra_info('ssl_object')
if ssl_obj is not None:
bucket['ssl_object'] = ssl_obj
return transport, proto
from .parsers import (
parse_domain_url,
parse_cert,
get_favicon_hash,
parse_title,
body_preview,
body_clean,
MAX_BODY_BYTES,
)
from .utils import debug, USER_AGENTS, input_generator, resolve_ips, resolve_cname_chain
# Hard cap on CNAME chain length (including the original hostname).
MAX_CNAME_CHAIN = 3
# Errors that are worth retrying / falling back on. Cert errors fall back but don't retry.
_TRANSIENT_ERROR_TYPES = ('TIMEOUT', 'CONN', 'HTTP')
_FALLBACK_ERROR_TYPES = ('TIMEOUT', 'CONN', 'SSL', 'CERT', 'HTTP', 'UNKNOWN')
class HTTPZScanner:
'''Core scanner class for HTTP domain checking'''
def __init__(self, concurrent_limit = 100, timeout = 5, follow_redirects = False, check_axfr = False, resolver_file = None, output_file = None, show_progress = False, debug_mode = False, jsonl_output = False, show_fields = None, match_codes = None, exclude_codes = None, shard = None, paths = None, custom_headers=None, post_data=None):
def __init__(
self,
concurrent_limit = 100,
timeout = 5,
retries = 1,
retry_backoff = 0.5,
max_redirects = 10,
follow_redirects = True,
# feature toggles (all default OFF)
fetch_headers = False,
fetch_content_type = False,
fetch_content_length = False,
fetch_title = False,
fetch_body = False,
fetch_favicon = False,
fetch_tls = False,
fetch_ips = False,
fetch_cname = False,
# filtering / misc
match_codes = None,
exclude_codes = None,
custom_headers = None,
post_data = None,
shard = None,
resolvers = None,
dns_timeout = 2.0,
):
'''
Initialize the HTTPZScanner class
:param concurrent_limit: Maximum number of concurrent requests
:param timeout: Request timeout in seconds
:param follow_redirects: Follow redirects
:param check_axfr: Check for AXFR
:param resolver_file: Path to resolver file
:param output_file: Path to output file
:param show_progress: Show progress bar
:param debug_mode: Enable debug mode
:param jsonl_output: Output in JSONL format
:param show_fields: Fields to show
:param match_codes: Status codes to match
:param exclude_codes: Status codes to exclude
:param shard: Tuple of (shard_index, total_shards) for distributed scanning
:param paths: List of additional paths to check on each domain
:param custom_headers: Dictionary of custom headers to send with each request
:param post_data: Data to send with POST requests
:param concurrent_limit: max in-flight domain checks
:param timeout: per-request timeout in seconds
:param retries: retry attempts per protocol on transient errors (after the first try)
:param retry_backoff: base seconds for linear backoff between retries
:param max_redirects: redirect chain cap when follow_redirects is True
:param follow_redirects: whether aiohttp follows redirects
:param fetch_headers: include response_headers in result
:param fetch_content_type: include content_type in result
:param fetch_content_length: include content_length in result
:param fetch_title: include title in result (requires body read)
:param fetch_body: include body_preview and body_clean
:param fetch_favicon: include favicon_hash (extra HTTP request)
:param fetch_tls: include tls cert info (https only)
:param fetch_ips: include resolved A/AAAA in result
:param fetch_cname: detect CNAME chain (up to MAX_CNAME_CHAIN hostnames),
scan against the final hop, and attach `cname_chain` to the result
:param match_codes: only yield results with these status codes
:param exclude_codes: skip results with these status codes
:param custom_headers: dict of extra headers
:param post_data: if set, send POST with this body
:param shard: (index, total) for distributed scanning
:param resolvers: optional list of DNS resolver IPs (used for fetch_ips)
:param dns_timeout: per-query DNS timeout in seconds
'''
self.concurrent_limit = concurrent_limit
self.timeout = timeout
self.retries = retries
self.retry_backoff = retry_backoff
self.max_redirects = max_redirects
self.follow_redirects = follow_redirects
self.check_axfr = check_axfr
self.resolver_file = resolver_file
self.output_file = output_file
self.show_progress = show_progress
self.debug_mode = debug_mode
self.jsonl_output = jsonl_output
self.shard = shard
self.paths = paths or []
self.custom_headers = custom_headers or {}
self.post_data = post_data
self.show_fields = show_fields or {
'status_code' : True,
'content_type' : True,
'content_length' : True,
'title' : True,
'body' : True,
'ip' : True,
'favicon' : True,
'headers' : True,
'follow_redirects' : True,
'cname' : True,
'tls' : True
self.fetch_headers = fetch_headers
self.fetch_content_type = fetch_content_type
self.fetch_content_length = fetch_content_length
self.fetch_title = fetch_title
self.fetch_body = fetch_body
self.fetch_favicon = fetch_favicon
self.fetch_tls = fetch_tls
self.fetch_ips = fetch_ips
self.fetch_cname = fetch_cname
self.match_codes = match_codes
self.exclude_codes = exclude_codes
self.custom_headers = custom_headers or {}
self.post_data = post_data
self.shard = shard
self.resolvers = resolvers
self.dns_timeout = dns_timeout
self._needs_body = fetch_title or fetch_body or fetch_favicon
self._stop_event = None # set in scan(), used by stop()
def _make_connector(self) -> aiohttp.TCPConnector:
'''
Build the TCP connector. Uses _CertCapturingConnector when fetch_tls is on
so the peer cert can be parsed from the live ssl_object — no second handshake.
'''
kwargs = {
'ssl' : False,
'limit' : self.concurrent_limit * 2,
'limit_per_host': 0,
'ttl_dns_cache' : 300,
'use_dns_cache' : True,
'force_close' : True, # unique-host scan: keep-alive is wasted FDs
}
try:
import aiodns # noqa: F401
from aiohttp.resolver import AsyncResolver
kwargs['resolver'] = AsyncResolver()
except ImportError:
pass
cls = _CertCapturingConnector if self.fetch_tls else aiohttp.TCPConnector
return cls(**kwargs)
async def stop(self):
'''
Signal the scan loop to drain. The producer is cancelled, no new domains
are pulled from the queue, and in-flight requests are awaited (or cancelled
when the session exits). Idempotent.
'''
if self._stop_event is not None:
self._stop_event.set()
@staticmethod
def _err_result(domain: str, protocol: str, err_type: str, message: str) -> dict:
return {
'domain' : domain,
'protocol' : protocol,
'status' : -1,
'error' : message,
'error_type' : err_type,
}
self.match_codes = match_codes
self.exclude_codes = exclude_codes
self.resolvers = None
self.processed_domains = 0
self.progress_count = 0
@staticmethod
def _classify_exception(exc: BaseException):
'''Map an aiohttp/asyncio exception to (error_type, message).'''
if isinstance(exc, asyncio.TimeoutError):
return 'TIMEOUT', 'Connection timed out'
if isinstance(exc, aiohttp.ClientConnectorCertificateError):
return 'CERT', f'Certificate Error: {exc}'
if isinstance(exc, aiohttp.ClientSSLError):
return 'SSL', f'SSL Error: {exc}'
if isinstance(exc, aiohttp.ClientConnectorError):
return 'CONN', f'Connection Failed: {exc}'
if isinstance(exc, aiohttp.ClientError):
return 'HTTP', f'HTTP Error: {exc.__class__.__name__}: {exc}'
return 'UNKNOWN', f'Error: {exc.__class__.__name__}: {exc}'
async def check_domain(self, session: aiohttp.ClientSession, domain: str):
'''Check a single domain and return results'''
base_domain, port, protocols = parse_domain_url(domain)
for protocol in protocols:
url = f'{protocol}{base_domain}'
if port:
url += f':{port}'
try:
debug(f'Trying {url}...')
result = await self._check_url(session, url)
debug(f'Got result for {url}: {result}')
if result and (result['status'] != 400 or result.get('redirect_chain')): # Accept redirects
return result
except Exception as e:
debug(f'Error checking {url}: {str(e)}')
continue
return None
async def _check_url(self, session: aiohttp.ClientSession, url: str, protocol: str, domain: str) -> dict:
'''Single attempt against a URL. Returns a result dict (success or error shape).'''
headers = {'User-Agent': random.choice(USER_AGENTS)}
headers.update(self.custom_headers)
method = 'POST' if self.post_data is not None else 'GET'
timeout = aiohttp.ClientTimeout(total=self.timeout)
# Set up a per-task bucket that the cert-capturing connector will fill
# in with the live ssl_object during connection creation.
ssl_bucket = {} if self.fetch_tls and protocol == 'https' else None
token = _ssl_capture.set(ssl_bucket) if ssl_bucket is not None else None
async def _check_url(self, session: aiohttp.ClientSession, url: str):
'''Check a single URL and return results'''
try:
headers = {'User-Agent': random.choice(USER_AGENTS)}
headers.update(self.custom_headers)
debug(f'Making request to {url} with headers: {headers}')
async with session.request('GET', url,
timeout=self.timeout,
allow_redirects=True, # Always follow redirects
max_redirects=10,
ssl=False, # Don't verify SSL
headers=headers) as response:
debug(f'Got response from {url}: status={response.status}, headers={dict(response.headers)}')
async with session.request(
method,
url,
data = self.post_data,
timeout = timeout,
allow_redirects = self.follow_redirects,
max_redirects = self.max_redirects,
ssl = False,
headers = headers,
) as response:
debug(f'{url} -> {response.status}')
result = {
'domain': urllib.parse.urlparse(url).hostname,
'status': response.status,
'url': str(response.url),
'response_headers': dict(response.headers)
'domain' : domain,
'url' : str(response.url),
'status' : response.status,
'protocol' : protocol,
}
if self.fetch_headers:
result['response_headers'] = dict(response.headers)
if self.fetch_content_type:
result['content_type'] = response.headers.get('Content-Type')
if self.fetch_content_length:
cl = response.headers.get('Content-Length')
if cl is not None:
try:
result['content_length'] = int(cl)
except ValueError:
result['content_length'] = cl
if response.history:
result['redirect_chain'] = [str(h.url) for h in response.history] + [str(response.url)]
debug(f'Redirect chain for {url}: {result["redirect_chain"]}')
# TLS cert: parsed from ssl_object captured by the connector
# subclass during connection creation — no extra handshake.
if ssl_bucket is not None:
cert = parse_cert(ssl_bucket.get('ssl_object'))
if cert:
result['tls'] = cert
# Body read (capped)
raw_body = None
if self._needs_body:
try:
raw_body = await response.content.read(MAX_BODY_BYTES)
except Exception as e:
debug(f'Body read error for {url}: {e}')
raw_body = None
if raw_body is not None:
encoding = response.charset or 'utf-8'
if self.fetch_body:
result['body_preview'] = body_preview(raw_body, encoding=encoding)
if self.fetch_title or self.fetch_body or self.fetch_favicon:
try:
html_text = raw_body.decode(encoding, errors='replace')
except Exception:
html_text = raw_body.decode('utf-8', errors='replace')
if self.fetch_body:
result['body_clean'] = body_clean(html_text)
if self.fetch_title:
ct = response.headers.get('Content-Type')
title = parse_title(html_text, ct)
if title:
result['title'] = title
if self.fetch_favicon:
parsed = urllib.parse.urlparse(str(response.url))
base = f'{parsed.scheme}://{parsed.netloc}'
fav = await get_favicon_hash(session, base, html_text, timeout=self.timeout)
if fav:
result['favicon_hash'] = fav
return result
except aiohttp.ClientSSLError as e:
debug(f'SSL Error for {url}: {str(e)}')
return {
'domain': urllib.parse.urlparse(url).hostname,
'status': -1,
'error': f'SSL Error: {str(e)}',
'protocol': 'https' if url.startswith('https://') else 'http',
'error_type': 'SSL'
}
except aiohttp.ClientConnectorCertificateError as e:
debug(f'Certificate Error for {url}: {str(e)}')
return {
'domain': urllib.parse.urlparse(url).hostname,
'status': -1,
'error': f'Certificate Error: {str(e)}',
'protocol': 'https' if url.startswith('https://') else 'http',
'error_type': 'CERT'
}
except aiohttp.ClientConnectorError as e:
debug(f'Connection Error for {url}: {str(e)}')
return {
'domain': urllib.parse.urlparse(url).hostname,
'status': -1,
'error': f'Connection Failed: {str(e)}',
'protocol': 'https' if url.startswith('https://') else 'http',
'error_type': 'CONN'
}
except aiohttp.ClientError as e:
debug(f'HTTP Error for {url}: {e.__class__.__name__}: {str(e)}')
return {
'domain': urllib.parse.urlparse(url).hostname,
'status': -1,
'error': f'HTTP Error: {e.__class__.__name__}: {str(e)}',
'protocol': 'https' if url.startswith('https://') else 'http',
'error_type': 'HTTP'
}
except asyncio.TimeoutError:
debug(f'Timeout for {url}')
return {
'domain': urllib.parse.urlparse(url).hostname,
'status': -1,
'error': f'Connection Timed Out after {self.timeout}s',
'protocol': 'https' if url.startswith('https://') else 'http',
'error_type': 'TIMEOUT'
}
except Exception as e:
debug(f'Unexpected error for {url}: {e.__class__.__name__}: {str(e)}')
return {
'domain': urllib.parse.urlparse(url).hostname,
'status': -1,
'error': f'Error: {e.__class__.__name__}: {str(e)}',
'protocol': 'https' if url.startswith('https://') else 'http',
'error_type': 'UNKNOWN'
}
err_type, msg = self._classify_exception(e)
debug(f'{url} {err_type}: {msg}')
return self._err_result(domain, protocol, err_type, msg)
finally:
if token is not None:
_ssl_capture.reset(token)
async def _check_url_with_retries(self, session, url, protocol, domain) -> dict:
'''Try _check_url up to (1 + retries) times on transient failures.'''
attempts = 1 + max(0, self.retries)
last = None
for attempt in range(attempts):
result = await self._check_url(session, url, protocol, domain)
if result.get('status', -1) >= 0:
return result
last = result
if result.get('error_type') not in _TRANSIENT_ERROR_TYPES:
return result
if attempt < attempts - 1:
await asyncio.sleep(self.retry_backoff * (attempt + 1))
return last
async def check_domain(self, session: aiohttp.ClientSession, domain: str) -> dict:
'''Try the preferred protocol; fall back to the other on failure.'''
base_domain, port, protocols = parse_domain_url(domain)
original_domain = base_domain
# CNAME chain (optional): resolve up to MAX_CNAME_CHAIN entries, then scan
# against the final hop's hostname. Chain is reported even if length is 1.
cname_chain = None
scan_target = base_domain
if self.fetch_cname:
try:
chain = await resolve_cname_chain(base_domain, self.resolvers, self.dns_timeout, MAX_CNAME_CHAIN)
if len(chain) > 1:
cname_chain = chain
scan_target = chain[-1]
except Exception as e:
debug(f'CNAME resolve error for {base_domain}: {e}')
ips_task = None
if self.fetch_ips:
ips_task = asyncio.create_task(resolve_ips(scan_target, self.resolvers, self.dns_timeout))
last_error = None
success = None
for protocol in protocols:
url = f'{protocol}://{scan_target}'
if port:
url += f':{port}'
result = await self._check_url_with_retries(session, url, protocol, original_domain)
if result.get('status', -1) >= 0:
success = result
break
last_error = result
if result.get('error_type') not in _FALLBACK_ERROR_TYPES:
break
final = success if success is not None else last_error
if cname_chain is not None:
final['cname_chain'] = cname_chain
if ips_task is not None:
try:
ips = await ips_task
if ips:
final['ips'] = ips
except Exception as e:
debug(f'IP resolve error for {scan_target}: {e}')
return final
async def scan(self, input_source):
'''
Scan domains from a file, stdin, or async generator
:param input_source: Can be:
- Path to file (str)
- stdin ('-')
- List/tuple of domains
- Async generator yielding domains
:yields: Result dictionary for each domain scanned
Scan domains from a file path, '-' for stdin, an iterable, or an async iterable.
Yields one result dict per domain.
:param input_source: see utils.input_generator
'''
if not self.resolvers:
self.resolvers = await load_resolvers(self.resolver_file)
# Just use ssl=False, that's all we need
connector = aiohttp.TCPConnector(ssl=False, enable_cleanup_closed=True)
connector = self._make_connector()
self._stop_event = asyncio.Event()
stop_event = self._stop_event
async with aiohttp.ClientSession(connector=connector) as session:
tasks = {} # Change to dict to track domain for each task
domain_queue = asyncio.Queue()
queue_empty = False
async def process_domain(domain):
domain_queue = asyncio.Queue(maxsize=self.concurrent_limit * 2)
tasks = {}
queue_done = False
async def producer():
nonlocal queue_done
try:
result = await self.check_domain(session, domain)
if self.show_progress:
self.progress_count += 1
if result:
return domain, result
else:
# Create a proper error result if check_domain returns None
return domain, {
'domain': domain,
'status': -1,
'error': 'No successful response from either HTTP or HTTPS',
'protocol': 'unknown',
'error_type': 'NO_RESPONSE'
}
async for domain in input_generator(input_source, self.shard):
if stop_event.is_set():
break
await domain_queue.put(domain)
finally:
queue_done = True
async def process(domain):
try:
return await self.check_domain(session, domain)
except Exception as e:
debug(f'Error processing {domain}: {e.__class__.__name__}: {str(e)}')
# Return structured error information
return domain, {
'domain': domain,
'status': -1,
'error': f'{e.__class__.__name__}: {str(e)}',
'protocol': 'unknown',
'error_type': 'PROCESS'
}
debug(f'process error for {domain}: {e.__class__.__name__}: {e}')
return self._err_result(domain, 'unknown', 'PROCESS', f'{e.__class__.__name__}: {e}')
# Queue processor
async def queue_processor():
async for domain in input_generator(input_source, self.shard):
await domain_queue.put(domain)
self.processed_domains += 1
nonlocal queue_empty
queue_empty = True
# Start queue processor
queue_task = asyncio.create_task(queue_processor())
producer_task = asyncio.create_task(producer())
try:
while not (queue_empty and domain_queue.empty() and not tasks):
# Fill up tasks until we hit concurrent limit
while len(tasks) < self.concurrent_limit and not domain_queue.empty():
domain = await domain_queue.get()
task = asyncio.create_task(process_domain(domain))
tasks[task] = domain
if tasks:
# Wait for at least one task to complete
done, _ = await asyncio.wait(
tasks.keys(),
return_when=asyncio.FIRST_COMPLETED
)
# Process completed tasks
for task in done:
domain = tasks.pop(task)
while not (queue_done and domain_queue.empty() and not tasks):
# On stop: drop any queued domains, finish in-flight, then exit.
if stop_event.is_set():
while not domain_queue.empty():
try:
_, result = await task
if result:
yield result
except Exception as e:
debug(f'Task error for {domain}: {e.__class__.__name__}: {str(e)}')
yield {
'domain': domain,
'status': -1,
'error': f'Task Error: {e.__class__.__name__}: {str(e)}',
'protocol': 'unknown',
'error_type': 'TASK'
}
else:
await asyncio.sleep(0.1) # Prevent CPU spin when no tasks
domain_queue.get_nowait()
except asyncio.QueueEmpty:
break
if not tasks:
break
if not stop_event.is_set():
while len(tasks) < self.concurrent_limit and not domain_queue.empty():
domain = domain_queue.get_nowait()
t = asyncio.create_task(process(domain))
tasks[t] = domain
if not tasks:
await asyncio.sleep(0.05)
continue
done, _ = await asyncio.wait(tasks.keys(), return_when=asyncio.FIRST_COMPLETED)
for t in done:
domain = tasks.pop(t)
try:
result = t.result()
except Exception as e:
result = self._err_result(domain, 'unknown', 'TASK', f'{e.__class__.__name__}: {e}')
if result is None:
result = self._err_result(domain, 'unknown', 'NO_RESPONSE', 'No response from either protocol')
if self.match_codes is not None and result.get('status') not in self.match_codes:
continue
if self.exclude_codes is not None and result.get('status') in self.exclude_codes:
continue
yield result
finally:
# Clean up
for task in tasks:
task.cancel()
queue_task.cancel()
for t in tasks:
t.cancel()
producer_task.cancel()
try:
await queue_task
except asyncio.CancelledError:
pass
await producer_task
except (asyncio.CancelledError, Exception):
pass

View File

@@ -2,16 +2,25 @@
# HTTPZ Web Scanner - Developed by acidvegas in Python (https://github.com/acidvegas/httpz)
# httpz_scanner/utils.py
import asyncio
import logging
import os
import sys
import asyncio
try:
import aiofiles
except ImportError:
raise ImportError('missing aiofiles module (pip install aiofiles)')
try:
import dns.asyncresolver
import dns.resolver
except ImportError:
raise ImportError('missing dnspython module (pip install dnspython)')
# Global for silent mode
SILENT_MODE = False
# List of user agents to randomize requests
USER_AGENTS = [
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36',
@@ -23,14 +32,11 @@ USER_AGENTS = [
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36 Edg/132.0.0.0',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:134.0) Gecko/20100101 Firefox/134.0',
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) obsidian/1.6.5 Chrome/124.0.6367.243 Electron/30.1.2 Safari/537.36',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:135.0) Gecko/20100101 Firefox/135.0',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 OPR/116.0.0.0',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:134.0) Gecko/20100101 Firefox/134.0',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) obsidian/1.8.3 Chrome/130.0.6723.191 Electron/33.3.2 Safari/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/18.3 Safari/605.1.15',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/18.2 Safari/605.1.15',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.6613.137 Safari/537.36',
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36',
@@ -39,12 +45,9 @@ USER_AGENTS = [
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.6 Safari/605.1.15',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/18.1.1 Safari/605.1.15',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36',
'Mozilla/5.0 (X11; Linux x86_64; rv:134.0) Gecko/20100101 Firefox/134.0',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:135.0) Gecko/20100101 Firefox/135.0',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) obsidian/1.5.12 Chrome/120.0.6099.283 Electron/28.2.3 Safari/537.36',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.114 Safari/537.36',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36 Edg/129.0.0.0',
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36',
@@ -55,103 +58,181 @@ USER_AGENTS = [
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36 OPR/114.0.0.0',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36',
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/18.1 Safari/605.1.15',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.5 Safari/605.1.15',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36',
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) obsidian/1.7.7 Chrome/128.0.6613.186 Electron/32.2.5 Safari/537.36'
]
def _silent() -> bool:
# Read at call time so cli.py can flip the module attribute.
from . import utils as _self
return _self.SILENT_MODE
def debug(msg: str):
if not SILENT_MODE:
logging.debug(msg)
if not _silent(): logging.debug(msg)
def error(msg: str):
if not SILENT_MODE: logging.error(msg)
if not _silent(): logging.error(msg)
def info(msg: str):
if not SILENT_MODE: logging.info(msg)
if not _silent(): logging.info(msg)
def warning(msg: str):
if not SILENT_MODE: logging.warning(msg)
if not _silent(): logging.warning(msg)
def human_size(size_bytes: int) -> str:
'''
Convert bytes to human readable string
:param size_bytes: size in bytes
'''
if not size_bytes:
return '0B'
units = ('B', 'KB', 'MB', 'GB')
size = float(size_bytes)
unit_index = 0
while size >= 1024 and unit_index < len(units) - 1:
size /= 1024
unit_index += 1
return f'{size:.1f}{units[unit_index]}'
async def resolve_cname_chain(domain: str, resolvers: list = None, timeout: float = 2.0, max_chain: int = 3) -> list:
'''
Follow CNAME records starting from `domain`. Returns the chain as a list of
hostnames including the original (e.g. ['foo.com', 'bar.com', 'baz.com']).
Length is capped at max_chain. Stops on no CNAME, max length, or loop.
:param domain: starting hostname
:param resolvers: optional resolver IPs
:param timeout: per-query timeout in seconds
:param max_chain: maximum total chain length (including the original)
'''
resolver = dns.asyncresolver.Resolver()
resolver.lifetime = timeout
resolver.timeout = timeout
if resolvers:
resolver.nameservers = resolvers
chain = [domain]
seen = {domain.lower()}
current = domain
while len(chain) < max_chain:
try:
answer = await resolver.resolve(current, 'CNAME')
except Exception:
break
target = str(answer[0].target).rstrip('.')
if not target or target.lower() in seen:
break
chain.append(target)
seen.add(target.lower())
current = target
return chain
async def resolve_ips(domain: str, resolvers: list = None, timeout: float = 2.0) -> list:
'''
Resolve A and AAAA records for a domain. Returns sorted unique list of IPs.
:param domain: domain to resolve
:param resolvers: optional list of resolver IPs to use
:param timeout: per-query timeout in seconds
'''
resolver = dns.asyncresolver.Resolver()
resolver.lifetime = timeout
resolver.timeout = timeout
if resolvers:
resolver.nameservers = resolvers
results = await asyncio.gather(
resolver.resolve(domain, 'A'),
resolver.resolve(domain, 'AAAA'),
return_exceptions=True,
)
ips = []
for r in results:
if isinstance(r, dns.resolver.Answer):
ips.extend(str(rec) for rec in r)
return sorted(set(ips))
async def input_generator(input_source, shard: tuple = None):
'''
Async generator function to yield domains from various input sources with optional sharding
:param input_source: Can be:
- string path to local file
- "-" for stdin
- list/tuple of domains
- generator/iterator yielding domains
- string content with newlines
:param shard: Tuple of (shard_index, total_shards) for distributed scanning
Async generator yielding domains from various input sources with optional sharding.
:param input_source: string path to file, "-" for stdin, or any sync/async iterable of domains
:param shard: tuple of (shard_index, total_shards) for distributed scanning
'''
line_num = 0
# Handle stdin
def _shard_ok(n):
return shard is None or n % shard[1] == shard[0]
# stdin (read in executor so we don't block the loop)
if input_source == '-' or input_source is None:
for line in sys.stdin:
await asyncio.sleep(0)
if line := line.strip():
if shard is None or line_num % shard[1] == shard[0]:
yield line
loop = asyncio.get_event_loop()
while True:
line = await loop.run_in_executor(None, sys.stdin.readline)
if not line:
break
line = line.strip()
if line and _shard_ok(line_num):
yield line
line_num += 1
# Handle local files
elif isinstance(input_source, str) and os.path.exists(input_source):
with open(input_source, 'r') as f:
for line in f:
await asyncio.sleep(0)
if line := line.strip():
if shard is None or line_num % shard[1] == shard[0]:
yield line
return
# local file
if isinstance(input_source, str) and os.path.exists(input_source):
async with aiofiles.open(input_source, 'r') as f:
async for line in f:
line = line.strip()
if line and _shard_ok(line_num):
yield line
line_num += 1
# Handle iterables (generators, lists, etc)
elif hasattr(input_source, '__iter__') and not isinstance(input_source, (str, bytes)):
for line in input_source:
await asyncio.sleep(0)
return
# async iterable
if hasattr(input_source, '__aiter__'):
async for line in input_source:
if isinstance(line, bytes):
line = line.decode()
if line := line.strip():
if shard is None or line_num % shard[1] == shard[0]:
yield line
line = line.strip()
if line and _shard_ok(line_num):
yield line
line_num += 1
# Handle string content with newlines
elif isinstance(input_source, (str, bytes)):
return
# sync iterable (list, tuple, generator)
if hasattr(input_source, '__iter__') and not isinstance(input_source, (str, bytes)):
for line in input_source:
if isinstance(line, bytes):
line = line.decode()
line = line.strip()
if line and _shard_ok(line_num):
yield line
line_num += 1
return
# raw string content with newlines
if isinstance(input_source, (str, bytes)):
if isinstance(input_source, bytes):
input_source = input_source.decode()
for line in input_source.splitlines():
await asyncio.sleep(0)
if line := line.strip():
if shard is None or line_num % shard[1] == shard[0]:
yield line
line_num += 1
line = line.strip()
if line and _shard_ok(line_num):
yield line
line_num += 1

View File

@@ -1,5 +1,7 @@
aiodns>=3.0.0
aiofiles>=23.0.0
aiohttp>=3.8.0
beautifulsoup4>=4.9.3
cryptography>=3.4.7
dnspython>=2.1.0
mmh3>=3.0.0
mmh3>=3.0.0

View File

@@ -10,7 +10,7 @@ with open('README.md', 'r', encoding='utf-8') as f:
setup(
name='httpz_scanner',
version='2.1.8',
version='3.1.1',
author='acidvegas',
author_email='acid.vegas@acid.vegas',
description='Hyper-fast HTTP Scraping Tool',
@@ -34,6 +34,8 @@ setup(
],
python_requires='>=3.8',
install_requires=[
'aiodns>=3.0.0',
'aiofiles>=23.0.0',
'aiohttp>=3.8.0',
'beautifulsoup4>=4.9.3',
'cryptography>=3.4.7',
@@ -45,4 +47,4 @@ setup(
'httpz=httpz_scanner.cli:run',
],
},
)
)

View File

@@ -1,3 +1,6 @@
# httpz - Developed by acidvegas in Python (https://github.com/acidvegas)
# unit_test.py
#!/usr/bin/env python3
# HTTPZ Web Scanner - Unit Tests
# unit_test.py
@@ -16,7 +19,7 @@ except ImportError:
class ColoredFormatter(logging.Formatter):
'''Custom formatter for colored log output'''
def format(self, record):
if record.levelno == logging.INFO:
color = Colors.GREEN
@@ -26,204 +29,169 @@ class ColoredFormatter(logging.Formatter):
color = Colors.RED
else:
color = Colors.RESET
record.msg = f'{color}{record.msg}{Colors.RESET}'
return super().format(record)
# Configure logging with colors
logger = logging.getLogger()
logger = logging.getLogger()
handler = logging.StreamHandler()
handler.setFormatter(ColoredFormatter('%(asctime)s - %(levelname)s - %(message)s'))
logger.setLevel(logging.INFO)
logger.addHandler(handler)
TEST_DOMAINS_URL = 'https://raw.githubusercontent.com/danielmiessler/SecLists/refs/heads/master/Fuzzing/email-top-100-domains.txt'
async def get_domains_from_url() -> list:
'''
Fetch domains from SecLists URL
:return: List of domains
'''
'''Fetch a small known-good set of domains for testing.'''
try:
import aiohttp
except ImportError:
raise ImportError('missing aiohttp library (pip install aiohttp)')
url = 'https://raw.githubusercontent.com/danielmiessler/SecLists/refs/heads/master/Fuzzing/email-top-100-domains.txt'
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
async with session.get(TEST_DOMAINS_URL) as response:
content = await response.text()
return [line.strip() for line in content.splitlines() if line.strip()]
async def domain_generator(domains: list):
'''
Async generator that yields domains
:param domains: List of domains to yield
'''
'''Async generator yielding domains one at a time.'''
for domain in domains:
await asyncio.sleep(0) # Allow other coroutines to run
await asyncio.sleep(0)
yield domain
async def run_benchmark(test_type: str, domains: list, concurrency: int) -> tuple:
'''Run a single benchmark test'''
def _make_scanner(concurrency: int) -> HTTPZScanner:
'''Build a scanner with every feature toggle on.'''
return HTTPZScanner(
concurrent_limit = concurrency,
timeout = 5,
retries = 1,
follow_redirects = True,
fetch_headers = True,
fetch_content_type = True,
fetch_content_length = True,
fetch_title = True,
fetch_body = True,
fetch_favicon = True,
fetch_tls = True,
fetch_ips = True,
fetch_cname = True,
)
async def run_benchmark(test_type: str, source, total: int, concurrency: int) -> tuple:
'''Scan a source, log each result, return (elapsed_seconds, throughput).'''
logging.info(f'{Colors.BOLD}Testing {test_type} input with {concurrency} concurrent connections...{Colors.RESET}')
scanner = HTTPZScanner(concurrent_limit=concurrency, timeout=3, show_progress=True, debug_mode=True, follow_redirects=True)
count = 0
got_first = False
scanner = _make_scanner(concurrency)
count = 0
got_first = False
start_time = None
if test_type == 'List':
async for result in scanner.scan(domains):
if result:
if not got_first:
got_first = True
start_time = time.time()
count += 1
# More detailed status reporting
status_str = ''
if result['status'] < 0:
error_type = result.get('error_type', 'UNKNOWN')
error_msg = result.get('error', 'Unknown Error')
status_str = f"{Colors.RED}[{result['status']} - {error_type}: {error_msg}]{Colors.RESET}"
elif 200 <= result['status'] < 300:
status_str = f"{Colors.GREEN}[{result['status']}]{Colors.RESET}"
elif 300 <= result['status'] < 400:
status_str = f"{Colors.YELLOW}[{result['status']}]{Colors.RESET}"
else:
status_str = f"{Colors.RED}[{result['status']}]{Colors.RESET}"
# Show protocol and response headers if available
protocol_info = f" {Colors.CYAN}({result.get('protocol', 'unknown')}){Colors.RESET}" if result.get('protocol') else ''
headers_info = ''
if result.get('response_headers'):
important_headers = ['server', 'location', 'content-type']
headers = [f"{k}: {v}" for k, v in result['response_headers'].items() if k.lower() in important_headers]
if headers:
headers_info = f" {Colors.GRAY}[{', '.join(headers)}]{Colors.RESET}"
# Show redirect chain if present
redirect_info = ''
if result.get('redirect_chain'):
redirect_info = f" -> {Colors.YELLOW}Redirects: {' -> '.join(result['redirect_chain'])}{Colors.RESET}"
# Show error details if present
error_info = ''
if result.get('error'):
error_info = f" {Colors.RED}Error: {result['error']}{Colors.RESET}"
# Show final URL if different from original
url_info = ''
if result.get('url') and result['url'] != f"http(s)://{result['domain']}":
url_info = f" {Colors.CYAN}Final URL: {result['url']}{Colors.RESET}"
logging.info(
f"{test_type}-{concurrency} Result {count}: "
f"{status_str}{protocol_info} "
f"{Colors.CYAN}{result['domain']}{Colors.RESET}"
f"{redirect_info}"
f"{url_info}"
f"{headers_info}"
f"{error_info}"
)
else:
# Skip generator test
pass
elapsed = time.time() - start_time if start_time else 0
domains_per_sec = count/elapsed if elapsed > 0 else 0
logging.info(f'{Colors.YELLOW}{test_type} test with {concurrency} concurrent connections completed in {elapsed:.2f} seconds ({domains_per_sec:.2f} domains/sec){Colors.RESET}')
return elapsed, domains_per_sec
async for result in scanner.scan(source):
if not result:
continue
if not got_first:
got_first = True
start_time = time.time()
count += 1
if result['status'] < 0:
status_str = f"{Colors.RED}[{result['status']} - {result.get('error_type','UNKNOWN')}: {result.get('error','')}]{Colors.RESET}"
elif 200 <= result['status'] < 300:
status_str = f"{Colors.GREEN}[{result['status']}]{Colors.RESET}"
elif 300 <= result['status'] < 400:
status_str = f"{Colors.YELLOW}[{result['status']}]{Colors.RESET}"
else:
status_str = f"{Colors.RED}[{result['status']}]{Colors.RESET}"
proto = f" {Colors.CYAN}({result.get('protocol','unknown')}){Colors.RESET}"
title = f" {Colors.DARK_GREEN}{result['title']}{Colors.RESET}" if result.get('title') else ''
ips = f" {Colors.YELLOW}{','.join(result['ips'])}{Colors.RESET}" if result.get('ips') else ''
tls = f" {Colors.GREEN}TLS:{result['tls']['subject']}{Colors.RESET}" if result.get('tls') else ''
favicon = f" {Colors.PURPLE}fav:{result['favicon_hash']}{Colors.RESET}" if result.get('favicon_hash') else ''
redirect = f" {Colors.YELLOW}({len(result['redirect_chain'])} hops){Colors.RESET}" if result.get('redirect_chain') else ''
cname = f" {Colors.PURPLE}CNAME:{'->'.join(result['cname_chain'])}{Colors.RESET}" if result.get('cname_chain') else ''
logging.info(
f'{test_type}-{concurrency} #{count}: '
f'{status_str}{proto} '
f'{Colors.CYAN}{result["domain"]}{Colors.RESET}'
f'{redirect}{cname}{title}{tls}{ips}{favicon}'
)
elapsed = (time.time() - start_time) if start_time else 0
rps = (count / elapsed) if elapsed > 0 else 0
logging.info(f'{Colors.YELLOW}{test_type} {concurrency}c: {count}/{total} in {elapsed:.2f}s ({rps:.2f}/s){Colors.RESET}')
return elapsed, rps
async def test_list_input(domains: list):
'''Test scanning using a list input'''
logging.info(f'{Colors.BOLD}Testing list input...{Colors.RESET}')
scanner = HTTPZScanner(concurrent_limit=25, timeout=3, show_progress=True, debug_mode=True, follow_redirects=True)
start_time = time.time()
async def test_stop(domains: list) -> None:
'''Confirm scanner.stop() drains in-flight tasks and exits cleanly.'''
logging.info(f'{Colors.BOLD}Testing graceful stop()...{Colors.RESET}')
scanner = _make_scanner(concurrency=20)
t0 = time.time()
count = 0
async for result in scanner.scan(domains):
if result:
count += 1
status_color = Colors.GREEN if 200 <= result['status'] < 300 else Colors.RED
title = f" - {Colors.CYAN}{result.get('title', 'No Title')}{Colors.RESET}" if result.get('title') else ''
error = f" - {Colors.RED}{result.get('error', '')}{Colors.RESET}" if result.get('error') else ''
logging.info(f'List-25 Result {count}: {status_color}[{result["status"]}]{Colors.RESET} {Colors.CYAN}{result["domain"]}{Colors.RESET}{title}{error}')
async def kicker():
await asyncio.sleep(1.0)
logging.info(f'{Colors.YELLOW}stop() called at {time.time()-t0:.2f}s{Colors.RESET}')
await scanner.stop()
async def test_generator_input(domains: list):
'''Test scanning using an async generator input'''
logging.info(f'{Colors.BOLD}Testing generator input...{Colors.RESET}')
scanner = HTTPZScanner(concurrent_limit=25, timeout=3, show_progress=True, debug_mode=True, follow_redirects=True)
start_time = time.time()
count = 0
async for result in scanner.scan(domain_generator(domains)):
if result:
count += 1
status_color = Colors.GREEN if 200 <= result['status'] < 300 else Colors.RED
title = f" - {Colors.CYAN}{result.get('title', 'No Title')}{Colors.RESET}" if result.get('title') else ''
error = f" - {Colors.RED}{result.get('error', '')}{Colors.RESET}" if result.get('error') else ''
logging.info(f'Generator-25 Result {count}: {status_color}[{result["status"]}]{Colors.RESET} {Colors.CYAN}{result["domain"]}{Colors.RESET}{title}{error}')
k = asyncio.create_task(kicker())
async for _ in scanner.scan(domains):
count += 1
await k
elapsed = time.time() - t0
if count >= len(domains):
raise AssertionError(f'stop() did not interrupt scan ({count}/{len(domains)})')
logging.info(f'{Colors.GREEN}stop() OK: drained {count}/{len(domains)} in {elapsed:.2f}s{Colors.RESET}')
async def main() -> None:
'''Main test function'''
'''Run the full test suite.'''
try:
# Fetch domains
domains = await get_domains_from_url()
logging.info(f'Loaded {Colors.YELLOW}{len(domains)}{Colors.RESET} domains for testing')
# Store benchmark results
logging.info(f'Loaded {Colors.YELLOW}{len(domains)}{Colors.RESET} test domains')
results = []
# Run tests with different concurrency levels
for concurrency in [25, 50, 100]:
# Generator tests
gen_result = await run_benchmark('Generator', domains, concurrency)
results.append(('Generator', concurrency, *gen_result))
# List tests
list_result = await run_benchmark('List', domains, concurrency)
results.append(('List', concurrency, *list_result))
# Print benchmark comparison
for concurrency in (25, 50, 100):
gen_elapsed, gen_rps = await run_benchmark('Generator', domain_generator(domains), len(domains), concurrency)
results.append(('Generator', concurrency, gen_elapsed, gen_rps))
list_elapsed, list_rps = await run_benchmark('List', domains, len(domains), concurrency)
results.append(('List', concurrency, list_elapsed, list_rps))
await test_stop(domains)
logging.info(f'\n{Colors.BOLD}Benchmark Results:{Colors.RESET}')
logging.info('-' * 80)
logging.info(f'{"Test Type":<15} {"Concurrency":<15} {"Time (s)":<15} {"Domains/sec":<15}')
logging.info('-' * 80)
# Sort by domains per second (fastest first)
logging.info('-' * 70)
logging.info(f'{"Type":<12} {"Concurrency":<14} {"Time (s)":<12} {"Domains/sec":<12}')
logging.info('-' * 70)
results.sort(key=lambda x: x[3], reverse=True)
for test_type, concurrency, elapsed, domains_per_sec in results:
logging.info(f'{test_type:<15} {concurrency:<15} {elapsed:.<15.2f} {domains_per_sec:<15.2f}')
# Highlight fastest result
for test_type, concurrency, elapsed, rps in results:
logging.info(f'{test_type:<12} {concurrency:<14} {elapsed:<12.2f} {rps:<12.2f}')
fastest = results[0]
logging.info('-' * 80)
logging.info(f'{Colors.GREEN}Fastest: {fastest[0]} test with {fastest[1]} concurrent connections')
logging.info(f'Time: {fastest[2]:.2f} seconds')
logging.info(f'Speed: {fastest[3]:.2f} domains/sec{Colors.RESET}')
logging.info(f'\n{Colors.GREEN}All tests completed successfully!{Colors.RESET}')
logging.info('-' * 70)
logging.info(f'{Colors.GREEN}Fastest: {fastest[0]} @ {fastest[1]} concurrent {fastest[3]:.2f}/s{Colors.RESET}')
logging.info(f'\n{Colors.GREEN}All tests passed.{Colors.RESET}')
except Exception as e:
logging.error(f'Test failed: {Colors.RED}{str(e)}{Colors.RESET}')
logging.error(f'Test failed: {Colors.RED}{e}{Colors.RESET}')
import traceback; traceback.print_exc()
sys.exit(1)
@@ -232,4 +200,4 @@ if __name__ == '__main__':
asyncio.run(main())
except KeyboardInterrupt:
logging.warning(f'{Colors.YELLOW}Tests interrupted by user{Colors.RESET}')
sys.exit(1)
sys.exit(1)