fixed chunk output
commit e220648a1a (parent 7fe571ddad)
@@ -6,4 +6,4 @@ from .colors import Colors
 from .scanner import HTTPZScanner
 
 
-__version__ = '2.1.4'
+__version__ = '2.1.5'
@@ -91,6 +91,10 @@ async def main():
     # Add this to the argument parser section
     parser.add_argument('-pa', '--paths', help='Additional paths to check (comma-separated, e.g., ".git/config,.env")')
 
+    # Add these arguments in the parser section
+    parser.add_argument('-hd', '--headers', help='Custom headers to send with each request (format: "Header1: value1,Header2: value2")')
+    parser.add_argument('-p', '--post', help='Send POST request with this data')
+
     # If no arguments provided, print help and exit
     if len(sys.argv) == 1:
         parser.print_help()
@@ -147,7 +151,9 @@ async def main():
         match_codes=args.match_codes,
         exclude_codes=args.exclude_codes,
         shard=args.shard,
-        paths=args.paths.split(',') if args.paths else None
+        paths=args.paths.split(',') if args.paths else None,
+        custom_headers=dict(h.split(': ', 1) for h in args.headers.split(',')) if args.headers else None,
+        post_data=args.post
     )
 
     count = 0
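Note: the --headers wiring above splits the raw flag value on commas, then on the first ': ' of each pair. A minimal sketch of that exact expression (the input value is hypothetical):

    # Same parsing expression as in the diff, applied to a sample value.
    raw = 'Header1: value1,Header2: value2'
    parsed = dict(h.split(': ', 1) for h in raw.split(','))
    print(parsed)  # {'Header1': 'value1', 'Header2': 'value2'}
    # Caveat: a header value containing a comma would be split incorrectly.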
@@ -42,20 +42,13 @@ def parse_domain_url(domain: str) -> tuple:
             try:
                 port = int(port_str.split('/')[0])
             except ValueError:
-                port = 443 if protocol == 'https://' else 80
-        else:
-            port = 443 if protocol == 'https://' else 80
+                port = None
         protocols = [f'{protocol}{base_domain}{":" + str(port) if port else ""}']
     else:
         if ':' in base_domain.split('/')[0]:
             base_domain, port_str = base_domain.split(':', 1)
-            port = int(port_str.split('/')[0]) if port_str.split('/')[0].isdigit() else 443
-        else:
-            port = 443
-        protocols = [
-            f'https://{base_domain}{":" + str(port) if port else ""}',
-            f'http://{base_domain}{":" + str(port) if port else ""}'
-        ]
+            port = int(port_str.split('/')[0]) if port_str.split('/')[0].isdigit() else None
+        protocols = ['http://', 'https://']  # Always try HTTP first
 
     return base_domain, port, protocols
 
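Note: under the new logic a bare domain no longer receives a default port. Assuming the unchanged parts of parse_domain_url match this diff (they are not shown in full here), the return values would look roughly like this sketch:

    # Behavior sketch inferred from the diff; not verified against the full file.
    parse_domain_url('example.com')
    # -> ('example.com', None, ['http://', 'https://'])   # port stays None when unspecified
    parse_domain_url('https://example.com:8443')
    # -> ('example.com', 8443, ['https://example.com:8443'])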
@@ -5,6 +5,7 @@
 import asyncio
 import random
 import urllib.parse
+import json
 
 try:
     import aiohttp
@@ -24,7 +25,7 @@ from .utils import debug, USER_AGENTS, input_generator
 class HTTPZScanner:
     '''Core scanner class for HTTP domain checking'''
 
-    def __init__(self, concurrent_limit = 100, timeout = 5, follow_redirects = False, check_axfr = False, resolver_file = None, output_file = None, show_progress = False, debug_mode = False, jsonl_output = False, show_fields = None, match_codes = None, exclude_codes = None, shard = None, paths = None):
+    def __init__(self, concurrent_limit = 100, timeout = 5, follow_redirects = False, check_axfr = False, resolver_file = None, output_file = None, show_progress = False, debug_mode = False, jsonl_output = False, show_fields = None, match_codes = None, exclude_codes = None, shard = None, paths = None, custom_headers=None, post_data=None):
         '''
         Initialize the HTTPZScanner class
 
@@ -42,6 +43,8 @@ class HTTPZScanner:
         :param exclude_codes: Status codes to exclude
         :param shard: Tuple of (shard_index, total_shards) for distributed scanning
         :param paths: List of additional paths to check on each domain
+        :param custom_headers: Dictionary of custom headers to send with each request
+        :param post_data: Data to send with POST requests
         '''
 
         self.concurrent_limit = concurrent_limit
@@ -55,6 +58,8 @@ class HTTPZScanner:
         self.jsonl_output = jsonl_output
         self.shard = shard
         self.paths = paths or []
+        self.custom_headers = custom_headers or {}
+        self.post_data = post_data
 
         self.show_fields = show_fields or {
             'status_code' : True,
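Note: a possible programmatic use of the two new constructor options (values are illustrative). As wired in this commit, _check_url below still issues GET requests, so post_data is stored but not yet sent:

    # Hypothetical usage sketch of the new options.
    scanner = HTTPZScanner(
        concurrent_limit=50,
        timeout=5,
        custom_headers={'Authorization': 'Bearer example-token'},  # merged over the random User-Agent
        post_data='key=value',  # accepted and stored; no code path sends it yet
    )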
@@ -78,137 +83,110 @@ class HTTPZScanner:
 
 
     async def check_domain(self, session: aiohttp.ClientSession, domain: str):
-        '''
-        Check a single domain and return results
-
-        :param session: aiohttp.ClientSession
-        :param domain: str
-        '''
-
-        # Parse domain
+        '''Check a single domain and return results'''
         base_domain, port, protocols = parse_domain_url(domain)
 
-        results = []
-
-        # For each protocol (http/https)
-        for base_url in protocols:
+        for protocol in protocols:
+            url = f'{protocol}{base_domain}'
+            if port:
+                url += f':{port}'
             try:
-                # Check base URL first
-                if result := await self._check_url(session, base_url):
-                    results.append(result)
-
-                    # Check additional paths
-                    for path in self.paths:
-                        path = path.strip('/')
-                        url = f'{base_url}/{path}'
-                        if result := await self._check_url(session, url):
-                            results.append(result)
-
-                if results: # If we got any successful results, return them
-                    break
-
-            except Exception as e:
-                debug(f'Error checking {base_url}: {str(e)}')
-                continue
-
-        return results[0] if results else None # Return first successful result or None
-
-    async def _check_url(self, session: aiohttp.ClientSession, url: str):
-        '''
-        Check a single URL and return results
-
-        :param session: aiohttp.ClientSession
-        :param url: URL to check
-        '''
-        try:
-            headers = {'User-Agent': random.choice(USER_AGENTS)}
-
-            async with session.get(url, timeout=self.timeout,
-                                   allow_redirects=self.follow_redirects,
-                                   max_redirects=10 if self.follow_redirects else 0,
-                                   headers=headers) as response:
-
-                # Properly parse the URL
-                parsed_url = urllib.parse.urlparse(url)
-                parsed_domain = parsed_url.hostname
-
-                result = {
-                    'domain': parsed_domain,
-                    'status': response.status,
-                    'url': str(response.url),
-                    'port': parsed_url.port or ('443' if parsed_url.scheme == 'https' else '80')
-                }
-
-                # Early exit conditions
-                if result['status'] == -1:
-                    return None
-                if self.match_codes and result['status'] not in self.match_codes:
+                debug(f'Trying {url}...')
+                result = await self._check_url(session, url)
+                debug(f'Got result for {url}: {result}')
+                if result and (result['status'] != 400 or result.get('redirect_chain')): # Accept redirects
                     return result
-                if self.exclude_codes and result['status'] in self.exclude_codes:
-                    return result
-
-                # Continue with full processing only if status code matches criteria
-                result['url'] = str(response.url)
-
-                # Add headers if requested
-                headers = dict(response.headers)
-                if headers and (self.show_fields.get('headers') or self.show_fields.get('all_flags')):
-                    result['headers'] = headers
-                else:
-                    # Only add content type/length if headers aren't included
-                    if content_type := response.headers.get('content-type', '').split(';')[0]:
-                        result['content_type'] = content_type
-                    if content_length := response.headers.get('content-length'):
-                        result['content_length'] = content_length
-
-                # Only add redirect chain if it exists
-                if self.follow_redirects and response.history:
-                    result['redirect_chain'] = [str(h.url) for h in response.history] + [str(response.url)]
-
-                # Do DNS lookups only if we're going to use the result
-                ips, cname, nameservers, _ = await resolve_all_dns(
-                    parsed_domain, self.timeout, None, self.check_axfr
-                )
-
-                # Only add DNS fields if they have values
-                if ips:
-                    result['ips'] = ips
-                if cname:
-                    result['cname'] = cname
-                if nameservers:
-                    result['nameservers'] = nameservers
-
-                # Only add TLS info if available
-                if response.url.scheme == 'https':
-                    try:
-                        if ssl_object := response._protocol.transport.get_extra_info('ssl_object'):
-                            if tls_info := await get_cert_info(ssl_object, str(response.url)):
-                                # Only add TLS fields that have values
-                                result['tls'] = {k: v for k, v in tls_info.items() if v}
-                    except AttributeError:
-                        debug(f'Failed to get SSL info for {url}')
-
-                content_type = response.headers.get('Content-Type', '')
-                html = await response.text() if any(x in content_type.lower() for x in ['text/html', 'application/xhtml']) else None
-
-                # Only add title if it exists
-                if soup := bs4.BeautifulSoup(html, 'html.parser'):
-                    if soup.title and soup.title.string:
-                        result['title'] = ' '.join(soup.title.string.strip().split()).rstrip('.')[:300]
-
-                    # Only add body if it exists
-                    if body_text := soup.get_text():
-                        result['body'] = ' '.join(body_text.split()).rstrip('.')[:500]
-
-                    # Only add favicon hash if it exists
-                    if favicon_hash := await get_favicon_hash(session, url, html):
-                        result['favicon_hash'] = favicon_hash
-
-                return result
-
             except Exception as e:
                 debug(f'Error checking {url}: {str(e)}')
+                continue
 
         return None
+
+    async def _check_url(self, session: aiohttp.ClientSession, url: str):
+        '''Check a single URL and return results'''
+        try:
+            headers = {'User-Agent': random.choice(USER_AGENTS)}
+            headers.update(self.custom_headers)
+
+            debug(f'Making request to {url} with headers: {headers}')
+            async with session.request('GET', url,
+                                       timeout=self.timeout,
+                                       allow_redirects=True, # Always follow redirects
+                                       max_redirects=10,
+                                       ssl=False, # Don't verify SSL
+                                       headers=headers) as response:
+
+                debug(f'Got response from {url}: status={response.status}, headers={dict(response.headers)}')
+
+                result = {
+                    'domain': urllib.parse.urlparse(url).hostname,
+                    'status': response.status,
+                    'url': str(response.url),
+                    'response_headers': dict(response.headers)
+                }
+
+                if response.history:
+                    result['redirect_chain'] = [str(h.url) for h in response.history] + [str(response.url)]
+                    debug(f'Redirect chain for {url}: {result["redirect_chain"]}')
+
+                return result
+
+        except aiohttp.ClientSSLError as e:
+            debug(f'SSL Error for {url}: {str(e)}')
+            return {
+                'domain': urllib.parse.urlparse(url).hostname,
+                'status': -1,
+                'error': f'SSL Error: {str(e)}',
+                'protocol': 'https' if url.startswith('https://') else 'http',
+                'error_type': 'SSL'
+            }
+        except aiohttp.ClientConnectorCertificateError as e:
+            debug(f'Certificate Error for {url}: {str(e)}')
+            return {
+                'domain': urllib.parse.urlparse(url).hostname,
+                'status': -1,
+                'error': f'Certificate Error: {str(e)}',
+                'protocol': 'https' if url.startswith('https://') else 'http',
+                'error_type': 'CERT'
+            }
+        except aiohttp.ClientConnectorError as e:
+            debug(f'Connection Error for {url}: {str(e)}')
+            return {
+                'domain': urllib.parse.urlparse(url).hostname,
+                'status': -1,
+                'error': f'Connection Failed: {str(e)}',
+                'protocol': 'https' if url.startswith('https://') else 'http',
+                'error_type': 'CONN'
+            }
+        except aiohttp.ClientError as e:
+            debug(f'HTTP Error for {url}: {e.__class__.__name__}: {str(e)}')
+            return {
+                'domain': urllib.parse.urlparse(url).hostname,
+                'status': -1,
+                'error': f'HTTP Error: {e.__class__.__name__}: {str(e)}',
+                'protocol': 'https' if url.startswith('https://') else 'http',
+                'error_type': 'HTTP'
+            }
+        except asyncio.TimeoutError:
+            debug(f'Timeout for {url}')
+            return {
+                'domain': urllib.parse.urlparse(url).hostname,
+                'status': -1,
+                'error': f'Connection Timed Out after {self.timeout}s',
+                'protocol': 'https' if url.startswith('https://') else 'http',
+                'error_type': 'TIMEOUT'
+            }
+        except Exception as e:
+            debug(f'Unexpected error for {url}: {e.__class__.__name__}: {str(e)}')
+            return {
+                'domain': urllib.parse.urlparse(url).hostname,
+                'status': -1,
+                'error': f'Error: {e.__class__.__name__}: {str(e)}',
+                'protocol': 'https' if url.startswith('https://') else 'http',
+                'error_type': 'UNKNOWN'
+            }
 
 
     async def scan(self, input_source):
         '''
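Note: every failure path in the new _check_url returns a structured dict with status -1, an error message, and an error_type of SSL, CERT, CONN, HTTP, TIMEOUT, or UNKNOWN, so a consumer can branch on the shape instead of catching exceptions. A minimal sketch (the helper name is illustrative; the field names come from the diff above):

    # Illustrative consumer of the new error-result shape.
    def summarize(result: dict) -> str:
        if result['status'] == -1:
            return f"{result['domain']}: {result['error_type']} ({result['error']})"
        return f"{result['domain']}: HTTP {result['status']} -> {result['url']}"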
@@ -225,7 +203,9 @@ class HTTPZScanner:
         if not self.resolvers:
             self.resolvers = await load_resolvers(self.resolver_file)
 
-        async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False)) as session:
+        # Just use ssl=False, that's all we need
+        connector = aiohttp.TCPConnector(ssl=False, enable_cleanup_closed=True)
+        async with aiohttp.ClientSession(connector=connector) as session:
             tasks = {} # Change to dict to track domain for each task
             domain_queue = asyncio.Queue()
             queue_empty = False
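Note on the connector change above: enable_cleanup_closed=True asks aiohttp to forcibly reap SSL transports that did not shut down cleanly, which matters when scanning many misbehaving hosts. A minimal sketch of the same session setup:

    import aiohttp

    # Sketch of the session construction used in the diff above.
    connector = aiohttp.TCPConnector(
        ssl=False,                   # skip certificate verification entirely
        enable_cleanup_closed=True,  # reap SSL transports that closed uncleanly
    )
    # async with aiohttp.ClientSession(connector=connector) as session: ...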
@@ -233,89 +213,74 @@ class HTTPZScanner:
             async def process_domain(domain):
                 try:
                     result = await self.check_domain(session, domain)
-                    if result:
-                        if self.show_progress:
-                            self.progress_count += 1
-                        return result
-                except Exception as e:
-                    debug(f'Error processing {domain}: {str(e)}')
-                    return None
-
-            # Add domains to queue based on input type
-            async def queue_domains():
-                try:
-                    if isinstance(input_source, str):
-                        # File or stdin input
-                        gen = input_generator(input_source, self.shard)
-                        async for domain in gen:
-                            await domain_queue.put(domain)
-
-                    elif isinstance(input_source, (list, tuple)):
-                        # List/tuple input
-                        for line_num, domain in enumerate(input_source):
-                            if domain := str(domain).strip():
-                                if self.shard is None or line_num % self.shard[1] == self.shard[0]:
-                                    await domain_queue.put(domain)
-
-                    else:
-                        # Async generator input
-                        line_num = 0
-                        async for domain in input_source:
-                            if isinstance(domain, bytes):
-                                domain = domain.decode()
-                            if domain := domain.strip():
-                                if self.shard is None or line_num % self.shard[1] == self.shard[0]:
-                                    await domain_queue.put(domain)
-                                line_num += 1
-                except Exception as e:
-                    debug(f'Error queuing domains: {str(e)}')
-                finally:
-                    # Signal queue completion
-                    await domain_queue.put(None)
-
-            # Start domain queuing task
-            queue_task = asyncio.create_task(queue_domains())
-
-            try:
-                while not queue_empty or tasks:
-                    # Start new tasks if needed
-                    while len(tasks) < self.concurrent_limit and not queue_empty:
-                        try:
-                            domain = await domain_queue.get()
-                            if domain is None:
-                                queue_empty = True
-                                break
-                            task = asyncio.create_task(process_domain(domain))
-                            tasks[task] = domain
-                        except Exception as e:
-                            debug(f'Error creating task: {str(e)}')
-
-                    if not tasks:
-                        break
-
-                    # Wait for the FIRST task to complete
-                    try:
-                        done, _ = await asyncio.wait(
-                            tasks.keys(),
-                            timeout=self.timeout,
-                            return_when=asyncio.FIRST_COMPLETED
-                        )
-
-                        # Process completed task immediately
-                        for task in done:
-                            domain = tasks.pop(task)
-                            try:
-                                if result := await task:
-                                    yield result
-                            except Exception as e:
-                                debug(f'Error processing result for {domain}: {str(e)}')
-                    except Exception as e:
-                        debug(f'Error in task processing loop: {str(e)}')
-                        # Remove any failed tasks
-                        failed_tasks = [t for t in tasks if t.done() and t.exception()]
-                        for task in failed_tasks:
-                            tasks.pop(task)
+                    if self.show_progress:
+                        self.progress_count += 1
+                    if result:
+                        return domain, result
+                    else:
+                        # Create a proper error result if check_domain returns None
+                        return domain, {
+                            'domain': domain,
+                            'status': -1,
+                            'error': 'No successful response from either HTTP or HTTPS',
+                            'protocol': 'unknown',
+                            'error_type': 'NO_RESPONSE'
+                        }
+                except Exception as e:
+                    debug(f'Error processing {domain}: {e.__class__.__name__}: {str(e)}')
+                    # Return structured error information
+                    return domain, {
+                        'domain': domain,
+                        'status': -1,
+                        'error': f'{e.__class__.__name__}: {str(e)}',
+                        'protocol': 'unknown',
+                        'error_type': 'PROCESS'
+                    }
+
+            # Queue processor
+            async def queue_processor():
+                async for domain in input_generator(input_source, self.shard):
+                    await domain_queue.put(domain)
+                    self.processed_domains += 1
+                nonlocal queue_empty
+                queue_empty = True
+
+            # Start queue processor
+            queue_task = asyncio.create_task(queue_processor())
+
+            try:
+                while not (queue_empty and domain_queue.empty() and not tasks):
+                    # Fill up tasks until we hit concurrent limit
+                    while len(tasks) < self.concurrent_limit and not domain_queue.empty():
+                        domain = await domain_queue.get()
+                        task = asyncio.create_task(process_domain(domain))
+                        tasks[task] = domain
+
+                    if tasks:
+                        # Wait for at least one task to complete
+                        done, _ = await asyncio.wait(
+                            tasks.keys(),
+                            return_when=asyncio.FIRST_COMPLETED
+                        )
+
+                        # Process completed tasks
+                        for task in done:
+                            domain = tasks.pop(task)
+                            try:
+                                _, result = await task
+                                if result:
+                                    yield result
+                            except Exception as e:
+                                debug(f'Task error for {domain}: {e.__class__.__name__}: {str(e)}')
+                                yield {
+                                    'domain': domain,
+                                    'status': -1,
+                                    'error': f'Task Error: {e.__class__.__name__}: {str(e)}',
+                                    'protocol': 'unknown',
+                                    'error_type': 'TASK'
+                                }
+                    else:
+                        await asyncio.sleep(0.1) # Prevent CPU spin when no tasks
 
             finally:
                 # Clean up
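Note: the rewritten scan() is a producer/consumer loop: queue_processor feeds an asyncio.Queue while the main loop keeps up to concurrent_limit tasks in flight and drains whichever finishes first via asyncio.wait(..., return_when=FIRST_COMPLETED). A self-contained miniature of the same pattern, with a dummy worker standing in for process_domain:

    import asyncio

    async def demo():
        queue: asyncio.Queue = asyncio.Queue()
        queue_empty = False
        tasks: dict[asyncio.Task, str] = {}

        async def producer():
            nonlocal queue_empty
            for item in ['a', 'b', 'c', 'd']:   # stands in for input_generator()
                await queue.put(item)
            queue_empty = True

        async def worker(item: str) -> str:     # stands in for process_domain()
            await asyncio.sleep(0.01)
            return item.upper()

        prod = asyncio.create_task(producer())
        while not (queue_empty and queue.empty() and not tasks):
            while len(tasks) < 2 and not queue.empty():  # concurrent_limit = 2
                item = await queue.get()
                tasks[asyncio.create_task(worker(item))] = item
            if tasks:
                done, _ = await asyncio.wait(tasks.keys(), return_when=asyncio.FIRST_COMPLETED)
                for task in done:
                    tasks.pop(task)
                    print(await task)
            else:
                await asyncio.sleep(0.1)  # prevent CPU spin while the queue fills
        await prod

    asyncio.run(demo())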
@@ -69,7 +69,8 @@ USER_AGENTS = [
 
 
 def debug(msg: str):
-    if not SILENT_MODE: logging.debug(msg)
+    if not SILENT_MODE:
+        logging.debug(msg)
 def error(msg: str):
     if not SILENT_MODE: logging.error(msg)
 def info(msg: str):
setup.py (2 lines changed)
@@ -10,7 +10,7 @@ with open('README.md', 'r', encoding='utf-8') as f:
 
 setup(
     name='httpz_scanner',
-    version='2.1.4',
+    version='2.1.5',
     author='acidvegas',
     author_email='acid.vegas@acid.vegas',
     description='Hyper-fast HTTP Scraping Tool',
unit_test.py (144 lines changed)
@@ -5,6 +5,7 @@
 import asyncio
 import logging
 import sys
+import time
 
 try:
     from httpz_scanner import HTTPZScanner
@@ -38,7 +39,7 @@ logger.setLevel(logging.INFO)
 logger.addHandler(handler)
 
 
-async def get_domains_from_url():
+async def get_domains_from_url() -> list:
     '''
     Fetch domains from SecLists URL
 
@@ -58,7 +59,7 @@ async def get_domains_from_url():
     return [line.strip() for line in content.splitlines() if line.strip()]
 
 
-async def domain_generator(domains):
+async def domain_generator(domains: list):
     '''
     Async generator that yields domains
 
@@ -70,40 +71,113 @@ async def domain_generator(domains):
             yield domain
 
 
-async def test_list_input(domains):
-    '''
-    Test scanning using a list input
-
-    :param domains: List of domains to scan
-    '''
-
+async def run_benchmark(test_type: str, domains: list, concurrency: int) -> tuple:
+    '''Run a single benchmark test'''
+
+    logging.info(f'{Colors.BOLD}Testing {test_type} input with {concurrency} concurrent connections...{Colors.RESET}')
+    scanner = HTTPZScanner(concurrent_limit=concurrency, timeout=3, show_progress=True, debug_mode=True, follow_redirects=True)
+
+    count = 0
+    got_first = False
+    start_time = None
+
+    if test_type == 'List':
+        async for result in scanner.scan(domains):
+            if result:
+                if not got_first:
+                    got_first = True
+                    start_time = time.time()
+                count += 1
+
+                # More detailed status reporting
+                status_str = ''
+                if result['status'] < 0:
+                    error_type = result.get('error_type', 'UNKNOWN')
+                    error_msg = result.get('error', 'Unknown Error')
+                    status_str = f"{Colors.RED}[{result['status']} - {error_type}: {error_msg}]{Colors.RESET}"
+                elif 200 <= result['status'] < 300:
+                    status_str = f"{Colors.GREEN}[{result['status']}]{Colors.RESET}"
+                elif 300 <= result['status'] < 400:
+                    status_str = f"{Colors.YELLOW}[{result['status']}]{Colors.RESET}"
+                else:
+                    status_str = f"{Colors.RED}[{result['status']}]{Colors.RESET}"
+
+                # Show protocol and response headers if available
+                protocol_info = f" {Colors.CYAN}({result.get('protocol', 'unknown')}){Colors.RESET}" if result.get('protocol') else ''
+                headers_info = ''
+                if result.get('response_headers'):
+                    important_headers = ['server', 'location', 'content-type']
+                    headers = [f"{k}: {v}" for k, v in result['response_headers'].items() if k.lower() in important_headers]
+                    if headers:
+                        headers_info = f" {Colors.GRAY}[{', '.join(headers)}]{Colors.RESET}"
+
+                # Show redirect chain if present
+                redirect_info = ''
+                if result.get('redirect_chain'):
+                    redirect_info = f" -> {Colors.YELLOW}Redirects: {' -> '.join(result['redirect_chain'])}{Colors.RESET}"
+
+                # Show error details if present
+                error_info = ''
+                if result.get('error'):
+                    error_info = f" {Colors.RED}Error: {result['error']}{Colors.RESET}"
+
+                # Show final URL if different from original
+                url_info = ''
+                if result.get('url') and result['url'] != f"http(s)://{result['domain']}":
+                    url_info = f" {Colors.CYAN}Final URL: {result['url']}{Colors.RESET}"
+
+                logging.info(
+                    f"{test_type}-{concurrency} Result {count}: "
+                    f"{status_str}{protocol_info} "
+                    f"{Colors.CYAN}{result['domain']}{Colors.RESET}"
+                    f"{redirect_info}"
+                    f"{url_info}"
+                    f"{headers_info}"
+                    f"{error_info}"
+                )
+    else:
+        # Skip generator test
+        pass
+
+    elapsed = time.time() - start_time if start_time else 0
+    domains_per_sec = count/elapsed if elapsed > 0 else 0
+    logging.info(f'{Colors.YELLOW}{test_type} test with {concurrency} concurrent connections completed in {elapsed:.2f} seconds ({domains_per_sec:.2f} domains/sec){Colors.RESET}')
+
+    return elapsed, domains_per_sec
+
+
+async def test_list_input(domains: list):
+    '''Test scanning using a list input'''
+
     logging.info(f'{Colors.BOLD}Testing list input...{Colors.RESET}')
-    scanner = HTTPZScanner(concurrent_limit=100, timeout=3, show_progress=True, debug_mode=True)
-
+    scanner = HTTPZScanner(concurrent_limit=25, timeout=3, show_progress=True, debug_mode=True, follow_redirects=True)
+
+    start_time = time.time()
     count = 0
     async for result in scanner.scan(domains):
         if result:
             count += 1
             status_color = Colors.GREEN if 200 <= result['status'] < 300 else Colors.RED
-            logging.info(f'List Result {count}: {Colors.CYAN}{result["domain"]}{Colors.RESET} - Status: {status_color}{result["status"]}{Colors.RESET}')
+            title = f" - {Colors.CYAN}{result.get('title', 'No Title')}{Colors.RESET}" if result.get('title') else ''
+            error = f" - {Colors.RED}{result.get('error', '')}{Colors.RESET}" if result.get('error') else ''
+            logging.info(f'List-25 Result {count}: {status_color}[{result["status"]}]{Colors.RESET} {Colors.CYAN}{result["domain"]}{Colors.RESET}{title}{error}')
 
 
-async def test_generator_input(domains):
-    '''
-    Test scanning using an async generator input
-
-    :param domains: List of domains to generate from
-    '''
-
+async def test_generator_input(domains: list):
+    '''Test scanning using an async generator input'''
+
     logging.info(f'{Colors.BOLD}Testing generator input...{Colors.RESET}')
-    scanner = HTTPZScanner(concurrent_limit=100, timeout=3, show_progress=True, debug_mode=True)
-
+    scanner = HTTPZScanner(concurrent_limit=25, timeout=3, show_progress=True, debug_mode=True, follow_redirects=True)
+
+    start_time = time.time()
     count = 0
     async for result in scanner.scan(domain_generator(domains)):
         if result:
             count += 1
             status_color = Colors.GREEN if 200 <= result['status'] < 300 else Colors.RED
-            logging.info(f'Generator Result {count}: {Colors.CYAN}{result["domain"]}{Colors.RESET} - Status: {status_color}{result["status"]}{Colors.RESET}')
+            title = f" - {Colors.CYAN}{result.get('title', 'No Title')}{Colors.RESET}" if result.get('title') else ''
+            error = f" - {Colors.RED}{result.get('error', '')}{Colors.RESET}" if result.get('error') else ''
+            logging.info(f'Generator-25 Result {count}: {status_color}[{result["status"]}]{Colors.RESET} {Colors.CYAN}{result["domain"]}{Colors.RESET}{title}{error}')
 
 
 async def main() -> None:
@@ -114,11 +188,39 @@ async def main() -> None:
         domains = await get_domains_from_url()
         logging.info(f'Loaded {Colors.YELLOW}{len(domains)}{Colors.RESET} domains for testing')
 
-        # Run tests
-        await test_generator_input(domains)
-        await test_list_input(domains)
-
-        logging.info(f'{Colors.GREEN}All tests completed successfully!{Colors.RESET}')
+        # Store benchmark results
+        results = []
+
+        # Run tests with different concurrency levels
+        for concurrency in [25, 50, 100]:
+            # Generator tests
+            gen_result = await run_benchmark('Generator', domains, concurrency)
+            results.append(('Generator', concurrency, *gen_result))
+
+            # List tests
+            list_result = await run_benchmark('List', domains, concurrency)
+            results.append(('List', concurrency, *list_result))
+
+        # Print benchmark comparison
+        logging.info(f'\n{Colors.BOLD}Benchmark Results:{Colors.RESET}')
+        logging.info('-' * 80)
+        logging.info(f'{"Test Type":<15} {"Concurrency":<15} {"Time (s)":<15} {"Domains/sec":<15}')
+        logging.info('-' * 80)
+
+        # Sort by domains per second (fastest first)
+        results.sort(key=lambda x: x[3], reverse=True)
+
+        for test_type, concurrency, elapsed, domains_per_sec in results:
+            logging.info(f'{test_type:<15} {concurrency:<15} {elapsed:.<15.2f} {domains_per_sec:<15.2f}')
+
+        # Highlight fastest result
+        fastest = results[0]
+        logging.info('-' * 80)
+        logging.info(f'{Colors.GREEN}Fastest: {fastest[0]} test with {fastest[1]} concurrent connections')
+        logging.info(f'Time: {fastest[2]:.2f} seconds')
+        logging.info(f'Speed: {fastest[3]:.2f} domains/sec{Colors.RESET}')
+
+        logging.info(f'\n{Colors.GREEN}All tests completed successfully!{Colors.RESET}')
 
     except Exception as e:
         logging.error(f'Test failed: {Colors.RED}{str(e)}{Colors.RESET}')
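Note: one formatting detail in the benchmark table above: {elapsed:.<15.2f} uses '.' as the fill character with left alignment, so the time column renders padded with dots. For example:

    print(f'{3.5:.<15.2f}')  # prints '3.50...........'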