"""Sensors server.

Collects readings from MQTT messages, OwnTracks HTTP posts and a polled
thermostat, writes changed readings to InfluxDB, and serves them back
through a small aiohttp JSON API (``/history/...`` and ``/latest``).
"""

import asyncio
import json
import logging
import os
import sys
import time
from datetime import datetime, timedelta

DEBUG = os.environ.get('DEBUG')
PROD = os.environ.get('PROD')

# Logging is configured before the third-party and local imports below, so
# anything logged while importing them already uses this configuration.
logging.basicConfig(
    format='[%(asctime)s] %(levelname)s %(module)s/%(funcName)s: - %(message)s',
    level=logging.DEBUG if DEBUG else logging.INFO)
logging.getLogger('aiohttp').setLevel(logging.DEBUG if DEBUG else logging.WARNING)
logging.info('')
logging.info('BOOT UP')

import settings

import requests
from aiohttp import web, ClientSession, ClientError
import aiomqtt
import pytz
from influxdb import InfluxDBClient

TIMEZONE = pytz.timezone('America/Edmonton')

app = web.Application()
http_session = None  # created lazily by getter() on first use

sensors_client = InfluxDBClient('localhost', 8086, database='sensors1' if PROD else 'sensors1dev')
solar_client = InfluxDBClient('localhost', 8086, database='solar2')

PORT = 6903 if PROD else 6904


def controller_message(message):
    """Post ``message`` to the controller bot.

    Returns True when the controller answers with HTTP 200, False otherwise.
    Exceptions raised by ``requests.post`` (connection errors, timeouts) are
    not caught here and propagate to the caller.
    """
    payload = dict(home=message)
    r = requests.post('https://tbot.tannercollin.com/message', data=payload, timeout=10)
    if r.status_code == 200:
        return True

    # Not inside an exception handler, so use error() rather than exception()
    # (which would append a meaningless "NoneType: None" traceback).
    logging.error('Unable to communicate with controller! Message: ' + message)
    return False


class Sensors():
    """Registry of the configured Sensor objects."""

    def __init__(self):
        # Per-instance list, so separate registries never share state.
        self.sensors = []

    def add(self, sensor):
        """Register ``sensor``."""
        self.sensors.append(sensor)

    def get(self, id_):
        """Return the sensor whose id matches ``id_`` (compared as strings), or False."""
        wanted = str(id_)
        for sensor in self.sensors:
            if str(sensor.id_) == wanted:
                return sensor
        return False

    def __iter__(self):
        return iter(self.sensors)


sensors = Sensors()


async def getter(url):
    """GET ``url`` and return the decoded JSON body, or False on any error.

    The shared ClientSession is created on first use.
    """
    global http_session

    if not http_session:
        http_session = ClientSession()

    try:
        async with http_session.get(url, timeout=10) as response:
            response.raise_for_status()
            return await response.json(content_type=None, encoding='UTF-8')
    except Exception as e:
        # Exception, not BaseException: task cancellation and interpreter
        # shutdown must not be swallowed and reported as a failed fetch.
        logging.error('Error fetching %s: %r', url, e)
        return False


def _round_numeric_fields(data):
    """Round every roundable value in ``data`` to one decimal place, in place.

    Values that ``round()`` rejects with TypeError (strings, None, datetime
    objects) are left untouched. Numeric values, including numeric
    timestamps, are converted to float.
    """
    for key, value in data.items():
        try:
            data[key] = float(round(value, 1))
        except TypeError:
            pass


