"""Sensor collection server.

Receives sensor readings over MQTT, from an OwnTracks HTTP webhook and by
polling a thermostat, writes changed readings to InfluxDB, and serves
``/history`` and ``/latest`` JSON endpoints with aiohttp.
"""
import os
import logging

DEBUG = os.environ.get('DEBUG')
PROD = os.environ.get('PROD')

# NOTE(review): logging is configured before the remaining imports, exactly as
# in the original layout -- presumably so import-time records use this format.
logging.basicConfig(
    format='[%(asctime)s] %(levelname)s %(module)s/%(funcName)s: - %(message)s',
    level=logging.DEBUG if DEBUG else logging.INFO)
logging.getLogger('aiohttp').setLevel(logging.DEBUG if DEBUG else logging.WARNING)

import settings
import asyncio
import json
import sys
import time
import requests
from aiohttp import web, ClientSession, ClientError
from asyncio_mqtt import Client
from datetime import datetime, timedelta
import pytz
from influxdb import InfluxDBClient

TIMEZONE = pytz.timezone('America/Edmonton')

app = web.Application()
http_session = None

sensors_client = InfluxDBClient('localhost', 8086, database='sensors1' if PROD else 'sensors1dev')
solar_client = InfluxDBClient('localhost', 8086, database='solar2')

PORT = 6903 if PROD else 6904

# duration query value -> (look-back span, default aggregation window).
# 'today' is handled separately because it starts at local midnight.
HISTORY_DURATIONS = {
    'day': (timedelta(days=1), '10m'),
    'week': (timedelta(days=7), '1h'),
    'month': (timedelta(days=30), '1d'),
    'year': (timedelta(days=365), '1d'),
}

HISTORY_WINDOWS = ('10m', '1h', '1d', '7d', '30d')

# measurement path segment -> InfluxQL template. Every value substituted into
# these templates is validated in history() first (sensor name against the
# registered sensors, window against HISTORY_WINDOWS, start/end cast to int).
HISTORY_QUERIES = {
    'temperature': (
        'select mean("temperature_C") as temperature_C, mean("humidity") as humidity '
        'from temperature where "name" = \'{name}\' '
        'and time >= {start}s and time < {end}s group by time({window}) fill(linear)'
    ),
    'ertscm': (
        'select derivative(max("consumption_data"))*{scale} as delta, '
        'max("consumption_data")*{scale} as max '
        'from ertscm where "name" = \'{name}\' '
        'and time >= {start}s and time < {end}s group by time({window}) fill(previous)'
    ),
    'thermostat': (
        'select first("spacetemp") as spacetemp, first("heattemp") as heattemp, '
        'mode("state") as state '
        'from thermostat where "name" = \'{name}\' '
        'and time >= {start}s and time < {end}s group by time({window}) fill(previous)'
    ),
    'dust': (
        'select max("avg_p10") as max_p10, max("avg_p25") as max_p25 '
        'from dust where "name" = \'{name}\' '
        'and time >= {start}s and time < {end}s group by time({window}) fill(linear)'
    ),
    'air': (
        'select max("pm10") as max_p10, max("pm25") as max_p25, '
        'max("co2") as max_co2, max("voc_idx") as max_voc '
        'from air where "name" = \'{name}\' '
        'and time >= {start}s and time < {end}s group by time({window}) fill(linear)'
    ),
    # 'lux' readings are stored in the "air" measurement.
    'lux': (
        'select mean("lux") as lux '
        'from air where "name" = \'{name}\' '
        'and time >= {start}s and time < {end}s group by time({window}) fill(linear)'
    ),
    'sleep': (
        'select max("max_mag") as max_mag '
        'from sleep where "name" = \'{name}\' '
        'and time >= {start}s and time < {end}s group by time({window}) fill(linear)'
    ),
    # Queried from solar_client; not filtered by sensor name.
    'solar': (
        'select max("actual_total") as actual_total, '
        'last("lifetime_energy")-first("lifetime_energy") as lifetime_energy '
        'from ecu where time >= {start}s and time < {end}s group by time({window}) fill(linear)'
    ),
}


