Files
sensors/main.py
2026-04-16 22:46:38 +00:00

717 lines
22 KiB
Python

import os
DEBUG = os.environ.get('DEBUG')
PROD = os.environ.get('PROD')
import logging
logging.basicConfig(
format='[%(asctime)s] %(levelname)s %(module)s/%(funcName)s: - %(message)s',
level=logging.DEBUG if DEBUG else logging.INFO)
logging.getLogger('aiohttp').setLevel(logging.DEBUG if DEBUG else logging.WARNING)
logging.info('')
logging.info('BOOT UP')
import settings
import asyncio
import json
import time
import requests
from aiohttp import web, ClientSession, ClientError
import aiomqtt
from datetime import datetime, timedelta
import pytz
TIMEZONE = pytz.timezone('America/Edmonton')
import hashlib
app = web.Application()
http_session = None
from influxdb import InfluxDBClient
sensors_client = InfluxDBClient('localhost', 8086, database='sensors1' if PROD else 'sensors1dev')
solar_client = InfluxDBClient('localhost', 8086, database='solar2')
PORT = 6903 if PROD else 6904
def controller_message(message):
logging.info('Sending controller message: %s', message)
payload = dict(home=message)
r = requests.post('https://tbot.tannercollin.com/message', data=payload, timeout=10)
if r.status_code == 200:
return True
else:
logging.exception('Unable to communicate with controller! Message: ' + message)
return False
class Sensors():
sensors = []
def add(self, sensor):
self.sensors.append(sensor)
def get(self, id_):
for sensor in self.sensors:
if str(sensor.id_) == str(id_):
return sensor
return False
def __iter__(self):
for sensor in self.sensors:
yield sensor
sensors = Sensors()
async def getter(url):
global http_session
if not http_session:
http_session = ClientSession()
try:
async with http_session.get(url, timeout=10) as response:
response.raise_for_status()
result = await response.json(content_type=None, encoding='UTF-8')
return result
except BaseException as e:
logging.error(e)
return False
class Sensor():
value = {}
prev_value = {}
bad_keys = []
last_update = None
update_period = None
skip_if_hasnt_changed = False
skip_cooldown = 1.0
def __init__(self, id_, name):
self.id_ = id_
self.name = name
def __str__(self):
return '{} {} (ID: {})'.format(self.type_, self.name, self.id_)
def transform(self, data):
return
def changed(self):
before = self.prev_value.copy()
for key in self.bad_keys:
before.pop(key, None)
after = self.value.copy()
for key in self.bad_keys:
after.pop(key, None)
return str(before) != str(after)
def check_cooldown(self):
if self.last_update and self.skip_cooldown and time.time() - self.last_update < self.skip_cooldown:
# ignore data point
return True
else:
return False
def log(self):
if not self.value:
return
if not self.changed() and self.skip_if_hasnt_changed:
logging.debug('Skipping writing %s, data hasn\'t changed', self)
return
if self.check_cooldown():
logging.debug('Skipping writing %s, cooldown limit', self)
return
data = self.value.copy()
try:
self.transform(data)
except BaseException as e:
logging.exception('Problem transforming sensor data: {} - {}'.format(e.__class__.__name__, str(e)))
logging.error('Data: %s', str(data))
return
for key in self.bad_keys:
data.pop(key, None)
for key in ['id', 'time']:
data.pop(key, None)
timestamp = data.pop('timestamp', None)
if not timestamp:
timestamp = datetime.utcnow().replace(microsecond=0)
point = {
'time': timestamp,
'measurement': self.type_,
'tags': {'id': self.id_, 'name': self.name},
'fields': data,
}
try:
sensors_client.write_points([point])
except requests.exceptions.ConnectionError:
logging.exception('Error connecting to InfluxDB!')
return
logging.info('Wrote %s data to InfluxDB: %s', self, data)
def check_update(self):
if self.update_period and self.last_update:
if time.time() - self.last_update > self.update_period:
logging.warning('Missed expected update from %s.', self)
self.last_update = time.time()
def update(self, data):
self.prev_value = self.value
self.value = data
self.log()
self.last_update = time.time()
async def poll(self):
return
class ThermostatSensor(Sensor):
type_ = 'thermostat'
bad_keys = [
'name',
'schedule',
'schedulepart',
'away',
'cooltempmin',
'cooltempmax',
'heattempmin',
'heattempmax',
'dehum_setpoint'
]
update_period = 300
skip_if_hasnt_changed = True
def __init__(self, id_, ip, name):
self.id_ = id_
self.ip = ip
self.name = name
async def poll(self):
data = await getter('http://{}/query/info'.format(self.ip)) or {}
self.update(data)
class ERTSCMSensor(Sensor):
type_ = 'ertscm'
bad_keys = [
'model',
'mic',
]
update_period = 60*60
def transform(self, data):
# new gas meter
if 'Consumption' in data:
data['consumption_data'] = data['Consumption']
class OwnTracksSensor(Sensor):
type_ = 'owntracks'
bad_keys = [
'_type',
'topic',
'tst',
'created_at',
]
update_period = 90
skip_cooldown = False
class DustSensor(Sensor):
type_ = 'dust'
update_period = 90
def transform(self, data):
for key, value in data.items():
# what happens if you do this to a timestamp?
try:
data[key] = float(round(value, 1))
except TypeError:
pass
class AirSensor(Sensor):
type_ = 'air'
update_period = 15
def transform(self, data):
for key, value in data.items():
# what happens if you do this to a timestamp?
try:
data[key] = float(round(value, 1))
except TypeError:
pass
class SleepSensor(Sensor):
type_ = 'sleep'
update_period = 90
def transform(self, data):
for key, value in data.items():
# what happens if you do this to a timestamp?
try:
data[key] = float(round(value, 1))
except TypeError:
pass
class SoilSensor(Sensor):
type_ = 'soil'
update_period = 90
def transform(self, data):
for key, value in data.items():
# what happens if you do this to a timestamp?
try:
data[key] = float(round(value, 1))
except TypeError:
pass
class P2ProScaleSensor(Sensor):
type_ = 'scale'
skip_cooldown = 60.0 * 60 * 18 # 18 hours
def check_cooldown(self):
if 'weight' not in self.value:
return False
skip = super().check_cooldown()
if skip:
controller_message('Cooldown skipping scale weight: ' + str(self.value['weight']))
return skip
class SolarSensor(Sensor):
type_ = 'solar'
class Acurite606TX(Sensor):
type_ = 'temperature'
bad_keys = [
'model',
'mic',
'battery_ok',
]
update_period = 40
offset = 0.0
def __init__(self, id_, name, offset=0.0):
self.id_ = id_
self.name = name
self.offset = offset
def transform(self, data):
if data.get('battery_ok', None) != 1:
logging.error('%s battery not ok!', self)
data['temperature_C'] = float(data['temperature_C']) + self.offset
class AcuRite6002RM(Sensor):
type_ = 'temperature'
bad_keys = [
'model',
'mic',
'battery_ok',
'channel',
]
update_period = 40
offset = 0.0
def __init__(self, id_, name, temp_offset=0.0, hum_offset=0.0):
self.id_ = id_
self.name = name
self.temp_offset = temp_offset
self.hum_offset = hum_offset
def transform(self, data):
if data.get('battery_ok', None) != 1:
logging.error('%s battery not ok!', self)
data['temperature_C'] = float(data['temperature_C']) + self.temp_offset
data['humidity'] = float(data['humidity']) + self.hum_offset
class QotMotionSensor(Sensor):
type_ = 'qotmotion'
update_period = False
def transform(self, data):
split = data['data'].split(',')
data['battery'] = int(split[0])
data['boots'] = int(split[1])
data['motion'] = True # useful to distinguish if I eventually add a heartbeat
async def poll_sensors():
while True:
for sensor in sensors:
await sensor.poll()
sensor.check_update()
await asyncio.sleep(5)
async def process_data(data):
sensor = sensors.get(data['id'])
if sensor:
sensor.update(data)
async def process_mqtt(message):
try:
text = message.payload.decode()
except UnicodeDecodeError:
return
topic = message.topic.value
logging.debug('MQTT topic: %s, message: %s', topic, text)
if topic.startswith('test'):
logging.info('MQTT test, message: %s', text)
return
try:
data = json.loads(text)
except json.JSONDecodeError:
return
if type(data) != dict or 'id' not in data:
return
await process_data(data)
async def fetch_mqtt():
await asyncio.sleep(3)
# from https://sbtinstruments.github.io/aiomqtt/reconnection.html
# modified to make new client since their code didn't work
# https://github.com/sbtinstruments/aiomqtt/issues/269
while True:
try:
async with aiomqtt.Client('localhost') as client:
await client.subscribe('#')
async for message in client.messages:
await process_mqtt(message)
except aiomqtt.MqttError:
logging.info('MQTT connection lost, reconnecting in 5 seconds...')
await asyncio.sleep(5)
async def owntracks(request):
data = await request.json()
logging.debug('Web data: %s', str(data))
if data.get('_type', '') == 'location':
data['id'] = data['topic'].split('/')[-1]
data['timestamp'] = datetime.utcfromtimestamp(data['tst'])
if 'inregions' in data:
data['inregions'] = ','.join(data['inregions'])
await process_data(data)
else:
logging.info('Not a location, skipping.')
return web.Response()
def share_sha256(measurement, share_start, share_end, api_key):
s = f'{measurement}-{share_start}-{share_end}-{api_key}'
return hashlib.sha256(s.encode()).hexdigest()
async def history(request):
api_key = request.rel_url.query.get('api_key', '')
authed = api_key == settings.SENSORS_API_KEY
measurement = request.match_info.get('measurement')
name = request.match_info.get('name')
share_start = request.rel_url.query.get('shareStart', '')
share_end = request.rel_url.query.get('shareEnd', '')
share_sig = request.rel_url.query.get('shareSig', '')
share_authed = share_sig == share_sha256(measurement, share_start, share_end, settings.SENSORS_API_KEY)
authed = authed or share_authed
if not authed and measurement in ['owntracks', 'sleep']:
return web.json_response([])
if name not in [x.name for x in sensors.sensors]:
raise
end_unix = request.rel_url.query.get('end', None)
if end_unix:
end_unix = int(end_unix)
end = datetime.fromtimestamp(end_unix)
else:
end = datetime.now(tz=pytz.UTC)
duration = request.rel_url.query.get('duration', 'day')
if duration == 'today':
now_tz = datetime.now(tz=TIMEZONE)
start = now_tz.replace(hour=0, minute=0, second=0, microsecond=0)
window = '10m'
elif duration == 'day':
start = end - timedelta(days=1)
window = '10m'
elif duration == 'week':
start = end - timedelta(days=7)
window = '1h'
elif duration == 'month':
start = end - timedelta(days=30)
window = '1d'
elif duration == 'year':
start = end - timedelta(days=365)
window = '1d'
else:
raise
window = request.rel_url.query.get('window', window)
if window not in ['1m', '3m', '10m', '1h', '2h', '1d', '7d', '30d']:
raise
if name == 'Water':
scale = 10
elif name == 'Gas':
scale = 0.001055*1000
else:
scale = 1
start = int(start.timestamp())
end = int(end.timestamp())
if share_authed:
if start < int(share_start):
start = int(share_start)
if end > int(share_end):
end = int(share_end)
if measurement == 'temperature':
client = sensors_client
q = 'select mean("temperature_C") as temperature_C, mean("humidity") as humidity from temperature where "name" = \'{}\' and time >= {}s and time < {}s group by time({}) fill(linear)'.format(name, start, end, window)
elif measurement == 'ertscm':
client = sensors_client
q = 'select derivative(max("consumption_data"))*{} as delta, max("consumption_data")*{} as max from ertscm where "name" = \'{}\' and time >= {}s and time < {}s group by time({}) fill(previous)'.format(scale, scale, name, start, end, window)
elif measurement == 'thermostat':
client = sensors_client
q = 'select first("spacetemp") as spacetemp, first("heattemp") as heattemp, mode("state") as state from thermostat where "name" = \'{}\' and time >= {}s and time < {}s group by time({}) fill(previous)'.format(name, start, end, window)
elif measurement == 'dust':
client = sensors_client
q = 'select max("avg_p10") as max_p10, max("avg_p25") as max_p25 from dust where "name" = \'{}\' and time >= {}s and time < {}s group by time({}) fill(linear)'.format(name, start, end, window)
elif measurement == 'air':
client = sensors_client
q = 'select max("pm10") as max_p10, max("pm25") as max_p25, max("co2") as max_co2, max("voc_idx") as max_voc from air where "name" = \'{}\' and time >= {}s and time < {}s group by time({}) fill(linear)'.format(name, start, end, window)
elif measurement == 'soil':
client = sensors_client
q = 'select mean("soil") as soil from soil where "name" = \'{}\' and time >= {}s and time < {}s group by time({}) fill(linear)'.format(name, start, end, window)
elif measurement == 'lux':
client = sensors_client
q = 'select mean("lux") as lux from air where "name" = \'{}\' and time >= {}s and time < {}s group by time({}) fill(linear)'.format(name, start, end, window)
elif measurement == 'sleep':
client = sensors_client
q = 'select max("max_mag") as max_mag from sleep where "name" = \'{}\' and time >= {}s and time < {}s group by time({}) fill(linear)'.format(name, start, end, window)
elif measurement == 'solar':
client = solar_client
q = 'select max("actual_total") as actual_total, last("lifetime_energy")-first("lifetime_energy") as lifetime_energy from ecu where time >= {}s and time < {}s group by time({}) fill(linear)'.format(start, end, window)
elif measurement == 'owntracks':
client = sensors_client
q = 'select first("lat") as lat, first("lon") as lon from owntracks where "acc" < 100 and "name" = \'{}\' and time >= {}s and time < {}s group by time({}) fill(previous)'.format(name, start, end, window)
else:
raise
q += ' tz(\'America/Edmonton\')'
result = list(client.query(q).get_points())
return web.json_response(result)
async def search(request):
api_key = request.rel_url.query.get('api_key', '')
authed = api_key == settings.SENSORS_API_KEY
measurement = request.match_info.get('measurement')
name = request.match_info.get('name')
if not authed:
return web.json_response([])
if name not in [x.name for x in sensors.sensors]:
raise
if measurement != 'owntracks':
return web.json_response({'error': 'not implemented for this measurement'}, status=400)
try:
post_data = await request.json()
except json.JSONDecodeError:
return web.json_response({'error': 'invalid json'}, status=400)
params = request.rel_url.query
logging.info('Search request: meas=%s, name=%s, params=%s, data=%s',
measurement, name, params, post_data)
areas = post_data.get('areas')
if not areas or not isinstance(areas, list):
return web.json_response({'error': 'invalid areas format'}, status=400)
try:
for area in areas:
_ = area['southWest']['lat']
_ = area['southWest']['lng']
_ = area['northEast']['lat']
_ = area['northEast']['lng']
except (KeyError, TypeError):
return web.json_response({'error': 'invalid area format in areas list'}, status=400)
client = sensors_client
where_clauses = []
for area in areas:
sw = area['southWest']
ne = area['northEast']
where_clauses.append(f'("lat" >= {sw["lat"]} and "lat" <= {ne["lat"]} and "lon" >= {sw["lng"]} and "lon" <= {ne["lng"]})')
full_where_clause = ' or '.join(where_clauses)
q = f'select "lat", "lon" from owntracks where "acc" < 100 and "name" = \'{name}\' and ({full_where_clause}) order by time asc'
points = list(client.query(q).get_points())
ranges = []
current_range = None
last_point_dt = None
# Use a 12-hour gap to distinguish between separate visits
GAP_THRESHOLD_HOURS = 12
for point in points:
point_time_str = point['time']
if '.' in point_time_str:
point_dt = datetime.strptime(point_time_str, '%Y-%m-%dT%H:%M:%S.%fZ')
else:
point_dt = datetime.strptime(point_time_str, '%Y-%m-%dT%H:%M:%SZ')
if current_range is None:
current_range = {'start': point_dt, 'end': point_dt}
else:
time_diff_hours = (point_dt - last_point_dt).total_seconds() / 3600
if time_diff_hours > GAP_THRESHOLD_HOURS:
ranges.append({
'start': int(current_range['start'].timestamp()),
'end': int(current_range['end'].timestamp())
})
current_range = {'start': point_dt, 'end': point_dt}
else:
current_range['end'] = point_dt
last_point_dt = point_dt
if current_range is not None:
ranges.append({
'start': int(current_range['start'].timestamp()),
'end': int(current_range['end'].timestamp())
})
return web.json_response(ranges)
async def options_handler(request):
return web.Response()
async def latest(request):
result = dict()
api_key = request.rel_url.query.get('api_key', '')
authed = api_key == settings.SENSORS_API_KEY
for sensor in sensors:
if sensor.type_ in ['solar']:
continue
if not authed and sensor.type_ in ['owntracks', 'sleep']:
continue
q = 'select * from {} where "name" = \'{}\' order by desc limit 1'.format(sensor.type_, sensor.name)
points = sensors_client.query(q).get_points()
point = list(points)
if sensor.type_ not in result:
result[sensor.type_] = dict()
result[sensor.type_][sensor.name] = point
return web.json_response(result)
async def index(request):
return web.Response(text='sensors api', content_type='text/html')
async def run_webserver():
#web.run_app(app, port=PORT, loop=loop)
logging.info('Starting webserver on port: %s', PORT)
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, '0.0.0.0', PORT)
await site.start()
while True: await asyncio.sleep(10)
def task_died(future):
if os.environ.get('SHELL'):
logging.error('Sensors server task died!')
else:
logging.error('Sensors server task died! Waiting 60s and exiting...')
try:
controller_message('Sensors server task died! Waiting 60s and exiting...')
except: # we want this to succeed no matter what
pass
time.sleep(60)
exit()
if __name__ == '__main__':
app.router.add_get('/', index)
app.router.add_post('/owntracks', owntracks)
app.router.add_get('/history/{measurement}/{name}', history)
app.router.add_post('/search/{measurement}/{name}', search)
app.router.add_route('OPTIONS', '/search/{measurement}/{name}', options_handler)
app.router.add_get('/latest', latest)
sensors.add(ThermostatSensor('thermostat2', '192.168.69.152', 'Venstar'))
sensors.add(ERTSCMSensor('31005493', 'Water'))
sensors.add(ERTSCMSensor('78628180', 'Gas'))
sensors.add(OwnTracksSensor('owntracks1', 'OwnTracks'))
sensors.add(AirSensor('air1', 'Living Room'))
sensors.add(AirSensor('air2', 'Bedroom'))
sensors.add(AirSensor('air3', 'Kitchen'))
sensors.add(Acurite606TX('185', 'Outside', 0.0))
sensors.add(AcuRite6002RM('999999', 'Seeds', 0.0)) # A
sensors.add(AcuRite6002RM('999998', 'Misc', 0.7, -1.0)) # A
sensors.add(AcuRite6002RM('12516', 'Basement', 0.7, -1.0)) # A
sensors.add(AcuRite6002RM('5109', 'Nook', 0.2, -1.0)) # B
sensors.add(AcuRite6002RM('11087', 'Bedroom', -0.7, 1.0)) # C
sensors.add(SleepSensor('sleep1', 'Bedroom'))
sensors.add(SolarSensor('solar', 'Solar'))
sensors.add(SoilSensor('soil1', 'Dumb Cane'))
sensors.add(SoilSensor('soil2', 'Kitchen Pothos'))
sensors.add(SoilSensor('soil3', 'Dracaena'))
sensors.add(P2ProScaleSensor('scale1', 'Master Bathroom'))
sensors.add(QotMotionSensor('qot_dc3c', 'Bedroom'))
sensors.add(QotMotionSensor('qot_88c3', 'Lower Stairs Hi'))
sensors.add(QotMotionSensor('qot_7c3c', 'Theatre'))
sensors.add(QotMotionSensor('qot_54e6', 'Lab'))
sensors.add(QotMotionSensor('qot_10f4', 'Office'))
sensors.add(QotMotionSensor('qot_74c3', 'Guest Bathroom'))
sensors.add(QotMotionSensor('qot_706f', 'Nook'))
sensors.add(QotMotionSensor('qot_8c1c', 'Kitchen S'))
sensors.add(QotMotionSensor('qot_a83b', 'Kitchen N'))
sensors.add(QotMotionSensor('qot_28c3', 'Side Entrance'))
loop = asyncio.get_event_loop()
a = loop.create_task(poll_sensors()).add_done_callback(task_died)
b = loop.create_task(fetch_mqtt()).add_done_callback(task_died)
c = loop.create_task(run_webserver()).add_done_callback(task_died)
loop.run_forever()