Updated zone stats in README, updated code to be more verbose

This commit is contained in:
Dionysus 2024-01-03 00:08:48 -05:00
parent 1b90b6e099
commit 4be93ce367
Signed by: acidvegas
GPG Key ID: EF4B922DB85DC9DE
3 changed files with 100 additions and 96 deletions

View File

@ -7,7 +7,7 @@ Zone files are updated once every 24 hours, specifically from 00:00 UTC to 06:00
At the time of writing this repository, the CZDS offers access to 1,150 zones in total. At the time of writing this repository, the CZDS offers access to 1,150 zones in total.
1,076 have been approved, 58 are still pending *(after 3 months)*, 10 have been revoked because the TLDs are longer active, and 6 have been denied. 1,079 have been approved, 55 are still pending *(after 3 months)*, 10 have been revoked because the TLDs are longer active, and 6 have been denied.
## Usage ## Usage
### Authentication ### Authentication

47
czds
View File

@ -1,4 +1,4 @@
#!/bin/sh #!/bin/bash
# ICANN API for the Centralized Zones Data Service - developed by acidvegas (https://git.acid.vegas/czds) # ICANN API for the Centralized Zones Data Service - developed by acidvegas (https://git.acid.vegas/czds)
# https://czds.icann.org # https://czds.icann.org
@ -6,30 +6,31 @@
# Function to authenticate and get access token # Function to authenticate and get access token
authenticate() { authenticate() {
username="$1" username="$1"
password="$2" password="$2"
# Make an authentication request and inline the URL # Make an authentication request and inline the URL
response=$(curl -s -X POST "https://account-api.icann.org/api/authenticate" \ response=$(curl -s -X POST "https://account-api.icann.org/api/authenticate" \
-H "Content-Type: application/json" \ -H "Content-Type: application/json" \
-H "Accept: application/json" \ -H "Accept: application/json" \
-d "{\"username\":\"$username\",\"password\":\"$password\"}") -d "{\"username\":\"$username\",\"password\":\"$password\"}")
# Extract and return the access token # Extract and return the access token
echo "$response" | grep -o '"accessToken":"[^"]*' | cut -d '"' -f 4 echo "$response" | grep -o '"accessToken":"[^"]*' | cut -d '"' -f 4
} }
# Function to download a zone file # Function to download a zone file
download_zone() { download_zone() {
url="$1" url="$1"
token="$2" token="$2"
filename=$(basename "$url") filename=$(basename "$url")
filepath="zonefiles/$filename" filepath="zonefiles/$filename"
# Create output directory if it does not exist
mkdir -p "zonefiles"
# Make the GET request and save the response to a file # Create output directory if it does not exist
curl -s -o "$filepath" -H "Authorization: Bearer $token" "$url" mkdir -p zonefiles
echo "Downloaded zone file to $filepath"
# Make the GET request and save the response to a file
curl -s -o "$filepath" -H "Authorization: Bearer $token" "$url"
echo "Downloaded zone file to $filepath"
} }
# Main program starts here # Main program starts here
@ -45,8 +46,8 @@ token=$(authenticate "$username" "$password")
# Check if authentication was successful # Check if authentication was successful
if [ -z "$token" ]; then if [ -z "$token" ]; then
echo "Authentication failed." echo "Authentication failed."
exit 1 exit 1
fi fi
echo "Fetching zone file links..." echo "Fetching zone file links..."
@ -55,8 +56,8 @@ zone_links=$(curl -s -H "Authorization: Bearer $token" "https://czds-api.icann.o
# Download zone files # Download zone files
for url in $zone_links; do for url in $zone_links; do
echo "Downloading $url..." echo "Downloading $url..."
download_zone "$url" "$token" download_zone "$url" "$token"
done done
echo "All zone files downloaded." echo "All zone files downloaded."

147
czds.py
View File

@ -3,8 +3,8 @@
''' '''
References: References:
- https://czds.icann.org - https://czds.icann.org
- https://czds.icann.org/sites/default/files/czds-api-documentation.pdf - https://czds.icann.org/sites/default/files/czds-api-documentation.pdf
''' '''
import argparse import argparse
@ -15,90 +15,93 @@ import os
try: try:
import requests import requests
except ImportError: except ImportError:
raise ImportError('Missing dependency: requests (pip install requests)') raise ImportError('Missing dependency: requests (pip install requests)')
# Setting up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
def authenticate(username: str, password: str) -> str: def authenticate(username: str, password: str) -> str:
''' '''
Authenticate with ICANN's API and return the access token. Authenticate with ICANN's API and return the access token.
:param username: ICANN Username :param username: ICANN Username
:param password: ICANN Password :param password: ICANN Password
''' '''
response = requests.post('https://account-api.icann.org/api/authenticate', json={'username': username, 'password': password}) response = requests.post('https://account-api.icann.org/api/authenticate', json={'username': username, 'password': password})
response.raise_for_status() response.raise_for_status()
return response.json()['accessToken'] return response.json()['accessToken']
def download_zone(url: str, token: str, output_directory: str): def download_zone(url: str, token: str, output_directory: str):
''' '''
Download a single zone file. Download a single zone file.
:param url: URL to download :param url: URL to download
:param token: ICANN access token :param token: ICANN access token
:param output_directory: Directory to save the zone file :param output_directory: Directory to save the zone file
''' '''
headers = {'Authorization': f'Bearer {token}'} headers = {'Authorization': f'Bearer {token}'}
response = requests.get(url, headers=headers) response = requests.get(url, headers=headers)
response.raise_for_status() response.raise_for_status()
filename = response.headers.get('Content-Disposition').split('filename=')[-1].strip('"') filename = response.headers.get('Content-Disposition').split('filename=')[-1].strip('"')
filepath = os.path.join(output_directory, filename) filepath = os.path.join(output_directory, filename)
with open(filepath, 'wb') as file: with open(filepath, 'wb') as file:
for chunk in response.iter_content(chunk_size=1024): for chunk in response.iter_content(chunk_size=1024):
file.write(chunk) file.write(chunk)
return filepath return filepath
def main(username: str, password: str, concurrency: int): def main(username: str, password: str, concurrency: int):
''' '''
Main function to download all zone files. Main function to download all zone files.
:param username: ICANN Username :param username: ICANN Username
:param password: ICANN Password :param password: ICANN Password
:param concurrency: Number of concurrent downloads :param concurrency: Number of concurrent downloads
''' '''
token = authenticate(username, password) token = authenticate(username, password)
headers = {'Authorization': f'Bearer {token}'} headers = {'Authorization': f'Bearer {token}'}
response = requests.get('https://czds-api.icann.org/czds/downloads/links', headers=headers) response = requests.get('https://czds-api.icann.org/czds/downloads/links', headers=headers)
response.raise_for_status() response.raise_for_status()
zone_links = response.json() zone_links = response.json()
output_directory = 'zonefiles'
output_directory = 'zonefiles' os.makedirs(output_directory, exist_ok=True)
os.makedirs(output_directory, exist_ok=True)
with concurrent.futures.ThreadPoolExecutor(max_workers=concurrency) as executor:
with concurrent.futures.ThreadPoolExecutor(max_workers=concurrency) as executor: future_to_url = {executor.submit(download_zone, url, token, output_directory): url for url in zone_links}
future_to_url = {executor.submit(download_zone, url, token, output_directory): url for url in zone_links} for future in concurrent.futures.as_completed(future_to_url):
for future in concurrent.futures.as_completed(future_to_url): url = future_to_url[future]
url = future_to_url[future] try:
try: filepath = future.result()
filepath = future.result() logging.info(f'Completed downloading {url} to file {filepath}')
logging.info(f'Completed downloading {url} to file {filepath}') except Exception as e:
except Exception as e: logging.error(f'{url} generated an exception: {e}')
logging.error(f'{url} generated an exception: {e}')
if __name__ == '__main__': if __name__ == '__main__':
parser = argparse.ArgumentParser(description="ICANN Zone Files Downloader") parser = argparse.ArgumentParser(description="ICANN Zone Files Downloader")
parser.add_argument('-u', '--username', help='ICANN Username') parser.add_argument('-u', '--username', help='ICANN Username')
parser.add_argument('-p', '--password', help='ICANN Password') parser.add_argument('-p', '--password', help='ICANN Password')
parser.add_argument('-c', '--concurrency', type=int, default=5, help='Number of concurrent downloads') parser.add_argument('-c', '--concurrency', type=int, default=5, help='Number of concurrent downloads')
args = parser.parse_args() args = parser.parse_args()
username = args.username or os.getenv('CZDS_USER') username = args.username or os.getenv('CZDS_USER')
password = args.password or os.getenv('CZDS_PASS') password = args.password or os.getenv('CZDS_PASS')
if not username: if not username:
username = input('ICANN Username: ') username = input('ICANN Username: ')
if not password: if not password:
password = getpass.getpass('ICANN Password: ') password = getpass.getpass('ICANN Password: ')
try: try:
main(username, password, args.concurrency) main(username, password, args.concurrency)
except requests.HTTPError as e: except requests.HTTPError as e:
logging.error(f'HTTP error occurred: {e.response.status_code} - {e.response.reason}') logging.error(f'HTTP error occurred: {e.response.status_code} - {e.response.reason}')
except Exception as e: except Exception as e:
logging.error(f'An error occurred: {e}') logging.error(f'An error occurred: {e}')