def controller_message(message):
    """Post *message* to the controller bot.

    Returns True when the controller replies with HTTP 200, otherwise False.
    Network failures are logged and reported as False instead of propagating,
    so callers such as task_died() can carry on with their shutdown.
    """
    payload = dict(home=message)
    try:
        r = requests.post('https://tbot.tannercollin.com/message', data=payload, timeout=10)
    except requests.RequestException:
        # Inside a handler, so logging.exception attaches the real traceback.
        logging.exception('Unable to communicate with controller! Message: %s', message)
        return False

    if r.status_code == 200:
        return True

    # No exception is active here, so log a plain error (not logging.exception).
    logging.error('Unable to communicate with controller! Message: %s', message)
    return False


class Sensors():
    """Registry of the configured Sensor objects."""

    def __init__(self):
        # Per-instance list; a class-level list would be shared by every registry.
        self.sensors = []

    def add(self, sensor):
        """Register *sensor*."""
        self.sensors.append(sensor)

    def get(self, id_):
        """Return the sensor whose id matches *id_* (compared as strings), or False."""
        wanted = str(id_)
        for sensor in self.sensors:
            if str(sensor.id_) == wanted:
                return sensor
        return False

    def __iter__(self):
        return iter(self.sensors)


sensors = Sensors()


async def getter(url):
    """GET *url* and return the decoded JSON body, or False on any failure.

    The shared aiohttp session is created lazily on first use.
    """
    global http_session
    if not http_session:
        http_session = ClientSession()

    try:
        async with http_session.get(url, timeout=10) as response:
            response.raise_for_status()
            return await response.json(content_type=None, encoding='UTF-8')
    except Exception as e:
        # Best-effort fetch: report and carry on. Deliberately not BaseException,
        # so task cancellation and KeyboardInterrupt still propagate.
        logging.error(e)
        return False


def _round_numeric_fields(data):
    """Round every numeric value in *data* to one decimal place, in place.

    Values that round() does not support (strings, datetime timestamps, ...)
    raise TypeError and are left untouched.
    """
    for key, value in list(data.items()):
        try:
            data[key] = float(round(value, 1))
        except TypeError:
            pass


class Sensor():
    """Base class for a sensor whose readings are written to InfluxDB.

    Subclasses must define ``type_`` (used as the InfluxDB measurement and in
    ``__str__``) and may override ``bad_keys``, ``update_period``,
    ``transform`` and ``poll``.
    """

    bad_keys = []         # keys dropped before comparing and before writing
    update_period = None  # seconds between expected updates; None disables the check

    def __init__(self, id_, name):
        self.id_ = id_
        self.name = name
        # Per-instance state (previously class attributes shared by all sensors).
        self.value = {}
        self.prev_value = {}
        self.last_update = time.time()

    def __str__(self):
        return '{} {} (ID: {})'.format(self.type_, self.name, self.id_)

    def transform(self, data):
        """Hook to adjust *data* in place before it is written; no-op by default."""
        return

    def _without_bad_keys(self, data):
        """Return a shallow copy of *data* with this sensor's bad_keys removed."""
        cleaned = data.copy()
        for key in self.bad_keys:
            cleaned.pop(key, None)
        return cleaned

    def changed(self):
        """Return True when the current value differs from the previous one.

        Keys in ``bad_keys`` are ignored; the comparison is on the string form
        of the remaining dicts.
        """
        before = self._without_bad_keys(self.prev_value)
        after = self._without_bad_keys(self.value)
        return str(before) != str(after)

    def log(self):
        """Write the current value to InfluxDB if it is non-empty and has changed."""
        if not self.value or not self.changed():
            return

        data = self.value.copy()
        # transform() runs before bad_keys are stripped because some transforms
        # read keys (e.g. 'battery_ok') that are dropped afterwards.
        self.transform(data)

        for key in self.bad_keys:
            data.pop(key, None)
        for key in ['id', 'time']:
            data.pop(key, None)

        timestamp = data.pop('timestamp', None)
        if not timestamp:
            timestamp = datetime.utcnow().replace(microsecond=0)

        point = {
            'time': timestamp,
            'measurement': self.type_,
            'tags': {'id': self.id_, 'name': self.name},
            'fields': data,
        }

        sensors_client.write_points([point])
        logging.info('Wrote %s data to InfluxDB: %s', self, data)

    def check_update(self):
        """Warn when no update has arrived within ``update_period`` seconds."""
        if self.update_period:
            if time.time() - self.last_update > self.update_period:
                logging.warning('Missed expected update from %s.', self)
                # Reset so the warning repeats once per period, not every check.
                self.last_update = time.time()

    def update(self, data):
        """Store *data* as the new value and write it if it changed."""
        self.last_update = time.time()
        self.prev_value = self.value
        self.value = data
        self.log()

    async def poll(self):
        """Hook for sensors that fetch their own data; no-op by default."""
        return


