Get certificate info for expired and invalid and self signed certs also

This commit is contained in:
Dionysus 2025-02-10 19:55:49 -05:00
parent 7e2b90a92b
commit 73e3f07cdf
Signed by: acidvegas
GPG Key ID: EF4B922DB85DC9DE

299
httpz.py
View File

@ -109,45 +109,78 @@ def load_resolvers(resolver_file: str = None) -> list:
return ['1.1.1.1', '8.8.8.8', '9.9.9.9']
async def resolve_dns(domain: str, timeout: int = 5, nameserver: str = None) -> tuple:
async def resolve_all_dns(domain: str, timeout: int = 5, nameserver: str = None) -> tuple:
'''
Resolve A, AAAA, and CNAME records for a domain
Resolve all DNS records (NS, A, AAAA, CNAME) for a domain
:param domain: domain to resolve
:param timeout: timeout in seconds
:param nameserver: specific nameserver to use
:return: tuple of (ips, cname)
:param domain: Domain to resolve
:param timeout: Timeout in seconds
:param nameserver: Specific nameserver to use
:return: Tuple of (ips, cname, nameservers, ns_ips)
'''
resolver = dns.asyncresolver.Resolver()
resolver.lifetime = timeout
if nameserver:
resolver.nameservers = [nameserver]
ips = []
cname = None
try:
# Check for CNAME first
cname_result = await resolver.resolve(domain, 'CNAME')
cname = str(cname_result[0].target).rstrip('.')
except Exception:
pass
try:
# Query A records
a_result = await resolver.resolve(domain, 'A')
ips.extend(str(ip) for ip in a_result)
except Exception as e:
debug(f'Error resolving A records for {domain}: {str(e)}')
try:
# Query AAAA records
aaaa_result = await resolver.resolve(domain, 'AAAA')
ips.extend(str(ip) for ip in aaaa_result)
except Exception as e:
debug(f'Error resolving AAAA records for {domain}: {str(e)}')
return sorted(set(ips)), cname
# Prepare all DNS queries
tasks = [
resolver.resolve(domain, 'NS'),
resolver.resolve(domain, 'A'),
resolver.resolve(domain, 'AAAA'),
resolver.resolve(domain, 'CNAME')
]
# Run all queries concurrently
results = await asyncio.gather(*tasks, return_exceptions=True)
# Process results
nameservers = []
ips = []
cname = None
ns_ips = {}
# NS records
if isinstance(results[0], dns.resolver.Answer):
nameservers = [str(ns).rstrip('.') for ns in results[0]]
# Resolve each nameserver's IPs
ns_tasks = []
for ns in nameservers:
ns_tasks.extend([
resolver.resolve(ns, 'A'),
resolver.resolve(ns, 'AAAA')
])
if ns_tasks:
ns_results = await asyncio.gather(*ns_tasks, return_exceptions=True)
for i in range(0, len(ns_results), 2):
ns = nameservers[i//2]
ns_ips[ns] = []
# A records
if isinstance(ns_results[i], dns.resolver.Answer):
ns_ips[ns].extend(str(ip) for ip in ns_results[i])
# AAAA records
if isinstance(ns_results[i+1], dns.resolver.Answer):
ns_ips[ns].extend(str(ip) for ip in ns_results[i+1])
# A records
if isinstance(results[1], dns.resolver.Answer):
ips.extend(str(ip) for ip in results[1])
# AAAA records
if isinstance(results[2], dns.resolver.Answer):
ips.extend(str(ip) for ip in results[2])
# CNAME records
if isinstance(results[3], dns.resolver.Answer):
cname = str(results[3][0].target).rstrip('.')
return sorted(set(ips)), cname, nameservers, ns_ips
async def get_favicon_hash(session: aiohttp.ClientSession, base_url: str, html: str) -> str:
@ -200,39 +233,65 @@ async def get_cert_info(session: aiohttp.ClientSession, url: str) -> dict:
Get SSL certificate information for a domain
:param session: aiohttp client session
:param url: URL to check
:param url: URL to get certificate info from
'''
try:
async with session.get(url, timeout=10) as response:
# Get the SSL context from the connection
ssl_object = response.connection.transport.get_extra_info('ssl_object')
if not ssl_object:
return None
# Create a new connector that doesn't verify certificates
connector = aiohttp.TCPConnector(ssl=False, enable_cleanup_closed=True)
# Create a new session just for cert info to avoid SSL verification
async with aiohttp.ClientSession(connector=connector) as cert_session:
async with cert_session.get(url) as response:
# Get the SSL context from the connection
ssl_object = response._protocol.transport.get_extra_info('ssl_object')
cert_bin = ssl_object.getpeercert(binary_form=True)
cert = x509.load_der_x509_certificate(cert_bin)
# Get certificate details
cert_info = {
'fingerprint' : cert.fingerprint(hashes.SHA256()).hex(),
'subject' : cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value,
'issuer' : cert.issuer.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value,
'alt_names' : [],
'not_before' : cert.not_valid_before_utc.isoformat(),
'not_after' : cert.not_valid_after_utc.isoformat()
}
# Get Subject Alternative Names
try:
ext = cert.extensions.get_extension_for_oid(x509.oid.ExtensionOID.SUBJECT_ALTERNATIVE_NAME)
cert_info['alt_names'] = [name.value for name in ext.value]
except x509.ExtensionNotFound:
pass
return cert_info
if not ssl_object:
return None
# Get the certificate in DER format
cert_der = ssl_object.getpeercert(binary_form=True)
if not cert_der:
return None
# Load and parse the certificate
cert = x509.load_der_x509_certificate(cert_der)
# Get connection info
peername = response._protocol.transport.get_extra_info('peername')
port = peername[1] if peername else None
# Extract all subject alternative names
try:
san_extension = cert.extensions.get_extension_for_oid(x509.oid.ExtensionOID.SUBJECT_ALTERNATIVE_NAME)
alt_names = [name.value for name in san_extension.value] if san_extension else []
except x509.extensions.ExtensionNotFound:
alt_names = []
# Get subject CN
try:
subject = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value
except IndexError:
subject = None
# Get issuer CN
try:
issuer = cert.issuer.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value
except IndexError:
issuer = None
return {
'fingerprint': cert.fingerprint(hashes.SHA256()).hex(),
'subject': subject,
'issuer': issuer,
'alt_names': alt_names,
'not_before': cert.not_valid_before_utc.isoformat(),
'not_after': cert.not_valid_after_utc.isoformat(),
'port': port,
'version': cert.version.value,
'serial_number': format(cert.serial_number, 'x'),
}
except Exception as e:
debug(f'Error getting certificate info for {url}: {str(e)}')
debug(f'Error getting cert info for {url}: {str(e)}')
return None
@ -264,7 +323,7 @@ async def check_domain(session: aiohttp.ClientSession, domain: str, follow_redir
'title' : None,
'body' : None,
'content_type' : None,
'url' : f"https://{base_domain}",
'url' : f'https://{base_domain}',
'ips' : [],
'cname' : None,
'nameservers' : [],
@ -275,19 +334,16 @@ async def check_domain(session: aiohttp.ClientSession, domain: str, follow_redir
'tls' : None
}
# Resolve DNS records with chosen nameserver
result['ips'], result['cname'] = await resolve_dns(base_domain, timeout, nameserver)
# Do all DNS lookups at once
ips, cname, nameservers, ns_ips = await resolve_all_dns(base_domain, timeout, nameserver)
# After DNS resolution, add nameserver lookup:
try:
resolver = dns.asyncresolver.Resolver()
resolver.lifetime = timeout
if nameserver:
resolver.nameservers = [nameserver]
ns_records = await resolver.resolve(base_domain, 'NS')
result['nameservers'] = [str(ns).rstrip('.') for ns in ns_records]
except Exception as e:
debug(f'Error getting nameservers for {base_domain}: {str(e)}')
result['ips'] = ips
result['cname'] = cname
result['nameservers'] = nameservers
# Try AXFR if enabled (using already resolved nameserver IPs)
if check_axfr:
await try_axfr(base_domain, ns_ips, timeout)
for protocol in protocols:
url = f'{protocol}{base_domain}'
@ -306,9 +362,9 @@ async def check_domain(session: aiohttp.ClientSession, domain: str, follow_redir
if result['redirect_chain']:
result['redirect_chain'].append(str(response.url))
# Get TLS info if HTTPS
if url.startswith('https://'):
result['tls'] = await get_cert_info(session, url)
# Try to get cert info for any successful HTTPS connection
if response.url.scheme == 'https':
result['tls'] = await get_cert_info(session, str(response.url))
if response.status == 200:
html = (await response.text())[:1024*1024]
@ -326,9 +382,6 @@ async def check_domain(session: aiohttp.ClientSession, domain: str, follow_redir
result['status'] = -1
continue
if check_axfr:
await try_axfr(base_domain, timeout)
return result
@ -360,8 +413,8 @@ def human_size(size_bytes: int) -> str:
if not size_bytes:
return '0B'
units = ('B', 'KB', 'MB', 'GB')
size = float(size_bytes)
units = ('B', 'KB', 'MB', 'GB')
size = float(size_bytes)
unit_index = 0
while size >= 1024 and unit_index < len(units) - 1:
@ -467,16 +520,27 @@ def format_status_output(result: dict, debug: bool = False, show_fields: dict =
chain = ' -> '.join(result['redirect_chain'])
parts.append(f"{Colors.YELLOW}[Redirects: {chain}]{Colors.RESET}")
# TLS Certificate Info
if show_fields['tls'] and result['tls']:
# TLS Certificate Info - Modified to always show if available
if result['tls']:
cert = result['tls']
tls_parts = []
tls_parts.append(f"Fingerprint: {cert['fingerprint']}")
tls_parts.append(f"Subject: {cert['subject']}")
tls_parts.append(f"Issuer: {cert['issuer']}")
if cert['alt_names']:
tls_parts.append(f"SANs: {', '.join(cert['alt_names'])}")
tls_parts.append(f"Valid: {cert['not_before']} to {cert['not_after']}")
if cert.get('subject'):
tls_parts.append(f"Subject: {cert['subject']}")
if cert.get('issuer'):
tls_parts.append(f"Issuer: {cert['issuer']}")
if cert.get('fingerprint'):
tls_parts.append(f"Fingerprint: {cert['fingerprint'][:16]}...")
if cert.get('alt_names'):
tls_parts.append(f"SANs: {', '.join(cert['alt_names'][:3])}")
if cert.get('not_before') and cert.get('not_after'):
tls_parts.append(f"Valid: {cert['not_before'].split('T')[0]} to {cert['not_after'].split('T')[0]}")
if cert.get('port'):
tls_parts.append(f"Port: {cert['port']}")
if cert.get('version'):
tls_parts.append(f"Version: {cert['version']}")
if cert.get('serial_number'):
tls_parts.append(f"Serial: {cert['serial_number'][:16]}...")
parts.append(f"{Colors.GREEN}[{' | '.join(tls_parts)}]{Colors.RESET}")
return ' '.join(parts)
@ -557,7 +621,7 @@ async def process_domains(input_source: str = None, debug: bool = False, concurr
if show_progress:
info(f"{Colors.GRAY}[{processed_domains}]{Colors.RESET} {formatted}")
async with aiohttp.ClientSession() as session:
async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False)) as session:
# Start initial batch of tasks
for domain in itertools.islice(domain_generator(input_source), concurrent_limit):
task = asyncio.create_task(check_domain(session, domain, follow_redirects=show_fields['follow_redirects'], timeout=timeout, check_axfr=check_axfr, resolvers=resolvers))
@ -586,56 +650,29 @@ async def process_domains(input_source: str = None, debug: bool = False, concurr
await write_result(result)
async def try_axfr(domain: str, timeout: int = 5) -> None:
async def try_axfr(domain: str, ns_ips: dict, timeout: int = 5) -> None:
'''
Try AXFR transfer for a domain against all its nameservers
Try AXFR transfer using pre-resolved nameserver IPs
:param domain: Domain to attempt AXFR transfer
:param timeout: timeout in seconds
:param ns_ips: Dictionary of nameserver hostnames to their IPs
:param timeout: Timeout in seconds
'''
try:
# Ensure output directory exists
os.makedirs('axfrout', exist_ok=True)
# Get nameservers
resolver = dns.asyncresolver.Resolver()
resolver.lifetime = timeout
ns_records = await resolver.resolve(domain, 'NS')
nameservers = [str(ns).rstrip('.') for ns in ns_records]
# Try AXFR against each nameserver's IPs
for ns_host in nameservers:
try:
# Get A records
a_ips = []
for ns_host, ips in ns_ips.items():
for ns_ip in ips:
try:
a_records = await resolver.resolve(ns_host, 'A')
a_ips.extend(str(ip) for ip in a_records)
zone = dns.zone.from_xfr(dns.query.xfr(ns_ip, domain, lifetime=timeout))
filename = f'axfrout/{domain}_{ns_ip}.zone'
with open(filename, 'w') as f:
zone.to_text(f)
info(f'{Colors.RED}[AXFR SUCCESS] {domain} from {ns_host} ({ns_ip}){Colors.RESET}')
except Exception as e:
debug(f'Failed to get A records for {ns_host}: {str(e)}')
# Get AAAA records
try:
aaaa_records = await resolver.resolve(ns_host, 'AAAA')
a_ips.extend(str(ip) for ip in aaaa_records)
except Exception as e:
debug(f'Failed to get AAAA records for {ns_host}: {str(e)}')
# Try AXFR against each IP
for ns_ip in a_ips:
try:
zone = dns.zone.from_xfr(dns.query.xfr(ns_ip, domain, lifetime=timeout+10))
filename = f'axfrout/{domain}_{ns_ip}.zone'
with open(filename, 'w') as f:
zone.to_text(f)
info(f'{Colors.GREEN}[AXFR SUCCESS] {domain} from {ns_host} ({ns_ip}){Colors.RESET}')
except Exception as e:
debug(f'AXFR failed for {domain} from {ns_ip}: {str(e)}')
except Exception as e:
debug(f'Failed to resolve {ns_host}: {str(e)}')
debug(f'AXFR failed for {domain} from {ns_ip}: {str(e)}')
except Exception as e:
debug(f'Failed to get nameservers for {domain}: {str(e)}')
debug(f'Failed AXFR for {domain}: {str(e)}')
def main():