Code has been refined and improved, README updated, LICENSE added

Dionysus 2023-12-15 23:02:37 -05:00
parent 21ea59bebc
commit 3b96fa7bfd
Signed by: acidvegas
GPG Key ID: EF4B922DB85DC9DE
3 changed files with 160 additions and 99 deletions

LICENSE (new file, 15 lines)

@@ -0,0 +1,15 @@
ISC License

Copyright (c) 2023, acidvegas <acid.vegas@acid.vegas>

Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.

THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

README.md

@@ -1,6 +1,24 @@
# HTTP-Z
###### This is still a work in progress...stay tuned for updates!

## Information
This script is developed as a robust alternative to HTTPX, addressing limitations in JSON output customization and other functionality that HTTPX lacks. It is specifically designed for asynchronous lookups on a list of domains, efficiently gathering DNS information and web content details such as page titles and body previews.
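Results are written to the output file as JSON, one object per line, matching the fields assembled in `write_result_to_file` in `httpz.py`. A sample record (values illustrative, not real output):

```
{"domain": "example.com", "protocol": "https", "status_code": 200, "title": "Example Domain", "body": "This domain is for use in examples...", "dns_records": {"A": ["93.184.216.34"]}}
```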
## Usage
| Argument | Description |
| ---------------------- | ----------------------------------------------------------- |
| `<input_file>` | File containing list of domains |
| `-c`, `--concurrency` | Number of concurrent requests |
| `-m`, `--memory_limit` | Number of results to store in memory before syncing to file |
| `-o`, `--output` | Output file |
| `-t`, `--timeout` | Timeout for HTTP requests |
| `-u`, `--user_agent` | User agent to use for HTTP requests |
| `-x`, `--proxy` | Proxy to use for HTTP requests |
| `-r`, `--retry` | Number of times to retry failed requests |
| `-v`, `--verbose` | Increase output verbosity |
| `-p`, `--preview` | Preview size in bytes for body & title *(default: 500)* |
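
For example, a typical run might look like this (file names and flag values are placeholders, not defaults):

```
python httpz.py domains.txt -c 100 -t 10 -o results.json -p 200
```

This would check the domains in `domains.txt` with 100 concurrent requests, a 10 second timeout per request, 200-byte body previews, and results appended to `results.json`.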
___
###### Mirrors
[acid.vegas](https://git.acid.vegas/httpz) • [GitHub](https://github.com/acidvegas/httpz) • [GitLab](https://gitlab.com/acidvegas/httpz) • [SuperNETs](https://git.supernets.org/acidvegas/httpz)

httpz.py (222 changes)

@@ -14,190 +14,222 @@ import logging
import ssl
import urllib.request
try:
    import aiodns
except ImportError:
    print('Missing required module \'aiodns\'. (pip install aiodns)')
    exit(1)

try:
    import aiohttp
except ImportError:
    print('Missing required module \'aiohttp\'. (pip install aiohttp)')
    exit(1)

# ANSI escape codes for colors
RED = '\033[91m'
GREEN = '\033[92m'
DARK_GREY = '\033[90m'
RESET = '\033[0m'
# Globals
DNS_SERVERS = None
args = None # Global args variable


def vlog(msg: str):
    '''
    Verbose logging only if enabled

    :param msg: Message to print to console
    '''
    if args.verbose:
        logging.info(msg)


def get_dns_servers() -> dict:
    '''Get a list of DNS servers to use for lookups.'''
    with urllib.request.urlopen('https://public-dns.info/nameservers.txt') as source:
        results = source.read().decode().split('\n')
    v4_servers = [server for server in results if ':' not in server]
    v6_servers = [server for server in results if ':' in server]
    return {'4': v4_servers, '6': v6_servers}
async def dns_lookup(domain: str, record_type: str, timeout: int) -> list:
    '''
    Resolve DNS information from a domain

    :param domain: Domain name to resolve
    :param record_type: DNS record type to resolve
    :param timeout: Timeout for DNS request
    '''
    for i in range(args.retry):
        try:
            # A records use IPv4 resolvers, AAAA records use IPv6 resolvers, anything else picks at random
            version = '4' if record_type == 'A' else '6' if record_type == 'AAAA' else random.choice(['4','6'])
            nameserver = random.choice(DNS_SERVERS[version])
            resolver = aiodns.DNSResolver(nameservers=[nameserver], timeout=timeout)
            records = await resolver.query(domain, record_type)
            return records.cname if record_type == 'CNAME' else [record.host for record in records]
        except Exception as e:
            vlog(f'{RED}[ERROR]{RESET} {domain} - Failed to resolve {record_type} record using {nameserver} {DARK_GREY}({str(e)}){RESET}')
    return []
async def get_body(source: str, preview: int) -> str:
    '''
    Get the body of a webpage

    :param source: HTML source of the webpage
    :param preview: Number of bytes to preview
    '''
    body_content = re.search(r'<body.*?>(.*?)</body>', source, re.DOTALL | re.IGNORECASE)
    processed_content = body_content.group(1) if body_content else source
    clean_content = re.sub(r'<[^>]+>', '', processed_content)
    return clean_content[:preview]


async def get_title(session: aiohttp.ClientSession, domain: str):
    '''
    Get the title of a webpage and its status code

    :param session: aiohttp session
    :param domain: URL to get the title of
    '''
    body = None
    status_code = None
    title = None
    try:
        async with session.get(domain, timeout=args.timeout, allow_redirects=False) as response:
            status_code = response.status
            if status_code in (200, 201):
                html_content = await response.text()
                match = re.search(r'<title>(.*?)</title>', html_content, re.IGNORECASE | re.DOTALL)
                title = match.group(1).strip() if match else None
                title = re.sub(r'[\r\n]+', ' ', title)[:300] if title else None # Fix this ugly shit
                body = await get_body(html_content, args.preview)
            elif status_code in (301, 302, 303, 307, 308) and args.retry > 0: # Need to implement a max redirect limit
                redirect_url = response.headers.get('Location')
                if redirect_url:
                    return await get_title(session, redirect_url)
                else:
                    vlog(f'{RED}[ERROR]{RESET} {domain} - No redirect URL found for {status_code} status code')
            else:
                vlog(f'{RED}[ERROR]{RESET} {domain} - Invalid status code {DARK_GREY}{status_code}{RESET}')
    except asyncio.TimeoutError:
        vlog(f'{RED}[ERROR]{RESET} {domain} - HTTP request timed out')
    except Exception as e:
        vlog(f'{RED}[ERROR]{RESET} Failed to get title for {domain} {DARK_GREY}({e}){RESET}')
    return title, body, status_code
async def check_url(session: aiohttp.ClientSession, domain: str):
    '''
    Process a domain name

    :param session: aiohttp session
    :param domain: URL to get the title of
    '''
    dns_records = {}
    for record_type in ('A', 'AAAA'):
        records = await dns_lookup(domain, record_type, args.timeout)
        if records:
            dns_records[record_type] = records
            break
    if not dns_records:
        cname_record = await dns_lookup(domain, 'CNAME', args.timeout)
        if cname_record:
            dns_records['CNAME'] = cname_record
            domain = cname_record
        else:
            vlog(f'{RED}[ERROR]{RESET} No DNS records found for {domain}')
            return domain, None, None, None, None, None
    # Try HTTPS first, falling back to HTTP if nothing was retrieved
    title, body, status_code = await get_title(session, f'https://{domain}')
    if not title and not body:
        title, body, status_code = await get_title(session, f'http://{domain}')
    if title or body:
        logging.info(f'[{GREEN}SUCCESS{RESET}] {domain} - {title} - {body}')
        return domain, 'https', title, body, dns_records, status_code
    else:
        vlog(f'{RED}[ERROR]{RESET} {domain} - Failed to retrieve title')
        return domain, None, None, None, None, status_code
async def process_file():
    '''Process a list of domains from file'''
    counter = 0
    ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    ssl_context.check_hostname = False
    ssl_context.verify_mode = ssl.CERT_NONE
    headers = {'User-Agent': args.user_agent}
    connector = aiohttp.TCPConnector(ssl=ssl_context)
    session_params = {
        'connector': connector,
        'headers': headers,
        'timeout': aiohttp.ClientTimeout(total=args.timeout)
    }
    if args.proxy:
        session_params['proxy'] = args.proxy
    async with aiohttp.ClientSession(**session_params) as session:
        tasks = set()
        with open(args.file, 'r') as file:
            for line in file:
                domain = line.strip()
                if domain:
                    tasks.add(asyncio.create_task(check_url(session, domain)))
                    if len(tasks) >= args.concurrency:
                        done, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
                        for task in done:
                            domain, protocol, title, body, dns_records, status_code = task.result()
                            if title or body: # log results for dns?
                                write_result_to_file(domain, protocol, title, body, dns_records, status_code)
                                counter += 1
                                if counter % args.memory_limit == 0:
                                    logging.info(f'Processed {counter} domains')
        if tasks:
            done, _ = await asyncio.wait(tasks)
            for task in done:
                domain, protocol, title, body, dns_records, status_code = task.result()
                if title:
                    write_result_to_file(domain, protocol, title, body, dns_records, status_code)


def write_result_to_file(domain, protocol, title, body, dns_records, status_code):
    '''
    Write a single domain result to file

    :param domain: Domain name
    :param protocol: Protocol used (http or https)
    :param title: Title of the domain
    :param body: Body preview of the domain
    :param dns_records: DNS records of the domain
    :param status_code: HTTP status code
    '''
    result = {
        'domain': domain,
        'protocol': protocol,
        'status_code': status_code,
        'title': title,
        'body': body,
        'dns_records': dns_records
    }
    with open(args.output, 'a') as f:
        json.dump(result, f)
        f.write('\n')
def main():
    global DNS_SERVERS, args
    parser = argparse.ArgumentParser(description='Check URLs from a file asynchronously, perform DNS lookups and store results in JSON.')
    parser.add_argument('file', help='File containing list of domains')
@@ -209,23 +241,19 @@ def main():
    parser.add_argument('-x', '--proxy', help='Proxy to use for HTTP requests')
    parser.add_argument('-r', '--retry', type=int, default=3, help='Number of times to retry failed requests')
    parser.add_argument('-v', '--verbose', action='store_true', help='Increase output verbosity')
    parser.add_argument('-p', '--preview', type=int, default=500, help='Preview size in bytes for body & title (default: 500)')
    args = parser.parse_args()

    log_level = logging.INFO
    logging.basicConfig(level=log_level, format=f'{DARK_GREY}%(asctime)s{RESET} %(message)s', datefmt='%H:%M:%S')

    logging.info('Loading DNS servers...')
    DNS_SERVERS = get_dns_servers()
    if not DNS_SERVERS:
        logging.fatal('Failed to get DNS servers.')
        exit(1)
    logging.info(f'Found {len(DNS_SERVERS["4"])} IPv4 and {len(DNS_SERVERS["6"])} IPv6 DNS servers.')

    asyncio.run(process_file())


if __name__ == '__main__':
    main()