You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

511 lines
15 KiB

import os
DEBUG = os.environ.get('DEBUG')
PROD = os.environ.get('PROD')
import logging
logging.basicConfig(
format='[%(asctime)s] %(levelname)s %(module)s/%(funcName)s: - %(message)s',
level=logging.DEBUG if DEBUG else logging.INFO)
logging.getLogger('aiohttp').setLevel(logging.DEBUG if DEBUG else logging.WARNING)
logging.info('')
logging.info('BOOT UP')
import settings
import asyncio
import json
import time
import requests
from aiohttp import web, ClientSession, ClientError
import aiomqtt
from datetime import datetime, timedelta
import pytz
TIMEZONE = pytz.timezone('America/Edmonton')
app = web.Application()
http_session = None
from influxdb import InfluxDBClient
sensors_client = InfluxDBClient('localhost', 8086, database='sensors1' if PROD else 'sensors1dev')
solar_client = InfluxDBClient('localhost', 8086, database='solar2')
PORT = 6903 if PROD else 6904
def controller_message(message):
payload = dict(home=message)
r = requests.post('https://tbot.tannercollin.com/message', data=payload, timeout=10)
if r.status_code == 200:
return True
else:
logging.exception('Unable to communicate with controller! Message: ' + message)
return False
class Sensors():
sensors = []
def add(self, sensor):
self.sensors.append(sensor)
def get(self, id_):
for sensor in self.sensors:
if str(sensor.id_) == str(id_):
return sensor
return False
def __iter__(self):
for sensor in self.sensors:
yield sensor
sensors = Sensors()
async def getter(url):
global http_session
if not http_session:
http_session = ClientSession()
try:
async with http_session.get(url, timeout=10) as response:
response.raise_for_status()
result = await response.json(content_type=None, encoding='UTF-8')
return result
except BaseException as e:
logging.error(e)
return False
class Sensor():
value = {}
prev_value = {}
bad_keys = []
last_update = time.time()
update_period = None
def __init__(self, id_, name):
self.id_ = id_
self.name = name
def __str__(self):
return '{} {} (ID: {})'.format(self.type_, self.name, self.id_)
def transform(self, data):
return
def changed(self):
before = self.prev_value.copy()
for key in self.bad_keys:
before.pop(key, None)
after = self.value.copy()
for key in self.bad_keys:
after.pop(key, None)
return str(before) != str(after)
def log(self):
if not self.value or not self.changed():
return
data = self.value.copy()
self.transform(data)
for key in self.bad_keys:
data.pop(key, None)
for key in ['id', 'time']:
data.pop(key, None)
timestamp = data.pop('timestamp', None)
if not timestamp:
timestamp = datetime.utcnow().replace(microsecond=0)
point = {
'time': timestamp,
'measurement': self.type_,
'tags': {'id': self.id_, 'name': self.name},
'fields': data,
}
try:
sensors_client.write_points([point])
except requests.exceptions.ConnectionError:
logging.exception('Error connecting to InfluxDB!')
return
logging.info('Wrote %s data to InfluxDB: %s', self, data)
def check_update(self):
if self.update_period:
if time.time() - self.last_update > self.update_period:
logging.warning('Missed expected update from %s.', self)
self.last_update = time.time()
def update(self, data):
self.last_update = time.time()
self.prev_value = self.value
self.value = data
self.log()
async def poll(self):
return
class ThermostatSensor(Sensor):
type_ = 'thermostat'
bad_keys = [
'name',
'schedule',
'schedulepart',
'away',
'cooltempmin',
'cooltempmax',
'heattempmin',
'heattempmax',
'dehum_setpoint'
]
update_period = 300
def __init__(self, id_, ip, name):
self.id_ = id_
self.ip = ip
self.name = name
async def poll(self):
data = await getter('http://{}/query/info'.format(self.ip)) or {}
self.update(data)
class ERTSCMSensor(Sensor):
type_ = 'ertscm'
bad_keys = [
'model',
'mic',
]
update_period = 60*60
def transform(self, data):
# new gas meter
if 'Consumption' in data:
data['consumption_data'] = data['Consumption']
class OwnTracksSensor(Sensor):
type_ = 'owntracks'
bad_keys = [
'_type',
'topic',
'tst',
'created_at',
]
update_period = 90
class DustSensor(Sensor):
type_ = 'dust'
update_period = 90
def transform(self, data):
for key, value in data.items():
# what happens if you do this to a timestamp?
try:
data[key] = float(round(value, 1))
except TypeError:
pass
class AirSensor(Sensor):
type_ = 'air'
update_period = 15
def transform(self, data):
for key, value in data.items():
# what happens if you do this to a timestamp?
try:
data[key] = float(round(value, 1))
except TypeError:
pass
class SleepSensor(Sensor):
type_ = 'sleep'
update_period = 90
def transform(self, data):
for key, value in data.items():
# what happens if you do this to a timestamp?
try:
data[key] = float(round(value, 1))
except TypeError:
pass
class SolarSensor(Sensor):
type_ = 'solar'
class Acurite606TX(Sensor):
type_ = 'temperature'
bad_keys = [
'model',
'mic',
'battery_ok',
]
update_period = 40
offset = 0.0
def __init__(self, id_, name, offset=0.0):
self.id_ = id_
self.name = name
self.offset = offset
def transform(self, data):
if data['battery_ok'] != 1:
logging.error('%s battery not ok!', self)
data['temperature_C'] = float(data['temperature_C']) + self.offset
class AcuRite6002RM(Sensor):
type_ = 'temperature'
bad_keys = [
'model',
'mic',
'battery_ok',
'channel',
]
update_period = 40
offset = 0.0
def __init__(self, id_, name, offset=0.0):
self.id_ = id_
self.name = name
self.offset = offset
def transform(self, data):
if data['battery_ok'] != 1:
logging.error('%s battery not ok!', self)
data['temperature_C'] = float(data['temperature_C']) + self.offset
data['humidity'] = float(data['humidity'])
async def poll_sensors():
while True:
for sensor in sensors:
await sensor.poll()
sensor.check_update()
await asyncio.sleep(1)
async def process_data(data):
sensor = sensors.get(data['id'])
if sensor:
sensor.update(data)
async def process_mqtt(message):
try:
text = message.payload.decode()
except UnicodeDecodeError:
return
topic = message.topic.value
logging.debug('MQTT topic: %s, message: %s', topic, text)
if topic.startswith('test'):
logging.info('MQTT test, message: %s', text)
return
try:
data = json.loads(text)
except json.JSONDecodeError:
return
if type(data) != dict or 'id' not in data:
return
await process_data(data)
async def fetch_mqtt():
await asyncio.sleep(3)
# from https://sbtinstruments.github.io/aiomqtt/reconnection.html
# modified to make new client since their code didn't work
# https://github.com/sbtinstruments/aiomqtt/issues/269
while True:
try:
async with aiomqtt.Client('localhost') as client:
await client.subscribe('#')
async for message in client.messages:
await process_mqtt(message)
except aiomqtt.MqttError:
logging.info('MQTT connection lost, reconnecting in 5 seconds...')
await asyncio.sleep(5)
async def owntracks(request):
data = await request.json()
logging.debug('Web data: %s', str(data))
if data.get('_type', '') == 'location':
data['id'] = data['topic'].split('/')[-1]
data['timestamp'] = datetime.utcfromtimestamp(data['tst'])
if 'inregions' in data:
data['inregions'] = ','.join(data['inregions'])
await process_data(data)
else:
logging.info('Not a location, skipping.')
return web.Response()
async def history(request):
api_key = request.rel_url.query.get('api_key', '')
authed = api_key == settings.SENSORS_API_KEY
measurement = request.match_info.get('measurement')
name = request.match_info.get('name')
if not authed and measurement in ['owntracks', 'sleep']:
return web.json_response([])
if name not in [x.name for x in sensors.sensors]:
raise
end_unix = request.rel_url.query.get('end', None)
if end_unix:
end_unix = int(end_unix)
end = datetime.fromtimestamp(end_unix)
else:
end = datetime.now(tz=pytz.UTC)
duration = request.rel_url.query.get('duration', 'day')
if duration == 'today':
now_tz = datetime.now(tz=TIMEZONE)
start = now_tz.replace(hour=0, minute=0, second=0, microsecond=0)
window = '10m'
elif duration == 'day':
start = end - timedelta(days=1)
window = '10m'
elif duration == 'week':
start = end - timedelta(days=7)
window = '1h'
elif duration == 'month':
start = end - timedelta(days=30)
window = '1d'
elif duration == 'year':
start = end - timedelta(days=365)
window = '1d'
else:
raise
window = request.rel_url.query.get('window', window)
if window not in ['1m', '3m', '10m', '1h', '2h', '1d', '7d', '30d']:
raise
if name == 'Water':
scale = 10
elif name == 'Gas':
scale = 0.001055*1000
else:
scale = 1
start = int(start.timestamp())
end = int(end.timestamp())
if measurement == 'temperature':
client = sensors_client
q = 'select mean("temperature_C") as temperature_C, mean("humidity") as humidity from temperature where "name" = \'{}\' and time >= {}s and time < {}s group by time({}) fill(linear)'.format(name, start, end, window)
elif measurement == 'ertscm':
client = sensors_client
q = 'select derivative(max("consumption_data"))*{} as delta, max("consumption_data")*{} as max from ertscm where "name" = \'{}\' and time >= {}s and time < {}s group by time({}) fill(previous)'.format(scale, scale, name, start, end, window)
elif measurement == 'thermostat':
client = sensors_client
q = 'select first("spacetemp") as spacetemp, first("heattemp") as heattemp, mode("state") as state from thermostat where "name" = \'{}\' and time >= {}s and time < {}s group by time({}) fill(previous)'.format(name, start, end, window)
elif measurement == 'dust':
client = sensors_client
q = 'select max("avg_p10") as max_p10, max("avg_p25") as max_p25 from dust where "name" = \'{}\' and time >= {}s and time < {}s group by time({}) fill(linear)'.format(name, start, end, window)
elif measurement == 'air':
client = sensors_client
q = 'select max("pm10") as max_p10, max("pm25") as max_p25, max("co2") as max_co2, max("voc_idx") as max_voc from air where "name" = \'{}\' and time >= {}s and time < {}s group by time({}) fill(linear)'.format(name, start, end, window)
elif measurement == 'lux':
client = sensors_client
q = 'select mean("lux") as lux from air where "name" = \'{}\' and time >= {}s and time < {}s group by time({}) fill(linear)'.format(name, start, end, window)
elif measurement == 'sleep':
client = sensors_client
q = 'select max("max_mag") as max_mag from sleep where "name" = \'{}\' and time >= {}s and time < {}s group by time({}) fill(linear)'.format(name, start, end, window)
elif measurement == 'solar':
client = solar_client
q = 'select max("actual_total") as actual_total, last("lifetime_energy")-first("lifetime_energy") as lifetime_energy from ecu where time >= {}s and time < {}s group by time({}) fill(linear)'.format(start, end, window)
elif measurement == 'owntracks':
client = sensors_client
q = 'select first("lat") as lat, first("lon") as lon from owntracks where "acc" < 100 and "name" = \'{}\' and time >= {}s and time < {}s group by time({}) fill(previous)'.format(name, start, end, window)
else:
raise
q += ' tz(\'America/Edmonton\')'
result = list(client.query(q).get_points())
return web.json_response(result)
async def latest(request):
result = dict()
api_key = request.rel_url.query.get('api_key', '')
authed = api_key == settings.SENSORS_API_KEY
for sensor in sensors:
if sensor.type_ in ['solar']:
continue
if not authed and sensor.type_ in ['owntracks', 'sleep']:
continue
q = 'select * from {} where "name" = \'{}\' order by desc limit 1'.format(sensor.type_, sensor.name)
points = sensors_client.query(q).get_points()
point = list(points)
if sensor.type_ not in result:
result[sensor.type_] = dict()
result[sensor.type_][sensor.name] = point
return web.json_response(result)
async def index(request):
return web.Response(text='sensors api', content_type='text/html')
async def run_webserver():
#web.run_app(app, port=PORT, loop=loop)
logging.info('Starting webserver on port: %s', PORT)
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, '0.0.0.0', PORT)
await site.start()
while True: await asyncio.sleep(10)
def task_died(future):
if os.environ.get('SHELL'):
logging.error('Sensors server task died!')
else:
logging.error('Sensors server task died! Waiting 60s and exiting...')
try:
controller_message('Sensors server task died! Waiting 60s and exiting...')
except: # we want this to succeed no matter what
pass
time.sleep(60)
exit()
if __name__ == '__main__':
app.router.add_get('/', index)
app.router.add_post('/owntracks', owntracks)
app.router.add_get('/history/{measurement}/{name}', history)
app.router.add_get('/latest', latest)
sensors.add(ThermostatSensor('thermostat2', '192.168.69.152', 'Venstar'))
sensors.add(ERTSCMSensor('31005493', 'Water'))
sensors.add(ERTSCMSensor('78628180', 'Gas'))
sensors.add(OwnTracksSensor('owntracks1', 'OwnTracks'))
sensors.add(AirSensor('air1', 'Living Room'))
sensors.add(Acurite606TX('59', 'Outside'))
sensors.add(AcuRite6002RM('999999', 'Seeds', 0.0)) # A
sensors.add(AcuRite6002RM('5613', 'Misc', 0.0)) # A
sensors.add(AcuRite6002RM('5109', 'Nook', 0.4)) # B
sensors.add(AcuRite6002RM('11087', 'Bedroom', -0.3)) # C
sensors.add(SleepSensor('sleep1', 'Bedroom'))
sensors.add(SolarSensor('solar', 'Solar'))
loop = asyncio.get_event_loop()
a = loop.create_task(poll_sensors()).add_done_callback(task_died)
b = loop.create_task(fetch_mqtt()).add_done_callback(task_died)
c = loop.create_task(run_webserver()).add_done_callback(task_died)
loop.run_forever()