From 73e3f07cdfd9c86487deaeed85735b20c7bb4be8 Mon Sep 17 00:00:00 2001 From: acidvegas Date: Mon, 10 Feb 2025 19:55:49 -0500 Subject: [PATCH] Get certificate info for expired and invalid and self signed certs also --- httpz.py | 299 +++++++++++++++++++++++++++++++------------------------ 1 file changed, 168 insertions(+), 131 deletions(-) diff --git a/httpz.py b/httpz.py index 50766bc..78bc322 100644 --- a/httpz.py +++ b/httpz.py @@ -109,45 +109,78 @@ def load_resolvers(resolver_file: str = None) -> list: return ['1.1.1.1', '8.8.8.8', '9.9.9.9'] -async def resolve_dns(domain: str, timeout: int = 5, nameserver: str = None) -> tuple: +async def resolve_all_dns(domain: str, timeout: int = 5, nameserver: str = None) -> tuple: ''' - Resolve A, AAAA, and CNAME records for a domain + Resolve all DNS records (NS, A, AAAA, CNAME) for a domain - :param domain: domain to resolve - :param timeout: timeout in seconds - :param nameserver: specific nameserver to use - :return: tuple of (ips, cname) + :param domain: Domain to resolve + :param timeout: Timeout in seconds + :param nameserver: Specific nameserver to use + :return: Tuple of (ips, cname, nameservers, ns_ips) ''' - + resolver = dns.asyncresolver.Resolver() resolver.lifetime = timeout if nameserver: resolver.nameservers = [nameserver] - ips = [] - cname = None - - try: - # Check for CNAME first - cname_result = await resolver.resolve(domain, 'CNAME') - cname = str(cname_result[0].target).rstrip('.') - except Exception: - pass - - try: - # Query A records - a_result = await resolver.resolve(domain, 'A') - ips.extend(str(ip) for ip in a_result) - except Exception as e: - debug(f'Error resolving A records for {domain}: {str(e)}') - - try: - # Query AAAA records - aaaa_result = await resolver.resolve(domain, 'AAAA') - ips.extend(str(ip) for ip in aaaa_result) - except Exception as e: - debug(f'Error resolving AAAA records for {domain}: {str(e)}') - - return sorted(set(ips)), cname + + # Prepare all DNS queries + tasks = [ + resolver.resolve(domain, 'NS'), + resolver.resolve(domain, 'A'), + resolver.resolve(domain, 'AAAA'), + resolver.resolve(domain, 'CNAME') + ] + + # Run all queries concurrently + results = await asyncio.gather(*tasks, return_exceptions=True) + + # Process results + nameservers = [] + ips = [] + cname = None + ns_ips = {} + + # NS records + if isinstance(results[0], dns.resolver.Answer): + nameservers = [str(ns).rstrip('.') for ns in results[0]] + + # Resolve each nameserver's IPs + ns_tasks = [] + for ns in nameservers: + ns_tasks.extend([ + resolver.resolve(ns, 'A'), + resolver.resolve(ns, 'AAAA') + ]) + + if ns_tasks: + ns_results = await asyncio.gather(*ns_tasks, return_exceptions=True) + + for i in range(0, len(ns_results), 2): + ns = nameservers[i//2] + ns_ips[ns] = [] + + # A records + if isinstance(ns_results[i], dns.resolver.Answer): + ns_ips[ns].extend(str(ip) for ip in ns_results[i]) + + # AAAA records + if isinstance(ns_results[i+1], dns.resolver.Answer): + ns_ips[ns].extend(str(ip) for ip in ns_results[i+1]) + + # A records + if isinstance(results[1], dns.resolver.Answer): + ips.extend(str(ip) for ip in results[1]) + + # AAAA records + if isinstance(results[2], dns.resolver.Answer): + ips.extend(str(ip) for ip in results[2]) + + # CNAME records + if isinstance(results[3], dns.resolver.Answer): + cname = str(results[3][0].target).rstrip('.') + + return sorted(set(ips)), cname, nameservers, ns_ips async def get_favicon_hash(session: aiohttp.ClientSession, base_url: str, html: str) -> str: @@ -200,39 +233,65 @@ async def get_cert_info(session: aiohttp.ClientSession, url: str) -> dict: Get SSL certificate information for a domain :param session: aiohttp client session - :param url: URL to check + :param url: URL to get certificate info from ''' - try: - async with session.get(url, timeout=10) as response: - # Get the SSL context from the connection - ssl_object = response.connection.transport.get_extra_info('ssl_object') - if not ssl_object: - return None + # Create a new connector that doesn't verify certificates + connector = aiohttp.TCPConnector(ssl=False, enable_cleanup_closed=True) + + # Create a new session just for cert info to avoid SSL verification + async with aiohttp.ClientSession(connector=connector) as cert_session: + async with cert_session.get(url) as response: + # Get the SSL context from the connection + ssl_object = response._protocol.transport.get_extra_info('ssl_object') - cert_bin = ssl_object.getpeercert(binary_form=True) - cert = x509.load_der_x509_certificate(cert_bin) - - # Get certificate details - cert_info = { - 'fingerprint' : cert.fingerprint(hashes.SHA256()).hex(), - 'subject' : cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value, - 'issuer' : cert.issuer.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value, - 'alt_names' : [], - 'not_before' : cert.not_valid_before_utc.isoformat(), - 'not_after' : cert.not_valid_after_utc.isoformat() - } - - # Get Subject Alternative Names - try: - ext = cert.extensions.get_extension_for_oid(x509.oid.ExtensionOID.SUBJECT_ALTERNATIVE_NAME) - cert_info['alt_names'] = [name.value for name in ext.value] - except x509.ExtensionNotFound: - pass - - return cert_info + if not ssl_object: + return None + + # Get the certificate in DER format + cert_der = ssl_object.getpeercert(binary_form=True) + if not cert_der: + return None + + # Load and parse the certificate + cert = x509.load_der_x509_certificate(cert_der) + + # Get connection info + peername = response._protocol.transport.get_extra_info('peername') + port = peername[1] if peername else None + + # Extract all subject alternative names + try: + san_extension = cert.extensions.get_extension_for_oid(x509.oid.ExtensionOID.SUBJECT_ALTERNATIVE_NAME) + alt_names = [name.value for name in san_extension.value] if san_extension else [] + except x509.extensions.ExtensionNotFound: + alt_names = [] + + # Get subject CN + try: + subject = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value + except IndexError: + subject = None + + # Get issuer CN + try: + issuer = cert.issuer.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value + except IndexError: + issuer = None + + return { + 'fingerprint': cert.fingerprint(hashes.SHA256()).hex(), + 'subject': subject, + 'issuer': issuer, + 'alt_names': alt_names, + 'not_before': cert.not_valid_before_utc.isoformat(), + 'not_after': cert.not_valid_after_utc.isoformat(), + 'port': port, + 'version': cert.version.value, + 'serial_number': format(cert.serial_number, 'x'), + } except Exception as e: - debug(f'Error getting certificate info for {url}: {str(e)}') + debug(f'Error getting cert info for {url}: {str(e)}') return None @@ -264,7 +323,7 @@ async def check_domain(session: aiohttp.ClientSession, domain: str, follow_redir 'title' : None, 'body' : None, 'content_type' : None, - 'url' : f"https://{base_domain}", + 'url' : f'https://{base_domain}', 'ips' : [], 'cname' : None, 'nameservers' : [], @@ -275,19 +334,16 @@ async def check_domain(session: aiohttp.ClientSession, domain: str, follow_redir 'tls' : None } - # Resolve DNS records with chosen nameserver - result['ips'], result['cname'] = await resolve_dns(base_domain, timeout, nameserver) + # Do all DNS lookups at once + ips, cname, nameservers, ns_ips = await resolve_all_dns(base_domain, timeout, nameserver) - # After DNS resolution, add nameserver lookup: - try: - resolver = dns.asyncresolver.Resolver() - resolver.lifetime = timeout - if nameserver: - resolver.nameservers = [nameserver] - ns_records = await resolver.resolve(base_domain, 'NS') - result['nameservers'] = [str(ns).rstrip('.') for ns in ns_records] - except Exception as e: - debug(f'Error getting nameservers for {base_domain}: {str(e)}') + result['ips'] = ips + result['cname'] = cname + result['nameservers'] = nameservers + + # Try AXFR if enabled (using already resolved nameserver IPs) + if check_axfr: + await try_axfr(base_domain, ns_ips, timeout) for protocol in protocols: url = f'{protocol}{base_domain}' @@ -306,9 +362,9 @@ async def check_domain(session: aiohttp.ClientSession, domain: str, follow_redir if result['redirect_chain']: result['redirect_chain'].append(str(response.url)) - # Get TLS info if HTTPS - if url.startswith('https://'): - result['tls'] = await get_cert_info(session, url) + # Try to get cert info for any successful HTTPS connection + if response.url.scheme == 'https': + result['tls'] = await get_cert_info(session, str(response.url)) if response.status == 200: html = (await response.text())[:1024*1024] @@ -326,9 +382,6 @@ async def check_domain(session: aiohttp.ClientSession, domain: str, follow_redir result['status'] = -1 continue - if check_axfr: - await try_axfr(base_domain, timeout) - return result @@ -360,8 +413,8 @@ def human_size(size_bytes: int) -> str: if not size_bytes: return '0B' - units = ('B', 'KB', 'MB', 'GB') - size = float(size_bytes) + units = ('B', 'KB', 'MB', 'GB') + size = float(size_bytes) unit_index = 0 while size >= 1024 and unit_index < len(units) - 1: @@ -467,16 +520,27 @@ def format_status_output(result: dict, debug: bool = False, show_fields: dict = chain = ' -> '.join(result['redirect_chain']) parts.append(f"{Colors.YELLOW}[Redirects: {chain}]{Colors.RESET}") - # TLS Certificate Info - if show_fields['tls'] and result['tls']: + # TLS Certificate Info - Modified to always show if available + if result['tls']: cert = result['tls'] tls_parts = [] - tls_parts.append(f"Fingerprint: {cert['fingerprint']}") - tls_parts.append(f"Subject: {cert['subject']}") - tls_parts.append(f"Issuer: {cert['issuer']}") - if cert['alt_names']: - tls_parts.append(f"SANs: {', '.join(cert['alt_names'])}") - tls_parts.append(f"Valid: {cert['not_before']} to {cert['not_after']}") + if cert.get('subject'): + tls_parts.append(f"Subject: {cert['subject']}") + if cert.get('issuer'): + tls_parts.append(f"Issuer: {cert['issuer']}") + if cert.get('fingerprint'): + tls_parts.append(f"Fingerprint: {cert['fingerprint'][:16]}...") + if cert.get('alt_names'): + tls_parts.append(f"SANs: {', '.join(cert['alt_names'][:3])}") + if cert.get('not_before') and cert.get('not_after'): + tls_parts.append(f"Valid: {cert['not_before'].split('T')[0]} to {cert['not_after'].split('T')[0]}") + if cert.get('port'): + tls_parts.append(f"Port: {cert['port']}") + if cert.get('version'): + tls_parts.append(f"Version: {cert['version']}") + if cert.get('serial_number'): + tls_parts.append(f"Serial: {cert['serial_number'][:16]}...") + parts.append(f"{Colors.GREEN}[{' | '.join(tls_parts)}]{Colors.RESET}") return ' '.join(parts) @@ -557,7 +621,7 @@ async def process_domains(input_source: str = None, debug: bool = False, concurr if show_progress: info(f"{Colors.GRAY}[{processed_domains}]{Colors.RESET} {formatted}") - async with aiohttp.ClientSession() as session: + async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False)) as session: # Start initial batch of tasks for domain in itertools.islice(domain_generator(input_source), concurrent_limit): task = asyncio.create_task(check_domain(session, domain, follow_redirects=show_fields['follow_redirects'], timeout=timeout, check_axfr=check_axfr, resolvers=resolvers)) @@ -586,56 +650,29 @@ async def process_domains(input_source: str = None, debug: bool = False, concurr await write_result(result) -async def try_axfr(domain: str, timeout: int = 5) -> None: +async def try_axfr(domain: str, ns_ips: dict, timeout: int = 5) -> None: ''' - Try AXFR transfer for a domain against all its nameservers + Try AXFR transfer using pre-resolved nameserver IPs :param domain: Domain to attempt AXFR transfer - :param timeout: timeout in seconds + :param ns_ips: Dictionary of nameserver hostnames to their IPs + :param timeout: Timeout in seconds ''' - try: - # Ensure output directory exists os.makedirs('axfrout', exist_ok=True) - # Get nameservers - resolver = dns.asyncresolver.Resolver() - resolver.lifetime = timeout - ns_records = await resolver.resolve(domain, 'NS') - nameservers = [str(ns).rstrip('.') for ns in ns_records] - - # Try AXFR against each nameserver's IPs - for ns_host in nameservers: - try: - # Get A records - a_ips = [] + for ns_host, ips in ns_ips.items(): + for ns_ip in ips: try: - a_records = await resolver.resolve(ns_host, 'A') - a_ips.extend(str(ip) for ip in a_records) + zone = dns.zone.from_xfr(dns.query.xfr(ns_ip, domain, lifetime=timeout)) + filename = f'axfrout/{domain}_{ns_ip}.zone' + with open(filename, 'w') as f: + zone.to_text(f) + info(f'{Colors.RED}[AXFR SUCCESS] {domain} from {ns_host} ({ns_ip}){Colors.RESET}') except Exception as e: - debug(f'Failed to get A records for {ns_host}: {str(e)}') - - # Get AAAA records - try: - aaaa_records = await resolver.resolve(ns_host, 'AAAA') - a_ips.extend(str(ip) for ip in aaaa_records) - except Exception as e: - debug(f'Failed to get AAAA records for {ns_host}: {str(e)}') - - # Try AXFR against each IP - for ns_ip in a_ips: - try: - zone = dns.zone.from_xfr(dns.query.xfr(ns_ip, domain, lifetime=timeout+10)) - filename = f'axfrout/{domain}_{ns_ip}.zone' - with open(filename, 'w') as f: - zone.to_text(f) - info(f'{Colors.GREEN}[AXFR SUCCESS] {domain} from {ns_host} ({ns_ip}){Colors.RESET}') - except Exception as e: - debug(f'AXFR failed for {domain} from {ns_ip}: {str(e)}') - except Exception as e: - debug(f'Failed to resolve {ns_host}: {str(e)}') + debug(f'AXFR failed for {domain} from {ns_ip}: {str(e)}') except Exception as e: - debug(f'Failed to get nameservers for {domain}: {str(e)}') + debug(f'Failed AXFR for {domain}: {str(e)}') def main():