2021-12-24 08:00:12 +00:00
import os
2022-01-05 05:43:50 +00:00
DEBUG = os . environ . get ( ' DEBUG ' )
PROD = os . environ . get ( ' PROD ' )
2021-12-24 08:00:12 +00:00
import logging
logging . basicConfig (
format = ' [ %(asctime)s ] %(levelname)s %(module)s / %(funcName)s : - %(message)s ' ,
2022-01-05 05:43:50 +00:00
level = logging . DEBUG if DEBUG else logging . INFO )
logging . getLogger ( ' aiohttp ' ) . setLevel ( logging . DEBUG if DEBUG else logging . WARNING )
import settings
2021-12-24 08:00:12 +00:00
import asyncio
2022-01-05 05:43:50 +00:00
import json
import time
from aiohttp import web , ClientSession , ClientError
from asyncio_mqtt import Client
2021-12-24 08:00:12 +00:00
from datetime import datetime , timedelta
import pytz
2022-01-05 05:43:50 +00:00
TIMEZONE = pytz . timezone ( ' America/Edmonton ' )
2021-12-24 08:00:12 +00:00
app = web . Application ( )
2022-01-05 05:43:50 +00:00
http_session = None
2021-12-24 08:00:12 +00:00
from influxdb import InfluxDBClient
2022-01-05 05:43:50 +00:00
client = InfluxDBClient ( ' localhost ' , 8086 , database = ' sensors1 ' if PROD else ' sensors1dev ' )
PORT = 6903 if PROD else 6904
class Sensors ( ) :
sensors = [ ]
def add ( self , sensor ) :
self . sensors . append ( sensor )
def get ( self , id_ ) :
for sensor in self . sensors :
if str ( sensor . id_ ) == str ( id_ ) :
return sensor
return False
def __iter__ ( self ) :
for sensor in self . sensors :
yield sensor
sensors = Sensors ( )
async def getter ( url ) :
global http_session
if not http_session :
http_session = ClientSession ( )
try :
async with http_session . get ( url , timeout = 10 ) as response :
response . raise_for_status ( )
result = await response . json ( content_type = None , encoding = ' UTF-8 ' )
return result
except BaseException as e :
logging . error ( e )
return False
class Sensor ( ) :
value = { }
prev_value = { }
bad_keys = [ ]
last_update = time . time ( )
update_period = None
def __init__ ( self , id_ , name ) :
self . id_ = id_
self . name = name
def __str__ ( self ) :
return ' {} {} (ID: {} ) ' . format ( self . type_ , self . name , self . id_ )
def transform ( self , data ) :
return
def changed ( self ) :
before = self . prev_value . copy ( )
for key in self . bad_keys :
before . pop ( key , None )
after = self . value . copy ( )
for key in self . bad_keys :
after . pop ( key , None )
return str ( before ) != str ( after )
def log ( self ) :
if not self . value or not self . changed ( ) :
return
data = self . value . copy ( )
self . transform ( data )
for key in self . bad_keys :
data . pop ( key , None )
for key in [ ' id ' , ' time ' ] :
data . pop ( key , None )
timestamp = data . pop ( ' timestamp ' , None )
if not timestamp :
timestamp = datetime . utcnow ( ) . replace ( microsecond = 0 )
point = {
' time ' : timestamp ,
' measurement ' : self . type_ ,
' tags ' : { ' id ' : self . id_ , ' name ' : self . name } ,
' fields ' : data ,
}
client . write_points ( [ point ] )
logging . info ( ' Wrote %s data to InfluxDB. ' , self )
def check_update ( self ) :
if self . update_period :
if time . time ( ) - self . last_update > self . update_period :
logging . error ( ' Missed expected update from %s . ' , self )
self . last_update = time . time ( )
def update ( self , data ) :
self . last_update = time . time ( )
self . prev_value = self . value
self . value = data
self . log ( )
async def poll ( self ) :
return
class ThermostatSensor ( Sensor ) :
type_ = ' thermostat '
bad_keys = [
' name ' ,
' schedule ' ,
' schedulepart ' ,
' away ' ,
' cooltempmin ' ,
' cooltempmax ' ,
' heattempmin ' ,
' heattempmax ' ,
' dehum_setpoint '
]
update_period = 300
def __init__ ( self , id_ , ip , name ) :
self . id_ = id_
self . ip = ip
self . name = name
async def poll ( self ) :
data = await getter ( ' http:// {} /query/info ' . format ( self . ip ) )
self . update ( data )
class ERTSCMSensor ( Sensor ) :
type_ = ' ertscm '
bad_keys = [
' model ' ,
' mic ' ,
]
update_period = 60 * 60
class OwnTracksSensor ( Sensor ) :
type_ = ' owntracks '
bad_keys = [
' _type ' ,
' topic ' ,
' tst ' ,
' created_at ' ,
]
update_period = 90
class DustSensor ( Sensor ) :
type_ = ' dust '
update_period = 90
def transform ( self , data ) :
for key , value in data . items ( ) :
# what happens if you do this to a timestamp?
try :
data [ key ] = float ( round ( value , 1 ) )
except TypeError :
pass
class Acurite606TX ( Sensor ) :
type_ = ' temperature '
bad_keys = [
' model ' ,
' mic ' ,
' battery_ok ' ,
]
update_period = 40
def transform ( self , data ) :
if data [ ' battery_ok ' ] != 1 :
logging . error ( ' %s battery not ok! ' , self )
data [ ' temperature_C ' ] = float ( data [ ' temperature_C ' ] )
async def poll_sensors ( ) :
while True :
for sensor in sensors :
await sensor . poll ( )
sensor . check_update ( )
await asyncio . sleep ( 1 )
async def process_data ( data ) :
sensor = sensors . get ( data [ ' id ' ] )
if sensor :
sensor . update ( data )
async def process_mqtt ( message ) :
text = message . payload . decode ( )
topic = message . topic
logging . debug ( ' MQTT topic: %s , message: %s ' , topic , text )
if topic == ' test ' :
logging . info ( ' MQTT test, message: %s ' , text )
return
try :
data = json . loads ( text )
except json . JSONDecodeError :
return
if ' id ' not in data :
return
await process_data ( data )
async def fetch_mqtt ( ) :
async with Client ( ' localhost ' ) as client :
async with client . filtered_messages ( ' # ' ) as messages :
await client . subscribe ( ' # ' )
async for message in messages :
await process_mqtt ( message )
async def owntracks ( request ) :
data = await request . json ( )
logging . info ( ' Web data: %s ' , str ( data ) )
if data . get ( ' _type ' , ' ' ) == ' location ' :
data [ ' id ' ] = data [ ' topic ' ] . split ( ' / ' ) [ - 1 ]
data [ ' timestamp ' ] = datetime . utcfromtimestamp ( data [ ' tst ' ] )
if ' inregions ' in data :
data [ ' inregions ' ] = ' , ' . join ( data [ ' inregions ' ] )
await process_data ( data )
else :
logging . info ( ' Not a location, skipping. ' )
return web . Response ( )
async def history ( request ) :
measurement = request . match_info . get ( ' measurement ' )
name = request . match_info . get ( ' name ' )
end_unix = request . rel_url . query . get ( ' end ' , None )
if end_unix :
end = datetime . fromtimestamp ( end_unix )
else :
end = datetime . now ( tz = pytz . UTC )
duration = request . rel_url . query . get ( ' duration ' , ' today ' )
if duration == ' today ' :
now_tz = datetime . now ( tz = TIMEZONE )
start = now_tz . replace ( hour = 0 , minute = 0 , second = 0 , microsecond = 0 )
window = ' 5m '
elif duration == ' day ' :
start = end - timedelta ( days = 1 )
window = ' 5m '
elif duration == ' week ' :
start = end - timedelta ( days = 7 )
window = ' 30m '
elif duration == ' month ' :
start = end - timedelta ( days = 30 )
window = ' 2h '
elif duration == ' year ' :
start = end - timedelta ( days = 365 )
window = ' 24h '
start = int ( start . timestamp ( ) )
end = int ( end . timestamp ( ) )
if measurement == ' temperature ' :
q = ' select mean( " temperature_C " ) as temperature_C from temperature where " name " = \' {} \' and time >= {} s and time < {} s group by time( {} ) fill(none) ' . format ( name , start , end , window )
elif measurement == ' ertscm ' :
q = ' select max( " consumption_data " ) as consumption_data from ertscm where " name " = \' {} \' and time >= {} s and time < {} s group by time( {} ) fill(none) ' . format ( name , start , end , window )
elif measurement == ' dust ' :
q = ' select max( " max_p10 " ) as max_p10, max( " max_p25 " ) as max_p25 from dust where " name " = \' {} \' and time >= {} s and time < {} s group by time( {} ) fill(none) ' . format ( name , start , end , window )
#if window and moving_average:
# q = 'select moving_average(mean("value"),{}) as value from {} where "name" = \'{}\' and time >= {}s and time < {}s group by time({}m) fill(none)'.format(moving_average, measurement, name, start, end, window)
#elif window:
# q = 'select mean("value") as value from {} where "name" = \'{}\' and time >= {}s and time < {}s group by time({}m) fill(none)'.format(measurement, name, start, end, window)
#elif moving_average:
# q = 'select moving_average("value", {}) as value from {} where "name" = \'{}\' and time >= {}s and time < {}s'.format(moving_average, name, start, end)
#else:
# q = 'select value from {} where "name" = \'{}\' and time >= {}s and time < {}s'.format(measurement, name, start, end)
result = list ( client . query ( q ) . get_points ( ) )
return web . json_response ( result )
2021-12-24 08:00:12 +00:00
async def index ( request ) :
return web . Response ( text = ' hello world ' , content_type = ' text/html ' )
if __name__ == ' __main__ ' :
app . router . add_get ( ' / ' , index )
2022-01-05 05:43:50 +00:00
app . router . add_post ( ' /owntracks ' , owntracks )
app . router . add_get ( ' /history/ {measurement} / {name} ' , history )
sensors . add ( ThermostatSensor ( ' thermostat2 ' , ' 192.168.69.152 ' , ' Venstar ' ) )
sensors . add ( ERTSCMSensor ( ' 31005493 ' , ' Water ' ) )
sensors . add ( ERTSCMSensor ( ' 41249312 ' , ' Gas ' ) )
sensors . add ( OwnTracksSensor ( ' owntracks1 ' , ' OwnTracks ' ) )
sensors . add ( DustSensor ( ' dust1 ' , ' Nook ' ) )
sensors . add ( Acurite606TX ( ' 231 ' , ' Outside ' ) )
2021-12-24 08:00:12 +00:00
loop = asyncio . get_event_loop ( )
2022-01-05 05:43:50 +00:00
loop . create_task ( poll_sensors ( ) )
loop . create_task ( fetch_mqtt ( ) )
web . run_app ( app , port = PORT , loop = loop )