Code cleanup

This commit is contained in:
Dionysus 2025-02-11 00:03:28 -05:00
parent 2698bb5bc0
commit a006a1dac4
Signed by: acidvegas
GPG Key ID: EF4B922DB85DC9DE

178
httpz.py
View File

@ -69,15 +69,15 @@ class Colors:
GRAY = '\033[90m' # Gray color
CYAN = '\033[96m' # Cyan color
_SILENT_MODE = False
# Global for silent mode
SILENT_MODE = False
def debug(msg: str):
if not _SILENT_MODE: logging.debug(msg)
if not SILENT_MODE: logging.debug(msg)
def error(msg: str):
if not _SILENT_MODE: logging.error(msg)
if not SILENT_MODE: logging.error(msg)
def info(msg: str):
if not _SILENT_MODE: logging.info(msg)
if not SILENT_MODE: logging.info(msg)
async def get_cert_info(ssl_object, url: str) -> dict:
@ -130,7 +130,7 @@ async def get_cert_info(ssl_object, url: str) -> dict:
'serial_number' : format(cert.serial_number, 'x'),
}
except Exception as e:
debug(f'Error getting cert info for {url}: {str(e)}')
error(f'Error getting cert info for {url}: {str(e)}')
return None
@ -239,7 +239,7 @@ async def load_resolvers(resolver_file: str = None) -> list:
async with aiohttp.ClientSession() as session:
async with session.get('https://raw.githubusercontent.com/trickest/resolvers/refs/heads/main/resolvers.txt') as response:
resolvers = await response.text()
if not _SILENT_MODE:
if not SILENT_MODE:
info(f'Loaded {len(resolvers.splitlines()):,} resolvers.')
return [resolver.strip() for resolver in resolvers.splitlines()]
@ -302,6 +302,43 @@ async def resolve_all_dns(domain: str, timeout: int = 5, nameserver: str = None,
return sorted(set(ips)), cname, nameservers, ns_ips
def parse_domain_url(domain: str) -> tuple:
'''
Parse domain string into base domain, port, and protocol list
:param domain: Raw domain string to parse
:return: Tuple of (base_domain, port, protocols)
'''
port = None
base_domain = domain.rstrip('/')
if base_domain.startswith(('http://', 'https://')):
protocol = 'https://' if base_domain.startswith('https://') else 'http://'
base_domain = base_domain.split('://', 1)[1]
if ':' in base_domain.split('/')[0]:
base_domain, port_str = base_domain.split(':', 1)
try:
port = int(port_str.split('/')[0])
except ValueError:
port = 443 if protocol == 'https://' else 80
else:
port = 443 if protocol == 'https://' else 80
protocols = [f'{protocol}{base_domain}{":" + str(port) if port else ""}']
else:
if ':' in base_domain.split('/')[0]:
base_domain, port_str = base_domain.split(':', 1)
port = int(port_str.split('/')[0]) if port_str.split('/')[0].isdigit() else 443
else:
port = 443
protocols = [
f'https://{base_domain}{":" + str(port) if port else ""}',
f'http://{base_domain}{":" + str(port) if port else ""}'
]
return base_domain, port, protocols
async def check_domain(session: aiohttp.ClientSession, domain: str, follow_redirects: bool = False, timeout: int = 5, check_axfr: bool = False, resolvers: list = None) -> dict:
'''
Check a single domain for its status code, title, and body preview
@ -313,44 +350,10 @@ async def check_domain(session: aiohttp.ClientSession, domain: str, follow_redir
:param check_axfr: whether to check for AXFR
:param resolvers: list of DNS resolvers to use
'''
# Pick random resolver for this domain
nameserver = random.choice(resolvers) if resolvers else None
base_domain, port, protocols = parse_domain_url(domain)
# Parse domain and port
port = None
base_domain = domain.rstrip('/')
# Handle URLs with existing protocol
if base_domain.startswith(('http://', 'https://')):
protocol = 'https://' if base_domain.startswith('https://') else 'http://'
base_domain = base_domain.split('://', 1)[1]
# Try to extract port from domain
if ':' in base_domain.split('/')[0]:
base_domain, port_str = base_domain.split(':', 1)
try:
port = int(port_str.split('/')[0])
except ValueError:
port = 443 if protocol == 'https://' else 80
else:
port = 443 if protocol == 'https://' else 80
protocols = [f'{protocol}{base_domain}{":" + str(port) if port else ""}']
else:
# No protocol specified - try HTTPS first, then HTTP
if ':' in base_domain.split('/')[0]:
base_domain, port_str = base_domain.split(':', 1)
try:
port = int(port_str.split('/')[0])
except ValueError:
port = 443 # Default to HTTPS port
else:
port = 443 # Default to HTTPS port when no port specified
protocols = [
f'https://{base_domain}{":" + str(port) if port else ""}',
f'http://{base_domain}{":" + str(port) if port else ""}'
]
result = {} # Start with empty dict
base_result = {
result = {
'domain' : base_domain,
'status' : 0,
'title' : None,
@ -367,37 +370,26 @@ async def check_domain(session: aiohttp.ClientSession, domain: str, follow_redir
'redirect_chain' : [],
'tls' : None
}
result.update(base_result) # Update result with base fields
# Do all DNS lookups at once
ips, cname, nameservers, ns_ips = await resolve_all_dns(base_domain, timeout, nameserver, check_axfr)
result['ips'] = ips
result['cname'] = cname
result['nameservers'] = nameservers
# Do DNS lookups
result['ips'], result['cname'], result['nameservers'], _ = await resolve_all_dns(base_domain, timeout, nameserver, check_axfr)
# Try each protocol
for url in protocols:
try:
max_redirects = 10 if follow_redirects else 0
async with session.get(url, timeout=timeout, allow_redirects=follow_redirects, max_redirects=max_redirects) as response:
result['status'] = response.status
result['url'] = str(response.url)
result['headers'] = dict(response.headers)
result['content_type'] = response.headers.get('content-type', '').split(';')[0]
result['content_length'] = response.headers.get('content-length')
async with session.get(url, timeout=timeout, allow_redirects=follow_redirects, max_redirects=10 if follow_redirects else 0) as response:
result.update({
'status' : response.status,
'url' : str(response.url),
'headers' : dict(response.headers),
'content_type' : response.headers.get('content-type', '').split(';')[0],
'content_length' : response.headers.get('content-length'),
'redirect_chain' : [str(h.url) for h in response.history] + [str(response.url)] if follow_redirects and response.history else []
})
# Track redirect chain
if follow_redirects:
result['redirect_chain'] = [str(h.url) for h in response.history]
if result['redirect_chain']:
result['redirect_chain'].append(str(response.url))
# Try to get cert info for any successful HTTPS connection
if response.url.scheme == 'https':
try:
ssl_object = response._protocol.transport.get_extra_info('ssl_object')
if ssl_object: # Only get cert info if we have a valid SSL object
if ssl_object := response._protocol.transport.get_extra_info('ssl_object'):
result['tls'] = await get_cert_info(ssl_object, str(response.url))
except AttributeError:
debug(f'Failed to get SSL info for {url}')
@ -405,27 +397,21 @@ async def check_domain(session: aiohttp.ClientSession, domain: str, follow_redir
if response.status == 200:
html = (await response.text())[:1024*1024]
soup = bs4.BeautifulSoup(html, 'html.parser')
if soup.title:
title = ' '.join(soup.title.string.strip().split()).rstrip('.') if soup.title.string else ''
result['title'] = title[:300]
if soup.get_text():
body = ' '.join(soup.get_text().split()).rstrip('.')
result['body'] = body[:500]
result['favicon_hash'] = await get_favicon_hash(session, url, html)
result.update({
'title' : ' '.join(soup.title.string.strip().split()).rstrip('.')[:300] if soup.title and soup.title.string else None,
'body' : ' '.join(soup.get_text().split()).rstrip('.')[:500] if soup.get_text() else None,
'favicon_hash' : await get_favicon_hash(session, url, html)
})
break
except Exception as e:
debug(f'Error checking {url}: {str(e)}')
result['status'] = -1
continue
# Make absolutely sure port is in the result before returning
if 'port' not in result:
result['port'] = port
return result
def format_status_output(result: dict, debug: bool = False, show_fields: dict = None, match_codes: set = None, exclude_codes: set = None) -> str:
def format_console_output(result: dict, debug: bool = False, show_fields: dict = None, match_codes: set = None, exclude_codes: set = None) -> str:
'''
Format the output with colored sections
@ -568,32 +554,24 @@ async def process_domains(input_source: str = None, debug: bool = False, concurr
:param resolver_file: Path to file containing DNS resolvers
'''
# Check if input file exists
if input_source and input_source != '-' and not os.path.exists(input_source):
raise FileNotFoundError(f'Domain file not found: {input_source}')
# Clear the output file if specified
if output_file:
open(output_file, 'w').close()
# Initialize tasks and processed domains
tasks = set()
processed_domains = 0 # Simple counter for all processed domains
processed_domains = 0
# Load resolvers - await the coroutine
resolvers = await load_resolvers(resolver_file)
async def write_result(result: dict):
'''Write a single result to the output file'''
nonlocal processed_domains
# Create JSON output dict with required fields
output_dict = {
'url' : result['url'],
'domain' : result['domain'],
'status' : result['status'],
'port' : result['port']
}
output_dict = {'url': result['url'], 'domain': result['domain'], 'status': result['status'], 'port': result['port']}
# Add optional fields if they exist
if result['title']:
@ -616,7 +594,8 @@ async def process_domains(input_source: str = None, debug: bool = False, concurr
output_dict['nameservers'] = result['nameservers']
# Get formatted output based on filters
formatted = format_status_output(result, debug, show_fields, match_codes, exclude_codes)
formatted = format_console_output(result, debug, show_fields, match_codes, exclude_codes)
if formatted:
# Write to file if specified
if output_file:
@ -665,7 +644,7 @@ async def process_domains(input_source: str = None, debug: bool = False, concurr
def main():
'''Main function to handle command line arguments and run the domain checker'''
global _SILENT_MODE
global SILENT_MODE
# Setup argument parser
parser = argparse.ArgumentParser(description=f'{Colors.GREEN}Hyper-fast HTTP Scraping Tool{Colors.RESET}', formatter_class=argparse.RawDescriptionHelpFormatter)
@ -699,16 +678,10 @@ def main():
parser.add_argument('-r', '--resolvers', help='File containing DNS resolvers (one per line)')
parser.add_argument('-to', '--timeout', type=int, default=5, help='Request timeout in seconds')
# Parse arguments
args = parser.parse_args()
# Set silent mode based on jsonl argument
_SILENT_MODE = args.jsonl
# Only setup logging if we're not in silent mode
if not _SILENT_MODE:
if not (SILENT_MODE := args.jsonl):
# Setup logging
if args.debug:
apv.setup_logging(level='DEBUG', log_to_disk=True, log_file_name='havoc', show_details=True)
@ -716,8 +689,6 @@ def main():
else:
apv.setup_logging(level='INFO')
logging.info('Starting domain checker...')
if args.file == '-':
logging.info('Reading domains from stdin')
else:
@ -743,7 +714,6 @@ def main():
show_fields = {k: True for k in show_fields}
try:
# Run the domain checker
asyncio.run(process_domains(args.file, args.debug, args.concurrent, show_fields, args.output, args.jsonl, args.timeout, args.match_codes, args.exclude_codes, args.progress, check_axfr=args.axfr, resolver_file=args.resolvers))
except KeyboardInterrupt:
logging.warning('Process interrupted by user')