From b40c930d67f94c85a0420a58f68bf3ce8a150f1e Mon Sep 17 00:00:00 2001 From: acidvegas Date: Mon, 10 Feb 2025 20:51:36 -0500 Subject: [PATCH] Added port to json output --- httpz.py | 34 ++++++++++++++++++---------------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/httpz.py b/httpz.py index 10a537e..ef649de 100644 --- a/httpz.py +++ b/httpz.py @@ -236,29 +236,20 @@ async def get_cert_info(session: aiohttp.ClientSession, url: str) -> dict: :param url: URL to get certificate info from ''' try: - # Create a new connector that doesn't verify certificates connector = aiohttp.TCPConnector(ssl=False, enable_cleanup_closed=True) - # Create a new session just for cert info to avoid SSL verification async with aiohttp.ClientSession(connector=connector) as cert_session: async with cert_session.get(url) as response: - # Get the SSL context from the connection ssl_object = response._protocol.transport.get_extra_info('ssl_object') if not ssl_object: return None - # Get the certificate in DER format cert_der = ssl_object.getpeercert(binary_form=True) if not cert_der: return None - # Load and parse the certificate cert = x509.load_der_x509_certificate(cert_der) - - # Get connection info - peername = response._protocol.transport.get_extra_info('peername') - port = peername[1] if peername else None # Extract all subject alternative names try: @@ -286,7 +277,6 @@ async def get_cert_info(session: aiohttp.ClientSession, url: str) -> dict: 'alt_names': alt_names, 'not_before': cert.not_valid_before_utc.isoformat(), 'not_after': cert.not_valid_after_utc.isoformat(), - 'port': port, 'version': cert.version.value, 'serial_number': format(cert.serial_number, 'x'), } @@ -335,18 +325,22 @@ async def check_domain(session: aiohttp.ClientSession, domain: str, follow_redir port = int(port_str.split('/')[0]) except ValueError: port = 443 # Default to HTTPS port + else: + port = 443 # Default to HTTPS port when no port specified protocols = [ f'https://{base_domain}{":" + str(port) if port else ""}', f'http://{base_domain}{":" + str(port) if port else ""}' ] - result = { + result = {} # Start with empty dict + base_result = { 'domain' : base_domain, 'status' : 0, 'title' : None, 'body' : None, 'content_type' : None, - 'url' : protocols[0], # Use first protocol as default URL + 'url' : protocols[0], + 'port' : port, # Set port here 'ips' : [], 'cname' : None, 'nameservers' : [], @@ -356,6 +350,7 @@ async def check_domain(session: aiohttp.ClientSession, domain: str, follow_redir 'redirect_chain' : [], 'tls' : None } + result.update(base_result) # Update result with base fields # Do all DNS lookups at once ips, cname, nameservers, ns_ips = await resolve_all_dns(base_domain, timeout, nameserver) @@ -404,6 +399,10 @@ async def check_domain(session: aiohttp.ClientSession, domain: str, follow_redir result['status'] = -1 continue + # Make absolutely sure port is in the result before returning + if 'port' not in result: + result['port'] = port + return result @@ -556,8 +555,6 @@ def format_status_output(result: dict, debug: bool = False, show_fields: dict = tls_parts.append(f"SANs: {', '.join(cert['alt_names'][:3])}") if cert.get('not_before') and cert.get('not_after'): tls_parts.append(f"Valid: {cert['not_before'].split('T')[0]} to {cert['not_after'].split('T')[0]}") - if cert.get('port'): - tls_parts.append(f"Port: {cert['port']}") if cert.get('version'): tls_parts.append(f"Version: {cert['version']}") if cert.get('serial_number'): @@ -602,8 +599,13 @@ async def process_domains(input_source: str = None, debug: bool = False, concurr '''Write a single result to the output file''' nonlocal processed_domains - # Create JSON output dict - output_dict = {'url': result['url'], 'domain': result['domain'], 'status': result['status']} + # Create JSON output dict with required fields + output_dict = { + 'url': result['url'], + 'domain': result['domain'], + 'status': result['status'], + 'port': result['port'] # Always include port in output + } # Add optional fields if they exist if result['title']: