Compare commits
3 Commits
e38164fd43
...
34f0444de7
Author | SHA1 | Date | |
---|---|---|---|
34f0444de7 | |||
8abb15cdd3 | |||
1346171618 |
64
main.py
64
main.py
|
@ -19,7 +19,7 @@ import json
|
||||||
import time
|
import time
|
||||||
import requests
|
import requests
|
||||||
from aiohttp import web, ClientSession, ClientError
|
from aiohttp import web, ClientSession, ClientError
|
||||||
from asyncio_mqtt import Client
|
import aiomqtt
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
import pytz
|
import pytz
|
||||||
TIMEZONE = pytz.timezone('America/Edmonton')
|
TIMEZONE = pytz.timezone('America/Edmonton')
|
||||||
|
@ -123,7 +123,12 @@ class Sensor():
|
||||||
'tags': {'id': self.id_, 'name': self.name},
|
'tags': {'id': self.id_, 'name': self.name},
|
||||||
'fields': data,
|
'fields': data,
|
||||||
}
|
}
|
||||||
sensors_client.write_points([point])
|
|
||||||
|
try:
|
||||||
|
sensors_client.write_points([point])
|
||||||
|
except requests.exceptions.ConnectionError:
|
||||||
|
logging.exception('Error connecting to InfluxDB!')
|
||||||
|
return
|
||||||
|
|
||||||
logging.info('Wrote %s data to InfluxDB: %s', self, data)
|
logging.info('Wrote %s data to InfluxDB: %s', self, data)
|
||||||
|
|
||||||
|
@ -174,6 +179,11 @@ class ERTSCMSensor(Sensor):
|
||||||
]
|
]
|
||||||
update_period = 60*60
|
update_period = 60*60
|
||||||
|
|
||||||
|
def transform(self, data):
|
||||||
|
# new gas meter
|
||||||
|
if 'Consumption' in data:
|
||||||
|
data['consumption_data'] = data['Consumption']
|
||||||
|
|
||||||
class OwnTracksSensor(Sensor):
|
class OwnTracksSensor(Sensor):
|
||||||
type_ = 'owntracks'
|
type_ = 'owntracks'
|
||||||
bad_keys = [
|
bad_keys = [
|
||||||
|
@ -280,8 +290,12 @@ async def process_data(data):
|
||||||
sensor.update(data)
|
sensor.update(data)
|
||||||
|
|
||||||
async def process_mqtt(message):
|
async def process_mqtt(message):
|
||||||
text = message.payload.decode()
|
try:
|
||||||
topic = message.topic
|
text = message.payload.decode()
|
||||||
|
except UnicodeDecodeError:
|
||||||
|
return
|
||||||
|
|
||||||
|
topic = message.topic.value
|
||||||
logging.debug('MQTT topic: %s, message: %s', topic, text)
|
logging.debug('MQTT topic: %s, message: %s', topic, text)
|
||||||
|
|
||||||
if topic.startswith('test'):
|
if topic.startswith('test'):
|
||||||
|
@ -299,11 +313,21 @@ async def process_mqtt(message):
|
||||||
await process_data(data)
|
await process_data(data)
|
||||||
|
|
||||||
async def fetch_mqtt():
|
async def fetch_mqtt():
|
||||||
async with Client('localhost') as client:
|
await asyncio.sleep(3)
|
||||||
async with client.filtered_messages('#') as messages:
|
|
||||||
await client.subscribe('#')
|
# from https://sbtinstruments.github.io/aiomqtt/reconnection.html
|
||||||
async for message in messages:
|
# modified to make new client since their code didn't work
|
||||||
await process_mqtt(message)
|
# https://github.com/sbtinstruments/aiomqtt/issues/269
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
async with aiomqtt.Client('localhost') as client:
|
||||||
|
await client.subscribe('#')
|
||||||
|
async for message in client.messages:
|
||||||
|
await process_mqtt(message)
|
||||||
|
except aiomqtt.MqttError:
|
||||||
|
logging.info('MQTT connection lost, reconnecting in 5 seconds...')
|
||||||
|
await asyncio.sleep(5)
|
||||||
|
|
||||||
|
|
||||||
async def owntracks(request):
|
async def owntracks(request):
|
||||||
data = await request.json()
|
data = await request.json()
|
||||||
|
@ -321,9 +345,15 @@ async def owntracks(request):
|
||||||
return web.Response()
|
return web.Response()
|
||||||
|
|
||||||
async def history(request):
|
async def history(request):
|
||||||
|
api_key = request.rel_url.query.get('api_key', '')
|
||||||
|
authed = api_key == settings.SENSORS_API_KEY
|
||||||
|
|
||||||
measurement = request.match_info.get('measurement')
|
measurement = request.match_info.get('measurement')
|
||||||
name = request.match_info.get('name')
|
name = request.match_info.get('name')
|
||||||
|
|
||||||
|
if not authed and measurement in ['owntracks', 'sleep']:
|
||||||
|
return web.json_response([])
|
||||||
|
|
||||||
if name not in [x.name for x in sensors.sensors]:
|
if name not in [x.name for x in sensors.sensors]:
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
@ -356,7 +386,7 @@ async def history(request):
|
||||||
raise
|
raise
|
||||||
|
|
||||||
window = request.rel_url.query.get('window', window)
|
window = request.rel_url.query.get('window', window)
|
||||||
if window not in ['10m', '1h', '1d', '7d', '30d']:
|
if window not in ['1m', '3m', '10m', '1h', '2h', '1d', '7d', '30d']:
|
||||||
raise
|
raise
|
||||||
|
|
||||||
if name == 'Water':
|
if name == 'Water':
|
||||||
|
@ -393,6 +423,9 @@ async def history(request):
|
||||||
elif measurement == 'solar':
|
elif measurement == 'solar':
|
||||||
client = solar_client
|
client = solar_client
|
||||||
q = 'select max("actual_total") as actual_total, last("lifetime_energy")-first("lifetime_energy") as lifetime_energy from ecu where time >= {}s and time < {}s group by time({}) fill(linear)'.format(start, end, window)
|
q = 'select max("actual_total") as actual_total, last("lifetime_energy")-first("lifetime_energy") as lifetime_energy from ecu where time >= {}s and time < {}s group by time({}) fill(linear)'.format(start, end, window)
|
||||||
|
elif measurement == 'owntracks':
|
||||||
|
client = sensors_client
|
||||||
|
q = 'select first("lat") as lat, first("lon") as lon from owntracks where "acc" < 100 and "name" = \'{}\' and time >= {}s and time < {}s group by time({}) fill(previous)'.format(name, start, end, window)
|
||||||
else:
|
else:
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
@ -411,7 +444,7 @@ async def latest(request):
|
||||||
if sensor.type_ in ['solar']:
|
if sensor.type_ in ['solar']:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if not authed and sensor.type_ in ['owntracks']:
|
if not authed and sensor.type_ in ['owntracks', 'sleep']:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
q = 'select * from {} where "name" = \'{}\' order by desc limit 1'.format(sensor.type_, sensor.name)
|
q = 'select * from {} where "name" = \'{}\' order by desc limit 1'.format(sensor.type_, sensor.name)
|
||||||
|
@ -426,7 +459,7 @@ async def latest(request):
|
||||||
return web.json_response(result)
|
return web.json_response(result)
|
||||||
|
|
||||||
async def index(request):
|
async def index(request):
|
||||||
return web.Response(text='hello world', content_type='text/html')
|
return web.Response(text='sensors api', content_type='text/html')
|
||||||
|
|
||||||
async def run_webserver():
|
async def run_webserver():
|
||||||
#web.run_app(app, port=PORT, loop=loop)
|
#web.run_app(app, port=PORT, loop=loop)
|
||||||
|
@ -444,7 +477,10 @@ def task_died(future):
|
||||||
logging.error('Sensors server task died!')
|
logging.error('Sensors server task died!')
|
||||||
else:
|
else:
|
||||||
logging.error('Sensors server task died! Waiting 60s and exiting...')
|
logging.error('Sensors server task died! Waiting 60s and exiting...')
|
||||||
controller_message('Sensors server task died! Waiting 60s and exiting...')
|
try:
|
||||||
|
controller_message('Sensors server task died! Waiting 60s and exiting...')
|
||||||
|
except: # we want this to succeed no matter what
|
||||||
|
pass
|
||||||
time.sleep(60)
|
time.sleep(60)
|
||||||
exit()
|
exit()
|
||||||
|
|
||||||
|
@ -456,7 +492,7 @@ if __name__ == '__main__':
|
||||||
|
|
||||||
sensors.add(ThermostatSensor('thermostat2', '192.168.69.152', 'Venstar'))
|
sensors.add(ThermostatSensor('thermostat2', '192.168.69.152', 'Venstar'))
|
||||||
sensors.add(ERTSCMSensor('31005493', 'Water'))
|
sensors.add(ERTSCMSensor('31005493', 'Water'))
|
||||||
sensors.add(ERTSCMSensor('41249312', 'Gas'))
|
sensors.add(ERTSCMSensor('78628180', 'Gas'))
|
||||||
sensors.add(OwnTracksSensor('owntracks1', 'OwnTracks'))
|
sensors.add(OwnTracksSensor('owntracks1', 'OwnTracks'))
|
||||||
sensors.add(AirSensor('air1', 'Living Room'))
|
sensors.add(AirSensor('air1', 'Living Room'))
|
||||||
sensors.add(Acurite606TX('59', 'Outside'))
|
sensors.add(Acurite606TX('59', 'Outside'))
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
aiohttp==3.8.1
|
aiohttp==3.8.1
|
||||||
|
aiomqtt==2.0.0
|
||||||
aiosignal==1.2.0
|
aiosignal==1.2.0
|
||||||
async-timeout==4.0.2
|
async-timeout==4.0.2
|
||||||
asyncio-mqtt==0.11.0
|
asyncio-mqtt==0.11.0
|
||||||
|
@ -12,11 +13,10 @@ influxdb==5.3.1
|
||||||
msgpack==1.0.3
|
msgpack==1.0.3
|
||||||
multidict==5.2.0
|
multidict==5.2.0
|
||||||
paho-mqtt==1.6.1
|
paho-mqtt==1.6.1
|
||||||
pkg_resources==0.0.0
|
|
||||||
python-dateutil==2.8.2
|
python-dateutil==2.8.2
|
||||||
pytz==2021.3
|
pytz==2021.3
|
||||||
requests==2.26.0
|
requests==2.26.0
|
||||||
six==1.16.0
|
six==1.16.0
|
||||||
typing_extensions==4.0.1
|
typing-extensions==4.9.0
|
||||||
urllib3==1.26.7
|
urllib3==1.26.7
|
||||||
yarl==1.7.2
|
yarl==1.7.2
|
||||||
|
|
Loading…
Reference in New Issue
Block a user