diff --git a/main.py b/main.py index c65b9c4..e638a40 100644 --- a/main.py +++ b/main.py @@ -19,7 +19,7 @@ import json import time import requests from aiohttp import web, ClientSession, ClientError -from asyncio_mqtt import Client +import aiomqtt from datetime import datetime, timedelta import pytz TIMEZONE = pytz.timezone('America/Edmonton') @@ -295,7 +295,7 @@ async def process_mqtt(message): except UnicodeDecodeError: return - topic = message.topic + topic = message.topic.value logging.debug('MQTT topic: %s, message: %s', topic, text) if topic.startswith('test'): @@ -313,11 +313,21 @@ async def process_mqtt(message): await process_data(data) async def fetch_mqtt(): - async with Client('localhost') as client: - async with client.filtered_messages('#') as messages: - await client.subscribe('#') - async for message in messages: - await process_mqtt(message) + await asyncio.sleep(3) + + # from https://sbtinstruments.github.io/aiomqtt/reconnection.html + # modified to make new client since their code didn't work + # https://github.com/sbtinstruments/aiomqtt/issues/269 + while True: + try: + async with aiomqtt.Client('localhost') as client: + await client.subscribe('#') + async for message in client.messages: + await process_mqtt(message) + except aiomqtt.MqttError: + logging.info('MQTT connection lost, reconnecting in 5 seconds...') + await asyncio.sleep(5) + async def owntracks(request): data = await request.json() diff --git a/requirements.txt b/requirements.txt index 5c101a6..6a2d09b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ aiohttp==3.8.1 +aiomqtt==2.0.0 aiosignal==1.2.0 async-timeout==4.0.2 asyncio-mqtt==0.11.0 @@ -12,11 +13,10 @@ influxdb==5.3.1 msgpack==1.0.3 multidict==5.2.0 paho-mqtt==1.6.1 -pkg_resources==0.0.0 python-dateutil==2.8.2 pytz==2021.3 requests==2.26.0 six==1.16.0 -typing_extensions==4.0.1 +typing-extensions==4.9.0 urllib3==1.26.7 yarl==1.7.2