From 7fe571ddad2ae241f67c8d48985453d5afa0cff7 Mon Sep 17 00:00:00 2001
From: acidvegas
Date: Wed, 12 Feb 2025 00:50:02 -0500
Subject: [PATCH] fixed chunk output

---
 httpz_scanner/__init__.py |   2 +-
 httpz_scanner/cli.py      |   6 +-
 httpz_scanner/scanner.py  | 213 ++++++++++++++++++++++----------------
 setup.py                  |   2 +-
 4 files changed, 128 insertions(+), 95 deletions(-)

diff --git a/httpz_scanner/__init__.py b/httpz_scanner/__init__.py
index cca02a7..c529ec1 100644
--- a/httpz_scanner/__init__.py
+++ b/httpz_scanner/__init__.py
@@ -6,4 +6,4 @@
 from .colors import Colors
 from .scanner import HTTPZScanner
 
-__version__ = '2.1.3'
\ No newline at end of file
+__version__ = '2.1.4'
\ No newline at end of file
diff --git a/httpz_scanner/cli.py b/httpz_scanner/cli.py
index a1eb9a3..f571b40 100644
--- a/httpz_scanner/cli.py
+++ b/httpz_scanner/cli.py
@@ -87,6 +87,9 @@ async def main():
 
     # Add shard argument
     parser.add_argument('-sh','--shard', type=parse_shard, help='Shard index and total shards (e.g., 1/3)')
+
+    # Add this to the argument parser section
+    parser.add_argument('-pa', '--paths', help='Additional paths to check (comma-separated, e.g., ".git/config,.env")')
 
     # If no arguments provided, print help and exit
     if len(sys.argv) == 1:
@@ -143,7 +146,8 @@ async def main():
         show_fields=show_fields,
         match_codes=args.match_codes,
         exclude_codes=args.exclude_codes,
-        shard=args.shard
+        shard=args.shard,
+        paths=args.paths.split(',') if args.paths else None
     )
 
     count = 0
diff --git a/httpz_scanner/scanner.py b/httpz_scanner/scanner.py
index 5fb7900..e3c20f9 100644
--- a/httpz_scanner/scanner.py
+++ b/httpz_scanner/scanner.py
@@ -4,6 +4,7 @@
 
 import asyncio
 import random
+import urllib.parse
 
 try:
     import aiohttp
@@ -23,7 +24,7 @@ from .utils import debug, USER_AGENTS, input_generator
 
 class HTTPZScanner:
     '''Core scanner class for HTTP domain checking'''
 
-    def __init__(self, concurrent_limit = 100, timeout = 5, follow_redirects = False, check_axfr = False, resolver_file = None, output_file = None, show_progress = False, debug_mode = False, jsonl_output = False, show_fields = None, match_codes = None, exclude_codes = None, shard = None):
+    def __init__(self, concurrent_limit = 100, timeout = 5, follow_redirects = False, check_axfr = False, resolver_file = None, output_file = None, show_progress = False, debug_mode = False, jsonl_output = False, show_fields = None, match_codes = None, exclude_codes = None, shard = None, paths = None):
         '''
         Initialize the HTTPZScanner class
 
@@ -40,6 +41,7 @@ class HTTPZScanner:
         :param match_codes: Status codes to match
         :param exclude_codes: Status codes to exclude
         :param shard: Tuple of (shard_index, total_shards) for distributed scanning
+        :param paths: List of additional paths to check on each domain
         '''
 
         self.concurrent_limit = concurrent_limit
@@ -52,6 +54,7 @@ class HTTPZScanner:
         self.debug_mode = debug_mode
         self.jsonl_output = jsonl_output
         self.shard = shard
+        self.paths = paths or []
 
         self.show_fields = show_fields or {
             'status_code' : True,
@@ -81,104 +84,130 @@ class HTTPZScanner:
         :param session: aiohttp.ClientSession
         :param domain: str
         '''
-
-        # Get random nameserver
-        nameserver = random.choice(self.resolvers) if self.resolvers else None
 
         # Parse domain
         base_domain, port, protocols = parse_domain_url(domain)
-
-        # Initialize result dictionary
-        result = {
-            'domain' : base_domain,
-            'status' : 0,
-            'url'    : protocols[0],
-            'port'   : port,
-        }
-
-        # Try each protocol
-        for url in protocols:
+
+        results = []
+
+        # For each protocol (http/https)
+        for base_url in protocols:
             try:
-                # Set random user agent for each request
-                headers = {'User-Agent': random.choice(USER_AGENTS)}
+                # Check base URL first
+                if result := await self._check_url(session, base_url):
+                    results.append(result)
 
-                async with session.get(url, timeout=self.timeout, allow_redirects=self.follow_redirects, max_redirects=10 if self.follow_redirects else 0, headers=headers) as response:
-                    result['status'] = response.status
-
-                    # Bail immediately if it's a failed lookup - no point processing further
-                    if result['status'] == -1:
-                        return None
-
-                    # Early exit if status code doesn't match criteria
-                    if self.match_codes and result['status'] not in self.match_codes:
-                        return result
-                    if self.exclude_codes and result['status'] in self.exclude_codes:
-                        return result
-
-                    # Continue with full processing only if status code matches criteria
-                    result['url'] = str(response.url)
-
-                    # Add headers if requested
-                    headers = dict(response.headers)
-                    if headers and (self.show_fields.get('headers') or self.show_fields.get('all_flags')):
-                        result['headers'] = headers
-                    else:
-                        # Only add content type/length if headers aren't included
-                        if content_type := response.headers.get('content-type', '').split(';')[0]:
-                            result['content_type'] = content_type
-                        if content_length := response.headers.get('content-length'):
-                            result['content_length'] = content_length
-
-                    # Only add redirect chain if it exists
-                    if self.follow_redirects and response.history:
-                        result['redirect_chain'] = [str(h.url) for h in response.history] + [str(response.url)]
-
-                    # Do DNS lookups only if we're going to use the result
-                    ips, cname, nameservers, _ = await resolve_all_dns(
-                        base_domain, self.timeout, nameserver, self.check_axfr
-                    )
-
-                    # Only add DNS fields if they have values
-                    if ips:
-                        result['ips'] = ips
-                    if cname:
-                        result['cname'] = cname
-                    if nameservers:
-                        result['nameservers'] = nameservers
-
-                    # Only add TLS info if available
-                    if response.url.scheme == 'https':
-                        try:
-                            if ssl_object := response._protocol.transport.get_extra_info('ssl_object'):
-                                if tls_info := await get_cert_info(ssl_object, str(response.url)):
-                                    # Only add TLS fields that have values
-                                    result['tls'] = {k: v for k, v in tls_info.items() if v}
-                        except AttributeError:
-                            debug(f'Failed to get SSL info for {url}')
-
-                    content_type = response.headers.get('Content-Type', '')
-                    html = await response.text() if any(x in content_type.lower() for x in ['text/html', 'application/xhtml']) else None
-
-                    # Only add title if it exists
-                    if soup := bs4.BeautifulSoup(html, 'html.parser'):
-                        if soup.title and soup.title.string:
-                            result['title'] = ' '.join(soup.title.string.strip().split()).rstrip('.')[:300]
-
-                        # Only add body if it exists
-                        if body_text := soup.get_text():
-                            result['body'] = ' '.join(body_text.split()).rstrip('.')[:500]
-
-                    # Only add favicon hash if it exists
-                    if favicon_hash := await get_favicon_hash(session, url, html):
-                        result['favicon_hash'] = favicon_hash
-
+                # Check additional paths
+                for path in self.paths:
+                    path = path.strip('/')
+                    url = f'{base_url}/{path}'
+                    if result := await self._check_url(session, url):
+                        results.append(result)
+
+                if results:  # If we got any successful results, return them
                     break
+
             except Exception as e:
-                debug(f'Error checking {url}: {str(e)}')
-                continue  # Just continue to next protocol without setting status = -1
+                debug(f'Error checking {base_url}: {str(e)}')
+                continue
+
+        return results[0] if results else None  # Return first successful result or None
 
-        # Return None if we never got a valid status
-        return None if result['status'] == 0 else result
+    async def _check_url(self, session: aiohttp.ClientSession, url: str):
+        '''
+        Check a single URL and return results
+
+        :param session: aiohttp.ClientSession
+        :param url: URL to check
+        '''
+        try:
+            headers = {'User-Agent': random.choice(USER_AGENTS)}
+
+            async with session.get(url, timeout=self.timeout,
+                                   allow_redirects=self.follow_redirects,
+                                   max_redirects=10 if self.follow_redirects else 0,
+                                   headers=headers) as response:
+
+                # Properly parse the URL
+                parsed_url = urllib.parse.urlparse(url)
+                parsed_domain = parsed_url.hostname
+
+                result = {
+                    'domain': parsed_domain,
+                    'status': response.status,
+                    'url': str(response.url),
+                    'port': parsed_url.port or ('443' if parsed_url.scheme == 'https' else '80')
+                }
+
+                # Early exit conditions
+                if result['status'] == -1:
+                    return None
+                if self.match_codes and result['status'] not in self.match_codes:
+                    return result
+                if self.exclude_codes and result['status'] in self.exclude_codes:
+                    return result
+
+                # Continue with full processing only if status code matches criteria
+                result['url'] = str(response.url)
+
+                # Add headers if requested
+                headers = dict(response.headers)
+                if headers and (self.show_fields.get('headers') or self.show_fields.get('all_flags')):
+                    result['headers'] = headers
+                else:
+                    # Only add content type/length if headers aren't included
+                    if content_type := response.headers.get('content-type', '').split(';')[0]:
+                        result['content_type'] = content_type
+                    if content_length := response.headers.get('content-length'):
+                        result['content_length'] = content_length
+
+                # Only add redirect chain if it exists
+                if self.follow_redirects and response.history:
+                    result['redirect_chain'] = [str(h.url) for h in response.history] + [str(response.url)]
+
+                # Do DNS lookups only if we're going to use the result
+                ips, cname, nameservers, _ = await resolve_all_dns(
+                    parsed_domain, self.timeout, None, self.check_axfr
+                )
+
+                # Only add DNS fields if they have values
+                if ips:
+                    result['ips'] = ips
+                if cname:
+                    result['cname'] = cname
+                if nameservers:
+                    result['nameservers'] = nameservers
+
+                # Only add TLS info if available
+                if response.url.scheme == 'https':
+                    try:
+                        if ssl_object := response._protocol.transport.get_extra_info('ssl_object'):
+                            if tls_info := await get_cert_info(ssl_object, str(response.url)):
+                                # Only add TLS fields that have values
+                                result['tls'] = {k: v for k, v in tls_info.items() if v}
+                    except AttributeError:
+                        debug(f'Failed to get SSL info for {url}')
+
+                content_type = response.headers.get('Content-Type', '')
+                html = await response.text() if any(x in content_type.lower() for x in ['text/html', 'application/xhtml']) else None
+
+                # Only add title if it exists
+                if soup := bs4.BeautifulSoup(html, 'html.parser'):
+                    if soup.title and soup.title.string:
+                        result['title'] = ' '.join(soup.title.string.strip().split()).rstrip('.')[:300]
+
+                    # Only add body if it exists
+                    if body_text := soup.get_text():
+                        result['body'] = ' '.join(body_text.split()).rstrip('.')[:500]
+
+                # Only add favicon hash if it exists
+                if favicon_hash := await get_favicon_hash(session, url, html):
+                    result['favicon_hash'] = favicon_hash
+
+                return result
+
+        except Exception as e:
+            debug(f'Error checking {url}: {str(e)}')
+            return None
 
     async def scan(self, input_source):
diff --git a/setup.py b/setup.py
index e28f238..928f4ef 100644
--- a/setup.py
+++ b/setup.py
@@ -10,7 +10,7 @@ with open('README.md', 'r', encoding='utf-8') as f:
 
 setup(
     name='httpz_scanner',
-    version='2.1.3',
+    version='2.1.4',
     author='acidvegas',
     author_email='acid.vegas@acid.vegas',
     description='Hyper-fast HTTP Scraping Tool',
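
Usage sketch (illustrative, not part of the commit above): how the new
-pa/--paths value flows from cli.py into HTTPZScanner. The path values and
the other constructor arguments shown here are examples only.

    from httpz_scanner import HTTPZScanner

    # cli.py splits the comma-separated --paths value before passing it along
    raw_paths = '.git/config,.env'
    paths = raw_paths.split(',') if raw_paths else None

    # equivalent to supplying -pa ".git/config,.env" on the command line
    scanner = HTTPZScanner(concurrent_limit=100, timeout=5, paths=paths)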
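
And a standalone sketch of how check_domain() combines each protocol base URL
with those extra paths (example.com is a placeholder domain):

    paths = ['.git/config', '.env']
    base_url = 'https://example.com'   # one of the protocol variants tried per domain

    for path in paths:
        path = path.strip('/')         # leading/trailing slashes are stripped
        url = f'{base_url}/{path}'     # each such URL is then checked by _check_url()
        print(url)                     # https://example.com/.git/config, https://example.com/.env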