REAL BAY SHIT MOTHER FUCKER

Dionysus 2025-02-09 23:56:46 -05:00
parent 709295bd4a
commit 2f1ba983aa
Signed by: acidvegas
GPG Key ID: EF4B922DB85DC9DE
4 changed files with 647 additions and 245 deletions

README.md

@@ -1,24 +1,76 @@
# HTTPZ Web Scanner

A high-performance concurrent web scanner written in Python. HTTPZ efficiently scans domains for HTTP/HTTPS services, extracting valuable information like status codes, titles, SSL certificates, and more.
## Requirements
- [Python](https://www.python.org/downloads/)
- [aiohttp](https://pypi.org/project/aiohttp/)
- [apv](https://pypi.org/project/apv/)
- [beautifulsoup4](https://pypi.org/project/beautifulsoup4/)
- [cryptography](https://pypi.org/project/cryptography/)
- [dnspython](https://pypi.org/project/dnspython/)
- [mmh3](https://pypi.org/project/mmh3/)
## Installation
```bash
git clone https://github.com/acidvegas/httpz
cd httpz
chmod +x setup.sh
./setup.sh
```
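Prefer a manual install? The setup script just creates a virtual environment and installs the pinned requirements, so the equivalent by hand is roughly (a sketch mirroring `setup.sh` and `requirements.txt`):
```bash
python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
```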
## Usage
```bash
python httpz.py domains.txt [options]
```

### Arguments
| Argument  | Long Form        | Description                                                  |
|-----------|------------------|--------------------------------------------------------------|
| `file` | - | File containing domains *(one per line)*, use `-` for stdin |
| `-d` | `--debug` | Show error states and debug information |
| `-c N` | `--concurrent N` | Number of concurrent checks *(default: 100)* |
| `-o FILE` | `--output FILE` | Output file path *(JSONL format)* |
| `-j` | `--jsonl` | Output JSON Lines format to console |
| `-all` | `--all-flags` | Enable all output flags |
### Output Field Flags
| Flag | Long Form | Description |
|--------| ---------------------|----------------------------------|
| `-sc` | `--status-code` | Show status code |
| `-ct` | `--content-type` | Show content type |
| `-ti` | `--title` | Show page title |
| `-b` | `--body` | Show body preview |
| `-i` | `--ip` | Show IP addresses |
| `-f` | `--favicon` | Show favicon hash |
| `-hr` | `--headers` | Show response headers |
| `-cl` | `--content-length` | Show content length |
| `-fr` | `--follow-redirects` | Follow redirects *(max 10)* |
| `-cn` | `--cname` | Show CNAME records |
| `-tls` | `--tls-info` | Show TLS certificate information |
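Field flags combine freely; for example, a run that prints only status codes, page titles, and resolved IPs might look like this (illustrative invocation):
```bash
python httpz.py domains.txt -sc -ti -i
```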
### Other Options
| Option | Long Form | Description |
|-------------|-------------------------|-----------------------------------------------------|
| `-to N` | `--timeout N` | Request timeout in seconds *(default: 5)* |
| `-mc CODES` | `--match-codes CODES` | Only show specific status codes *(comma-separated)* |
| `-ec CODES` | `--exclude-codes CODES` | Exclude specific status codes *(comma-separated)* |
| `-p` | `--progress` | Show progress counter |
## Examples
Scan domains with all flags enabled and output to JSONL:
```bash
python httpz.py domains.txt -c 100 -o output.jsonl -j -all -to 10 -mc 200,301 -ec 404,500 -p
```
Scan domains from stdin:
```bash
cat domains.txt | python httpz.py - -c 100 -o output.jsonl -j -all -to 10 -mc 200,301 -ec 404,500 -p
```
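For reference, one JSONL record emitted via `-o`/`-j` might look like the line below; the keys match the scanner's output dictionary, but every value here is purely illustrative:
```json
{"url": "https://example.com", "domain": "example.com", "status": 200, "title": "Example Domain", "ips": ["93.184.216.34"], "headers": {"Content-Type": "text/html; charset=UTF-8"}}
```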

httpz.py

@@ -1,282 +1,608 @@
#!/usr/bin/env python3
# HTTPZ Web Scanner - Developed by acidvegas in Python (https://github.com/acidvegas/httpz)

'''
BCUZ FUCK PROJECT DISCOVERY PYTHON STILL GO HARD
REAL BAY SHIT FOR REAL BAY MOTHER FUCKERS
'''

import argparse
import asyncio
import itertools
import json
import logging
from pathlib import Path
import sys

try:
    import aiohttp
except ImportError:
    raise ImportError('missing \'aiohttp\' library (pip install aiohttp)')

try:
    import apv
except ImportError:
    raise ImportError('missing \'apv\' library (pip install apv)')

try:
    import bs4
except ImportError:
    raise ImportError('missing \'bs4\' library (pip install beautifulsoup4)')

try:
    from cryptography import x509
    from cryptography.hazmat.primitives import hashes
    from cryptography.x509.oid import NameOID
except ImportError:
    raise ImportError('missing \'cryptography\' library (pip install cryptography)')

try:
    import dns.asyncresolver
except ImportError:
    raise ImportError('missing \'dns\' library (pip install dnspython)')

try:
    import mmh3
except ImportError:
    raise ImportError('missing \'mmh3\' library (pip install mmh3)')


class Colors:
    '''ANSI color codes for terminal output'''

    HEADER     = '\033[95m'       # Light purple
    BLUE       = '\033[94m'
    GREEN      = '\033[92m'
    YELLOW     = '\033[93m'
    RED        = '\033[91m'
    BOLD       = '\033[1m'
    UNDERLINE  = '\033[4m'
    RESET      = '\033[0m'
    PURPLE     = '\033[35m'       # Dark purple
    LIGHT_RED  = '\033[38;5;203m' # Light red
    DARK_GREEN = '\033[38;5;22m'  # Dark green
    PINK       = '\033[38;5;198m' # Bright pink
async def resolve_dns(domain: str) -> tuple:
    '''
    Resolve A, AAAA, and CNAME records for a domain

    :param domain: domain to resolve
    :return: tuple of (ips, cname)
    '''
    resolver = dns.asyncresolver.Resolver()
    ips = []
    cname = None

    try:
        # Check for CNAME first
        cname_result = await resolver.resolve(domain, 'CNAME')
        cname = str(cname_result[0].target).rstrip('.')
    except Exception:
        pass

    try:
        # Query A records
        a_result = await resolver.resolve(domain, 'A')
        ips.extend(str(ip) for ip in a_result)
    except Exception as e:
        logging.debug(f'Error resolving A records for {domain}: {str(e)}')

    try:
        # Query AAAA records
        aaaa_result = await resolver.resolve(domain, 'AAAA')
        ips.extend(str(ip) for ip in aaaa_result)
    except Exception as e:
        logging.debug(f'Error resolving AAAA records for {domain}: {str(e)}')

    return sorted(set(ips)), cname


async def get_favicon_hash(session: aiohttp.ClientSession, base_url: str, html: str) -> str:
    '''
    Get favicon hash from a webpage

    :param session: aiohttp client session
    :param base_url: base URL of the website
    :param html: HTML content of the page
    '''
    try:
        soup = bs4.BeautifulSoup(html, 'html.parser')

        # Try to find favicon in link tags
        favicon_url = None
        for link in soup.find_all('link'):
            if link.get('rel') and any(x.lower() == 'icon' for x in link.get('rel')):
                favicon_url = link.get('href')
                break

        if not favicon_url:
            # Try default location
            favicon_url = '/favicon.ico'

        # Handle relative URLs
        if favicon_url.startswith('//'):
            favicon_url = 'https:' + favicon_url
        elif favicon_url.startswith('/'):
            favicon_url = base_url + favicon_url
        elif not favicon_url.startswith(('http://', 'https://')):
            favicon_url = base_url + '/' + favicon_url

        async with session.get(favicon_url, timeout=10) as response:
            if response.status == 200:
                content = await response.read()
                if len(content) <= 1024*1024: # Check if favicon is <= 1MB
                    hash_value = mmh3.hash64(content)[0]
                    # Only return hash if it's not 0 (likely invalid favicon)
                    if hash_value != 0:
                        return str(hash_value)
    except Exception as e:
        logging.debug(f'Error getting favicon for {base_url}: {str(e)}')

    return None
async def get_cert_info(session: aiohttp.ClientSession, url: str) -> dict:
    '''
    Get SSL certificate information for a domain

    :param session: aiohttp client session
    :param url: URL to check
    '''
    try:
        async with session.get(url, timeout=10) as response:
            # Get the SSL object from the underlying transport
            ssl_object = response.connection.transport.get_extra_info('ssl_object')
            if not ssl_object:
                return None

            cert_bin = ssl_object.getpeercert(binary_form=True)
            cert = x509.load_der_x509_certificate(cert_bin)

            # Get certificate details
            cert_info = {
                'fingerprint' : cert.fingerprint(hashes.SHA256()).hex(),
                'subject'     : cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value,
                'issuer'      : cert.issuer.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value,
                'alt_names'   : [],
                'not_before'  : cert.not_valid_before_utc.isoformat(),
                'not_after'   : cert.not_valid_after_utc.isoformat()
            }

            # Get Subject Alternative Names
            try:
                ext = cert.extensions.get_extension_for_oid(x509.oid.ExtensionOID.SUBJECT_ALTERNATIVE_NAME)
                cert_info['alt_names'] = [name.value for name in ext.value]
            except x509.ExtensionNotFound:
                pass

            return cert_info
    except Exception as e:
        logging.debug(f'Error getting certificate info for {url}: {str(e)}')

    return None


async def check_domain(session: aiohttp.ClientSession, domain: str, follow_redirects: bool = False, timeout: int = 5) -> dict:
    '''
    Check a single domain for its status code, title, and body preview

    :param session: aiohttp client session
    :param domain: domain to check
    :param follow_redirects: whether to follow redirects
    :param timeout: timeout in seconds
    '''
    if not domain.startswith(('http://', 'https://')):
        protocols = ['https://', 'http://']
        base_domain = domain
    else:
        # Keep only the scheme prefix so the URL is rebuilt consistently below
        protocols = [domain.split('://')[0] + '://']
        base_domain = domain.split('://')[-1].split('/')[0]

    result = {
        'domain'         : base_domain,
        'status'         : 0,
        'title'          : None,
        'body'           : None,
        'content_type'   : None,
        'url'            : f'https://{base_domain}' if base_domain else domain,
        'ips'            : [],
        'cname'          : None,
        'favicon_hash'   : None,
        'headers'        : {},
        'content_length' : None,
        'redirect_chain' : [],
        'tls'            : None
    }

    # Resolve DNS records
    result['ips'], result['cname'] = await resolve_dns(base_domain)

    for protocol in protocols:
        url = f'{protocol}{base_domain}'
        try:
            max_redirects = 10 if follow_redirects else 0
            async with session.get(url, timeout=timeout, allow_redirects=follow_redirects, max_redirects=max_redirects) as response:
                result['status'] = response.status
                result['url'] = str(response.url)
                result['headers'] = dict(response.headers)
                result['content_type'] = response.headers.get('content-type', '').split(';')[0]
                result['content_length'] = response.headers.get('content-length')

                # Track redirect chain
                if follow_redirects:
                    result['redirect_chain'] = [str(h.url) for h in response.history]
                    if result['redirect_chain']:
                        result['redirect_chain'].append(str(response.url))

                # Get TLS info if HTTPS
                if url.startswith('https://'):
                    result['tls'] = await get_cert_info(session, url)

                if response.status == 200:
                    html = (await response.text())[:1024*1024]
                    soup = bs4.BeautifulSoup(html, 'html.parser')
                    if soup.title:
                        title = soup.title.string.strip() if soup.title.string else ''
                        result['title'] = title[:300]
                    if soup.get_text():
                        body = ' '.join(soup.get_text().split()[:50])
                        result['body'] = body[:500]
                    result['favicon_hash'] = await get_favicon_hash(session, url, html)
                break
        except Exception as e:
            logging.debug(f'Error checking {url}: {str(e)}')
            result['status'] = -1
            continue

    return result
def domain_generator(input_source: str = None):
    '''
    Generator function to yield domains from file or stdin

    :param input_source: path to file containing domains, or None for stdin
    '''
    if input_source == '-' or input_source is None:
        for line in sys.stdin:
            if line.strip():
                yield line.strip()
    else:
        with open(input_source, 'r') as f:
            for line in f:
                if line.strip():
                    yield line.strip()
def human_size(size_bytes: int) -> str:
    '''
    Convert bytes to human readable string

    :param size_bytes: Size in bytes
    '''
    if not size_bytes:
        return '0B'

    units = ('B', 'KB', 'MB', 'GB')
    size = float(size_bytes)
    unit_index = 0

    while size >= 1024 and unit_index < len(units) - 1:
        size /= 1024
        unit_index += 1

    return f'{size:.1f}{units[unit_index]}'


def parse_status_codes(codes_str: str) -> set:
    '''
    Parse comma-separated status codes into a set of integers

    :param codes_str: Comma-separated status codes
    '''
    try:
        return {int(code.strip()) for code in codes_str.split(',')}
    except ValueError:
        raise argparse.ArgumentTypeError('Status codes must be comma-separated numbers (e.g., 200,301,404)')
def format_status_output(result: dict, debug: bool = False, show_fields: dict = None, match_codes: set = None, exclude_codes: set = None) -> str:
    '''
    Format the output with colored sections

    :param result: Dictionary containing domain check results
    :param debug: Whether to show error states
    :param show_fields: Dictionary of fields to show
    :param match_codes: Set of status codes to match
    :param exclude_codes: Set of status codes to exclude
    '''
    # Skip errors unless in debug mode
    if result['status'] < 0 and not debug:
        return ''

    # Skip if status code doesn't match filters
    if match_codes and result['status'] not in match_codes:
        return ''
    if exclude_codes and result['status'] in exclude_codes:
        return ''

    parts = []

    # Status code
    if show_fields['status_code']:
        if result['status'] < 0:
            status = f"{Colors.RED}[{result['status']}]{Colors.RESET}"
        elif 200 <= result['status'] < 300:
            status = f"{Colors.GREEN}[{result['status']}]{Colors.RESET}"
        elif 300 <= result['status'] < 400:
            status = f"{Colors.YELLOW}[{result['status']}]{Colors.RESET}"
        else: # 400+ and 500+ codes
            status = f"{Colors.RED}[{result['status']}]{Colors.RESET}"
        parts.append(status)

    # Domain (always shown)
    parts.append(f"[{result['url']}]")

    # Title
    if show_fields['title'] and result['title']:
        parts.append(f"{Colors.DARK_GREEN}[{result['title']}]{Colors.RESET}")

    # Body
    if show_fields['body'] and result['body']:
        body = result['body'][:100] + ('...' if len(result['body']) > 100 else '')
        parts.append(f"{Colors.BLUE}[{body}]{Colors.RESET}")

    # IPs
    if show_fields['ip'] and result['ips']:
        ips_text = ', '.join(result['ips'])
        parts.append(f"{Colors.YELLOW}[{ips_text}]{Colors.RESET}")

    # Favicon hash
    if show_fields['favicon'] and result['favicon_hash']:
        parts.append(f"{Colors.PURPLE}[{result['favicon_hash']}]{Colors.RESET}")

    # Headers (includes content-type and content-length)
    if show_fields['headers'] and result['headers']:
        headers_text = []
        for k, v in result['headers'].items():
            headers_text.append(f'{k}: {v}')
        parts.append(f"{Colors.LIGHT_RED}[{', '.join(headers_text)}]{Colors.RESET}")
    else:
        # Only show content-type and content-length if headers aren't shown
        if show_fields['content_type'] and result['content_type']:
            parts.append(f"{Colors.HEADER}[{result['content_type']}]{Colors.RESET}")

        if show_fields['content_length'] and result['content_length']:
            try:
                size = human_size(int(result['content_length']))
                parts.append(f"{Colors.PINK}[{size}]{Colors.RESET}")
            except (ValueError, TypeError):
                parts.append(f"{Colors.PINK}[{result['content_length']}]{Colors.RESET}")

    # CNAME
    if show_fields['cname'] and result['cname']:
        parts.append(f"{Colors.PURPLE}[CNAME: {result['cname']}]{Colors.RESET}")

    # Redirect Chain
    if show_fields['follow_redirects'] and result['redirect_chain']:
        chain = ' -> '.join(result['redirect_chain'])
        parts.append(f"{Colors.YELLOW}[Redirects: {chain}]{Colors.RESET}")

    # TLS Certificate Info
    if show_fields['tls'] and result['tls']:
        cert = result['tls']
        tls_parts = []
        tls_parts.append(f"Fingerprint: {cert['fingerprint']}")
        tls_parts.append(f"Subject: {cert['subject']}")
        tls_parts.append(f"Issuer: {cert['issuer']}")
        if cert['alt_names']:
            tls_parts.append(f"SANs: {', '.join(cert['alt_names'])}")
        tls_parts.append(f"Valid: {cert['not_before']} to {cert['not_after']}")
        parts.append(f"{Colors.GREEN}[{' | '.join(tls_parts)}]{Colors.RESET}")

    return ' '.join(parts)
def count_domains(input_source: str = None) -> int:
    '''
    Count total number of domains from file or stdin

    :param input_source: path to file containing domains, or None for stdin
    '''
    if input_source == '-' or input_source is None:
        # Can't count lines from stdin without consuming them
        return 0
    else:
        with open(input_source, 'r') as f:
            return sum(1 for line in f if line.strip())
async def process_domains(input_source: str = None, debug: bool = False, concurrent_limit: int = 100, show_fields: dict = None, output_file: str = None, jsonl: bool = None, timeout: int = 5, match_codes: set = None, exclude_codes: set = None, show_progress: bool = False):
    '''
    Process domains from a file or stdin with concurrent requests

    :param input_source: path to file containing domains, or None for stdin
    :param debug: Whether to show error states
    :param concurrent_limit: maximum number of concurrent requests
    :param show_fields: Dictionary of fields to show
    :param output_file: Path to output file (JSONL format)
    :param jsonl: Whether to print JSON Lines to the console
    :param timeout: Request timeout in seconds
    :param match_codes: Set of status codes to match
    :param exclude_codes: Set of status codes to exclude
    :param show_progress: Whether to show progress counter
    '''
    if input_source and input_source != '-' and not Path(input_source).exists():
        raise FileNotFoundError(f'Domain file not found: {input_source}')

    # Get total domain count if showing progress (only works for files)
    total_domains = count_domains(input_source) if show_progress else 0
    processed_domains = 0

    # Clear the output file if specified
    if output_file:
        open(output_file, 'w').close()

    tasks = set()

    async def write_result(result: dict):
        '''Write a single result to the output file'''
        nonlocal processed_domains

        # Create JSON output dict
        output_dict = {
            'url'    : result['url'],
            'domain' : result['domain'],
            'status' : result['status']
        }

        # Add optional fields if they exist
        if result['title']:
            output_dict['title'] = result['title']
        if result['body']:
            output_dict['body'] = result['body']
        if result['ips']:
            output_dict['ips'] = result['ips']
        if result['favicon_hash']:
            output_dict['favicon_hash'] = result['favicon_hash']
        if result['headers']:
            output_dict['headers'] = result['headers']
        if result['cname']:
            output_dict['cname'] = result['cname']
        if result['redirect_chain']:
            output_dict['redirect_chain'] = result['redirect_chain']
        if result['tls']:
            output_dict['tls'] = result['tls']

        # Get formatted output based on filters
        formatted = format_status_output(result, debug, show_fields, match_codes, exclude_codes)
        if formatted:
            # Write to file if specified
            if output_file:
                if (not match_codes or result['status'] in match_codes) and \
                   (not exclude_codes or result['status'] not in exclude_codes):
                    with open(output_file, 'a') as f:
                        json.dump(output_dict, f, ensure_ascii=False)
                        f.write('\n')

            # Console output
            if jsonl:
                # Pure JSON Lines output without any logging prefixes
                print(json.dumps(output_dict))
            else:
                if show_progress:
                    processed_domains += 1
                    logging.info(f'{Colors.BOLD}[{processed_domains}/{total_domains}]{Colors.RESET} {formatted}')
                else:
                    logging.info(formatted)

    async with aiohttp.ClientSession() as session:
        domains_iter = domain_generator(input_source)

        # Start the initial batch of tasks
        for domain in itertools.islice(domains_iter, concurrent_limit):
            task = asyncio.create_task(check_domain(session, domain, follow_redirects=show_fields['follow_redirects'], timeout=timeout))
            tasks.add(task)

        # Process remaining domains from the same iterator, keeping
        # concurrent_limit tasks in flight (re-creating the generator here
        # would re-read the file and over-skip when reading from stdin)
        for domain in domains_iter:
            done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
            tasks = pending

            for task in done:
                result = await task
                await write_result(result)

            task = asyncio.create_task(check_domain(session, domain, follow_redirects=show_fields['follow_redirects'], timeout=timeout))
            tasks.add(task)

        # Wait for remaining tasks
        if tasks:
            done, _ = await asyncio.wait(tasks)
            for task in done:
                result = await task
                await write_result(result)
def main():
    '''Main function to handle command line arguments and run the domain checker'''

    parser = argparse.ArgumentParser(description=f'{Colors.HEADER}Concurrent domain checker{Colors.RESET}', formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument('file', nargs='?', default='-', help='File containing domains to check (one per line), use - for stdin')
    parser.add_argument('-d', '--debug', action='store_true', help='Show error states and debug information')
    parser.add_argument('-c', '--concurrent', type=int, default=100, help='Number of concurrent checks')
    parser.add_argument('-o', '--output', help='Output file path (JSONL format)')
    parser.add_argument('-j', '--jsonl', action='store_true', help='Output JSON Lines format to console')
    parser.add_argument('-all', '--all-flags', action='store_true', help='Enable all output flags')

    # Output field flags
    parser.add_argument('-sc', '--status-code', action='store_true', help='Show status code')
    parser.add_argument('-ct', '--content-type', action='store_true', help='Show content type')
    parser.add_argument('-ti', '--title', action='store_true', help='Show page title')
    parser.add_argument('-b', '--body', action='store_true', help='Show body preview')
    parser.add_argument('-i', '--ip', action='store_true', help='Show IP addresses')
    parser.add_argument('-f', '--favicon', action='store_true', help='Show favicon hash')
    parser.add_argument('-hr', '--headers', action='store_true', help='Show response headers')
    parser.add_argument('-cl', '--content-length', action='store_true', help='Show content length')
    parser.add_argument('-fr', '--follow-redirects', action='store_true', help='Follow redirects (max 10)')
    parser.add_argument('-cn', '--cname', action='store_true', help='Show CNAME records')
    parser.add_argument('-tls', '--tls-info', action='store_true', help='Show TLS certificate information')

    # Other arguments
    parser.add_argument('-to', '--timeout', type=int, default=5, help='Request timeout in seconds')
    parser.add_argument('-mc', '--match-codes', type=parse_status_codes, help='Only show these status codes (comma-separated, e.g., 200,301,404)')
    parser.add_argument('-ec', '--exclude-codes', type=parse_status_codes, help='Exclude these status codes (comma-separated, e.g., 404,500)')
    parser.add_argument('-p', '--progress', action='store_true', help='Show progress counter')

    args = parser.parse_args()

    # Only setup logging if we're not in JSONL mode
    if not args.jsonl:
        apv.setup_logging(level='DEBUG' if args.debug else 'INFO')
        logging.info(f'{Colors.BOLD}Starting domain checker...{Colors.RESET}')
        if args.file == '-':
            logging.info('Reading domains from stdin')
        else:
            logging.info(f'Processing file: {Colors.UNDERLINE}{args.file}{Colors.RESET}')
        logging.info(f'Concurrent checks: {args.concurrent}')

    show_fields = {
        'status_code'      : args.all_flags or args.status_code,
        'content_type'     : args.all_flags or args.content_type,
        'title'            : args.all_flags or args.title,
        'body'             : args.all_flags or args.body,
        'ip'               : args.all_flags or args.ip,
        'favicon'          : args.all_flags or args.favicon,
        'headers'          : args.all_flags or args.headers,
        'content_length'   : args.all_flags or args.content_length,
        'follow_redirects' : args.all_flags or args.follow_redirects,
        'cname'            : args.all_flags or args.cname,
        'tls'              : args.all_flags or args.tls_info
    }

    # If no fields were specified and -all was not given, show everything (maintains existing behavior)
    if not any(show_fields.values()):
        show_fields = {k: True for k in show_fields}

    try:
        asyncio.run(process_domains(args.file, args.debug, args.concurrent, show_fields, args.output, args.jsonl, args.timeout, args.match_codes, args.exclude_codes, args.progress))
    except KeyboardInterrupt:
        if not args.jsonl:
            logging.warning(f'{Colors.YELLOW}Process interrupted by user{Colors.RESET}')
        sys.exit(1)
    except Exception as e:
        if not args.jsonl:
            logging.error(f'{Colors.RED}An error occurred: {str(e)}{Colors.RESET}')
        sys.exit(1)
if __name__ == '__main__':
    main()

requirements.txt

@@ -0,0 +1,6 @@
aiohttp>=3.8.0
apv>=1.0.0
beautifulsoup4>=4.9.3
cryptography>=3.4.7
dnspython>=2.1.0
mmh3>=3.0.0

setup.sh

@@ -0,0 +1,18 @@
#!/bin/bash
# Create virtual environment
python3 -m venv venv
# Activate virtual environment
source venv/bin/activate
# Upgrade pip
pip install --upgrade pip
# Install requirements
pip install -r requirements.txt
# Make the main script executable
chmod +x httpz.py
echo "Setup complete! Activate the virtual environment with: source venv/bin/activate"