meshtastic/meshtastic_serial.py

183 lines
5.3 KiB
Python

#!/usr/bin/env python
# Meshtastic Serial Interface - Developed by Acidvegas in Python (https://git.acid.vegas)
import argparse
import logging
import os
import time
try:
import meshtastic
from meshtastic.serial_interface import SerialInterface
from meshtastic.util import findPorts
except ImportError:
raise ImportError('meshtastic library not found (pip install meshtastic)')
try:
from pubsub import pub
except ImportError:
raise ImportError('pubsub library not found (pip install pypubsub)') # Confirm this Pypi package name...
# Initialize logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s | %(levelname)9s | %(funcName)s | %(message)s')
def now():
'''Returns the current date and time in a formatted string'''
return time.strftime('%Y-%m-%d %H:%M:%S')
class Meshtastic(object):
def __init__(self, serial: str):
self.interface = None # We will define the interface in the run() function
self.nodes = {} # Nodes will populate with the on_node() callback
self.serial = serial # Serial device to use for the Meshtastic interface
def disconnect(self):
'''Disconnect from the Meshtastic interface'''
if pub.getDefaultTopicMgr().hasSubscribers():
pub.unsubAll()
logging.info('Unsubscribed from all Meshtastic topics')
else:
logging.warning('No Meshtastic topics to unsubscribe from')
if self.interface:
self.interface.close()
logging.info('Meshtastic interface closed')
else:
logging.warning('No Meshtastic interface to close')
def run(self):
'''Start the Meshtastic interface and subscribe to the callback functions'''
if devices := findPorts():
if not os.path.exists(args.serial) or not args.serial in devices:
raise SystemExit(f'Invalid serial device: {args.serial} (Available: {devices})') # Show available devices if the specified device is invalid
else:
raise SystemExit('No serial devices found')
# Initialize the Meshtastic interface
self.interface = SerialInterface(self.serial)
# Interface over TCP instead of serial:
#from meshtastic.tcp_interface import TCPInterface
#self.interface = TCPInterface(args.tcp)
logging.info('Meshtastic interface started over serial on {self.serial}')
# Get the current node information
me = self.interface.nodes[self.interface.myInfo.my_node_num]
logging.debug(me)
# Create the Meshtastic callback subscriptions
pub.subscribe(self.event_connect, 'meshtastic.connection.established')
pub.subscribe(self.event_disconnect, 'meshtastic.connection.lost')
pub.subscribe(self.on_node, 'meshtastic.node.updated')
pub.subscribe(self.on_packet, 'meshtastic.receive')
logging.debug('Listening for Meshtastic events...')
# The meshtastic.receive topics can be broken down further:
# pub.subscribe(self.on_text, 'meshtastic.receive.text')
# pub.subscribe(self.on_position, 'meshtastic.receive.position')
# pub.subscribe(self.on_user, 'meshtastic.receive.user')
# pub.subscribe(self.on_data, 'meshtastic.receive.data.portnum')
def event_connect(self, interface, topic=pub.AUTO_TOPIC):
'''
Callback function for connection established
:param interface: Meshtastic interface
:param topic: PubSub topic
'''
logging.info('Connection established')
def event_disconnect(self, interface, topic=pub.AUTO_TOPIC):
'''
Callback function for connection lost
:param interface: Meshtastic interface
:param topic: PubSub topic
'''
logging.warning('Connection lost')
def on_packet(self, packet: dict):
'''
Callback function for received packets
:param packet: Packet received
'''
# Handle incoming text messages
if packet['decoded']['portnum'] == 'TEXT_MESSAGE_APP':
sender = packet['from']
msg = packet['decoded']['payload'].decode('utf-8')
# Message from self
if sender == self.interface.myInfo.my_node_num:
print(f'{now()} {self.nodes[sender]}: {msg}') # Can do custom formatting here or ignore the message, just an example
# Message from others
if sender in self.nodes:
print(f'{now()} {self.nodes[sender]}: {msg}')
# Unknown sender
else:
# TODO: Trigger request for node update here
print(f'{now()} UNK: {msg}')
def on_node(self, interface, topic=pub.AUTO_TOPIC):
'''
Callback function for node updates
:param interface: Meshtastic interface
:param topic: PubSub topic
'''
if not interface.nodes:
logging.warning('No nodes found')
return
for node in interface.nodes.values():
short = node['user']['shortName']
long = node['user']['longName'].encode('ascii', 'ignore').decode().rstrip()
num = node['num']
id = node['user']['id']
mac = node['user']['macaddr']
hw = node['user']['hwModel']
self.nodes[num] = long # we store the node updates in a dictionary so we can parse the names of who sent incomming messages
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Meshtastic Interface')
parser.add_argument('--serial', default='/dev/ttyACM0', help='Use serial interface')
parser.add_argument('--tcp', default='meshtastic.local'. help='Use TCP interface')
args = parser.parse_args()
# Define the Meshtastic client
mesh = Meshtastic(args.serial)
# Initialize the Meshtastic interface
mesh.run()
# Keep-alive loop
try:
while True:
time.sleep(60)
except KeyboardInterrupt:
pass
finally:
mesh.disconnect()