class Sensor():
    """Base class for a single data source.

    Subclasses set ``type_`` (used as the InfluxDB measurement name),
    ``bad_keys`` (fields dropped before comparing and writing) and
    ``update_period`` (seconds after which a missing update is reported;
    None disables the check).
    """

    type_ = 'sensor'
    bad_keys = []
    update_period = None

    def __init__(self, id_, name):
        self.id_ = id_
        self.name = name
        self.value = {}
        self.prev_value = {}
        self.last_update = time.time()

    def __str__(self):
        return '{} {} (ID: {})'.format(self.type_, self.name, self.id_)

    def transform(self, data):
        """Hook for subclasses to adjust ``data`` in place before it is written."""
        return

    def _without_bad_keys(self, data):
        """Return a copy of ``data`` with the ``bad_keys`` fields removed."""
        cleaned = data.copy()
        for key in self.bad_keys:
            cleaned.pop(key, None)
        return cleaned

    def changed(self):
        """Return True if the current value differs from the previous one, ignoring ``bad_keys``."""
        before = self._without_bad_keys(self.prev_value)
        after = self._without_bad_keys(self.value)
        return str(before) != str(after)

    def log(self):
        """Write the current value to InfluxDB if it is non-empty and has changed."""
        if not self.value or not self.changed():
            return

        data = self.value.copy()

        try:
            self.transform(data)
        except (KeyError, TypeError, ValueError):
            # A malformed reading is skipped rather than allowed to
            # propagate and take down the task that delivered it.
            logging.exception('Unable to transform data from %s, skipping: %s', self, data)
            return

        for key in self.bad_keys:
            data.pop(key, None)
        for key in ['id', 'time']:
            data.pop(key, None)

        timestamp = data.pop('timestamp', None)
        if not timestamp:
            timestamp = datetime.utcnow().replace(microsecond=0)

        point = {
            'time': timestamp,
            'measurement': self.type_,
            'tags': {'id': self.id_, 'name': self.name},
            'fields': data,
        }

        try:
            sensors_client.write_points([point])
        except requests.exceptions.ConnectionError:
            logging.exception('Error connecting to InfluxDB!')
            return

        logging.info('Wrote %s data to InfluxDB: %s', self, data)

    def check_update(self):
        """Warn when no update has arrived within ``update_period`` seconds."""
        if not self.update_period:
            return

        if time.time() - self.last_update > self.update_period:
            logging.warning('Missed expected update from %s.', self)
            # Restart the timer so the warning repeats once per period.
            self.last_update = time.time()

    def update(self, data):
        """Record ``data`` as the newest value and write it out if it changed."""
        self.last_update = time.time()
        self.prev_value = self.value
        self.value = data
        self.log()

    async def poll(self):
        """Hook for sensors that must be actively queried; does nothing by default."""
        return


class ThermostatSensor(Sensor):
    """Thermostat queried over HTTP at ``http://<ip>/query/info``."""

    type_ = 'thermostat'
    bad_keys = [
        'name',
        'schedule',
        'schedulepart',
        'away',
        'cooltempmin',
        'cooltempmax',
        'heattempmin',
        'heattempmax',
        'dehum_setpoint'
    ]
    update_period = 300

    def __init__(self, id_, ip, name):
        super().__init__(id_, name)
        self.ip = ip

    async def poll(self):
        data = await getter('http://{}/query/info'.format(self.ip))
        if not data:
            # A failed fetch is not an update: leave last_update alone so
            # check_update() can report the outage.
            return
        self.update(data)


class ERTSCMSensor(Sensor):
    type_ = 'ertscm'
    bad_keys = [
        'model',
        'mic',
    ]
    update_period = 60*60

    def transform(self, data):
        # new gas meter
        if 'Consumption' in data:
            data['consumption_data'] = data['Consumption']


class OwnTracksSensor(Sensor):
    type_ = 'owntracks'
    bad_keys = [
        '_type',
        'topic',
        'tst',
        'created_at',
    ]
    update_period = 90


class DustSensor(Sensor):
    type_ = 'dust'
    update_period = 90

    def transform(self, data):
        _round_numeric_fields(data)


class AirSensor(Sensor):
    type_ = 'air'
    update_period = 15

    def transform(self, data):
        _round_numeric_fields(data)


class SleepSensor(Sensor):
    type_ = 'sleep'
    update_period = 90

    def transform(self, data):
        _round_numeric_fields(data)


class SolarSensor(Sensor):
    type_ = 'solar'


class Acurite606TX(Sensor):
    """Temperature sensor; ``offset`` is added to every ``temperature_C`` reading."""

    type_ = 'temperature'
    bad_keys = [
        'model',
        'mic',
        'battery_ok',
    ]
    update_period = 40
    offset = 0.0

    def __init__(self, id_, name, offset=0.0):
        super().__init__(id_, name)
        self.offset = offset

    def transform(self, data):
        if data['battery_ok'] != 1:
            logging.error('%s battery not ok!', self)
        data['temperature_C'] = float(data['temperature_C']) + self.offset


