diff --git a/ingestors/ingest_certstream.py b/ingestors/ingest_certstream.py index 2cceb29..523cea2 100644 --- a/ingestors/ingest_certstream.py +++ b/ingestors/ingest_certstream.py @@ -45,8 +45,9 @@ async def process_data(place_holder: str = None): async for websocket in websockets.connect('wss://certstream.calidog.io', ping_interval=15, ping_timeout=60): try: + # Read the websocket stream async for line in websocket: - + # Parse the JSON record try: record = json.loads(line) @@ -65,9 +66,10 @@ async def process_data(place_holder: str = None): elif domain.startswith('www.') and domain.count('.') == 2: continue if domain.count('.') > 1: + # TODO: Add a check for PSL TLDs...domain.co.uk, domain.com.au, etc. (we want to ignore these if they are not subdomains) if domain not in domains: domains.append(domain) - + # Construct the document for domain in domains: struct = { @@ -81,6 +83,10 @@ async def process_data(place_holder: str = None): logging.error(f'Connection to Certstream was closed. Attempting to reconnect... ({e})') await asyncio.sleep(3) + except Exception as e: + logging.error(f'Error processing Certstream data: {e}') + await asyncio.sleep(3) + async def test(): '''Test the ingestion process.''' @@ -91,8 +97,6 @@ async def test(): if __name__ == '__main__': - import asyncio - asyncio.run(test()) @@ -155,4 +159,7 @@ Output: }, "message_type": "certificate_update" } + +Notes: + - Fix the "no close frame received or sent" error ''' diff --git a/ingestors/ingest_httpx.py b/ingestors/ingest_httpx.py index 1579bf5..8e6d0f6 100644 --- a/ingestors/ingest_httpx.py +++ b/ingestors/ingest_httpx.py @@ -3,151 +3,176 @@ # ingest_httpx.py import json +import logging try: - import aiofiles + import aiofiles except ImportError: - raise ImportError('Missing required \'aiofiles\' library. (pip install aiofiles)') + raise ImportError('Missing required \'aiofiles\' library. 
(pip install aiofiles)') + + +# Set a default elasticsearch index if one is not provided +default_index = 'eris-httpx' -default_index = 'httpx-logs' def construct_map() -> dict: - '''Construct the Elasticsearch index mapping for Masscan records.''' + '''Construct the Elasticsearch index mapping for Masscan records.''' - keyword_mapping = { 'type': 'text', 'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } } + # Match on exact value or full text search + keyword_mapping = { 'type': 'text', 'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } } - mapping = { - 'mappings': { - 'properties': { - "timestamp" : { 'type' : 'date' }, - "hash" : { - "body_md5" : { 'type': 'keyword' }, - "body_mmh3" : { 'type': 'keyword' }, - "body_sha256" : { 'type': 'keyword' }, - "body_simhash" : { 'type': 'keyword' }, - "header_md5" : { 'type': 'keyword' }, - "header_mmh3" : { 'type': 'keyword' }, - "header_sha256" : { 'type': 'keyword' }, - "header_simhash" : { 'type': 'keyword' } - }, - "port" : { 'type': 'integer' }, - "url" : keyword_mapping, - "input" : keyword_mapping, - "title" : keyword_mapping, - "scheme" : { 'type': 'keyword' }, - "webserver" : { 'type': 'keyword' }, - "body_preview" : keyword_mapping, - "content_type" : { 'type': 'keyword' }, - "method" : { 'type': 'keyword'}, - "host" : { 'type': 'ip'}, - "path" : keyword_mapping, - "favicon" : { 'type': 'keyword' }, - "favicon_path" : keyword_mapping, - "a" : { 'type': 'ip'}, - "aaaa" : { 'type': 'ip'}, - "tech" : keyword_mapping, - "words" : { 'type': 'integer'}, - "lines" : { 'type': 'integer'}, - "status_code" : { 'type': 'integer'}, - "content_length" : { 'type': 'integer'} - } - } - } + # Construct the index mapping + mapping = { + 'mappings': { + 'properties': { + 'timestamp' : { 'type' : 'date' }, + 'hash' : { + 'properties': { + 'body_md5' : { 'type': 'keyword' }, + 'body_mmh3' : { 'type': 'keyword' }, + 'body_sha256' : { 'type': 'keyword' }, + 'body_simhash' : { 'type': 'keyword' }, + 'header_md5' : { 'type': 'keyword' }, + 'header_mmh3' : { 'type': 'keyword' }, + 'header_sha256' : { 'type': 'keyword' }, + 'header_simhash' : { 'type': 'keyword' } + } + }, + 'port' : { 'type': 'integer' }, + 'url' : keyword_mapping, + 'final_url' : keyword_mapping, + 'input' : keyword_mapping, + 'title' : keyword_mapping, + 'scheme' : { 'type': 'keyword' }, + 'webserver' : { 'type': 'keyword' }, + 'body_preview' : keyword_mapping, + 'content_type' : { 'type': 'keyword' }, + 'method' : { 'type': 'keyword' }, + 'host' : { 'type': 'ip' }, + 'path' : keyword_mapping, + 'favicon' : { 'type': 'keyword' }, + 'favicon_path' : keyword_mapping, + 'a' : { 'type': 'ip' }, + 'cname' : keyword_mapping, + 'aaaa' : { 'type': 'ip' }, + 'tech' : keyword_mapping, + 'words' : { 'type': 'integer' }, + 'lines' : { 'type': 'integer' }, + 'status_code' : { 'type': 'integer' }, + 'chain_status_codes' : { 'type': 'integer' }, + 'content_length' : { 'type': 'integer' } + } + } + } - return mapping + return mapping -async def process_data(file_path: str): - ''' - Read and process HTTPX records from the log file. 
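+# NOTE: keyword_mapping above allows both exact-keyword and full-text matching on a field.
+# The mapping from construct_map() is applied when the index is created; a minimal sketch of
+# that step (assuming the elasticsearch 8.x AsyncElasticsearch client, a dependency listed in
+# the massdns deployment notes elsewhere in this diff; this helper is not part of ERIS itself):
+#
+#   es = AsyncElasticsearch('https://localhost:9200')
+#   if not await es.indices.exists(index=default_index):
+#       await es.indices.create(index=default_index, mappings=construct_map()['mappings'])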
+async def process_data(input_path: str): + ''' + Read and process the input file - :param file_path: Path to the HTTPX log file - ''' + :param input_path: Path to the input file + ''' - async with aiofiles.open(file_path) as input_file: - async for line in input_file: - line = line.strip() + async with aiofiles.open(input_path) as input_file: + # Read the input file line by line + async for line in input_file: + line = line.strip() - if not line: - continue + # Sentinel value to indicate the end of a process (for closing out a FIFO stream) + if line == '~eof': + break - record = json.loads(line) + # Skip empty lines + if not line: + continue - record['seen'] = record.pop('timestamp').split('.')[0] + 'Z' # Hacky solution to maintain ISO 8601 format without milliseconds or offsets - record['domain'] = record.pop('input') + # Parse the JSON record + try: + record = json.loads(line) + except json.JSONDecodeError: + logging.error(f'Failed to parse JSON record: {line}') + continue - for item in ('failed', 'knowledgebase', 'time'): - del record[item] + # Hacky solution to maintain ISO 8601 format without milliseconds or offsets + record['timestamp'] = record['timestamp'].split('.')[0] + 'Z' - yield {'_id': record['domain'], '_index': default_index, '_source': record} + # Remove unnecessary fields we don't care about + for item in ('failed', 'knowledgebase', 'time', 'csp'): + if item in record: + del record[item] + + yield {'_index': default_index, '_source': record} async def test(input_path: str): - ''' - Test the HTTPX ingestion process - - :param input_path: Path to the HTTPX log file - ''' - async for document in process_data(input_path): - print(document) + ''' + Test the ingestion process + + :param input_path: Path to the input file + ''' + + async for document in process_data(input_path): + print(document) if __name__ == '__main__': - import argparse - import asyncio + import argparse + import asyncio - parser = argparse.ArgumentParser(description='HTTPX Ingestor for ERIS') - parser.add_argument('input_path', help='Path to the input file or directory') - args = parser.parse_args() - - asyncio.run(test(args.input_path)) + parser = argparse.ArgumentParser(description='Ingestor for ERIS') + parser.add_argument('input_path', help='Path to the input file or directory') + args = parser.parse_args() + + asyncio.run(test(args.input_path)) - -'''' + +''' Deploy: - go install -v github.com/projectdiscovery/httpx/cmd/httpx@latest - curl -s https://public-dns.info/nameservers.txt -o nameservers.txt - httpx -l zone.txt -t 200 -sc -location -favicon -title -bp -td -ip -cname -mc 200,201,301,302,303,307,308 -fr -r nameservers.txt -retries 2 -stream -sd -j -o httpx.json -v - + go install -v github.com/projectdiscovery/httpx/cmd/httpx@latest + curl -s https://public-dns.info/nameservers.txt -o nameservers.txt + httpx -l fulldomains.txt -t 200 -sc -location -favicon -title -bp -td -ip -cname -mc 200,201,301,302,303,307,308 -fr -r nameservers.txt -retries 2 -stream -sd -j -o fifo.json -v + Output: - { - "timestamp":"2024-01-14T13:08:15.117348474-05:00", # Rename to seen and remove milliseconds and offset - "hash": { # Do we need all of these ? 
- "body_md5" : "4ae9394eb98233b482508cbda3b33a66", - "body_mmh3" : "-4111954", - "body_sha256" : "89e06e8374353469c65adb227b158b265641b424fba7ddb2c67eef0c4c1280d3", - "body_simhash" : "9814303593401624250", - "header_md5" : "980366deb2b2fb5df2ad861fc63e79ce", - "header_mmh3" : "-813072798", - "header_sha256" : "39aea75ad548e38b635421861641ad1919ed3b103b17a33c41e7ad46516f736d", - "header_simhash" : "10962523587435277678" - }, - "port" : "443", - "url" : "https://supernets.org", # Remove this and only use the input field as "domain" maybe - "input" : "supernets.org", # rename to domain - "title" : "SuperNETs", - "scheme" : "https", - "webserver" : "nginx", - "body_preview" : "SUPERNETS Home About Contact Donate Docs Network IRC Git Invidious Jitsi LibreX Mastodon Matrix Sup", - "content_type" : "text/html", - "method" : "GET", # Remove this - "host" : "51.89.151.158", - "path" : "/", - "favicon" : "-674048714", - "favicon_path" : "/i/favicon.png", - "time" : "592.907689ms", # Do we need this ? - "a" : ["6.150.220.23"], - "tech" : ["Bootstrap:4.0.0", "HSTS", "Nginx"], - "words" : 436, # Do we need this ? - "lines" : 79, # Do we need this ? - "status_code" : 200, - "content_length" : 4597, - "failed" : false, # Do we need this ? - "knowledgebase" : { # Do we need this ? - "PageType" : "nonerror", - "pHash" : 0 - } - } -''' \ No newline at end of file + { + "timestamp":"2024-01-14T13:08:15.117348474-05:00", # Rename to seen and remove milliseconds and offset + "hash": { # Do we need all of these ? + "body_md5" : "4ae9394eb98233b482508cbda3b33a66", + "body_mmh3" : "-4111954", + "body_sha256" : "89e06e8374353469c65adb227b158b265641b424fba7ddb2c67eef0c4c1280d3", + "body_simhash" : "9814303593401624250", + "header_md5" : "980366deb2b2fb5df2ad861fc63e79ce", + "header_mmh3" : "-813072798", + "header_sha256" : "39aea75ad548e38b635421861641ad1919ed3b103b17a33c41e7ad46516f736d", + "header_simhash" : "10962523587435277678" + }, + "port" : "443", + "url" : "https://supernets.org", # Remove this and only use the input field as "domain" maybe + "input" : "supernets.org", # rename to domain + "title" : "SuperNETs", + "scheme" : "https", + "webserver" : "nginx", + "body_preview" : "SUPERNETS Home About Contact Donate Docs Network IRC Git Invidious Jitsi LibreX Mastodon Matrix Sup", + "content_type" : "text/html", + "method" : "GET", # Remove this + "host" : "51.89.151.158", + "path" : "/", + "favicon" : "-674048714", + "favicon_path" : "/i/favicon.png", + "time" : "592.907689ms", # Do we need this ? + "a" : ["6.150.220.23"], + "tech" : ["Bootstrap:4.0.0", "HSTS", "Nginx"], + "words" : 436, # Do we need this ? + "lines" : 79, # Do we need this ? + "status_code" : 200, + "content_length" : 4597, + "failed" : false, # Do we need this ? + "knowledgebase" : { # Do we need this ? + "PageType" : "nonerror", + "pHash" : 0 + } + } +''' diff --git a/ingestors/ingest_ixps.py b/ingestors/ingest_ixps.py new file mode 100644 index 0000000..ba4102f --- /dev/null +++ b/ingestors/ingest_ixps.py @@ -0,0 +1,111 @@ +#!/usr/bin/env python +# Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris) +# ingest_ixps.py + +import json +import ipaddress +import time + +try: + import aiohttp +except ImportError: + raise ImportError('Missing required \'aiohttp\' library. 
(pip install aiohttp)')
+
+
+# Set a default elasticsearch index if one is not provided
+default_index = 'ixp-' + time.strftime('%Y-%m-%d')
+
+
+def construct_map() -> dict:
+    '''Construct the Elasticsearch index mapping for records'''
+
+    # Match on exact value or full text search
+    keyword_mapping = { 'type': 'text', 'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } }
+
+    # Construct the index mapping
+    mapping = {
+        'mappings': {
+            'properties': {
+                'name' : {'type': 'keyword'},
+                'alternatenames' : {'type': 'keyword'},
+                'sources' : {'type': 'keyword'},
+                'prefixes' : { 'properties': { 'ipv4' : {'type': 'ip'}, 'ipv6' : {'type': 'ip_range'} } },
+                'url' : { 'type': 'keyword' },
+                'region' : { 'type': 'keyword' },
+                'country' : { 'type': 'keyword' },
+                'city' : { 'type': 'keyword' },
+                'state' : { 'type': 'keyword' },
+                'zip' : { 'type': 'keyword' },
+                'address' : keyword_mapping,
+                'iata' : { 'type': 'keyword' },
+                'latitude' : { 'type': 'float' },
+                'longitude' : { 'type': 'float' },
+                'geo_id' : { 'type': 'integer' },
+                'ix_id' : { 'type': 'integer' },
+                'org_id' : { 'type': 'integer' },
+                'pdb_id' : { 'type': 'integer' },
+                'pdb_org_id' : { 'type': 'integer' },
+                'pch_id' : { 'type': 'integer' }
+            }
+        }
+    }
+
+    return mapping
+
+
+async def process_data():
+    '''Read and process the IXP data.'''
+
+    async with aiohttp.ClientSession() as session:
+        async with session.get('https://publicdata.caida.org/datasets/ixps/ixs_202401.jsonl') as response:
+            if response.status != 200:
+                raise Exception(f'Failed to fetch IXP data: {response.status}')
+
+            data = await response.text()
+
+            # The dataset is JSONL, so each line must be parsed as its own JSON record
+            for line in data.splitlines():
+                line = line.strip()
+
+                # Skip empty lines and comment lines
+                if not line or line.startswith('#'):
+                    continue
+
+                try:
+                    record = json.loads(line)
+                except json.JSONDecodeError as e:
+                    raise Exception(f'Failed to parse IXP record: {e}')
+
+                yield {'_index': default_index, '_source': record}
+
+
+async def test():
+    '''Test the ingestion process'''
+
+    async for document in process_data():
+        print(document)
+
+
+
+if __name__ == '__main__':
+    import asyncio
+
+    asyncio.run(test())
+
+
+
+'''
+Output:
+{
+    "pch_id" : 1848,
+    "name" : "ANGONIX",
+    "country" : "AO",
+    "region" : "Africa",
+    "city" : "Luanda",
+    "iata" : "LAD",
+    "alternatenames" : [],
+    "sources" : ["pch"],
+    "prefixes" : {
+        "ipv4" : ["196.11.234.0/24"],
+        "ipv6" : ["2001:43f8:9d0::/48"]
+    },
+    "geo_id" : 2240449,
+    "ix_id" : 10
+}
+'''
diff --git a/ingestors/ingest_masscan.py b/ingestors/ingest_masscan.py
index d5d3acb..d89a8fe 100644
--- a/ingestors/ingest_masscan.py
+++ b/ingestors/ingest_masscan.py
@@ -7,171 +7,177 @@
 import logging
 import time
 
 try:
-    import aiofiles
+    import aiofiles
 except ImportError:
-    raise ImportError('Missing required \'aiofiles\' library. (pip install aiofiles)')
+    raise ImportError('Missing required \'aiofiles\' library. (pip install aiofiles)')
 
 
+# Set a default elasticsearch index if one is not provided
 default_index = 'masscan-logs'
 
 
 def construct_map() -> dict:
-    '''Construct the Elasticsearch index mapping for Masscan records.'''
+    '''Construct the Elasticsearch index mapping for Masscan records.'''
 
-    keyword_mapping = { 'type': 'text', 'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } }
+    # Match on exact value or full text search
+    keyword_mapping = { 'type': 'text', 'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } }
 
-    geoip_mapping = {
-        'city_name' : keyword_mapping,
-        'continent_name' : keyword_mapping,
-        'country_iso_code' : keyword_mapping,
-        'country_name' : keyword_mapping,
-        'location' : { 'type': 'geo_point' },
-        'region_iso_code' : keyword_mapping,
-        'region_name' : keyword_mapping,
-    }
+    # Construct the geoip mapping (Used with the geoip pipeline to enrich the data)
+    geoip_mapping = {
+        'city_name' : keyword_mapping,
+        'continent_name' : keyword_mapping,
+        'country_iso_code' : keyword_mapping,
+        'country_name' : keyword_mapping,
+        'location' : { 'type': 'geo_point' },
+        'region_iso_code' : keyword_mapping,
+        'region_name' : keyword_mapping,
+    }
 
-    mapping = {
-        'mappings': {
-            'properties': {
-                'ip' : { 'type': 'ip' },
-                'port' : { 'type': 'integer' },
-                'data' : {
-                    'properties': {
-                        'proto' : { 'type': 'keyword' },
-                        'service' : { 'type': 'keyword' },
-                        'banner' : keyword_mapping,
-                        'seen' : { 'type': 'date' }
-                    }
-                },
-                #'geoip' : { 'properties': geoip_mapping } # Used with the geoip pipeline to enrich the data
-                'last_seen' : { 'type': 'date' }
-            }
-        }
-    }
+    # Construct the index mapping
+    mapping = {
+        'mappings': {
+            'properties': {
+                'ip' : { 'type': 'ip' },
+                'port' : { 'type': 'integer' },
+                'proto' : { 'type': 'keyword' },
+                'service' : { 'type': 'keyword' },
+                'banner' : keyword_mapping,
+                'seen' : { 'type': 'date' }
+                #'geoip' : { 'properties': geoip_mapping }
+            }
+        }
+    }
 
-    return mapping
+    return mapping
 
 
-async def process_data(file_path: str):
-    '''
-    Read and process Masscan records from the log file.
+async def process_data(input_path: str):
+    '''
+    Read and process the input file
 
-    :param file_path: Path to the Masscan log file
-    '''
+    :param input_path: Path to the input file
+    '''
 
-    async with aiofiles.open(file_path) as input_file:
-        async for line in input_file:
-            line = line.strip()
+    async with aiofiles.open(input_path) as input_file:
+        # Read the input file line by line
+        async for line in input_file:
+            line = line.strip()
 
-            if line == '~eof': # Sentinel value to indicate the end of a process (Used with --watch with FIFO)
-                break
+            # Sentinel value to indicate the end of a process (for closing out a FIFO stream)
+            if line == '~eof':
+                break
 
-            if not line or not line.startswith('{'):
-                continue
+            # Skip empty lines and lines that do not start with a JSON object
+            if not line or not line.startswith('{'):
+                continue
 
-            if line.endswith(','): # Do we need this? Masscan JSON output seems with seperate records with a comma between lines for some reason...
-                line = line[:-1]
+            # Do we need this? Masscan JSON output seems to separate records with a comma between lines for some reason...
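+            # (Most likely yes: masscan's -oJ mode writes the whole file as one JSON array,
+            # with each record line comma-terminated, so the trailing comma must be stripped
+            # before a line can be parsed as a standalone JSON object.)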
+            if line.endswith(','):
+                line = line[:-1]
 
-            try:
-                record = json.loads(line)
-            except json.decoder.JSONDecodeError:
-                # In rare cases, the JSON record may be incomplete or malformed:
-                #   { "ip": "51.161.12.223", "timestamp": "1707628302", "ports": [ {"port": 22, "proto": "tcp", "service": {"name": "ssh", "banner":
-                #   { "ip": "83.66.211.246", "timestamp": "1706557002"
-                logging.error(f'Failed to parse JSON record! ({line})')
-                input('Press Enter to continue...') # Pause for review & debugging (remove this in production)
-                continue
+            # Parse the JSON record
+            try:
+                record = json.loads(line)
+            except json.decoder.JSONDecodeError:
+                # In rare cases, the JSON record may be incomplete or malformed:
+                #   { "ip": "51.161.12.223", "timestamp": "1707628302", "ports": [ {"port": 22, "proto": "tcp", "service": {"name": "ssh", "banner":
+                #   { "ip": "83.66.211.246", "timestamp": "1706557002"
+                logging.error(f'Failed to parse JSON record! ({line})')
+                input('Press Enter to continue...') # Pause for review & debugging (remove this in production)
+                continue
 
-            if len(record['ports']) > 1:
-                # In rare cases, a single record may contain multiple ports, though I have yet to witness this...
-                logging.warning(f'Multiple ports found for record! ({record})')
-                input('Press Enter to continue...') # Pause for review (remove this in production)
+            # In rare cases, a single record may contain multiple ports, though I have yet to witness this...
+            if len(record['ports']) > 1:
+                logging.warning(f'Multiple ports found for record! ({record})')
+                input('Press Enter to continue...') # Pause for review (remove this in production)
 
-            for port_info in record['ports']:
-                struct = {
-                    'ip' : record['ip'],
-                    'data' : {
-                        'port' : port_info['port'],
-                        'proto' : port_info['proto'],
-                        'seen' : time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(int(record['timestamp']))),
-                    },
-                    'last_seen' : time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(int(record['timestamp']))),
-                }
+            # Process each port in the record
+            for port_info in record['ports']:
+                struct = {
+                    'ip' : record['ip'],
+                    'port' : port_info['port'],
+                    'proto' : port_info['proto'],
+                    'seen' : time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(int(record['timestamp'])))
+                }
 
-                if 'service' in port_info:
-                    if 'name' in port_info['service']:
-                        if (service_name := port_info['service']['name']) not in ('unknown',''):
-                            struct['service'] = service_name
+                # Add the service information if available (this field is optional)
+                if 'service' in port_info:
 
-                    if 'banner' in port_info['service']:
-                        banner = ' '.join(port_info['service']['banner'].split()) # Remove extra whitespace
-                        if banner:
-                            struct['banner'] = banner
+                    # Add the service name if available
+                    if 'name' in port_info['service']:
+                        if (service_name := port_info['service']['name']) not in ('unknown',''):
+                            struct['service'] = service_name
 
-                id = f'{record["ip"]}:{port_info["port"]}' # Store with ip:port as the unique id to allow the record to be reindexed if it exists.
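+                # NOTE: The deterministic ip:port _id was dropped from the yield below, so re-running
+                # a scan will index duplicate documents instead of reindexing existing ones. If
+                # idempotent re-ingestion is wanted again, something like this could be restored:
+                #   yield {'_id': f'{record["ip"]}:{port_info["port"]}', '_index': default_index, '_source': struct}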
+                    # Add the service banner if available
+                    if 'banner' in port_info['service']:
+                        banner = ' '.join(port_info['service']['banner'].split()) # Remove extra whitespace
+                        if banner:
+                            struct['banner'] = banner
 
-                yield {'_id': id, '_index': default_index, '_source': struct}
+                # Yield the record
+                yield {'_index': default_index, '_source': struct}
 
 
 async def test(input_path: str):
-    '''
-    Test the Masscan ingestion process
-
-    :param input_path: Path to the MassDNS log file
-    '''
-    async for document in process_data(input_path):
-        print(document)
+    '''
+    Test the ingestion process
+
+    :param input_path: Path to the input file
+    '''
+
+    async for document in process_data(input_path):
+        print(document)
 
 
 if __name__ == '__main__':
-    import argparse
-    import asyncio
+    import argparse
+    import asyncio
 
-    parser = argparse.ArgumentParser(description='Masscan Ingestor for ERIS')
-    parser.add_argument('input_path', help='Path to the input file or directory')
-    args = parser.parse_args()
-
-    asyncio.run(test(args.input_path))
+    parser = argparse.ArgumentParser(description='Ingestor for ERIS')
+    parser.add_argument('input_path', help='Path to the input file or directory')
+    args = parser.parse_args()
+
+    asyncio.run(test(args.input_path))
 
 
 '''
 Deploy:
-    apt-get install iptables masscan libpcap-dev screen
-    setcap 'CAP_NET_RAW+eip CAP_NET_ADMIN+eip' /bin/masscan
-    /sbin/iptables -A INPUT -p tcp --dport 61010 -j DROP # Not persistent
-    printf "0.0.0.0/8\n10.0.0.0/8\n100.64.0.0/10\n127.0.0.0/8\n169.254.0.0/16\n172.16.0.0/12\n192.0.0.0/24\n192.0.2.0/24\n192.31.196.0/24\n192.52.193.0/24\n192.88.99.0/24\n192.168.0.0/16\n192.175.48.0/24\n198.18.0.0/15\n198.51.100.0/24\n203.0.113.0/24\n224.0.0.0/3\n255.255.255.255/32" > exclude.conf
-    screen -S scan
-    masscan 0.0.0.0/0 -p21,22,23 --banners --http-user-agent "USER_AGENT" --source-port 61010 --open-only --rate 30000 --excludefile exclude.conf -oJ output.json
-    masscan 0.0.0.0/0 -p21,22,23 --banners --http-user-agent "USER_AGENT" --source-port 61000-65503 --open-only --rate 30000 --excludefile exclude.conf -oJ output_new.json --shard $i/$TOTAL
+    apt-get install iptables masscan libpcap-dev screen
+    setcap 'CAP_NET_RAW+eip CAP_NET_ADMIN+eip' /bin/masscan
+    /sbin/iptables -A INPUT -p tcp --dport 61010 -j DROP # Not persistent
+    printf "0.0.0.0/8\n10.0.0.0/8\n100.64.0.0/10\n127.0.0.0/8\n169.254.0.0/16\n172.16.0.0/12\n192.0.0.0/24\n192.0.2.0/24\n192.31.196.0/24\n192.52.193.0/24\n192.88.99.0/24\n192.168.0.0/16\n192.175.48.0/24\n198.18.0.0/15\n198.51.100.0/24\n203.0.113.0/24\n224.0.0.0/3\n255.255.255.255/32" > exclude.conf
+    screen -S scan
+    masscan 0.0.0.0/0 -p18000 --banners --http-user-agent "USER_AGENT" --source-port 61010 --open-only --rate 30000 --excludefile exclude.conf -oJ 18000.json
+    masscan 0.0.0.0/0 -p21,22,23 --banners --http-user-agent "USER_AGENT" --source-port 61000-65503 --open-only --rate 30000 --excludefile exclude.conf -oJ output_new.json --shard $i/$TOTAL
 
 Output:
-    {
-        "ip" : "43.134.51.142",
-        "timestamp" : "1705255468",
-        "ports" : [
-            {
-                "port" : 22, # We will create a record for each port opened
-                "proto" : "tcp",
-                "service" : {
-                    "name" : "ssh",
-                    "banner" : "SSH-2.0-OpenSSH_8.9p1 Ubuntu-3ubuntu0.4"
-                }
-            }
-        ]
-    }
+    {
+        "ip" : "43.134.51.142",
+        "timestamp" : "1705255468",
+        "ports" : [
+            {
+                "port" : 22, # We will create a record for each port opened
+                "proto" : "tcp",
+                "service" : {
+                    "name" : "ssh",
+                    "banner" : "SSH-2.0-OpenSSH_8.9p1 Ubuntu-3ubuntu0.4"
+                }
+            }
+        ]
+    }
 
 Input:
-    {
-        "_id" : "43.134.51.142:22"
-        "_index" : "masscan-logs",
-        "_source" : {
-            "ip" : "43.134.51.142",
-            "port" : 22,
-            "proto" : "tcp",
-            "service" : "ssh",
-            "banner" : "SSH-2.0-OpenSSH_8.9p1 Ubuntu-3ubuntu0.4",
-            "seen" : "2021-10-08T02:04:28Z"
-        }
-'''
\ No newline at end of file
+    {
+        "_id" : "43.134.51.142:22",
+        "_index" : "masscan-logs",
+        "_source" : {
+            "ip" : "43.134.51.142",
+            "port" : 22,
+            "proto" : "tcp",
+            "service" : "ssh",
+            "banner" : "SSH-2.0-OpenSSH_8.9p1 Ubuntu-3ubuntu0.4",
+            "seen" : "2021-10-08T02:04:28Z"
+        }
+    }
+'''
diff --git a/ingestors/ingest_massdns.py b/ingestors/ingest_massdns.py
index 39e8674..e4372df 100644
--- a/ingestors/ingest_massdns.py
+++ b/ingestors/ingest_massdns.py
@@ -140,18 +140,21 @@ if __name__ == '__main__':
 
 '''
 Deployment:
-    sudo apt-get install build-essential gcc make
+    sudo apt-get install build-essential gcc make python3 python3-pip
+    pip install aiofiles aiohttp elasticsearch
+    git clone --depth 1 https://github.com/acidvegas/eris.git $HOME/eris
+    git clone --depth 1 https://github.com/blechschmidt/massdns.git $HOME/massdns && cd $HOME/massdns && make
     curl -s https://public-dns.info/nameservers.txt | grep -v ':' > $HOME/massdns/nameservers.txt
-    python3 ./scripts/ptr.py | ./bin/massdns -r $HOME/massdns/nameservers.txt -t PTR --filter NOERROR -o S -w $HOME/massdns/fifo.json
-    or...
-    while true; do python ./scripts/ptr.py | ./bin/massdns -r $HOME/massdns/nameservers.txt -t PTR --filter NOERROR -o S -w $HOME/massdns/fifo.json; done
+    while true; do python3 ./scripts/ptr.py | ./bin/massdns -r $HOME/massdns/nameservers.txt -t PTR --filter NOERROR -o S -w $HOME/eris/FIFO; done
+
 
 Output:
     0.6.229.47.in-addr.arpa. PTR 047-229-006-000.res.spectrum.com.
     0.6.228.75.in-addr.arpa. PTR 0.sub-75-228-6.myvzw.com.
     0.6.207.73.in-addr.arpa. PTR c-73-207-6-0.hsd1.ga.comcast.net.
+
 
 Input:
     {
        '_id' : '47.229.6.0'
@@ -163,6 +166,7 @@ Input:
        }
     }
 
+
 Notes:
     Why do some IP addresses return an A/CNAME from a PTR request?
    What is dns-servfail.net (Frequent CNAME response from PTR requests)
diff --git a/ingestors/ingest_rir_delegations.py b/ingestors/ingest_rir_delegations.py
index e28e9aa..0582315 100644
--- a/ingestors/ingest_rir_delegations.py
+++ b/ingestors/ingest_rir_delegations.py
@@ -191,13 +191,12 @@ Input:
        'registry' : 'arin',
        'cc' : 'us',
        'type' : 'ipv4',
-       'start' : { 'ip': '76.15.132.0' },
-       'value' : 1024,
+       'ip' : { 'start': '76.15.132.0', 'end': '76.16.146.0' },
        'date' : '2007-05-02T00:00:00Z',
        'status' : 'allocated',
        'extensions' : '6c065d5b54b877781f05e7d30ebfff28'
     }
-
-    Notes:
+
+Notes:
     Do we make this handle the database locally or load it into ram?
'''
diff --git a/ingestors/ingest_rir_transfers.py b/ingestors/ingest_rir_transfers.py
index b1f3654..149f519 100644
--- a/ingestors/ingest_rir_transfers.py
+++ b/ingestors/ingest_rir_transfers.py
@@ -23,7 +23,7 @@ transfers_db = {
     'lacnic' : 'https://ftp.lacnic.net/pub/stats/lacnic/transfers/transfers_latest.json',
     'ripencc' : 'https://ftp.ripe.net/pub/stats/ripencc/transfers/transfers_latest.json'
 }
-
+
 
 def construct_map() -> dict:
     '''Construct the Elasticsearch index mapping for records'''
@@ -77,19 +77,19 @@ async def process_data():
             async with session.get(url) as response:
                 if response.status != 200:
                     raise Exception(f'Failed to fetch {registry} delegation data: {response.status}')
-
+
                 data = await response.text()
 
                 try:
                     json_data = json.loads(data)
-                except aiohttp.ContentTypeError as e:
+                except json.JSONDecodeError as e:
                     raise Exception(f'Failed to parse {registry} delegation data: {e}')
-
+
                 if 'transfers' not in json_data:
                     raise Exception(f'Invalid {registry} delegation data: {json_data}')
-
+
                 for record in json_data['transfers']:
-
+
                     if 'asns' in record:
                         for set_type in ('original_set', 'transfer_set'):
                             if set_type in record['asns']:
@@ -103,7 +103,7 @@ async def process_data():
                                 else:
                                     record['asns'][set_type][count][option] = int(asn)
                                 count += 1
-
+
                     if 'ip4nets' in record or 'ip6nets' in record:
                         for ip_type in ('ip4nets', 'ip6nets'):
@@ -124,10 +124,10 @@ async def process_data():
                                 except ValueError as e:
                                     raise Exception(f'Invalid {set_type} {option} IP in {registry} data: {e}')
                                 count += 1
-
+
                     if record['type'] not in ('MERGER_ACQUISITION', 'RESOURCE_TRANSFER'):
                         raise Exception(f'Invalid transfer type in {registry} data: {record["type"]}')
-
+
                     yield {'_index': default_index, '_source': record}
 
         except Exception as e:
@@ -152,36 +152,36 @@ if __name__ == '__main__':
 '''
 Output:
     {
-    "transfer_date" : "2017-09-15T19:00:00Z",
-    "ip4nets" : {
-        "original_set" : [ { "start_address": "94.31.64.0", "end_address": "94.31.127.255" } ],
-        "transfer_set" : [ { "start_address": "94.31.64.0", "end_address": "94.31.127.255" } ]
-    },
-    "type" : "MERGER_ACQUISITION",
-    "source_organization" : { "name": "Unser Ortsnetz GmbH" },
-    "recipient_organization" : {
-        "name" : "Deutsche Glasfaser Wholesale GmbH",
-        "country_code" : "DE"
-    },
-    "source_rir" : "RIPE NCC",
-    "recipient_rir" : "RIPE NCC"
-    },
+    "transfer_date" : "2017-09-15T19:00:00Z",
+    "ip4nets" : {
+        "original_set" : [ { "start_address": "94.31.64.0", "end_address": "94.31.127.255" } ],
+        "transfer_set" : [ { "start_address": "94.31.64.0", "end_address": "94.31.127.255" } ]
+    },
+    "type" : "MERGER_ACQUISITION",
+    "source_organization" : { "name": "Unser Ortsnetz GmbH" },
+    "recipient_organization" : {
+        "name" : "Deutsche Glasfaser Wholesale GmbH",
+        "country_code" : "DE"
+    },
+    "source_rir" : "RIPE NCC",
+    "recipient_rir" : "RIPE NCC"
+    },
     {
-    "transfer_date" : "2017-09-18T19:00:00Z",
-    "asns" : {
-        "original_set" : [ { "start": 198257, "end": 198257 } ],
-        "transfer_set" : [ { "start": 198257, "end": 198257 } ]
-    },
-    "type" : "MERGER_ACQUISITION",
-    "source_organization" : { "name": "CERT PLIX Sp. z o.o." },
-    "recipient_organization" : {
-        "name" : "Equinix (Poland) Sp. z o.o.",
+    "transfer_date" : "2017-09-18T19:00:00Z",
+    "asns" : {
+        "original_set" : [ { "start": 198257, "end": 198257 } ],
+        "transfer_set" : [ { "start": 198257, "end": 198257 } ]
+    },
+    "type" : "MERGER_ACQUISITION",
+    "source_organization" : { "name": "CERT PLIX Sp. z o.o." },
+    "recipient_organization" : {
+        "name" : "Equinix (Poland) Sp. z o.o.",
         "country_code" : "PL"
-    },
-    "source_rir" : "RIPE NCC",
-    "recipient_rir" : "RIPE NCC"
-    }
-
+    },
+    "source_rir" : "RIPE NCC",
+    "recipient_rir" : "RIPE NCC"
+    }
+
 Input:
-
-    Nothing changed from the output for now...
+    Nothing changed from the output for now...
'''