# ============================================================================
# Reconstructed post-image (v1.3.1) of a newline-mangled git patch.
# Files covered here: czds/__init__.py, czds/__main__.py, and czds/client.py
# through CZDS.get_report. Elided hunk regions are reconstructed and marked
# with NOTE(review) comments.
# ============================================================================

# ---- czds/__init__.py (post-image) ----

from .client import CZDS

__version__ = '1.3.1'
__author__  = 'acidvegas'
__email__   = 'acid.vegas@acid.vegas'
__github__  = 'https://github.com/acidvegas/czds'


# ---- czds/__main__.py (post-image) ----

import argparse  # NOTE(review): not visible in the patch hunks, but required by the parser calls below
import asyncio
import getpass
import logging
import os


async def main():
    '''Entry point for the command line interface.'''

    # NOTE(review): parser construction sits above the first visible hunk; reconstructed minimally
    parser = argparse.ArgumentParser(description='ICANN API for the Centralized Zones Data Service')

    # Authentication
    parser.add_argument('-u', '--username', default=os.getenv('CZDS_USER'), help='ICANN Username')
    parser.add_argument('-p', '--password', default=os.getenv('CZDS_PASS'), help='ICANN Password')
    parser.add_argument('-o', '--output', default=os.getcwd(), help='Output directory')

    # Zone download options
    zone_group = parser.add_argument_group('Zone download options')
    zone_group.add_argument('-z', '--zones', action='store_true', help='Download zone files')
    zone_group.add_argument('-c', '--concurrency', type=int, default=3, help='Number of concurrent downloads')

    # Report options
    # NOTE(review): the individual report options sit in an elided hunk region;
    # main() demonstrably reads args.report and args.format, so those are reconstructed
    report_group = parser.add_argument_group('Report options')
    report_group.add_argument('-r', '--report', action='store_true', help='Download the zone stats report')
    report_group.add_argument('-f', '--format', default='csv', help='Report output format (csv/json)')

    # Parse arguments
    args = parser.parse_args()

    # Configure logging
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    # Get username and password, prompting interactively when not supplied
    username = args.username or input('ICANN Username: ')  # NOTE(review): username prompt line is elided in the patch; reconstructed to mirror the password line
    password = args.password or getpass.getpass('ICANN Password: ')

    # Create output directory
    output_directory = os.path.join(args.output, 'zones')
    os.makedirs(output_directory, exist_ok=True)

    logging.info('Authenticating with ICANN API...')

    # Create the CZDS client
    async with CZDS(username, password) as client:
        # Download zone stats report if requested
        if args.report:
            logging.info('Fetching zone stats report...')
            try:
                # Create the report path inside the output directory
                output = os.path.join(output_directory, '.report.csv')

                # Download the report.
                # BUG FIX: the patched get_report() no longer accepts a `scrub`
                # parameter (scrubbing is now always on), but this call still
                # passed scrub=args.scrub, which would raise TypeError at runtime.
                await client.get_report(output, format=args.format)

                logging.info(f'Zone stats report saved to {output}')
                return
            except Exception as e:
                raise Exception(f'Failed to download zone stats report: {e}')

        # Download the zone files if requested
        if args.zones:
            logging.info('Downloading zone files...')
            try:
                # Download the zone files
                await client.download_zones(output_directory, args.concurrency)
            except Exception as e:
                raise Exception(f'Failed to download zone files: {e}')


def cli_entry():
    '''Synchronous entry point for console script.'''

    return asyncio.run(main())


# ---- czds/client.py (post-image, through CZDS.get_report) ----

#!/usr/bin/env python3
# ICANN API for the Centralized Zones Data Service - developed by acidvegas (https://git.acid.vegas/czds)
# czds/client.py

import asyncio
import json
import logging
import os

try:
    import aiohttp
except ImportError:
    raise ImportError('missing aiohttp library (pip install aiohttp)')

try:
    import aiofiles
