Made into a class now for modular usage

This commit is contained in:
Dionysus 2025-02-20 23:16:31 -05:00
parent 01d1e6c4d8
commit 5471e5cb97
Signed by: acidvegas
GPG Key ID: EF4B922DB85DC9DE
3 changed files with 89 additions and 92 deletions

View File

@ -1,6 +1,6 @@
ISC License ISC License
Copyright (c) 2024, acidvegas <acid.vegas@acid.vegas> Copyright (c) 2025, acidvegas <acid.vegas@acid.vegas>
Permission to use, copy, modify, and/or distribute this software for any Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above purpose with or without fee is hereby granted, provided that the above

162
czds.py
View File

@ -1,4 +1,4 @@
#!/usr/bin/env python #!/usr/bin/env python3
# ICANN API for the Centralized Zones Data Service - developed by acidvegas (https://git.acid.vegas/czds) # ICANN API for the Centralized Zones Data Service - developed by acidvegas (https://git.acid.vegas/czds)
# Reference: https://czds.icann.org # Reference: https://czds.icann.org
@ -12,87 +12,92 @@ import time
import urllib.request import urllib.request
# Setting up logging class CZDS:
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') '''Class for the ICANN Centralized Zones Data Service'''
def __init__(self, username: str, password: str):
self.headers = {'Authorization': f'Bearer {self.authenticate(username, password)}'}
def authenticate(username: str, password: str) -> str: def authenticate(self, username: str, password: str) -> dict:
''' '''
Authenticate with the ICANN API and return the access token. Authenticate with the ICANN API and return the access token.
:param username: ICANN Username :param username: ICANN Username
:param password: ICANN Password :param password: ICANN Password
''' '''
data = json.dumps({'username': username, 'password': password}).encode('utf-8') try:
headers = {'Content-Type': 'application/json'} # Prepare the request
request = urllib.request.Request('https://account-api.icann.org/api/authenticate', data=data, headers=headers) data = json.dumps({'username': username, 'password': password}).encode('utf-8')
headers = {'Content-Type': 'application/json'}
request = urllib.request.Request('https://account-api.icann.org/api/authenticate', data=data, headers=headers)
with urllib.request.urlopen(request) as response: # Make the request
response = response.read().decode('utf-8') with urllib.request.urlopen(request) as response:
return json.loads(response)['accessToken'] response = response.read().decode('utf-8')
return json.loads(response)['accessToken']
except Exception as e:
raise Exception(f'Failed to authenticate with ICANN API: {e}')
def fetch_zone_links(token: str) -> list: def fetch_zone_links(self) -> list:
''' '''
Fetch the list of zone files available for download. Fetch the list of zone files available for download.
:param token: ICANN access token :param token: ICANN access token
''' '''
headers = {'Authorization': f'Bearer {token}'} request = urllib.request.Request('https://czds-api.icann.org/czds/downloads/links', headers=self.headers)
request = urllib.request.Request('https://czds-api.icann.org/czds/downloads/links', headers=headers)
with urllib.request.urlopen(request) as response: with urllib.request.urlopen(request) as response:
if response.status == 200: if response.status == 200:
return json.loads(response.read().decode('utf-8')) return json.loads(response.read().decode('utf-8'))
else: else:
raise Exception(f'Failed to fetch zone links: {response.status} {response.reason}') raise Exception(f'Failed to fetch zone links: {response.status} {response.reason}')
def download_report(token: str, output_directory: str, username: str): def download_report(self, output_directory):
''' '''
Downloads the zone report stats from the API and scrubs the report for privacy. Downloads the zone report stats from the API and scrubs the report for privacy.
:param token: ICANN access token :param token: ICANN access token
:param output_directory: Directory to save the scrubbed report :param output_directory: Directory to save the scrubbed report
:param username: Username to be redacted :param username: Username to be redacted
''' '''
filepath = os.path.join(output_directory, '.stats.csv') filepath = os.path.join(output_directory, '.stats.csv')
headers = {'Authorization': f'Bearer {token}'} request = urllib.request.Request('https://czds-api.icann.org/czds/requests/report', headers=self.headers)
request = urllib.request.Request('https://czds-api.icann.org/czds/requests/report', headers=headers)
with urllib.request.urlopen(request) as response:
if not (response.status == 200):
raise Exception(f'Failed to download the zone stats report: {response.status} {response.reason}')
with urllib.request.urlopen(request) as response:
if response.status == 200:
report_data = response.read().decode('utf-8').replace(username, 'nobody@no.name') report_data = response.read().decode('utf-8').replace(username, 'nobody@no.name')
with open(filepath, 'w') as file: with open(filepath, 'w') as file:
file.write(report_data) file.write(report_data)
else:
raise Exception(f'Failed to download the zone stats report: {response.status} {response.reason}')
def download_zone(self, url: str, output_directory: str):
'''
Download a single zone file using urllib.request.
def download_zone(url: str, token: str, output_directory: str): :param url: URL to download
''' :param output_directory: Directory to save the zone file
Download a single zone file using urllib.request. '''
:param url: URL to download request = urllib.request.Request(url, headers=self.headers)
:param token: ICANN access token
:param output_directory: Directory to save the zone file
'''
headers = {'Authorization': f'Bearer {token}'} with urllib.request.urlopen(request) as response:
request = urllib.request.Request(url, headers=headers) if response.status != 200:
raise Exception(f'Failed to download {url}: {response.status} {response.reason}')
with urllib.request.urlopen(request) as response: if not (content_disposition := response.getheader('Content-Disposition')):
if response.status == 200: raise ValueError('Missing Content-Disposition header')
content_disposition = response.getheader('Content-Disposition')
if content_disposition:
filename = content_disposition.split('filename=')[-1].strip('"')
else:
raise ValueError(f'Failed to get filename from Content-Disposition header: {content_disposition}')
filename = content_disposition.split('filename=')[-1].strip('"')
filepath = os.path.join(output_directory, filename) filepath = os.path.join(output_directory, filename)
with open(filepath, 'wb') as file: with open(filepath, 'wb') as file:
@ -103,8 +108,6 @@ def download_zone(url: str, token: str, output_directory: str):
file.write(chunk) file.write(chunk)
return filepath return filepath
else:
raise Exception(f'Failed to download {url}: {response.status} {response.reason}')
def main(username: str, password: str, concurrency: int): def main(username: str, password: str, concurrency: int):
@ -119,31 +122,32 @@ def main(username: str, password: str, concurrency: int):
now = time.strftime('%Y-%m-%d') now = time.strftime('%Y-%m-%d')
logging.info(f'Authenticating with ICANN API...') logging.info(f'Authenticating with ICANN API...')
try:
token = authenticate(username, password) CZDS_client = CZDS(username, password)
except Exception as e:
raise Exception(f'Failed to authenticate with ICANN API: {e}')
#logging.info(f'Authenticated with token: {token}')
# The above line is commented out to avoid printing the token to the logs, you can uncomment it for debugging purposes
logging.debug('Created CZDS client')
output_directory = os.path.join(os.getcwd(), 'zones', now) output_directory = os.path.join(os.getcwd(), 'zones', now)
os.makedirs(output_directory, exist_ok=True) os.makedirs(output_directory, exist_ok=True)
logging.info('Fetching zone stats report...') logging.info('Fetching zone stats report...')
try: try:
download_report(token, output_directory, username) CZDS_client.download_report(output_directory)
except Exception as e: except Exception as e:
raise Exception(f'Failed to download zone stats report: {e}') raise Exception(f'Failed to download zone stats report: {e}')
logging.info('Fetching zone links...') logging.info('Fetching zone links...')
try: try:
zone_links = fetch_zone_links(token) zone_links = CZDS_client.fetch_zone_links()
except Exception as e: except Exception as e:
raise Exception(f'Failed to fetch zone links: {e}') raise Exception(f'Failed to fetch zone links: {e}')
logging.info(f'Fetched {len(zone_links)} zone links')
logging.info(f'Fetched {len(zone_links):,} zone links')
with concurrent.futures.ThreadPoolExecutor(max_workers=concurrency) as executor: with concurrent.futures.ThreadPoolExecutor(max_workers=concurrency) as executor:
future_to_url = {executor.submit(download_zone, url, token, output_directory): url for url in zone_links} future_to_url = {executor.submit(CZDS_client.download_zone, url, output_directory): url for url in sorted(zone_links)}
for future in concurrent.futures.as_completed(future_to_url): for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future] url = future_to_url[future]
try: try:
@ -155,13 +159,23 @@ def main(username: str, password: str, concurrency: int):
if __name__ == '__main__': if __name__ == '__main__':
# Create argument parser
parser = argparse.ArgumentParser(description='ICANN API for the Centralized Zones Data Service') parser = argparse.ArgumentParser(description='ICANN API for the Centralized Zones Data Service')
parser.add_argument('-u', '--username', help='ICANN Username')
parser.add_argument('-p', '--password', help='ICANN Password') # Add arguments
parser.add_argument('-u', '--username', default=os.getenv('CZDS_USER'), help='ICANN Username')
parser.add_argument('-p', '--password', default=os.getenv('CZDS_PASS'), help='ICANN Password')
parser.add_argument('-c', '--concurrency', type=int, default=3, help='Number of concurrent downloads') parser.add_argument('-c', '--concurrency', type=int, default=3, help='Number of concurrent downloads')
# Parse arguments
args = parser.parse_args() args = parser.parse_args()
username = args.username or os.getenv('CZDS_USER') or input('ICANN Username: ') # Setting up logging
password = args.password or os.getenv('CZDS_PASS') or getpass.getpass('ICANN Password: ') logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# Get username and password
username = args.username or input('ICANN Username: ')
password = args.password or getpass.getpass('ICANN Password: ')
# Execute main function
main(username, password, args.concurrency) main(username, password, args.concurrency)

View File

@ -1,17 +0,0 @@
#!/bin/sh
# systemd user service timer setup for czds - developed by acidvegas (https://git.acid.vegas/czds)
# dont forget to export your CZDS_USER and CZDS_PASS before running
CZDS='/path/to/czds'
systemd_service() {
mkdir -p $HOME/.config/systemd/user
printf "[Unit]\nDescription=ICANN Centralized Zone Data Service (CZDS) Updater\n\n[Service]\nType=oneshot\nExecStart=$CZDS" > $HOME/.config/systemd/user/czds.service
printf "[Unit]\nDescription=Timer for ICANN Centralized Zone Data Service (CZDS) Updater\n\n[Timer]\nOnCalendar=monthly\nPersistent=true\n\n[Install]\nWantedBy=timers.target" > $HOME/.config/systemd/user/czds.timer
systemctl --user daemon-reload
systemctl --user enable czds.timer && systemctl --user start czds.timer
}
cronjob() {
(crontab -l 2>/dev/null; echo "0 3 1 * * $CZDS") | crontab -
}