class AcuRite6002RM(Sensor):
    """Temperature and humidity sensor; ``offset`` is added to ``temperature_C``."""

    type_ = 'temperature'
    bad_keys = [
        'model',
        'mic',
        'battery_ok',
        'channel',
    ]
    update_period = 40
    offset = 0.0

    def __init__(self, id_, name, offset=0.0):
        super().__init__(id_, name)
        self.offset = offset

    def transform(self, data):
        if data['battery_ok'] != 1:
            logging.error('%s battery not ok!', self)
        data['temperature_C'] = float(data['temperature_C']) + self.offset
        data['humidity'] = float(data['humidity'])


async def poll_sensors():
    """Once a second, poll every sensor and check it for missed updates."""
    while True:
        for sensor in sensors:
            await sensor.poll()
            sensor.check_update()

        await asyncio.sleep(1)


async def process_data(data):
    """Hand ``data`` to the sensor registered under ``data['id']``, if any."""
    sensor = sensors.get(data['id'])
    if sensor:
        sensor.update(data)


async def process_mqtt(message):
    """Decode one MQTT message and forward it if it is a JSON object with an ``id``."""
    try:
        text = message.payload.decode()
    except UnicodeDecodeError:
        return

    topic = message.topic.value
    logging.debug('MQTT topic: %s, message: %s', topic, text)

    if topic.startswith('test'):
        logging.info('MQTT test, message: %s', text)
        return

    try:
        data = json.loads(text)
    except json.JSONDecodeError:
        return

    if not isinstance(data, dict) or 'id' not in data:
        return

    await process_data(data)


async def fetch_mqtt():
    """Subscribe to every MQTT topic on localhost, reconnecting after connection loss."""
    await asyncio.sleep(3)

    # from https://sbtinstruments.github.io/aiomqtt/reconnection.html
    # modified to make new client since their code didn't work
    # https://github.com/sbtinstruments/aiomqtt/issues/269
    while True:
        try:
            async with aiomqtt.Client('localhost') as client:
                await client.subscribe('#')
                async for message in client.messages:
                    await process_mqtt(message)
        except aiomqtt.MqttError:
            logging.info('MQTT connection lost, reconnecting in 5 seconds...')
            await asyncio.sleep(5)


async def owntracks(request):
    """Handle an OwnTracks POST; only ``_type == 'location'`` payloads are processed.

    Responds 400 when the body is not valid JSON or a location payload lacks
    usable ``topic`` / ``tst`` fields.
    """
    try:
        data = await request.json()
    except json.JSONDecodeError:
        raise web.HTTPBadRequest(text='Invalid JSON.')

    logging.debug('Web data: %s', str(data))

    if not isinstance(data, dict) or data.get('_type', '') != 'location':
        logging.info('Not a location, skipping.')
        return web.Response()

    try:
        # The sensor id is the last path segment of the topic.
        data['id'] = data['topic'].split('/')[-1]
        data['timestamp'] = datetime.utcfromtimestamp(data['tst'])

        if 'inregions' in data:
            data['inregions'] = ','.join(data['inregions'])
    except (KeyError, AttributeError, TypeError, ValueError, OverflowError, OSError):
        raise web.HTTPBadRequest(text='Malformed location.')

    await process_data(data)
    return web.Response()


# Lookback and default aggregation window for each fixed-length duration.
HISTORY_DURATIONS = {
    'day': (timedelta(days=1), '10m'),
    'week': (timedelta(days=7), '1h'),
    'month': (timedelta(days=30), '1d'),
    'year': (timedelta(days=365), '1d'),
}

HISTORY_WINDOWS = ['1m', '3m', '10m', '1h', '2h', '1d', '7d', '30d']

_TIME_CLAUSE = 'time >= {start}s and time < {end}s group by time({window})'
_NAME_CLAUSE = '"name" = \'{name}\' and ' + _TIME_CLAUSE

