Compare commits

..

No commits in common. "34f0444de7e3aca130ec9fab96c27b87516bf06b" and "e38164fd4348ef2a3555f9508119ae5de4c9ef5f" have entirely different histories.

2 changed files with 16 additions and 52 deletions

64
main.py
View File

@ -19,7 +19,7 @@ import json
import time import time
import requests import requests
from aiohttp import web, ClientSession, ClientError from aiohttp import web, ClientSession, ClientError
import aiomqtt from asyncio_mqtt import Client
from datetime import datetime, timedelta from datetime import datetime, timedelta
import pytz import pytz
TIMEZONE = pytz.timezone('America/Edmonton') TIMEZONE = pytz.timezone('America/Edmonton')
@ -123,12 +123,7 @@ class Sensor():
'tags': {'id': self.id_, 'name': self.name}, 'tags': {'id': self.id_, 'name': self.name},
'fields': data, 'fields': data,
} }
sensors_client.write_points([point])
try:
sensors_client.write_points([point])
except requests.exceptions.ConnectionError:
logging.exception('Error connecting to InfluxDB!')
return
logging.info('Wrote %s data to InfluxDB: %s', self, data) logging.info('Wrote %s data to InfluxDB: %s', self, data)
@ -179,11 +174,6 @@ class ERTSCMSensor(Sensor):
] ]
update_period = 60*60 update_period = 60*60
def transform(self, data):
# new gas meter
if 'Consumption' in data:
data['consumption_data'] = data['Consumption']
class OwnTracksSensor(Sensor): class OwnTracksSensor(Sensor):
type_ = 'owntracks' type_ = 'owntracks'
bad_keys = [ bad_keys = [
@ -290,12 +280,8 @@ async def process_data(data):
sensor.update(data) sensor.update(data)
async def process_mqtt(message): async def process_mqtt(message):
try: text = message.payload.decode()
text = message.payload.decode() topic = message.topic
except UnicodeDecodeError:
return
topic = message.topic.value
logging.debug('MQTT topic: %s, message: %s', topic, text) logging.debug('MQTT topic: %s, message: %s', topic, text)
if topic.startswith('test'): if topic.startswith('test'):
@ -313,21 +299,11 @@ async def process_mqtt(message):
await process_data(data) await process_data(data)
async def fetch_mqtt(): async def fetch_mqtt():
await asyncio.sleep(3) async with Client('localhost') as client:
async with client.filtered_messages('#') as messages:
# from https://sbtinstruments.github.io/aiomqtt/reconnection.html await client.subscribe('#')
# modified to make new client since their code didn't work async for message in messages:
# https://github.com/sbtinstruments/aiomqtt/issues/269 await process_mqtt(message)
while True:
try:
async with aiomqtt.Client('localhost') as client:
await client.subscribe('#')
async for message in client.messages:
await process_mqtt(message)
except aiomqtt.MqttError:
logging.info('MQTT connection lost, reconnecting in 5 seconds...')
await asyncio.sleep(5)
async def owntracks(request): async def owntracks(request):
data = await request.json() data = await request.json()
@ -345,15 +321,9 @@ async def owntracks(request):
return web.Response() return web.Response()
async def history(request): async def history(request):
api_key = request.rel_url.query.get('api_key', '')
authed = api_key == settings.SENSORS_API_KEY
measurement = request.match_info.get('measurement') measurement = request.match_info.get('measurement')
name = request.match_info.get('name') name = request.match_info.get('name')
if not authed and measurement in ['owntracks', 'sleep']:
return web.json_response([])
if name not in [x.name for x in sensors.sensors]: if name not in [x.name for x in sensors.sensors]:
raise raise
@ -386,7 +356,7 @@ async def history(request):
raise raise
window = request.rel_url.query.get('window', window) window = request.rel_url.query.get('window', window)
if window not in ['1m', '3m', '10m', '1h', '2h', '1d', '7d', '30d']: if window not in ['10m', '1h', '1d', '7d', '30d']:
raise raise
if name == 'Water': if name == 'Water':
@ -423,9 +393,6 @@ async def history(request):
elif measurement == 'solar': elif measurement == 'solar':
client = solar_client client = solar_client
q = 'select max("actual_total") as actual_total, last("lifetime_energy")-first("lifetime_energy") as lifetime_energy from ecu where time >= {}s and time < {}s group by time({}) fill(linear)'.format(start, end, window) q = 'select max("actual_total") as actual_total, last("lifetime_energy")-first("lifetime_energy") as lifetime_energy from ecu where time >= {}s and time < {}s group by time({}) fill(linear)'.format(start, end, window)
elif measurement == 'owntracks':
client = sensors_client
q = 'select first("lat") as lat, first("lon") as lon from owntracks where "acc" < 100 and "name" = \'{}\' and time >= {}s and time < {}s group by time({}) fill(previous)'.format(name, start, end, window)
else: else:
raise raise
@ -444,7 +411,7 @@ async def latest(request):
if sensor.type_ in ['solar']: if sensor.type_ in ['solar']:
continue continue
if not authed and sensor.type_ in ['owntracks', 'sleep']: if not authed and sensor.type_ in ['owntracks']:
continue continue
q = 'select * from {} where "name" = \'{}\' order by desc limit 1'.format(sensor.type_, sensor.name) q = 'select * from {} where "name" = \'{}\' order by desc limit 1'.format(sensor.type_, sensor.name)
@ -459,7 +426,7 @@ async def latest(request):
return web.json_response(result) return web.json_response(result)
async def index(request): async def index(request):
return web.Response(text='sensors api', content_type='text/html') return web.Response(text='hello world', content_type='text/html')
async def run_webserver(): async def run_webserver():
#web.run_app(app, port=PORT, loop=loop) #web.run_app(app, port=PORT, loop=loop)
@ -477,10 +444,7 @@ def task_died(future):
logging.error('Sensors server task died!') logging.error('Sensors server task died!')
else: else:
logging.error('Sensors server task died! Waiting 60s and exiting...') logging.error('Sensors server task died! Waiting 60s and exiting...')
try: controller_message('Sensors server task died! Waiting 60s and exiting...')
controller_message('Sensors server task died! Waiting 60s and exiting...')
except: # we want this to succeed no matter what
pass
time.sleep(60) time.sleep(60)
exit() exit()
@ -492,7 +456,7 @@ if __name__ == '__main__':
sensors.add(ThermostatSensor('thermostat2', '192.168.69.152', 'Venstar')) sensors.add(ThermostatSensor('thermostat2', '192.168.69.152', 'Venstar'))
sensors.add(ERTSCMSensor('31005493', 'Water')) sensors.add(ERTSCMSensor('31005493', 'Water'))
sensors.add(ERTSCMSensor('78628180', 'Gas')) sensors.add(ERTSCMSensor('41249312', 'Gas'))
sensors.add(OwnTracksSensor('owntracks1', 'OwnTracks')) sensors.add(OwnTracksSensor('owntracks1', 'OwnTracks'))
sensors.add(AirSensor('air1', 'Living Room')) sensors.add(AirSensor('air1', 'Living Room'))
sensors.add(Acurite606TX('59', 'Outside')) sensors.add(Acurite606TX('59', 'Outside'))

View File

@ -1,5 +1,4 @@
aiohttp==3.8.1 aiohttp==3.8.1
aiomqtt==2.0.0
aiosignal==1.2.0 aiosignal==1.2.0
async-timeout==4.0.2 async-timeout==4.0.2
asyncio-mqtt==0.11.0 asyncio-mqtt==0.11.0
@ -13,10 +12,11 @@ influxdb==5.3.1
msgpack==1.0.3 msgpack==1.0.3
multidict==5.2.0 multidict==5.2.0
paho-mqtt==1.6.1 paho-mqtt==1.6.1
pkg_resources==0.0.0
python-dateutil==2.8.2 python-dateutil==2.8.2
pytz==2021.3 pytz==2021.3
requests==2.26.0 requests==2.26.0
six==1.16.0 six==1.16.0
typing-extensions==4.9.0 typing_extensions==4.0.1
urllib3==1.26.7 urllib3==1.26.7
yarl==1.7.2 yarl==1.7.2