From e38e0ec69b00f2aded2b1acca6e8d1733518095b Mon Sep 17 00:00:00 2001 From: acidvegas Date: Sat, 30 Nov 2024 18:17:12 -0500 Subject: [PATCH] Added mapping for Meshtastic MQTT events, improved payload processing --- ingestors/ingest_meshtastic.py | 133 +++++++++++++++++++++++++++++++-- 1 file changed, 127 insertions(+), 6 deletions(-) diff --git a/ingestors/ingest_meshtastic.py b/ingestors/ingest_meshtastic.py index 5142f4f..55363fa 100644 --- a/ingestors/ingest_meshtastic.py +++ b/ingestors/ingest_meshtastic.py @@ -5,6 +5,7 @@ import asyncio import json import logging +import time try: import aiofiles @@ -19,8 +20,113 @@ default_index = 'eris-meshtastic' def construct_map() -> dict: '''Construct the Elasticsearch index mapping for Meshtastic records.''' - # Mapping not done yet - return {} + # Match on exact value or full text search + keyword_mapping = { 'type': 'text', 'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } } + + return { + 'mappings': { + 'properties': { + 'channel' : { 'type': 'long'}, + 'decoded' : { + 'properties': { + 'bitfield' : { 'type': 'long' }, + 'payload' : { + 'type' : 'nested', + 'dynamic' : True, + 'properties' : { + 'HDOP' : { 'type': 'long' }, + 'PDOP' : { 'type': 'long' }, + 'altitude' : { 'type': 'long' }, + 'altitudeGeoidalSeparation' : { 'type': 'long' }, + 'altitudeHae' : { 'type': 'long' }, + 'deviceMetrics' : { + 'properties' : { + 'airUtilTx' : { 'type': 'float' }, + 'batteryLevel' : { 'type': 'long' }, + 'channelUtilization' : { 'type': 'float' }, + 'uptimeSeconds' : { 'type': 'long' }, + 'voltage' : { 'type': 'float' } + } + }, + 'environmentMetrics' : { + 'properties' : { + 'barometricPressure' : { 'type': 'float' }, + 'current' : { 'type': 'float' }, + 'distance' : keyword_mapping, + 'gasResistance' : { 'type': 'float' }, + 'iaq' : { 'type': 'long' }, + 'lux' : { 'type': 'float' }, + 'relativeHumidity' : { 'type': 'float' }, + 'temperature' : { 'type': 'float' }, + 'voltage' : { 'type': 'float' }, + 'whiteLux' : { 'type': 'float' }, + 'windDirection' : { 'type': 'long' }, + 'windSpeed' : { 'type': 'float' } + } + }, + 'errorReason' : keyword_mapping, + 'groundSpeed' : { 'type': 'long' }, + 'groundTrack' : { 'type': 'long' }, + 'hwModel' : keyword_mapping, + 'id' : keyword_mapping, + 'isLicensed' : { 'type': 'boolean' }, + 'lastSentById' : { 'type': 'long' }, + 'latitudeI' : { 'type': 'long' }, + 'locationSource' : keyword_mapping, + 'longName' : keyword_mapping, + 'longitudeI' : { 'type': 'long' }, + 'macaddr' : keyword_mapping, + 'neighbors' : { + 'properties' : { + 'nodeId' : { 'type': 'long' }, + 'snr' : { 'type': 'float' } + } + }, + 'nodeBroadcastIntervalSecs' : { 'type': 'long' }, + 'nodeId' : { 'type': 'long' }, + 'powerMetrics' : { + 'properties': { + 'ch1Current' : { 'type': 'float' }, + 'ch1Voltage' : { 'type': 'float' }, + 'ch2Current' : { 'type': 'float' }, + 'ch2Voltage' : { 'type': 'float' }, + 'ch3Current' : { 'type': 'float' }, + 'ch3Voltage' : { 'type': 'float' } + } + }, + 'precisionBits' : { 'type': 'long' }, + 'publicKey' : keyword_mapping, + 'role' : keyword_mapping, + 'route' : { 'type': 'long' }, + 'routeBack' : { 'type': 'long' }, + 'satsInView' : { 'type': 'long' }, + 'seqNumber' : { 'type': 'long' }, + 'shortName' : keyword_mapping, + 'snrBack' : { 'type': 'long' }, + 'snrTowards' : { 'type': 'long' }, + 'time' : { 'type': 'date' }, + 'timestamp' : { 'type': 'date' } + } + }, + 'portnum' : keyword_mapping, + 'requestId' : { 'type': 'long' }, + 'wantResponse' : { 'type': 'boolean' } + } + }, + 'from' : { 'type': 'long' }, + 'hopLimit' : { 'type': 'long' }, + 'hopStart' : { 'type': 'long' }, + 'id' : { 'type': 'long' }, + 'priority' : keyword_mapping, + 'rxRssi' : { 'type': 'long' }, + 'rxSnr' : { 'type': 'float' }, + 'rxTime' : { 'type': 'date' }, + 'to' : { 'type': 'long' }, + 'viaMqtt' : { 'type': 'boolean' }, + 'wantAck' : { 'type': 'boolean' } + } + } + } async def process_data(input_path: str): @@ -31,25 +137,40 @@ async def process_data(input_path: str): ''' async with aiofiles.open(input_path) as input_file: - # Read the input file line by line async for line in input_file: line = line.strip() - # Sentinel value to indicate the end of a process (for closing out a FIFO stream) if line == '~eof': break - # Skip empty lines and lines that do not start with a JSON object if not line or not line.startswith('{'): continue - # Parse the JSON record try: record = json.loads(line) except json.decoder.JSONDecodeError: logging.error(f'Failed to parse JSON record! ({line})') continue + # Convert Unix timestamps to Zulu time format + if 'rxTime' in record: + record['rxTime'] = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(record['rxTime'])) + + # Handle payload processing + if 'decoded' in record and 'payload' in record['decoded']: + payload = record['decoded']['payload'] + + # If payload is not a dict, wrap it in a nested array with a value field + if not isinstance(payload, dict): + record['decoded']['payload'] = [{'value': payload}] + else: + # Process timestamps in payload object and ensure it's in an array + if 'time' in payload: + payload['time'] = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(payload['time'])) + if 'timestamp' in payload: + payload['timestamp'] = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(payload['timestamp'])) + record['decoded']['payload'] = [payload] + yield {'_index': default_index, '_source': record}