Productionalized, ready for release

This commit is contained in:
Dionysus 2025-02-11 02:15:39 -05:00
parent a006a1dac4
commit 3138edc754
Signed by: acidvegas
GPG Key ID: EF4B922DB85DC9DE
16 changed files with 1120 additions and 753 deletions

27
.gitignore vendored Normal file
View File

@ -0,0 +1,27 @@
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
# Project specific
.env
logs/*
*.log
.log

3
MANIFEST.in Normal file
View File

@ -0,0 +1,3 @@
include LICENSE
include README.md
include requirements.txt

139
README.md
View File

@ -8,28 +8,155 @@ A high-performance concurrent web scanner written in Python. HTTPZ efficiently s
- [Python](https://www.python.org/downloads/)
- [aiohttp](https://pypi.org/project/aiohttp/)
- [apv](https://pypi.org/project/apv/)
- [beautifulsoup4](https://pypi.org/project/beautifulsoup4/)
- [cryptography](https://pypi.org/project/cryptography/)
- [dnspython](https://pypi.org/project/dnspython/)
- [mmh3](https://pypi.org/project/mmh3/)
- [python-dotenv](https://pypi.org/project/python-dotenv/)
- [tqdm](https://pypi.org/project/tqdm/)
## Installation
### Via pip (recommended)
```bash
# Install from PyPI
pip install httpz
# The 'httpz' command will now be available in your terminal
httpz --help
```
### From source
```bash
# Clone the repository
git clone https://github.com/acidvegas/httpz
cd httpz
chmod +x setup.sh
./setup.sh
pip install -r requirements.txt
```
## Usage
### Command Line Interface
Basic usage:
```bash
python httpz.py domains.txt [options]
python -m httpz domains.txt
```
### Arguments
Scan with all flags enabled and output to JSONL:
```bash
python -m httpz domains.txt -all -c 100 -o results.jsonl -j -p
```
Read from stdin:
```bash
cat domains.txt | python -m httpz - -all -c 100
echo "example.com" | python -m httpz - -all
```
Filter by status codes and follow redirects:
```bash
httpz domains.txt -mc 200,301-399 -ec 404,500 -fr -p
```
Show specific fields with custom timeout and resolvers:
```bash
httpz domains.txt -sc -ti -i -tls -to 10 -r resolvers.txt
```
Full scan with all options:
```bash
httpz domains.txt -c 100 -o output.jsonl -j -all -to 10 -mc 200,301 -ec 404,500 -p -ax -r resolvers.txt
```
### Python Library
```python
import asyncio
from httpz import HTTPZScanner
async def scan_domains():
# Initialize scanner with all possible options (showing defaults)
scanner = HTTPZScanner(
# Core settings
concurrent_limit=100, # Number of concurrent requests
timeout=5, # Request timeout in seconds
follow_redirects=False, # Follow redirects (max 10)
check_axfr=False, # Try AXFR transfer against nameservers
resolver_file=None, # Path to custom DNS resolvers file
output_file=None, # Path to JSONL output file
show_progress=False, # Show progress counter
debug_mode=False, # Show error states and debug info
jsonl_output=False, # Output in JSONL format
# Control which fields to show (all False by default unless show_fields is None)
show_fields={
'status_code': True, # Show status code
'content_type': True, # Show content type
'content_length': True, # Show content length
'title': True, # Show page title
'body': True, # Show body preview
'ip': True, # Show IP addresses
'favicon': True, # Show favicon hash
'headers': True, # Show response headers
'follow_redirects': True, # Show redirect chain
'cname': True, # Show CNAME records
'tls': True # Show TLS certificate info
},
# Filter results
match_codes={200, 301, 302}, # Only show these status codes
exclude_codes={404, 500, 503} # Exclude these status codes
)
# Initialize resolvers (required before scanning)
await scanner.init()
# Scan domains from file
await scanner.scan('domains.txt')
# Or scan from stdin
await scanner.scan('-')
if __name__ == '__main__':
asyncio.run(scan_domains())
```
The scanner will return results in this format:
```python
{
'domain': 'example.com', # Base domain
'url': 'https://example.com', # Full URL
'status': 200, # HTTP status code
'port': 443, # Port number
'title': 'Example Domain', # Page title
'body': 'Example body text...', # Body preview
'content_type': 'text/html', # Content type
'content_length': '12345', # Content length
'ips': ['93.184.216.34'], # IP addresses
'cname': 'cdn.example.com', # CNAME record
'nameservers': ['ns1.example.com'],# Nameservers
'favicon_hash': '123456789', # Favicon hash
'headers': { # Response headers
'Server': 'nginx',
'Content-Type': 'text/html'
},
'redirect_chain': [ # Redirect history
'http://example.com',
'https://example.com'
],
'tls': { # TLS certificate info
'fingerprint': 'sha256...',
'common_name': 'example.com',
'issuer': 'Let\'s Encrypt',
'alt_names': ['www.example.com'],
'not_before': '2023-01-01T00:00:00',
'not_after': '2024-01-01T00:00:00',
'version': 3,
'serial_number': 'abcdef1234'
}
}
```
## Arguments
| Argument | Long Form | Description |
|-----------|------------------|-------------------------------------------------------------|

728
httpz.py
View File

@ -1,728 +0,0 @@
#!/usr/bin/env python3
# HTTPZ Web Scanner - Developed by acidvegas in Python (https://github.com/acidvegas/httpz)
'''
BCUZ FUCK PROJECT DISCOVERY PYTHON STILL GO HARD
REAL BAY SHIT FOR REAL BAY MOTHER FUCKERS
'''
import argparse
import asyncio
import itertools
import json
import logging
import os
import random
import sys
try:
import aiohttp
except ImportError:
raise ImportError('missing \'aiohttp\' library (pip install aiohttp)')
try:
import apv
except ImportError:
raise ImportError('missing \'apv\' library (pip install apv)')
try:
import bs4
except ImportError:
raise ImportError('missing \'bs4\' library (pip install beautifulsoup4)')
try:
from cryptography import x509
from cryptography.hazmat.primitives import hashes
from cryptography.x509.oid import NameOID
except ImportError:
raise ImportError('missing \'cryptography\' library (pip install cryptography)')
try:
import dns.asyncresolver
import dns.query
import dns.resolver
import dns.zone
except ImportError:
raise ImportError('missing \'dns\' library (pip install dnspython)')
try:
import mmh3
except ImportError:
raise ImportError('missing \'mmh3\' library (pip install mmh3)')
class Colors:
    '''ANSI color codes for terminal output'''
    HEADER = '\033[95m' # Light purple
    BLUE = '\033[94m'
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    RED = '\033[91m'
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'
    RESET = '\033[0m' # Clears all color/style attributes
    PURPLE = '\033[35m' # Dark purple
    LIGHT_RED = '\033[38;5;203m' # Light red (256-color palette)
    DARK_GREEN = '\033[38;5;22m' # Dark green (256-color palette)
    PINK = '\033[38;5;198m' # Bright pink (256-color palette)
    GRAY = '\033[90m' # Gray color
    CYAN = '\033[96m' # Cyan color
# Global flag: when True (JSONL console mode), suppress every log line
SILENT_MODE = False

def debug(msg: str):
    '''Emit a debug-level log line unless silent mode is active'''
    if not SILENT_MODE:
        logging.debug(msg)

def error(msg: str):
    '''Emit an error-level log line unless silent mode is active'''
    if not SILENT_MODE:
        logging.error(msg)

def info(msg: str):
    '''Emit an info-level log line unless silent mode is active'''
    if not SILENT_MODE:
        logging.info(msg)
async def get_cert_info(ssl_object, url: str) -> dict:
    '''
    Get SSL certificate information for a domain
    :param ssl_object: SSL object to get certificate info from
    :param url: URL to get certificate info from
    :return: Dict of certificate details, or None when no cert is available or parsing fails
    '''
    try:
        # Check if we have a certificate
        if not ssl_object:
            return None
        # Get the certificate in DER format
        if not (cert_der := ssl_object.getpeercert(binary_form=True)):
            return None
        # Load the certificate
        cert = x509.load_der_x509_certificate(cert_der)
        # Extract all subject alternative names
        try:
            san_extension = cert.extensions.get_extension_for_oid(x509.oid.ExtensionOID.SUBJECT_ALTERNATIVE_NAME)
            alt_names = [name.value for name in san_extension.value] if san_extension else []
        except x509.extensions.ExtensionNotFound:
            alt_names = []
        # Get subject CN (a certificate may legitimately lack a CN)
        try:
            common_name = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value
        except IndexError:
            common_name = None
        # Get issuer CN
        try:
            issuer = cert.issuer.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value
        except IndexError:
            issuer = None
        return {
            'fingerprint' : cert.fingerprint(hashes.SHA256()).hex(),
            'common_name' : common_name,
            'issuer' : issuer,
            'alt_names' : alt_names,
            # NOTE(review): not_valid_before_utc / not_valid_after_utc require cryptography >= 42 — confirm pinned version
            'not_before' : cert.not_valid_before_utc.isoformat(),
            'not_after' : cert.not_valid_after_utc.isoformat(),
            'version' : cert.version.value,
            'serial_number' : format(cert.serial_number, 'x'),
        }
    except Exception as e:
        # Best-effort: log and return None rather than abort the scan of this domain
        error(f'Error getting cert info for {url}: {str(e)}')
        return None
async def get_favicon_hash(session: aiohttp.ClientSession, base_url: str, html: str) -> str:
    '''
    Get favicon hash from a webpage
    :param session: aiohttp client session
    :param base_url: base URL of the website
    :param html: HTML content of the page
    :return: mmh3 hash of the favicon as a string, or None when unavailable
    '''
    try:
        parsed = bs4.BeautifulSoup(html, 'html.parser')
        # Look for an explicit <link rel="icon"> (or "shortcut icon") tag
        favicon_url = None
        for tag in parsed.find_all('link'):
            rel_tokens = tag.get('rel')
            if rel_tokens and any(token.lower() == 'icon' for token in rel_tokens):
                favicon_url = tag.get('href')
                break
        # Fall back to the conventional default location
        if not favicon_url:
            favicon_url = '/favicon.ico'
        # Resolve protocol-relative, root-relative, and bare-relative URLs
        if favicon_url.startswith('//'):
            favicon_url = f'https:{favicon_url}'
        elif favicon_url.startswith('/'):
            favicon_url = f'{base_url}{favicon_url}'
        elif not favicon_url.startswith(('http://', 'https://')):
            favicon_url = f'{base_url}/{favicon_url}'
        # Fetch the icon and hash (at most) the first 1MB of its content
        async with session.get(favicon_url, timeout=10) as response:
            if response.status == 200:
                payload = (await response.read())[:1024*1024]
                digest = mmh3.hash64(payload)[0]
                # A zero hash is treated as "no favicon"
                if digest != 0:
                    return str(digest)
    except Exception as e:
        debug(f'Error getting favicon for {base_url}: {str(e)}')
    return None
def human_size(size_bytes: int) -> str:
    '''
    Convert bytes to human readable string
    :param size_bytes: Size in bytes
    :return: Value formatted with one decimal and a unit suffix, capped at GB
    '''
    # Zero/None/empty all render as a bare '0B'
    if not size_bytes:
        return '0B'
    value = float(size_bytes)
    # Climb the unit ladder; anything past MB is reported in GB
    for unit in ('B', 'KB', 'MB'):
        if value < 1024:
            return f'{value:.1f}{unit}'
        value /= 1024
    return f'{value:.1f}GB'
def input_generator(input_source: str):
    '''
    Generator function to yield domains from file or stdin
    :param input_source: path to file containing domains, or '-'/None for stdin
    '''
    if input_source in (None, '-'):
        # Stream domains straight off stdin
        for raw in sys.stdin:
            if (domain := raw.strip()):
                yield domain
    else:
        # Stream domains from the given file, skipping blank lines
        with open(input_source, 'r') as handle:
            for raw in handle:
                if (domain := raw.strip()):
                    yield domain
async def load_resolvers(resolver_file: str = None) -> list:
    '''
    Load DNS resolvers from file or return default resolvers

    :param resolver_file: Path to file containing resolver IPs
    :return: List of resolver IPs

    FIX: previously, a missing/unreadable/empty resolver file made the function
    fall off the end and return None (despite the declared -> list and the
    docstring's promise of defaults). The error path now falls through to the
    public default list instead.
    '''
    # Prefer the user-supplied resolver file when it yields at least one entry
    if resolver_file:
        try:
            with open(resolver_file) as f:
                resolvers = [line.strip() for line in f if line.strip()]
            if resolvers:
                return resolvers
        except Exception as e:
            debug(f'Error loading resolvers from {resolver_file}: {str(e)}')
    # Fall back to downloading a well-known public resolver list
    async with aiohttp.ClientSession() as session:
        async with session.get('https://raw.githubusercontent.com/trickest/resolvers/refs/heads/main/resolvers.txt') as response:
            resolvers = await response.text()
            if not SILENT_MODE:
                info(f'Loaded {len(resolvers.splitlines()):,} resolvers.')
            return [resolver.strip() for resolver in resolvers.splitlines()]
async def resolve_all_dns(domain: str, timeout: int = 5, nameserver: str = None, check_axfr: bool = False) -> tuple:
    '''
    Resolve all DNS records (NS, A, AAAA, CNAME) for a domain

    :param domain: Domain to resolve
    :param timeout: Timeout in seconds
    :param nameserver: Specific nameserver to use
    :param check_axfr: Whether to attempt AXFR zone transfers against the domain's nameservers
    :return: Tuple of (sorted unique A/AAAA IPs, CNAME target or None, NS hostnames, {NS hostname: [IPs]})
    '''
    # Create the resolver
    resolver = dns.asyncresolver.Resolver()
    resolver.lifetime = timeout
    # Set the nameserver if provided
    if nameserver:
        resolver.nameservers = [nameserver]
    # Do all DNS lookups at once; failed lookups come back as exceptions, not Answers
    results = await asyncio.gather(*[resolver.resolve(domain, rtype) for rtype in ('NS', 'A', 'AAAA', 'CNAME')], return_exceptions=True)
    # Parse the results
    nameservers = [str(ns).rstrip('.') for ns in results[0]] if isinstance(results[0], dns.resolver.Answer) else []
    ips = ([str(ip) for ip in results[1]] if isinstance(results[1], dns.resolver.Answer) else []) + ([str(ip) for ip in results[2]] if isinstance(results[2], dns.resolver.Answer) else [])
    cname = str(results[3][0].target).rstrip('.') if isinstance(results[3], dns.resolver.Answer) else None
    # Resolve each nameserver's own A/AAAA records
    ns_ips = {}
    if nameservers:
        ns_results = await asyncio.gather(*[resolver.resolve(ns, rtype) for ns in nameservers for rtype in ('A', 'AAAA')], return_exceptions=True)
        for i, ns in enumerate(nameservers):
            ns_ips[ns] = [str(ip) for records in ns_results[i*2:i*2+2] if isinstance(records, dns.resolver.Answer) for ip in records]
    # Try AXFR if enabled (using already resolved nameserver IPs)
    if check_axfr:
        try:
            # Create the axfrout directory if it doesn't exist
            os.makedirs('axfrout', exist_ok=True)
            # Iterate over each nameserver and their IPs.
            # FIX: loop variable renamed from 'ips' to 'ns_host_ips' — the old name clobbered the
            # domain's A/AAAA list built above, so the function returned the last nameserver's IPs
            # instead of the domain's whenever check_axfr was enabled.
            for ns_host, ns_host_ips in ns_ips.items():
                for ns_ip in ns_host_ips:
                    try:
                        # Perform the AXFR transfer
                        zone = dns.zone.from_xfr(dns.query.xfr(ns_ip, domain, lifetime=timeout))
                        # Write the zone to a file
                        with open(f'axfrout/{domain}_{ns_ip}.zone', 'w') as f:
                            zone.to_text(f)
                        info(f'{Colors.GREEN}[AXFR SUCCESS] {domain} from {ns_host} ({ns_ip}){Colors.RESET}')
                    except Exception as e:
                        debug(f'AXFR failed for {domain} from {ns_ip}: {str(e)}')
        except Exception as e:
            debug(f'Failed AXFR for {domain}: {str(e)}')
    return sorted(set(ips)), cname, nameservers, ns_ips
def parse_domain_url(domain: str) -> tuple:
    '''
    Parse domain string into base domain, port, and protocol list
    :param domain: Raw domain string to parse
    :return: Tuple of (base_domain, port, protocols)
    '''
    port = None
    base_domain = domain.rstrip('/')
    if base_domain.startswith(('http://', 'https://')):
        # Explicit scheme: probe only that protocol
        protocol = 'https://' if base_domain.startswith('https://') else 'http://'
        base_domain = base_domain.split('://', 1)[1]
        host_part = base_domain.split('/')[0]
        if ':' in host_part:
            base_domain, port_str = base_domain.split(':', 1)
            # Non-numeric port falls back to the scheme default
            try:
                port = int(port_str.split('/')[0])
            except ValueError:
                port = 443 if protocol == 'https://' else 80
        else:
            port = 443 if protocol == 'https://' else 80
        protocols = [f'{protocol}{base_domain}{":" + str(port) if port else ""}']
    else:
        # No scheme: default port 443, try https first and then http
        host_part = base_domain.split('/')[0]
        if ':' in host_part:
            base_domain, port_str = base_domain.split(':', 1)
            candidate = port_str.split('/')[0]
            port = int(candidate) if candidate.isdigit() else 443
        else:
            port = 443
        suffix = ':' + str(port) if port else ''
        protocols = [f'https://{base_domain}{suffix}', f'http://{base_domain}{suffix}']
    return base_domain, port, protocols
async def check_domain(session: aiohttp.ClientSession, domain: str, follow_redirects: bool = False, timeout: int = 5, check_axfr: bool = False, resolvers: list = None) -> dict:
    '''
    Check a single domain for its status code, title, and body preview
    :param session: aiohttp client session
    :param domain: domain to check
    :param follow_redirects: whether to follow redirects
    :param timeout: timeout in seconds
    :param check_axfr: whether to check for AXFR
    :param resolvers: list of DNS resolvers to use
    :return: Result dict (see keys below); 'status' is 0 if no request was made, -1 on request error
    '''
    # Pick a random resolver per domain to spread the DNS load
    nameserver = random.choice(resolvers) if resolvers else None
    base_domain, port, protocols = parse_domain_url(domain)
    # Result skeleton — every key is always present so downstream code can index freely
    result = {
        'domain' : base_domain,
        'status' : 0,
        'title' : None,
        'body' : None,
        'content_type' : None,
        'url' : protocols[0],
        'port' : port,
        'ips' : [],
        'cname' : None,
        'nameservers' : [],
        'favicon_hash' : None,
        'headers' : {},
        'content_length' : None,
        'redirect_chain' : [],
        'tls' : None
    }
    # Do DNS lookups
    result['ips'], result['cname'], result['nameservers'], _ = await resolve_all_dns(base_domain, timeout, nameserver, check_axfr)
    # Try each candidate protocol in order; stop on the first completed response
    for url in protocols:
        try:
            async with session.get(url, timeout=timeout, allow_redirects=follow_redirects, max_redirects=10 if follow_redirects else 0) as response:
                result.update({
                    'status' : response.status,
                    'url' : str(response.url),
                    'headers' : dict(response.headers),
                    'content_type' : response.headers.get('content-type', '').split(';')[0],
                    'content_length' : response.headers.get('content-length'),
                    'redirect_chain' : [str(h.url) for h in response.history] + [str(response.url)] if follow_redirects and response.history else []
                })
                if response.url.scheme == 'https':
                    try:
                        # NOTE(review): reaches into aiohttp's private _protocol attribute to get the
                        # raw SSL object — may break across aiohttp versions; confirm pinned version
                        if ssl_object := response._protocol.transport.get_extra_info('ssl_object'):
                            result['tls'] = await get_cert_info(ssl_object, str(response.url))
                    except AttributeError:
                        debug(f'Failed to get SSL info for {url}')
                if response.status == 200:
                    # Parse at most 1MB of HTML for the title, body preview, and favicon hash
                    html = (await response.text())[:1024*1024]
                    soup = bs4.BeautifulSoup(html, 'html.parser')
                    result.update({
                        'title' : ' '.join(soup.title.string.strip().split()).rstrip('.')[:300] if soup.title and soup.title.string else None,
                        'body' : ' '.join(soup.get_text().split()).rstrip('.')[:500] if soup.get_text() else None,
                        'favicon_hash' : await get_favicon_hash(session, url, html)
                    })
                break
        except Exception as e:
            # Request failed for this protocol — mark the error and try the next one
            debug(f'Error checking {url}: {str(e)}')
            result['status'] = -1
            continue
    return result
def format_console_output(result: dict, debug: bool = False, show_fields: dict = None, match_codes: set = None, exclude_codes: set = None) -> str:
    '''
    Format the output with colored sections

    :param result: Dictionary containing domain check results
    :param debug: Whether to show error states
    :param show_fields: Dictionary of fields to show
    :param match_codes: Set of status codes to match
    :param exclude_codes: Set of status codes to exclude
    :return: Formatted line, or '' when the result is filtered out
    '''
    # Skip errors unless in debug mode
    if result['status'] < 0 and not debug:
        return ''
    # Skip if status code doesn't match filters
    if match_codes and result['status'] not in match_codes:
        return ''
    if exclude_codes and result['status'] in exclude_codes:
        return ''
    parts = []
    # Status code, color-coded by class
    if show_fields['status_code']:
        if result['status'] < 0:
            status = f"{Colors.RED}[{result['status']}]{Colors.RESET}"
        elif 200 <= result['status'] < 300:
            status = f"{Colors.GREEN}[{result['status']}]{Colors.RESET}"
        elif 300 <= result['status'] < 400:
            status = f"{Colors.YELLOW}[{result['status']}]{Colors.RESET}"
        else: # 400+ and 500+ codes
            status = f"{Colors.RED}[{result['status']}]{Colors.RESET}"
        parts.append(status)
    # Domain (always shown)
    parts.append(f"[{result['url']}]")
    # Title
    if show_fields['title'] and result['title']:
        parts.append(f"{Colors.DARK_GREEN}[{result['title']}]{Colors.RESET}")
    # Body preview, truncated to 100 characters
    if show_fields['body'] and result['body']:
        body = result['body'][:100] + ('...' if len(result['body']) > 100 else '')
        parts.append(f"{Colors.BLUE}[{body}]{Colors.RESET}")
    # IPs
    if show_fields['ip'] and result['ips']:
        ips_text = ', '.join(result['ips'])
        parts.append(f"{Colors.YELLOW}[{ips_text}]{Colors.RESET}")
    # Favicon hash
    if show_fields['favicon'] and result['favicon_hash']:
        parts.append(f"{Colors.PURPLE}[{result['favicon_hash']}]{Colors.RESET}")
    # Headers (includes content-type and content-length)
    if show_fields['headers'] and result['headers']:
        headers_text = []
        for k, v in result['headers'].items():
            headers_text.append(f"{k}: {v}")
        parts.append(f"{Colors.CYAN}[{', '.join(headers_text)}]{Colors.RESET}")
    else:
        # Only show content-type and content-length if headers aren't shown
        if show_fields['content_type'] and result['content_type']:
            parts.append(f"{Colors.HEADER}[{result['content_type']}]{Colors.RESET}")
        if show_fields['content_length'] and result['content_length']:
            # Fall back to the raw header value if it isn't numeric
            try:
                size = human_size(int(result['content_length']))
                parts.append(f"{Colors.PINK}[{size}]{Colors.RESET}")
            except (ValueError, TypeError):
                parts.append(f"{Colors.PINK}[{result['content_length']}]{Colors.RESET}")
    # CNAME
    if show_fields['cname'] and result['cname']:
        parts.append(f"{Colors.PURPLE}[CNAME: {result['cname']}]{Colors.RESET}")
    # Redirect Chain
    if show_fields['follow_redirects'] and result['redirect_chain']:
        chain = ' -> '.join(result['redirect_chain'])
        parts.append(f"{Colors.YELLOW}[Redirects: {chain}]{Colors.RESET}")
    # TLS certificate info — deliberately shown whenever present, regardless of show_fields
    if result['tls']:
        cert = result['tls']
        tls_parts = []
        # FIX: get_cert_info() returns the key 'common_name', not 'subject' — the old
        # cert.get('subject') lookup never matched, so the Subject field was never displayed
        if cert.get('common_name'):
            tls_parts.append(f"Subject: {cert['common_name']}")
        if cert.get('issuer'):
            tls_parts.append(f"Issuer: {cert['issuer']}")
        if cert.get('fingerprint'):
            tls_parts.append(f"Fingerprint: {cert['fingerprint'][:16]}...")
        if cert.get('alt_names'):
            tls_parts.append(f"SANs: {', '.join(cert['alt_names'][:3])}")
        if cert.get('not_before') and cert.get('not_after'):
            tls_parts.append(f"Valid: {cert['not_before'].split('T')[0]} to {cert['not_after'].split('T')[0]}")
        if cert.get('version'):
            tls_parts.append(f"Version: {cert['version']}")
        if cert.get('serial_number'):
            tls_parts.append(f"Serial: {cert['serial_number'][:16]}...")
        parts.append(f"{Colors.GREEN}[{' | '.join(tls_parts)}]{Colors.RESET}")
    return ' '.join(parts)
def parse_status_codes(codes_str: str) -> set:
    '''
    Parse comma-separated status codes and ranges into a set of integers
    :param codes_str: Comma-separated status codes (e.g., "200,301-399,404,500-503")
    '''
    result = set()
    try:
        for token in codes_str.split(','):
            # A dash marks an inclusive range like "301-399"
            lo, _, hi = token.partition('-')
            if hi:
                result.update(range(int(lo), int(hi) + 1))
            else:
                result.add(int(lo))
        return result
    except ValueError:
        raise argparse.ArgumentTypeError('Invalid status code format. Use comma-separated numbers or ranges (e.g., 200,301-399,404,500-503)')
async def process_domains(input_source: str = None, debug: bool = False, concurrent_limit: int = 100, show_fields: dict = None, output_file: str = None, jsonl: bool = None, timeout: int = 5, match_codes: set = None, exclude_codes: set = None, show_progress: bool = False, check_axfr: bool = False, resolver_file: str = None):
    '''
    Process domains from a file or stdin with concurrent requests
    :param input_source: path to file containing domains, or None for stdin
    :param debug: Whether to show error states
    :param concurrent_limit: maximum number of concurrent requests
    :param show_fields: Dictionary of fields to show
    :param output_file: Path to output file (JSONL format)
    :param jsonl: Whether to print JSON Lines to the console instead of formatted text
    :param timeout: Request timeout in seconds
    :param match_codes: Set of status codes to match
    :param exclude_codes: Set of status codes to exclude
    :param show_progress: Whether to show progress counter
    :param check_axfr: Whether to check for AXFR
    :param resolver_file: Path to file containing DNS resolvers
    '''
    # Check if input file exists
    if input_source and input_source != '-' and not os.path.exists(input_source):
        raise FileNotFoundError(f'Domain file not found: {input_source}')
    # Initialize tasks and processed domains
    tasks = set()
    processed_domains = 0
    # Load resolvers - await the coroutine
    resolvers = await load_resolvers(resolver_file)
    async def write_result(result: dict):
        '''Write a single result to the output file and/or console'''
        nonlocal processed_domains
        # Create JSON output dict with required fields
        output_dict = {'url': result['url'], 'domain': result['domain'], 'status': result['status'], 'port': result['port']}
        # Add optional fields only when they carry a value (keeps the JSONL compact)
        if result['title']:
            output_dict['title'] = result['title']
        if result['body']:
            output_dict['body'] = result['body']
        if result['ips']:
            output_dict['ips'] = result['ips']
        if result['favicon_hash']:
            output_dict['favicon_hash'] = result['favicon_hash']
        if result['headers']:
            output_dict['headers'] = result['headers']
        if result['cname']:
            output_dict['cname'] = result['cname']
        if result['redirect_chain']:
            output_dict['redirect_chain'] = result['redirect_chain']
        if result['tls']:
            output_dict['tls'] = result['tls']
        if result['nameservers']:
            output_dict['nameservers'] = result['nameservers']
        # Get formatted output based on filters; '' means the result was filtered out
        formatted = format_console_output(result, debug, show_fields, match_codes, exclude_codes)
        if formatted:
            # Write to file if specified.
            # NOTE(review): the match/exclude checks here duplicate the filtering that
            # format_console_output already applied — confirm whether both are needed
            if output_file:
                if (not match_codes or result['status'] in match_codes) and (not exclude_codes or result['status'] not in exclude_codes):
                    with open(output_file, 'a') as f:
                        json.dump(output_dict, f, ensure_ascii=False)
                        f.write('\n')
            # Console output
            if jsonl:
                print(json.dumps(output_dict))
            else:
                processed_domains += 1 # Increment counter for each domain processed
                if show_progress:
                    info(f"{Colors.GRAY}[{processed_domains:,}]{Colors.RESET} {formatted}")
    # TLS verification is disabled so self-signed / broken certs still get scanned
    async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False)) as session:
        # Start initial batch of tasks
        for domain in itertools.islice(input_generator(input_source), concurrent_limit):
            task = asyncio.create_task(check_domain(session, domain, follow_redirects=show_fields['follow_redirects'], timeout=timeout, check_axfr=check_axfr, resolvers=resolvers))
            tasks.add(task)
        # Process remaining domains, maintaining concurrent_limit active tasks.
        # NOTE(review): for file input this re-opens the file and skips the first batch, which works;
        # for stdin the second generator resumes where the first one stopped, so the skip below
        # silently drops another concurrent_limit domains — confirm stdin handling
        domains_iter = input_generator(input_source)
        next(itertools.islice(domains_iter, concurrent_limit, concurrent_limit), None) # Skip first concurrent_limit domains
        for domain in domains_iter:
            done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
            tasks = pending
            for task in done:
                result = await task
                await write_result(result)
            task = asyncio.create_task(check_domain(session, domain, follow_redirects=show_fields['follow_redirects'], timeout=timeout, check_axfr=check_axfr, resolvers=resolvers))
            tasks.add(task)
        # Wait for remaining tasks
        if tasks:
            done, _ = await asyncio.wait(tasks)
            for task in done:
                result = await task
                await write_result(result)
def main():
    '''Main function to handle command line arguments and run the domain checker'''
    global SILENT_MODE
    # Setup argument parser
    parser = argparse.ArgumentParser(description=f'{Colors.GREEN}Hyper-fast HTTP Scraping Tool{Colors.RESET}', formatter_class=argparse.RawDescriptionHelpFormatter)
    # Add arguments
    parser.add_argument('file', nargs='?', default='-', help='File containing domains to check (one per line), use - for stdin')
    parser.add_argument('-all', '--all-flags', action='store_true', help='Enable all output flags')
    parser.add_argument('-d', '--debug', action='store_true', help='Show error states and debug information')
    parser.add_argument('-c', '--concurrent', type=int, default=100, help='Number of concurrent checks')
    parser.add_argument('-j', '--jsonl', action='store_true', help='Output JSON Lines format to console')
    parser.add_argument('-o', '--output', help='Output file path (JSONL format)')
    # Output field flags
    parser.add_argument('-b', '--body', action='store_true', help='Show body preview')
    parser.add_argument('-cn', '--cname', action='store_true', help='Show CNAME records')
    parser.add_argument('-cl', '--content-length', action='store_true', help='Show content length')
    parser.add_argument('-ct', '--content-type', action='store_true', help='Show content type')
    parser.add_argument('-f', '--favicon', action='store_true', help='Show favicon hash')
    parser.add_argument('-fr', '--follow-redirects', action='store_true', help='Follow redirects (max 10)')
    parser.add_argument('-hr', '--headers', action='store_true', help='Show response headers')
    parser.add_argument('-i', '--ip', action='store_true', help='Show IP addresses')
    parser.add_argument('-sc', '--status-code', action='store_true', help='Show status code')
    parser.add_argument('-ti', '--title', action='store_true', help='Show page title')
    parser.add_argument('-tls', '--tls-info', action='store_true', help='Show TLS certificate information')
    # Other arguments
    parser.add_argument('-ax', '--axfr', action='store_true', help='Try AXFR transfer against nameservers')
    parser.add_argument('-ec', '--exclude-codes', type=parse_status_codes, help='Exclude these status codes (comma-separated, e.g., 404,500)')
    parser.add_argument('-mc', '--match-codes', type=parse_status_codes, help='Only show these status codes (comma-separated, e.g., 200,301,404)')
    parser.add_argument('-p', '--progress', action='store_true', help='Show progress counter')
    parser.add_argument('-r', '--resolvers', help='File containing DNS resolvers (one per line)')
    parser.add_argument('-to', '--timeout', type=int, default=5, help='Request timeout in seconds')
    # Parse arguments
    args = parser.parse_args()
    # JSONL console mode implies silent mode: raw JSON only, no log lines
    if not (SILENT_MODE := args.jsonl):
        # Setup logging
        if args.debug:
            # NOTE(review): log_file_name 'havoc' looks like a leftover from another project — confirm
            apv.setup_logging(level='DEBUG', log_to_disk=True, log_file_name='havoc', show_details=True)
            logging.debug('Debug logging enabled')
        else:
            apv.setup_logging(level='INFO')
        if args.file == '-':
            logging.info('Reading domains from stdin')
        else:
            logging.info(f'Processing file: {args.file}')
    # Setup show_fields: -all enables everything, otherwise each flag toggles its field
    show_fields = {
        'status_code' : args.all_flags or args.status_code,
        'content_type' : args.all_flags or args.content_type,
        'content_length' : args.all_flags or args.content_length,
        'title' : args.all_flags or args.title,
        'body' : args.all_flags or args.body,
        'ip' : args.all_flags or args.ip,
        'favicon' : args.all_flags or args.favicon,
        'headers' : args.all_flags or args.headers,
        'follow_redirects' : args.all_flags or args.follow_redirects,
        'cname' : args.all_flags or args.cname,
        'tls' : args.all_flags or args.tls_info
    }
    # If no fields specified show all
    if not any(show_fields.values()):
        show_fields = {k: True for k in show_fields}
    try:
        asyncio.run(process_domains(args.file, args.debug, args.concurrent, show_fields, args.output, args.jsonl, args.timeout, args.match_codes, args.exclude_codes, args.progress, check_axfr=args.axfr, resolver_file=args.resolvers))
    except KeyboardInterrupt:
        logging.warning('Process interrupted by user')
        sys.exit(1)
    except Exception as e:
        logging.error(f'Unexpected error: {str(e)}')
        sys.exit(1)

if __name__ == '__main__':
    main()

9
httpz/__init__.py Normal file
View File

@ -0,0 +1,9 @@
#!/usr/bin/env python3
# HTTPZ Web Scanner - Developed by acidvegas in Python (https://github.com/acidvegas/httpz)
# httpz/__init__.py
from .scanner import HTTPZScanner
from .colors import Colors
__version__ = '1.0.0'

13
httpz/__main__.py Normal file
View File

@ -0,0 +1,13 @@
#!/usr/bin/env python3
# HTTPZ Web Scanner - Developed by acidvegas in Python (https://github.com/acidvegas/httpz)
# httpz/__main__.py
import asyncio
import sys
from .cli import main
if __name__ == '__main__':
    try:
        # Run the async CLI entry point
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl-C: exit with a non-zero status and no traceback
        sys.exit(1)

174
httpz/cli.py Normal file
View File

@ -0,0 +1,174 @@
#!/usr/bin/env python3
# HTTPZ Web Scanner - Developed by acidvegas in Python (https://github.com/acidvegas/httpz)
# httpz/cli.py
import argparse
import asyncio
import logging
import os
import sys
from .colors import Colors
from .scanner import HTTPZScanner
from .utils import SILENT_MODE, info
def setup_logging(level='INFO', log_to_disk=False):
    '''
    Setup logging configuration
    :param level: Logging level (INFO or DEBUG)
    :param log_to_disk: Whether to also log to file
    '''
    # Hoisted out of formatTime so the import doesn't run on every log record
    from datetime import datetime

    class ColoredFormatter(logging.Formatter):
        '''Console formatter: gray "MM-DD HH:MM" timestamp followed by the raw message'''
        def formatTime(self, record, datefmt=None):
            dt = datetime.fromtimestamp(record.created)
            return f"{Colors.GRAY}{dt.strftime('%m-%d %H:%M')}{Colors.RESET}"
        def format(self, record):
            return f'{self.formatTime(record)} {record.getMessage()}'
    handlers = []
    # Console handler
    console = logging.StreamHandler()
    console.setFormatter(ColoredFormatter())
    handlers.append(console)
    # Optional file handler with a plain, uncolored format
    if log_to_disk:
        os.makedirs('logs', exist_ok=True)
        # FIX: dropped the stray f-string prefix on a literal with no placeholders
        file_handler = logging.FileHandler('logs/httpz.log')
        file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
        handlers.append(file_handler)
    # Setup logger
    # NOTE(review): basicConfig is a no-op when the root logger already has handlers —
    # confirm this is only ever called once at startup
    logging.basicConfig(
        level=getattr(logging, level.upper()),
        handlers=handlers
    )
def parse_status_codes(codes_str: str) -> set:
    '''
    Parse comma-separated status codes and ranges into a set of integers
    :param codes_str: Comma-separated status codes (e.g., "200,301-399,404,500-503")
    '''
    parsed = set()
    try:
        for piece in codes_str.split(','):
            if '-' in piece:
                # Inclusive range, e.g. "301-399"
                low, high = (int(bound) for bound in piece.split('-'))
                parsed |= set(range(low, high + 1))
            else:
                parsed.add(int(piece))
        return parsed
    except ValueError:
        raise argparse.ArgumentTypeError('Invalid status code format. Use comma-separated numbers or ranges (e.g., 200,301-399,404,500-503)')
async def main():
    '''
    CLI entry point: parse arguments, configure logging, and run the scanner

    Reads domains from a file (or stdin when the positional argument is '-')
    and awaits an HTTPZScanner configured from the parsed options.
    '''
    parser = argparse.ArgumentParser(
        description=f'{Colors.GREEN}Hyper-fast HTTP Scraping Tool{Colors.RESET}',
        formatter_class=argparse.RawDescriptionHelpFormatter
    )

    # Add arguments
    parser.add_argument('file', nargs='?', default='-', help='File containing domains to check (one per line), use - for stdin')
    parser.add_argument('-all', '--all-flags', action='store_true', help='Enable all output flags')
    parser.add_argument('-d', '--debug', action='store_true', help='Show error states and debug information')
    parser.add_argument('-c', '--concurrent', type=int, default=100, help='Number of concurrent checks')
    parser.add_argument('-j', '--jsonl', action='store_true', help='Output JSON Lines format to console')
    parser.add_argument('-o', '--output', help='Output file path (JSONL format)')

    # Output field flags
    parser.add_argument('-b', '--body', action='store_true', help='Show body preview')
    parser.add_argument('-cn', '--cname', action='store_true', help='Show CNAME records')
    parser.add_argument('-cl', '--content-length', action='store_true', help='Show content length')
    parser.add_argument('-ct', '--content-type', action='store_true', help='Show content type')
    parser.add_argument('-f', '--favicon', action='store_true', help='Show favicon hash')
    parser.add_argument('-fr', '--follow-redirects', action='store_true', help='Follow redirects (max 10)')
    parser.add_argument('-hr', '--headers', action='store_true', help='Show response headers')
    parser.add_argument('-i', '--ip', action='store_true', help='Show IP addresses')
    parser.add_argument('-sc', '--status-code', action='store_true', help='Show status code')
    parser.add_argument('-ti', '--title', action='store_true', help='Show page title')
    parser.add_argument('-tls', '--tls-info', action='store_true', help='Show TLS certificate information')

    # Other arguments
    parser.add_argument('-ax', '--axfr', action='store_true', help='Try AXFR transfer against nameservers')
    parser.add_argument('-ec', '--exclude-codes', type=parse_status_codes, help='Exclude these status codes (comma-separated, e.g., 404,500)')
    parser.add_argument('-mc', '--match-codes', type=parse_status_codes, help='Only show these status codes (comma-separated, e.g., 200,301,404)')
    parser.add_argument('-p', '--progress', action='store_true', help='Show progress counter')
    parser.add_argument('-r', '--resolvers', help='File containing DNS resolvers (one per line)')
    parser.add_argument('-to', '--timeout', type=int, default=5, help='Request timeout in seconds')

    args = parser.parse_args()

    # BUGFIX: the old `global SILENT_MODE` only rebound cli's local copy of
    # the name created by `from .utils import SILENT_MODE`; utils.debug() and
    # utils.info() read utils.SILENT_MODE and never saw the change. Assign
    # the flag on the utils module itself so JSONL output is actually silent.
    from . import utils
    utils.SILENT_MODE = args.jsonl

    if not args.jsonl:
        if args.debug:
            setup_logging(level='DEBUG', log_to_disk=True)
        else:
            setup_logging(level='INFO')

        if args.file == '-':
            info('Reading domains from stdin')
        else:
            info(f'Processing file: {args.file}')

    # Map each -all/individual flag onto the scanner's show_fields dict
    show_fields = {
        'status_code'      : args.all_flags or args.status_code,
        'content_type'     : args.all_flags or args.content_type,
        'content_length'   : args.all_flags or args.content_length,
        'title'            : args.all_flags or args.title,
        'body'             : args.all_flags or args.body,
        'ip'               : args.all_flags or args.ip,
        'favicon'          : args.all_flags or args.favicon,
        'headers'          : args.all_flags or args.headers,
        'follow_redirects' : args.all_flags or args.follow_redirects,
        'cname'            : args.all_flags or args.cname,
        'tls'              : args.all_flags or args.tls_info
    }

    # If no fields were requested at all, show everything
    if not any(show_fields.values()):
        show_fields = {k: True for k in show_fields}

    try:
        # Create scanner instance
        scanner = HTTPZScanner(
            concurrent_limit=args.concurrent,
            timeout=args.timeout,
            follow_redirects=args.all_flags or args.follow_redirects,
            check_axfr=args.axfr,
            resolver_file=args.resolvers,
            output_file=args.output,
            show_progress=args.progress,
            debug_mode=args.debug,
            jsonl_output=args.jsonl,
            show_fields=show_fields,
            match_codes=args.match_codes,
            exclude_codes=args.exclude_codes
        )

        # Run the scanner with file/stdin input
        await scanner.scan(args.file)

    except KeyboardInterrupt:
        logging.warning('Process interrupted by user')
        sys.exit(1)
    except Exception as e:
        logging.error(f'Unexpected error: {str(e)}')
        sys.exit(1)
def run():
    '''Entry point for the CLI'''
    # Wrap the async main() so console_scripts can call a plain function
    asyncio.run(main())

if __name__ == '__main__':
    run()

20
httpz/colors.py Normal file
View File

@ -0,0 +1,20 @@
#!/usr/bin/env python3
# HTTPZ Web Scanner - Developed by acidvegas in Python (https://github.com/acidvegas/httpz)
# httpz/colors.py
class Colors:
    '''ANSI color codes for terminal output'''
    HEADER = '\033[95m' # Light purple
    BLUE = '\033[94m' # Blue
    GREEN = '\033[92m' # Green
    YELLOW = '\033[93m' # Yellow
    RED = '\033[91m' # Red
    BOLD = '\033[1m' # Bold text
    UNDERLINE = '\033[4m' # Underlined text
    RESET = '\033[0m' # Reset all attributes
    PURPLE = '\033[35m' # Dark purple
    LIGHT_RED = '\033[38;5;203m' # Light red
    DARK_GREEN = '\033[38;5;22m' # Dark green
    PINK = '\033[38;5;198m' # Bright pink
    GRAY = '\033[90m' # Gray color
    CYAN = '\033[96m' # Cyan color

98
httpz/dns.py Normal file
View File

@ -0,0 +1,98 @@
#!/usr/bin/env python3
# HTTPZ Web Scanner - Developed by acidvegas in Python (https://github.com/acidvegas/httpz)
# httpz/dns.py
import asyncio
import os
import aiohttp
import dns.asyncresolver
import dns.query
import dns.resolver
import dns.zone
from .utils import debug, info, SILENT_MODE
async def resolve_all_dns(domain: str, timeout: int = 5, nameserver: str = None, check_axfr: bool = False) -> tuple:
    '''
    Resolve all DNS records for a domain

    :param domain: Domain to resolve
    :param timeout: Timeout in seconds
    :param nameserver: Specific nameserver to use
    :param check_axfr: Whether to attempt zone transfer
    :return: Tuple of (sorted unique IPs, CNAME target or None, nameserver hostnames, {nameserver: [IPs]})
    '''
    resolver = dns.asyncresolver.Resolver()
    resolver.lifetime = timeout
    if nameserver:
        resolver.nameservers = [nameserver]

    # Query NS/A/AAAA/CNAME concurrently. With return_exceptions=True a
    # failed lookup lands in the results list as an exception object, so
    # every slot is isinstance-checked before use. Index order must match
    # the rtype tuple: results[0]=NS, [1]=A, [2]=AAAA, [3]=CNAME.
    results = await asyncio.gather(*[resolver.resolve(domain, rtype)
                                   for rtype in ('NS', 'A', 'AAAA', 'CNAME')],
                                 return_exceptions=True)

    nameservers = [str(ns).rstrip('.') for ns in results[0]] if isinstance(results[0], dns.resolver.Answer) else []
    ips = ([str(ip) for ip in results[1]] if isinstance(results[1], dns.resolver.Answer) else []) + \
          ([str(ip) for ip in results[2]] if isinstance(results[2], dns.resolver.Answer) else [])
    cname = str(results[3][0].target).rstrip('.') if isinstance(results[3], dns.resolver.Answer) else None

    # Resolve each nameserver's own A/AAAA records. ns_results is laid out
    # flat as [ns0-A, ns0-AAAA, ns1-A, ns1-AAAA, ...], hence the i*2 slicing.
    ns_ips = {}
    if nameservers:
        ns_results = await asyncio.gather(*[resolver.resolve(ns, rtype)
                                          for ns in nameservers
                                          for rtype in ('A', 'AAAA')],
                                        return_exceptions=True)
        for i, ns in enumerate(nameservers):
            ns_ips[ns] = [str(ip) for records in ns_results[i*2:i*2+2]
                         if isinstance(records, dns.resolver.Answer)
                         for ip in records]

    # Zone-transfer attempt is fire-and-forget; output goes to axfrout/
    if check_axfr:
        await attempt_axfr(domain, ns_ips, timeout)

    return sorted(set(ips)), cname, nameservers, ns_ips
async def attempt_axfr(domain: str, ns_ips: dict, timeout: int = 5) -> None:
    '''
    Attempt zone transfer for a domain

    :param domain: Domain to attempt AXFR transfer
    :param ns_ips: Dictionary of nameserver hostnames to their IPs
    :param timeout: Timeout in seconds
    '''
    try:
        # Successful transfers are saved under axfrout/<domain>_<ip>.zone
        os.makedirs('axfrout', exist_ok=True)
        for host, addresses in ns_ips.items():
            for address in addresses:
                try:
                    transfer = dns.query.xfr(address, domain, lifetime=timeout)
                    zone = dns.zone.from_xfr(transfer)
                    with open(f'axfrout/{domain}_{address}.zone', 'w') as handle:
                        zone.to_text(handle)
                    info(f'[AXFR SUCCESS] {domain} from {host} ({address})')
                except Exception as e:
                    # Most servers refuse AXFR; a failure here is expected noise
                    debug(f'AXFR failed for {domain} from {address}: {str(e)}')
    except Exception as e:
        debug(f'Failed AXFR for {domain}: {str(e)}')
async def load_resolvers(resolver_file: str = None) -> list:
    '''
    Load DNS resolvers from a local file, falling back to a public list

    :param resolver_file: Path to file containing resolver IPs (one per line)
    :return: List of resolver IPs with blank lines removed
    '''
    # Prefer the user-supplied file when it yields at least one entry;
    # on read failure or an empty file, fall through to the download
    if resolver_file:
        try:
            with open(resolver_file) as f:
                resolvers = [line.strip() for line in f if line.strip()]
            if resolvers:
                return resolvers
        except Exception as e:
            debug(f'Error loading resolvers from {resolver_file}: {str(e)}')

    # Fall back to the public trickest resolver list
    async with aiohttp.ClientSession() as session:
        async with session.get('https://raw.githubusercontent.com/trickest/resolvers/refs/heads/main/resolvers.txt') as response:
            text = await response.text()

    # BUGFIX: strip and drop blank lines before counting/returning; the old
    # code could hand empty strings to the scanner and logged a count that
    # included them
    resolvers = [line.strip() for line in text.splitlines() if line.strip()]
    if not SILENT_MODE:
        info(f'Loaded {len(resolvers):,} resolvers.')
    return resolvers

107
httpz/formatters.py Normal file
View File

@ -0,0 +1,107 @@
#!/usr/bin/env python3
# HTTPZ Web Scanner - Developed by acidvegas in Python (https://github.com/acidvegas/httpz)
# httpz/formatters.py
from .colors import Colors
from .utils import human_size
def format_console_output(result: dict, debug: bool = False, show_fields: dict = None, match_codes: set = None, exclude_codes: set = None) -> str:
    '''
    Format the output with colored sections

    :param result: Dictionary containing domain check results
    :param debug: Whether to show error states
    :param show_fields: Dictionary of fields to show (None means show everything)
    :param match_codes: Set of status codes to match
    :param exclude_codes: Set of status codes to exclude
    :return: Colored one-line summary, or '' when the result is filtered out
    '''
    # BUGFIX: show_fields defaulted to None but was dereferenced with .get()
    # unconditionally; treat a missing dict as "show every field"
    if show_fields is None:
        show_fields = {key: True for key in ('status_code', 'content_type', 'content_length', 'title', 'body', 'ip', 'favicon', 'headers', 'follow_redirects', 'cname', 'tls')}

    # Negative status marks a transport-level failure; hide unless debugging
    if result['status'] < 0 and not debug:
        return ''

    if match_codes and result['status'] not in match_codes:
        return ''
    if exclude_codes and result['status'] in exclude_codes:
        return ''

    parts = []

    # Status code: green 2xx, yellow 3xx, red everything else
    if show_fields.get('status_code'):
        if result['status'] < 0:
            status = f"{Colors.RED}[{result['status']}]{Colors.RESET}"
        elif 200 <= result['status'] < 300:
            status = f"{Colors.GREEN}[{result['status']}]{Colors.RESET}"
        elif 300 <= result['status'] < 400:
            status = f"{Colors.YELLOW}[{result['status']}]{Colors.RESET}"
        else:
            status = f"{Colors.RED}[{result['status']}]{Colors.RESET}"
        parts.append(status)

    # Domain (always shown)
    parts.append(f"[{result['url']}]")

    # Title
    if show_fields.get('title') and result.get('title'):
        parts.append(f"{Colors.DARK_GREEN}[{result['title']}]{Colors.RESET}")

    # Body preview, truncated to 100 characters
    if show_fields.get('body') and result.get('body'):
        body = result['body'][:100] + ('...' if len(result['body']) > 100 else '')
        parts.append(f"{Colors.BLUE}[{body}]{Colors.RESET}")

    # IPs
    if show_fields.get('ip') and result.get('ips'):
        ips_text = ', '.join(result['ips'])
        parts.append(f"{Colors.YELLOW}[{ips_text}]{Colors.RESET}")

    # Favicon hash
    if show_fields.get('favicon') and result.get('favicon_hash'):
        parts.append(f"{Colors.PURPLE}[{result['favicon_hash']}]{Colors.RESET}")

    # Full headers, or the content-type/length summary when headers are off
    if show_fields.get('headers') and result.get('headers'):
        headers_text = [f"{k}: {v}" for k, v in result['headers'].items()]
        parts.append(f"{Colors.CYAN}[{', '.join(headers_text)}]{Colors.RESET}")
    else:
        if show_fields.get('content_type') and result.get('content_type'):
            parts.append(f"{Colors.HEADER}[{result['content_type']}]{Colors.RESET}")

        if show_fields.get('content_length') and result.get('content_length'):
            # content-length comes from an HTTP header and may be non-numeric
            try:
                size = human_size(int(result['content_length']))
                parts.append(f"{Colors.PINK}[{size}]{Colors.RESET}")
            except (ValueError, TypeError):
                parts.append(f"{Colors.PINK}[{result['content_length']}]{Colors.RESET}")

    # CNAME
    if show_fields.get('cname') and result.get('cname'):
        parts.append(f"{Colors.PURPLE}[CNAME: {result['cname']}]{Colors.RESET}")

    # Redirect Chain
    if show_fields.get('follow_redirects') and result.get('redirect_chain'):
        chain = ' -> '.join(result['redirect_chain'])
        parts.append(f"{Colors.YELLOW}[Redirects: {chain}]{Colors.RESET}")

    # TLS Certificate Info
    # BUGFIX: previously printed regardless of the -tls flag; now gated on
    # show_fields like every other section
    if show_fields.get('tls') and result.get('tls'):
        cert = result['tls']
        tls_parts = []
        if cert.get('common_name'):
            tls_parts.append(f"Subject: {cert['common_name']}")
        if cert.get('issuer'):
            tls_parts.append(f"Issuer: {cert['issuer']}")
        if cert.get('fingerprint'):
            tls_parts.append(f"Fingerprint: {cert['fingerprint'][:16]}...")
        if cert.get('alt_names'):
            tls_parts.append(f"SANs: {', '.join(cert['alt_names'][:3])}")
        if cert.get('not_before') and cert.get('not_after'):
            tls_parts.append(f"Valid: {cert['not_before'].split('T')[0]} to {cert['not_after'].split('T')[0]}")
        if cert.get('version'):
            tls_parts.append(f"Version: {cert['version']}")
        if cert.get('serial_number'):
            tls_parts.append(f"Serial: {cert['serial_number'][:16]}...")
        if tls_parts: # Only add TLS info if we have any parts
            parts.append(f"{Colors.GREEN}[{' | '.join(tls_parts)}]{Colors.RESET}")

    return ' '.join(parts)

140
httpz/parsers.py Normal file
View File

@ -0,0 +1,140 @@
#!/usr/bin/env python3
# HTTPZ Web Scanner - Developed by acidvegas in Python (https://github.com/acidvegas/httpz)
# httpz/parsers.py
try:
import bs4
except ImportError:
raise ImportError('missing bs4 module (pip install beautifulsoup4)')
try:
from cryptography import x509
from cryptography.hazmat.primitives import hashes
from cryptography.x509.oid import NameOID
except ImportError:
raise ImportError('missing cryptography module (pip install cryptography)')
try:
import mmh3
except ImportError:
raise ImportError('missing mmh3 module (pip install mmh3)')
from .utils import debug, error
def parse_domain_url(domain: str) -> tuple:
    '''
    Parse domain string into base domain, port, and protocol list

    :param domain: Raw domain string to parse
    :return: Tuple of (base_domain, port, protocols)
    '''
    cleaned = domain.rstrip('/')

    if cleaned.startswith(('http://', 'https://')):
        # Scheme supplied: honour it and derive the default port from it
        scheme = 'https://' if cleaned.startswith('https://') else 'http://'
        default_port = 443 if scheme == 'https://' else 80
        host = cleaned.partition('://')[2]
        if ':' in host.split('/')[0]:
            host, _, tail = host.partition(':')
            try:
                port = int(tail.split('/')[0])
            except ValueError:
                # Unparseable port: fall back to the scheme default
                port = default_port
        else:
            port = default_port
        suffix = f':{port}' if port else ''
        return host, port, [f'{scheme}{host}{suffix}']

    # No scheme: probe https first, then http, sharing a single port value
    # NOTE(review): a bare domain yields http://host:443 as the second
    # candidate — confirm this is intended before changing it
    host = cleaned
    if ':' in host.split('/')[0]:
        host, _, tail = host.partition(':')
        candidate = tail.split('/')[0]
        port = int(candidate) if candidate.isdigit() else 443
    else:
        port = 443
    suffix = f':{port}' if port else ''
    return host, port, [f'https://{host}{suffix}', f'http://{host}{suffix}']
async def get_cert_info(ssl_object, url: str) -> dict:
    '''
    Get SSL certificate information for a domain

    :param ssl_object: SSL object to get certificate info from
    :param url: URL to get certificate info from
    :return: Dict of certificate fields, or None when unavailable or on error
    '''
    try:
        # No TLS in play, or the peer did not present a certificate
        if not ssl_object or not (cert_der := ssl_object.getpeercert(binary_form=True)):
            return None

        cert = x509.load_der_x509_certificate(cert_der)

        # SAN extension is optional on a certificate
        try:
            san_extension = cert.extensions.get_extension_for_oid(x509.oid.ExtensionOID.SUBJECT_ALTERNATIVE_NAME)
            alt_names = [name.value for name in san_extension.value] if san_extension else []
        except x509.extensions.ExtensionNotFound:
            alt_names = []

        # Subject/issuer CN attributes may be absent (IndexError on [0])
        try:
            common_name = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value
        except IndexError:
            common_name = None
        try:
            issuer = cert.issuer.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value
        except IndexError:
            issuer = None

        return {
            'fingerprint' : cert.fingerprint(hashes.SHA256()).hex(),
            'common_name' : common_name,
            'issuer' : issuer,
            'alt_names' : alt_names,
            'not_before' : cert.not_valid_before_utc.isoformat(),
            'not_after' : cert.not_valid_after_utc.isoformat(),
            'version' : cert.version.value,
            'serial_number' : format(cert.serial_number, 'x'),
        }
    except Exception as e:
        error(f'Error getting cert info for {url}: {str(e)}')
        return None
async def get_favicon_hash(session, base_url: str, html: str) -> str:
    '''
    Get favicon hash from a webpage

    :param session: aiohttp client session
    :param base_url: base URL of the website
    :param html: HTML content of the page
    '''
    try:
        # Look for an explicit <link rel="...icon..."> tag first
        soup = bs4.BeautifulSoup(html, 'html.parser')
        favicon_url = None
        for tag in soup.find_all('link'):
            rel = tag.get('rel')
            if rel and any(token.lower() == 'icon' for token in rel):
                favicon_url = tag.get('href')
                break

        # Fall back to the conventional location when no tag was found
        if not favicon_url:
            favicon_url = '/favicon.ico'

        # Normalize protocol-relative, absolute-path, and bare references
        if favicon_url.startswith('//'):
            favicon_url = 'https:' + favicon_url
        elif favicon_url.startswith('/'):
            favicon_url = base_url + favicon_url
        elif not favicon_url.startswith(('http://', 'https://')):
            favicon_url = base_url + '/' + favicon_url

        async with session.get(favicon_url, timeout=10) as response:
            if response.status == 200:
                # Cap the download at 1MB, then hash with mmh3
                payload = (await response.read())[:1024*1024]
                digest = mmh3.hash64(payload)[0]
                if digest != 0:
                    return str(digest)
    except Exception as e:
        debug(f'Error getting favicon for {base_url}: {str(e)}')
    return None

239
httpz/scanner.py Normal file
View File

@ -0,0 +1,239 @@
#!/usr/bin/env python3
# HTTPZ Web Scanner - Developed by acidvegas in Python (https://github.com/acidvegas/httpz)
# httpz/scanner.py
import asyncio
import json
import random
import sys
try:
import aiohttp
except ImportError:
raise ImportError('missing aiohttp module (pip install aiohttp)')
try:
import bs4
except ImportError:
raise ImportError('missing bs4 module (pip install beautifulsoup4)')
from .dns import resolve_all_dns, load_resolvers
from .formatters import format_console_output
from .colors import Colors
from .parsers import parse_domain_url, get_cert_info, get_favicon_hash
from .utils import debug, info, USER_AGENTS, input_generator
class HTTPZScanner:
    '''Core scanner class for HTTP domain checking'''

    def __init__(self, concurrent_limit = 100, timeout = 5, follow_redirects = False, check_axfr = False, resolver_file = None, output_file = None, show_progress = False, debug_mode = False, jsonl_output = False, show_fields = None, match_codes = None, exclude_codes = None):
        '''
        Initialize the HTTPZScanner class

        :param concurrent_limit: Maximum number of concurrent requests
        :param timeout: Request timeout in seconds
        :param follow_redirects: Follow redirects
        :param check_axfr: Check for AXFR
        :param resolver_file: Path to resolver file
        :param output_file: Path to output file
        :param show_progress: Show progress bar
        :param debug_mode: Enable debug mode
        :param jsonl_output: Output in JSONL format
        :param show_fields: Fields to show (None means show everything)
        :param match_codes: Status codes to match
        :param exclude_codes: Status codes to exclude
        '''
        self.concurrent_limit = concurrent_limit
        self.timeout = timeout
        self.follow_redirects = follow_redirects
        self.check_axfr = check_axfr
        self.resolver_file = resolver_file
        self.output_file = output_file
        self.show_progress = show_progress
        self.debug_mode = debug_mode
        self.jsonl_output = jsonl_output

        # When the caller does not restrict fields, show everything
        self.show_fields = show_fields or {
            'status_code'      : True,
            'content_type'     : True,
            'content_length'   : True,
            'title'            : True,
            'body'             : True,
            'ip'               : True,
            'favicon'          : True,
            'headers'          : True,
            'follow_redirects' : True,
            'cname'            : True,
            'tls'              : True
        }

        self.match_codes = match_codes
        self.exclude_codes = exclude_codes
        self.resolvers = None           # populated by init()
        self.processed_domains = 0      # running count for the progress display

    async def init(self):
        '''Initialize resolvers - must be called before scanning'''
        self.resolvers = await load_resolvers(self.resolver_file)

    async def check_domain(self, session: aiohttp.ClientSession, domain: str):
        '''
        Check a single domain and return results

        :param session: Shared aiohttp session to issue requests through
        :param domain: Raw domain string (may include scheme and/or port)
        '''
        # Pick a random resolver per domain to spread DNS load
        nameserver = random.choice(self.resolvers) if self.resolvers else None
        base_domain, port, protocols = parse_domain_url(domain)

        result = {
            'domain' : base_domain,
            'status' : 0,
            'url'    : protocols[0],
            'port'   : port,
        }

        # Try each candidate protocol until one responds
        for url in protocols:
            try:
                # Set random user agent for each request
                headers = {'User-Agent': random.choice(USER_AGENTS)}

                async with session.get(url, timeout=self.timeout,
                                     allow_redirects=self.follow_redirects,
                                     max_redirects=10 if self.follow_redirects else 0,
                                     headers=headers) as response:

                    result['status'] = response.status

                    # Early exit if status code doesn't match criteria
                    if self.match_codes and result['status'] not in self.match_codes:
                        return result
                    if self.exclude_codes and result['status'] in self.exclude_codes:
                        return result

                    # Continue with full processing only if status code matches criteria
                    result['url'] = str(response.url)

                    # Add full headers if requested; otherwise fall back to the
                    # cheaper content-type/length summary. (The old check also
                    # tested show_fields.get('all_flags'), a key that never
                    # exists in show_fields — dead condition, removed.)
                    headers = dict(response.headers)
                    if headers and self.show_fields.get('headers'):
                        result['headers'] = headers
                    else:
                        if content_type := response.headers.get('content-type', '').split(';')[0]:
                            result['content_type'] = content_type
                        if content_length := response.headers.get('content-length'):
                            result['content_length'] = content_length

                    # Only add redirect chain if it exists
                    if self.follow_redirects and response.history:
                        result['redirect_chain'] = [str(h.url) for h in response.history] + [str(response.url)]

                    # Do DNS lookups only if we're going to use the result
                    ips, cname, nameservers, _ = await resolve_all_dns(
                        base_domain, self.timeout, nameserver, self.check_axfr
                    )

                    # Only add DNS fields if they have values
                    if ips:
                        result['ips'] = ips
                    if cname:
                        result['cname'] = cname
                    if nameservers:
                        result['nameservers'] = nameservers

                    # Only add TLS info if available
                    if response.url.scheme == 'https':
                        try:
                            # NOTE(review): reaches into aiohttp's private
                            # response._protocol to find the ssl_object —
                            # fragile across aiohttp versions
                            if ssl_object := response._protocol.transport.get_extra_info('ssl_object'):
                                if tls_info := await get_cert_info(ssl_object, str(response.url)):
                                    # Only add TLS fields that have values
                                    result['tls'] = {k: v for k, v in tls_info.items() if v}
                        except AttributeError:
                            debug(f'Failed to get SSL info for {url}')

                    # Cap the body at 1MB before parsing to bound memory use
                    html = (await response.text())[:1024*1024]
                    soup = bs4.BeautifulSoup(html, 'html.parser')

                    # Only add title if it exists
                    if soup.title and soup.title.string:
                        result['title'] = ' '.join(soup.title.string.strip().split()).rstrip('.')[:300]

                    # Only add body if it exists
                    if body_text := soup.get_text():
                        result['body'] = ' '.join(body_text.split()).rstrip('.')[:500]

                    # Only add favicon hash if it exists
                    if favicon_hash := await get_favicon_hash(session, url, html):
                        result['favicon_hash'] = favicon_hash

                    break

            except Exception as e:
                debug(f'Error checking {url}: {str(e)}')
                result['status'] = -1
                continue

        return result

    async def process_result(self, result):
        '''
        Process and output a single result

        :param result: result to process
        '''
        formatted = format_console_output(result, self.debug_mode, self.show_fields, self.match_codes, self.exclude_codes)

        if formatted:
            # Write to file if specified. (BUGFIX: the old code opened a
            # throwaway aiohttp.ClientSession around this plain file append —
            # an unused network resource created per result; removed.)
            if self.output_file:
                if (not self.match_codes or result['status'] in self.match_codes) and \
                   (not self.exclude_codes or result['status'] not in self.exclude_codes):
                    with open(self.output_file, 'a') as f:
                        json.dump(result, f, ensure_ascii=False)
                        f.write('\n')

            # Console output: raw JSONL, or the colored human-readable line
            if self.jsonl_output:
                print(json.dumps(result))
            else:
                self.processed_domains += 1
                if self.show_progress:
                    info(f"{Colors.GRAY}[{self.processed_domains:,}]{Colors.RESET} {formatted}")
                else:
                    info(formatted)

    async def scan(self, input_source):
        '''
        Scan domains from a file or stdin

        :param input_source: Path to file or '-' for stdin
        '''
        if not self.resolvers:
            await self.init()

        async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False)) as session:
            tasks = set()

            # Process domains with concurrent limit
            for domain in input_generator(input_source):
                if len(tasks) >= self.concurrent_limit:
                    # Wait for at least one in-flight check to finish before
                    # scheduling another; pending tasks become the new set
                    done, tasks = await asyncio.wait(
                        tasks, return_when=asyncio.FIRST_COMPLETED
                    )
                    for task in done:
                        result = await task
                        await self.process_result(result)

                task = asyncio.create_task(self.check_domain(session, domain))
                tasks.add(task)

            # Process remaining tasks
            if tasks:
                done, _ = await asyncio.wait(tasks)
                for task in done:
                    result = await task
                    await self.process_result(result)

115
httpz/utils.py Normal file
View File

@ -0,0 +1,115 @@
#!/usr/bin/env python3
# HTTPZ Web Scanner - Developed by acidvegas in Python (https://github.com/acidvegas/httpz)
# httpz/utils.py
import logging
import sys
# Global for silent mode
SILENT_MODE = False
# List of user agents to randomize requests
USER_AGENTS = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36 Edg/132.0.0.0",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36 Edg/132.0.0.0",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:134.0) Gecko/20100101 Firefox/134.0",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) obsidian/1.6.5 Chrome/124.0.6367.243 Electron/30.1.2 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:135.0) Gecko/20100101 Firefox/135.0",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 OPR/116.0.0.0",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:134.0) Gecko/20100101 Firefox/134.0",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) obsidian/1.8.3 Chrome/130.0.6723.191 Electron/33.3.2 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/18.3 Safari/605.1.15",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/18.2 Safari/605.1.15",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.6613.137 Safari/537.36",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36",
"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:134.0) Gecko/20100101 Firefox/134.0",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.6 Safari/605.1.15",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/18.1.1 Safari/605.1.15",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36",
"Mozilla/5.0 (X11; Linux x86_64; rv:134.0) Gecko/20100101 Firefox/134.0",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:135.0) Gecko/20100101 Firefox/135.0",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) obsidian/1.5.12 Chrome/120.0.6099.283 Electron/28.2.3 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.114 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36 Edg/129.0.0.0",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0",
"Mozilla/5.0 (X11; CrOS x86_64 14541.0.0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36 OPR/114.0.0.0",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/18.1 Safari/605.1.15",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.5 Safari/605.1.15",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) obsidian/1.7.7 Chrome/128.0.6613.186 Electron/32.2.5 Safari/537.36"
]
def debug(msg: str):
    '''Log a debug message unless silent mode is active'''
    if not SILENT_MODE:
        logging.debug(msg)

def error(msg: str):
    '''Log an error message unless silent mode is active'''
    if not SILENT_MODE:
        logging.error(msg)

def info(msg: str):
    '''Log an info message unless silent mode is active'''
    if not SILENT_MODE:
        logging.info(msg)

def warning(msg: str):
    '''Log a warning message unless silent mode is active'''
    if not SILENT_MODE:
        logging.warning(msg)
def human_size(size_bytes: int) -> str:
    '''
    Convert a byte count to a human readable string

    :param size_bytes: size in bytes
    :return: formatted string such as '1.5KB' or '2.0GB'
    '''
    if not size_bytes:
        return '0B'

    # Extended past GB so very large responses no longer print e.g. '2048.0GB'
    units = ('B', 'KB', 'MB', 'GB', 'TB', 'PB')
    size = float(size_bytes)
    unit_index = 0

    while size >= 1024 and unit_index < len(units) - 1:
        size /= 1024
        unit_index += 1

    return f'{size:.1f}{units[unit_index]}'
def input_generator(input_source: str):
    '''
    Yield non-empty, stripped lines from a file path or stdin

    :param input_source: path to a file, or '-'/None for stdin
    '''
    if input_source in (None, '-'):
        # Stream from stdin; never close it
        for raw in sys.stdin:
            domain = raw.strip()
            if domain:
                yield domain
    else:
        with open(input_source, 'r') as handle:
            for raw in handle:
                domain = raw.strip()
                if domain:
                    yield domain

View File

@ -1,5 +1,4 @@
aiohttp>=3.8.0
apv>=1.0.0
beautifulsoup4>=4.9.3
cryptography>=3.4.7
dnspython>=2.1.0

42
setup.py Normal file
View File

@ -0,0 +1,42 @@
#!/usr/bin/env python3
# HTTPZ Web Scanner - Developed by acidvegas in Python (https://github.com/acidvegas/httpz)
# setup.py
from setuptools import setup, find_packages

# Long description for the PyPI project page comes straight from the README
with open('README.md', 'r', encoding='utf-8') as f:
    long_description = f.read()

setup(
    name='httpz',
    version='1.0.0',
    author='acidvegas',
    author_email='acid.vegas@acid.vegas',
    description='Hyper-fast HTTP Scraping Tool',
    long_description=long_description,
    long_description_content_type='text/markdown',
    url='https://github.com/acidvegas/httpz',
    packages=find_packages(),
    classifiers=[
        'Development Status :: 5 - Production/Stable',
        'Environment :: Console',
        'Intended Audience :: Information Technology',
        'License :: OSI Approved :: ISC License (ISCL)',
        'Operating System :: OS Independent',
        'Programming Language :: Python :: 3',
        'Programming Language :: Python :: 3.8',
        'Programming Language :: Python :: 3.9',
        'Programming Language :: Python :: 3.10',
        'Programming Language :: Python :: 3.11',
        'Topic :: Internet :: WWW/HTTP :: Dynamic Content',
        'Topic :: Security',
    ],
    # Walrus operator and logging force= usage require 3.8+
    python_requires='>=3.8',
    install_requires=[
        'aiohttp>=3.8.0',
        'beautifulsoup4>=4.9.3',
        'cryptography>=3.4.7',
        'dnspython>=2.1.0',
        'mmh3>=3.0.0',
    ],
)

View File

@ -1,18 +0,0 @@
#!/bin/bash
# NOTE(review): this script is deleted in this commit in favor of `pip install httpz`

# Create virtual environment
python3 -m venv venv

# Activate virtual environment
source venv/bin/activate

# Upgrade pip
pip install --upgrade pip

# Install requirements
pip install -r requirements.txt

# Make the main script executable
chmod +x httpz.py

echo "Setup complete! Activate the virtual environment with: source venv/bin/activate"