import os DEBUG = os.environ.get('DEBUG') import logging logging.basicConfig( format='[%(asctime)s] %(levelname)s %(module)s/%(funcName)s - %(message)s', level=logging.DEBUG if DEBUG else logging.INFO) import json from telethon import TelegramClient, events from asyncio_mqtt import Client import settings STORAGE_FILE = 'data/data.json' storage = dict(nodes={}) def save_data(): with open(STORAGE_FILE, 'w') as f: json.dump(storage, f, sort_keys=True, indent=4) try: storage = json.load(open(STORAGE_FILE)) except FileNotFoundError: save_data() bot = TelegramClient('data/bot', settings.API_ID, settings.API_HASH).start(bot_token=settings.API_TOKEN) def int_to_mesh_id(num): # converts signed 32-bit ints to unsigned hex meshtastic IDs # example: -697240346 to !d670f4e6 hex_str = hex(num & 0xFFFFFFFF) return '!' + hex_str[2:] async def process_text(data): msg_to = data.get('to', None) if msg_to != -1: logging.debug('Message not for channel: %s', str(data)) return try: msg_text = data['payload']['text'] sender = data['from'] except KeyError: logging.info('Invalid payload: %s', str(data)) return sender_id = int_to_mesh_id(sender) try: from_name = storage['nodes'][sender_id]['long_name'] except KeyError: from_name = 'Unknown sender' logging.info('Channel message from %s: %s', from_name, msg_text) msg = from_name + ': ' + msg_text await bot.send_message(settings.CHAT_ID, msg) async def process_info(data): try: node_id = data['payload']['id'] long_name = data['payload']['longname'] short_name = data['payload']['shortname'] except KeyError: logging.info('Invalid payload: %s', str(data)) return logging.info('Node %s info: %s (%s)', node_id, long_name, short_name) storage['nodes'][node_id] = dict( node_id=node_id, long_name=long_name, short_name=short_name, ) save_data() async def process_mqtt(message): try: text = message.payload.decode() except UnicodeDecodeError: logging.info('Problem decoding unicode: %s', message.payload) return topic = message.topic logging.debug('MQTT topic: %s, message: %s', topic, text) try: data = json.loads(text) except json.JSONDecodeError: logging.info('Problem decoding json: %s', text) return if type(data) != dict or 'id' not in data: logging.info('Not valid Meshtastic message: %s', text) return msg_type = data.get('type', '') if msg_type == 'text': await process_text(data) if msg_type == 'nodeinfo': await process_info(data) else: logging.debug('Ignored message type %s: %s', msg_type, text) return async def fetch_mqtt(): async with Client('localhost') as client: async with client.filtered_messages('msh/+/json/#') as messages: await client.subscribe('msh/+/json/#') async for message in messages: await process_mqtt(message) with bot: bot.loop.create_task(fetch_mqtt()) bot.run_until_disconnected()