Update / switch to aiomqtt, reconnect mqtt on error
This commit is contained in:
parent
1346171618
commit
8abb15cdd3
20
main.py
20
main.py
|
@ -19,7 +19,7 @@ import json
|
||||||
import time
|
import time
|
||||||
import requests
|
import requests
|
||||||
from aiohttp import web, ClientSession, ClientError
|
from aiohttp import web, ClientSession, ClientError
|
||||||
from asyncio_mqtt import Client
|
import aiomqtt
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
import pytz
|
import pytz
|
||||||
TIMEZONE = pytz.timezone('America/Edmonton')
|
TIMEZONE = pytz.timezone('America/Edmonton')
|
||||||
|
@ -295,7 +295,7 @@ async def process_mqtt(message):
|
||||||
except UnicodeDecodeError:
|
except UnicodeDecodeError:
|
||||||
return
|
return
|
||||||
|
|
||||||
topic = message.topic
|
topic = message.topic.value
|
||||||
logging.debug('MQTT topic: %s, message: %s', topic, text)
|
logging.debug('MQTT topic: %s, message: %s', topic, text)
|
||||||
|
|
||||||
if topic.startswith('test'):
|
if topic.startswith('test'):
|
||||||
|
@ -313,11 +313,21 @@ async def process_mqtt(message):
|
||||||
await process_data(data)
|
await process_data(data)
|
||||||
|
|
||||||
async def fetch_mqtt():
|
async def fetch_mqtt():
|
||||||
async with Client('localhost') as client:
|
await asyncio.sleep(3)
|
||||||
async with client.filtered_messages('#') as messages:
|
|
||||||
|
# from https://sbtinstruments.github.io/aiomqtt/reconnection.html
|
||||||
|
# modified to make new client since their code didn't work
|
||||||
|
# https://github.com/sbtinstruments/aiomqtt/issues/269
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
async with aiomqtt.Client('localhost') as client:
|
||||||
await client.subscribe('#')
|
await client.subscribe('#')
|
||||||
async for message in messages:
|
async for message in client.messages:
|
||||||
await process_mqtt(message)
|
await process_mqtt(message)
|
||||||
|
except aiomqtt.MqttError:
|
||||||
|
logging.info('MQTT connection lost, reconnecting in 5 seconds...')
|
||||||
|
await asyncio.sleep(5)
|
||||||
|
|
||||||
|
|
||||||
async def owntracks(request):
|
async def owntracks(request):
|
||||||
data = await request.json()
|
data = await request.json()
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
aiohttp==3.8.1
|
aiohttp==3.8.1
|
||||||
|
aiomqtt==2.0.0
|
||||||
aiosignal==1.2.0
|
aiosignal==1.2.0
|
||||||
async-timeout==4.0.2
|
async-timeout==4.0.2
|
||||||
asyncio-mqtt==0.11.0
|
asyncio-mqtt==0.11.0
|
||||||
|
@ -12,11 +13,10 @@ influxdb==5.3.1
|
||||||
msgpack==1.0.3
|
msgpack==1.0.3
|
||||||
multidict==5.2.0
|
multidict==5.2.0
|
||||||
paho-mqtt==1.6.1
|
paho-mqtt==1.6.1
|
||||||
pkg_resources==0.0.0
|
|
||||||
python-dateutil==2.8.2
|
python-dateutil==2.8.2
|
||||||
pytz==2021.3
|
pytz==2021.3
|
||||||
requests==2.26.0
|
requests==2.26.0
|
||||||
six==1.16.0
|
six==1.16.0
|
||||||
typing_extensions==4.0.1
|
typing-extensions==4.9.0
|
||||||
urllib3==1.26.7
|
urllib3==1.26.7
|
||||||
yarl==1.7.2
|
yarl==1.7.2
|
||||||
|
|
Loading…
Reference in New Issue
Block a user