Updated a few ingestors, cleaned up code, added TTLs, etc.

This commit is contained in:
Dionysus 2025-01-30 17:34:08 -05:00
parent 3673a60918
commit 2fbd560afd
Signed by: acidvegas
GPG Key ID: EF4B922DB85DC9DE
3 changed files with 174 additions and 242 deletions

View File

@@ -4,194 +4,158 @@
import ipaddress
import logging
import os
import time
import re
try:
import aiohttp
import git
except ImportError:
raise ImportError('Missing required libraries. (pip install aiohttp)')
raise ImportError('Missing required libraries. (pip install gitpython)')
# Set a default elasticsearch index if one is not provided
default_index = 'eris-firehol'
# Base URLs for Firehol IP lists
FIREHOL_BASE_URL = 'https://raw.githubusercontent.com/firehol/blocklist-ipsets/master/'
FIREHOL_API_URL = 'https://api.github.com/repos/firehol/blocklist-ipsets/git/trees/master'
# Git repository settings
REPO_URL = 'https://github.com/firehol/blocklist-ipsets.git'
REPO_PATH = os.path.join('data', 'firehol-blocklist') # Local path to store the repo
# File suffixes to ignore
IGNORES = ('_1d', '_7d', '_30d', '_90d', '_180d', '_365d', '_730d')
def construct_map() -> dict:
'''Construct the Elasticsearch index mapping for Firehol records.'''
# Match on exact value or full text search
keyword_mapping = {'type': 'text', 'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}}}
# Construct the index mapping
mapping = {
'mappings': {
'properties': {
'ip' : { 'type': 'ip' },
'network' : { 'type': 'ip_range' },
'ipset' : { 'type': 'keyword' },
'category' : { 'type': 'keyword' },
'description' : keyword_mapping,
'maintainer' : { 'type': 'keyword' },
'source' : { 'type': 'keyword' },
'seen' : { 'type': 'date' },
'last_updated' : { 'type': 'date' }
'ip' : { 'type': 'ip_range' },
'ipsets' : { 'type': 'keyword' },
'categories' : { 'type': 'keyword' },
'seen' : { 'type': 'date' },
}
}
}
return mapping
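# Illustrative sketch, not part of this commit: one way the mapping above could
# be applied, assuming the elasticsearch-py 8.x client and a local node. The
# host URL and helper name are assumptions; the project may wire this up elsewhere.
from elasticsearch import Elasticsearch

def example_create_index():
    '''Hypothetical helper that creates the index using construct_map().'''
    es = Elasticsearch('http://localhost:9200') # Assumed local cluster
    if not es.indices.exists(index=default_index):
        es.indices.create(index=default_index, mappings=construct_map()['mappings'])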
async def fetch_ipset(session: aiohttp.ClientSession, ipset_name: str) -> dict:
'''
Fetch and parse a Firehol ipset.
:param session: aiohttp client session
:param ipset_name: Name of the ipset to fetch
:return: Dictionary containing IPs and metadata
def update_repo():
'''Update the repository locally.'''
# If the repository doesn't exist, clone it
if not os.path.exists(REPO_PATH):
logging.info(f'Cloning repository to {REPO_PATH}...')
# Create the directory if it doesn't exist
os.makedirs(os.path.dirname(REPO_PATH), exist_ok=True)
# Clone the repository
git.Repo.clone_from(REPO_URL, REPO_PATH)
else:
# If the repository already exists, update it
repo = git.Repo(REPO_PATH)
logging.info('Updating repository...')
repo.remotes.origin.pull()
def stream_ips(file_path: str):
'''
# Try both .netset and .ipset extensions
for ext in ['.netset', '.ipset']:
url = f'{FIREHOL_BASE_URL}{ipset_name}{ext}'
try:
async with session.get(url) as response:
if response.status != 200:
Stream IPs from file, skipping comments and validating each IP.
:param file_path: Path to the ipset file.
'''
try:
# Open the file
with open(file_path) as f:
# Iterate over each line
for line in f:
# Skip comments and empty lines
line = line.strip()
if line.startswith('#') or not line:
continue
# Validate IP/network
try:
if '/' not in line:
line = f'{line}/32'
ipaddress.ip_network(line, strict=True)
except ValueError as e:
logging.warning(f'Invalid IP/network in {os.path.basename(file_path)}: {line} ({e})')
continue
content = await response.text()
ips = set()
metadata = {
'category': '',
'description': '',
'maintainer': ''
}
for line in content.splitlines():
line = line.strip()
# Skip empty lines
if not line:
continue
# Parse metadata from comments
if line.startswith('#'):
lower_line = line.lower()
if 'category:' in lower_line:
metadata['category'] = line.split('Category:', 1)[1].strip()
elif 'description:' in lower_line:
metadata['description'] = line.split('Description:', 1)[1].strip()
elif 'maintainer:' in lower_line:
metadata['maintainer'] = line.split('Maintainer:', 1)[1].strip()
continue
# Skip invalid lines
if not any(c in '0123456789./:' for c in line):
continue
try:
# Validate IP/network
if '/' in line:
ipaddress.ip_network(line, strict=False)
else:
ipaddress.ip_address(line)
ips.add(line)
except ValueError as e:
logging.warning(f'Invalid IP/network in {ipset_name}: {line} ({e})')
continue
return {
'ips': ips,
'metadata': metadata
}
except Exception as e:
logging.error(f'Error fetching {ipset_name}: {e}')
continue
return None
async def get_all_ipsets(session: aiohttp.ClientSession) -> list:
'''
Fetch list of all available ipsets from the Firehol repository.
:param session: aiohttp client session
:return: List of ipset names
'''
try:
headers = {'Accept': 'application/vnd.github.v3+json'}
async with session.get(FIREHOL_API_URL, headers=headers) as response:
if response.status != 200:
logging.error(f'Failed to fetch ipset list: HTTP {response.status}')
return []
data = await response.json()
ipsets = []
for item in data['tree']:
filename = item['path']
# Include only .netset and .ipset files, exclude directories and other files
if filename.endswith(('.netset', '.ipset')) and not any(x in filename for x in [
'_log', '_report', '_latest', '_1d', '_7d', '_30d', '_90d', '_180d', '_360d', '_720d',
'README', 'COPYING', 'LICENSE', 'excluded'
]):
ipsets.append(filename.rsplit('.', 1)[0])
logging.info(f'Found {len(ipsets)} ipsets')
return ipsets
# Yield the valid IP/network
yield line
except Exception as e:
logging.error(f'Error fetching ipset list: {e}')
return []
logging.error(f'Error streaming IPs from {file_path}: {e}')
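# Illustrative sketch, not part of this commit: a quick sanity check of stream_ips()
# against one ipset from the cloned repository. The file name 'firehol_level1.netset'
# is an assumption about the repository contents.
def example_preview(limit: int = 5):
    '''Hypothetical helper that prints the first few entries of a single ipset.'''
    path = os.path.join(REPO_PATH, 'firehol_level1.netset')
    if os.path.exists(path):
        for count, entry in enumerate(stream_ips(path)):
            if count >= limit:
                break
            print(entry)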
async def process_data(input_path: str = None):
async def process_data(input_path=None):
'''
Process Firehol ipsets and yield records for indexing.
:param input_path: Optional path to local file (not used for Firehol ingestion)
:param input_path: Placeholder for uniformity with the other ingestors (unused here)
'''
# Create a client session
async with aiohttp.ClientSession() as session:
# Get list of all available ipsets
ipset_names = await get_all_ipsets(session)
# Update the repository
update_repo()
if not ipset_names:
logging.error('No ipsets found')
return
for ipset_name in ipset_names:
logging.info(f'Fetching {ipset_name}...')
result = await fetch_ipset(session, ipset_name)
if not result:
logging.warning(f'Failed to fetch {ipset_name}')
# Get all files
files = []
for filename in os.listdir(REPO_PATH):
if filename.endswith(('.ipset', '.netset')):
if any(filename.rsplit('.', 1)[0].endswith(x) for x in IGNORES):
logging.debug(f'Ignoring {filename} because its name ends with one of {IGNORES}')
continue
files.append(os.path.join(REPO_PATH, filename))
ips = result['ips']
metadata = result['metadata']
logging.info(f'Processing {len(files)} files...')
# Dictionary to store unique IPs and their metadata
ip_records = {}
# Process each file
for file_path in files:
logging.info(f'Processing {os.path.basename(file_path)}...')
# Get the ipset name
ipset_name = os.path.splitext(os.path.basename(file_path))[0]
# Extract category if present
category = None
with open(file_path) as f:
for line in f:
if match := re.search(r'^#\s*Category\s*:\s*(.+)$', line, re.IGNORECASE):
category = match.group(1).strip()
break
# Stream IPs from the file
for ip in stream_ips(file_path):
# Initialize record if IP not seen before
if ip not in ip_records:
ip_records[ip] = {'ip': ip, 'ipsets': set(), 'categories': set()}
timestamp = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
# Update arrays
ip_records[ip]['ipsets'].add(ipset_name)
if category:
ip_records[ip]['categories'].add(category)
for ip in ips:
document = {
'ip' : ip.split('/')[0] if '/' in ip else ip,
'ipset' : ipset_name,
'category' : metadata['category'],
'description' : metadata['description'],
'maintainer' : metadata['maintainer'],
'source' : 'firehol',
'seen' : timestamp,
'last_updated' : timestamp
}
# Yield one document per unique IP
for ip, record in ip_records.items():
# Convert sets to lists for JSON serialization
record['ipsets'] = list(record['ipsets'])
record['categories'] = list(record['categories'])
record['seen'] = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
# Yield the document with _id set to the IP
yield {'_index': default_index, '_id': ip, '_source': record}
if '/' in ip:
document['network'] = ip
yield {'_index': default_index, '_source': document}
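# Illustrative sketch, not part of this commit: process_data() is an async generator
# of bulk actions, so it can be fed straight into the async bulk helper. The
# AsyncElasticsearch host and this wiring are assumptions; the project has its own
# ingestion driver.
from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_bulk

async def example_bulk_ingest():
    '''Hypothetical driver that bulk-indexes the yielded documents.'''
    es = AsyncElasticsearch('http://localhost:9200') # Assumed local cluster
    success, errors = await async_bulk(es, process_data(), raise_on_error=False)
    logging.info(f'Indexed {success} documents ({len(errors)} errors)')
    await es.close()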
async def test():
'''Test the ingestion process'''
@@ -204,25 +168,4 @@ async def test():
if __name__ == '__main__':
import asyncio
logging.basicConfig(level=logging.INFO)
asyncio.run(test())
'''
Output Example:
{
"_index": "eris-firehol",
"_source": {
"ip" : "1.2.3.4",
"network" : "1.2.3.0/24",
"ipset" : "firehol_level1",
"category" : "attacks",
"description" : "Basic protection against attacks",
"maintainer" : "FireHOL team",
"source" : "firehol",
"seen" : "2024-03-05T12:00:00Z",
"last_updated" : "2024-03-05T12:00:00Z"
}
}
'''
asyncio.run(test())

View File

@@ -22,17 +22,6 @@ def construct_map() -> dict:
# Match on exact value or full text search
keyword_mapping = { 'type': 'text', 'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } }
# Construct the geoip mapping (Used with the geoip pipeline to enrich the data)
geoip_mapping = {
'city_name' : keyword_mapping,
'continent_name' : keyword_mapping,
'country_iso_code' : keyword_mapping,
'country_name' : keyword_mapping,
'location' : { 'type': 'geo_point' },
'region_iso_code' : keyword_mapping,
'region_name' : keyword_mapping,
}
# Construct the index mapping
mapping = {
'mappings': {
@@ -42,8 +31,8 @@ def construct_map() -> dict:
'proto' : { 'type': 'keyword' },
'service' : { 'type': 'keyword' },
'banner' : keyword_mapping,
'ttl' : { 'type': 'integer' },
'seen' : { 'type': 'date' }
#'geoip' : { 'properties': geoip_mapping }
}
}
}
@@ -83,6 +72,7 @@ async def process_data(input_path: str):
'ip' : record['ip'],
'port' : record['port'],
'proto' : record['proto'],
'ttl' : record['ttl'],
'seen' : time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(int(record['timestamp'])))
}
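# Illustrative sketch, not part of this commit: how a raw masscan JSON line could be
# flattened into the record shape used above, now carrying the TTL. The field layout
# shown follows masscan's usual -oJ output and is an assumption, not a guarantee
# about this repository's input format.
import json

def example_flatten(line: str) -> dict:
    '''Hypothetical helper turning one masscan JSON object into a flat record.'''
    scan = json.loads(line)
    port = scan['ports'][0] # masscan emits a single port object per line
    return {
        'ip'        : scan['ip'],
        'port'      : port['port'],
        'proto'     : port['proto'],
        'ttl'       : port['ttl'],
        'timestamp' : scan['timestamp']
    }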

View File

@@ -42,76 +42,75 @@ async def process_data(input_path: str):
:param input_path: Path to the input file
'''
# Open the input file
async with aiofiles.open(input_path) as input_file:
# Cache the last document to avoid creating a new one for the same IP address
last = None
try:
# Read the input file line by line
async for line in input_file:
line = line.strip()
# Read the input file line by line
async for line in input_file:
# Sentinel value to indicate the end of a process (for closing out a FIFO stream)
if line == '~eof':
yield last
break
# Strip whitespace
line = line.strip()
# Skip empty lines (doubtful we will have any, but just in case)
if not line:
# Skip empty lines
if not line:
continue
# Sentinel value to indicate the end of a process (for closing out a FIFO stream)
if line == '~eof':
yield last
break
# Split the line into its parts
parts = line.split()
# Ensure the line has at least 3 parts
if len(parts) < 3:
logging.warning(f'Invalid PTR record: {line}')
continue
# Split the PTR record into its parts
name, record_type, record = parts[0].rstrip('.'), parts[1], ' '.join(parts[2:]).rstrip('.')
# Do not index other records
if record_type != 'PTR':
continue
# Do not index PTR records that do not have a record
if not record:
continue
# Do not index PTR records that have the same record as the in-addr.arpa domain
if record == name:
continue
# Get the IP address from the in-addr.arpa domain
ip = '.'.join(name.replace('.in-addr.arpa', '').split('.')[::-1])
# Check if we are still processing the same IP address
if last:
if ip == last['_id']: # This record is for the same IP address as the cached document
last_records = last['doc']['record']
if record not in last_records: # Do not index duplicate records
last['doc']['record'].append(record)
continue
else:
yield last # Return the last document and start a new one
# Split the line into its parts
parts = line.split()
# Ensure the line has at least 3 parts
if len(parts) < 3:
logging.warning(f'Invalid PTR record: {line}')
continue
# Split the PTR record into its parts
name, record_type, record = parts[0].rstrip('.'), parts[1], ' '.join(parts[2:]).rstrip('.')
# Do not index other records
if record_type != 'PTR':
continue
# Do not index PTR records that do not have a record
if not record:
continue
# Do not index PTR records that have the same record as the in-addr.arpa domain
if record == name:
continue
# Get the IP address from the in-addr.arpa domain
ip = '.'.join(name.replace('.in-addr.arpa', '').split('.')[::-1])
# Check if we are still processing the same IP address
if last:
if ip == last['_id']: # This record is for the same IP address as the cached document
last_records = last['doc']['record']
if record not in last_records: # Do not index duplicate records
last['doc']['record'].append(record)
continue
else:
yield last # Return the last document and start a new one
# Cache the document
last = {
'_op_type' : 'update',
'_id' : ip,
'_index' : default_index,
'doc' : {
'ip' : ip,
'record' : [record],
'seen' : time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
},
'doc_as_upsert' : True # Create the document if it does not exist
}
except Exception as e:
logging.error(f'Error processing data: {e}')
# Cache the document
last = {
'_op_type' : 'update',
'_id' : ip,
'_index' : default_index,
'doc' : {
'ip' : ip,
'record' : [record],
'seen' : time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
},
'doc_as_upsert' : True # Create the document if it does not exist
}
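# Illustrative sketch, not part of this commit: feeding the FIFO that process_data()
# reads from. The FIFO path and the massdns-style PTR line are assumptions for
# demonstration only; the reader must already be consuming the FIFO, since opening
# it for writing blocks until then.
import os

def example_feed_fifo(fifo_path: str = 'ptr.fifo'):
    '''Hypothetical writer that streams PTR answers and ends with the sentinel.'''
    if not os.path.exists(fifo_path):
        os.mkfifo(fifo_path)
    with open(fifo_path, 'w') as fifo:
        fifo.write('4.3.2.1.in-addr.arpa. PTR host.example.com.\n')
        fifo.write('~eof\n') # Sentinel: the reader yields its cached document and stops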
async def test(input_path: str):