From 2fbd560afde65399043073cb20fb6def3c65b5c6 Mon Sep 17 00:00:00 2001
From: acidvegas
Date: Thu, 30 Jan 2025 17:34:08 -0500
Subject: [PATCH] Updated a few ingestors, code cleanup, added TTLs, etc

---
 ingestors/ingest_firehol.py | 281 ++++++++++++++----------------------
 ingestors/ingest_masscan.py |  14 +-
 ingestors/ingest_massdns.py | 121 ++++++++--------
 3 files changed, 174 insertions(+), 242 deletions(-)

diff --git a/ingestors/ingest_firehol.py b/ingestors/ingest_firehol.py
index 58aa1a7..0ef699f 100644
--- a/ingestors/ingest_firehol.py
+++ b/ingestors/ingest_firehol.py
@@ -4,194 +4,158 @@
 import ipaddress
 import logging
+import os
+import re
 import time
 
 try:
-    import aiohttp
+    import git
 except ImportError:
-    raise ImportError('Missing required libraries. (pip install aiohttp)')
+    raise ImportError('Missing required library. (pip install gitpython)')
 
 
+# Set a default Elasticsearch index if one is not provided
 default_index = 'eris-firehol'
 
-# Base URLs for Firehol IP lists
-FIREHOL_BASE_URL = 'https://raw.githubusercontent.com/firehol/blocklist-ipsets/master/'
-FIREHOL_API_URL = 'https://api.github.com/repos/firehol/blocklist-ipsets/git/trees/master'
+# Git repository settings
+REPO_URL  = 'https://github.com/firehol/blocklist-ipsets.git'
+REPO_PATH = os.path.join('data', 'firehol-blocklist') # Local path to store the repo
+
+# File suffixes to ignore
+IGNORES = ('_1d', '_7d', '_30d', '_90d', '_180d', '_365d', '_730d')
 
 
 def construct_map() -> dict:
     '''Construct the Elasticsearch index mapping for Firehol records.'''
 
-    # Match on exact value or full text search
-    keyword_mapping = {'type': 'text', 'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}}}
-
-    # Construct the index mapping
     mapping = {
         'mappings': {
             'properties': {
-                'ip'           : { 'type': 'ip' },
-                'network'      : { 'type': 'ip_range' },
-                'ipset'        : { 'type': 'keyword' },
-                'category'     : { 'type': 'keyword' },
-                'description'  : keyword_mapping,
-                'maintainer'   : { 'type': 'keyword' },
-                'source'       : { 'type': 'keyword' },
-                'seen'         : { 'type': 'date' },
-                'last_updated' : { 'type': 'date' }
+                'ip'         : { 'type': 'ip_range' },
+                'ipsets'     : { 'type': 'keyword' },
+                'categories' : { 'type': 'keyword' },
+                'seen'       : { 'type': 'date' },
             }
         }
    }
 
     return mapping
 
 
-async def fetch_ipset(session: aiohttp.ClientSession, ipset_name: str) -> dict:
-    '''
-    Fetch and parse a Firehol ipset.
-    :param session: aiohttp client session
-    :param ipset_name: Name of the ipset to fetch
-    :return: Dictionary containing IPs and metadata
+def update_repo():
+    '''Update the repository locally.'''
+
+    # If the repository doesn't exist, clone it
+    if not os.path.exists(REPO_PATH):
+        logging.info(f'Cloning repository to {REPO_PATH}...')
+
+        # Create the directory if it doesn't exist
+        os.makedirs(os.path.dirname(REPO_PATH), exist_ok=True)
+
+        # Clone the repository
+        git.Repo.clone_from(REPO_URL, REPO_PATH)
+    else:
+        # If the repository already exists, update it
+        repo = git.Repo(REPO_PATH)
+        logging.info('Updating repository...')
+        repo.remotes.origin.pull()
+
+
+def stream_ips(file_path: str):
     '''
-    # Try both .netset and .ipset extensions
-    for ext in ['.netset', '.ipset']:
-        url = f'{FIREHOL_BASE_URL}{ipset_name}{ext}'
-        try:
-            async with session.get(url) as response:
-                if response.status != 200:
+    Stream IPs from a file, skipping comments and validating each entry.
+
+    :param file_path: Path to the ipset file.
+    '''
+
+    try:
+        # Open the file
+        with open(file_path) as f:
+
+            # Iterate over each line
+            for line in f:
+
+                # Skip comments and empty lines
+                line = line.strip()
+                if line.startswith('#') or not line:
+                    continue
+
+                # Validate the IP/network (bare IPs are treated as /32 networks)
+                try:
+                    if '/' not in line:
+                        line = f'{line}/32'
+                    ipaddress.ip_network(line, strict=True)
+                except ValueError as e:
+                    logging.warning(f'Invalid IP/network in {os.path.basename(file_path)}: {line} ({e})')
                     continue
-                content = await response.text()
-                ips = set()
-                metadata = {
-                    'category': '',
-                    'description': '',
-                    'maintainer': ''
-                }
-
-                for line in content.splitlines():
-                    line = line.strip()
-
-                    # Skip empty lines
-                    if not line:
-                        continue
-
-                    # Parse metadata from comments
-                    if line.startswith('#'):
-                        lower_line = line.lower()
-                        if 'category:' in lower_line:
-                            metadata['category'] = line.split('Category:', 1)[1].strip()
-                        elif 'description:' in lower_line:
-                            metadata['description'] = line.split('Description:', 1)[1].strip()
-                        elif 'maintainer:' in lower_line:
-                            metadata['maintainer'] = line.split('Maintainer:', 1)[1].strip()
-                        continue
-
-                    # Skip invalid lines
-                    if not any(c in '0123456789./:' for c in line):
-                        continue
-
-                    try:
-                        # Validate IP/network
-                        if '/' in line:
-                            ipaddress.ip_network(line, strict=False)
-                        else:
-                            ipaddress.ip_address(line)
-                        ips.add(line)
-                    except ValueError as e:
-                        logging.warning(f'Invalid IP/network in {ipset_name}: {line} ({e})')
-                        continue
-
-                return {
-                    'ips': ips,
-                    'metadata': metadata
-                }
-
-        except Exception as e:
-            logging.error(f'Error fetching {ipset_name}: {e}')
-            continue
-
-    return None
-
-
-async def get_all_ipsets(session: aiohttp.ClientSession) -> list:
-    '''
-    Fetch list of all available ipsets from the Firehol repository.
-
-    :param session: aiohttp client session
-    :return: List of ipset names
-    '''
-    try:
-        headers = {'Accept': 'application/vnd.github.v3+json'}
-        async with session.get(FIREHOL_API_URL, headers=headers) as response:
-            if response.status != 200:
-                logging.error(f'Failed to fetch ipset list: HTTP {response.status}')
-                return []
-
-            data = await response.json()
-            ipsets = []
-
-            for item in data['tree']:
-                filename = item['path']
-                # Include only .netset and .ipset files, exclude directories and other files
-                if filename.endswith(('.netset', '.ipset')) and not any(x in filename for x in [
-                    '_log', '_report', '_latest', '_1d', '_7d', '_30d', '_90d', '_180d', '_360d', '_720d',
-                    'README', 'COPYING', 'LICENSE', 'excluded'
-                ]):
-                    ipsets.append(filename.rsplit('.', 1)[0])
-
-            logging.info(f'Found {len(ipsets)} ipsets')
-            return ipsets
+                # Yield the valid IP/network
+                yield line
 
     except Exception as e:
-        logging.error(f'Error fetching ipset list: {e}')
-        return []
+        logging.error(f'Error streaming IPs from {file_path}: {e}')
 
 
-async def process_data(input_path: str = None):
+async def process_data(input_path=None):
     '''
     Process Firehol ipsets and yield records for indexing.
-    :param input_path: Optional path to local file (not used for Firehol ingestion)
+    :param input_path: Unused; kept so every ingestor shares the same signature
     '''
 
-    # Create a client session
-    async with aiohttp.ClientSession() as session:
-        # Get list of all available ipsets
-        ipset_names = await get_all_ipsets(session)
+    # Update the repository
+    update_repo()
 
-        if not ipset_names:
-            logging.error('No ipsets found')
-            return
-
-        for ipset_name in ipset_names:
-            logging.info(f'Fetching {ipset_name}...')
-
-            result = await fetch_ipset(session, ipset_name)
-            if not result:
-                logging.warning(f'Failed to fetch {ipset_name}')
+    # Get all files
+    files = []
+    for filename in os.listdir(REPO_PATH):
+        if filename.endswith(('.ipset', '.netset')):
+            if any(filename.rsplit('.', 1)[0].endswith(x) for x in IGNORES):
+                logging.debug(f'Ignoring {filename} because it ends with an ignored suffix')
                 continue
+            files.append(os.path.join(REPO_PATH, filename))
 
-            ips = result['ips']
-            metadata = result['metadata']
+    logging.info(f'Processing {len(files)} files...')
+
+    # Dictionary to store unique IPs and their metadata
+    ip_records = {}
+
+    # Process each file
+    for file_path in files:
+        logging.info(f'Processing {os.path.basename(file_path)}...')
+
+        # Get the ipset name
+        ipset_name = os.path.splitext(os.path.basename(file_path))[0]
+
+        # Extract the category from the header comments if present
+        category = None
+        with open(file_path) as f:
+            for line in f:
+                if match := re.search(r'^#\s*Category\s*:\s*(.+)$', line, re.IGNORECASE):
+                    category = match.group(1).strip()
+                    break
+
+        # Stream IPs from the file
+        for ip in stream_ips(file_path):
+            # Initialize the record if the IP has not been seen before
+            if ip not in ip_records:
+                ip_records[ip] = {'ip': ip, 'ipsets': set(), 'categories': set()}
 
-            timestamp = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
+            # Update the ipset and category arrays
+            ip_records[ip]['ipsets'].add(ipset_name)
+            if category:
+                ip_records[ip]['categories'].add(category)
 
-            for ip in ips:
-                document = {
-                    'ip'           : ip.split('/')[0] if '/' in ip else ip,
-                    'ipset'        : ipset_name,
-                    'category'     : metadata['category'],
-                    'description'  : metadata['description'],
-                    'maintainer'   : metadata['maintainer'],
-                    'source'       : 'firehol',
-                    'seen'         : timestamp,
-                    'last_updated' : timestamp
-                }
+    # Yield one document per unique IP/network
+    for ip, record in ip_records.items():
+        # Convert the sets to lists for JSON serialization
+        record['ipsets'] = list(record['ipsets'])
+        record['categories'] = list(record['categories'])
+        record['seen'] = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
+
+        # Yield the document with _id set to the IP
+        yield {'_index': default_index, '_id': ip, '_source': record}
 
-                if '/' in ip:
-                    document['network'] = ip
-
-                yield {'_index': default_index, '_source': document}
 
 async def test():
     '''Test the ingestion process'''
@@ -204,25 +168,4 @@ async def test():
 if __name__ == '__main__':
     import asyncio
     logging.basicConfig(level=logging.INFO)
-    asyncio.run(test())
-
-
-
-'''
-Output Example:
-
-{
-    "_index": "eris-firehol",
-    "_source": {
-        "ip"           : "1.2.3.4",
-        "network"      : "1.2.3.0/24",
-        "ipset"        : "firehol_level1",
-        "category"     : "attacks",
-        "description"  : "Basic protection against attacks",
-        "maintainer"   : "FireHOL team",
-        "source"       : "firehol",
-        "seen"         : "2024-03-05T12:00:00Z",
-        "last_updated" : "2024-03-05T12:00:00Z"
-    }
-}
-'''
\ No newline at end of file
+    asyncio.run(test())
\ No newline at end of file
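
Note on the reworked output shape: process_data() now keys each document on the
network itself, so re-ingestion upserts in place instead of piling up duplicate
documents per IP. A minimal sketch of one emitted action follows; the ipset and
category names are illustrative assumptions, not values from a real run:

    {
        '_index'  : 'eris-firehol',
        '_id'     : '1.2.3.0/24',
        '_source' : {
            'ip'         : '1.2.3.0/24',
            'ipsets'     : ['firehol_level1', 'spamhaus_drop'],
            'categories' : ['attacks', 'reputation'],
            'seen'       : '2025-01-30T22:34:08Z'
        }
    }

Bare IPs are normalized to /32 networks by stream_ips(), so every _id is a CIDR
and fits the ip_range mapping above.
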
diff --git a/ingestors/ingest_masscan.py b/ingestors/ingest_masscan.py
index a8e1810..756469e 100644
--- a/ingestors/ingest_masscan.py
+++ b/ingestors/ingest_masscan.py
@@ -22,17 +22,6 @@ def construct_map() -> dict:
     # Match on exact value or full text search
     keyword_mapping = { 'type': 'text', 'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } }
 
-    # Construct the geoip mapping (Used with the geoip pipeline to enrich the data)
-    geoip_mapping = {
-        'city_name'        : keyword_mapping,
-        'continent_name'   : keyword_mapping,
-        'country_iso_code' : keyword_mapping,
-        'country_name'     : keyword_mapping,
-        'location'         : { 'type': 'geo_point' },
-        'region_iso_code'  : keyword_mapping,
-        'region_name'      : keyword_mapping,
-    }
-
     # Construct the index mapping
     mapping = {
         'mappings': {
@@ -42,8 +31,8 @@ def construct_map() -> dict:
                 'proto'   : { 'type': 'keyword' },
                 'service' : { 'type': 'keyword' },
                 'banner'  : keyword_mapping,
+                'ttl'     : { 'type': 'integer' },
                 'seen'    : { 'type': 'date' }
-                #'geoip'  : { 'properties': geoip_mapping }
             }
         }
     }
@@ -83,6 +72,7 @@ async def process_data(input_path: str):
                 'ip'    : record['ip'],
                 'port'  : record['port'],
                 'proto' : record['proto'],
+                'ttl'   : record['ttl'],
                 'seen'  : time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(int(record['timestamp'])))
             }
 
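
For reference, a sketch of one input record as the fields above consume it,
assuming the surrounding code flattens masscan's JSON output to one open port
per record (values are illustrative):

    record = {
        'ip'        : '1.2.3.4',
        'port'      : 443,
        'proto'     : 'tcp',
        'ttl'       : 54,
        'timestamp' : '1706653000'
    }

Masscan only reports a TTL when the probe response carries one, so
record['ttl'] can raise a KeyError on records without it; record.get('ttl')
would be the defensive alternative if such records show up in practice.
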
diff --git a/ingestors/ingest_massdns.py b/ingestors/ingest_massdns.py
index d585bc4..6864f32 100644
--- a/ingestors/ingest_massdns.py
+++ b/ingestors/ingest_massdns.py
@@ -42,76 +42,75 @@ async def process_data(input_path: str):
     :param input_path: Path to the input file
     '''
 
+    # Open the input file
     async with aiofiles.open(input_path) as input_file:
 
         # Cache the last document to avoid creating a new one for the same IP address
         last = None
 
-        try:
-            # Read the input file line by line
-            async for line in input_file:
-                line = line.strip()
+        # Read the input file line by line
+        async for line in input_file:
 
-                # Sentinel value to indicate the end of a process (for closing out a FIFO stream)
-                if line == '~eof':
-                    yield last
-                    break
+            # Strip whitespace
+            line = line.strip()
 
-                # Skip empty lines (doubtful we will have any, but just in case)
-                if not line:
+            # Skip empty lines
+            if not line:
+                continue
+
+            # Sentinel value to indicate the end of a process (for closing out a FIFO stream)
+            if line == '~eof':
+                yield last
+                break
+
+            # Split the line into its parts
+            parts = line.split()
+
+            # Ensure the line has at least 3 parts
+            if len(parts) < 3:
+                logging.warning(f'Invalid PTR record: {line}')
+                continue
+
+            # Split the PTR record into its parts
+            name, record_type, record = parts[0].rstrip('.'), parts[1], ' '.join(parts[2:]).rstrip('.')
+
+            # Do not index other records
+            if record_type != 'PTR':
+                continue
+
+            # Do not index PTR records that do not have a record
+            if not record:
+                continue
+
+            # Do not index PTR records that have the same record as the in-addr.arpa domain
+            if record == name:
+                continue
+
+            # Get the IP address from the in-addr.arpa domain
+            ip = '.'.join(name.replace('.in-addr.arpa', '').split('.')[::-1])
+
+            # Check if we are still processing the same IP address
+            if last:
+                if ip == last['_id']: # This record is for the same IP address as the cached document
+                    last_records = last['doc']['record']
+                    if record not in last_records: # Do not index duplicate records
+                        last['doc']['record'].append(record)
                     continue
+                else:
+                    yield last # Return the last document and start a new one
 
-                # Split the line into its parts
-                parts = line.split()
-
-                # Ensure the line has at least 3 parts
-                if len(parts) < 3:
-                    logging.warning(f'Invalid PTR record: {line}')
-                    continue
-
-                # Split the PTR record into its parts
-                name, record_type, record = parts[0].rstrip('.'), parts[1], ' '.join(parts[2:]).rstrip('.')
-
-                # Do not index other records
-                if record_type != 'PTR':
-                    continue
-
-                # Do not index PTR records that do not have a record
-                if not record:
-                    continue
-
-                # Do not index PTR records that have the same record as the in-addr.arpa domain
-                if record == name:
-                    continue
-
-                # Get the IP address from the in-addr.arpa domain
-                ip = '.'.join(name.replace('.in-addr.arpa', '').split('.')[::-1])
-
-                # Check if we are still processing the same IP address
-                if last:
-                    if ip == last['_id']: # This record is for the same IP address as the cached document
-                        last_records = last['doc']['record']
-                        if record not in last_records: # Do not index duplicate records
-                            last['doc']['record'].append(record)
-                        continue
-                    else:
-                        yield last # Return the last document and start a new one
-
-                # Cache the document
-                last = {
-                    '_op_type'      : 'update',
-                    '_id'           : ip,
-                    '_index'        : default_index,
-                    'doc'           : {
-                        'ip'     : ip,
-                        'record' : [record],
-                        'seen'   : time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
-                    },
-                    'doc_as_upsert' : True # Create the document if it does not exist
-                }
-
-        except Exception as e:
-            logging.error(f'Error processing data: {e}')
+            # Cache the document
+            last = {
+                '_op_type'      : 'update',
+                '_id'           : ip,
+                '_index'        : default_index,
+                'doc'           : {
+                    'ip'     : ip,
+                    'record' : [record],
+                    'seen'   : time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
+                },
+                'doc_as_upsert' : True # Create the document if it does not exist
+            }
 
 
 async def test(input_path: str):
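
The massdns loop keeps one cached document per IP and flushes it either when
the next IP begins or when the '~eof' sentinel arrives, so a plain (non-FIFO)
input file should end with an '~eof' line or the final cached document is never
yielded. A sketch of the bulk action produced for a PTR line such as
'4.3.2.1.in-addr.arpa. PTR host.example.com.', assuming the module's
default_index is 'eris-massdns' (the timestamp is illustrative):

    {
        '_op_type'      : 'update',
        '_id'           : '1.2.3.4',
        '_index'        : 'eris-massdns',
        'doc'           : {
            'ip'     : '1.2.3.4',
            'record' : ['host.example.com'],
            'seen'   : '2025-01-30T22:34:08Z'
        },
        'doc_as_upsert' : True
    }

Because the action is an update with doc_as_upsert, repeated runs merge new PTR
values into the existing document for that IP rather than overwriting it.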