Title and body cleanup

This commit is contained in:
Dionysus 2025-02-10 00:24:28 -05:00
parent 2f1ba983aa
commit df2a309a0d
Signed by: acidvegas
GPG Key ID: EF4B922DB85DC9DE
2 changed files with 128 additions and 53 deletions

View File

@ -1,6 +1,6 @@
ISC License ISC License
Copyright (c) 2023, acidvegas <acid.vegas@acid.vegas> Copyright (c) 2025, acidvegas <acid.vegas@acid.vegas>
Permission to use, copy, modify, and/or distribute this software for any Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above purpose with or without fee is hereby granted, provided that the above

179
httpz.py
View File

@ -13,6 +13,10 @@ import json
import logging import logging
from pathlib import Path from pathlib import Path
import sys import sys
import os
import dns.zone
import dns.query
import dns.resolver
try: try:
import aiohttp import aiohttp
@ -64,6 +68,24 @@ class Colors:
PINK = '\033[38;5;198m' # Bright pink PINK = '\033[38;5;198m' # Bright pink
_SILENT_MODE = False
def debug(msg: str) -> None:
'''Print debug message if not in silent mode'''
if not _SILENT_MODE:
logging.debug(msg)
def error(msg: str) -> None:
'''Print error message if not in silent mode'''
if not _SILENT_MODE:
logging.error(msg)
def info(msg: str) -> None:
'''Print info message if not in silent mode'''
if not _SILENT_MODE:
logging.info(msg)
async def resolve_dns(domain: str) -> tuple: async def resolve_dns(domain: str) -> tuple:
''' '''
Resolve A, AAAA, and CNAME records for a domain Resolve A, AAAA, and CNAME records for a domain
@ -88,14 +110,14 @@ async def resolve_dns(domain: str) -> tuple:
a_result = await resolver.resolve(domain, 'A') a_result = await resolver.resolve(domain, 'A')
ips.extend(str(ip) for ip in a_result) ips.extend(str(ip) for ip in a_result)
except Exception as e: except Exception as e:
logging.debug(f'Error resolving A records for {domain}: {str(e)}') debug(f'Error resolving A records for {domain}: {str(e)}')
try: try:
# Query AAAA records # Query AAAA records
aaaa_result = await resolver.resolve(domain, 'AAAA') aaaa_result = await resolver.resolve(domain, 'AAAA')
ips.extend(str(ip) for ip in aaaa_result) ips.extend(str(ip) for ip in aaaa_result)
except Exception as e: except Exception as e:
logging.debug(f'Error resolving AAAA records for {domain}: {str(e)}') debug(f'Error resolving AAAA records for {domain}: {str(e)}')
return sorted(set(ips)), cname return sorted(set(ips)), cname
@ -140,7 +162,7 @@ async def get_favicon_hash(session: aiohttp.ClientSession, base_url: str, html:
if hash_value != 0: if hash_value != 0:
return str(hash_value) return str(hash_value)
except Exception as e: except Exception as e:
logging.debug(f'Error getting favicon for {base_url}: {str(e)}') debug(f'Error getting favicon for {base_url}: {str(e)}')
return None return None
@ -161,16 +183,16 @@ async def get_cert_info(session: aiohttp.ClientSession, url: str) -> dict:
return None return None
cert_bin = ssl_object.getpeercert(binary_form=True) cert_bin = ssl_object.getpeercert(binary_form=True)
cert = x509.load_der_x509_certificate(cert_bin) cert = x509.load_der_x509_certificate(cert_bin)
# Get certificate details # Get certificate details
cert_info = { cert_info = {
'fingerprint': cert.fingerprint(hashes.SHA256()).hex(), 'fingerprint' : cert.fingerprint(hashes.SHA256()).hex(),
'subject': cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value, 'subject' : cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value,
'issuer': cert.issuer.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value, 'issuer' : cert.issuer.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value,
'alt_names': [], 'alt_names' : [],
'not_before': cert.not_valid_before_utc.isoformat(), 'not_before' : cert.not_valid_before_utc.isoformat(),
'not_after': cert.not_valid_after_utc.isoformat() 'not_after' : cert.not_valid_after_utc.isoformat()
} }
# Get Subject Alternative Names # Get Subject Alternative Names
@ -182,11 +204,11 @@ async def get_cert_info(session: aiohttp.ClientSession, url: str) -> dict:
return cert_info return cert_info
except Exception as e: except Exception as e:
logging.debug(f'Error getting certificate info for {url}: {str(e)}') debug(f'Error getting certificate info for {url}: {str(e)}')
return None return None
async def check_domain(session: aiohttp.ClientSession, domain: str, follow_redirects: bool = False, timeout: int = 5) -> dict: async def check_domain(session: aiohttp.ClientSession, domain: str, follow_redirects: bool = False, timeout: int = 5, check_axfr: bool = False) -> dict:
''' '''
Check a single domain for its status code, title, and body preview Check a single domain for its status code, title, and body preview
@ -194,6 +216,7 @@ async def check_domain(session: aiohttp.ClientSession, domain: str, follow_redir
:param domain: domain to check :param domain: domain to check
:param follow_redirects: whether to follow redirects :param follow_redirects: whether to follow redirects
:param timeout: timeout in seconds :param timeout: timeout in seconds
:param check_axfr: whether to check for AXFR
''' '''
if not domain.startswith(('http://', 'https://')): if not domain.startswith(('http://', 'https://')):
@ -247,27 +270,31 @@ async def check_domain(session: aiohttp.ClientSession, domain: str, follow_redir
html = (await response.text())[:1024*1024] html = (await response.text())[:1024*1024]
soup = bs4.BeautifulSoup(html, 'html.parser') soup = bs4.BeautifulSoup(html, 'html.parser')
if soup.title: if soup.title:
title = soup.title.string.strip() if soup.title.string else '' title = ' '.join(soup.title.string.strip().split()) if soup.title.string else ''
result['title'] = title[:300] result['title'] = title[:300]
if soup.get_text(): if soup.get_text():
body = ' '.join(soup.get_text().split()[:50]) body = ' '.join(soup.get_text().split())
result['body'] = body[:500] # Changed from preview result['body'] = body[:500]
result['favicon_hash'] = await get_favicon_hash(session, url, html) result['favicon_hash'] = await get_favicon_hash(session, url, html)
break break
except Exception as e: except Exception as e:
logging.debug(f'Error checking {url}: {str(e)}') debug(f'Error checking {url}: {str(e)}')
result['status'] = -1 result['status'] = -1
continue continue
if check_axfr:
await try_axfr(base_domain)
return result return result
def domain_generator(input_source: str = None): def domain_generator(input_source: str):
''' '''
Generator function to yield domains from file or stdin Generator function to yield domains from file or stdin
:param input_source: path to file containing domains, or None for stdin :param input_source: path to file containing domains, or None for stdin
''' '''
if input_source == '-' or input_source is None: if input_source == '-' or input_source is None:
for line in sys.stdin: for line in sys.stdin:
if line.strip(): if line.strip():
@ -425,7 +452,7 @@ def count_domains(input_source: str = None) -> int:
return sum(1 for line in f if line.strip()) return sum(1 for line in f if line.strip())
async def process_domains(input_source: str = None, debug: bool = False, concurrent_limit: int = 100, show_fields: dict = None, output_file: str = None, jsonl: bool = None, timeout: int = 5, match_codes: set = None, exclude_codes: set = None, show_progress: bool = False): async def process_domains(input_source: str = None, debug: bool = False, concurrent_limit: int = 100, show_fields: dict = None, output_file: str = None, jsonl: bool = None, timeout: int = 5, match_codes: set = None, exclude_codes: set = None, show_progress: bool = False, check_axfr: bool = False):
''' '''
Process domains from a file or stdin with concurrent requests Process domains from a file or stdin with concurrent requests
@ -438,12 +465,14 @@ async def process_domains(input_source: str = None, debug: bool = False, concurr
:param match_codes: Set of status codes to match :param match_codes: Set of status codes to match
:param exclude_codes: Set of status codes to exclude :param exclude_codes: Set of status codes to exclude
:param show_progress: Whether to show progress counter :param show_progress: Whether to show progress counter
:param check_axfr: Whether to check for AXFR
''' '''
if input_source and input_source != '-' and not Path(input_source).exists(): if input_source and input_source != '-' and not Path(input_source).exists():
raise FileNotFoundError(f'Domain file not found: {input_source}') raise FileNotFoundError(f'Domain file not found: {input_source}')
# Get total domain count if showing progress (only works for files) # Get total domain count if showing progress (only works for files)
total_domains = count_domains(input_source) if show_progress else 0 total_domains = count_domains(input_source) if show_progress else 0
processed_domains = 0 processed_domains = 0
# Clear the output file if specified # Clear the output file if specified
@ -457,11 +486,8 @@ async def process_domains(input_source: str = None, debug: bool = False, concurr
nonlocal processed_domains nonlocal processed_domains
# Create JSON output dict # Create JSON output dict
output_dict = { output_dict = {'url': result['url'], 'domain': result['domain'], 'status': result['status']}
'url': result['url'],
'domain': result['domain'],
'status': result['status']
}
# Add optional fields if they exist # Add optional fields if they exist
if result['title']: if result['title']:
output_dict['title'] = result['title'] output_dict['title'] = result['title']
@ -485,8 +511,7 @@ async def process_domains(input_source: str = None, debug: bool = False, concurr
if formatted: if formatted:
# Write to file if specified # Write to file if specified
if output_file: if output_file:
if (not match_codes or result['status'] in match_codes) and \ if (not match_codes or result['status'] in match_codes) and (not exclude_codes or result['status'] not in exclude_codes):
(not exclude_codes or result['status'] not in exclude_codes):
with open(output_file, 'a') as f: with open(output_file, 'a') as f:
json.dump(output_dict, f, ensure_ascii=False) json.dump(output_dict, f, ensure_ascii=False)
f.write('\n') f.write('\n')
@ -498,14 +523,17 @@ async def process_domains(input_source: str = None, debug: bool = False, concurr
else: else:
if show_progress: if show_progress:
processed_domains += 1 processed_domains += 1
logging.info(f"{Colors.BOLD}[{processed_domains}/{total_domains}]{Colors.RESET} {formatted}") info(f"{Colors.BOLD}[{processed_domains}/{total_domains}]{Colors.RESET} {formatted}")
else: else:
logging.info(formatted) info(formatted)
async with aiohttp.ClientSession() as session: async with aiohttp.ClientSession() as session:
# Start initial batch of tasks # Start initial batch of tasks
for domain in itertools.islice(domain_generator(input_source), concurrent_limit): for domain in itertools.islice(domain_generator(input_source), concurrent_limit):
task = asyncio.create_task(check_domain(session, domain, follow_redirects=show_fields['follow_redirects'], timeout=timeout)) task = asyncio.create_task(check_domain(session, domain,
follow_redirects=show_fields['follow_redirects'],
timeout=timeout,
check_axfr=check_axfr))
tasks.add(task) tasks.add(task)
# Process remaining domains, maintaining concurrent_limit active tasks # Process remaining domains, maintaining concurrent_limit active tasks
@ -520,7 +548,10 @@ async def process_domains(input_source: str = None, debug: bool = False, concurr
result = await task result = await task
await write_result(result) await write_result(result)
task = asyncio.create_task(check_domain(session, domain, follow_redirects=show_fields['follow_redirects'], timeout=timeout)) task = asyncio.create_task(check_domain(session, domain,
follow_redirects=show_fields['follow_redirects'],
timeout=timeout,
check_axfr=check_axfr))
tasks.add(task) tasks.add(task)
# Wait for remaining tasks # Wait for remaining tasks
@ -531,9 +562,51 @@ async def process_domains(input_source: str = None, debug: bool = False, concurr
await write_result(result) await write_result(result)
async def try_axfr(domain: str) -> None:
    '''
    Attempt a DNS zone transfer (AXFR) for a domain against each of its nameservers.

    Successful transfers are saved to axfrout/<domain>_<ns_ip>.zone; every
    failure is logged at debug level and swallowed — this helper never raises.

    :param domain: Domain to attempt AXFR transfer
    '''
    try:
        # Ensure output directory exists
        os.makedirs('axfrout', exist_ok=True)

        # dns.asyncresolver is a distinct module that is NOT pulled in by the
        # file's "import dns.resolver" — without this import the original code
        # raised AttributeError on dns.asyncresolver at runtime.
        import dns.asyncresolver

        # Look up the zone's authoritative nameservers
        resolver = dns.asyncresolver.Resolver()
        nameservers = await resolver.resolve(domain, 'NS')

        # Try AXFR against each nameserver (IPv4 addresses only)
        for ns in nameservers:
            ns_host = str(ns).rstrip('.')
            try:
                # Resolve the nameserver hostname to its A records
                ns_ips = await resolver.resolve(ns_host, 'A')
                for ns_ip in ns_ips:
                    ns_ip = str(ns_ip)
                    try:
                        # Attempt the zone transfer.
                        # NOTE(review): dns.query.xfr is a blocking call inside
                        # an async function — it stalls the event loop while
                        # running; acceptable for this opt-in feature.
                        zone = dns.zone.from_xfr(dns.query.xfr(ns_ip, domain))

                        # Save successful transfer. The original zone.to_text(f)
                        # passed the file handle as dnspython's "sorted" flag and
                        # discarded the returned text, leaving the file empty —
                        # to_file() actually writes the zone to disk.
                        filename = f'axfrout/{domain}_{ns_ip}.zone'
                        with open(filename, 'w') as f:
                            zone.to_file(f)

                        info(f'{Colors.RED}[AXFR SUCCESS] {domain} from {ns_host} ({ns_ip}){Colors.RESET}')
                    except Exception as e:
                        debug(f'AXFR failed for {domain} from {ns_ip}: {str(e)}')
            except Exception as e:
                debug(f'Failed to resolve {ns_host}: {str(e)}')
    except Exception as e:
        debug(f'Failed to get nameservers for {domain}: {str(e)}')
def main(): def main():
'''Main function to handle command line arguments and run the domain checker''' '''Main function to handle command line arguments and run the domain checker'''
parser = argparse.ArgumentParser(description=f'{Colors.HEADER}Concurrent domain checker{Colors.RESET}', formatter_class=argparse.RawDescriptionHelpFormatter) global _SILENT_MODE
parser = argparse.ArgumentParser(description=f'{Colors.GREEN}Hyper-fast HTTP Scraping Tool{Colors.RESET}', formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument('file', nargs='?', default='-', help='File containing domains to check (one per line), use - for stdin') parser.add_argument('file', nargs='?', default='-', help='File containing domains to check (one per line), use - for stdin')
parser.add_argument('-d', '--debug', action='store_true', help='Show error states and debug information') parser.add_argument('-d', '--debug', action='store_true', help='Show error states and debug information')
parser.add_argument('-c', '--concurrent', type=int, default=100, help='Number of concurrent checks') parser.add_argument('-c', '--concurrent', type=int, default=100, help='Number of concurrent checks')
@ -544,16 +617,16 @@ def main():
parser.add_argument('-all', '--all-flags', action='store_true', help='Enable all output flags') parser.add_argument('-all', '--all-flags', action='store_true', help='Enable all output flags')
# Output field flags # Output field flags
parser.add_argument('-sc', '--status-code', action='store_true', help='Show status code') parser.add_argument('-sc', '--status-code', action='store_true', help='Show status code')
parser.add_argument('-ct', '--content-type', action='store_true', help='Show content type') parser.add_argument('-ct', '--content-type', action='store_true', help='Show content type')
parser.add_argument('-ti', '--title', action='store_true', help='Show page title') parser.add_argument('-ti', '--title', action='store_true', help='Show page title')
parser.add_argument('-b', '--body', action='store_true', help='Show body preview') parser.add_argument('-b', '--body', action='store_true', help='Show body preview')
parser.add_argument('-i', '--ip', action='store_true', help='Show IP addresses') parser.add_argument('-i', '--ip', action='store_true', help='Show IP addresses')
parser.add_argument('-f', '--favicon', action='store_true', help='Show favicon hash') parser.add_argument('-f', '--favicon', action='store_true', help='Show favicon hash')
parser.add_argument('-hr', '--headers', action='store_true', help='Show response headers') parser.add_argument('-hr', '--headers', action='store_true', help='Show response headers')
parser.add_argument('-cl', '--content-length', action='store_true', help='Show content length') parser.add_argument('-cl', '--content-length', action='store_true', help='Show content length')
parser.add_argument('-fr', '--follow-redirects', action='store_true', help='Follow redirects (max 10)') parser.add_argument('-fr', '--follow-redirects', action='store_true', help='Follow redirects (max 10)')
parser.add_argument('-cn', '--cname', action='store_true', help='Show CNAME records') parser.add_argument('-cn', '--cname', action='store_true', help='Show CNAME records')
parser.add_argument('-tls', '--tls-info', action='store_true', help='Show TLS certificate information') parser.add_argument('-tls', '--tls-info', action='store_true', help='Show TLS certificate information')
# Other arguments # Other arguments
@ -561,18 +634,22 @@ def main():
parser.add_argument('-mc', '--match-codes', type=parse_status_codes, help='Only show these status codes (comma-separated, e.g., 200,301,404)') parser.add_argument('-mc', '--match-codes', type=parse_status_codes, help='Only show these status codes (comma-separated, e.g., 200,301,404)')
parser.add_argument('-ec', '--exclude-codes', type=parse_status_codes, help='Exclude these status codes (comma-separated, e.g., 404,500)') parser.add_argument('-ec', '--exclude-codes', type=parse_status_codes, help='Exclude these status codes (comma-separated, e.g., 404,500)')
parser.add_argument('-p', '--progress', action='store_true', help='Show progress counter') parser.add_argument('-p', '--progress', action='store_true', help='Show progress counter')
parser.add_argument('-ax', '--axfr', action='store_true', help='Try AXFR transfer against nameservers')
args = parser.parse_args() args = parser.parse_args()
# Only setup logging if we're not in JSONL mode # Set silent mode based on jsonl argument
if not args.jsonl: _SILENT_MODE = args.jsonl
# Only setup logging if we're not in silent mode
if not _SILENT_MODE:
apv.setup_logging(level='DEBUG' if args.debug else 'INFO') apv.setup_logging(level='DEBUG' if args.debug else 'INFO')
logging.info(f'{Colors.BOLD}Starting domain checker...{Colors.RESET}') info(f'{Colors.BOLD}Starting domain checker...{Colors.RESET}')
if args.file == '-': if args.file == '-':
logging.info('Reading domains from stdin') info('Reading domains from stdin')
else: else:
logging.info(f'Processing file: {Colors.UNDERLINE}{args.file}{Colors.RESET}') info(f'Processing file: {Colors.UNDERLINE}{args.file}{Colors.RESET}')
logging.info(f'Concurrent checks: {args.concurrent}') info(f'Concurrent checks: {args.concurrent}')
show_fields = { show_fields = {
'status_code' : args.all_flags or args.status_code, 'status_code' : args.all_flags or args.status_code,
@ -593,14 +670,12 @@ def main():
show_fields = {k: True for k in show_fields} show_fields = {k: True for k in show_fields}
try: try:
asyncio.run(process_domains(args.file, args.debug, args.concurrent, show_fields, args.output, args.jsonl, args.timeout, args.match_codes, args.exclude_codes, args.progress)) asyncio.run(process_domains(args.file, args.debug, args.concurrent, show_fields, args.output, args.jsonl, args.timeout, args.match_codes, args.exclude_codes, args.progress, check_axfr=args.axfr))
except KeyboardInterrupt: except KeyboardInterrupt:
if not args.jsonl: logging.warning(f'{Colors.YELLOW}Process interrupted by user{Colors.RESET}')
logging.warning(f'{Colors.YELLOW}Process interrupted by user{Colors.RESET}')
sys.exit(1) sys.exit(1)
except Exception as e: except Exception as e:
if not args.jsonl: logging.error(f'{Colors.RED}An error occurred: {str(e)}{Colors.RESET}')
logging.error(f'{Colors.RED}An error occurred: {str(e)}{Colors.RESET}')
sys.exit(1) sys.exit(1)