# measurement -> (InfluxDB client, query template)
HISTORY_QUERIES = {
    'temperature': (
        sensors_client,
        'select mean("temperature_C") as temperature_C, mean("humidity") as humidity'
        ' from temperature where ' + _NAME_CLAUSE + ' fill(linear)',
    ),
    'ertscm': (
        sensors_client,
        'select derivative(max("consumption_data"))*{scale} as delta,'
        ' max("consumption_data")*{scale} as max'
        ' from ertscm where ' + _NAME_CLAUSE + ' fill(previous)',
    ),
    'thermostat': (
        sensors_client,
        'select first("spacetemp") as spacetemp, first("heattemp") as heattemp,'
        ' mode("state") as state'
        ' from thermostat where ' + _NAME_CLAUSE + ' fill(previous)',
    ),
    'dust': (
        sensors_client,
        'select max("avg_p10") as max_p10, max("avg_p25") as max_p25'
        ' from dust where ' + _NAME_CLAUSE + ' fill(linear)',
    ),
    'air': (
        sensors_client,
        'select max("pm10") as max_p10, max("pm25") as max_p25,'
        ' max("co2") as max_co2, max("voc_idx") as max_voc'
        ' from air where ' + _NAME_CLAUSE + ' fill(linear)',
    ),
    'lux': (
        sensors_client,
        'select mean("lux") as lux'
        ' from air where ' + _NAME_CLAUSE + ' fill(linear)',
    ),
    'sleep': (
        sensors_client,
        'select max("max_mag") as max_mag'
        ' from sleep where ' + _NAME_CLAUSE + ' fill(linear)',
    ),
    'solar': (
        solar_client,
        'select max("actual_total") as actual_total,'
        ' last("lifetime_energy")-first("lifetime_energy") as lifetime_energy'
        ' from ecu where ' + _TIME_CLAUSE + ' fill(linear)',
    ),
    'owntracks': (
        sensors_client,
        'select first("lat") as lat, first("lon") as lon'
        ' from owntracks where "acc" < 100 and ' + _NAME_CLAUSE + ' fill(previous)',
    ),
}


async def history(request):
    """Return aggregated history for ``/history/{measurement}/{name}`` as JSON.

    Query parameters:
        api_key:  required to read the 'owntracks' and 'sleep' measurements;
                  without it those return an empty list.
        end:      unix timestamp of the end of the range (default: now).
        duration: 'today', 'day', 'week', 'month' or 'year' (default 'day').
        window:   aggregation window, one of HISTORY_WINDOWS (default depends
                  on the duration).

    Responds 404 for an unknown sensor name or measurement and 400 for an
    invalid ``end``, ``duration`` or ``window``.
    """
    query = request.rel_url.query
    authed = query.get('api_key', '') == settings.SENSORS_API_KEY

    measurement = request.match_info.get('measurement')
    name = request.match_info.get('name')

    if not authed and measurement in ['owntracks', 'sleep']:
        return web.json_response([])

    # name is interpolated into the query below, so it must be one of the
    # configured sensor names.
    if name not in [x.name for x in sensors.sensors]:
        raise web.HTTPNotFound(text='Unknown sensor name.')

    if measurement not in HISTORY_QUERIES:
        raise web.HTTPNotFound(text='Unknown measurement.')

    end_unix = query.get('end', None)
    if end_unix:
        try:
            end = datetime.fromtimestamp(int(end_unix), tz=pytz.UTC)
        except (ValueError, OverflowError, OSError):
            raise web.HTTPBadRequest(text='Invalid end timestamp.')
    else:
        end = datetime.now(tz=pytz.UTC)

    duration = query.get('duration', 'day')
    if duration == 'today':
        # From local midnight, regardless of the requested end.
        now_tz = datetime.now(tz=TIMEZONE)
        start = now_tz.replace(hour=0, minute=0, second=0, microsecond=0)
        window = '10m'
    elif duration in HISTORY_DURATIONS:
        lookback, window = HISTORY_DURATIONS[duration]
        try:
            start = end - lookback
        except OverflowError:
            raise web.HTTPBadRequest(text='Invalid end timestamp.')
    else:
        raise web.HTTPBadRequest(text='Invalid duration.')

    window = query.get('window', window)
    if window not in HISTORY_WINDOWS:
        raise web.HTTPBadRequest(text='Invalid window.')

    if name == 'Water':
        scale = 10
    elif name == 'Gas':
        scale = 0.001055*1000
    else:
        scale = 1

    client, template = HISTORY_QUERIES[measurement]
    q = template.format(
        name=name,
        start=int(start.timestamp()),
        end=int(end.timestamp()),
        window=window,
        scale=scale,
    )
    q += ' tz(\'America/Edmonton\')'

    result = list(client.query(q).get_points())

    return web.json_response(result)