class ThermostatSensor(Sensor):
    """Thermostat polled over HTTP at ``http://<ip>/query/info``."""

    type_ = 'thermostat'
    bad_keys = [
        'name',
        'schedule',
        'schedulepart',
        'away',
        'cooltempmin',
        'cooltempmax',
        'heattempmin',
        'heattempmax',
        'dehum_setpoint'
    ]
    update_period = 300

    def __init__(self, id_, ip, name):
        super().__init__(id_, name)
        self.ip = ip

    async def poll(self):
        # NOTE(review): a failed fetch still calls update({}), which refreshes
        # last_update and clears the stored value -- confirm this is intended.
        data = await getter('http://{}/query/info'.format(self.ip)) or {}
        self.update(data)


class ERTSCMSensor(Sensor):
    type_ = 'ertscm'
    bad_keys = [
        'model',
        'mic',
    ]
    update_period = 60*60


class OwnTracksSensor(Sensor):
    type_ = 'owntracks'
    bad_keys = [
        '_type',
        'topic',
        'tst',
        'created_at',
    ]
    update_period = 90


class DustSensor(Sensor):
    type_ = 'dust'
    update_period = 90

    def transform(self, data):
        _round_numeric_fields(data)


class AirSensor(Sensor):
    type_ = 'air'
    update_period = 15

    def transform(self, data):
        _round_numeric_fields(data)


class SleepSensor(Sensor):
    type_ = 'sleep'
    update_period = 90

    def transform(self, data):
        _round_numeric_fields(data)


class SolarSensor(Sensor):
    type_ = 'solar'


class Acurite606TX(Sensor):
    """Temperature sensor; *offset* is added to the reported ``temperature_C``."""

    type_ = 'temperature'
    bad_keys = [
        'model',
        'mic',
        'battery_ok',
    ]
    update_period = 40
    offset = 0.0

    def __init__(self, id_, name, offset=0.0):
        super().__init__(id_, name)
        self.offset = offset

    def transform(self, data):
        if data['battery_ok'] != 1:
            logging.error('%s battery not ok!', self)
        data['temperature_C'] = float(data['temperature_C']) + self.offset


class AcuRite6002RM(Sensor):
    """Temperature/humidity sensor; *offset* is added to ``temperature_C``."""

    type_ = 'temperature'
    bad_keys = [
        'model',
        'mic',
        'battery_ok',
        'channel',
    ]
    update_period = 40
    offset = 0.0

    def __init__(self, id_, name, offset=0.0):
        super().__init__(id_, name)
        self.offset = offset

    def transform(self, data):
        if data['battery_ok'] != 1:
            logging.error('%s battery not ok!', self)
        data['temperature_C'] = float(data['temperature_C']) + self.offset
        data['humidity'] = float(data['humidity'])


