diff --git a/main.py b/main.py index 2224595..97f1ff2 100644 --- a/main.py +++ b/main.py @@ -1,29 +1,325 @@ import os + +DEBUG = os.environ.get('DEBUG') +PROD = os.environ.get('PROD') + import logging logging.basicConfig( format='[%(asctime)s] %(levelname)s %(module)s/%(funcName)s: - %(message)s', - level=logging.DEBUG if os.environ.get('DEBUG') else logging.INFO) -logging.getLogger('aiohttp').setLevel(logging.DEBUG if os.environ.get('DEBUG') else logging.WARNING) + level=logging.DEBUG if DEBUG else logging.INFO) +logging.getLogger('aiohttp').setLevel(logging.DEBUG if DEBUG else logging.WARNING) + +import settings import asyncio -from aiohttp import web +import json +import time +from aiohttp import web, ClientSession, ClientError +from asyncio_mqtt import Client from datetime import datetime, timedelta import pytz +TIMEZONE = pytz.timezone('America/Edmonton') app = web.Application() +http_session = None from influxdb import InfluxDBClient -client = InfluxDBClient('localhost', 8086, database='sensors1') +client = InfluxDBClient('localhost', 8086, database='sensors1' if PROD else 'sensors1dev') +PORT = 6903 if PROD else 6904 + +class Sensors(): + sensors = [] + + def add(self, sensor): + self.sensors.append(sensor) + + def get(self, id_): + for sensor in self.sensors: + if str(sensor.id_) == str(id_): + return sensor + return False + + def __iter__(self): + for sensor in self.sensors: + yield sensor + +sensors = Sensors() + + +async def getter(url): + global http_session + if not http_session: + http_session = ClientSession() + + try: + async with http_session.get(url, timeout=10) as response: + response.raise_for_status() + result = await response.json(content_type=None, encoding='UTF-8') + return result + except BaseException as e: + logging.error(e) + return False + +class Sensor(): + value = {} + prev_value = {} + bad_keys = [] + last_update = time.time() + update_period = None + + def __init__(self, id_, name): + self.id_ = id_ + self.name = name + + def __str__(self): + return '{} {} (ID: {})'.format(self.type_, self.name, self.id_) + + def transform(self, data): + return + + def changed(self): + before = self.prev_value.copy() + for key in self.bad_keys: + before.pop(key, None) + after = self.value.copy() + for key in self.bad_keys: + after.pop(key, None) + + return str(before) != str(after) + + def log(self): + if not self.value or not self.changed(): + return + + data = self.value.copy() + self.transform(data) + + for key in self.bad_keys: + data.pop(key, None) + for key in ['id', 'time']: + data.pop(key, None) + + timestamp = data.pop('timestamp', None) + if not timestamp: + timestamp = datetime.utcnow().replace(microsecond=0) + + point = { + 'time': timestamp, + 'measurement': self.type_, + 'tags': {'id': self.id_, 'name': self.name}, + 'fields': data, + } + client.write_points([point]) + + logging.info('Wrote %s data to InfluxDB.', self) + + def check_update(self): + if self.update_period: + if time.time() - self.last_update > self.update_period: + logging.error('Missed expected update from %s.', self) + self.last_update = time.time() + + def update(self, data): + self.last_update = time.time() + self.prev_value = self.value + self.value = data + self.log() + + async def poll(self): + return + +class ThermostatSensor(Sensor): + type_ = 'thermostat' + bad_keys = [ + 'name', + 'schedule', + 'schedulepart', + 'away', + 'cooltempmin', + 'cooltempmax', + 'heattempmin', + 'heattempmax', + 'dehum_setpoint' + ] + update_period = 300 + + def __init__(self, id_, ip, name): + self.id_ = id_ + self.ip = ip + self.name = name + + async def poll(self): + data = await getter('http://{}/query/info'.format(self.ip)) + self.update(data) + +class ERTSCMSensor(Sensor): + type_ = 'ertscm' + bad_keys = [ + 'model', + 'mic', + ] + update_period = 60*60 + +class OwnTracksSensor(Sensor): + type_ = 'owntracks' + bad_keys = [ + '_type', + 'topic', + 'tst', + 'created_at', + ] + update_period = 90 + +class DustSensor(Sensor): + type_ = 'dust' + update_period = 90 + + def transform(self, data): + for key, value in data.items(): + # what happens if you do this to a timestamp? + try: + data[key] = float(round(value, 1)) + except TypeError: + pass + +class Acurite606TX(Sensor): + type_ = 'temperature' + bad_keys = [ + 'model', + 'mic', + 'battery_ok', + ] + update_period = 40 + + def transform(self, data): + if data['battery_ok'] != 1: + logging.error('%s battery not ok!', self) + data['temperature_C'] = float(data['temperature_C']) + + +async def poll_sensors(): + while True: + for sensor in sensors: + await sensor.poll() + sensor.check_update() + + await asyncio.sleep(1) + +async def process_data(data): + sensor = sensors.get(data['id']) + if sensor: + sensor.update(data) + +async def process_mqtt(message): + text = message.payload.decode() + topic = message.topic + logging.debug('MQTT topic: %s, message: %s', topic, text) + + if topic == 'test': + logging.info('MQTT test, message: %s', text) + return + + try: + data = json.loads(text) + except json.JSONDecodeError: + return + + if 'id' not in data: + return + + await process_data(data) + +async def fetch_mqtt(): + async with Client('localhost') as client: + async with client.filtered_messages('#') as messages: + await client.subscribe('#') + async for message in messages: + await process_mqtt(message) + +async def owntracks(request): + data = await request.json() + logging.info('Web data: %s', str(data)) + + if data.get('_type', '') == 'location': + data['id'] = data['topic'].split('/')[-1] + data['timestamp'] = datetime.utcfromtimestamp(data['tst']) + if 'inregions' in data: + data['inregions'] = ','.join(data['inregions']) + await process_data(data) + else: + logging.info('Not a location, skipping.') + + return web.Response() + +async def history(request): + measurement = request.match_info.get('measurement') + name = request.match_info.get('name') + + end_unix = request.rel_url.query.get('end', None) + if end_unix: + end = datetime.fromtimestamp(end_unix) + else: + end = datetime.now(tz=pytz.UTC) + + duration = request.rel_url.query.get('duration', 'today') + + if duration == 'today': + now_tz = datetime.now(tz=TIMEZONE) + start = now_tz.replace(hour=0, minute=0, second=0, microsecond=0) + window = '5m' + elif duration == 'day': + start = end - timedelta(days=1) + window = '5m' + elif duration == 'week': + start = end - timedelta(days=7) + window = '30m' + elif duration == 'month': + start = end - timedelta(days=30) + window = '2h' + elif duration == 'year': + start = end - timedelta(days=365) + window = '24h' + + start = int(start.timestamp()) + end = int(end.timestamp()) + + if measurement == 'temperature': + q = 'select mean("temperature_C") as temperature_C from temperature where "name" = \'{}\' and time >= {}s and time < {}s group by time({}) fill(none)'.format(name, start, end, window) + elif measurement == 'ertscm': + q = 'select max("consumption_data") as consumption_data from ertscm where "name" = \'{}\' and time >= {}s and time < {}s group by time({}) fill(none)'.format(name, start, end, window) + elif measurement == 'dust': + q = 'select max("max_p10") as max_p10, max("max_p25") as max_p25 from dust where "name" = \'{}\' and time >= {}s and time < {}s group by time({}) fill(none)'.format(name, start, end, window) + + #if window and moving_average: + # q = 'select moving_average(mean("value"),{}) as value from {} where "name" = \'{}\' and time >= {}s and time < {}s group by time({}m) fill(none)'.format(moving_average, measurement, name, start, end, window) + #elif window: + # q = 'select mean("value") as value from {} where "name" = \'{}\' and time >= {}s and time < {}s group by time({}m) fill(none)'.format(measurement, name, start, end, window) + #elif moving_average: + # q = 'select moving_average("value", {}) as value from {} where "name" = \'{}\' and time >= {}s and time < {}s'.format(moving_average, name, start, end) + #else: + # q = 'select value from {} where "name" = \'{}\' and time >= {}s and time < {}s'.format(measurement, name, start, end) + + result = list(client.query(q).get_points()) + + return web.json_response(result) -async def get_thermostat(): - pass async def index(request): return web.Response(text='hello world', content_type='text/html') if __name__ == '__main__': app.router.add_get('/', index) + app.router.add_post('/owntracks', owntracks) + app.router.add_get('/history/{measurement}/{name}', history) + + sensors.add(ThermostatSensor('thermostat2', '192.168.69.152', 'Venstar')) + sensors.add(ERTSCMSensor('31005493', 'Water')) + sensors.add(ERTSCMSensor('41249312', 'Gas')) + sensors.add(OwnTracksSensor('owntracks1', 'OwnTracks')) + sensors.add(DustSensor('dust1', 'Nook')) + sensors.add(Acurite606TX('231', 'Outside')) loop = asyncio.get_event_loop() - loop.create_task(get_thermostat()) - web.run_app(app, port=6903) + loop.create_task(poll_sensors()) + loop.create_task(fetch_mqtt()) + web.run_app(app, port=PORT, loop=loop) diff --git a/settings.py.example b/settings.py.example new file mode 100644 index 0000000..6f836dd --- /dev/null +++ b/settings.py.example @@ -0,0 +1 @@ +THERMOSTAT_IP = ''