#!/usr/bin/env python # Meshtastic Serial Interface - Developed by Acidvegas in Python (https://git.acid.vegas) import argparse import logging import os import time try: import meshtastic from meshtastic.serial_interface import SerialInterface from meshtastic.util import findPorts from meshtastic.tcp_interface import TCPInterface except ImportError: raise ImportError('meshtastic library not found (pip install meshtastic)') try: from pubsub import pub except ImportError: raise ImportError('pubsub library not found (pip install pypubsub)') # Confirm this Pypi package name... # Initialize logging logging.basicConfig(level=logging.INFO, format='%(asctime)s | %(levelname)9s | %(funcName)s | %(message)s') def now(): '''Returns the current date and time in a formatted string''' return time.strftime('%Y-%m-%d %H:%M:%S') class MeshtasticClient(object): def __init__(self): self.interface = None # We will define the interface in the connect() function self.nodes = {} # Nodes will populate with the event_node() callback def connect(self, option: str, value: str): ''' Connect to the Meshtastic interface :param option: The interface option to connect to :param value: The value of the interface option ''' if option == 'serial': if devices := findPorts(): if not os.path.exists(args.serial) or not args.serial in devices: raise SystemExit(f'Invalid serial device: {args.serial} (Available: {devices})') # Show available devices if the specified device is invalid else: raise SystemExit('No serial devices found') self.interface = SerialInterface(value) elif option == 'tcp': self.interface = TCPInterface(value) elif option == 'mqtt': raise NotImplementedError('MQTT interface not implemented yet') else: raise SystemExit('Invalid interface option') logging.info(f'Connected to radio over {option} from {value}:') logging.debug(self.interface.nodes[self.interface.myInfo.my_node_num]) # Print the node info of the connected radio def disconnect(self): '''Disconnect from the Meshtastic interface''' if pub.getDefaultTopicMgr().hasSubscribers(): pub.unsubAll() logging.info('Unsubscribed from all Meshtastic topics') else: logging.warning('No Meshtastic topics to unsubscribe from') if self.interface: self.interface.close() logging.info('Meshtastic interface closed') else: logging.warning('No Meshtastic interface to close') logging.info('Disconnected from radio') def send(self, message: str): ''' Send a message to the Meshtastic interface :param message: The message to send ''' if len(message) > 255: logging.warning('Message exceeds 255 characters') message = message[:255] self.interface.sendText(message) logging.info(f'Sent broadcast message: {message}') def listen(self): '''Create the Meshtastic callback subscriptions''' pub.subscribe(self.event_connect, 'meshtastic.connection.established') pub.subscribe(self.event_disconnect, 'meshtastic.connection.lost') pub.subscribe(self.event_node, 'meshtastic.node.updated') pub.subscribe(self.event_packet, 'meshtastic.receive') logging.debug('Listening for Meshtastic events...') # The meshtastic.receive topics can be broken down further: # pub.subscribe(self.on_text, 'meshtastic.receive.text') # pub.subscribe(self.on_position, 'meshtastic.receive.position') # pub.subscribe(self.on_user, 'meshtastic.receive.user') # pub.subscribe(self.on_data, 'meshtastic.receive.data.portnum') def event_connect(self, interface, topic=pub.AUTO_TOPIC): ''' Callback function for connection established :param interface: Meshtastic interface :param topic: PubSub topic ''' me = interface.nodes[interface.myInfo.my_node_num]['user']['longName'] logging.info(f'Connected to \'{me}\' radio') def event_disconnect(self, interface, topic=pub.AUTO_TOPIC): ''' Callback function for connection lost :param interface: Meshtastic interface :param topic: PubSub topic ''' logging.warning('Lost connection to radio') def event_node(self, interface, topic=pub.AUTO_TOPIC): ''' Callback function for node updates :param interface: Meshtastic interface :param topic: PubSub topic ''' if not interface.nodes: logging.warning('No nodes found') return for node in interface.nodes.values(): short = node['user']['shortName'] long = node['user']['longName'].encode('ascii', 'ignore').decode().rstrip() num = node['num'] id = node['user']['id'] mac = node['user']['macaddr'] hw = node['user']['hwModel'] self.nodes[num] = long # we store the node updates in a dictionary so we can parse the names of who sent incomming messages def event_packet(self, packet: dict): ''' Callback function for received packets :param packet: Packet received ''' # Handle incoming text messages if packet['decoded']['portnum'] == 'TEXT_MESSAGE_APP': sender = packet['from'] msg = packet['decoded']['payload'].decode('utf-8') # Message from self if sender == self.interface.myInfo.my_node_num: print(f'{now()} {self.nodes[sender]}: {msg}') # Can do custom formatting here or ignore the message, just an example # Message from others if sender in self.nodes: print(f'{now()} {self.nodes[sender]}: {msg}') # Unknown sender else: # TODO: Trigger request for node update here print(f'{now()} UNK: {msg}') def event_position(self, packet: dict): ''' Callback function for received position packets :param packet: Packet received ''' # Handle incoming position messages pass if __name__ == '__main__': parser = argparse.ArgumentParser(description='Meshtastic Interface') # Interface options parser.add_argument('--serial', help='Use serial interface') parser.add_argument('--tcp', help='Use TCP interface') parser.add_argument('--mqtt', help='Use MQTT interface') args = parser.parse_args() if not args.serial and not args.tcp and not args.mqtt: raise SystemExit('No interface specified') if (args.serial and args.tcp) or (args.serial and args.mqtt) or (args.tcp and args.mqtt): raise SystemExit('Only one interface option can be specified (--serial, --tcp, or --mqtt)') # Initialize the Meshtastic client mesh = MeshtasticClient() # Determine the interface option and value option = 'serial' if args.serial else 'tcp' if args.tcp else 'mqtt' value = args.serial if args.serial else args.tcp if args.tcp else args.mqtt # Start the Meshtastic interface mesh.connect(option, value) # Keep-alive loop try: while True: time.sleep(60) except KeyboardInterrupt: pass finally: mesh.disconnect()