@ -19,7 +19,7 @@ import json
import time
import requests
from aiohttp import web , ClientSession , ClientError
from asyncio_mqtt import Clien t
import aiomqt t
from datetime import datetime , timedelta
import pytz
TIMEZONE = pytz . timezone ( ' America/Edmonton ' )
@ -123,7 +123,12 @@ class Sensor():
' tags ' : { ' id ' : self . id_ , ' name ' : self . name } ,
' fields ' : data ,
}
sensors_client . write_points ( [ point ] )
try :
sensors_client . write_points ( [ point ] )
except requests . exceptions . ConnectionError :
logging . exception ( ' Error connecting to InfluxDB! ' )
return
logging . info ( ' Wrote %s data to InfluxDB: %s ' , self , data )
@ -174,6 +179,11 @@ class ERTSCMSensor(Sensor):
]
update_period = 60 * 60
def transform ( self , data ) :
# new gas meter
if ' Consumption ' in data :
data [ ' consumption_data ' ] = data [ ' Consumption ' ]
class OwnTracksSensor ( Sensor ) :
type_ = ' owntracks '
bad_keys = [
@ -280,8 +290,12 @@ async def process_data(data):
sensor . update ( data )
async def process_mqtt ( message ) :
text = message . payload . decode ( )
topic = message . topic
try :
text = message . payload . decode ( )
except UnicodeDecodeError :
return
topic = message . topic . value
logging . debug ( ' MQTT topic: %s , message: %s ' , topic , text )
if topic . startswith ( ' test ' ) :
@ -299,11 +313,21 @@ async def process_mqtt(message):
await process_data ( data )
async def fetch_mqtt ( ) :
async with Client ( ' localhost ' ) as client :
async with client . filtered_messages ( ' # ' ) as messages :
await client . subscribe ( ' # ' )
async for message in messages :
await process_mqtt ( message )
await asyncio . sleep ( 3 )
# from https://sbtinstruments.github.io/aiomqtt/reconnection.html
# modified to make new client since their code didn't work
# https://github.com/sbtinstruments/aiomqtt/issues/269
while True :
try :
async with aiomqtt . Client ( ' localhost ' ) as client :
await client . subscribe ( ' # ' )
async for message in client . messages :
await process_mqtt ( message )
except aiomqtt . MqttError :
logging . info ( ' MQTT connection lost, reconnecting in 5 seconds... ' )
await asyncio . sleep ( 5 )
async def owntracks ( request ) :
data = await request . json ( )
@ -321,9 +345,15 @@ async def owntracks(request):
return web . Response ( )
async def history ( request ) :
api_key = request . rel_url . query . get ( ' api_key ' , ' ' )
authed = api_key == settings . SENSORS_API_KEY
measurement = request . match_info . get ( ' measurement ' )
name = request . match_info . get ( ' name ' )
if not authed and measurement in [ ' owntracks ' , ' sleep ' ] :
return web . json_response ( [ ] )
if name not in [ x . name for x in sensors . sensors ] :
raise
@ -356,7 +386,7 @@ async def history(request):
raise
window = request . rel_url . query . get ( ' window ' , window )
if window not in [ ' 10m ' , ' 1h ' , ' 1d ' , ' 7d ' , ' 30d ' ] :
if window not in [ ' 1m ' , ' 3m ' , ' 1 0m ' , ' 1h ' , ' 2 h ' , ' 1d ' , ' 7d ' , ' 30d ' ] :
raise
if name == ' Water ' :
@ -393,6 +423,9 @@ async def history(request):
elif measurement == ' solar ' :
client = solar_client
q = ' select max( " actual_total " ) as actual_total, last( " lifetime_energy " )-first( " lifetime_energy " ) as lifetime_energy from ecu where time >= {} s and time < {} s group by time( {} ) fill(linear) ' . format ( start , end , window )
elif measurement == ' owntracks ' :
client = sensors_client
q = ' select first( " lat " ) as lat, first( " lon " ) as lon from owntracks where " acc " < 100 and " name " = \' {} \' and time >= {} s and time < {} s group by time( {} ) fill(previous) ' . format ( name , start , end , window )
else :
raise
@ -411,7 +444,7 @@ async def latest(request):
if sensor . type_ in [ ' solar ' ] :
continue
if not authed and sensor . type_ in [ ' owntracks ' ] :
if not authed and sensor . type_ in [ ' owntracks ' , ' sleep ' ] :
continue
q = ' select * from {} where " name " = \' {} \' order by desc limit 1 ' . format ( sensor . type_ , sensor . name )
@ -426,7 +459,7 @@ async def latest(request):
return web . json_response ( result )
async def index ( request ) :
return web . Response ( text = ' hello world ' , content_type = ' text/html ' )
return web . Response ( text = ' sensors api ' , content_type = ' text/html ' )
async def run_webserver ( ) :
#web.run_app(app, port=PORT, loop=loop)
@ -444,7 +477,10 @@ def task_died(future):
logging . error ( ' Sensors server task died! ' )
else :
logging . error ( ' Sensors server task died! Waiting 60s and exiting... ' )
controller_message ( ' Sensors server task died! Waiting 60s and exiting... ' )
try :
controller_message ( ' Sensors server task died! Waiting 60s and exiting... ' )
except : # we want this to succeed no matter what
pass
time . sleep ( 60 )
exit ( )
@ -456,7 +492,7 @@ if __name__ == '__main__':
sensors . add ( ThermostatSensor ( ' thermostat2 ' , ' 192.168.69.152 ' , ' Venstar ' ) )
sensors . add ( ERTSCMSensor ( ' 31005493 ' , ' Water ' ) )
sensors . add ( ERTSCMSensor ( ' 41249312 ' , ' Gas ' ) )
sensors . add ( ERTSCMSensor ( ' 78628180 ' , ' Gas ' ) )
sensors . add ( OwnTracksSensor ( ' owntracks1 ' , ' OwnTracks ' ) )
sensors . add ( AirSensor ( ' air1 ' , ' Living Room ' ) )
sensors . add ( Acurite606TX ( ' 59 ' , ' Outside ' ) )