async def poll_sensors():
    """Once a second, poll every sensor and check for missed updates."""
    while True:
        for sensor in sensors:
            await sensor.poll()
            sensor.check_update()

        await asyncio.sleep(1)


async def process_data(data):
    """Route *data* to the sensor registered under ``data['id']``, if any."""
    sensor = sensors.get(data['id'])
    if sensor:
        sensor.update(data)


async def process_mqtt(message):
    """Decode one MQTT message and forward it if it is a JSON object with an 'id'."""
    topic = message.topic
    try:
        text = message.payload.decode()
    except UnicodeDecodeError:
        # The subscription is '#', so non-text payloads can arrive; skip them
        # instead of letting the exception kill the MQTT task.
        logging.debug('MQTT topic: %s, undecodable payload skipped', topic)
        return
    logging.debug('MQTT topic: %s, message: %s', topic, text)

    if topic.startswith('test'):
        logging.info('MQTT test, message: %s', text)
        return

    try:
        data = json.loads(text)
    except json.JSONDecodeError:
        return

    if not isinstance(data, dict) or 'id' not in data:
        return

    await process_data(data)


async def fetch_mqtt():
    """Subscribe to every topic on the local broker and process each message."""
    async with Client('localhost') as client:
        async with client.filtered_messages('#') as messages:
            await client.subscribe('#')
            async for message in messages:
                await process_mqtt(message)


async def owntracks(request):
    """Handle the OwnTracks webhook; only ``_type == 'location'`` posts are stored."""
    data = await request.json()
    logging.debug('Web data: %s', str(data))

    if data.get('_type', '') == 'location':
        # The sensor id is the last segment of the topic.
        data['id'] = data['topic'].split('/')[-1]
        data['timestamp'] = datetime.utcfromtimestamp(data['tst'])

        if 'inregions' in data:
            data['inregions'] = ','.join(data['inregions'])

        await process_data(data)
    else:
        logging.info('Not a location, skipping.')

    return web.Response()


async def history(request):
    """Return aggregated history for ``/history/{measurement}/{name}`` as JSON.

    Query parameters:
        end: Unix timestamp ending the range (default: now).
        duration: 'today', 'day' (default), 'week', 'month' or 'year'.
        window: aggregation window, one of HISTORY_WINDOWS (default depends
            on the duration).

    Raises:
        web.HTTPNotFound: unknown sensor name or measurement.
        web.HTTPBadRequest: invalid end, duration or window.
    """
    measurement = request.match_info.get('measurement')
    name = request.match_info.get('name')

    if name not in [x.name for x in sensors.sensors]:
        raise web.HTTPNotFound(text='Unknown sensor name.')

    template = HISTORY_QUERIES.get(measurement)
    if template is None:
        raise web.HTTPNotFound(text='Unknown measurement.')

    end_unix = request.rel_url.query.get('end', None)
    if end_unix:
        try:
            end = datetime.fromtimestamp(int(end_unix))
        except (ValueError, OverflowError, OSError):
            raise web.HTTPBadRequest(text='Invalid end timestamp.')
    else:
        end = datetime.now(tz=pytz.UTC)

    duration = request.rel_url.query.get('duration', 'day')
    if duration == 'today':
        # Starts at local midnight of the current day, whatever 'end' is.
        now_tz = datetime.now(tz=TIMEZONE)
        start = now_tz.replace(hour=0, minute=0, second=0, microsecond=0)
        window = '10m'
    elif duration in HISTORY_DURATIONS:
        span, window = HISTORY_DURATIONS[duration]
        start = end - span
    else:
        raise web.HTTPBadRequest(text='Invalid duration.')

    window = request.rel_url.query.get('window', window)
    if window not in HISTORY_WINDOWS:
        raise web.HTTPBadRequest(text='Invalid window.')

    if name == 'Water':
        scale = 10
    elif name == 'Gas':
        scale = 0.001055*1000
    else:
        scale = 1

    start = int(start.timestamp())
    end = int(end.timestamp())

    client = solar_client if measurement == 'solar' else sensors_client
    q = template.format(name=name, start=start, end=end, window=window, scale=scale)
    q += ' tz(\'America/Edmonton\')'

    result = list(client.query(q).get_points())

    return web.json_response(result)


