diff --git a/httpz_scanner/__init__.py b/httpz_scanner/__init__.py
index c529ec1..8f04455 100644
--- a/httpz_scanner/__init__.py
+++ b/httpz_scanner/__init__.py
@@ -6,4 +6,4 @@
 from .colors import Colors
 from .scanner import HTTPZScanner
 
-__version__ = '2.1.4'
\ No newline at end of file
+__version__ = '2.1.5'
\ No newline at end of file
diff --git a/httpz_scanner/cli.py b/httpz_scanner/cli.py
index f571b40..2120ed6 100644
--- a/httpz_scanner/cli.py
+++ b/httpz_scanner/cli.py
@@ -91,6 +91,10 @@ async def main():
     # Add this to the argument parser section
     parser.add_argument('-pa', '--paths', help='Additional paths to check (comma-separated, e.g., ".git/config,.env")')
 
+    # Add these arguments in the parser section
+    parser.add_argument('-hd', '--headers', help='Custom headers to send with each request (format: "Header1: value1,Header2: value2")')
+    parser.add_argument('-p', '--post', help='Send POST request with this data')
+
     # If no arguments provided, print help and exit
     if len(sys.argv) == 1:
         parser.print_help()
@@ -147,7 +151,9 @@ async def main():
         match_codes=args.match_codes,
         exclude_codes=args.exclude_codes,
         shard=args.shard,
-        paths=args.paths.split(',') if args.paths else None
+        paths=args.paths.split(',') if args.paths else None,
+        custom_headers=dict(h.split(': ', 1) for h in args.headers.split(',')) if args.headers else None,
+        post_data=args.post
     )
 
     count = 0
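The --headers value is parsed with plain string splitting, so the format is strict: pairs are comma-separated and each pair needs the literal ': ' separator. A standalone sketch of the same parsing expression cli.py uses above (the helper name is illustrative, not part of the patch):

    def parse_header_arg(raw: str) -> dict:
        # Same expression as cli.py: raises ValueError if a pair lacks ': ',
        # and a header value containing a comma will be split apart.
        return dict(h.split(': ', 1) for h in raw.split(','))

    assert parse_header_arg('X-Scan: httpz,Authorization: Bearer abc123') == {
        'X-Scan': 'httpz',
        'Authorization': 'Bearer abc123',
    }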
diff --git a/httpz_scanner/parsers.py b/httpz_scanner/parsers.py
index c3efdbc..e04bfcb 100644
--- a/httpz_scanner/parsers.py
+++ b/httpz_scanner/parsers.py
@@ -42,20 +42,13 @@ def parse_domain_url(domain: str) -> tuple:
             try:
                 port = int(port_str.split('/')[0])
             except ValueError:
-                port = 443 if protocol == 'https://' else 80
-        else:
-            port = 443 if protocol == 'https://' else 80
-        protocols = [f'{protocol}{base_domain}{":" + str(port) if port else ""}']
+                port = None
     else:
         if ':' in base_domain.split('/')[0]:
             base_domain, port_str = base_domain.split(':', 1)
-            port = int(port_str.split('/')[0]) if port_str.split('/')[0].isdigit() else 443
-        else:
-            port = 443
-        protocols = [
-            f'https://{base_domain}{":" + str(port) if port else ""}',
-            f'http://{base_domain}{":" + str(port) if port else ""}'
-        ]
+            port = int(port_str.split('/')[0]) if port_str.split('/')[0].isdigit() else None
+
+    protocols = ['http://', 'https://'] # Always try HTTP first
 
     return base_domain, port, protocols
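The net effect of this hunk: parse_domain_url no longer guesses 80/443, it returns port=None unless an explicit numeric port was supplied, and the protocol list is now always both scheme prefixes so the caller can try HTTP first and fall back to HTTPS. A minimal re-implementation of the bare-domain branch, for illustration only (it skips the http(s)://-prefixed branch and assumes port is initialized to None earlier in the function, which the hunk does not show):

    def sketch_parse(domain: str) -> tuple:
        # Mirrors the new else-branch above: keep the port only when explicit and numeric
        host, port = domain, None
        if ':' in host.split('/')[0]:
            host, port_str = host.split(':', 1)
            port = int(port_str.split('/')[0]) if port_str.split('/')[0].isdigit() else None
        return host, port, ['http://', 'https://']

    assert sketch_parse('example.com') == ('example.com', None, ['http://', 'https://'])
    assert sketch_parse('example.com:8080') == ('example.com', 8080, ['http://', 'https://'])
    assert sketch_parse('example.com:bogus') == ('example.com', None, ['http://', 'https://'])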
diff --git a/httpz_scanner/scanner.py b/httpz_scanner/scanner.py
index e3c20f9..8e569f2 100644
--- a/httpz_scanner/scanner.py
+++ b/httpz_scanner/scanner.py
@@ -5,6 +5,7 @@
 import asyncio
 import random
 import urllib.parse
+import json
 
 try:
     import aiohttp
@@ -24,7 +25,7 @@ from .utils import debug, USER_AGENTS, input_generator
 class HTTPZScanner:
     '''Core scanner class for HTTP domain checking'''
 
-    def __init__(self, concurrent_limit = 100, timeout = 5, follow_redirects = False, check_axfr = False, resolver_file = None, output_file = None, show_progress = False, debug_mode = False, jsonl_output = False, show_fields = None, match_codes = None, exclude_codes = None, shard = None, paths = None):
+    def __init__(self, concurrent_limit = 100, timeout = 5, follow_redirects = False, check_axfr = False, resolver_file = None, output_file = None, show_progress = False, debug_mode = False, jsonl_output = False, show_fields = None, match_codes = None, exclude_codes = None, shard = None, paths = None, custom_headers=None, post_data=None):
         '''
         Initialize the HTTPZScanner class
@@ -42,6 +43,8 @@
         :param exclude_codes: Status codes to exclude
         :param shard: Tuple of (shard_index, total_shards) for distributed scanning
         :param paths: List of additional paths to check on each domain
+        :param custom_headers: Dictionary of custom headers to send with each request
+        :param post_data: Data to send with POST requests
         '''
 
         self.concurrent_limit = concurrent_limit
@@ -55,6 +58,8 @@
         self.jsonl_output = jsonl_output
         self.shard = shard
         self.paths = paths or []
+        self.custom_headers = custom_headers or {}
+        self.post_data = post_data
 
         self.show_fields = show_fields or {
             'status_code' : True,
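The custom_headers dict stored here is merged in _check_url below on top of the randomly rotated User-Agent, so a user-supplied User-Agent header wins over the random one. The merge order in isolation (values are illustrative):

    import random

    USER_AGENTS = ['Mozilla/5.0 (sketch)']  # stand-in for httpz_scanner.utils.USER_AGENTS
    custom_headers = {'User-Agent': 'my-scanner/1.0', 'X-Scan': 'httpz'}

    headers = {'User-Agent': random.choice(USER_AGENTS)}
    headers.update(custom_headers)  # custom values override the rotated UA
    assert headers == {'User-Agent': 'my-scanner/1.0', 'X-Scan': 'httpz'}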
@@ -78,136 +83,109 @@
     async def check_domain(self, session: aiohttp.ClientSession, domain: str):
-        '''
-        Check a single domain and return results
-
-        :param session: aiohttp.ClientSession
-        :param domain: str
-        '''
-        # Parse domain
+        '''Check a single domain and return results'''
         base_domain, port, protocols = parse_domain_url(domain)
-        results = []
-
-        # For each protocol (http/https)
-        for base_url in protocols:
+
+        for protocol in protocols:
+            url = f'{protocol}{base_domain}'
+            if port:
+                url += f':{port}'
+
             try:
-                # Check base URL first
-                if result := await self._check_url(session, base_url):
-                    results.append(result)
-
-                    # Check additional paths
-                    for path in self.paths:
-                        path = path.strip('/')
-                        url = f'{base_url}/{path}'
-                        if result := await self._check_url(session, url):
-                            results.append(result)
-
-                if results: # If we got any successful results, return them
-                    break
-
+                debug(f'Trying {url}...')
+                result = await self._check_url(session, url)
+                debug(f'Got result for {url}: {result}')
+                if result and (result['status'] != 400 or result.get('redirect_chain')): # Accept anything but a bare 400; a 400 that redirected still counts
+                    return result
             except Exception as e:
-                debug(f'Error checking {base_url}: {str(e)}')
+                debug(f'Error checking {url}: {str(e)}')
                 continue
-
-        return results[0] if results else None # Return first successful result or None
+
+        return None
 
     async def _check_url(self, session: aiohttp.ClientSession, url: str):
-        '''
-        Check a single URL and return results
-
-        :param session: aiohttp.ClientSession
-        :param url: URL to check
-        '''
+        '''Check a single URL and return results'''
         try:
             headers = {'User-Agent': random.choice(USER_AGENTS)}
+            headers.update(self.custom_headers)
 
-            async with session.get(url, timeout=self.timeout,
-                                 allow_redirects=self.follow_redirects,
-                                 max_redirects=10 if self.follow_redirects else 0,
-                                 headers=headers) as response:
+            debug(f'Making request to {url} with headers: {headers}')
+            async with session.request('GET', url,
+                                       timeout=self.timeout,
+                                       allow_redirects=True, # Always follow redirects
+                                       max_redirects=10,
+                                       ssl=False, # Don't verify SSL
+                                       headers=headers) as response:
 
-                # Properly parse the URL
-                parsed_url = urllib.parse.urlparse(url)
-                parsed_domain = parsed_url.hostname
+                debug(f'Got response from {url}: status={response.status}, headers={dict(response.headers)}')
 
                 result = {
-                    'domain': parsed_domain,
+                    'domain': urllib.parse.urlparse(url).hostname,
                     'status': response.status,
                     'url': str(response.url),
-                    'port': parsed_url.port or ('443' if parsed_url.scheme == 'https' else '80')
+                    'response_headers': dict(response.headers)
                 }
 
-                # Early exit conditions
-                if result['status'] == -1:
-                    return None
-                if self.match_codes and result['status'] not in self.match_codes:
-                    return result
-                if self.exclude_codes and result['status'] in self.exclude_codes:
-                    return result
-
-                # Continue with full processing only if status code matches criteria
-                result['url'] = str(response.url)
-
-                # Add headers if requested
-                headers = dict(response.headers)
-                if headers and (self.show_fields.get('headers') or self.show_fields.get('all_flags')):
-                    result['headers'] = headers
-                else:
-                    # Only add content type/length if headers aren't included
-                    if content_type := response.headers.get('content-type', '').split(';')[0]:
-                        result['content_type'] = content_type
-                    if content_length := response.headers.get('content-length'):
-                        result['content_length'] = content_length
-
-                # Only add redirect chain if it exists
-                if self.follow_redirects and response.history:
+                if response.history:
                     result['redirect_chain'] = [str(h.url) for h in response.history] + [str(response.url)]
-
-                # Do DNS lookups only if we're going to use the result
-                ips, cname, nameservers, _ = await resolve_all_dns(
-                    parsed_domain, self.timeout, None, self.check_axfr
-                )
-
-                # Only add DNS fields if they have values
-                if ips:
-                    result['ips'] = ips
-                if cname:
-                    result['cname'] = cname
-                if nameservers:
-                    result['nameservers'] = nameservers
-
-                # Only add TLS info if available
-                if response.url.scheme == 'https':
-                    try:
-                        if ssl_object := response._protocol.transport.get_extra_info('ssl_object'):
-                            if tls_info := await get_cert_info(ssl_object, str(response.url)):
-                                # Only add TLS fields that have values
-                                result['tls'] = {k: v for k, v in tls_info.items() if v}
-                    except AttributeError:
-                        debug(f'Failed to get SSL info for {url}')
-
-                content_type = response.headers.get('Content-Type', '')
-                html = await response.text() if any(x in content_type.lower() for x in ['text/html', 'application/xhtml']) else None
-
-                # Only add title if it exists
-                if soup := bs4.BeautifulSoup(html, 'html.parser'):
-                    if soup.title and soup.title.string:
-                        result['title'] = ' '.join(soup.title.string.strip().split()).rstrip('.')[:300]
-
-                    # Only add body if it exists
-                    if body_text := soup.get_text():
-                        result['body'] = ' '.join(body_text.split()).rstrip('.')[:500]
-
-                    # Only add favicon hash if it exists
-                    if favicon_hash := await get_favicon_hash(session, url, html):
-                        result['favicon_hash'] = favicon_hash
+                    debug(f'Redirect chain for {url}: {result["redirect_chain"]}')
 
                 return result
 
+        except aiohttp.ClientSSLError as e:
+            debug(f'SSL Error for {url}: {str(e)}')
+            return {
+                'domain': urllib.parse.urlparse(url).hostname,
+                'status': -1,
+                'error': f'SSL Error: {str(e)}',
+                'protocol': 'https' if url.startswith('https://') else 'http',
+                'error_type': 'SSL'
+            }
+        except aiohttp.ClientConnectorCertificateError as e:
+            debug(f'Certificate Error for {url}: {str(e)}')
+            return {
+                'domain': urllib.parse.urlparse(url).hostname,
+                'status': -1,
+                'error': f'Certificate Error: {str(e)}',
+                'protocol': 'https' if url.startswith('https://') else 'http',
+                'error_type': 'CERT'
+            }
+        except aiohttp.ClientConnectorError as e:
+            debug(f'Connection Error for {url}: {str(e)}')
+            return {
+                'domain': urllib.parse.urlparse(url).hostname,
+                'status': -1,
+                'error': f'Connection Failed: {str(e)}',
+                'protocol': 'https' if url.startswith('https://') else 'http',
+                'error_type': 'CONN'
+            }
+        except aiohttp.ClientError as e:
+            debug(f'HTTP Error for {url}: {e.__class__.__name__}: {str(e)}')
+            return {
+                'domain': urllib.parse.urlparse(url).hostname,
+                'status': -1,
+                'error': f'HTTP Error: {e.__class__.__name__}: {str(e)}',
+                'protocol': 'https' if url.startswith('https://') else 'http',
+                'error_type': 'HTTP'
+            }
+        except asyncio.TimeoutError:
+            debug(f'Timeout for {url}')
+            return {
+                'domain': urllib.parse.urlparse(url).hostname,
+                'status': -1,
+                'error': f'Connection Timed Out after {self.timeout}s',
+                'protocol': 'https' if url.startswith('https://') else 'http',
+                'error_type': 'TIMEOUT'
+            }
         except Exception as e:
-            debug(f'Error checking {url}: {str(e)}')
-            return None
+            debug(f'Unexpected error for {url}: {e.__class__.__name__}: {str(e)}')
+            return {
+                'domain': urllib.parse.urlparse(url).hostname,
+                'status': -1,
+                'error': f'Error: {e.__class__.__name__}: {str(e)}',
+                'protocol': 'https' if url.startswith('https://') else 'http',
+                'error_type': 'UNKNOWN'
+            }
 
     async def scan(self, input_source):
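Every failure path above now returns the same shape: status -1 plus 'error', 'protocol', and an 'error_type' bucket (SSL, CERT, CONN, HTTP, TIMEOUT, or UNKNOWN). Callers can therefore branch on a field instead of parsing message strings; a sketch of a consumer (the function name is illustrative):

    def classify(result: dict) -> str:
        # 'ok' for any real HTTP status, otherwise the normalized error bucket
        if result['status'] >= 0:
            return 'ok'
        return result.get('error_type', 'UNKNOWN')

    assert classify({'status': 301}) == 'ok'
    assert classify({'status': -1, 'error_type': 'TIMEOUT'}) == 'TIMEOUT'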
@@ -225,7 +203,9 @@
         if not self.resolvers:
             self.resolvers = await load_resolvers(self.resolver_file)
 
-        async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False)) as session:
+        # Just use ssl=False, that's all we need
+        connector = aiohttp.TCPConnector(ssl=False, enable_cleanup_closed=True)
+        async with aiohttp.ClientSession(connector=connector) as session:
             tasks = {} # Change to dict to track domain for each task
             domain_queue = asyncio.Queue()
             queue_empty = False
@@ -233,90 +213,75 @@
             async def process_domain(domain):
                 try:
                     result = await self.check_domain(session, domain)
+                    if self.show_progress:
+                        self.progress_count += 1
                     if result:
-                        if self.show_progress:
-                            self.progress_count += 1
-                        return result
-                except Exception as e:
-                    debug(f'Error processing {domain}: {str(e)}')
-                return None
-
-            # Add domains to queue based on input type
-            async def queue_domains():
-                try:
-                    if isinstance(input_source, str):
-                        # File or stdin input
-                        gen = input_generator(input_source, self.shard)
-                        async for domain in gen:
-                            await domain_queue.put(domain)
-
-                    elif isinstance(input_source, (list, tuple)):
-                        # List/tuple input
-                        for line_num, domain in enumerate(input_source):
-                            if domain := str(domain).strip():
-                                if self.shard is None or line_num % self.shard[1] == self.shard[0]:
-                                    await domain_queue.put(domain)
-
+                        return domain, result
                     else:
-                        # Async generator input
-                        line_num = 0
-                        async for domain in input_source:
-                            if isinstance(domain, bytes):
-                                domain = domain.decode()
-                            if domain := domain.strip():
-                                if self.shard is None or line_num % self.shard[1] == self.shard[0]:
-                                    await domain_queue.put(domain)
-                                line_num += 1
+                        # Create a proper error result if check_domain returns None
+                        return domain, {
+                            'domain': domain,
+                            'status': -1,
+                            'error': 'No successful response from either HTTP or HTTPS',
+                            'protocol': 'unknown',
+                            'error_type': 'NO_RESPONSE'
+                        }
                 except Exception as e:
-                    debug(f'Error queuing domains: {str(e)}')
-                finally:
-                    # Signal queue completion
-                    await domain_queue.put(None)
+                    debug(f'Error processing {domain}: {e.__class__.__name__}: {str(e)}')
+                    # Return structured error information
+                    return domain, {
+                        'domain': domain,
+                        'status': -1,
+                        'error': f'{e.__class__.__name__}: {str(e)}',
+                        'protocol': 'unknown',
+                        'error_type': 'PROCESS'
+                    }
+
+            # Queue processor
+            async def queue_processor():
+                async for domain in input_generator(input_source, self.shard):
+                    await domain_queue.put(domain)
+                    self.processed_domains += 1
+                nonlocal queue_empty
+                queue_empty = True
+
+            # Start queue processor
+            queue_task = asyncio.create_task(queue_processor())
 
-            # Start domain queuing task
-            queue_task = asyncio.create_task(queue_domains())
-
             try:
-                while not queue_empty or tasks:
-                    # Start new tasks if needed
-                    while len(tasks) < self.concurrent_limit and not queue_empty:
-                        try:
-                            domain = await domain_queue.get()
-                            if domain is None:
-                                queue_empty = True
-                                break
-                            task = asyncio.create_task(process_domain(domain))
-                            tasks[task] = domain
-                        except Exception as e:
-                            debug(f'Error creating task: {str(e)}')
+                while not (queue_empty and domain_queue.empty() and not tasks):
+                    # Fill up tasks until we hit concurrent limit
+                    while len(tasks) < self.concurrent_limit and not domain_queue.empty():
+                        domain = await domain_queue.get()
+                        task = asyncio.create_task(process_domain(domain))
+                        tasks[task] = domain
 
-                    if not tasks:
-                        break
-
-                    # Wait for the FIRST task to complete
-                    try:
+                    if tasks:
+                        # Wait for at least one task to complete
                         done, _ = await asyncio.wait(
                             tasks.keys(),
-                            timeout=self.timeout,
                             return_when=asyncio.FIRST_COMPLETED
                         )
 
-                        # Process completed task immediately
+                        # Process completed tasks
                        for task in done:
                             domain = tasks.pop(task)
                             try:
-                                if result := await task:
+                                _, result = await task
+                                if result:
                                     yield result
                             except Exception as e:
-                                debug(f'Error processing result for {domain}: {str(e)}')
-
-                    except Exception as e:
-                        debug(f'Error in task processing loop: {str(e)}')
-                        # Remove any failed tasks
-                        failed_tasks = [t for t in tasks if t.done() and t.exception()]
-                        for task in failed_tasks:
-                            tasks.pop(task)
-
+                                debug(f'Task error for {domain}: {e.__class__.__name__}: {str(e)}')
+                                yield {
+                                    'domain': domain,
+                                    'status': -1,
+                                    'error': f'Task Error: {e.__class__.__name__}: {str(e)}',
+                                    'protocol': 'unknown',
+                                    'error_type': 'TASK'
+                                }
+                    else:
+                        await asyncio.sleep(0.1) # Prevent CPU spin when no tasks
+
             finally:
                 # Clean up
                 for task in tasks:
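The scheduling loop above is a bounded worker pool: fill the task dict up to concurrent_limit, block in asyncio.wait with FIRST_COMPLETED, yield whatever finished, refill. The same pattern reduced to a self-contained sketch (worker is a stand-in for check_domain; all names are illustrative):

    import asyncio

    async def worker(item: int) -> int:
        # Stand-in for check_domain: pretend to do I/O, then return a result
        await asyncio.sleep(0.01)
        return item * 2

    async def bounded_map(items, limit: int = 3):
        # Fill to the limit, wait for the first completion, yield, refill
        pending, queue = set(), list(items)
        while queue or pending:
            while queue and len(pending) < limit:
                pending.add(asyncio.create_task(worker(queue.pop(0))))
            done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
            for task in done:
                yield task.result()

    async def demo():
        return [result async for result in bounded_map(range(5))]

    assert sorted(asyncio.run(demo())) == [0, 2, 4, 6, 8]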
diff --git a/httpz_scanner/utils.py b/httpz_scanner/utils.py
index 792641c..671505a 100644
--- a/httpz_scanner/utils.py
+++ b/httpz_scanner/utils.py
@@ -68,8 +68,9 @@ USER_AGENTS = [
 ]
 
 
-def debug(msg: str):
-    if not SILENT_MODE: logging.debug(msg)
+def debug(msg: str):
+    if not SILENT_MODE:
+        logging.debug(msg)
 def error(msg: str):
     if not SILENT_MODE: logging.error(msg)
 def info(msg: str):
diff --git a/setup.py b/setup.py
index 928f4ef..215d64c 100644
--- a/setup.py
+++ b/setup.py
@@ -10,7 +10,7 @@ with open('README.md', 'r', encoding='utf-8') as f:
 
 setup(
     name='httpz_scanner',
-    version='2.1.4',
+    version='2.1.5',
     author='acidvegas',
     author_email='acid.vegas@acid.vegas',
     description='Hyper-fast HTTP Scraping Tool',
diff --git a/unit_test.py b/unit_test.py
index b83b304..96af265 100644
--- a/unit_test.py
+++ b/unit_test.py
@@ -5,9 +5,10 @@
 import asyncio
 import logging
 import sys
+import time
 
 try:
-    from httpz_scanner import HTTPZScanner
+    from httpz_scanner import HTTPZScanner
     from httpz_scanner.colors import Colors
 except ImportError:
     raise ImportError('missing httpz_scanner library (pip install httpz_scanner)')
@@ -38,7 +39,7 @@
 logger.setLevel(logging.INFO)
 logger.addHandler(handler)
 
 
-async def get_domains_from_url():
+async def get_domains_from_url() -> list:
     '''
     Fetch domains from SecLists URL
@@ -58,7 +59,7 @@
     return [line.strip() for line in content.splitlines() if line.strip()]
 
 
-async def domain_generator(domains):
+async def domain_generator(domains: list):
     '''
     Async generator that yields domains
@@ -70,40 +71,113 @@
         yield domain
 
 
-async def test_list_input(domains):
-    '''
-    Test scanning using a list input
+async def run_benchmark(test_type: str, domains: list, concurrency: int) -> tuple:
+    '''Run a single benchmark test'''
 
-    :param domains: List of domains to scan
-    '''
+    logging.info(f'{Colors.BOLD}Testing {test_type} input with {concurrency} concurrent connections...{Colors.RESET}')
+    scanner = HTTPZScanner(concurrent_limit=concurrency, timeout=3, show_progress=True, debug_mode=True, follow_redirects=True)
+
+    count = 0
+    got_first = False
+    start_time = None
+
+    if test_type == 'List':
+        async for result in scanner.scan(domains):
+            if result:
+                if not got_first:
+                    got_first = True
+                    start_time = time.time()
+                count += 1
+
+                # More detailed status reporting
+                status_str = ''
+                if result['status'] < 0:
+                    error_type = result.get('error_type', 'UNKNOWN')
+                    error_msg = result.get('error', 'Unknown Error')
+                    status_str = f"{Colors.RED}[{result['status']} - {error_type}: {error_msg}]{Colors.RESET}"
+                elif 200 <= result['status'] < 300:
+                    status_str = f"{Colors.GREEN}[{result['status']}]{Colors.RESET}"
+                elif 300 <= result['status'] < 400:
+                    status_str = f"{Colors.YELLOW}[{result['status']}]{Colors.RESET}"
+                else:
+                    status_str = f"{Colors.RED}[{result['status']}]{Colors.RESET}"
+
+                # Show protocol and response headers if available
+                protocol_info = f" {Colors.CYAN}({result.get('protocol', 'unknown')}){Colors.RESET}" if result.get('protocol') else ''
+                headers_info = ''
+                if result.get('response_headers'):
+                    important_headers = ['server', 'location', 'content-type']
+                    headers = [f"{k}: {v}" for k, v in result['response_headers'].items() if k.lower() in important_headers]
+                    if headers:
+                        headers_info = f" {Colors.GRAY}[{', '.join(headers)}]{Colors.RESET}"
+
+                # Show redirect chain if present
+                redirect_info = ''
+                if result.get('redirect_chain'):
+                    redirect_info = f" -> {Colors.YELLOW}Redirects: {' -> '.join(result['redirect_chain'])}{Colors.RESET}"
+
+                # Show error details if present
+                error_info = ''
+                if result.get('error'):
+                    error_info = f" {Colors.RED}Error: {result['error']}{Colors.RESET}"
+
+                # Show final URL if different from original
+                url_info = ''
+                if result.get('url') and result['url'] != f"http(s)://{result['domain']}":
+                    url_info = f" {Colors.CYAN}Final URL: {result['url']}{Colors.RESET}"
+
+                logging.info(
+                    f"{test_type}-{concurrency} Result {count}: "
+                    f"{status_str}{protocol_info} "
+                    f"{Colors.CYAN}{result['domain']}{Colors.RESET}"
+                    f"{redirect_info}"
+                    f"{url_info}"
+                    f"{headers_info}"
+                    f"{error_info}"
+                )
+    else:
+        # Skip generator test
+        pass
+
+    elapsed = time.time() - start_time if start_time else 0
+    domains_per_sec = count/elapsed if elapsed > 0 else 0
+    logging.info(f'{Colors.YELLOW}{test_type} test with {concurrency} concurrent connections completed in {elapsed:.2f} seconds ({domains_per_sec:.2f} domains/sec){Colors.RESET}')
+
+    return elapsed, domains_per_sec
+
+
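Note the timing choice in run_benchmark: start_time is only set once the first result arrives, so the reported rate measures steady-state throughput and excludes startup and connection warm-up, and a run that never yields a result reports 0. The arithmetic in isolation (numbers are illustrative):

    count, elapsed = 500, 25.0  # 500 results in the 25s after the first response
    domains_per_sec = count / elapsed if elapsed > 0 else 0
    assert domains_per_sec == 20.0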
+async def test_list_input(domains: list):
+    '''Test scanning using a list input'''
 
     logging.info(f'{Colors.BOLD}Testing list input...{Colors.RESET}')
-    scanner = HTTPZScanner(concurrent_limit=100, timeout=3, show_progress=True, debug_mode=True)
+    scanner = HTTPZScanner(concurrent_limit=25, timeout=3, show_progress=True, debug_mode=True, follow_redirects=True)
 
+    start_time = time.time()
     count = 0
     async for result in scanner.scan(domains):
         if result:
             count += 1
             status_color = Colors.GREEN if 200 <= result['status'] < 300 else Colors.RED
-            logging.info(f'List Result {count}: {Colors.CYAN}{result["domain"]}{Colors.RESET} - Status: {status_color}{result["status"]}{Colors.RESET}')
+            title = f" - {Colors.CYAN}{result.get('title', 'No Title')}{Colors.RESET}" if result.get('title') else ''
+            error = f" - {Colors.RED}{result.get('error', '')}{Colors.RESET}" if result.get('error') else ''
+            logging.info(f'List-25 Result {count}: {status_color}[{result["status"]}]{Colors.RESET} {Colors.CYAN}{result["domain"]}{Colors.RESET}{title}{error}')
 
 
-async def test_generator_input(domains):
-    '''
-    Test scanning using an async generator input
-
-    :param domains: List of domains to generate from
-    '''
+async def test_generator_input(domains: list):
+    '''Test scanning using an async generator input'''
 
     logging.info(f'{Colors.BOLD}Testing generator input...{Colors.RESET}')
-    scanner = HTTPZScanner(concurrent_limit=100, timeout=3, show_progress=True, debug_mode=True)
+    scanner = HTTPZScanner(concurrent_limit=25, timeout=3, show_progress=True, debug_mode=True, follow_redirects=True)
 
+    start_time = time.time()
     count = 0
     async for result in scanner.scan(domain_generator(domains)):
         if result:
             count += 1
             status_color = Colors.GREEN if 200 <= result['status'] < 300 else Colors.RED
-            logging.info(f'Generator Result {count}: {Colors.CYAN}{result["domain"]}{Colors.RESET} - Status: {status_color}{result["status"]}{Colors.RESET}')
+            title = f" - {Colors.CYAN}{result.get('title', 'No Title')}{Colors.RESET}" if result.get('title') else ''
+            error = f" - {Colors.RED}{result.get('error', '')}{Colors.RESET}" if result.get('error') else ''
+            logging.info(f'Generator-25 Result {count}: {status_color}[{result["status"]}]{Colors.RESET} {Colors.CYAN}{result["domain"]}{Colors.RESET}{title}{error}')
 
 
 async def main() -> None:
@@ -114,11 +188,39 @@
         domains = await get_domains_from_url()
         logging.info(f'Loaded {Colors.YELLOW}{len(domains)}{Colors.RESET} domains for testing')
 
-        # Run tests
-        await test_generator_input(domains)
-        await test_list_input(domains)
+        # Store benchmark results
+        results = []
 
-        logging.info(f'{Colors.GREEN}All tests completed successfully!{Colors.RESET}')
+        # Run tests with different concurrency levels
+        for concurrency in [25, 50, 100]:
+            # Generator tests
+            gen_result = await run_benchmark('Generator', domains, concurrency)
+            results.append(('Generator', concurrency, *gen_result))
+
+            # List tests
+            list_result = await run_benchmark('List', domains, concurrency)
+            results.append(('List', concurrency, *list_result))
+
+        # Print benchmark comparison
+        logging.info(f'\n{Colors.BOLD}Benchmark Results:{Colors.RESET}')
+        logging.info('-' * 80)
+        logging.info(f'{"Test Type":<15} {"Concurrency":<15} {"Time (s)":<15} {"Domains/sec":<15}')
+        logging.info('-' * 80)
+
+        # Sort by domains per second (fastest first)
+        results.sort(key=lambda x: x[3], reverse=True)
+
+        for test_type, concurrency, elapsed, domains_per_sec in results:
+            logging.info(f'{test_type:<15} {concurrency:<15} {elapsed:.<15.2f} {domains_per_sec:<15.2f}')
+
+        # Highlight fastest result
+        fastest = results[0]
+        logging.info('-' * 80)
+        logging.info(f'{Colors.GREEN}Fastest: {fastest[0]} test with {fastest[1]} concurrent connections')
+        logging.info(f'Time: {fastest[2]:.2f} seconds')
+        logging.info(f'Speed: {fastest[3]:.2f} domains/sec{Colors.RESET}')
+
+        logging.info(f'\n{Colors.GREEN}All tests completed successfully!{Colors.RESET}')
 
     except Exception as e:
         logging.error(f'Test failed: {Colors.RED}{str(e)}{Colors.RESET}')
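Taken together, the new constructor parameters are driven from library code the same way unit_test.py does it. A minimal usage sketch (domains and header values are illustrative; note that post_data is accepted and stored by this patch, but the request path in _check_url still issues GET requests):

    import asyncio
    from httpz_scanner import HTTPZScanner

    async def run():
        scanner = HTTPZScanner(
            concurrent_limit=50,
            timeout=3,
            follow_redirects=True,
            custom_headers={'X-Scan': 'httpz'},  # merged over the rotated User-Agent
            post_data=None,                      # stored, but unused by this patch's GET path
        )
        async for result in scanner.scan(['example.com', 'example.org']):
            print(result['domain'], result['status'], result.get('error_type', ''))

    asyncio.run(run())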