Updated both Python and POSIX version for minor improvements and code structure. Both files will now download a stats report from ICANN prior to downloading zones.

This commit is contained in:
Dionysus 2024-03-05 21:10:57 -05:00
parent 34fd31e33d
commit 542573a469
Signed by: acidvegas
GPG Key ID: EF4B922DB85DC9DE
3 changed files with 146 additions and 87 deletions

87
czds
View File

@ -2,60 +2,63 @@
# ICANN API for the Centralized Zones Data Service - developed by acidvegas (https://git.acid.vegas/czds) # ICANN API for the Centralized Zones Data Service - developed by acidvegas (https://git.acid.vegas/czds)
# Reference: https://czds.icann.org # Reference: https://czds.icann.org
# Function to authenticate and get access token # Main program starts here
authenticate() { echo "ICANN Zone Data Service Script"
username="$1"
password="$2" # Define the current date for data organization
# Make an authentication request and inline the URL now=$(date +"%Y-%m-%d")
response=$(curl -s -X POST "https://account-api.icann.org/api/authenticate" \
# Get username and password (interactive if not set by environment variables)
username=${CZDS_USER:-$(read -p "ICANN Username: " user && echo "$user")}
password=${CZDS_PASS:-$(read -sp "ICANN Password: " pass && echo "$pass")}
echo "Authenticating as $username..."
# Make an authentication request
response=$(curl -s -X POST "https://account-api.icann.org/api/authenticate" \
-H "Content-Type: application/json" \ -H "Content-Type: application/json" \
-H "Accept: application/json" \ -H "Accept: application/json" \
-d "{\"username\":\"$username\",\"password\":\"$password\"}") -d "{\"username\":\"$username\",\"password\":\"$password\"}")
# Extract and return the access token # Extract and return the access access_token
echo "$response" | grep -o '"accessToken":"[^"]*' | cut -d '"' -f 4 access_token=$(echo "$response" | grep -o '"accessToken":"[^"]*' | cut -d '"' -f 4)
}
# Function to download a zone file
download_zone() {
url="$1"
token="$2"
tld=$(basename "$url" .zone)
# Make the GET request and save the response to a file
echo "Downloading $url..."
curl --progress-bar -o zonefiles/$tld.txt.gz -H "Authorization: Bearer $token" "$url"
echo "Downloaded zone file to zonefiles/$tld.txt.gz"
}
# Main program starts here
echo "ICANN Zone Data Service Script"
# Get username and password
username=${CZDS_USER:-$(read -p "ICANN Username: " user && echo "$user")}
password=${CZDS_PASS:-$(read -sp "ICANN Password: " pass && echo "$pass" && echo)}
# Authenticate and get token
echo "Authenticating..."
token=$(authenticate "$username" "$password")
# Check if authentication was successful # Check if authentication was successful
if [ -z "$token" ]; then [ -z $access_token ] && echo "error: authentication failed" && exit 1
echo "Authentication failed."
exit 1 echo "Authenticated successfully & recieved access_token $access_token"
fi
# Create output directory
mkdir -p zonefiles/$now
echo "Fetching zone report..."
# Get your zone report stats from the API
curl --progress-bar -o zonefiles/$now/.stats.csv -H "Authorization: Bearer $access_token" https://czds-api.icann.org/czds/requests/report
echo "Scrubbing report for privacy..."
# Redact username from report for privacy
sed -i 's/$username/nobody@no.name/g' zonefiles/$now/report.csv
echo "Fetching zone file links..." echo "Fetching zone file links..."
# Fetch zone links with inline URL and download zone files
zone_links=$(curl -s -H "Authorization: Bearer $token" "https://czds-api.icann.org/czds/downloads/links" | grep -o 'https://[^"]*')
# Create output directory if it does not exist # Get the zone file links from the API
mkdir -p zonefiles zone_links=$(curl -s -H "Authorization: Bearer $access_token" https://czds-api.icann.org/czds/downloads/links | grep -o 'https://[^"]*')
# Download zone files # Download zone files
for url in $zone_links; do for url in $zone_links; do
download_zone "$url" "$token" tld=$(basename "$url" .zone)
echo "Downloading $url..."
# Make the GET request and save the response to a file
curl --progress-bar -o zonefiles/$now/$tld.txt.gz -H "Authorization: Bearer $access_token" "$url"
echo "Downloaded $tld zone file to zonefiles/$tld.txt.gz (extracting...)"
# Unzip the zone file
gunzip zonefiles/$now/$tld.txt.gz
done done
echo "All zone files downloaded." echo "All zone files downloaded."

134
czds.py
View File

@ -5,13 +5,11 @@
import argparse import argparse
import concurrent.futures import concurrent.futures
import getpass import getpass
import json
import logging import logging
import os import os
import time
try: import urllib.request
import requests
except ImportError:
raise ImportError('Missing dependency: requests (pip install requests)')
# Setting up logging # Setting up logging
@ -20,21 +18,64 @@ logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(level
def authenticate(username: str, password: str) -> str: def authenticate(username: str, password: str) -> str:
''' '''
Authenticate with ICANN's API and return the access token. Authenticate with the ICANN API and return the access token.
:param username: ICANN Username :param username: ICANN Username
:param password: ICANN Password :param password: ICANN Password
''' '''
response = requests.post('https://account-api.icann.org/api/authenticate', json={'username': username, 'password': password}) data = json.dumps({'username': username, 'password': password}).encode('utf-8')
response.raise_for_status() headers = {'Content-Type': 'application/json'}
request = urllib.request.Request('https://account-api.icann.org/api/authenticate', data=data, headers=headers)
with urllib.request.urlopen(request) as response:
response = response.read().decode('utf-8')
return json.loads(response)['accessToken']
def fetch_zone_links(token: str) -> list:
'''
Fetch the list of zone files available for download.
:param token: ICANN access token
'''
headers = {'Authorization': f'Bearer {token}'}
request = urllib.request.Request('https://czds-api.icann.org/czds/downloads/links', headers=headers)
with urllib.request.urlopen(request) as response:
if response.status == 200:
return json.loads(response.read().decode('utf-8'))
else:
raise Exception(f'Failed to fetch zone links: {response.status} {response.reason}')
def download_report(token: str, output_directory: str, username: str):
'''
Downloads the zone report stats from the API and scrubs the report for privacy.
:param token: ICANN access token
:param output_directory: Directory to save the scrubbed report
:param username: Username to be redacted
'''
filepath = os.path.join(output_directory, '.stats.csv')
headers = {'Authorization': f'Bearer {token}'}
request = urllib.request.Request('https://czds-api.icann.org/czds/requests/report', headers=headers)
with urllib.request.urlopen(request) as response:
if response.status == 200:
report_data = response.read().decode('utf-8').replace(username, 'nobody@no.name')
with open(filepath, 'w') as file:
file.write(report_data)
else:
raise Exception(f'Failed to download the zone stats report: {response.status} {response.reason}')
return response.json()['accessToken']
def download_zone(url: str, token: str, output_directory: str): def download_zone(url: str, token: str, output_directory: str):
''' '''
Download a single zone file. Download a single zone file using urllib.request.
:param url: URL to download :param url: URL to download
:param token: ICANN access token :param token: ICANN access token
@ -42,15 +83,28 @@ def download_zone(url: str, token: str, output_directory: str):
''' '''
headers = {'Authorization': f'Bearer {token}'} headers = {'Authorization': f'Bearer {token}'}
response = requests.get(url, headers=headers) request = urllib.request.Request(url, headers=headers)
response.raise_for_status()
filename = response.headers.get('Content-Disposition').split('filename=')[-1].strip('"') with urllib.request.urlopen(request) as response:
if response.status == 200:
content_disposition = response.getheader('Content-Disposition')
if content_disposition:
filename = content_disposition.split('filename=')[-1].strip('"')
else:
raise ValueError(f'Failed to get filename from Content-Disposition header: {content_disposition}')
filepath = os.path.join(output_directory, filename) filepath = os.path.join(output_directory, filename)
with open(filepath, 'wb') as file: with open(filepath, 'wb') as file:
for chunk in response.iter_content(chunk_size=1024): while True:
chunk = response.read(1024)
if not chunk:
break
file.write(chunk) file.write(chunk)
return filepath return filepath
else:
raise Exception(f'Failed to download {url}: {response.status} {response.reason}')
def main(username: str, password: str, concurrency: int): def main(username: str, password: str, concurrency: int):
@ -62,23 +116,36 @@ def main(username: str, password: str, concurrency: int):
:param concurrency: Number of concurrent downloads :param concurrency: Number of concurrent downloads
''' '''
token = authenticate(username, password) now = time.strftime('%Y-%m-%d')
headers = {'Authorization': f'Bearer {token}'}
response = requests.get('https://czds-api.icann.org/czds/downloads/links', headers=headers)
response.raise_for_status()
zone_links = response.json()
output_directory = 'zonefiles'
logging.info(f'Authenticating with ICANN API...')
try:
token = authenticate(username, password)
except Exception as e:
raise Exception(f'Failed to authenticate with ICANN API: {e}')
#logging.info(f'Authenticated with token: {token}')
# The above line is commented out to avoid printing the token to the logs, you can uncomment it for debugging purposes
output_directory = os.path.join(os.getcwd(), 'zones', now)
os.makedirs(output_directory, exist_ok=True) os.makedirs(output_directory, exist_ok=True)
logging.info('Fetching zone stats report...')
try:
download_report(token, output_directory, username)
except Exception as e:
raise Exception(f'Failed to download zone stats report: {e}')
logging.info('Fetching zone links...')
try:
zone_links = fetch_zone_links(token)
except Exception as e:
raise Exception(f'Failed to fetch zone links: {e}')
logging.info(f'Fetched {len(zone_links)} zone links')
with concurrent.futures.ThreadPoolExecutor(max_workers=concurrency) as executor: with concurrent.futures.ThreadPoolExecutor(max_workers=concurrency) as executor:
future_to_url = {executor.submit(download_zone, url, token, output_directory): url for url in zone_links} future_to_url = {executor.submit(download_zone, url, token, output_directory): url for url in zone_links}
for future in concurrent.futures.as_completed(future_to_url): for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future] url = future_to_url[future]
try: try:
filepath = future.result() filepath = future.result()
logging.info(f'Completed downloading {url} to file {filepath}') logging.info(f'Completed downloading {url} to file {filepath}')
@ -88,24 +155,13 @@ def main(username: str, password: str, concurrency: int):
if __name__ == '__main__': if __name__ == '__main__':
parser = argparse.ArgumentParser(description="ICANN Zone Files Downloader") parser = argparse.ArgumentParser(description='ICANN API for the Centralized Zones Data Service')
parser.add_argument('-u', '--username', help='ICANN Username') parser.add_argument('-u', '--username', help='ICANN Username')
parser.add_argument('-p', '--password', help='ICANN Password') parser.add_argument('-p', '--password', help='ICANN Password')
parser.add_argument('-c', '--concurrency', type=int, default=5, help='Number of concurrent downloads') parser.add_argument('-c', '--concurrency', type=int, default=3, help='Number of concurrent downloads')
args = parser.parse_args() args = parser.parse_args()
username = args.username or os.getenv('CZDS_USER') username = args.username or os.getenv('CZDS_USER') or input('ICANN Username: ')
password = args.password or os.getenv('CZDS_PASS') password = args.password or os.getenv('CZDS_PASS') or getpass.getpass('ICANN Password: ')
if not username:
username = input('ICANN Username: ')
if not password:
password = getpass.getpass('ICANN Password: ')
try:
main(username, password, args.concurrency) main(username, password, args.concurrency)
except requests.HTTPError as e:
logging.error(f'HTTP error occurred: {e.response.status_code} - {e.response.reason}')
except Exception as e:
logging.error(f'An error occurred: {e}')