async def latest(request):
    """Return the most recent point of every sensor, grouped by type then name.

    Solar sensors are always skipped; OwnTracks sensors are included only
    when the ``api_key`` query parameter equals settings.SENSORS_API_KEY.
    """
    result = dict()

    api_key = request.rel_url.query.get('api_key', '')
    authed = api_key == settings.SENSORS_API_KEY

    for sensor in sensors:
        if sensor.type_ in ['solar']:
            continue

        if not authed and sensor.type_ in ['owntracks']:
            continue

        # type_ and name come from the sensors configured in this file, not
        # from the request.
        q = 'select * from {} where "name" = \'{}\' order by desc limit 1'.format(sensor.type_, sensor.name)
        point = list(sensors_client.query(q).get_points())

        result.setdefault(sensor.type_, dict())[sensor.name] = point

    return web.json_response(result)


async def index(request):
    return web.Response(text='hello world', content_type='text/html')


async def run_webserver():
    """Serve the aiohttp app on PORT and keep this task alive indefinitely."""
    # AppRunner/TCPSite (rather than the blocking web.run_app) so the server
    # can run as a task next to the polling and MQTT tasks.
    logging.info('Starting webserver on port: %s', PORT)
    runner = web.AppRunner(app)
    await runner.setup()
    site = web.TCPSite(runner, '0.0.0.0', PORT)
    await site.start()

    while True:
        await asyncio.sleep(10)


def task_died(future):
    """Done-callback for the long-running tasks, none of which should ever finish.

    Logs why the task ended. When the SHELL environment variable is unset,
    it also notifies the controller, waits 60 seconds and exits the process.
    """
    if future.cancelled():
        logging.error('Sensors server task was cancelled.')
    else:
        exc = future.exception()
        if exc is not None:
            # Retrieve the exception so the cause is logged instead of lost.
            logging.error('Sensors server task raised an exception:', exc_info=exc)

    if os.environ.get('SHELL'):
        logging.error('Sensors server task died!')
    else:
        logging.error('Sensors server task died! Waiting 60s and exiting...')
        controller_message('Sensors server task died! Waiting 60s and exiting...')
        time.sleep(60)
        sys.exit()


def main():
    """Register routes and sensors, then run all tasks on one event loop."""
    app.router.add_get('/', index)
    app.router.add_post('/owntracks', owntracks)
    app.router.add_get('/history/{measurement}/{name}', history)
    app.router.add_get('/latest', latest)

    sensors.add(ThermostatSensor('thermostat2', '192.168.69.152', 'Venstar'))
    sensors.add(ERTSCMSensor('31005493', 'Water'))
    sensors.add(ERTSCMSensor('41249312', 'Gas'))
    sensors.add(OwnTracksSensor('owntracks1', 'OwnTracks'))
    sensors.add(AirSensor('air1', 'Living Room'))
    sensors.add(Acurite606TX('231', 'Outside'))
    sensors.add(AcuRite6002RM('5613', 'Seeds', 0.0))  # A
    sensors.add(AcuRite6002RM('5109', 'Nook', 0.4))  # B
    sensors.add(AcuRite6002RM('11087', 'Bedroom', -0.3))  # C
    sensors.add(SleepSensor('sleep1', 'Bedroom'))
    sensors.add(SolarSensor('solar', 'Solar'))

    # Create the loop explicitly: asyncio.get_event_loop() without a running
    # loop is deprecated on newer Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.create_task(poll_sensors()).add_done_callback(task_died)
    loop.create_task(fetch_mqtt()).add_done_callback(task_died)
    loop.create_task(run_webserver()).add_done_callback(task_died)
    loop.run_forever()


if __name__ == '__main__':
    main()