Compare commits


22 Commits (v1.0.9 ... main)

SHA1        Message                                Date
19525aec7d  sup roarie                             2025-02-12 03:03:20 -05:00
f1f5a78ae0  sup tommyrot                           2025-02-12 02:59:51 -05:00
9a4b7e977a  fixed chunk output                     2025-02-12 02:57:44 -05:00
e220648a1a  fixed chunk output                     2025-02-12 02:55:31 -05:00
7fe571ddad  fixed chunk output                     2025-02-12 00:50:02 -05:00
6dacafeee5  fixed chunk output                     2025-02-12 00:35:35 -05:00
41d7e53d30  fixed chunk output                     2025-02-12 00:32:28 -05:00
db9590f59d  faster processing                      2025-02-12 00:28:46 -05:00
90b5134c25  Added unit test                        2025-02-11 22:30:22 -05:00
3d8b2d8e4f  fuck                                   2025-02-11 21:48:34 -05:00
63517430b7  fuck                                   2025-02-11 21:44:58 -05:00
311d37108a  fuck                                   2025-02-11 21:40:49 -05:00
cd66542003  fuck                                   2025-02-11 21:31:00 -05:00
44e27630de  fuck                                   2025-02-11 21:28:08 -05:00
d5ce06ed1e  fuck                                   2025-02-11 21:25:47 -05:00
ef115eb3da  fuck                                   2025-02-11 21:21:45 -05:00
1819c7dc48  fuck                                   2025-02-11 21:13:28 -05:00
f7c797c851  fuck                                   2025-02-11 21:08:12 -05:00
dfb11b0a1c  Better input processing                2025-02-11 20:57:01 -05:00
718b50b6c2  Prepair= for 2.0.0                     2025-02-11 19:23:49 -05:00
e27e5e4095  Allow any form of input for scanning   2025-02-11 19:18:52 -05:00
a6fc596547  Docunentation finished                 2025-02-11 02:55:37 -05:00
12 changed files with 847 additions and 390 deletions

README.md (161 lines changed)

@@ -16,10 +16,10 @@ A high-performance concurrent web scanner written in Python. HTTPZ efficiently s
 ## Installation
-### Via pip (recommended)
+### Via pip *(recommended)*
 ```bash
 # Install from PyPI
-pip install httpz-scanner
+pip install httpz_scanner
 # The 'httpz' command will now be available in your terminal
 httpz --help
@@ -39,133 +39,152 @@ pip install -r requirements.txt
 Basic usage:
 ```bash
-python -m httpz-scanner domains.txt
+python -m httpz_scanner domains.txt
 ```
 Scan with all flags enabled and output to JSONL:
 ```bash
-python -m httpz-scanner domains.txt -all -c 100 -o results.jsonl -j -p
+python -m httpz_scanner domains.txt -all -c 100 -o results.jsonl -j -p
 ```
 Read from stdin:
 ```bash
-cat domains.txt | python -m httpz-scanner - -all -c 100
-echo "example.com" | python -m httpz-scanner - -all
+cat domains.txt | python -m httpz_scanner - -all -c 100
+echo "example.com" | python -m httpz_scanner - -all
 ```
 Filter by status codes and follow redirects:
 ```bash
-python -m httpz-scanner domains.txt -mc 200,301-399 -ec 404,500 -fr -p
+python -m httpz_scanner domains.txt -mc 200,301-399 -ec 404,500 -fr -p
 ```
 Show specific fields with custom timeout and resolvers:
 ```bash
-python -m httpz-scanner domains.txt -sc -ti -i -tls -to 10 -r resolvers.txt
+python -m httpz_scanner domains.txt -sc -ti -i -tls -to 10 -r resolvers.txt
 ```
 Full scan with all options:
 ```bash
-python -m httpz-scanner domains.txt -c 100 -o output.jsonl -j -all -to 10 -mc 200,301 -ec 404,500 -p -ax -r resolvers.txt
+python -m httpz_scanner domains.txt -c 100 -o output.jsonl -j -all -to 10 -mc 200,301 -ec 404,500 -p -ax -r resolvers.txt
 ```
+### Distributed Scanning
+Split scanning across multiple machines using the `--shard` argument:
+```bash
+# Machine 1
+httpz domains.txt --shard 1/3
+# Machine 2
+httpz domains.txt --shard 2/3
+# Machine 3
+httpz domains.txt --shard 3/3
+```
+Each machine will process a different subset of domains without overlap. For example, with 3 shards:
+- Machine 1 processes lines 0,3,6,9,...
+- Machine 2 processes lines 1,4,7,10,...
+- Machine 3 processes lines 2,5,8,11,...
+
+This allows efficient distribution of large scans across multiple machines.
 ### Python Library
 ```python
 import asyncio
+import urllib.request
 from httpz_scanner import HTTPZScanner
-async def scan_domains():
+async def scan_from_list() -> list:
+    with urllib.request.urlopen('https://example.com/domains.txt') as response:
+        content = response.read().decode()
+        return [line.strip() for line in content.splitlines() if line.strip()][:20]
+async def scan_from_url():
+    with urllib.request.urlopen('https://example.com/domains.txt') as response:
+        for line in response:
+            if line := line.strip():
+                yield line.decode().strip()
+async def scan_from_file():
+    with open('domains.txt', 'r') as file:
+        for line in file:
+            if line := line.strip():
+                yield line
+async def main():
     # Initialize scanner with all possible options (showing defaults)
     scanner = HTTPZScanner(
-        # Core settings
         concurrent_limit=100,    # Number of concurrent requests
         timeout=5,               # Request timeout in seconds
         follow_redirects=False,  # Follow redirects (max 10)
         check_axfr=False,        # Try AXFR transfer against nameservers
         resolver_file=None,      # Path to custom DNS resolvers file
         output_file=None,        # Path to JSONL output file
         show_progress=False,     # Show progress counter
         debug_mode=False,        # Show error states and debug info
         jsonl_output=False,      # Output in JSONL format
+        shard=None,              # Tuple of (shard_index, total_shards) for distributed scanning
         # Control which fields to show (all False by default unless show_fields is None)
         show_fields={
            'status_code': True,      # Show status code
            'content_type': True,     # Show content type
            'content_length': True,   # Show content length
            'title': True,            # Show page title
            'body': True,             # Show body preview
            'ip': True,               # Show IP addresses
            'favicon': True,          # Show favicon hash
            'headers': True,          # Show response headers
            'follow_redirects': True, # Show redirect chain
            'cname': True,            # Show CNAME records
            'tls': True               # Show TLS certificate info
         },
         # Filter results
-        match_codes={200, 301, 302},  # Only show these status codes
-        exclude_codes={404, 500, 503} # Exclude these status codes
+        match_codes={200,301,302},    # Only show these status codes
+        exclude_codes={404,500,503}   # Exclude these status codes
     )
-    # Initialize resolvers (required before scanning)
-    await scanner.init()
-    # Scan domains from file
-    await scanner.scan('domains.txt')
-    # Or scan from stdin
-    await scanner.scan('-')
+    # Example 1: Process file
+    print('\nProcessing file:')
+    async for result in scanner.scan(scan_from_file()):
+        print(f"{result['domain']}: {result['status']}")
+    # Example 2: Stream URLs
+    print('\nStreaming URLs:')
+    async for result in scanner.scan(scan_from_url()):
+        print(f"{result['domain']}: {result['status']}")
+    # Example 3: Process list
+    print('\nProcessing list:')
+    domains = await scan_from_list()
+    async for result in scanner.scan(domains):
+        print(f"{result['domain']}: {result['status']}")
 if __name__ == '__main__':
-    asyncio.run(scan_domains())
+    asyncio.run(main())
 ```
-The scanner will return results in this format:
-```python
-{
-    'domain': 'example.com',            # Base domain
-    'url': 'https://example.com',       # Full URL
-    'status': 200,                      # HTTP status code
-    'port': 443,                        # Port number
-    'title': 'Example Domain',          # Page title
-    'body': 'Example body text...',     # Body preview
-    'content_type': 'text/html',        # Content type
-    'content_length': '12345',          # Content length
-    'ips': ['93.184.216.34'],           # IP addresses
-    'cname': 'cdn.example.com',         # CNAME record
-    'nameservers': ['ns1.example.com'], # Nameservers
-    'favicon_hash': '123456789',        # Favicon hash
-    'headers': {                        # Response headers
-        'Server': 'nginx',
-        'Content-Type': 'text/html'
-    },
-    'redirect_chain': [                 # Redirect history
-        'http://example.com',
-        'https://example.com'
-    ],
-    'tls': {                            # TLS certificate info
-        'fingerprint': 'sha256...',
-        'common_name': 'example.com',
-        'issuer': 'Let\'s Encrypt',
-        'alt_names': ['www.example.com'],
-        'not_before': '2023-01-01T00:00:00',
-        'not_after': '2024-01-01T00:00:00',
-        'version': 3,
-        'serial_number': 'abcdef1234'
-    }
-}
-```
+The scanner accepts various input types:
+- File paths (string)
+- Lists/tuples of domains
+- stdin (using '-')
+- Async generators that yield domains
+
+All inputs support sharding for distributed scanning using the `shard` parameter.
 ## Arguments
 | Argument  | Long Form        | Description |
-|-----------|------------------|-------------------------------------------------------------|
-| `file`    | -                | File containing domains *(one per line)*, use `-` for stdin |
+|---------------|------------------|-------------------------------------------------------------|
+| `file`        |                  | File containing domains *(one per line)*, use `-` for stdin |
 | `-d`      | `--debug`        | Show error states and debug information |
 | `-c N`    | `--concurrent N` | Number of concurrent checks *(default: 100)* |
 | `-o FILE` | `--output FILE`  | Output file path *(JSONL format)* |
 | `-j`      | `--jsonl`        | Output JSON Lines format to console |
 | `-all`    | `--all-flags`    | Enable all output flags |
+| `-sh`     | `--shard N/T`    | Process shard N of T total shards *(e.g., 1/3)* |
 ### Output Field Flags
@@ -191,5 +210,5 @@ The scanner will return results in this format:
 | `-mc CODES` | `--match-codes CODES`   | Only show specific status codes *(comma-separated)* |
 | `-ec CODES` | `--exclude-codes CODES` | Exclude specific status codes *(comma-separated)* |
 | `-p`        | `--progress`            | Show progress counter |
 | `-ax`       | `--axfr`                | Try AXFR transfer against nameservers |
 | `-r FILE`   | `--resolvers FILE`      | File containing DNS resolvers *(one per line)* |
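
The line distribution described under *Distributed Scanning* is plain modulo arithmetic. A minimal standalone sketch of how shard `N/T` selects its lines (`shard_lines` is a hypothetical helper for illustration, not part of httpz itself):

```python
def shard_lines(lines, shard_index: int, total_shards: int):
    '''Yield only the lines belonging to shard `shard_index` (1-based) of `total_shards`.'''
    for i, line in enumerate(lines):
        # Shard N/T keeps every line whose 0-based index is congruent to N-1 mod T
        if i % total_shards == shard_index - 1:
            yield line

# Matches the 3-shard example above: 0,3,6,9 / 1,4,7,10 / 2,5,8,11
assert list(shard_lines(range(12), 1, 3)) == [0, 3, 6, 9]
assert list(shard_lines(range(12), 2, 3)) == [1, 4, 7, 10]
assert list(shard_lines(range(12), 3, 3)) == [2, 5, 8, 11]
```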

httpz_scanner/__init__.py

@@ -2,8 +2,8 @@
 # HTTPZ Web Scanner - Developed by acidvegas in Python (https://github.com/acidvegas/httpz)
 # httpz_scanner/__init__.py
-from .scanner import HTTPZScanner
 from .colors import Colors
+from .scanner import HTTPZScanner
-__version__ = '1.0.9'
+__version__ = '2.1.8'
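
A quick sanity check of which side of this change is installed, assuming the package is importable:

```python
import httpz_scanner

# Expect '2.1.8' on this branch, '1.0.9' on the old tag
print(httpz_scanner.__version__)
```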

httpz_scanner/__main__.py

@@ -4,8 +4,11 @@
 import asyncio
 import sys
 from .cli import main
 if __name__ == '__main__':
     try:
         asyncio.run(main())

httpz_scanner/cli.py

@@ -4,13 +4,19 @@
 import argparse
 import asyncio
+import json
 import logging
 import os
 import sys
-from .colors import Colors
-from .scanner import HTTPZScanner
-from .utils import SILENT_MODE, info
+from datetime import datetime
+from .colors import Colors
+from .formatters import format_console_output
+from .parsers import parse_status_codes, parse_shard
+from .scanner import HTTPZScanner
+from .utils import SILENT_MODE, info
 def setup_logging(level='INFO', log_to_disk=False):
     '''
@@ -19,16 +25,16 @@ def setup_logging(level='INFO', log_to_disk=False):
     :param level: Logging level (INFO or DEBUG)
     :param log_to_disk: Whether to also log to file
     '''
     class ColoredFormatter(logging.Formatter):
-        def formatTime(self, record, datefmt=None):
-            # Format: MM-DD HH:MM
-            from datetime import datetime
+        def formatTime(self, record):
             dt = datetime.fromtimestamp(record.created)
-            return f"{Colors.GRAY}{dt.strftime('%m-%d %H:%M')}{Colors.RESET}"
+            return f'{Colors.GRAY}{dt.strftime("%m-%d %H:%M")}{Colors.RESET}'
         def format(self, record):
             return f'{self.formatTime(record)} {record.getMessage()}'
-    # Setup logging handlers
     handlers = []
     # Console handler
@@ -44,65 +50,51 @@ def setup_logging(level='INFO', log_to_disk=False):
         handlers.append(file_handler)
     # Setup logger
-    logging.basicConfig(
-        level=getattr(logging, level.upper()),
-        handlers=handlers
-    )
+    logging.basicConfig(level=getattr(logging, level.upper()), handlers=handlers)
-def parse_status_codes(codes_str: str) -> set:
-    '''
-    Parse comma-separated status codes and ranges into a set of integers
-    :param codes_str: Comma-separated status codes (e.g., "200,301-399,404,500-503")
-    '''
-    codes = set()
-    try:
-        for part in codes_str.split(','):
-            if '-' in part:
-                start, end = map(int, part.split('-'))
-                codes.update(range(start, end + 1))
-            else:
-                codes.add(int(part))
-        return codes
-    except ValueError:
-        raise argparse.ArgumentTypeError('Invalid status code format. Use comma-separated numbers or ranges (e.g., 200,301-399,404,500-503)')
 async def main():
-    parser = argparse.ArgumentParser(
-        description=f'{Colors.GREEN}Hyper-fast HTTP Scraping Tool{Colors.RESET}',
-        formatter_class=argparse.RawDescriptionHelpFormatter
-    )
+    parser = argparse.ArgumentParser(description=f'{Colors.GREEN}Hyper-fast HTTP Scraping Tool{Colors.RESET}', formatter_class=argparse.RawDescriptionHelpFormatter)
     # Add arguments
     parser.add_argument('file', nargs='?', default='-', help='File containing domains to check (one per line), use - for stdin')
     parser.add_argument('-all', '--all-flags', action='store_true', help='Enable all output flags')
     parser.add_argument('-d', '--debug', action='store_true', help='Show error states and debug information')
     parser.add_argument('-c', '--concurrent', type=int, default=100, help='Number of concurrent checks')
     parser.add_argument('-j', '--jsonl', action='store_true', help='Output JSON Lines format to console')
     parser.add_argument('-o', '--output', help='Output file path (JSONL format)')
     # Output field flags
     parser.add_argument('-b', '--body', action='store_true', help='Show body preview')
     parser.add_argument('-cn', '--cname', action='store_true', help='Show CNAME records')
     parser.add_argument('-cl', '--content-length', action='store_true', help='Show content length')
     parser.add_argument('-ct', '--content-type', action='store_true', help='Show content type')
     parser.add_argument('-f', '--favicon', action='store_true', help='Show favicon hash')
     parser.add_argument('-fr', '--follow-redirects', action='store_true', help='Follow redirects (max 10)')
-    parser.add_argument('-hr', '--headers', action='store_true', help='Show response headers')
+    parser.add_argument('-hr', '--show-headers', action='store_true', help='Show response headers')
     parser.add_argument('-i', '--ip', action='store_true', help='Show IP addresses')
     parser.add_argument('-sc', '--status-code', action='store_true', help='Show status code')
     parser.add_argument('-ti', '--title', action='store_true', help='Show page title')
     parser.add_argument('-tls', '--tls-info', action='store_true', help='Show TLS certificate information')
     # Other arguments
     parser.add_argument('-ax', '--axfr', action='store_true', help='Try AXFR transfer against nameservers')
     parser.add_argument('-ec', '--exclude-codes', type=parse_status_codes, help='Exclude these status codes (comma-separated, e.g., 404,500)')
     parser.add_argument('-mc', '--match-codes', type=parse_status_codes, help='Only show these status codes (comma-separated, e.g., 200,301,404)')
     parser.add_argument('-p', '--progress', action='store_true', help='Show progress counter')
+    parser.add_argument('-pd', '--post-data', help='Send POST request with this data')
     parser.add_argument('-r', '--resolvers', help='File containing DNS resolvers (one per line)')
     parser.add_argument('-to', '--timeout', type=int, default=5, help='Request timeout in seconds')
+    # Add shard argument
+    parser.add_argument('-sh','--shard', type=parse_shard, help='Shard index and total shards (e.g., 1/3)')
+    # Add this to the argument parser section
+    parser.add_argument('-pa', '--paths', help='Additional paths to check (comma-separated, e.g., ".git/config,.env")')
+    # Add these arguments in the parser section
+    parser.add_argument('-hd', '--headers', help='Custom headers to send with each request (format: "Header1: value1,Header2: value2")')
     # If no arguments provided, print help and exit
     if len(sys.argv) == 1:
         parser.print_help()
@@ -134,7 +126,7 @@ async def main():
         'body'             : args.all_flags or args.body,
         'ip'               : args.all_flags or args.ip,
         'favicon'          : args.all_flags or args.favicon,
-        'headers'          : args.all_flags or args.headers,
+        'headers'          : args.all_flags or args.show_headers,
         'follow_redirects' : args.all_flags or args.follow_redirects,
         'cname'            : args.all_flags or args.cname,
         'tls'              : args.all_flags or args.tls_info
@@ -145,7 +137,6 @@ async def main():
         show_fields = {k: True for k in show_fields}
     try:
-        # Create scanner instance
         scanner = HTTPZScanner(
             concurrent_limit=args.concurrent,
             timeout=args.timeout,
@@ -158,11 +149,35 @@ async def main():
             jsonl_output=args.jsonl,
             show_fields=show_fields,
             match_codes=args.match_codes,
-            exclude_codes=args.exclude_codes
+            exclude_codes=args.exclude_codes,
+            shard=args.shard,
+            paths=args.paths.split(',') if args.paths else None,
+            custom_headers=dict(h.split(': ', 1) for h in args.headers.split(',')) if args.headers else None,
+            post_data=args.post_data
         )
-        # Run the scanner with file/stdin input
-        await scanner.scan(args.file)
+        count = 0
+        async for result in scanner.scan(args.file):
+            # Write to output file if specified
+            if args.output:
+                with open(args.output, 'a') as f:
+                    f.write(json.dumps(result) + '\n')
+                    f.flush() # Ensure file output is immediate
+            # Handle JSON output separately
+            if args.jsonl:
+                print(json.dumps(result), flush=True) # Force flush
+                continue
+            # Only output and increment counter if we have content to show for normal output
+            formatted = format_console_output(result, args.debug, show_fields, args.match_codes, args.exclude_codes)
+            if formatted:
+                if args.progress:
+                    count += 1
+                    info(f"[{count}] {formatted}")
+                    sys.stdout.flush() # Force flush after each domain
+                else:
+                    print(formatted, flush=True) # Force flush
     except KeyboardInterrupt:
         logging.warning('Process interrupted by user')
@@ -171,9 +186,12 @@ async def main():
         logging.error(f'Unexpected error: {str(e)}')
         sys.exit(1)
 def run():
     '''Entry point for the CLI'''
     asyncio.run(main())
 if __name__ == '__main__':
     run()
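
The new `-hd` flag packs all custom headers into one comma-separated string; the one-liner used above is easy to exercise in isolation. A minimal sketch of that parsing (note it splits on every comma, so header values that themselves contain commas are not supported):

```python
# Mirrors the expression in cli.py: dict(h.split(': ', 1) for h in raw.split(','))
raw = 'X-Api-Key: abc123,User-Agent: my-scanner'
custom_headers = dict(h.split(': ', 1) for h in raw.split(','))
print(custom_headers)  # {'X-Api-Key': 'abc123', 'User-Agent': 'my-scanner'}
```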

httpz_scanner/colors.py

@@ -4,7 +4,8 @@
 class Colors:
     '''ANSI color codes for terminal output'''
-    HEADER = '\033[95m' # Light purple
+
+    HEADER = '\033[95m'
     BLUE = '\033[94m'
     GREEN = '\033[92m'
     YELLOW = '\033[93m'
@@ -12,9 +13,9 @@ class Colors:
     BOLD = '\033[1m'
     UNDERLINE = '\033[4m'
     RESET = '\033[0m'
-    PURPLE = '\033[35m' # Dark purple
-    LIGHT_RED = '\033[38;5;203m' # Light red
-    DARK_GREEN = '\033[38;5;22m' # Dark green
-    PINK = '\033[38;5;198m' # Bright pink
-    GRAY = '\033[90m' # Gray color
-    CYAN = '\033[96m' # Cyan color
+    PURPLE = '\033[35m'
+    LIGHT_RED = '\033[38;5;203m'
+    DARK_GREEN = '\033[38;5;22m'
+    PINK = '\033[38;5;198m'
+    GRAY = '\033[90m'
+    CYAN = '\033[96m'
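
These constants are raw ANSI escape sequences, so they drop straight into any f-string; a quick sketch of the kind of line the formatter builds:

```python
from httpz_scanner.colors import Colors

# Green status, plain URL, gray content type, then reset styling
print(f'{Colors.GREEN}[200]{Colors.RESET} [https://example.com] {Colors.GRAY}text/html{Colors.RESET}')
```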

httpz_scanner/dns.py

@@ -4,14 +4,23 @@
 import asyncio
 import os
-import aiohttp
-import dns.asyncresolver
-import dns.query
-import dns.resolver
-import dns.zone
+try:
+    import aiohttp
+except ImportError:
+    raise ImportError('missing aiohttp library (pip install aiohttp)')
+try:
+    import dns.asyncresolver
+    import dns.query
+    import dns.resolver
+    import dns.zone
+except ImportError:
+    raise ImportError('missing dnspython library (pip install dnspython)')
 from .utils import debug, info, SILENT_MODE
 async def resolve_all_dns(domain: str, timeout: int = 5, nameserver: str = None, check_axfr: bool = False) -> tuple:
     '''
     Resolve all DNS records for a domain
@@ -21,36 +30,35 @@ async def resolve_all_dns(domain: str, timeout: int = 5, nameserver: str = None,
     :param nameserver: Specific nameserver to use
     :param check_axfr: Whether to attempt zone transfer
     '''
-    # Setup resolver
     resolver = dns.asyncresolver.Resolver()
     resolver.lifetime = timeout
     if nameserver:
         resolver.nameservers = [nameserver]
-    results = await asyncio.gather(*[resolver.resolve(domain, rtype)
-                                     for rtype in ('NS', 'A', 'AAAA', 'CNAME')],
-                                   return_exceptions=True)
-    # Parse results
+    # Resolve all DNS records
+    results = await asyncio.gather(*[resolver.resolve(domain, rtype) for rtype in ('NS', 'A', 'AAAA', 'CNAME')], return_exceptions=True)
     nameservers = [str(ns).rstrip('.') for ns in results[0]] if isinstance(results[0], dns.resolver.Answer) else []
-    ips = ([str(ip) for ip in results[1]] if isinstance(results[1], dns.resolver.Answer) else []) + \
-          ([str(ip) for ip in results[2]] if isinstance(results[2], dns.resolver.Answer) else [])
-    cname = str(results[3][0].target).rstrip('.') if isinstance(results[3], dns.resolver.Answer) else None
-    # Get NS IPs
+    ips = ([str(ip) for ip in results[1]] if isinstance(results[1], dns.resolver.Answer) else []) + ([str(ip) for ip in results[2]] if isinstance(results[2], dns.resolver.Answer) else [])
+    cname = str(results[3][0].target).rstrip('.') if isinstance(results[3], dns.resolver.Answer) else None
     ns_ips = {}
     if nameservers:
-        ns_results = await asyncio.gather(*[resolver.resolve(ns, rtype)
-                                            for ns in nameservers
-                                            for rtype in ('A', 'AAAA')],
-                                          return_exceptions=True)
+        ns_results = await asyncio.gather(*[resolver.resolve(ns, rtype) for ns in nameservers for rtype in ('A', 'AAAA')], return_exceptions=True)
         for i, ns in enumerate(nameservers):
-            ns_ips[ns] = [str(ip) for records in ns_results[i*2:i*2+2]
-                          if isinstance(records, dns.resolver.Answer)
-                          for ip in records]
-    # Attempt zone transfer
+            ns_ips[ns] = [str(ip) for records in ns_results[i*2:i*2+2] if isinstance(records, dns.resolver.Answer) for ip in records]
     if check_axfr:
         await attempt_axfr(domain, ns_ips, timeout)
     return sorted(set(ips)), cname, nameservers, ns_ips
 async def attempt_axfr(domain: str, ns_ips: dict, timeout: int = 5) -> None:
     '''
     Attempt zone transfer for a domain
@@ -59,28 +67,37 @@ async def attempt_axfr(domain: str, ns_ips: dict, timeout: int = 5) -> None:
     :param ns_ips: Dictionary of nameserver hostnames to their IPs
     :param timeout: Timeout in seconds
     '''
     try:
         os.makedirs('axfrout', exist_ok=True)
-        # Loop through each NS
         for ns_host, ips in ns_ips.items():
-            # Loop through each NS IP
             for ns_ip in ips:
                 try:
-                    # Attempt zone transfer
                     zone = dns.zone.from_xfr(dns.query.xfr(ns_ip, domain, lifetime=timeout))
-                    # Write zone to file
                     with open(f'axfrout/{domain}_{ns_ip}.zone', 'w') as f:
                         zone.to_text(f)
                     info(f'[AXFR SUCCESS] {domain} from {ns_host} ({ns_ip})')
                 except Exception as e:
                     debug(f'AXFR failed for {domain} from {ns_ip}: {str(e)}')
     except Exception as e:
         debug(f'Failed AXFR for {domain}: {str(e)}')
 async def load_resolvers(resolver_file: str = None) -> list:
     '''
     Load DNS resolvers from file or default source
     :param resolver_file: Path to file containing resolver IPs
+    :return: List of resolver IPs
     '''
-    # Load from file
     if resolver_file:
         try:
             with open(resolver_file) as f:
@@ -90,6 +107,7 @@ async def load_resolvers(resolver_file: str = None) -> list:
         except Exception as e:
             debug(f'Error loading resolvers from {resolver_file}: {str(e)}')
+    # Load from GitHub
     async with aiohttp.ClientSession() as session:
         async with session.get('https://raw.githubusercontent.com/trickest/resolvers/refs/heads/main/resolvers.txt') as response:
             resolvers = await response.text()
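
Per the signature shown above, `resolve_all_dns` returns a `(ips, cname, nameservers, ns_ips)` tuple. A minimal usage sketch (actual output depends on live DNS, and `example.com` stands in for any target):

```python
import asyncio
from httpz_scanner.dns import resolve_all_dns

async def main():
    # ips: sorted unique A/AAAA records, cname: CNAME target or None,
    # nameservers: NS hostnames, ns_ips: {ns_hostname: [ips]}
    ips, cname, nameservers, ns_ips = await resolve_all_dns('example.com', timeout=5)
    print('IPs:', ips)
    print('CNAME:', cname)
    print('Nameservers:', nameservers)

asyncio.run(main())
```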

httpz_scanner/formatters.py

@@ -5,6 +5,7 @@
 from .colors import Colors
 from .utils import human_size
 def format_console_output(result: dict, debug: bool = False, show_fields: dict = None, match_codes: set = None, exclude_codes: set = None) -> str:
     '''
     Format the output with colored sections
@@ -37,9 +38,17 @@ def format_console_output(result: dict, debug: bool = False, show_fields: dict =
         status = f"{Colors.RED}[{result['status']}]{Colors.RESET}"
         parts.append(status)
-    # Domain (always shown)
+    # Domain/URL
     parts.append(f"[{result['url']}]")
+    # Content Type
+    if show_fields.get('content_type') and result.get('content_type'):
+        parts.append(f"{Colors.CYAN}[{result['content_type']}]{Colors.RESET}")
+    # Content Length
+    if show_fields.get('content_length') and result.get('content_length'):
+        parts.append(f"{Colors.PINK}[{result['content_length']}]{Colors.RESET}")
     # Title
     if show_fields.get('title') and result.get('title'):
         parts.append(f"{Colors.DARK_GREEN}[{result['title']}]{Colors.RESET}")
@@ -59,8 +68,8 @@ def format_console_output(result: dict, debug: bool = False, show_fields: dict =
         parts.append(f"{Colors.PURPLE}[{result['favicon_hash']}]{Colors.RESET}")
     # Headers
-    if show_fields.get('headers') and result.get('headers'):
-        headers_text = [f"{k}: {v}" for k, v in result['headers'].items()]
+    if show_fields.get('headers') and result.get('response_headers'):
+        headers_text = [f"{k}: {v}" for k, v in result['response_headers'].items()]
         parts.append(f"{Colors.CYAN}[{', '.join(headers_text)}]{Colors.RESET}")
     else:
         if show_fields.get('content_type') and result.get('content_type'):
@@ -73,17 +82,17 @@ def format_console_output(result: dict, debug: bool = False, show_fields: dict =
             except (ValueError, TypeError):
                 parts.append(f"{Colors.PINK}[{result['content_length']}]{Colors.RESET}")
-    # CNAME
-    if show_fields.get('cname') and result.get('cname'):
-        parts.append(f"{Colors.PURPLE}[CNAME: {result['cname']}]{Colors.RESET}")
     # Redirect Chain
     if show_fields.get('follow_redirects') and result.get('redirect_chain'):
         chain = ' -> '.join(result['redirect_chain'])
         parts.append(f"{Colors.YELLOW}[Redirects: {chain}]{Colors.RESET}")
+    # CNAME
+    if show_fields.get('cname') and result.get('cname'):
+        parts.append(f"{Colors.PURPLE}[CNAME: {result['cname']}]{Colors.RESET}")
     # TLS Certificate Info
-    if result.get('tls'):
+    if show_fields.get('tls') and result.get('tls'):
         cert = result['tls']
         tls_parts = []
         if cert.get('common_name'):
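
Given the `response_headers` rename above, a result dict can be formatted directly. A sketch with a hand-built result, assuming (as the `.get()` calls above suggest) that missing optional keys are tolerated:

```python
from httpz_scanner.formatters import format_console_output

# Field names taken from this diff; all other keys are optional
result = {
    'url': 'https://example.com',
    'status': 200,
    'title': 'Example Domain',
    'response_headers': {'Server': 'nginx'},
}
show_fields = {'status_code': True, 'title': True, 'headers': True}
print(format_console_output(result, False, show_fields, None, None))
```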

httpz_scanner/parsers.py

@@ -2,15 +2,17 @@
 # HTTPZ Web Scanner - Developed by acidvegas in Python (https://github.com/acidvegas/httpz)
 # httpz_scanner/parsers.py
+import argparse
 try:
     import bs4
 except ImportError:
     raise ImportError('missing bs4 module (pip install beautifulsoup4)')
 try:
     from cryptography import x509
     from cryptography.hazmat.primitives import hashes
     from cryptography.x509.oid import NameOID
 except ImportError:
     raise ImportError('missing cryptography module (pip install cryptography)')
@@ -27,8 +29,8 @@ def parse_domain_url(domain: str) -> tuple:
     Parse domain string into base domain, port, and protocol list
     :param domain: Raw domain string to parse
+    :return: Tuple of (base_domain, port, protocols)
     '''
     port = None
     base_domain = domain.rstrip('/')
@@ -40,23 +42,17 @@ def parse_domain_url(domain: str) -> tuple:
             try:
                 port = int(port_str.split('/')[0])
             except ValueError:
-                port = 443 if protocol == 'https://' else 80
-        else:
-            port = 443 if protocol == 'https://' else 80
+                port = None
+        protocols = [f'{protocol}{base_domain}{":" + str(port) if port else ""}']
     else:
         if ':' in base_domain.split('/')[0]:
             base_domain, port_str = base_domain.split(':', 1)
-            port = int(port_str.split('/')[0]) if port_str.split('/')[0].isdigit() else 443
-        else:
-            port = 443
-    protocols = [
-        f'https://{base_domain}{":" + str(port) if port else ""}',
-        f'http://{base_domain}{":" + str(port) if port else ""}'
-    ]
+            port = int(port_str.split('/')[0]) if port_str.split('/')[0].isdigit() else None
+        protocols = ['http://', 'https://'] # Always try HTTP first
     return base_domain, port, protocols
 async def get_cert_info(ssl_object, url: str) -> dict:
     '''
     Get SSL certificate information for a domain
@@ -64,6 +60,7 @@ async def get_cert_info(ssl_object, url: str) -> dict:
     :param ssl_object: SSL object to get certificate info from
     :param url: URL to get certificate info from
     '''
     try:
         if not ssl_object or not (cert_der := ssl_object.getpeercert(binary_form=True)):
             return None
@@ -100,6 +97,7 @@ async def get_cert_info(ssl_object, url: str) -> dict:
         error(f'Error getting cert info for {url}: {str(e)}')
         return None
 async def get_favicon_hash(session, base_url: str, html: str) -> str:
     '''
     Get favicon hash from a webpage
@@ -108,6 +106,7 @@ async def get_favicon_hash(session, base_url: str, html: str) -> str:
     :param base_url: base URL of the website
     :param html: HTML content of the page
     '''
     try:
         soup = bs4.BeautifulSoup(html, 'html.parser')
@@ -138,3 +137,61 @@ async def get_favicon_hash(session, base_url: str, html: str) -> str:
         debug(f'Error getting favicon for {base_url}: {str(e)}')
         return None
+def parse_status_codes(codes_str: str) -> set:
+    '''
+    Parse comma-separated status codes and ranges into a set of integers
+    :param codes_str: Comma-separated status codes (e.g., "200,301-399,404,500-503")
+    '''
+    codes = set()
+    try:
+        for part in codes_str.split(','):
+            if '-' in part:
+                start, end = map(int, part.split('-'))
+                codes.update(range(start, end + 1))
+            else:
+                codes.add(int(part))
+        return codes
+    except ValueError:
+        raise argparse.ArgumentTypeError('Invalid status code format. Use comma-separated numbers or ranges (e.g., 200,301-399,404,500-503)')
+def parse_shard(shard_str: str) -> tuple:
+    '''
+    Parse shard argument in format INDEX/TOTAL
+    :param shard_str: Shard string in format "INDEX/TOTAL"
+    '''
+    try:
+        shard_index, total_shards = map(int, shard_str.split('/'))
+        if shard_index < 1 or total_shards < 1 or shard_index > total_shards:
+            raise ValueError
+        return shard_index - 1, total_shards # Convert to 0-based index
+    except (ValueError, TypeError):
+        raise argparse.ArgumentTypeError('Shard must be in format INDEX/TOTAL where INDEX <= TOTAL')
+def parse_title(html: str, content_type: str = None) -> str:
+    '''
+    Parse title from HTML content
+    :param html: HTML content of the page
+    :param content_type: Content-Type header value
+    '''
+    # Only parse title for HTML content
+    if content_type and not any(x in content_type.lower() for x in ['text/html', 'application/xhtml']):
+        return None
+    try:
+        soup = bs4.BeautifulSoup(html, 'html.parser', from_encoding='utf-8', features='lxml')
+        if title := soup.title:
+            return title.string.strip()
+    except:
+        pass
+    return None
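
The two new CLI type parsers can be exercised on their own; expected values follow directly from the docstrings and code above:

```python
from httpz_scanner.parsers import parse_status_codes, parse_shard

# Ranges are inclusive, so 301-303 expands to three codes
print(parse_status_codes('200,301-303,404'))  # {200, 301, 302, 303, 404}

# Shard "2/5" becomes a 0-based index plus the total shard count
print(parse_shard('2/5'))  # (1, 5)
```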

httpz_scanner/scanner.py

@@ -3,9 +3,9 @@
 # httpz_scanner/scanner.py
 import asyncio
+import json
 import random
-import sys
-import json
+import urllib.parse
 try:
     import aiohttp
@@ -17,17 +17,15 @@ try:
 except ImportError:
     raise ImportError('missing bs4 module (pip install beautifulsoup4)')
 from .dns import resolve_all_dns, load_resolvers
-from .formatters import format_console_output
-from .colors import Colors
-from .parsers import parse_domain_url, get_cert_info, get_favicon_hash
-from .utils import debug, info, USER_AGENTS, input_generator
+from .parsers import parse_domain_url, get_cert_info, get_favicon_hash
+from .utils import debug, USER_AGENTS, input_generator
 class HTTPZScanner:
     '''Core scanner class for HTTP domain checking'''
-    def __init__(self, concurrent_limit = 100, timeout = 5, follow_redirects = False, check_axfr = False, resolver_file = None, output_file = None, show_progress = False, debug_mode = False, jsonl_output = False, show_fields = None, match_codes = None, exclude_codes = None):
+    def __init__(self, concurrent_limit = 100, timeout = 5, follow_redirects = False, check_axfr = False, resolver_file = None, output_file = None, show_progress = False, debug_mode = False, jsonl_output = False, show_fields = None, match_codes = None, exclude_codes = None, shard = None, paths = None, custom_headers=None, post_data=None):
         '''
         Initialize the HTTPZScanner class
@@ -43,6 +41,10 @@ class HTTPZScanner:
         :param show_fields: Fields to show
         :param match_codes: Status codes to match
         :param exclude_codes: Status codes to exclude
+        :param shard: Tuple of (shard_index, total_shards) for distributed scanning
+        :param paths: List of additional paths to check on each domain
+        :param custom_headers: Dictionary of custom headers to send with each request
+        :param post_data: Data to send with POST requests
         '''
         self.concurrent_limit = concurrent_limit
@@ -54,6 +56,10 @@ class HTTPZScanner:
         self.show_progress = show_progress
         self.debug_mode = debug_mode
         self.jsonl_output = jsonl_output
+        self.shard = shard
+        self.paths = paths or []
+        self.custom_headers = custom_headers or {}
+        self.post_data = post_data
         self.show_fields = show_fields or {
             'status_code' : True,
@@ -73,167 +79,215 @@ class HTTPZScanner:
         self.exclude_codes = exclude_codes
         self.resolvers = None
         self.processed_domains = 0
+        self.progress_count = 0
-    async def init(self):
-        '''Initialize resolvers - must be called before scanning'''
-        self.resolvers = await load_resolvers(self.resolver_file)
     async def check_domain(self, session: aiohttp.ClientSession, domain: str):
         '''Check a single domain and return results'''
-        nameserver = random.choice(self.resolvers) if self.resolvers else None
         base_domain, port, protocols = parse_domain_url(domain)
-        result = {
-            'domain' : base_domain,
-            'status' : 0,
-            'url'    : protocols[0],
-            'port'   : port,
-        }
-        # Try each protocol
-        for url in protocols:
+        for protocol in protocols:
+            url = f'{protocol}{base_domain}'
+            if port:
+                url += f':{port}'
             try:
-                # Set random user agent for each request
-                headers = {'User-Agent': random.choice(USER_AGENTS)}
-                async with session.get(url, timeout=self.timeout,
-                                       allow_redirects=self.follow_redirects,
-                                       max_redirects=10 if self.follow_redirects else 0,
-                                       headers=headers) as response:
-                    result['status'] = response.status
-                    # Early exit if status code doesn't match criteria
-                    if self.match_codes and result['status'] not in self.match_codes:
-                        return result
-                    if self.exclude_codes and result['status'] in self.exclude_codes:
-                        return result
-                    # Continue with full processing only if status code matches criteria
-                    result['url'] = str(response.url)
-                    # Add headers if requested
-                    headers = dict(response.headers)
-                    if headers and (self.show_fields.get('headers') or self.show_fields.get('all_flags')):
-                        result['headers'] = headers
-                    else:
-                        # Only add content type/length if headers aren't included
-                        if content_type := response.headers.get('content-type', '').split(';')[0]:
-                            result['content_type'] = content_type
-                        if content_length := response.headers.get('content-length'):
-                            result['content_length'] = content_length
-                    # Only add redirect chain if it exists
-                    if self.follow_redirects and response.history:
-                        result['redirect_chain'] = [str(h.url) for h in response.history] + [str(response.url)]
-                    # Do DNS lookups only if we're going to use the result
-                    ips, cname, nameservers, _ = await resolve_all_dns(
-                        base_domain, self.timeout, nameserver, self.check_axfr
-                    )
-                    # Only add DNS fields if they have values
-                    if ips:
-                        result['ips'] = ips
-                    if cname:
-                        result['cname'] = cname
-                    if nameservers:
-                        result['nameservers'] = nameservers
-                    # Only add TLS info if available
-                    if response.url.scheme == 'https':
-                        try:
-                            if ssl_object := response._protocol.transport.get_extra_info('ssl_object'):
-                                if tls_info := await get_cert_info(ssl_object, str(response.url)):
-                                    # Only add TLS fields that have values
-                                    result['tls'] = {k: v for k, v in tls_info.items() if v}
-                        except AttributeError:
-                            debug(f'Failed to get SSL info for {url}')
-                    html = (await response.text())[:1024*1024]
-                    soup = bs4.BeautifulSoup(html, 'html.parser')
-                    # Only add title if it exists
-                    if soup.title and soup.title.string:
-                        result['title'] = ' '.join(soup.title.string.strip().split()).rstrip('.')[:300]
-                    # Only add body if it exists
-                    if body_text := soup.get_text():
-                        result['body'] = ' '.join(body_text.split()).rstrip('.')[:500]
-                    # Only add favicon hash if it exists
-                    if favicon_hash := await get_favicon_hash(session, url, html):
-                        result['favicon_hash'] = favicon_hash
-                    break
+                debug(f'Trying {url}...')
+                result = await self._check_url(session, url)
+                debug(f'Got result for {url}: {result}')
+                if result and (result['status'] != 400 or result.get('redirect_chain')): # Accept redirects
+                    return result
             except Exception as e:
                 debug(f'Error checking {url}: {str(e)}')
-                result['status'] = -1
                 continue
-        return result
-    async def process_result(self, result):
-        '''
-        Process and output a single result
-        :param result: result to process
-        '''
-        formatted = format_console_output(result, self.debug_mode, self.show_fields, self.match_codes, self.exclude_codes)
-        if formatted:
-            # Write to file if specified
-            if self.output_file:
-                if (not self.match_codes or result['status'] in self.match_codes) and \
-                   (not self.exclude_codes or result['status'] not in self.exclude_codes):
-                    async with aiohttp.ClientSession() as session:
-                        with open(self.output_file, 'a') as f:
-                            json.dump(result, f, ensure_ascii=False)
-                            f.write('\n')
-            # Console output
-            if self.jsonl_output:
-                print(json.dumps(result))
-            else:
-                self.processed_domains += 1
-                if self.show_progress:
-                    info(f"{Colors.GRAY}[{self.processed_domains:,}]{Colors.RESET} {formatted}")
-                else:
-                    info(formatted)
+        return None
+    async def _check_url(self, session: aiohttp.ClientSession, url: str):
+        '''Check a single URL and return results'''
+        try:
+            headers = {'User-Agent': random.choice(USER_AGENTS)}
+            headers.update(self.custom_headers)
+            debug(f'Making request to {url} with headers: {headers}')
+            async with session.request('GET', url,
+                                       timeout=self.timeout,
+                                       allow_redirects=True, # Always follow redirects
+                                       max_redirects=10,
+                                       ssl=False, # Don't verify SSL
+                                       headers=headers) as response:
+                debug(f'Got response from {url}: status={response.status}, headers={dict(response.headers)}')
+                result = {
+                    'domain': urllib.parse.urlparse(url).hostname,
+                    'status': response.status,
+                    'url': str(response.url),
+                    'response_headers': dict(response.headers)
+                }
+                if response.history:
+                    result['redirect_chain'] = [str(h.url) for h in response.history] + [str(response.url)]
+                    debug(f'Redirect chain for {url}: {result["redirect_chain"]}')
+                return result
+        except aiohttp.ClientSSLError as e:
+            debug(f'SSL Error for {url}: {str(e)}')
+            return {
+                'domain': urllib.parse.urlparse(url).hostname,
+                'status': -1,
+                'error': f'SSL Error: {str(e)}',
+                'protocol': 'https' if url.startswith('https://') else 'http',
+                'error_type': 'SSL'
+            }
+        except aiohttp.ClientConnectorCertificateError as e:
+            debug(f'Certificate Error for {url}: {str(e)}')
+            return {
+                'domain': urllib.parse.urlparse(url).hostname,
+                'status': -1,
+                'error': f'Certificate Error: {str(e)}',
+                'protocol': 'https' if url.startswith('https://') else 'http',
+                'error_type': 'CERT'
+            }
+        except aiohttp.ClientConnectorError as e:
+            debug(f'Connection Error for {url}: {str(e)}')
+            return {
+                'domain': urllib.parse.urlparse(url).hostname,
+                'status': -1,
+                'error': f'Connection Failed: {str(e)}',
+                'protocol': 'https' if url.startswith('https://') else 'http',
+                'error_type': 'CONN'
+            }
+        except aiohttp.ClientError as e:
+            debug(f'HTTP Error for {url}: {e.__class__.__name__}: {str(e)}')
+            return {
+                'domain': urllib.parse.urlparse(url).hostname,
+                'status': -1,
+                'error': f'HTTP Error: {e.__class__.__name__}: {str(e)}',
+                'protocol': 'https' if url.startswith('https://') else 'http',
+                'error_type': 'HTTP'
+            }
+        except asyncio.TimeoutError:
+            debug(f'Timeout for {url}')
+            return {
+                'domain': urllib.parse.urlparse(url).hostname,
+                'status': -1,
+                'error': f'Connection Timed Out after {self.timeout}s',
+                'protocol': 'https' if url.startswith('https://') else 'http',
+                'error_type': 'TIMEOUT'
+            }
+        except Exception as e:
+            debug(f'Unexpected error for {url}: {e.__class__.__name__}: {str(e)}')
+            return {
+                'domain': urllib.parse.urlparse(url).hostname,
+                'status': -1,
+                'error': f'Error: {e.__class__.__name__}: {str(e)}',
+                'protocol': 'https' if url.startswith('https://') else 'http',
+                'error_type': 'UNKNOWN'
+            }
     async def scan(self, input_source):
         '''
-        Scan domains from a file or stdin
-        :param input_source: Path to file or '-' for stdin
+        Scan domains from a file, stdin, or async generator
+        :param input_source: Can be:
+                             - Path to file (str)
+                             - stdin ('-')
+                             - List/tuple of domains
+                             - Async generator yielding domains
+        :yields: Result dictionary for each domain scanned
         '''
         if not self.resolvers:
-            await self.init()
-        async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False)) as session:
-            tasks = set()
-            # Process domains with concurrent limit
-            for domain in input_generator(input_source):
-                if len(tasks) >= self.concurrent_limit:
-                    done, tasks = await asyncio.wait(
-                        tasks, return_when=asyncio.FIRST_COMPLETED
-                    )
-                    for task in done:
-                        result = await task
-                        await self.process_result(result)
-                task = asyncio.create_task(self.check_domain(session, domain))
-                tasks.add(task)
-            # Process remaining tasks
-            if tasks:
-                done, _ = await asyncio.wait(tasks)
-                for task in done:
-                    result = await task
-                    await self.process_result(result)
+            self.resolvers = await load_resolvers(self.resolver_file)
+        # Just use ssl=False, that's all we need
+        connector = aiohttp.TCPConnector(ssl=False, enable_cleanup_closed=True)
+        async with aiohttp.ClientSession(connector=connector) as session:
+            tasks = {} # Change to dict to track domain for each task
+            domain_queue = asyncio.Queue()
+            queue_empty = False
+            async def process_domain(domain):
+                try:
+                    result = await self.check_domain(session, domain)
+                    if self.show_progress:
+                        self.progress_count += 1
+                    if result:
+                        return domain, result
+                    else:
+                        # Create a proper error result if check_domain returns None
+                        return domain, {
+                            'domain': domain,
+                            'status': -1,
+                            'error': 'No successful response from either HTTP or HTTPS',
+                            'protocol': 'unknown',
+                            'error_type': 'NO_RESPONSE'
+                        }
+                except Exception as e:
+                    debug(f'Error processing {domain}: {e.__class__.__name__}: {str(e)}')
+                    # Return structured error information
+                    return domain, {
+                        'domain': domain,
+                        'status': -1,
+                        'error': f'{e.__class__.__name__}: {str(e)}',
+                        'protocol': 'unknown',
+                        'error_type': 'PROCESS'
+                    }
+            # Queue processor
+            async def queue_processor():
+                async for domain in input_generator(input_source, self.shard):
+                    await domain_queue.put(domain)
+                    self.processed_domains += 1
+                nonlocal queue_empty
+                queue_empty = True
+            # Start queue processor
+            queue_task = asyncio.create_task(queue_processor())
+            try:
+                while not (queue_empty and domain_queue.empty() and not tasks):
+                    # Fill up tasks until we hit concurrent limit
+                    while len(tasks) < self.concurrent_limit and not domain_queue.empty():
+                        domain = await domain_queue.get()
+                        task = asyncio.create_task(process_domain(domain))
+                        tasks[task] = domain
+                    if tasks:
+                        # Wait for at least one task to complete
+                        done, _ = await asyncio.wait(
+                            tasks.keys(),
+                            return_when=asyncio.FIRST_COMPLETED
+                        )
+                        # Process completed tasks
+                        for task in done:
+                            domain = tasks.pop(task)
+                            try:
+                                _, result = await task
+                                if result:
+                                    yield result
+                            except Exception as e:
+                                debug(f'Task error for {domain}: {e.__class__.__name__}: {str(e)}')
+                                yield {
+                                    'domain': domain,
+                                    'status': -1,
+                                    'error': f'Task Error: {e.__class__.__name__}: {str(e)}',
+                                    'protocol': 'unknown',
+                                    'error_type': 'TASK'
+                                }
+                    else:
+                        await asyncio.sleep(0.1) # Prevent CPU spin when no tasks
+            finally:
+                # Clean up
+                for task in tasks:
+                    task.cancel()
+                queue_task.cancel()
+                try:
+                    await queue_task
+                except asyncio.CancelledError:
+                    pass
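
With this rewrite, `scan()` is an async generator rather than a method that prints, so library callers drive the output loop themselves and no `init()` call is needed. A minimal consumption sketch (error results come back as dicts with `status` -1 and an `error` field, per the code above):

```python
import asyncio
from httpz_scanner import HTTPZScanner

async def main():
    scanner = HTTPZScanner(concurrent_limit=10, timeout=5)
    # Lists, file paths, '-' for stdin, and async generators are all accepted
    async for result in scanner.scan(['example.com', 'example.org']):
        print(result['domain'], result['status'], result.get('error', ''))

asyncio.run(main())
```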

httpz_scanner/utils.py

@@ -3,7 +3,9 @@
 # httpz_scanner/utils.py
 import logging
+import os
 import sys
+import asyncio
 # Global for silent mode
@@ -11,63 +13,64 @@ SILENT_MODE = False
 # List of user agents to randomize requests
 USER_AGENTS = [
-    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36",
+    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36',
-    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36",
+    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36',
-    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",
+    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36',
-    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36 Edg/132.0.0.0",
+    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36 Edg/132.0.0.0',
-    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",
+    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36',
-    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36",
+    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36',
-    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36",
+    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36',
-    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36 Edg/132.0.0.0",
+    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36 Edg/132.0.0.0',
-    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:134.0) Gecko/20100101 Firefox/134.0",
+    'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:134.0) Gecko/20100101 Firefox/134.0',
-    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36",
+    'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36',
-    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) obsidian/1.6.5 Chrome/124.0.6367.243 Electron/30.1.2 Safari/537.36",
+    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) obsidian/1.6.5 Chrome/124.0.6367.243 Electron/30.1.2 Safari/537.36',
-    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:135.0) Gecko/20100101 Firefox/135.0",
+    'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:135.0) Gecko/20100101 Firefox/135.0',
-    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 OPR/116.0.0.0",
+    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 OPR/116.0.0.0',
-    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:134.0) Gecko/20100101 Firefox/134.0",
+    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:134.0) Gecko/20100101 Firefox/134.0',
-    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) obsidian/1.8.3 Chrome/130.0.6723.191 Electron/33.3.2 Safari/537.36",
+    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) obsidian/1.8.3 Chrome/130.0.6723.191 Electron/33.3.2 Safari/537.36',
-    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15",
+    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15',
-    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/18.3 Safari/605.1.15",
+    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/18.3 Safari/605.1.15',
-    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36",
+    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36',
-    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/18.2 Safari/605.1.15",
+    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/18.2 Safari/605.1.15',
-    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.6613.137 Safari/537.36",
+    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.6613.137 Safari/537.36',
-    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",
+    'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36',
-    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36",
+    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36',
-    "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:134.0) Gecko/20100101 Firefox/134.0",
+    'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:134.0) Gecko/20100101 Firefox/134.0',
-    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36",
+    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36',
-    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.6 Safari/605.1.15",
+    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.6 Safari/605.1.15',
-    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/18.1.1 Safari/605.1.15",
+    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/18.1.1 Safari/605.1.15',
-    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36",
+    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36',
-    "Mozilla/5.0 (X11; Linux x86_64; rv:134.0) Gecko/20100101 Firefox/134.0",
+    'Mozilla/5.0 (X11; Linux x86_64; rv:134.0) Gecko/20100101 Firefox/134.0',
-    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:135.0) Gecko/20100101 Firefox/135.0",
+    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:135.0) Gecko/20100101 Firefox/135.0',
-    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36",
+    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36',
-    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) obsidian/1.5.12 Chrome/120.0.6099.283 Electron/28.2.3 Safari/537.36",
+    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) obsidian/1.5.12 Chrome/120.0.6099.283 Electron/28.2.3 Safari/537.36',
-    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.114 Safari/537.36",
+    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.114 Safari/537.36',
-    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36 Edg/129.0.0.0",
+    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36 Edg/129.0.0.0',
-    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36",
+    'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36',
-    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36",
+    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36',
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36", 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36',
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36", 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36',
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0", 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0',
"Mozilla/5.0 (X11; CrOS x86_64 14541.0.0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36", 'Mozilla/5.0 (X11; CrOS x86_64 14541.0.0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36',
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36 OPR/114.0.0.0", 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36 OPR/114.0.0.0',
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36", 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36',
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36", 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36',
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36", 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36',
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36", 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36',
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36", 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36',
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/18.1 Safari/605.1.15", 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/18.1 Safari/605.1.15',
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.5 Safari/605.1.15", 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.5 Safari/605.1.15',
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36", 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36',
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36", 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36',
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36", 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36',
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3", 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3',
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) obsidian/1.7.7 Chrome/128.0.6613.186 Electron/32.2.5 Safari/537.36" 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) obsidian/1.7.7 Chrome/128.0.6613.186 Electron/32.2.5 Safari/537.36'
] ]
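A pool like this is typically consumed by drawing one entry at random per request, so a large scan does not present a single static fingerprint. A minimal sketch of that rotation, assuming a hypothetical binding `USER_AGENTS` for the list above (the real name sits above this hunk):

```python
import random

# Hypothetical stand-in for the pool above (two entries copied from the diff)
USER_AGENTS = [
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36',
    'Mozilla/5.0 (X11; Linux x86_64; rv:134.0) Gecko/20100101 Firefox/134.0',
]

def pick_user_agent(pool: list) -> str:
    # Uniform random draw per request keeps any one UA from dominating the scan traffic
    return random.choice(pool)

headers = {'User-Agent': pick_user_agent(USER_AGENTS)}
```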
def debug(msg: str):
    if not SILENT_MODE:
        logging.debug(msg)

def error(msg: str):
    if not SILENT_MODE: logging.error(msg)

def info(msg: str):
@ -97,19 +100,58 @@ def human_size(size_bytes: int) -> str:
    return f'{size:.1f}{units[unit_index]}'
async def input_generator(input_source, shard: tuple = None):
    '''
    Async generator function to yield domains from various input sources with optional sharding

    :param input_source: Can be:
        - string path to local file
        - "-" for stdin
        - list/tuple of domains
        - generator/iterator yielding domains
        - string content with newlines
    :param shard: Tuple of (shard_index, total_shards) for distributed scanning
    '''

    line_num = 0

    # Handle stdin
    if input_source == '-' or input_source is None:
        for line in sys.stdin:
            await asyncio.sleep(0)
            if line := line.strip():
                if shard is None or line_num % shard[1] == shard[0]:
                    yield line
                line_num += 1

    # Handle local files
    elif isinstance(input_source, str) and os.path.exists(input_source):
        with open(input_source, 'r') as f:
            for line in f:
                await asyncio.sleep(0)
                if line := line.strip():
                    if shard is None or line_num % shard[1] == shard[0]:
                        yield line
                    line_num += 1

    # Handle iterables (generators, lists, etc)
    elif hasattr(input_source, '__iter__') and not isinstance(input_source, (str, bytes)):
        for line in input_source:
            await asyncio.sleep(0)
            if isinstance(line, bytes):
                line = line.decode()
            if line := line.strip():
                if shard is None or line_num % shard[1] == shard[0]:
                    yield line
                line_num += 1

    # Handle string content with newlines
    elif isinstance(input_source, (str, bytes)):
        if isinstance(input_source, bytes):
            input_source = input_source.decode()
        for line in input_source.splitlines():
            await asyncio.sleep(0)
            if line := line.strip():
                if shard is None or line_num % shard[1] == shard[0]:
                    yield line
                line_num += 1
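The shard filter is easiest to see with a small consumption sketch (my own example, not part of the diff). The modulo test implies a 0-based shard index, so a CLI flag like `--shard 1/3` would presumably be normalized to `(0, 3)` before reaching this function:

```python
import asyncio

async def demo():
    domains = ['a.com', 'b.com', 'c.com', 'd.com', 'e.com']
    # shard=(0, 2): non-empty lines 0, 2, 4 pass the line_num % total == index filter
    async for domain in input_generator(domains, shard=(0, 2)):
        print(domain)  # a.com, c.com, e.com

asyncio.run(demo())
```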
setup.py
@ -4,12 +4,13 @@
from setuptools import setup, find_packages

with open('README.md', 'r', encoding='utf-8') as f:
    long_description = f.read()

setup(
    name='httpz_scanner',
    version='2.1.8',
    author='acidvegas',
    author_email='acid.vegas@acid.vegas',
    description='Hyper-fast HTTP Scraping Tool',
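A quick way to confirm which release is installed after this bump, using only the standard library (a sketch, not part of the repo):

```python
from importlib.metadata import version  # stdlib since Python 3.8

print(version('httpz_scanner'))  # expected to print 2.1.8 for this release
```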
unit_test.py (new file, 235 lines)
@ -0,0 +1,235 @@
#!/usr/bin/env python3
# HTTPZ Web Scanner - Unit Tests
# unit_test.py

import asyncio
import logging
import sys
import time

try:
    from httpz_scanner import HTTPZScanner
    from httpz_scanner.colors import Colors
except ImportError:
    raise ImportError('missing httpz_scanner library (pip install httpz_scanner)')
class ColoredFormatter(logging.Formatter):
    '''Custom formatter for colored log output'''

    def format(self, record):
        if record.levelno == logging.INFO:
            color = Colors.GREEN
        elif record.levelno == logging.WARNING:
            color = Colors.YELLOW
        elif record.levelno == logging.ERROR:
            color = Colors.RED
        else:
            color = Colors.RESET
        record.msg = f'{color}{record.msg}{Colors.RESET}'
        return super().format(record)
# Configure logging with colors
logger = logging.getLogger()
handler = logging.StreamHandler()
handler.setFormatter(ColoredFormatter('%(asctime)s - %(levelname)s - %(message)s'))
logger.setLevel(logging.INFO)
logger.addHandler(handler)
async def get_domains_from_url() -> list:
    '''
    Fetch domains from SecLists URL

    :return: List of domains
    '''
    try:
        import aiohttp
    except ImportError:
        raise ImportError('missing aiohttp library (pip install aiohttp)')

    url = 'https://raw.githubusercontent.com/danielmiessler/SecLists/refs/heads/master/Fuzzing/email-top-100-domains.txt'
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            content = await response.text()
            return [line.strip() for line in content.splitlines() if line.strip()]
async def domain_generator(domains: list):
    '''
    Async generator that yields domains

    :param domains: List of domains to yield
    '''
    for domain in domains:
        await asyncio.sleep(0)  # Allow other coroutines to run
        yield domain
async def run_benchmark(test_type: str, domains: list, concurrency: int) -> tuple:
    '''Run a single benchmark test'''

    logging.info(f'{Colors.BOLD}Testing {test_type} input with {concurrency} concurrent connections...{Colors.RESET}')

    scanner = HTTPZScanner(concurrent_limit=concurrency, timeout=3, show_progress=True, debug_mode=True, follow_redirects=True)

    count = 0
    got_first = False
    start_time = None

    # Use the list directly for the 'List' test, or wrap it in the async generator for the 'Generator' test
    input_source = domains if test_type == 'List' else domain_generator(domains)

    async for result in scanner.scan(input_source):
        if result:
            if not got_first:
                got_first = True
                start_time = time.time()
            count += 1

            # More detailed status reporting
            status_str = ''
            if result['status'] < 0:
                error_type = result.get('error_type', 'UNKNOWN')
                error_msg = result.get('error', 'Unknown Error')
                status_str = f"{Colors.RED}[{result['status']} - {error_type}: {error_msg}]{Colors.RESET}"
            elif 200 <= result['status'] < 300:
                status_str = f"{Colors.GREEN}[{result['status']}]{Colors.RESET}"
            elif 300 <= result['status'] < 400:
                status_str = f"{Colors.YELLOW}[{result['status']}]{Colors.RESET}"
            else:
                status_str = f"{Colors.RED}[{result['status']}]{Colors.RESET}"

            # Show protocol and response headers if available
            protocol_info = f" {Colors.CYAN}({result.get('protocol', 'unknown')}){Colors.RESET}" if result.get('protocol') else ''
            headers_info = ''
            if result.get('response_headers'):
                important_headers = ['server', 'location', 'content-type']
                headers = [f"{k}: {v}" for k, v in result['response_headers'].items() if k.lower() in important_headers]
                if headers:
                    headers_info = f" {Colors.GRAY}[{', '.join(headers)}]{Colors.RESET}"

            # Show redirect chain if present
            redirect_info = ''
            if result.get('redirect_chain'):
                redirect_info = f" -> {Colors.YELLOW}Redirects: {' -> '.join(result['redirect_chain'])}{Colors.RESET}"

            # Show error details if present
            error_info = ''
            if result.get('error'):
                error_info = f" {Colors.RED}Error: {result['error']}{Colors.RESET}"

            # Show final URL if different from original
            url_info = ''
            if result.get('url') and result['url'] != f"http(s)://{result['domain']}":
                url_info = f" {Colors.CYAN}Final URL: {result['url']}{Colors.RESET}"

            logging.info(
                f"{test_type}-{concurrency} Result {count}: "
                f"{status_str}{protocol_info} "
                f"{Colors.CYAN}{result['domain']}{Colors.RESET}"
                f"{redirect_info}"
                f"{url_info}"
                f"{headers_info}"
                f"{error_info}"
            )

    elapsed = time.time() - start_time if start_time else 0
    domains_per_sec = count / elapsed if elapsed > 0 else 0
    logging.info(f'{Colors.YELLOW}{test_type} test with {concurrency} concurrent connections completed in {elapsed:.2f} seconds ({domains_per_sec:.2f} domains/sec){Colors.RESET}')

    return elapsed, domains_per_sec
async def test_list_input(domains: list):
    '''Test scanning using a list input'''

    logging.info(f'{Colors.BOLD}Testing list input...{Colors.RESET}')

    scanner = HTTPZScanner(concurrent_limit=25, timeout=3, show_progress=True, debug_mode=True, follow_redirects=True)

    start_time = time.time()
    count = 0
    async for result in scanner.scan(domains):
        if result:
            count += 1
            status_color = Colors.GREEN if 200 <= result['status'] < 300 else Colors.RED
            title = f" - {Colors.CYAN}{result.get('title', 'No Title')}{Colors.RESET}" if result.get('title') else ''
            error = f" - {Colors.RED}{result.get('error', '')}{Colors.RESET}" if result.get('error') else ''
            logging.info(f'List-25 Result {count}: {status_color}[{result["status"]}]{Colors.RESET} {Colors.CYAN}{result["domain"]}{Colors.RESET}{title}{error}')
async def test_generator_input(domains: list):
    '''Test scanning using an async generator input'''

    logging.info(f'{Colors.BOLD}Testing generator input...{Colors.RESET}')

    scanner = HTTPZScanner(concurrent_limit=25, timeout=3, show_progress=True, debug_mode=True, follow_redirects=True)

    start_time = time.time()
    count = 0
    async for result in scanner.scan(domain_generator(domains)):
        if result:
            count += 1
            status_color = Colors.GREEN if 200 <= result['status'] < 300 else Colors.RED
            title = f" - {Colors.CYAN}{result.get('title', 'No Title')}{Colors.RESET}" if result.get('title') else ''
            error = f" - {Colors.RED}{result.get('error', '')}{Colors.RESET}" if result.get('error') else ''
            logging.info(f'Generator-25 Result {count}: {status_color}[{result["status"]}]{Colors.RESET} {Colors.CYAN}{result["domain"]}{Colors.RESET}{title}{error}')
async def main() -> None:
    '''Main test function'''

    try:
        # Fetch domains
        domains = await get_domains_from_url()
        logging.info(f'Loaded {Colors.YELLOW}{len(domains)}{Colors.RESET} domains for testing')

        # Store benchmark results
        results = []

        # Run tests with different concurrency levels
        for concurrency in [25, 50, 100]:
            # Generator tests
            gen_result = await run_benchmark('Generator', domains, concurrency)
            results.append(('Generator', concurrency, *gen_result))

            # List tests
            list_result = await run_benchmark('List', domains, concurrency)
            results.append(('List', concurrency, *list_result))

        # Print benchmark comparison
        logging.info(f'\n{Colors.BOLD}Benchmark Results:{Colors.RESET}')
        logging.info('-' * 80)
        logging.info(f'{"Test Type":<15} {"Concurrency":<15} {"Time (s)":<15} {"Domains/sec":<15}')
        logging.info('-' * 80)

        # Sort by domains per second (fastest first)
        results.sort(key=lambda x: x[3], reverse=True)

        for test_type, concurrency, elapsed, domains_per_sec in results:
            logging.info(f'{test_type:<15} {concurrency:<15} {elapsed:.<15.2f} {domains_per_sec:<15.2f}')

        # Highlight fastest result
        fastest = results[0]
        logging.info('-' * 80)
        logging.info(f'{Colors.GREEN}Fastest: {fastest[0]} test with {fastest[1]} concurrent connections')
        logging.info(f'Time: {fastest[2]:.2f} seconds')
        logging.info(f'Speed: {fastest[3]:.2f} domains/sec{Colors.RESET}')

        logging.info(f'\n{Colors.GREEN}All tests completed successfully!{Colors.RESET}')

    except Exception as e:
        logging.error(f'Test failed: {Colors.RED}{str(e)}{Colors.RESET}')
        sys.exit(1)
if __name__ == '__main__':
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logging.warning(f'{Colors.YELLOW}Tests interrupted by user{Colors.RESET}')
        sys.exit(1)
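Beyond the test harness, the same surface these tests exercise can be driven directly. A minimal sketch using only the calls demonstrated above (the `HTTPZScanner` constructor kwargs and the async `scan()` iterator; the result keys are the ones the tests read):

```python
import asyncio

from httpz_scanner import HTTPZScanner

async def quick_scan():
    scanner = HTTPZScanner(concurrent_limit=10, timeout=5, follow_redirects=True)
    async for result in scanner.scan(['example.com', 'example.org']):
        if result:
            # The tests above read 'domain' and 'status' from each result dict
            print(result['domain'], result['status'])

asyncio.run(quick_scan())
```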