sensors/main.py

472 lines
14 KiB
Python

import os
DEBUG = os.environ.get('DEBUG')
PROD = os.environ.get('PROD')
import logging
logging.basicConfig(
format='[%(asctime)s] %(levelname)s %(module)s/%(funcName)s: - %(message)s',
level=logging.DEBUG if DEBUG else logging.INFO)
logging.getLogger('aiohttp').setLevel(logging.DEBUG if DEBUG else logging.WARNING)
import settings
import asyncio
import json
import time
import requests
from aiohttp import web, ClientSession, ClientError
from asyncio_mqtt import Client
from datetime import datetime, timedelta
import pytz
TIMEZONE = pytz.timezone('America/Edmonton')
app = web.Application()
http_session = None
from influxdb import InfluxDBClient
sensors_client = InfluxDBClient('localhost', 8086, database='sensors1' if PROD else 'sensors1dev')
solar_client = InfluxDBClient('localhost', 8086, database='solar2')
PORT = 6903 if PROD else 6904
def controller_message(message):
payload = dict(home=message)
r = requests.post('https://tbot.tannercollin.com/message', data=payload, timeout=10)
if r.status_code == 200:
return True
else:
logging.exception('Unable to communicate with controller! Message: ' + message)
return False
class Sensors():
sensors = []
def add(self, sensor):
self.sensors.append(sensor)
def get(self, id_):
for sensor in self.sensors:
if str(sensor.id_) == str(id_):
return sensor
return False
def __iter__(self):
for sensor in self.sensors:
yield sensor
sensors = Sensors()
async def getter(url):
global http_session
if not http_session:
http_session = ClientSession()
try:
async with http_session.get(url, timeout=10) as response:
response.raise_for_status()
result = await response.json(content_type=None, encoding='UTF-8')
return result
except BaseException as e:
logging.error(e)
return False
class Sensor():
value = {}
prev_value = {}
bad_keys = []
last_update = time.time()
update_period = None
def __init__(self, id_, name):
self.id_ = id_
self.name = name
def __str__(self):
return '{} {} (ID: {})'.format(self.type_, self.name, self.id_)
def transform(self, data):
return
def changed(self):
before = self.prev_value.copy()
for key in self.bad_keys:
before.pop(key, None)
after = self.value.copy()
for key in self.bad_keys:
after.pop(key, None)
return str(before) != str(after)
def log(self):
if not self.value or not self.changed():
return
data = self.value.copy()
self.transform(data)
for key in self.bad_keys:
data.pop(key, None)
for key in ['id', 'time']:
data.pop(key, None)
timestamp = data.pop('timestamp', None)
if not timestamp:
timestamp = datetime.utcnow().replace(microsecond=0)
point = {
'time': timestamp,
'measurement': self.type_,
'tags': {'id': self.id_, 'name': self.name},
'fields': data,
}
sensors_client.write_points([point])
logging.info('Wrote %s data to InfluxDB: %s', self, data)
def check_update(self):
if self.update_period:
if time.time() - self.last_update > self.update_period:
logging.warning('Missed expected update from %s.', self)
self.last_update = time.time()
def update(self, data):
self.last_update = time.time()
self.prev_value = self.value
self.value = data
self.log()
async def poll(self):
return
class ThermostatSensor(Sensor):
type_ = 'thermostat'
bad_keys = [
'name',
'schedule',
'schedulepart',
'away',
'cooltempmin',
'cooltempmax',
'heattempmin',
'heattempmax',
'dehum_setpoint'
]
update_period = 300
def __init__(self, id_, ip, name):
self.id_ = id_
self.ip = ip
self.name = name
async def poll(self):
data = await getter('http://{}/query/info'.format(self.ip)) or {}
self.update(data)
class ERTSCMSensor(Sensor):
type_ = 'ertscm'
bad_keys = [
'model',
'mic',
]
update_period = 60*60
class OwnTracksSensor(Sensor):
type_ = 'owntracks'
bad_keys = [
'_type',
'topic',
'tst',
'created_at',
]
update_period = 90
class DustSensor(Sensor):
type_ = 'dust'
update_period = 90
def transform(self, data):
for key, value in data.items():
# what happens if you do this to a timestamp?
try:
data[key] = float(round(value, 1))
except TypeError:
pass
class AirSensor(Sensor):
type_ = 'air'
update_period = 15
def transform(self, data):
for key, value in data.items():
# what happens if you do this to a timestamp?
try:
data[key] = float(round(value, 1))
except TypeError:
pass
class SleepSensor(Sensor):
type_ = 'sleep'
update_period = 90
def transform(self, data):
for key, value in data.items():
# what happens if you do this to a timestamp?
try:
data[key] = float(round(value, 1))
except TypeError:
pass
class SolarSensor(Sensor):
type_ = 'solar'
class Acurite606TX(Sensor):
type_ = 'temperature'
bad_keys = [
'model',
'mic',
'battery_ok',
]
update_period = 40
offset = 0.0
def __init__(self, id_, name, offset=0.0):
self.id_ = id_
self.name = name
self.offset = offset
def transform(self, data):
if data['battery_ok'] != 1:
logging.error('%s battery not ok!', self)
data['temperature_C'] = float(data['temperature_C']) + self.offset
class AcuRite6002RM(Sensor):
type_ = 'temperature'
bad_keys = [
'model',
'mic',
'battery_ok',
'channel',
]
update_period = 40
offset = 0.0
def __init__(self, id_, name, offset=0.0):
self.id_ = id_
self.name = name
self.offset = offset
def transform(self, data):
if data['battery_ok'] != 1:
logging.error('%s battery not ok!', self)
data['temperature_C'] = float(data['temperature_C']) + self.offset
data['humidity'] = float(data['humidity'])
async def poll_sensors():
while True:
for sensor in sensors:
await sensor.poll()
sensor.check_update()
await asyncio.sleep(1)
async def process_data(data):
sensor = sensors.get(data['id'])
if sensor:
sensor.update(data)
async def process_mqtt(message):
text = message.payload.decode()
topic = message.topic
logging.debug('MQTT topic: %s, message: %s', topic, text)
if topic.startswith('test'):
logging.info('MQTT test, message: %s', text)
return
try:
data = json.loads(text)
except json.JSONDecodeError:
return
if type(data) != dict or 'id' not in data:
return
await process_data(data)
async def fetch_mqtt():
async with Client('localhost') as client:
async with client.filtered_messages('#') as messages:
await client.subscribe('#')
async for message in messages:
await process_mqtt(message)
async def owntracks(request):
data = await request.json()
logging.debug('Web data: %s', str(data))
if data.get('_type', '') == 'location':
data['id'] = data['topic'].split('/')[-1]
data['timestamp'] = datetime.utcfromtimestamp(data['tst'])
if 'inregions' in data:
data['inregions'] = ','.join(data['inregions'])
await process_data(data)
else:
logging.info('Not a location, skipping.')
return web.Response()
async def history(request):
measurement = request.match_info.get('measurement')
name = request.match_info.get('name')
if name not in [x.name for x in sensors.sensors]:
raise
end_unix = request.rel_url.query.get('end', None)
if end_unix:
end_unix = int(end_unix)
end = datetime.fromtimestamp(end_unix)
else:
end = datetime.now(tz=pytz.UTC)
duration = request.rel_url.query.get('duration', 'day')
if duration == 'today':
now_tz = datetime.now(tz=TIMEZONE)
start = now_tz.replace(hour=0, minute=0, second=0, microsecond=0)
window = '10m'
elif duration == 'day':
start = end - timedelta(days=1)
window = '10m'
elif duration == 'week':
start = end - timedelta(days=7)
window = '1h'
elif duration == 'month':
start = end - timedelta(days=30)
window = '1d'
elif duration == 'year':
start = end - timedelta(days=365)
window = '1d'
else:
raise
window = request.rel_url.query.get('window', window)
if window not in ['10m', '1h', '1d', '7d', '30d']:
raise
if name == 'Water':
scale = 10
elif name == 'Gas':
scale = 0.001055*1000
else:
scale = 1
start = int(start.timestamp())
end = int(end.timestamp())
if measurement == 'temperature':
client = sensors_client
q = 'select mean("temperature_C") as temperature_C, mean("humidity") as humidity from temperature where "name" = \'{}\' and time >= {}s and time < {}s group by time({}) fill(linear)'.format(name, start, end, window)
elif measurement == 'ertscm':
client = sensors_client
q = 'select derivative(max("consumption_data"))*{} as delta, max("consumption_data")*{} as max from ertscm where "name" = \'{}\' and time >= {}s and time < {}s group by time({}) fill(previous)'.format(scale, scale, name, start, end, window)
elif measurement == 'thermostat':
client = sensors_client
q = 'select first("spacetemp") as spacetemp, first("heattemp") as heattemp, mode("state") as state from thermostat where "name" = \'{}\' and time >= {}s and time < {}s group by time({}) fill(previous)'.format(name, start, end, window)
elif measurement == 'dust':
client = sensors_client
q = 'select max("avg_p10") as max_p10, max("avg_p25") as max_p25 from dust where "name" = \'{}\' and time >= {}s and time < {}s group by time({}) fill(linear)'.format(name, start, end, window)
elif measurement == 'air':
client = sensors_client
q = 'select max("pm10") as max_p10, max("pm25") as max_p25, max("co2") as max_co2, max("voc_idx") as max_voc from air where "name" = \'{}\' and time >= {}s and time < {}s group by time({}) fill(linear)'.format(name, start, end, window)
elif measurement == 'lux':
client = sensors_client
q = 'select mean("lux") as lux from air where "name" = \'{}\' and time >= {}s and time < {}s group by time({}) fill(linear)'.format(name, start, end, window)
elif measurement == 'sleep':
client = sensors_client
q = 'select max("max_mag") as max_mag from sleep where "name" = \'{}\' and time >= {}s and time < {}s group by time({}) fill(linear)'.format(name, start, end, window)
elif measurement == 'solar':
client = solar_client
q = 'select max("actual_total") as actual_total, last("lifetime_energy")-first("lifetime_energy") as lifetime_energy from ecu where time >= {}s and time < {}s group by time({}) fill(linear)'.format(start, end, window)
else:
raise
q += ' tz(\'America/Edmonton\')'
result = list(client.query(q).get_points())
return web.json_response(result)
async def latest(request):
result = dict()
api_key = request.rel_url.query.get('api_key', '')
authed = api_key == settings.SENSORS_API_KEY
for sensor in sensors:
if sensor.type_ in ['solar']:
continue
if not authed and sensor.type_ in ['owntracks']:
continue
q = 'select * from {} where "name" = \'{}\' order by desc limit 1'.format(sensor.type_, sensor.name)
points = sensors_client.query(q).get_points()
point = list(points)
if sensor.type_ not in result:
result[sensor.type_] = dict()
result[sensor.type_][sensor.name] = point
return web.json_response(result)
async def index(request):
return web.Response(text='hello world', content_type='text/html')
async def run_webserver():
#web.run_app(app, port=PORT, loop=loop)
logging.info('Starting webserver on port: %s', PORT)
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, '0.0.0.0', PORT)
await site.start()
while True: await asyncio.sleep(10)
def task_died(future):
if os.environ.get('SHELL'):
logging.error('Sensors server task died!')
else:
logging.error('Sensors server task died! Waiting 60s and exiting...')
controller_message('Sensors server task died! Waiting 60s and exiting...')
time.sleep(60)
exit()
if __name__ == '__main__':
app.router.add_get('/', index)
app.router.add_post('/owntracks', owntracks)
app.router.add_get('/history/{measurement}/{name}', history)
app.router.add_get('/latest', latest)
sensors.add(ThermostatSensor('thermostat2', '192.168.69.152', 'Venstar'))
sensors.add(ERTSCMSensor('31005493', 'Water'))
sensors.add(ERTSCMSensor('41249312', 'Gas'))
sensors.add(OwnTracksSensor('owntracks1', 'OwnTracks'))
sensors.add(AirSensor('air1', 'Living Room'))
sensors.add(Acurite606TX('231', 'Outside'))
sensors.add(AcuRite6002RM('5613', 'Seeds', 0.0)) # A
sensors.add(AcuRite6002RM('5109', 'Nook', 0.4)) # B
sensors.add(AcuRite6002RM('11087', 'Bedroom', -0.3)) # C
sensors.add(SleepSensor('sleep1', 'Bedroom'))
sensors.add(SolarSensor('solar', 'Solar'))
loop = asyncio.get_event_loop()
loop.create_task(poll_sensors()).add_done_callback(task_died)
loop.create_task(fetch_mqtt()).add_done_callback(task_died)
loop.create_task(run_webserver()).add_done_callback(task_died)
loop.run_forever()