except ImportError:
    raise ImportError('missing aiofiles library (pip install aiofiles)')

try:
    from tqdm import tqdm
except ImportError:
    raise ImportError('missing tqdm library (pip install tqdm)')

from .utils import gzip_decompress, humanize_bytes

# Configure logging
logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO)


class CZDS:
    '''Async client for the ICANN Centralized Zone Data Service API.'''
    # NOTE(review): the class docstring sits in an elided hunk region — confirm wording against the repo

    def __init__(self, username: str, password: str):
        '''
        Initialize the CZDS client

        :param username: ICANN Username
        :param password: ICANN Password
        '''

        # Set the username and password
        self.username = username
        self.password = password

        # Set the session with longer timeouts (total unlimited for very large zone files)
        self.session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=None, connect=60, sock_connect=60, sock_read=60))

        # Placeholder for the headers after authentication
        self.headers = None

        logging.info('Initialized CZDS client')


    async def __aenter__(self):
        '''Async context manager entry'''

        # Authenticate with the ICANN API
        await self.authenticate()

        return self


    async def __aexit__(self, exc_type, exc_val, exc_tb):
        '''Async context manager exit'''

        # Close the client session
        await self.close()


    async def close(self):
        '''Close the client session'''

        # Close the client session if it exists
        if self.session:
            await self.session.close()
            logging.debug('Closed aiohttp session')


    async def authenticate(self) -> str:
        '''Authenticate with the ICANN API and return the access token'''

        # Set the data to be sent to the API
        data = {'username': self.username, 'password': self.password}

        logging.info('Authenticating with ICANN API...')

        # Send the request to the API
        async with self.session.post('https://account-api.icann.org/api/authenticate', json=data) as response:
            if response.status != 200:
                raise Exception(f'Authentication failed: {response.status} {await response.text()}')

            # Get the result from the API
            result = await response.json()

            logging.info('Successfully authenticated with ICANN API')

            # Set the headers for the API requests
            self.headers = {'Authorization': f'Bearer {result["accessToken"]}'}

            return result['accessToken']


    async def fetch_zone_links(self) -> list:
        '''Fetch the list of zone files available for download'''

        logging.info('Fetching zone file links...')

        # Send the request to the API
        async with self.session.get('https://czds-api.icann.org/czds/downloads/links', headers=self.headers) as response:
            if response.status != 200:
                raise Exception(f'Failed to fetch zone links: {response.status} {await response.text()}')

            # Get the result from the API
            links = await response.json()

            logging.info(f'Successfully fetched {len(links):,} zone links')

            return links


    async def get_report(self, filepath: str = None, format: str = 'csv') -> str:
        '''
        Downloads the zone report stats from the API and scrubs the report for privacy

        :param filepath: Optional filepath to save the report to
        :param format: Output format, 'csv' (default) or 'json'
        '''

        logging.info('Downloading zone stats report')

        # Send the request to the API
        async with self.session.get('https://czds-api.icann.org/czds/requests/report', headers=self.headers) as response:
            # Check if the request was successful
            if response.status != 200:
                raise Exception(f'Failed to download the zone stats report: {response.status} {await response.text()}')

            # Get the content of the report
            content = await response.text()

            # Scrub the username from the report (always on as of 1.3.1)
            content = content.replace(self.username, 'nobody@no.name')
            logging.debug('Scrubbed username from report')

            # Convert the report to JSON format if requested (default is CSV)
            if format.lower() == 'json':
                # BUG FIX: the patch replaced the CSV->records conversion with
                # json.dumps(content), which merely wraps the raw CSV text in a
                # JSON string literal. Restore the row parse so the JSON output
                # is a list of header-keyed objects, as in the pre-image.
                rows = [row.split(',') for row in content.strip().split('\n')]
                header = rows[0]
                content = json.dumps([dict(zip(header, row)) for row in rows[1:]], indent=4)
                logging.debug('Converted report to JSON format')

            # Save the report to a file if a filepath is provided
            if filepath:
                async with aiofiles.open(filepath, 'w') as file:
                    await file.write(content)
                    logging.info(f'Saved report to {filepath}')

            return content
{filepath}') - - except Exception as e: - error_msg = f'Failed to decompress {filepath}: {str(e)}' - logging.error(error_msg) - # Clean up any partial files - if os.path.exists(output_path): - os.remove(output_path) - raise Exception(error_msg) - - - async def download_zone(self, url: str, output_directory: str, decompress: bool = False, cleanup: bool = True, semaphore: asyncio.Semaphore = None): + async def download_zone(self, url: str, output_directory: str, semaphore: asyncio.Semaphore): ''' Download a single zone file :param url: URL to download :param output_directory: Directory to save the zone file - :param decompress: Whether to decompress the gzip file after download - :param cleanup: Whether to remove the original gzip file after decompression :param semaphore: Optional semaphore for controlling concurrency ''' async def _download(): - tld = url.split('/')[-1].split('.')[0] # Extract TLD from URL - max_retries = 3 - retry_delay = 5 # seconds + tld_name = url.split('/')[-1].split('.')[0] # Extract TLD from URL + max_retries = 10 # Maximum number of retries for failed downloads + retry_delay = 5 # Delay between retries in seconds + timeout = aiohttp.ClientTimeout(total=120) # Timeout for the download + # Start the attempt loop for attempt in range(max_retries): try: - logging.info(f'Starting download of {tld} zone file{" (attempt " + str(attempt + 1) + ")" if attempt > 0 else ""}') - - async with self.session.get(url, headers=self.headers, timeout=aiohttp.ClientTimeout(total=3600)) as response: + logging.info(f'Starting download of {tld_name} zone file{" (attempt " + str(attempt + 1) + ")" if attempt > 0 else ""}') + + # Send the request to the API + async with self.session.get(url, headers=self.headers, timeout=timeout) as response: + # Check if the request was successful if response.status != 200: - error_msg = f'Failed to download {tld}: {response.status} {await response.text()}' - logging.error(error_msg) + logging.error(f'Failed to download {tld_name}: 
{response.status} {await response.text()}') + + # Retry the download if there are more attempts if attempt + 1 < max_retries: - logging.info(f'Retrying {tld} in {retry_delay} seconds...') + logging.info(f'Retrying {tld_name} in {retry_delay:,} seconds...') await asyncio.sleep(retry_delay) continue - raise Exception(error_msg) + + raise Exception(f'Failed to download {tld_name}: {response.status} {await response.text()}') # Get expected file size from headers - expected_size = int(response.headers.get('Content-Length', 0)) - if not expected_size: - logging.warning(f'No Content-Length header for {tld}') + if not (expected_size := int(response.headers.get('Content-Length', 0))): + raise ValueError(f'Missing Content-Length header for {tld_name}') + # Check if the Content-Disposition header is present if not (content_disposition := response.headers.get('Content-Disposition')): - error_msg = f'Missing Content-Disposition header for {tld}' - logging.error(error_msg) - raise ValueError(error_msg) + raise ValueError(f'Missing Content-Disposition header for {tld_name}') + # Extract the filename from the Content-Disposition header filename = content_disposition.split('filename=')[-1].strip('"') + + # Create the filepath filepath = os.path.join(output_directory, filename) - async with aiofiles.open(filepath, 'wb') as file: - total_size = 0 - last_progress = 0 - try: - async for chunk in response.content.iter_chunked(8192): - await file.write(chunk) - total_size += len(chunk) - if expected_size: - progress = int((total_size / expected_size) * 100) - if progress >= last_progress + 5: - logging.info(f'Downloading {tld}: {progress}% ({total_size:,}/{expected_size:,} bytes)') - last_progress = progress - except (asyncio.TimeoutError, aiohttp.ClientError) as e: - logging.error(f'Connection error while downloading {tld}: {str(e)}') - if attempt + 1 < max_retries: - logging.info(f'Retrying {tld} in {retry_delay} seconds...') - await asyncio.sleep(retry_delay) - continue - raise + # 
Create a progress bar to track the download + with tqdm(total=expected_size, unit='B', unit_scale=True, desc=f'Downloading {tld_name}', leave=False) as pbar: + # Open the file for writing + async with aiofiles.open(filepath, 'wb') as file: + # Initialize the total size for tracking + total_size = 0 + + # Write the chunk to the file + try: + async for chunk in response.content.iter_chunked(8192): + await file.write(chunk) + total_size += len(chunk) + pbar.update(len(chunk)) + except (asyncio.TimeoutError, aiohttp.ClientError) as e: + logging.error(f'Connection error while downloading {tld_name}: {str(e)}') + if attempt + 1 < max_retries: + logging.info(f'Retrying {tld_name} in {retry_delay} seconds...') + await asyncio.sleep(retry_delay) + continue + raise # Verify file size if expected_size and total_size != expected_size: - error_msg = f'Incomplete download for {tld}: Got {total_size} bytes, expected {expected_size} bytes' + error_msg = f'Incomplete download for {tld_name}: Got {humanize_bytes(total_size)}, expected {humanize_bytes(expected_size)}' logging.error(error_msg) os.remove(filepath) if attempt + 1 < max_retries: - logging.info(f'Retrying {tld} in {retry_delay} seconds...') + logging.info(f'Retrying {tld_name} in {retry_delay} seconds...') await asyncio.sleep(retry_delay) continue raise Exception(error_msg) - size_mb = total_size / (1024 * 1024) - logging.info(f'Successfully downloaded {tld} zone file ({size_mb:.2f} MB)') + logging.info(f'Successfully downloaded {tld_name} zone file ({humanize_bytes(total_size)})') - if decompress: - try: - with gzip.open(filepath, 'rb') as test_gzip: - test_gzip.read(1) - - await self.gzip_decompress(filepath, cleanup) - filepath = filepath[:-3] - logging.info(f'Decompressed {tld} zone file') - except (gzip.BadGzipFile, OSError) as e: - error_msg = f'Failed to decompress {tld}: {str(e)}' - logging.error(error_msg) - os.remove(filepath) - raise Exception(error_msg) + await gzip_decompress(filepath) + filepath = 
filepath[:-3] + logging.info(f'Decompressed {tld_name} zone file') return filepath except (aiohttp.ClientError, asyncio.TimeoutError) as e: if attempt + 1 >= max_retries: - logging.error(f'Failed to download {tld} after {max_retries} attempts: {str(e)}') + logging.error(f'Failed to download {tld_name} after {max_retries} attempts: {str(e)}') if 'filepath' in locals() and os.path.exists(filepath): os.remove(filepath) raise - logging.warning(f'Download attempt {attempt + 1} failed for {tld}: {str(e)}') + logging.warning(f'Download attempt {attempt + 1} failed for {tld_name}: {str(e)}') await asyncio.sleep(retry_delay) except Exception as e: - logging.error(f'Error downloading {tld}: {str(e)}') + logging.error(f'Error downloading {tld_name}: {str(e)}') if 'filepath' in locals() and os.path.exists(filepath): os.remove(filepath) raise - if semaphore: - async with semaphore: - return await _download() - else: + async with semaphore: return await _download() - async def download_zones(self, output_directory: str, concurrency: int, decompress: bool = False, cleanup: bool = True): + async def download_zones(self, output_directory: str, concurrency: int): ''' Download multiple zone files concurrently :param output_directory: Directory to save the zone files :param concurrency: Number of concurrent downloads - :param decompress: Whether to decompress the gzip files after download - :param cleanup: Whether to remove the original gzip files after decompression ''' # Create the output directory if it doesn't exist os.makedirs(output_directory, exist_ok=True) - logging.info(f'Starting concurrent download of zones with concurrency={concurrency}') - # Get the zone links zone_links = await self.fetch_zone_links() + zone_links.sort() # Sort the zone alphabetically for better tracking # Create a semaphore to limit the number of concurrent downloads semaphore = asyncio.Semaphore(concurrency) + logging.info(f'Downloading {len(zone_links):,} zone files...') + # Create a list of tasks to 
download the zone files - tasks = [self.download_zone(url, output_directory, decompress, cleanup, semaphore) for url in zone_links] + tasks = [self.download_zone(url, output_directory, semaphore) for url in zone_links] # Run the tasks concurrently await asyncio.gather(*tasks) - logging.info('Completed downloading all zone files') + logging.info(f'Completed downloading {len(zone_links):,} zone files') \ No newline at end of file diff --git a/czds/utils.py b/czds/utils.py new file mode 100644 index 0000000..c222671 --- /dev/null +++ b/czds/utils.py @@ -0,0 +1,88 @@ +#!/usr/bin/env python3 +# ICANN API for the Centralized Zones Data Service - developed by acidvegas (https://git.acid.vegas/czds) +# czds/utils.py + +import gzip +import logging +import os + +try: + import aiofiles +except ImportError: + raise ImportError('missing aiofiles library (pip install aiofiles)') + +try: + from tqdm import tqdm +except ImportError: + raise ImportError('missing tqdm library (pip install tqdm)') + + +async def gzip_decompress(filepath: str, cleanup: bool = True): + ''' + Decompress a gzip file in place + + :param filepath: Path to the gzip file + :param cleanup: Whether to remove the original gzip file after decompressions + ''' + + # Get the original size of the file + original_size = os.path.getsize(filepath) + + logging.debug(f'Decompressing {filepath} ({humanize_bytes(original_size)})...') + + # Remove the .gz extension + output_path = filepath[:-3] + + # Set the chunk size to 25MB + chunk_size = 25 * 1024 * 1024 + + # Create progress bar for decompression + with tqdm(total=original_size, unit='B', unit_scale=True, desc=f'Decompressing {os.path.basename(filepath)}', leave=False) as pbar: + # Decompress the file + with gzip.open(filepath, 'rb') as gz: + async with aiofiles.open(output_path, 'wb') as f_out: + while True: + # Read the next chunk + chunk = gz.read(chunk_size) + + # If the chunk is empty, break + if not chunk: + break + + # Write the chunk to the output file + await 
f_out.write(chunk) + + # Update the progress bar + pbar.update(len(chunk)) + + # Get the decompressed size of the file + decompressed_size = os.path.getsize(output_path) + + logging.debug(f'Decompressed {filepath} ({humanize_bytes(decompressed_size)})') + + # If the cleanup flag is set, remove the original gzip file + if cleanup: + os.remove(filepath) + logging.debug(f'Removed original gzip file: {filepath}') + + +def humanize_bytes(bytes: int) -> str: + ''' + Humanize a number of bytes + + :param bytes: The number of bytes to humanize + ''' + + # List of units + units = ('B','KB','MB','GB','TB','PB','EB','ZB','YB') + + # Iterate over the units + for unit in units: + # If the bytes are less than 1024, return the bytes with the unit + if bytes < 1024: + return f'{bytes:.2f} {unit}' if unit != 'B' else f'{bytes} {unit}' + + # Divide the bytes by 1024 + bytes /= 1024 + + return f'{bytes:.2f} {units[-1]}' \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index c021ffd..4b848e7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ aiohttp -aiofiles \ No newline at end of file +aiofiles +tqdm \ No newline at end of file diff --git a/setup.py b/setup.py index e8f8cbb..b64210a 100644 --- a/setup.py +++ b/setup.py @@ -11,7 +11,7 @@ with open('README.md', 'r', encoding='utf-8') as fh: setup( name='czds-api', - version='1.3.0', + version='1.3.1', author='acidvegas', author_email='acid.vegas@acid.vegas', description='ICANN API for the Centralized Zones Data Service',