From 90b5134c252197c2c0dbd895ca7251aa5b908093 Mon Sep 17 00:00:00 2001 From: acidvegas Date: Tue, 11 Feb 2025 22:30:22 -0500 Subject: [PATCH] Added unit test --- httpz_scanner/__init__.py | 4 +- httpz_scanner/__main__.py | 3 + httpz_scanner/cli.py | 63 ++++++++--------- httpz_scanner/colors.py | 15 ++-- httpz_scanner/dns.py | 60 ++++++++++------ httpz_scanner/formatters.py | 1 + httpz_scanner/parsers.py | 5 +- httpz_scanner/scanner.py | 57 ++++++++-------- httpz_scanner/utils.py | 8 +-- setup.py | 2 +- unit_test.py | 133 ++++++++++++++++++++++++++++++++++++ 11 files changed, 254 insertions(+), 97 deletions(-) create mode 100644 unit_test.py diff --git a/httpz_scanner/__init__.py b/httpz_scanner/__init__.py index 38c1017..25900a5 100644 --- a/httpz_scanner/__init__.py +++ b/httpz_scanner/__init__.py @@ -2,8 +2,8 @@ # HTTPZ Web Scanner - Developed by acidvegas in Python (https://github.com/acidvegas/httpz) # httpz_scanner/__init__.py -from .scanner import HTTPZScanner from .colors import Colors +from .scanner import HTTPZScanner -__version__ = '2.0.11' \ No newline at end of file +__version__ = '2.1.0' \ No newline at end of file diff --git a/httpz_scanner/__main__.py b/httpz_scanner/__main__.py index bc937a7..7985460 100644 --- a/httpz_scanner/__main__.py +++ b/httpz_scanner/__main__.py @@ -4,8 +4,11 @@ import asyncio import sys + from .cli import main + + if __name__ == '__main__': try: asyncio.run(main()) diff --git a/httpz_scanner/cli.py b/httpz_scanner/cli.py index b069742..55e01ab 100644 --- a/httpz_scanner/cli.py +++ b/httpz_scanner/cli.py @@ -4,16 +4,19 @@ import argparse import asyncio +import json import logging import os import sys -import json + +from datetime import datetime from .colors import Colors +from .formatters import format_console_output +from .parsers import parse_status_codes, parse_shard from .scanner import HTTPZScanner from .utils import SILENT_MODE, info -from .parsers import parse_status_codes, parse_shard -from .formatters import format_console_output + def setup_logging(level='INFO', log_to_disk=False): ''' @@ -22,16 +25,16 @@ def setup_logging(level='INFO', log_to_disk=False): :param level: Logging level (INFO or DEBUG) :param log_to_disk: Whether to also log to file ''' + class ColoredFormatter(logging.Formatter): - def formatTime(self, record, datefmt=None): - # Format: MM-DD HH:MM - from datetime import datetime + def formatTime(self, record): dt = datetime.fromtimestamp(record.created) - return f"{Colors.GRAY}{dt.strftime('%m-%d %H:%M')}{Colors.RESET}" + return f'{Colors.GRAY}{dt.strftime("%m-%d %H:%M")}{Colors.RESET}' def format(self, record): return f'{self.formatTime(record)} {record.getMessage()}' + # Setup logging handlers handlers = [] # Console handler @@ -47,44 +50,39 @@ def setup_logging(level='INFO', log_to_disk=False): handlers.append(file_handler) # Setup logger - logging.basicConfig( - level=getattr(logging, level.upper()), - handlers=handlers - ) + logging.basicConfig(level=getattr(logging, level.upper()), handlers=handlers) + async def main(): - parser = argparse.ArgumentParser( - description=f'{Colors.GREEN}Hyper-fast HTTP Scraping Tool{Colors.RESET}', - formatter_class=argparse.RawDescriptionHelpFormatter - ) + parser = argparse.ArgumentParser(description=f'{Colors.GREEN}Hyper-fast HTTP Scraping Tool{Colors.RESET}', formatter_class=argparse.RawDescriptionHelpFormatter) # Add arguments parser.add_argument('file', nargs='?', default='-', help='File containing domains to check (one per line), use - for stdin') parser.add_argument('-all', '--all-flags', action='store_true', help='Enable all output flags') - parser.add_argument('-d', '--debug', action='store_true', help='Show error states and debug information') - parser.add_argument('-c', '--concurrent', type=int, default=100, help='Number of concurrent checks') - parser.add_argument('-j', '--jsonl', action='store_true', help='Output JSON Lines format to console') - parser.add_argument('-o', '--output', help='Output file path (JSONL format)') + parser.add_argument('-d', '--debug', action='store_true', help='Show error states and debug information') + parser.add_argument('-c', '--concurrent', type=int, default=100, help='Number of concurrent checks') + parser.add_argument('-j', '--jsonl', action='store_true', help='Output JSON Lines format to console') + parser.add_argument('-o', '--output', help='Output file path (JSONL format)') # Output field flags - parser.add_argument('-b', '--body', action='store_true', help='Show body preview') - parser.add_argument('-cn', '--cname', action='store_true', help='Show CNAME records') - parser.add_argument('-cl', '--content-length', action='store_true', help='Show content length') - parser.add_argument('-ct', '--content-type', action='store_true', help='Show content type') - parser.add_argument('-f', '--favicon', action='store_true', help='Show favicon hash') - parser.add_argument('-fr', '--follow-redirects', action='store_true', help='Follow redirects (max 10)') - parser.add_argument('-hr', '--headers', action='store_true', help='Show response headers') - parser.add_argument('-i', '--ip', action='store_true', help='Show IP addresses') - parser.add_argument('-sc', '--status-code', action='store_true', help='Show status code') - parser.add_argument('-ti', '--title', action='store_true', help='Show page title') + parser.add_argument('-b', '--body', action='store_true', help='Show body preview') + parser.add_argument('-cn', '--cname', action='store_true', help='Show CNAME records') + parser.add_argument('-cl', '--content-length', action='store_true', help='Show content length') + parser.add_argument('-ct', '--content-type', action='store_true', help='Show content type') + parser.add_argument('-f', '--favicon', action='store_true', help='Show favicon hash') + parser.add_argument('-fr', '--follow-redirects', action='store_true', help='Follow redirects (max 10)') + parser.add_argument('-hr', '--headers', action='store_true', help='Show response headers') + parser.add_argument('-i', '--ip', action='store_true', help='Show IP addresses') + parser.add_argument('-sc', '--status-code', action='store_true', help='Show status code') + parser.add_argument('-ti', '--title', action='store_true', help='Show page title') parser.add_argument('-tls', '--tls-info', action='store_true', help='Show TLS certificate information') # Other arguments parser.add_argument('-ax', '--axfr', action='store_true', help='Try AXFR transfer against nameservers') parser.add_argument('-ec', '--exclude-codes', type=parse_status_codes, help='Exclude these status codes (comma-separated, e.g., 404,500)') parser.add_argument('-mc', '--match-codes', type=parse_status_codes, help='Only show these status codes (comma-separated, e.g., 200,301,404)') - parser.add_argument('-p', '--progress', action='store_true', help='Show progress counter') - parser.add_argument('-r', '--resolvers', help='File containing DNS resolvers (one per line)') + parser.add_argument('-p', '--progress', action='store_true', help='Show progress counter') + parser.add_argument('-r', '--resolvers', help='File containing DNS resolvers (one per line)') parser.add_argument('-to', '--timeout', type=int, default=5, help='Request timeout in seconds') # Add shard argument @@ -176,9 +174,12 @@ async def main(): logging.error(f'Unexpected error: {str(e)}') sys.exit(1) + def run(): '''Entry point for the CLI''' asyncio.run(main()) + + if __name__ == '__main__': run() \ No newline at end of file diff --git a/httpz_scanner/colors.py b/httpz_scanner/colors.py index b5dbd41..e04ec0d 100644 --- a/httpz_scanner/colors.py +++ b/httpz_scanner/colors.py @@ -4,7 +4,8 @@ class Colors: '''ANSI color codes for terminal output''' - HEADER = '\033[95m' # Light purple + + HEADER = '\033[95m' BLUE = '\033[94m' GREEN = '\033[92m' YELLOW = '\033[93m' @@ -12,9 +13,9 @@ class Colors: BOLD = '\033[1m' UNDERLINE = '\033[4m' RESET = '\033[0m' - PURPLE = '\033[35m' # Dark purple - LIGHT_RED = '\033[38;5;203m' # Light red - DARK_GREEN = '\033[38;5;22m' # Dark green - PINK = '\033[38;5;198m' # Bright pink - GRAY = '\033[90m' # Gray color - CYAN = '\033[96m' # Cyan color \ No newline at end of file + PURPLE = '\033[35m' + LIGHT_RED = '\033[38;5;203m' + DARK_GREEN = '\033[38;5;22m' + PINK = '\033[38;5;198m' + GRAY = '\033[90m' + CYAN = '\033[96m' \ No newline at end of file diff --git a/httpz_scanner/dns.py b/httpz_scanner/dns.py index 769daf7..5a7c675 100644 --- a/httpz_scanner/dns.py +++ b/httpz_scanner/dns.py @@ -4,14 +4,23 @@ import asyncio import os -import aiohttp -import dns.asyncresolver -import dns.query -import dns.resolver -import dns.zone + +try: + import aiohttp +except ImportError: + raise ImportError('missing aiohttp library (pip install aiohttp)') + +try: + import dns.asyncresolver + import dns.query + import dns.resolver + import dns.zone +except ImportError: + raise ImportError('missing dnspython library (pip install dnspython)') from .utils import debug, info, SILENT_MODE + async def resolve_all_dns(domain: str, timeout: int = 5, nameserver: str = None, check_axfr: bool = False) -> tuple: ''' Resolve all DNS records for a domain @@ -21,36 +30,35 @@ async def resolve_all_dns(domain: str, timeout: int = 5, nameserver: str = None, :param nameserver: Specific nameserver to use :param check_axfr: Whether to attempt zone transfer ''' + + # Setup resolver resolver = dns.asyncresolver.Resolver() resolver.lifetime = timeout if nameserver: resolver.nameservers = [nameserver] - results = await asyncio.gather(*[resolver.resolve(domain, rtype) - for rtype in ('NS', 'A', 'AAAA', 'CNAME')], - return_exceptions=True) + # Resolve all DNS records + results = await asyncio.gather(*[resolver.resolve(domain, rtype) for rtype in ('NS', 'A', 'AAAA', 'CNAME')], return_exceptions=True) + # Parse results nameservers = [str(ns).rstrip('.') for ns in results[0]] if isinstance(results[0], dns.resolver.Answer) else [] - ips = ([str(ip) for ip in results[1]] if isinstance(results[1], dns.resolver.Answer) else []) + \ - ([str(ip) for ip in results[2]] if isinstance(results[2], dns.resolver.Answer) else []) - cname = str(results[3][0].target).rstrip('.') if isinstance(results[3], dns.resolver.Answer) else None - + ips = ([str(ip) for ip in results[1]] if isinstance(results[1], dns.resolver.Answer) else []) + ([str(ip) for ip in results[2]] if isinstance(results[2], dns.resolver.Answer) else []) + cname = str(results[3][0].target).rstrip('.') if isinstance(results[3], dns.resolver.Answer) else None + + # Get NS IPs ns_ips = {} if nameservers: - ns_results = await asyncio.gather(*[resolver.resolve(ns, rtype) - for ns in nameservers - for rtype in ('A', 'AAAA')], - return_exceptions=True) + ns_results = await asyncio.gather(*[resolver.resolve(ns, rtype) for ns in nameservers for rtype in ('A', 'AAAA')], return_exceptions=True) for i, ns in enumerate(nameservers): - ns_ips[ns] = [str(ip) for records in ns_results[i*2:i*2+2] - if isinstance(records, dns.resolver.Answer) - for ip in records] + ns_ips[ns] = [str(ip) for records in ns_results[i*2:i*2+2] if isinstance(records, dns.resolver.Answer) for ip in records] + # Attempt zone transfer if check_axfr: await attempt_axfr(domain, ns_ips, timeout) return sorted(set(ips)), cname, nameservers, ns_ips + async def attempt_axfr(domain: str, ns_ips: dict, timeout: int = 5) -> None: ''' Attempt zone transfer for a domain @@ -59,28 +67,37 @@ async def attempt_axfr(domain: str, ns_ips: dict, timeout: int = 5) -> None: :param ns_ips: Dictionary of nameserver hostnames to their IPs :param timeout: Timeout in seconds ''' + try: os.makedirs('axfrout', exist_ok=True) - + + # Loop through each NS for ns_host, ips in ns_ips.items(): + # Loop through each NS IP for ns_ip in ips: try: + # Attempt zone transfer zone = dns.zone.from_xfr(dns.query.xfr(ns_ip, domain, lifetime=timeout)) + + # Write zone to file with open(f'axfrout/{domain}_{ns_ip}.zone', 'w') as f: zone.to_text(f) + info(f'[AXFR SUCCESS] {domain} from {ns_host} ({ns_ip})') except Exception as e: debug(f'AXFR failed for {domain} from {ns_ip}: {str(e)}') except Exception as e: debug(f'Failed AXFR for {domain}: {str(e)}') + async def load_resolvers(resolver_file: str = None) -> list: ''' Load DNS resolvers from file or default source :param resolver_file: Path to file containing resolver IPs - :return: List of resolver IPs ''' + + # Load from file if resolver_file: try: with open(resolver_file) as f: @@ -90,6 +107,7 @@ async def load_resolvers(resolver_file: str = None) -> list: except Exception as e: debug(f'Error loading resolvers from {resolver_file}: {str(e)}') + # Load from GitHub async with aiohttp.ClientSession() as session: async with session.get('https://raw.githubusercontent.com/trickest/resolvers/refs/heads/main/resolvers.txt') as response: resolvers = await response.text() diff --git a/httpz_scanner/formatters.py b/httpz_scanner/formatters.py index 5e4a2ae..9264fcc 100644 --- a/httpz_scanner/formatters.py +++ b/httpz_scanner/formatters.py @@ -5,6 +5,7 @@ from .colors import Colors from .utils import human_size + def format_console_output(result: dict, debug: bool = False, show_fields: dict = None, match_codes: set = None, exclude_codes: set = None) -> str: ''' Format the output with colored sections diff --git a/httpz_scanner/parsers.py b/httpz_scanner/parsers.py index b0b3b07..c3efdbc 100644 --- a/httpz_scanner/parsers.py +++ b/httpz_scanner/parsers.py @@ -2,6 +2,8 @@ # HTTPZ Web Scanner - Developed by acidvegas in Python (https://github.com/acidvegas/httpz) # httpz_scanner/parsers.py +import argparse + try: import bs4 except ImportError: @@ -20,7 +22,6 @@ except ImportError: raise ImportError('missing mmh3 module (pip install mmh3)') from .utils import debug, error -import argparse def parse_domain_url(domain: str) -> tuple: @@ -188,7 +189,7 @@ def parse_title(html: str, content_type: str = None) -> str: :param html: HTML content of the page :param content_type: Content-Type header value ''' - + # Only parse title for HTML content if content_type and not any(x in content_type.lower() for x in ['text/html', 'application/xhtml']): return None diff --git a/httpz_scanner/scanner.py b/httpz_scanner/scanner.py index 6532e1c..27b0453 100644 --- a/httpz_scanner/scanner.py +++ b/httpz_scanner/scanner.py @@ -3,7 +3,6 @@ # httpz_scanner/scanner.py import asyncio -import json import random try: @@ -16,11 +15,9 @@ try: except ImportError: raise ImportError('missing bs4 module (pip install beautifulsoup4)') -from .dns import resolve_all_dns, load_resolvers -from .formatters import format_console_output -from .colors import Colors -from .parsers import parse_domain_url, get_cert_info, get_favicon_hash, parse_title -from .utils import debug, info, USER_AGENTS, input_generator +from .dns import resolve_all_dns, load_resolvers +from .parsers import parse_domain_url, get_cert_info, get_favicon_hash +from .utils import debug, USER_AGENTS, input_generator class HTTPZScanner: @@ -77,21 +74,26 @@ class HTTPZScanner: self.progress_count = 0 - async def init(self): - '''Initialize resolvers - must be called before scanning''' - self.resolvers = await load_resolvers(self.resolver_file) - - async def check_domain(self, session: aiohttp.ClientSession, domain: str): - '''Check a single domain and return results''' - nameserver = random.choice(self.resolvers) if self.resolvers else None - base_domain, port, protocols = parse_domain_url(domain) + ''' + Check a single domain and return results + :param session: aiohttp.ClientSession + :param domain: str + ''' + + # Get random nameserver + nameserver = random.choice(self.resolvers) if self.resolvers else None + + # Parse domain + base_domain, port, protocols = parse_domain_url(domain) + + # Initialize result dictionary result = { - 'domain' : base_domain, - 'status' : 0, - 'url' : protocols[0], - 'port' : port, + 'domain' : base_domain, + 'status' : 0, + 'url' : protocols[0], + 'port' : port, } # Try each protocol @@ -100,11 +102,7 @@ class HTTPZScanner: # Set random user agent for each request headers = {'User-Agent': random.choice(USER_AGENTS)} - async with session.get(url, timeout=self.timeout, - allow_redirects=self.follow_redirects, - max_redirects=10 if self.follow_redirects else 0, - headers=headers) as response: - + async with session.get(url, timeout=self.timeout, allow_redirects=self.follow_redirects, max_redirects=10 if self.follow_redirects else 0, headers=headers) as response: result['status'] = response.status # Bail immediately if it's a failed lookup - no point processing further @@ -196,7 +194,7 @@ class HTTPZScanner: ''' if not self.resolvers: - await self.init() + self.resolvers = await load_resolvers(self.resolver_file) async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False)) as session: tasks = set() @@ -212,15 +210,16 @@ class HTTPZScanner: tasks, return_when=asyncio.FIRST_COMPLETED ) for task in done: - if result := await task: # Only yield if result is not None + if result := await task: if self.show_progress: count += 1 yield result task = asyncio.create_task(self.check_domain(session, domain)) tasks.add(task) + + # List/tuple input elif isinstance(input_source, (list, tuple)): - # List/tuple input for line_num, domain in enumerate(input_source): if domain := str(domain).strip(): if self.shard is None or line_num % self.shard[1] == self.shard[0]: @@ -229,7 +228,7 @@ class HTTPZScanner: tasks, return_when=asyncio.FIRST_COMPLETED ) for task in done: - if result := await task: # Only yield if result is not None + if result := await task: if self.show_progress: count += 1 yield result @@ -251,7 +250,7 @@ class HTTPZScanner: tasks, return_when=asyncio.FIRST_COMPLETED ) for task in done: - if result := await task: # Only yield if result is not None + if result := await task: if self.show_progress: count += 1 yield result @@ -264,7 +263,7 @@ class HTTPZScanner: if tasks: done, _ = await asyncio.wait(tasks) for task in done: - if result := await task: # Only yield if result is not None + if result := await task: if self.show_progress: count += 1 yield result \ No newline at end of file diff --git a/httpz_scanner/utils.py b/httpz_scanner/utils.py index 84cb74f..792641c 100644 --- a/httpz_scanner/utils.py +++ b/httpz_scanner/utils.py @@ -117,7 +117,7 @@ async def input_generator(input_source, shard: tuple = None): # Handle stdin if input_source == '-' or input_source is None: for line in sys.stdin: - await asyncio.sleep(0) # Yield control + await asyncio.sleep(0) if line := line.strip(): if shard is None or line_num % shard[1] == shard[0]: yield line @@ -127,7 +127,7 @@ async def input_generator(input_source, shard: tuple = None): elif isinstance(input_source, str) and os.path.exists(input_source): with open(input_source, 'r') as f: for line in f: - await asyncio.sleep(0) # Yield control + await asyncio.sleep(0) if line := line.strip(): if shard is None or line_num % shard[1] == shard[0]: yield line @@ -136,7 +136,7 @@ async def input_generator(input_source, shard: tuple = None): # Handle iterables (generators, lists, etc) elif hasattr(input_source, '__iter__') and not isinstance(input_source, (str, bytes)): for line in input_source: - await asyncio.sleep(0) # Yield control + await asyncio.sleep(0) if isinstance(line, bytes): line = line.decode() if line := line.strip(): @@ -149,7 +149,7 @@ async def input_generator(input_source, shard: tuple = None): if isinstance(input_source, bytes): input_source = input_source.decode() for line in input_source.splitlines(): - await asyncio.sleep(0) # Yield control + await asyncio.sleep(0) if line := line.strip(): if shard is None or line_num % shard[1] == shard[0]: yield line diff --git a/setup.py b/setup.py index 50a6be8..8c1954e 100644 --- a/setup.py +++ b/setup.py @@ -10,7 +10,7 @@ with open('README.md', 'r', encoding='utf-8') as f: setup( name='httpz_scanner', - version='2.0.11', + version='2.1.0', author='acidvegas', author_email='acid.vegas@acid.vegas', description='Hyper-fast HTTP Scraping Tool', diff --git a/unit_test.py b/unit_test.py new file mode 100644 index 0000000..68ef402 --- /dev/null +++ b/unit_test.py @@ -0,0 +1,133 @@ +#!/usr/bin/env python3 +# HTTPZ Web Scanner - Unit Tests +# unit_test.py + +import asyncio +import logging +import sys + +try: + from httpz_scanner import HTTPZScanner + from httpz_scanner.colors import Colors +except ImportError: + raise ImportError('missing httpz_scanner library (pip install httpz_scanner)') + + +class ColoredFormatter(logging.Formatter): + '''Custom formatter for colored log output''' + + def format(self, record): + if record.levelno == logging.INFO: + color = Colors.GREEN + elif record.levelno == logging.WARNING: + color = Colors.YELLOW + elif record.levelno == logging.ERROR: + color = Colors.RED + else: + color = Colors.RESET + + record.msg = f'{color}{record.msg}{Colors.RESET}' + return super().format(record) + + +# Configure logging with colors +logger = logging.getLogger() +handler = logging.StreamHandler() +handler.setFormatter(ColoredFormatter('%(asctime)s - %(levelname)s - %(message)s')) +logger.setLevel(logging.INFO) +logger.addHandler(handler) + + +async def get_domains_from_url(): + ''' + Fetch domains from SecLists URL + + :return: List of domains + ''' + + try: + import aiohttp + except ImportError: + raise ImportError('missing aiohttp library (pip install aiohttp)') + + url = 'https://raw.githubusercontent.com/danielmiessler/SecLists/refs/heads/master/Fuzzing/email-top-100-domains.txt' + + async with aiohttp.ClientSession() as session: + async with session.get(url) as response: + content = await response.text() + return [line.strip() for line in content.splitlines() if line.strip()] + + +async def domain_generator(domains): + ''' + Async generator that yields domains + + :param domains: List of domains to yield + ''' + + for domain in domains: + await asyncio.sleep(0) # Allow other coroutines to run + yield domain + + +async def test_list_input(domains): + ''' + Test scanning using a list input + + :param domains: List of domains to scan + ''' + + logging.info(f'{Colors.BOLD}Testing list input...{Colors.RESET}') + scanner = HTTPZScanner(concurrent_limit=20, timeout=3, show_progress=True, debug_mode=True) + + count = 0 + async for result in scanner.scan(domains): + if result: + count += 1 + status_color = Colors.GREEN if 200 <= result['status'] < 300 else Colors.RED + logging.info(f'List Result {count}: {Colors.CYAN}{result["domain"]}{Colors.RESET} - Status: {status_color}{result["status"]}{Colors.RESET}') + + +async def test_generator_input(domains): + ''' + Test scanning using an async generator input + + :param domains: List of domains to generate from + ''' + + logging.info(f'{Colors.BOLD}Testing generator input...{Colors.RESET}') + scanner = HTTPZScanner(concurrent_limit=20, timeout=3, show_progress=True, debug_mode=True) + + count = 0 + async for result in scanner.scan(domain_generator(domains)): + if result: + count += 1 + status_color = Colors.GREEN if 200 <= result['status'] < 300 else Colors.RED + logging.info(f'Generator Result {count}: {Colors.CYAN}{result["domain"]}{Colors.RESET} - Status: {status_color}{result["status"]}{Colors.RESET}') + + +async def main() -> None: + '''Main test function''' + + try: + # Fetch domains + domains = await get_domains_from_url() + logging.info(f'Loaded {Colors.YELLOW}{len(domains)}{Colors.RESET} domains for testing') + + # Run tests + await test_list_input(domains) + await test_generator_input(domains) + + logging.info(f'{Colors.GREEN}All tests completed successfully!{Colors.RESET}') + + except Exception as e: + logging.error(f'Test failed: {Colors.RED}{str(e)}{Colors.RESET}') + sys.exit(1) + + +if __name__ == '__main__': + try: + asyncio.run(main()) + except KeyboardInterrupt: + logging.warning(f'{Colors.YELLOW}Tests interrupted by user{Colors.RESET}') + sys.exit(1) \ No newline at end of file