import os DEBUG = os.environ.get('DEBUG') PROD = os.environ.get('PROD') import logging logging.basicConfig( format='[%(asctime)s] %(levelname)s %(module)s/%(funcName)s: - %(message)s', level=logging.DEBUG if DEBUG else logging.INFO) logging.getLogger('aiohttp').setLevel(logging.DEBUG if DEBUG else logging.WARNING) import settings import asyncio import json import time from aiohttp import web, ClientSession, ClientError from asyncio_mqtt import Client from datetime import datetime, timedelta import pytz TIMEZONE = pytz.timezone('America/Edmonton') app = web.Application() http_session = None from influxdb import InfluxDBClient client = InfluxDBClient('localhost', 8086, database='sensors1' if PROD else 'sensors1dev') PORT = 6903 if PROD else 6904 class Sensors(): sensors = [] def add(self, sensor): self.sensors.append(sensor) def get(self, id_): for sensor in self.sensors: if str(sensor.id_) == str(id_): return sensor return False def __iter__(self): for sensor in self.sensors: yield sensor sensors = Sensors() async def getter(url): global http_session if not http_session: http_session = ClientSession() try: async with http_session.get(url, timeout=10) as response: response.raise_for_status() result = await response.json(content_type=None, encoding='UTF-8') return result except BaseException as e: logging.error(e) return False class Sensor(): value = {} prev_value = {} bad_keys = [] last_update = time.time() update_period = None def __init__(self, id_, name): self.id_ = id_ self.name = name def __str__(self): return '{} {} (ID: {})'.format(self.type_, self.name, self.id_) def transform(self, data): return def changed(self): before = self.prev_value.copy() for key in self.bad_keys: before.pop(key, None) after = self.value.copy() for key in self.bad_keys: after.pop(key, None) return str(before) != str(after) def log(self): if not self.value or not self.changed(): return data = self.value.copy() self.transform(data) for key in self.bad_keys: data.pop(key, None) for key in ['id', 'time']: data.pop(key, None) timestamp = data.pop('timestamp', None) if not timestamp: timestamp = datetime.utcnow().replace(microsecond=0) point = { 'time': timestamp, 'measurement': self.type_, 'tags': {'id': self.id_, 'name': self.name}, 'fields': data, } client.write_points([point]) logging.info('Wrote %s data to InfluxDB.', self) def check_update(self): if self.update_period: if time.time() - self.last_update > self.update_period: logging.error('Missed expected update from %s.', self) self.last_update = time.time() def update(self, data): self.last_update = time.time() self.prev_value = self.value self.value = data self.log() async def poll(self): return class ThermostatSensor(Sensor): type_ = 'thermostat' bad_keys = [ 'name', 'schedule', 'schedulepart', 'away', 'cooltempmin', 'cooltempmax', 'heattempmin', 'heattempmax', 'dehum_setpoint' ] update_period = 300 def __init__(self, id_, ip, name): self.id_ = id_ self.ip = ip self.name = name async def poll(self): data = await getter('http://{}/query/info'.format(self.ip)) self.update(data) class ERTSCMSensor(Sensor): type_ = 'ertscm' bad_keys = [ 'model', 'mic', ] update_period = 60*60 class OwnTracksSensor(Sensor): type_ = 'owntracks' bad_keys = [ '_type', 'topic', 'tst', 'created_at', ] update_period = 90 class DustSensor(Sensor): type_ = 'dust' update_period = 90 def transform(self, data): for key, value in data.items(): # what happens if you do this to a timestamp? try: data[key] = float(round(value, 1)) except TypeError: pass class Acurite606TX(Sensor): type_ = 'temperature' bad_keys = [ 'model', 'mic', 'battery_ok', ] update_period = 40 def transform(self, data): if data['battery_ok'] != 1: logging.error('%s battery not ok!', self) data['temperature_C'] = float(data['temperature_C']) async def poll_sensors(): while True: for sensor in sensors: await sensor.poll() sensor.check_update() await asyncio.sleep(1) async def process_data(data): sensor = sensors.get(data['id']) if sensor: sensor.update(data) async def process_mqtt(message): text = message.payload.decode() topic = message.topic logging.debug('MQTT topic: %s, message: %s', topic, text) if topic == 'test': logging.info('MQTT test, message: %s', text) return try: data = json.loads(text) except json.JSONDecodeError: return if 'id' not in data: return await process_data(data) async def fetch_mqtt(): async with Client('localhost') as client: async with client.filtered_messages('#') as messages: await client.subscribe('#') async for message in messages: await process_mqtt(message) async def owntracks(request): data = await request.json() logging.info('Web data: %s', str(data)) if data.get('_type', '') == 'location': data['id'] = data['topic'].split('/')[-1] data['timestamp'] = datetime.utcfromtimestamp(data['tst']) if 'inregions' in data: data['inregions'] = ','.join(data['inregions']) await process_data(data) else: logging.info('Not a location, skipping.') return web.Response() async def history(request): measurement = request.match_info.get('measurement') name = request.match_info.get('name') end_unix = request.rel_url.query.get('end', None) if end_unix: end = datetime.fromtimestamp(end_unix) else: end = datetime.now(tz=pytz.UTC) duration = request.rel_url.query.get('duration', 'today') if duration == 'today': now_tz = datetime.now(tz=TIMEZONE) start = now_tz.replace(hour=0, minute=0, second=0, microsecond=0) window = '5m' elif duration == 'day': start = end - timedelta(days=1) window = '5m' elif duration == 'week': start = end - timedelta(days=7) window = '30m' elif duration == 'month': start = end - timedelta(days=30) window = '2h' elif duration == 'year': start = end - timedelta(days=365) window = '24h' start = int(start.timestamp()) end = int(end.timestamp()) if measurement == 'temperature': q = 'select mean("temperature_C") as temperature_C from temperature where "name" = \'{}\' and time >= {}s and time < {}s group by time({}) fill(none)'.format(name, start, end, window) elif measurement == 'ertscm': q = 'select max("consumption_data") as consumption_data from ertscm where "name" = \'{}\' and time >= {}s and time < {}s group by time({}) fill(none)'.format(name, start, end, window) elif measurement == 'dust': q = 'select max("max_p10") as max_p10, max("max_p25") as max_p25 from dust where "name" = \'{}\' and time >= {}s and time < {}s group by time({}) fill(none)'.format(name, start, end, window) #if window and moving_average: # q = 'select moving_average(mean("value"),{}) as value from {} where "name" = \'{}\' and time >= {}s and time < {}s group by time({}m) fill(none)'.format(moving_average, measurement, name, start, end, window) #elif window: # q = 'select mean("value") as value from {} where "name" = \'{}\' and time >= {}s and time < {}s group by time({}m) fill(none)'.format(measurement, name, start, end, window) #elif moving_average: # q = 'select moving_average("value", {}) as value from {} where "name" = \'{}\' and time >= {}s and time < {}s'.format(moving_average, name, start, end) #else: # q = 'select value from {} where "name" = \'{}\' and time >= {}s and time < {}s'.format(measurement, name, start, end) result = list(client.query(q).get_points()) return web.json_response(result) async def index(request): return web.Response(text='hello world', content_type='text/html') if __name__ == '__main__': app.router.add_get('/', index) app.router.add_post('/owntracks', owntracks) app.router.add_get('/history/{measurement}/{name}', history) sensors.add(ThermostatSensor('thermostat2', '192.168.69.152', 'Venstar')) sensors.add(ERTSCMSensor('31005493', 'Water')) sensors.add(ERTSCMSensor('41249312', 'Gas')) sensors.add(OwnTracksSensor('owntracks1', 'OwnTracks')) sensors.add(DustSensor('dust1', 'Nook')) sensors.add(Acurite606TX('231', 'Outside')) loop = asyncio.get_event_loop() loop.create_task(poll_sensors()) loop.create_task(fetch_mqtt()) web.run_app(app, port=PORT, loop=loop)