async def latest(request):
    """Return the most recent point of every sensor, grouped by type then name.

    'solar' sensors are always skipped; 'owntracks' and 'sleep' sensors are
    skipped unless a valid ``api_key`` query parameter is supplied.
    """
    result = dict()

    api_key = request.rel_url.query.get('api_key', '')
    authed = api_key == settings.SENSORS_API_KEY

    for sensor in sensors:
        if sensor.type_ in ['solar']:
            continue

        if not authed and sensor.type_ in ['owntracks', 'sleep']:
            continue

        q = 'select * from {} where "name" = \'{}\' order by desc limit 1'.format(sensor.type_, sensor.name)
        point = list(sensors_client.query(q).get_points())

        result.setdefault(sensor.type_, dict())[sensor.name] = point

    return web.json_response(result)


async def index(request):
    return web.Response(text='sensors api', content_type='text/html')


async def run_webserver():
    """Serve ``app`` on 0.0.0.0:PORT and keep this task alive indefinitely."""
    logging.info('Starting webserver on port: %s', PORT)
    runner = web.AppRunner(app)
    await runner.setup()
    site = web.TCPSite(runner, '0.0.0.0', PORT)
    await site.start()

    while True:
        await asyncio.sleep(10)


def task_died(future):
    """Done-callback for the main tasks: report the failure, then exit the process.

    When the SHELL environment variable is set the process exits
    immediately; otherwise the controller is notified and the process waits
    60 seconds before exiting.
    """
    if not future.cancelled():
        exc = future.exception()
        if exc is not None:
            logging.error('Task raised an exception:', exc_info=exc)

    if os.environ.get('SHELL'):
        logging.error('Sensors server task died!')
    else:
        logging.error('Sensors server task died! Waiting 60s and exiting...')
        try:
            controller_message('Sensors server task died! Waiting 60s and exiting...')
        except Exception:  # we want this to succeed no matter what
            pass
        time.sleep(60)
    sys.exit()


def main():
    """Register routes and sensors, start the background tasks and run forever."""
    app.router.add_get('/', index)
    app.router.add_post('/owntracks', owntracks)
    app.router.add_get('/history/{measurement}/{name}', history)
    app.router.add_get('/latest', latest)

    sensors.add(ThermostatSensor('thermostat2', '192.168.69.152', 'Venstar'))
    sensors.add(ERTSCMSensor('31005493', 'Water'))
    sensors.add(ERTSCMSensor('78628180', 'Gas'))
    sensors.add(OwnTracksSensor('owntracks1', 'OwnTracks'))
    sensors.add(AirSensor('air1', 'Living Room'))
    sensors.add(Acurite606TX('59', 'Outside'))
    sensors.add(AcuRite6002RM('999999', 'Seeds', 0.0))  # A
    sensors.add(AcuRite6002RM('5613', 'Misc', 0.0))  # A
    sensors.add(AcuRite6002RM('5109', 'Nook', 0.4))  # B
    sensors.add(AcuRite6002RM('11087', 'Bedroom', -0.3))  # C
    sensors.add(SleepSensor('sleep1', 'Bedroom'))
    sensors.add(SolarSensor('solar', 'Solar'))

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    # Keep references to the tasks: the event loop itself only holds weak
    # references to them.
    tasks = []
    for coro in (poll_sensors(), fetch_mqtt(), run_webserver()):
        task = loop.create_task(coro)
        task.add_done_callback(task_died)
        tasks.append(task)

    loop.run_forever()


if __name__ == '__main__':
    main()