Compare commits

...

11 Commits
v1.2.8 ... main

7 changed files with 266 additions and 159 deletions

README.md

@@ -39,8 +39,6 @@ czds [-h] [-u USERNAME] [-p PASSWORD] [-z] [-c CONCURRENCY] [-d] [-k] [-r] [-s]
 ###### Zone Options
 | `-z`, `--zones` | Download zone files | |
 | `-c`, `--concurrency` | Number of concurrent downloads | `3` |
-| `-d`, `--decompress` | Decompress zone files after download | |
-| `-k`, `--keep` | Keep original gzip files after decompression | |
 ###### Report Options
 | `-r`, `--report` | Download the zone stats report | |

czds/__init__.py

@@ -4,8 +4,7 @@
 from .client import CZDS

-__version__ = '1.2.8'
+__version__ = '1.3.8'
 __author__ = 'acidvegas'
 __email__ = 'acid.vegas@acid.vegas'
 __github__ = 'https://github.com/acidvegas/czds'

czds/__main__.py

@@ -7,7 +7,6 @@ import asyncio
 import getpass
 import logging
 import os
-import time

 from .client import CZDS
@@ -21,14 +20,12 @@ async def main():
     # Authentication
     parser.add_argument('-u', '--username', default=os.getenv('CZDS_USER'), help='ICANN Username')
     parser.add_argument('-p', '--password', default=os.getenv('CZDS_PASS'), help='ICANN Password')
     parser.add_argument('-o', '--output', default=os.getcwd(), help='Output directory')

     # Zone download options
     zone_group = parser.add_argument_group('Zone download options')
     zone_group.add_argument('-z', '--zones', action='store_true', help='Download zone files')
     zone_group.add_argument('-c', '--concurrency', type=int, default=3, help='Number of concurrent downloads')
-    zone_group.add_argument('-d', '--decompress', action='store_true', help='Decompress zone files after download')
-    zone_group.add_argument('-k', '--keep', action='store_true', help='Keep the original gzip files after decompression')

     # Report options
     report_group = parser.add_argument_group('Report options')
@@ -39,6 +36,7 @@ async def main():
     # Parse arguments
     args = parser.parse_args()

+    # Configure logging
     logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

     # Get username and password
@@ -46,20 +44,25 @@ async def main():
     password = args.password or getpass.getpass('ICANN Password: ')

     # Create output directory
-    now = time.strftime('%Y-%m-%d')
-    output_directory = os.path.join(args.output, 'zones', now)
+    output_directory = os.path.join(args.output, 'zones')
     os.makedirs(output_directory, exist_ok=True)

     logging.info('Authenticating with ICANN API...')

+    # Create the CZDS client
     async with CZDS(username, password) as client:
         # Download zone stats report if requested
         if args.report:
             logging.info('Fetching zone stats report...')
             try:
+                # Create the report directory
                 output = os.path.join(output_directory, '.report.csv')
+                # Download the report
                 await client.get_report(output, scrub=args.scrub, format=args.format)
                 logging.info(f'Zone stats report saved to {output}')
                 return
             except Exception as e:
                 raise Exception(f'Failed to download zone stats report: {e}')
@@ -68,13 +71,15 @@ async def main():
         if args.zones:
             logging.info('Downloading zone files...')
             try:
-                await client.download_zones(output_directory, args.concurrency, decompress=args.decompress, cleanup=not args.keep)
+                # Download the zone files
+                await client.download_zones(output_directory, args.concurrency)
             except Exception as e:
                 raise Exception(f'Failed to download zone files: {e}')

 def cli_entry():
     '''Synchronous entry point for console script'''
     return asyncio.run(main())
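For reference, the CLI flow above maps onto a short programmatic call. A minimal sketch, assuming the package layout shown in this diff (CZDS re-exported from czds/__init__.py, credentials in the same CZDS_USER/CZDS_PASS environment variables the CLI reads):

import asyncio
import os

from czds import CZDS

async def grab_zones():
    # Same environment variables the argparse defaults fall back to
    username = os.environ['CZDS_USER']
    password = os.environ['CZDS_PASS']

    # Zone files now land in a fixed ./zones directory (the dated subfolder was removed)
    output_directory = os.path.join(os.getcwd(), 'zones')
    os.makedirs(output_directory, exist_ok=True)

    # Authenticates on entry, closes the aiohttp session on exit
    async with CZDS(username, password) as client:
        await client.download_zones(output_directory, concurrency=3)

asyncio.run(grab_zones())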

czds/client.py

@@ -3,9 +3,10 @@
 # czds/client.py

 import asyncio
-import gzip
+import json
 import logging
 import os
+import csv
 import io

 try:
@@ -18,6 +19,13 @@ try:
 except ImportError:
     raise ImportError('missing aiofiles library (pip install aiofiles)')

+try:
+    from tqdm import tqdm
+except ImportError:
+    raise ImportError('missing tqdm library (pip install tqdm)')
+
+from .utils import gzip_decompress, humanize_bytes
+
 # Configure logging
 logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO)
@@ -34,25 +42,52 @@ class CZDS:
         :param password: ICANN Password
         '''

+        # Set the username and password
         self.username = username
         self.password = password

-        self.session = aiohttp.ClientSession()
-        self.headers = None
+        # Configure TCP keepalive
+        connector = aiohttp.TCPConnector(
+            keepalive_timeout=300,       # Keep connections alive for 5 minutes
+            force_close=False,           # Don't force close connections
+            enable_cleanup_closed=True,  # Cleanup closed connections
+            ttl_dns_cache=300,           # Cache DNS results for 5 minutes
+        )
+
+        # Set the session with longer timeouts and keepalive
+        self.session = aiohttp.ClientSession(
+            connector=connector,
+            timeout=aiohttp.ClientTimeout(total=None, connect=60, sock_connect=60, sock_read=None),
+            headers={'Connection': 'keep-alive'},
+            raise_for_status=True
+        )
+
+        # Placeholder for the headers after authentication
+        self.headers = None

         logging.info('Initialized CZDS client')

     async def __aenter__(self):
         '''Async context manager entry'''

+        # Authenticate with the ICANN API
         await self.authenticate()

         return self

     async def __aexit__(self, exc_type, exc_val, exc_tb):
         '''Async context manager exit'''

+        # Close the client session
         await self.close()

     async def close(self):
         '''Close the client session'''

+        # Close the client session if it exists
        if self.session:
            await self.session.close()
            logging.debug('Closed aiohttp session')
@@ -61,231 +96,224 @@ class CZDS:
     async def authenticate(self) -> str:
         '''Authenticate with the ICANN API and return the access token'''

-        try:
-            data = {'username': self.username, 'password': self.password}
-            logging.info('Authenticating with ICANN API')
-            async with self.session.post('https://account-api.icann.org/api/authenticate', json=data) as response:
-                if response.status != 200:
-                    error_msg = f'Authentication failed: {response.status} {await response.text()}'
-                    logging.error(error_msg)
-                    raise Exception(error_msg)
-                result = await response.json()
-                logging.info('Successfully authenticated with ICANN API')
-                self.headers = {'Authorization': f'Bearer {result["accessToken"]}'}
-                return result['accessToken']
-        except Exception as e:
-            error_msg = f'Failed to authenticate with ICANN API: {e}'
-            logging.error(error_msg)
-            raise Exception(error_msg)
+        # Set the data to be sent to the API
+        data = {'username': self.username, 'password': self.password}
+
+        logging.info('Authenticating with ICANN API...')
+
+        # Send the request to the API
+        async with self.session.post('https://account-api.icann.org/api/authenticate', json=data) as response:
+            if response.status != 200:
+                raise Exception(f'Authentication failed: {response.status} {await response.text()}')
+
+            # Get the result from the API
+            result = await response.json()
+
+            logging.info('Successfully authenticated with ICANN API')
+
+            # Set the headers for the API requests
+            self.headers = {'Authorization': f'Bearer {result["accessToken"]}'}
+
+            return result['accessToken']

     async def fetch_zone_links(self) -> list:
         '''Fetch the list of zone files available for download'''

-        logging.info('Fetching zone links')
+        logging.info('Fetching zone file links...')

+        # Send the request to the API
         async with self.session.get('https://czds-api.icann.org/czds/downloads/links', headers=self.headers) as response:
             if response.status != 200:
-                error_msg = f'Failed to fetch zone links: {response.status} {await response.text()}'
-                logging.error(error_msg)
-                raise Exception(error_msg)
+                raise Exception(f'Failed to fetch zone links: {response.status} {await response.text()}')

+            # Get the result from the API
             links = await response.json()

             logging.info(f'Successfully fetched {len(links):,} zone links')

             return links

-    async def get_report(self, filepath: str = None, scrub: bool = True, format: str = 'csv') -> str | dict:
+    async def get_report(self, filepath: str = None, format: str = 'csv') -> str | dict:
         '''
         Downloads the zone report stats from the API and scrubs the report for privacy

         :param filepath: Filepath to save the scrubbed report
-        :param scrub: Whether to scrub the username from the report
         :param format: Output format ('csv' or 'json')
         '''

         logging.info('Downloading zone stats report')

+        # Send the request to the API
         async with self.session.get('https://czds-api.icann.org/czds/requests/report', headers=self.headers) as response:
             if response.status != 200:
-                error_msg = f'Failed to download the zone stats report: {response.status} {await response.text()}'
-                logging.error(error_msg)
-                raise Exception(error_msg)
+                raise Exception(f'Failed to download the zone stats report: {response.status} {await response.text()}')

+            # Get the content of the report
             content = await response.text()

-            if scrub:
-                content = content.replace(self.username, 'nobody@no.name')
-                logging.debug('Scrubbed username from report')
+            # Scrub the username from the report
+            content = content.replace(self.username, 'nobody@no.name')
+            logging.debug('Scrubbed username from report')

+            # Convert the report to JSON format if requested
             if format.lower() == 'json':
-                rows = [row.split(',') for row in content.strip().split('\n')]
-                header = rows[0]
-                content = [dict(zip(header, row)) for row in rows[1:]]
+                # Parse CSV content
+                csv_reader = csv.DictReader(io.StringIO(content))
+
+                # Convert to list of dicts with formatted keys
+                json_data = []
+                for row in csv_reader:
+                    formatted_row = {
+                        key.lower().replace(' ', '_'): value
+                        for key, value in row.items()
+                    }
+                    json_data.append(formatted_row)
+
+                content = json.dumps(json_data, indent=4)
                 logging.debug('Converted report to JSON format')

+            # Save the report to a file if a filepath is provided
             if filepath:
                 async with aiofiles.open(filepath, 'w') as file:
-                    if format.lower() == 'json':
-                        import json
-                        await file.write(json.dumps(content, indent=4))
-                    else:
-                        await file.write(content)
-                logging.info(f'Saved report to {filepath}')
+                    await file.write(content)
+                    logging.info(f'Saved report to {filepath}')

             return content

-    async def gzip_decompress(self, filepath: str, cleanup: bool = True):
-        '''
-        Decompress a gzip file in place
-
-        :param filepath: Path to the gzip file
-        :param cleanup: Whether to remove the original gzip file after decompressions
-        '''
-
-        logging.debug(f'Decompressing {filepath}')
-
-        output_path = filepath[:-3]  # Remove .gz extension
-
-        try:
-            async with aiofiles.open(filepath, 'rb') as f_in:
-                content = await f_in.read()
-
-                # Use BytesIO to handle the content as a file-like object
-                with io.BytesIO(content) as bytes_io:
-                    with gzip.GzipFile(fileobj=bytes_io, mode='rb') as gz:
-                        decompressed_content = gz.read()
-
-            async with aiofiles.open(output_path, 'wb') as f_out:
-                await f_out.write(decompressed_content)
-
-            if cleanup:
-                os.remove(filepath)
-                logging.debug(f'Removed original gzip file: {filepath}')
-
-        except Exception as e:
-            error_msg = f'Failed to decompress {filepath}: {str(e)}'
-            logging.error(error_msg)
-            # Clean up any partial files
-            if os.path.exists(output_path):
-                os.remove(output_path)
-            raise Exception(error_msg)
-
-    async def download_zone(self, url: str, output_directory: str, decompress: bool = False, cleanup: bool = True, semaphore: asyncio.Semaphore = None):
+    async def download_zone(self, url: str, output_directory: str, semaphore: asyncio.Semaphore):
         '''
         Download a single zone file

         :param url: URL to download
         :param output_directory: Directory to save the zone file
-        :param decompress: Whether to decompress the gzip file after download
-        :param cleanup: Whether to remove the original gzip file after decompression
         :param semaphore: Optional semaphore for controlling concurrency
         '''

         async def _download():
-            tld = url.split('/')[-1].split('.')[0]  # Extract TLD from URL
-            logging.info(f'Starting download of {tld} zone file')
-
-            try:
-                async with self.session.get(url, headers=self.headers) as response:
-                    if response.status != 200:
-                        error_msg = f'Failed to download {tld}: {response.status} {await response.text()}'
-                        logging.error(error_msg)
-                        raise Exception(error_msg)
-
-                    # Get expected file size from headers
-                    expected_size = int(response.headers.get('Content-Length', 0))
-                    if not expected_size:
-                        logging.warning(f'No Content-Length header for {tld}')
-
-                    if not (content_disposition := response.headers.get('Content-Disposition')):
-                        error_msg = f'Missing Content-Disposition header for {tld}'
-                        logging.error(error_msg)
-                        raise ValueError(error_msg)
-
-                    filename = content_disposition.split('filename=')[-1].strip('"')
-                    filepath = os.path.join(output_directory, filename)
-
-                    async with aiofiles.open(filepath, 'wb') as file:
-                        total_size = 0
-                        last_progress = 0
-                        async for chunk in response.content.iter_chunked(8192):
-                            await file.write(chunk)
-                            total_size += len(chunk)
-                            if expected_size:
-                                progress = int((total_size / expected_size) * 100)
-                                if progress >= last_progress + 5:  # Log every 5% increase
-                                    logging.info(f'Downloading {tld}: {progress}% ({total_size:,}/{expected_size:,} bytes)')
-                                    last_progress = progress
-
-                    # Verify file size
-                    if expected_size and total_size != expected_size:
-                        error_msg = f'Incomplete download for {tld}: Got {total_size} bytes, expected {expected_size} bytes'
-                        logging.error(error_msg)
-                        os.remove(filepath)  # Clean up incomplete file
-                        raise Exception(error_msg)
-
-                    size_mb = total_size / (1024 * 1024)
-                    logging.info(f'Successfully downloaded {tld} zone file ({size_mb:.2f} MB)')
-
-                    if decompress:
-                        try:
-                            # Verify gzip integrity before decompressing
-                            with gzip.open(filepath, 'rb') as test_gzip:
-                                test_gzip.read(1)  # Try reading first byte to verify gzip integrity
-
-                            await self.gzip_decompress(filepath, cleanup)
-                            filepath = filepath[:-3]  # Remove .gz extension
-                            logging.info(f'Decompressed {tld} zone file')
-                        except (gzip.BadGzipFile, OSError) as e:
-                            error_msg = f'Failed to decompress {tld}: {str(e)}'
-                            logging.error(error_msg)
-                            os.remove(filepath)  # Clean up corrupted file
-                            raise Exception(error_msg)
-
-                    return filepath
-
-            except Exception as e:
-                logging.error(f'Error downloading {tld}: {str(e)}')
-                # Clean up any partial downloads
-                if 'filepath' in locals() and os.path.exists(filepath):
-                    os.remove(filepath)
-                raise
+            tld_name = url.split('/')[-1].split('.')[0]  # Extract TLD from URL
+            max_retries = 20  # Maximum number of retries for failed downloads
+            retry_delay = 5   # Delay between retries in seconds
+
+            # Headers for better connection stability
+            download_headers = {
+                **self.headers,
+                'Connection': 'keep-alive',
+                'Keep-Alive': 'timeout=600',  # 10 minutes
+                'Accept-Encoding': 'gzip'
+            }
+
+            # Start the attempt loop
+            for attempt in range(max_retries):
+                try:
+                    logging.info(f'Starting download of {tld_name} zone file{" (attempt " + str(attempt + 1) + ")" if attempt > 0 else ""}')
+
+                    # Send the request to the API
+                    async with self.session.get(url, headers=download_headers) as response:
+                        # Check if the request was successful
+                        if response.status != 200:
+                            logging.error(f'Failed to download {tld_name}: {response.status} {await response.text()}')
+
+                            # Retry the download if there are more attempts
+                            if attempt + 1 < max_retries:
+                                logging.info(f'Retrying {tld_name} in {retry_delay:,} seconds...')
+                                await asyncio.sleep(retry_delay)
+                                continue
+
+                            raise Exception(f'Failed to download {tld_name}: {response.status} {await response.text()}')
+
+                        # Get expected file size from headers
+                        if not (expected_size := int(response.headers.get('Content-Length', 0))):
+                            raise ValueError(f'Missing Content-Length header for {tld_name}')
+
+                        # Check if the Content-Disposition header is present
+                        if not (content_disposition := response.headers.get('Content-Disposition')):
+                            raise ValueError(f'Missing Content-Disposition header for {tld_name}')
+
+                        # Extract the filename from the Content-Disposition header
+                        filename = content_disposition.split('filename=')[-1].strip('"')
+
+                        # Create the filepath
+                        filepath = os.path.join(output_directory, filename)
+
+                        # Create a progress bar to track the download
+                        with tqdm(total=expected_size, unit='B', unit_scale=True, desc=f'Downloading {tld_name}', leave=False) as pbar:
+                            # Open the file for writing
+                            async with aiofiles.open(filepath, 'wb') as file:
+                                # Initialize the total size for tracking
+                                total_size = 0
+
+                                # Write the chunk to the file
+                                try:
+                                    async for chunk in response.content.iter_chunked(8192):
+                                        await file.write(chunk)
+                                        total_size += len(chunk)
+                                        pbar.update(len(chunk))
+                                except Exception as e:
+                                    logging.error(f'Connection error while downloading {tld_name}: {str(e)}')
+                                    if attempt + 1 < max_retries:
+                                        logging.info(f'Retrying {tld_name} in {retry_delay} seconds...')
+                                        await asyncio.sleep(retry_delay)
+                                        continue
+                                    raise
+
+                        # Verify file size
+                        if expected_size and total_size != expected_size:
+                            error_msg = f'Incomplete download for {tld_name}: Got {humanize_bytes(total_size)}, expected {humanize_bytes(expected_size)}'
+                            logging.error(error_msg)
+                            os.remove(filepath)
+                            if attempt + 1 < max_retries:
+                                logging.info(f'Retrying {tld_name} in {retry_delay} seconds...')
+                                await asyncio.sleep(retry_delay)
+                                continue
+                            raise Exception(error_msg)
+
+                        logging.info(f'Successfully downloaded {tld_name} zone file ({humanize_bytes(total_size)})')
+
+                        await gzip_decompress(filepath)
+                        filepath = filepath[:-3]
+                        logging.info(f'Decompressed {tld_name} zone file')
+
+                        return filepath
+
+                except Exception as e:
+                    if attempt + 1 >= max_retries:
+                        logging.error(f'Failed to download {tld_name} after {max_retries} attempts: {str(e)}')
+                        if 'filepath' in locals() and os.path.exists(filepath):
+                            os.remove(filepath)
+                        raise
+
+                    logging.warning(f'Download attempt {attempt + 1} failed for {tld_name}: {str(e)}')
+                    await asyncio.sleep(retry_delay)

-        if semaphore:
-            async with semaphore:
-                return await _download()
-        else:
+        async with semaphore:
             return await _download()

-    async def download_zones(self, output_directory: str, concurrency: int, decompress: bool = False, cleanup: bool = True):
+    async def download_zones(self, output_directory: str, concurrency: int):
         '''
         Download multiple zone files concurrently

         :param output_directory: Directory to save the zone files
         :param concurrency: Number of concurrent downloads
-        :param decompress: Whether to decompress the gzip files after download
-        :param cleanup: Whether to remove the original gzip files after decompression
         '''

         # Create the output directory if it doesn't exist
         os.makedirs(output_directory, exist_ok=True)

-        logging.info(f'Starting concurrent download of zones with concurrency={concurrency}')
-
         # Get the zone links
         zone_links = await self.fetch_zone_links()
+        zone_links.sort()  # Sort the zones alphabetically for better tracking

         # Create a semaphore to limit the number of concurrent downloads
         semaphore = asyncio.Semaphore(concurrency)

+        logging.info(f'Downloading {len(zone_links):,} zone files...')
+
         # Create a list of tasks to download the zone files
-        tasks = [self.download_zone(url, output_directory, decompress, cleanup, semaphore) for url in zone_links]
+        tasks = [self.download_zone(url, output_directory, semaphore) for url in zone_links]

         # Run the tasks concurrently
         await asyncio.gather(*tasks)

-        logging.info('Completed downloading all zone files')
+        logging.info(f'Completed downloading {len(zone_links):,} zone files')
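The reworked get_report can also be exercised on its own. A sketch under the signature shown above; the credentials are placeholders, and note that after this change the JSON branch returns a serialized string (json.dumps), so the `str | dict` annotation is effectively always `str`:

import asyncio

from czds import CZDS

async def report_demo():
    # Placeholder credentials, not real ICANN account values
    async with CZDS('user@example.com', 'hunter2') as client:
        # CSV string; the username is now scrubbed unconditionally
        csv_report = await client.get_report()

        # JSON string with lower_snake_case keys, also written to disk
        json_report = await client.get_report('report.json', format='json')
        print(json_report[:200])

asyncio.run(report_demo())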

czds/utils.py Normal file

@@ -0,0 +1,76 @@
+#!/usr/bin/env python3
+# ICANN API for the Centralized Zones Data Service - developed by acidvegas (https://git.acid.vegas/czds)
+# czds/utils.py
+
+import asyncio
+import gzip
+import logging
+import os
+
+try:
+    import aiofiles
+except ImportError:
+    raise ImportError('missing aiofiles library (pip install aiofiles)')
+
+try:
+    from tqdm import tqdm
+except ImportError:
+    raise ImportError('missing tqdm library (pip install tqdm)')
+
+
+async def gzip_decompress(filepath: str, cleanup: bool = True):
+    '''
+    Decompress a gzip file in place
+
+    :param filepath: Path to the gzip file
+    :param cleanup: Whether to remove the original gzip file after decompression
+    '''
+
+    original_size = os.path.getsize(filepath)
+    output_path = filepath[:-3]
+
+    logging.debug(f'Decompressing {filepath} ({humanize_bytes(original_size)})...')
+
+    # Use a large chunk size (256MB) for maximum throughput
+    chunk_size = 256 * 1024 * 1024
+
+    # Run the actual decompression in a thread pool to prevent blocking
+    with tqdm(total=original_size, unit='B', unit_scale=True, desc=f'Decompressing {os.path.basename(filepath)}', leave=False) as pbar:
+        async with aiofiles.open(output_path, 'wb') as f_out:
+            # Run gzip decompression in thread pool since it's CPU-bound
+            loop = asyncio.get_event_loop()
+            with gzip.open(filepath, 'rb') as gz:
+                while True:
+                    chunk = await loop.run_in_executor(None, gz.read, chunk_size)
+                    if not chunk:
+                        break
+                    await f_out.write(chunk)
+                    pbar.update(len(chunk))
+
+    decompressed_size = os.path.getsize(output_path)
+    logging.debug(f'Decompressed {filepath} ({humanize_bytes(decompressed_size)})')
+
+    if cleanup:
+        os.remove(filepath)
+        logging.debug(f'Removed original gzip file: {filepath}')
+
+
+def humanize_bytes(bytes: int) -> str:
+    '''
+    Humanize a number of bytes
+
+    :param bytes: The number of bytes to humanize
+    '''
+
+    # List of units
+    units = ('B','KB','MB','GB','TB','PB','EB','ZB','YB')
+
+    # Iterate over the units
+    for unit in units:
+        # If the bytes are less than 1024, return the bytes with the unit
+        if bytes < 1024:
+            return f'{bytes:.2f} {unit}' if unit != 'B' else f'{bytes} {unit}'
+        # Divide the bytes by 1024
+        bytes /= 1024
+
+    return f'{bytes:.2f} {units[-1]}'
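A quick sanity check of the new helpers, as a sketch; the expected outputs follow directly from the 1024 divisor in humanize_bytes, and gzip_decompress is the same coroutine client.py now imports (foo.txt.gz is a hypothetical input file):

import asyncio

from czds.utils import gzip_decompress, humanize_bytes

print(humanize_bytes(512))    # '512 B' (bytes print without decimals)
print(humanize_bytes(1536))   # '1.50 KB'
print(humanize_bytes(10**9))  # '953.67 MB'

# Decompress foo.txt.gz to foo.txt, removing the .gz (cleanup defaults to True)
asyncio.run(gzip_decompress('foo.txt.gz'))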

requirements.txt

@@ -1,2 +1,3 @@
 aiohttp
 aiofiles
+tqdm

setup.py

@@ -11,7 +11,7 @@ with open('README.md', 'r', encoding='utf-8') as fh:
 setup(
     name='czds-api',
-    version='1.2.8',
+    version='1.3.8',
     author='acidvegas',
     author_email='acid.vegas@acid.vegas',
     description='ICANN API for the Centralized Zones Data Service',