pypi ready

This commit is contained in:
Dionysus 2025-02-21 02:20:32 -05:00
parent c5bc87326b
commit ea2919b8ba
Signed by: acidvegas
GPG Key ID: EF4B922DB85DC9DE
7 changed files with 270 additions and 200 deletions

View File

@ -1,4 +1,4 @@
# ICANN CZDS
# ICANN Centralized Zone Data Service API
The [ICANN Centralized Zone Data Service](https://czds.icann.org) *(CZDS)* allows *approved* users to request and download DNS zone files in bulk, provided they represent a legitimate company or academic institution and their intended use is legal and ethical. Once ICANN approves the request, this tool streamlines the retrieval of extensive domain name system data, facilitating research and security analysis in the realm of internet infrastructure.
@ -9,27 +9,44 @@ At the time of writing this repository, the CZDS offers access to 1,151 zones in
1,079 have been approved, 55 are still pending *(after 3 months)*, 10 have been revoked because the TLDs are longer active, and 6 have been denied. Zones that have expired automatically had the expiration extended for me without doing anything, aside from 13 zones that remained expired. I have included a recent [stats file](./extras/stats.csv) directly from my ICANN account.
## Installation
```bash
pip install czds-api
```
## Usage
### Authentication
Credentials may be provided interactively upon execution or via the `CZDS_USER` & `CZDS_PASS` environment variables:
###### Command line
```bash
czds [--username <username> --password <password>] [--concurrency <int>]
```
You can also set the `CZDS_USER` & `CZDS_PASS` environment variables to automatically authenticate:
```bash
export CZDS_USER='your_username'
export CZDS_PASS='your_password'
```
### Python version
```bash
python czds.py [--username <username> --password <password>] [--concurrency <int>]
```
###### As a Python module
```python
import os
### POSIX version
```bash
./czds
from czds import CZDS
CZDS_client = CZDS(username, password)
CZDS_client.download_report('report.csv')
zone_links = CZDS_client.fetch_zone_links()
os.makedirs('zones', exist_ok=True)
for zone_link in zone_links:
CZDS_client.download_zone(zone_link, 'zones')
```
## Respects & extras
While ICANN does have an official [czds-api-client-python](https://github.com/icann/czds-api-client-python) repository, I rewrote it from scratch to be more streamline & included a [POSIX version](./czds) for portability. There is some [official documentation](https://raw.githubusercontent.com/icann/czds-api-client-java/master/docs/ICANN_CZDS_api.pdf) that was referenced in the creation of the POSIX version. Either way, big props to ICANN for allowing me to use the CZDS for research purposes!
While ICANN does have an official [czds-api-client-python](https://github.com/icann/czds-api-client-python) repository, I rewrote it from scratch to be more streamline & included a [POSIX version](./extras/czds) for portability. There is some [official documentation](https://raw.githubusercontent.com/icann/czds-api-client-java/master/docs/ICANN_CZDS_api.pdf) that was referenced in the creation of the POSIX version. Either way, big props to ICANN for allowing me to use the CZDS for research purposes!
___

189
czds.py
View File

@ -1,189 +0,0 @@
#!/usr/bin/env python3
# ICANN API for the Centralized Zones Data Service - developed by acidvegas (https://git.acid.vegas/czds)
# Reference: https://czds.icann.org
import argparse
import concurrent.futures
import getpass
import json
import logging
import os
import time
import urllib.request
class CZDS:
'''Class for the ICANN Centralized Zones Data Service'''
def __init__(self, username: str, password: str):
self.username = username
self.headers = {'Authorization': f'Bearer {self.authenticate(username, password)}'}
def authenticate(self, username: str, password: str) -> str:
'''
Authenticate with the ICANN API and return the access token.
:param username: ICANN Username
:param password: ICANN Password
'''
try:
# Prepare the request
data = json.dumps({'username': username, 'password': password}).encode('utf-8')
headers = {'Content-Type': 'application/json'}
request = urllib.request.Request('https://account-api.icann.org/api/authenticate', data=data, headers=headers)
# Make the request
with urllib.request.urlopen(request) as response:
response = response.read().decode('utf-8')
return json.loads(response)['accessToken']
except Exception as e:
raise Exception(f'Failed to authenticate with ICANN API: {e}')
def fetch_zone_links(self) -> list:
'''
Fetch the list of zone files available for download.
:param token: ICANN access token
'''
# Create the request
request = urllib.request.Request('https://czds-api.icann.org/czds/downloads/links', headers=self.headers)
# Make the request
with urllib.request.urlopen(request) as response:
if response.status != 200:
raise Exception(f'Failed to fetch zone links: {response.status} {response.reason}')
return json.loads(response.read().decode('utf-8'))
def download_report(self, filepath: str):
'''
Downloads the zone report stats from the API and scrubs the report for privacy.
:param filepath: Filepath to save the scrubbed report
'''
# Create the request
request = urllib.request.Request('https://czds-api.icann.org/czds/requests/report', headers=self.headers)
# Make the request
with urllib.request.urlopen(request) as response:
if response.status != 200:
raise Exception(f'Failed to download the zone stats report: {response.status} {response.reason}')
content = response.read().decode('utf-8')
# Write the content to the file
with open(filepath, 'w') as file:
file.write(content.replace(self.username, 'nobody@no.name')) # Wipe the email address from the report for privacy
def download_zone(self, url: str, output_directory: str):
'''
Download a single zone file using urllib.request.
:param url: URL to download
:param output_directory: Directory to save the zone file
'''
# Create the request
request = urllib.request.Request(url, headers=self.headers)
# Make the request
with urllib.request.urlopen(request) as response:
if response.status != 200:
raise Exception(f'Failed to download {url}: {response.status} {response.reason}')
if not (content_disposition := response.getheader('Content-Disposition')):
raise ValueError('Missing Content-Disposition header')
# Extract the filename from the Content-Disposition header
filename = content_disposition.split('filename=')[-1].strip('"')
filepath = os.path.join(output_directory, filename)
# Write the content to the file
with open(filepath, 'wb') as file:
while True:
chunk = response.read(1024)
if not chunk:
break
file.write(chunk)
return filepath
def main(username: str, password: str, concurrency: int):
'''
Main function to download all zone files.
:param username: ICANN Username
:param password: ICANN Password
:param concurrency: Number of concurrent downloads
'''
now = time.strftime('%Y-%m-%d')
logging.info(f'Authenticating with ICANN API...')
CZDS_client = CZDS(username, password)
logging.debug('Created CZDS client')
output_directory = os.path.join(os.getcwd(), 'zones', now)
os.makedirs(output_directory, exist_ok=True)
logging.info('Fetching zone stats report...')
try:
CZDS_client.download_report(os.path.join(output_directory, '.report.csv'))
except Exception as e:
raise Exception(f'Failed to download zone stats report: {e}')
logging.info('Fetching zone links...')
try:
zone_links = CZDS_client.fetch_zone_links()
except Exception as e:
raise Exception(f'Failed to fetch zone links: {e}')
logging.info(f'Fetched {len(zone_links):,} zone links')
with concurrent.futures.ThreadPoolExecutor(max_workers=concurrency) as executor:
future_to_url = {executor.submit(CZDS_client.download_zone, url, output_directory): url for url in sorted(zone_links)}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
filepath = future.result()
logging.info(f'Completed downloading {url} to file {filepath}')
except Exception as e:
logging.error(f'{url} generated an exception: {e}')
if __name__ == '__main__':
# Create argument parser
parser = argparse.ArgumentParser(description='ICANN API for the Centralized Zones Data Service')
# Add arguments
parser.add_argument('-u', '--username', default=os.getenv('CZDS_USER'), help='ICANN Username')
parser.add_argument('-p', '--password', default=os.getenv('CZDS_PASS'), help='ICANN Password')
parser.add_argument('-c', '--concurrency', type=int, default=3, help='Number of concurrent downloads')
# Parse arguments
args = parser.parse_args()
# Setting up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# Get username and password
username = args.username or input('ICANN Username: ')
password = args.password or getpass.getpass('ICANN Password: ')
# Execute main function
main(username, password, args.concurrency)

11
czds/__init__.py Normal file
View File

@ -0,0 +1,11 @@
#!/usr/bin/env python3
# ICANN API for the Centralized Zones Data Service - developed by acidvegas (https://git.acid.vegas/czds)
# czds/__init__.py
from .client import CZDS
__version__ = '1.0.0'
__author__ = 'acidvegas'
__email__ = 'acid.vegas@acid.vegas'
__github__ = 'https://github.com/acidvegas/czds'

78
czds/__main__.py Normal file
View File

@ -0,0 +1,78 @@
#!/usr/bin/env python3
# ICANN API for the Centralized Zones Data Service - developed by acidvegas (https://git.acid.vegas/czds)
# czds/__main__.py
import argparse
import concurrent.futures
import getpass
import logging
import os
import time
from .client import CZDS
def main(username: str, password: str, concurrency: int) -> None:
'''
Main function to download all zone files
:param username: ICANN Username
:param password: ICANN Password
:param concurrency: Number of concurrent downloads
'''
now = time.strftime('%Y-%m-%d')
logging.info('Authenticating with ICANN API...')
CZDS_client = CZDS(username, password)
logging.debug('Created CZDS client')
output_directory = os.path.join(os.getcwd(), 'zones', now)
os.makedirs(output_directory, exist_ok=True)
logging.info('Fetching zone stats report...')
try:
CZDS_client.download_report(os.path.join(output_directory, '.report.csv'))
except Exception as e:
raise Exception(f'Failed to download zone stats report: {e}')
logging.info('Fetching zone links...')
try:
zone_links = CZDS_client.fetch_zone_links()
except Exception as e:
raise Exception(f'Failed to fetch zone links: {e}')
logging.info(f'Fetched {len(zone_links):,} zone links')
logging.info('Downloading zone files...')
with concurrent.futures.ThreadPoolExecutor(max_workers=concurrency) as executor:
future_to_url = {executor.submit(CZDS_client.download_zone, url, output_directory): url for url in sorted(zone_links)}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
filepath = future.result()
logging.info(f'Completed downloading {url} to file {filepath}')
except Exception as e:
logging.error(f'{url} generated an exception: {e}')
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='ICANN API for the Centralized Zones Data Service')
parser.add_argument('-u', '--username', default=os.getenv('CZDS_USER'), help='ICANN Username')
parser.add_argument('-p', '--password', default=os.getenv('CZDS_PASS'), help='ICANN Password')
parser.add_argument('-c', '--concurrency', type=int, default=3, help='Number of concurrent downloads')
args = parser.parse_args()
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
username = args.username or input('ICANN Username: ')
password = args.password or getpass.getpass('ICANN Password: ')
main(username, password, args.concurrency)

104
czds/client.py Normal file
View File

@ -0,0 +1,104 @@
#!/usr/bin/env python3
# ICANN API for the Centralized Zones Data Service - developed by acidvegas (https://git.acid.vegas/czds)
# czds/client.py
import json
import os
import urllib.request
class CZDS:
'''Class for the ICANN Centralized Zones Data Service'''
def __init__(self, username: str, password: str):
'''
Initialize CZDS client
:param username: ICANN Username
:param password: ICANN Password
'''
self.username = username
self.headers = {'Authorization': f'Bearer {self.authenticate(username, password)}'}
def authenticate(self, username: str, password: str) -> str:
'''
Authenticate with the ICANN API and return the access token
:param username: ICANN Username
:param password: ICANN Password
'''
try:
data = json.dumps({'username': username, 'password': password}).encode('utf-8')
headers = {'Content-Type': 'application/json'}
request = urllib.request.Request('https://account-api.icann.org/api/authenticate', data=data, headers=headers)
with urllib.request.urlopen(request) as response:
response = response.read().decode('utf-8')
return json.loads(response)['accessToken']
except Exception as e:
raise Exception(f'Failed to authenticate with ICANN API: {e}')
def fetch_zone_links(self) -> list:
'''Fetch the list of zone files available for download'''
request = urllib.request.Request('https://czds-api.icann.org/czds/downloads/links', headers=self.headers)
with urllib.request.urlopen(request) as response:
if response.status != 200:
raise Exception(f'Failed to fetch zone links: {response.status} {response.reason}')
return json.loads(response.read().decode('utf-8'))
def download_report(self, filepath: str):
'''
Downloads the zone report stats from the API and scrubs the report for privacy
:param filepath: Filepath to save the scrubbed report
'''
request = urllib.request.Request('https://czds-api.icann.org/czds/requests/report', headers=self.headers)
with urllib.request.urlopen(request) as response:
if response.status != 200:
raise Exception(f'Failed to download the zone stats report: {response.status} {response.reason}')
content = response.read().decode('utf-8')
with open(filepath, 'w') as file:
file.write(content.replace(self.username, 'nobody@no.name'))
def download_zone(self, url: str, output_directory: str) -> str:
'''
Download a single zone file
:param url: URL to download
:param output_directory: Directory to save the zone file
'''
request = urllib.request.Request(url, headers=self.headers)
with urllib.request.urlopen(request) as response:
if response.status != 200:
raise Exception(f'Failed to download {url}: {response.status} {response.reason}')
if not (content_disposition := response.getheader('Content-Disposition')):
raise ValueError('Missing Content-Disposition header')
filename = content_disposition.split('filename=')[-1].strip('"')
filepath = os.path.join(output_directory, filename)
with open(filepath, 'wb') as file:
while True:
chunk = response.read(1024)
if not chunk:
break
file.write(chunk)
return filepath

View File

49
setup.py Normal file
View File

@ -0,0 +1,49 @@
#!/usr/bin/env python3
# ICANN API for the Centralized Zones Data Service - developed by acidvegas (https://git.acid.vegas/czds)
# setup.py
from setuptools import setup, find_packages
with open('README.md', 'r', encoding='utf-8') as fh:
long_description = fh.read()
setup(
name='czds-api',
version='1.0.0',
author='acidvegas',
author_email='acid.vegas@acid.vegas',
description='ICANN API for the Centralized Zones Data Service',
long_description=long_description,
long_description_content_type='text/markdown',
url='https://github.com/acidvegas/czds',
project_urls={
'Bug Tracker': 'https://github.com/acidvegas/czds/issues',
'Documentation': 'https://github.com/acidvegas/czds#readme',
'Source Code': 'https://github.com/acidvegas/czds',
},
classifiers=[
'Development Status :: 5 - Production/Stable',
'Intended Audience :: Developers',
'License :: OSI Approved :: ISC License (ISCL)',
'Operating System :: OS Independent',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'Programming Language :: Python :: 3.11',
'Topic :: Internet',
'Topic :: Security',
'Topic :: Software Development :: Libraries :: Python Modules',
],
packages=find_packages(),
python_requires='>=3.6',
entry_points={
'console_scripts': [
'czds=czds.__main__:main',
],
},
)