#!/usr/bin/env python3
# HTTPZ Web Scanner - Developed by acidvegas in Python (https://github.com/acidvegas/httpz)
# httpz_scanner/scanner.py

import asyncio
import json
import random

try:
    import aiohttp
except ImportError:
    raise ImportError('missing aiohttp module (pip install aiohttp)')

try:
    import bs4
except ImportError:
    raise ImportError('missing bs4 module (pip install beautifulsoup4)')

from .dns        import resolve_all_dns, load_resolvers
from .formatters import format_console_output
from .colors     import Colors
from .parsers    import parse_domain_url, get_cert_info, get_favicon_hash, parse_title
from .utils      import debug, info, USER_AGENTS, input_generator


class HTTPZScanner:
    '''Core scanner class for HTTP domain checking'''

    def __init__(self, concurrent_limit = 100, timeout = 5, follow_redirects = False, check_axfr = False, resolver_file = None, output_file = None, show_progress = False, debug_mode = False, jsonl_output = False, show_fields = None, match_codes = None, exclude_codes = None, shard = None):
        '''
        Initialize the HTTPZScanner class

        :param concurrent_limit: Maximum number of concurrent requests
        :param timeout: Request timeout in seconds
        :param follow_redirects: Follow redirects
        :param check_axfr: Check for AXFR
        :param resolver_file: Path to resolver file
        :param output_file: Path to output file
        :param show_progress: Show progress bar
        :param debug_mode: Enable debug mode
        :param jsonl_output: Output in JSONL format
        :param show_fields: Fields to show
        :param match_codes: Status codes to match
        :param exclude_codes: Status codes to exclude
        :param shard: Tuple of (shard_index, total_shards) for distributed scanning
        '''

        self.concurrent_limit = concurrent_limit
        self.timeout          = timeout
        self.follow_redirects = follow_redirects
        self.check_axfr       = check_axfr
        self.resolver_file    = resolver_file
        self.output_file      = output_file
        self.show_progress    = show_progress
        self.debug_mode       = debug_mode
        self.jsonl_output     = jsonl_output
        self.shard            = shard

        self.show_fields = show_fields or {
            'status_code'      : True,
            'content_type'     : True,
            'content_length'   : True,
            'title'            : True,
            'body'             : True,
            'ip'               : True,
            'favicon'          : True,
            'headers'          : True,
            'follow_redirects' : True,
            'cname'            : True,
            'tls'              : True
        }

        self.match_codes       = match_codes
        self.exclude_codes     = exclude_codes
        self.resolvers         = None
        self.processed_domains = 0
        self.progress_count    = 0


    async def init(self):
        '''Initialize resolvers - must be called before scanning'''
        self.resolvers = await load_resolvers(self.resolver_file)


    async def check_domain(self, session: aiohttp.ClientSession, domain: str):
        '''Check a single domain and return results'''
        nameserver = random.choice(self.resolvers) if self.resolvers else None
        base_domain, port, protocols = parse_domain_url(domain)

        result = {
            'domain' : base_domain,
            'status' : 0,
            'url'    : protocols[0],
            'port'   : port,
        }

        # Try each protocol until one yields a response
        for url in protocols:
            try:
                # Set a random user agent for each request
                headers = {'User-Agent': random.choice(USER_AGENTS)}

                async with session.get(url, timeout=self.timeout, allow_redirects=self.follow_redirects, max_redirects=10 if self.follow_redirects else 0, headers=headers) as response:
                    result['status'] = response.status

                    # Early exit if the status code doesn't match criteria
                    if self.match_codes and result['status'] not in self.match_codes:
                        return result
                    if self.exclude_codes and result['status'] in self.exclude_codes:
                        return result

                    # Continue with full processing only if the status code matches criteria
                    result['url'] = str(response.url)

                    # Add headers if requested
                    headers = dict(response.headers)
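                    # Note: 'headers' now holds the response headers (rebinding the
                    # request-header dict built above); include them verbatim when the
                    # 'headers' field is requested, otherwise fall back to the lighter
                    # content-type/content-length summary below.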
                    if headers and (self.show_fields.get('headers') or self.show_fields.get('all_flags')):
                        result['headers'] = headers
                    else:
                        # Only add content type/length if headers aren't included
                        if content_type := response.headers.get('content-type', '').split(';')[0]:
                            result['content_type'] = content_type
                        if content_length := response.headers.get('content-length'):
                            result['content_length'] = content_length

                    # Only add the redirect chain if it exists
                    if self.follow_redirects and response.history:
                        result['redirect_chain'] = [str(h.url) for h in response.history] + [str(response.url)]

                    # Do DNS lookups only if we're going to use the result
                    ips, cname, nameservers, _ = await resolve_all_dns(
                        base_domain, self.timeout, nameserver, self.check_axfr
                    )

                    # Only add DNS fields if they have values
                    if ips:
                        result['ips'] = ips
                    if cname:
                        result['cname'] = cname
                    if nameservers:
                        result['nameservers'] = nameservers

                    # Only add TLS info if available
                    if response.url.scheme == 'https':
                        try:
                            if ssl_object := response._protocol.transport.get_extra_info('ssl_object'):
                                if tls_info := await get_cert_info(ssl_object, str(response.url)):
                                    # Only add TLS fields that have values
                                    result['tls'] = {k: v for k, v in tls_info.items() if v}
                        except AttributeError:
                            debug(f'Failed to get SSL info for {url}')

                    content_type = response.headers.get('Content-Type', '')
                    html = await response.text() if any(x in content_type.lower() for x in ['text/html', 'application/xhtml']) else None

                    # Only parse title/body when we actually got HTML back
                    # (BeautifulSoup raises a TypeError on None, which would wrongly
                    # mark a successful response as failed via the except below)
                    if html and (soup := bs4.BeautifulSoup(html, 'html.parser')):
                        # Only add the title if it exists
                        if soup.title and soup.title.string:
                            result['title'] = ' '.join(soup.title.string.strip().split()).rstrip('.')[:300]

                        # Only add the body if it exists
                        if body_text := soup.get_text():
                            result['body'] = ' '.join(body_text.split()).rstrip('.')[:500]

                    # Only add the favicon hash if it exists
                    if favicon_hash := await get_favicon_hash(session, url, html):
                        result['favicon_hash'] = favicon_hash

                    break

            except Exception as e:
                debug(f'Error checking {url}: {str(e)}')
                result['status'] = -1
                continue

        return result


    async def scan(self, input_source):
        '''
        Scan domains from a file, stdin, or async generator

        :param input_source: Can be:
            - Path to file (str)
            - stdin ('-')
            - List/tuple of domains
            - Async generator yielding domains
        :yields: Result dictionary for each domain scanned
        '''

        if not self.resolvers:
            await self.init()

        async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False)) as session:
            tasks = set()
            count = 0 # Move counter here since that's all process_result was doing

            # Handle different input types
            if isinstance(input_source, str):
                # File or stdin input
                gen = input_generator(input_source, self.shard)
                async for domain in gen:
                    if len(tasks) >= self.concurrent_limit:
                        done, tasks = await asyncio.wait(
                            tasks,
                            return_when=asyncio.FIRST_COMPLETED
                        )
                        for task in done:
                            result = await task
                            if self.show_progress:
                                count += 1 # Increment counter here
                            yield result
                    task = asyncio.create_task(self.check_domain(session, domain))
                    tasks.add(task)

            elif isinstance(input_source, (list, tuple)):
                # List/tuple input
                for line_num, domain in enumerate(input_source):
                    if domain := str(domain).strip():
                        if self.shard is None or line_num % self.shard[1] == self.shard[0]:
                            if len(tasks) >= self.concurrent_limit:
                                done, tasks = await asyncio.wait(
                                    tasks,
                                    return_when=asyncio.FIRST_COMPLETED
                                )
                                for task in done:
                                    result = await task
                                    if self.show_progress:
                                        count += 1
                                    yield result
                            task = asyncio.create_task(self.check_domain(session, domain))
                            tasks.add(task)

            else:
                # Async generator input
                line_num = 0
                async for domain in input_source:
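                    # Async generators may yield bytes (e.g. when fed from a pipe or a
                    # socket reader); normalize to str before stripping and shard-filtering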
                    if isinstance(domain, bytes):
                        domain = domain.decode()
                    domain = domain.strip()
                    if domain:
                        if self.shard is None or line_num % self.shard[1] == self.shard[0]:
                            if len(tasks) >= self.concurrent_limit:
                                done, tasks = await asyncio.wait(
                                    tasks,
                                    return_when=asyncio.FIRST_COMPLETED
                                )
                                for task in done:
                                    result = await task
                                    if self.show_progress:
                                        count += 1
                                    yield result
                            task = asyncio.create_task(self.check_domain(session, domain))
                            tasks.add(task)
                    # Count every yielded line (matching the enumerate() semantics of the
                    # list/tuple branch) so shards partition the input consistently
                    line_num += 1

            # Process remaining tasks
            if tasks:
                done, _ = await asyncio.wait(tasks)
                for task in done:
                    result = await task
                    if self.show_progress:
                        count += 1
                    yield result
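

if __name__ == '__main__':
    # Example usage (a minimal sketch, not part of the library API): scan a couple of
    # illustrative domains and print each result as a JSON line. Because this module
    # uses relative imports, run it with the package context intact, e.g.:
    #
    #   python -m httpz_scanner.scanner
    #
    # For distributed scanning, pass shard=(index, total) - e.g. shard=(0, 4) makes
    # this instance process every 4th input line starting at line 0.
    async def _demo():
        scanner = HTTPZScanner(concurrent_limit=10, timeout=5, match_codes={200})
        async for result in scanner.scan(['example.com', 'example.org']):
            print(json.dumps(result))

    asyncio.run(_demo())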