From c5bc87326be563bd712dfd7d580fe40903f4469e Mon Sep 17 00:00:00 2001 From: acidvegas Date: Fri, 21 Feb 2025 00:15:28 -0500 Subject: [PATCH] Fixed report downloading --- czds.py | 44 ++++++++++++++++++++++++++------------------ 1 file changed, 26 insertions(+), 18 deletions(-) diff --git a/czds.py b/czds.py index fc03618..a9cab67 100644 --- a/czds.py +++ b/czds.py @@ -16,10 +16,11 @@ class CZDS: '''Class for the ICANN Centralized Zones Data Service''' def __init__(self, username: str, password: str): - self.headers = {'Authorization': f'Bearer {self.authenticate(username, password)}'} + self.username = username + self.headers = {'Authorization': f'Bearer {self.authenticate(username, password)}'} - def authenticate(self, username: str, password: str) -> dict: + def authenticate(self, username: str, password: str) -> str: ''' Authenticate with the ICANN API and return the access token. @@ -50,34 +51,37 @@ class CZDS: :param token: ICANN access token ''' + # Create the request request = urllib.request.Request('https://czds-api.icann.org/czds/downloads/links', headers=self.headers) + # Make the request with urllib.request.urlopen(request) as response: - if response.status == 200: - return json.loads(response.read().decode('utf-8')) - else: + if response.status != 200: raise Exception(f'Failed to fetch zone links: {response.status} {response.reason}') + + return json.loads(response.read().decode('utf-8')) + - - def download_report(self, output_directory): + def download_report(self, filepath: str): ''' Downloads the zone report stats from the API and scrubs the report for privacy. - :param token: ICANN access token - :param output_directory: Directory to save the scrubbed report - :param username: Username to be redacted + :param filepath: Filepath to save the scrubbed report ''' - filepath = os.path.join(output_directory, '.stats.csv') - request = urllib.request.Request('https://czds-api.icann.org/czds/requests/report', headers=self.headers) - + # Create the request + request = urllib.request.Request('https://czds-api.icann.org/czds/requests/report', headers=self.headers) + + # Make the request with urllib.request.urlopen(request) as response: - if not (response.status == 200): + if response.status != 200: raise Exception(f'Failed to download the zone stats report: {response.status} {response.reason}') + + content = response.read().decode('utf-8') - report_data = response.read().decode('utf-8').replace(username, 'nobody@no.name') - with open(filepath, 'w') as file: - file.write(report_data) + # Write the content to the file + with open(filepath, 'w') as file: + file.write(content.replace(self.username, 'nobody@no.name')) # Wipe the email address from the report for privacy def download_zone(self, url: str, output_directory: str): @@ -88,8 +92,10 @@ class CZDS: :param output_directory: Directory to save the zone file ''' + # Create the request request = urllib.request.Request(url, headers=self.headers) + # Make the request with urllib.request.urlopen(request) as response: if response.status != 200: raise Exception(f'Failed to download {url}: {response.status} {response.reason}') @@ -97,9 +103,11 @@ class CZDS: if not (content_disposition := response.getheader('Content-Disposition')): raise ValueError('Missing Content-Disposition header') + # Extract the filename from the Content-Disposition header filename = content_disposition.split('filename=')[-1].strip('"') filepath = os.path.join(output_directory, filename) + # Write the content to the file with open(filepath, 'wb') as file: while True: chunk = response.read(1024) @@ -133,7 +141,7 @@ def main(username: str, password: str, concurrency: int): logging.info('Fetching zone stats report...') try: - CZDS_client.download_report(output_directory) + CZDS_client.download_report(os.path.join(output_directory, '.report.csv')) except Exception as e: raise Exception(f'Failed to download zone stats report: {e}')