httpz/httpz.py

282 lines
10 KiB
Python

#!/usr/bin/env python
# HTTPZ Crawler - Developed by acidvegas in Python (https://git.acid.vegas/httpz)
'''
BCUZ FUCK HTTPX PYTHON STILL GO HARD
'''
import argparse
import asyncio
import json
import random
import re
import logging
import ssl
import urllib.request
try:
import aiodns
except ImportError:
print('Missing required module \'aiodns\'. (pip install aiodns)')
exit(1)
try:
import aiohttp
except ImportError:
print('Missing required module \'aiohttp\'. (pip install aiohttp)')
exit(1)
# ANSI escape codes for colors
BLUE = '\033[34m'
CYAN = '\033[36m'
RED = '\033[91m'
GREEN = '\033[92m'
DARK_GREY = '\033[90m'
YELLOW = '\033[93m'
RESET = '\033[0m'
# Globals
DNS_SERVERS = None
args = None # Global args variable
def vlog(msg: str):
'''
Verbose logging only if enabled
:param msg: Message to print to console
'''
if args.verbose:
logging.info(msg)
def create_session(user_agent: str, timeout: int, proxy: str = None) -> dict:
'''
Create a custom aiohttp session
:param user_agent: User agent to use for HTTP requests
:param timeout: Timeout for HTTP requests
'''
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
headers = {'User-Agent': user_agent}
connector = aiohttp.TCPConnector(ssl=ssl_context)
session_params = {
'connector': connector,
'headers': headers,
'timeout': aiohttp.ClientTimeout(total=timeout)
}
return session_params
def get_dns_servers() -> dict:
'''Get a list of DNS servers to use for lookups.'''
with urllib.request.urlopen('https://public-dns.info/nameservers.txt') as source:
results = source.read().decode().split('\n')
v4_servers = [server for server in results if ':' not in server]
v6_servers = [server for server in results if ':' in server]
return {'4': v4_servers, '6': v6_servers}
async def dns_lookup(domain: str, record_type: str, timeout: int, retry: int) -> list:
'''
Resolve DNS information from a domain
:param domain: Domain name to resolve
:param record_type: DNS record type to resolve
:param timeout: Timeout for DNS request
:param retry: Number of times to retry failed requests
'''
for i in range(retry):
try:
version = '4' if record_type == 'A' else '6' if record_type == 'AAAA' else random.choice(['4','6'])
nameserver = random.choice(DNS_SERVERS[version])
resolver = aiodns.DNSResolver(nameservers=[nameserver], timeout=timeout)
records = await resolver.query(domain, record_type)
return records.cname if record_type == 'CNAME' else [record.host for record in records]
except Exception as e:
vlog(f'{RED}[ERROR]{RESET} {domain} - Failed to resolve {record_type} record using {nameserver} {DARK_GREY}({str(e)}){RESET}')
return []
async def get_body(source: str, preview: int) -> str:
'''
Get the body of a webpage
:param source: HTML source of the webpage
:param preview: Number of bytes to preview
'''
body_content = re.search(r'<body.*?>(.*?)</body>', source[:5000], re.DOTALL | re.IGNORECASE)
processed_content = body_content.group(1) if body_content else source
clean_content = re.sub(r'<[^>]+>', '', processed_content)
return clean_content[:preview]
async def get_title(session: aiohttp.ClientSession, domain: str):
'''
Get the title of a webpage and its status code
:param session: aiohttp session
:param domain: URL to get the title of
'''
title = None
body = None
status_code = None
try:
async with session.get(domain, timeout=args.timeout, allow_redirects=False) as response:
status_code = response.status
if status_code in (200, 201):
html_content = await response.text()
match = re.search(r'<title>(.*?)</title>', html_content, re.IGNORECASE | re.DOTALL)
title = match.group(1).strip() if match else None
title = bytes(title, 'utf-8').decode('unicode_escape') if title else None
title = re.sub(r'[\r\n]+', ' ', title)[:300] if title else None # Fix this ugly shit
body = await get_body(html_content, args.preview)
body = re.sub(r'\s+', ' ', body).strip() if body else None
elif status_code in (301, 302, 303, 307, 308) and args.retry > 0: # Need to implement a max redirect limit
redirect_url = response.headers.get('Location')
if redirect_url:
vlog(f'{YELLOW}[WARN]{RESET} {domain} -> {redirect_url} {DARK_GREY}({status_code}){RESET}')
return await get_title(session, redirect_url)
else:
vlog(f'{RED}[ERROR]{RESET} No redirect URL found for {domain} {DARK_GREY}({status_code}){RESET}')
else:
vlog(f'{RED}[ERROR]{RESET} {domain} - Invalid status code {DARK_GREY}{status_code}{RESET}')
except asyncio.TimeoutError:
vlog(f'{RED}[ERROR]{RESET} {domain} - HTTP request timed out')
except Exception as e:
vlog(f'{RED}[ERROR]{RESET} Failed to get title for {domain} {DARK_GREY}({e}){RESET}')
return title, body, status_code # Fix this ugly shit
async def check_url(session: aiohttp.ClientSession, domain: str):
'''
Process a domain name
:param session: aiohttp session
:param domain: URL to get the title of
'''
dns_records = {}
for record_type in ('A', 'AAAA'):
records = await dns_lookup(domain, record_type, args.timeout, args.retry)
if records:
dns_records[record_type] = records
if not dns_records:
cname_record = await dns_lookup(domain, 'CNAME', args.timeout, args.retry)
if cname_record:
dns_records['CNAME'] = cname_record
domain = cname_record
else:
vlog(f'{RED}[ERROR]{RESET} No DNS records found for {domain}')
return domain, None, None, None, None, None
title, body, status_code = await get_title(session, f'https://{domain}')
if not title and not body:
title, body, status_code = await get_title(session, f'http://{domain}')
if title or body:
if status_code in (200, 201):
status_code = f'[{GREEN}200{RESET}]'
elif status_code in (301, 302, 303, 307, 308):
status_code = f'[{YELLOW}{status_code}{RESET}]'
logging.info(f'{domain} {status_code} [{CYAN}{title}{RESET}] - [{BLUE}{body}{RESET}]')
return domain, 'https', title, body, dns_records, status_code
else:
vlog(f'{RED}[ERROR]{RESET} {domain} - Failed to retrieve title')
return domain, None, None, None, None, status_code
async def process_file():
'''
Process a list of domains from file
'''
session_params = create_session(args.user_agent, args.timeout, args.proxy)
async with aiohttp.ClientSession(**session_params) as session:
tasks = set()
with open(args.file, 'r') as file:
for line in file:
domain = line.strip()
if domain:
tasks.add(asyncio.create_task(check_url(session, domain)))
if len(tasks) >= args.concurrency: # Should be a better way to do this
done, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
for task in done:
domain, protocol, title, body, dns_records, status_code = task.result()
if title or body or dns_records:
write_result_to_file(domain, protocol, title, body, dns_records, status_code)
if tasks:
done, _ = await asyncio.wait(tasks)
for task in done:
domain, protocol, title, body, dns_records, status_code = task.result()
if title:
write_result_to_file(domain, protocol, title, body, dns_records, status_code)
def write_result_to_file(domain, protocol, title, body, dns_records, status_code):
'''
Write a single domain result to file
:param domain: Domain name
:param protocol: Protocol used (http or https)
:param title: Title of the domain
:param dns_records: DNS records of the domain
:param status_code: HTTP status code
'''
result = {
'domain': domain,
'protocol': protocol,
'status_code': status_code,
'title': title,
'body': body,
'dns_records': dns_records
}
with open(args.output, 'a') as f:
json.dump(result, f)
f.write('\n')
def main():
global DNS_SERVERS, args
parser = argparse.ArgumentParser(description='Check URLs from a file asynchronously, perform DNS lookups and store results in JSON.')
parser.add_argument('file', help='File containing list of domains')
parser.add_argument('-c', '--concurrency', type=int, default=10, help='Number of concurrent requests')
parser.add_argument('-m', '--memory_limit', type=int, default=1000, help='Number of results to store in memory before syncing to file')
parser.add_argument('-o', '--output', default='results.json', help='Output file')
parser.add_argument('-t', '--timeout', type=int, default=10, help='Timeout for HTTP requests')
parser.add_argument('-u', '--user_agent', default='Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)', help='User agent to use for HTTP requests')
parser.add_argument('-x', '--proxy', type=str, help='Proxy to use for HTTP requests')
parser.add_argument('-r', '--retry', type=int, default=2, help='Number of times to retry failed requests')
parser.add_argument('-v', '--verbose', action='store_true', help='Increase output verbosity')
parser.add_argument('-p', '--preview', type=int, default=500, help='Preview size in bytes for body & title (default: 500)')
args = parser.parse_args()
log_level = logging.INFO
logging.basicConfig(level=log_level, format=f'{DARK_GREY}%(asctime)s{RESET} %(message)s', datefmt='%H:%M:%S')
logging.info('Loading DNS servers...')
DNS_SERVERS = get_dns_servers()
if not DNS_SERVERS:
logging.fatal('Failed to get DNS servers.')
logging.info(f'Found {len(DNS_SERVERS["4"])} IPv4 and {len(DNS_SERVERS["6"])} IPv6 DNS servers.')
asyncio.run(process_file())
if __name__ == '__main__':
main()