diff --git a/LICENSE b/LICENSE index 54ec6ab..9f32a49 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ ISC License -Copyright (c) 2024, acidvegas +Copyright (c) 2025, acidvegas Permission to use, copy, modify, and/or distribute this software for any purpose with or without fee is hereby granted, provided that the above diff --git a/czds.py b/czds.py index a6d8448..fc03618 100644 --- a/czds.py +++ b/czds.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 # ICANN API for the Centralized Zones Data Service - developed by acidvegas (https://git.acid.vegas/czds) # Reference: https://czds.icann.org @@ -12,87 +12,92 @@ import time import urllib.request -# Setting up logging -logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') +class CZDS: + '''Class for the ICANN Centralized Zones Data Service''' + + def __init__(self, username: str, password: str): + self.headers = {'Authorization': f'Bearer {self.authenticate(username, password)}'} -def authenticate(username: str, password: str) -> str: - ''' - Authenticate with the ICANN API and return the access token. + def authenticate(self, username: str, password: str) -> dict: + ''' + Authenticate with the ICANN API and return the access token. - :param username: ICANN Username - :param password: ICANN Password - ''' + :param username: ICANN Username + :param password: ICANN Password + ''' - data = json.dumps({'username': username, 'password': password}).encode('utf-8') - headers = {'Content-Type': 'application/json'} - request = urllib.request.Request('https://account-api.icann.org/api/authenticate', data=data, headers=headers) + try: + # Prepare the request + data = json.dumps({'username': username, 'password': password}).encode('utf-8') + headers = {'Content-Type': 'application/json'} + request = urllib.request.Request('https://account-api.icann.org/api/authenticate', data=data, headers=headers) - with urllib.request.urlopen(request) as response: - response = response.read().decode('utf-8') - return json.loads(response)['accessToken'] + # Make the request + with urllib.request.urlopen(request) as response: + response = response.read().decode('utf-8') + + return json.loads(response)['accessToken'] + + except Exception as e: + raise Exception(f'Failed to authenticate with ICANN API: {e}') -def fetch_zone_links(token: str) -> list: - ''' - Fetch the list of zone files available for download. + def fetch_zone_links(self) -> list: + ''' + Fetch the list of zone files available for download. - :param token: ICANN access token - ''' + :param token: ICANN access token + ''' - headers = {'Authorization': f'Bearer {token}'} - request = urllib.request.Request('https://czds-api.icann.org/czds/downloads/links', headers=headers) + request = urllib.request.Request('https://czds-api.icann.org/czds/downloads/links', headers=self.headers) - with urllib.request.urlopen(request) as response: - if response.status == 200: - return json.loads(response.read().decode('utf-8')) - else: - raise Exception(f'Failed to fetch zone links: {response.status} {response.reason}') + with urllib.request.urlopen(request) as response: + if response.status == 200: + return json.loads(response.read().decode('utf-8')) + else: + raise Exception(f'Failed to fetch zone links: {response.status} {response.reason}') -def download_report(token: str, output_directory: str, username: str): - ''' - Downloads the zone report stats from the API and scrubs the report for privacy. + def download_report(self, output_directory): + ''' + Downloads the zone report stats from the API and scrubs the report for privacy. - :param token: ICANN access token - :param output_directory: Directory to save the scrubbed report - :param username: Username to be redacted - ''' + :param token: ICANN access token + :param output_directory: Directory to save the scrubbed report + :param username: Username to be redacted + ''' - filepath = os.path.join(output_directory, '.stats.csv') - headers = {'Authorization': f'Bearer {token}'} - request = urllib.request.Request('https://czds-api.icann.org/czds/requests/report', headers=headers) + filepath = os.path.join(output_directory, '.stats.csv') + request = urllib.request.Request('https://czds-api.icann.org/czds/requests/report', headers=self.headers) + + with urllib.request.urlopen(request) as response: + if not (response.status == 200): + raise Exception(f'Failed to download the zone stats report: {response.status} {response.reason}') - with urllib.request.urlopen(request) as response: - if response.status == 200: report_data = response.read().decode('utf-8').replace(username, 'nobody@no.name') with open(filepath, 'w') as file: file.write(report_data) - else: - raise Exception(f'Failed to download the zone stats report: {response.status} {response.reason}') + def download_zone(self, url: str, output_directory: str): + ''' + Download a single zone file using urllib.request. -def download_zone(url: str, token: str, output_directory: str): - ''' - Download a single zone file using urllib.request. + :param url: URL to download + :param output_directory: Directory to save the zone file + ''' - :param url: URL to download - :param token: ICANN access token - :param output_directory: Directory to save the zone file - ''' + request = urllib.request.Request(url, headers=self.headers) - headers = {'Authorization': f'Bearer {token}'} - request = urllib.request.Request(url, headers=headers) + with urllib.request.urlopen(request) as response: + if response.status != 200: + raise Exception(f'Failed to download {url}: {response.status} {response.reason}') - with urllib.request.urlopen(request) as response: - if response.status == 200: - content_disposition = response.getheader('Content-Disposition') - if content_disposition: - filename = content_disposition.split('filename=')[-1].strip('"') - else: - raise ValueError(f'Failed to get filename from Content-Disposition header: {content_disposition}') + if not (content_disposition := response.getheader('Content-Disposition')): + raise ValueError('Missing Content-Disposition header') + filename = content_disposition.split('filename=')[-1].strip('"') filepath = os.path.join(output_directory, filename) with open(filepath, 'wb') as file: @@ -103,8 +108,6 @@ def download_zone(url: str, token: str, output_directory: str): file.write(chunk) return filepath - else: - raise Exception(f'Failed to download {url}: {response.status} {response.reason}') def main(username: str, password: str, concurrency: int): @@ -119,31 +122,32 @@ def main(username: str, password: str, concurrency: int): now = time.strftime('%Y-%m-%d') logging.info(f'Authenticating with ICANN API...') - try: - token = authenticate(username, password) - except Exception as e: - raise Exception(f'Failed to authenticate with ICANN API: {e}') - #logging.info(f'Authenticated with token: {token}') - # The above line is commented out to avoid printing the token to the logs, you can uncomment it for debugging purposes + + CZDS_client = CZDS(username, password) + logging.debug('Created CZDS client') + output_directory = os.path.join(os.getcwd(), 'zones', now) os.makedirs(output_directory, exist_ok=True) - logging.info('Fetching zone stats report...') + logging.info('Fetching zone stats report...') + try: - download_report(token, output_directory, username) + CZDS_client.download_report(output_directory) except Exception as e: raise Exception(f'Failed to download zone stats report: {e}') logging.info('Fetching zone links...') + try: - zone_links = fetch_zone_links(token) + zone_links = CZDS_client.fetch_zone_links() except Exception as e: raise Exception(f'Failed to fetch zone links: {e}') - logging.info(f'Fetched {len(zone_links)} zone links') + + logging.info(f'Fetched {len(zone_links):,} zone links') with concurrent.futures.ThreadPoolExecutor(max_workers=concurrency) as executor: - future_to_url = {executor.submit(download_zone, url, token, output_directory): url for url in zone_links} + future_to_url = {executor.submit(CZDS_client.download_zone, url, output_directory): url for url in sorted(zone_links)} for future in concurrent.futures.as_completed(future_to_url): url = future_to_url[future] try: @@ -155,13 +159,23 @@ def main(username: str, password: str, concurrency: int): if __name__ == '__main__': + # Create argument parser parser = argparse.ArgumentParser(description='ICANN API for the Centralized Zones Data Service') - parser.add_argument('-u', '--username', help='ICANN Username') - parser.add_argument('-p', '--password', help='ICANN Password') + + # Add arguments + parser.add_argument('-u', '--username', default=os.getenv('CZDS_USER'), help='ICANN Username') + parser.add_argument('-p', '--password', default=os.getenv('CZDS_PASS'), help='ICANN Password') parser.add_argument('-c', '--concurrency', type=int, default=3, help='Number of concurrent downloads') + + # Parse arguments args = parser.parse_args() - username = args.username or os.getenv('CZDS_USER') or input('ICANN Username: ') - password = args.password or os.getenv('CZDS_PASS') or getpass.getpass('ICANN Password: ') + # Setting up logging + logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') + # Get username and password + username = args.username or input('ICANN Username: ') + password = args.password or getpass.getpass('ICANN Password: ') + + # Execute main function main(username, password, args.concurrency) diff --git a/extras/service b/extras/service deleted file mode 100755 index 975bdf6..0000000 --- a/extras/service +++ /dev/null @@ -1,17 +0,0 @@ -#!/bin/sh -# systemd user service timer setup for czds - developed by acidvegas (https://git.acid.vegas/czds) -# dont forget to export your CZDS_USER and CZDS_PASS before running - -CZDS='/path/to/czds' - -systemd_service() { - mkdir -p $HOME/.config/systemd/user - printf "[Unit]\nDescription=ICANN Centralized Zone Data Service (CZDS) Updater\n\n[Service]\nType=oneshot\nExecStart=$CZDS" > $HOME/.config/systemd/user/czds.service - printf "[Unit]\nDescription=Timer for ICANN Centralized Zone Data Service (CZDS) Updater\n\n[Timer]\nOnCalendar=monthly\nPersistent=true\n\n[Install]\nWantedBy=timers.target" > $HOME/.config/systemd/user/czds.timer - systemctl --user daemon-reload - systemctl --user enable czds.timer && systemctl --user start czds.timer -} - -cronjob() { - (crontab -l 2>/dev/null; echo "0 3 1 * * $CZDS") | crontab - -} \ No newline at end of file