You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

170 lines
5.9 KiB

import os
import logging
# Root logger: INFO by default, DEBUG when the DEBUG environment variable is
# set to a non-empty value.
logging.basicConfig(
    format='[%(asctime)s] %(levelname)s %(module)s/%(funcName)s: - %(message)s',
    level=logging.INFO if not os.environ.get('DEBUG') else logging.DEBUG,
)
# The aiohttp logger is held at WARNING unless DEBUG is set.
logging.getLogger('aiohttp').setLevel(
    logging.WARNING if not os.environ.get('DEBUG') else logging.DEBUG
)
import asyncio
from aiohttp import web
from datetime import datetime, timedelta, timezone
import pytz
from APSystemsECUR import APSystemsECUR
# Address handed to the APSystemsECUR client below.
ECU_IP = '192.168.69.153'
# Local address the TCP proxy listeners bind to (see run_proxies).
LISTEN_IP = '192.168.69.106'
# ECU client; get_data() calls its query_ecu() method to fetch readings.
ecu = APSystemsECUR(ECU_IP)
# aiohttp application; its routes are registered in the __main__ block.
app = web.Application()
# Timestamp of the last reading handled by get_data(), used to skip repeats.
prev_ecu_timestamp = None
# Most recent ECU reading; read by the /data and /display handlers.
solar_data = {}
from influxdb import InfluxDBClient
# InfluxDB client for database 'solar2' on localhost:8086; get_data() writes
# points through it and history() queries it.
client = InfluxDBClient('localhost', 8086, database='solar2')
async def proxy(reader, writer):
    """Answer one incoming TCP connection with a ``'101'`` + timestamp reply.

    Reads up to 1024 bytes from the peer, extracts a 14-character
    ``%Y%m%d%H%M%S`` timestamp from the text form of the message, shifts it
    back by five minutes and sends it back prefixed with ``'101'``.
    Messages without a parseable timestamp at that position are ignored.

    The connection is closed on every exit path, including when reading or
    writing raises; exceptions other than ``ValueError`` still propagate.

    Args:
        reader: asyncio stream reader for the connection.
        writer: asyncio stream writer for the connection.
    """
    try:
        message = await reader.read(1024)
        addr = writer.get_extra_info('peername')
        # peername can be missing if the transport is already gone.
        peer = addr[0] if addr else None

        logging.debug('Recvd from {}: {}'.format(peer, message))

        # The offset indexes into str(message), i.e. the repr of the bytes
        # object, so it also counts the leading "b'" characters.
        offset = 32
        date_time_obj = datetime.strptime(
            str(message)[offset:offset + 14], '%Y%m%d%H%M%S'
        ) + timedelta(minutes=-5)

        send_str = '101' + datetime.strftime(date_time_obj, '%Y%m%d%H%M%S')
        send_data = send_str.encode()

        logging.debug('Sending to {}: {}'.format(peer, send_data))
        writer.write(send_data)
        await writer.drain()
    except ValueError:
        # strptime failed: the message carried no timestamp where expected.
        logging.debug('Ignored unnecessary data')
    finally:
        writer.close()
# Strong references to the serve_forever() tasks. The event loop only keeps
# weak references to tasks, so an otherwise unreferenced task can be
# garbage-collected before it finishes.
_proxy_tasks = set()


async def run_proxies():
    """Start a TCP listener running ``proxy`` on each proxy port.

    Binds LISTEN_IP on ports 8995, 8996 and 8997 and schedules each
    server's ``serve_forever()`` as a background task. Returns once all
    listeners have been started.
    """
    for port in (8995, 8996, 8997):
        server = await asyncio.start_server(proxy, LISTEN_IP, port)
        logging.info('Started TCP listener server on %s:%s', LISTEN_IP, port)
        task = asyncio.create_task(server.serve_forever())
        _proxy_tasks.add(task)
        # Drop the reference again once the task is finished.
        task.add_done_callback(_proxy_tasks.discard)
def _build_points(data, utctime):
    """Convert one ECU reading into a list of InfluxDB point dicts.

    Emits one 'inverter' point and two 'panel' points (channels 0 and 1)
    per inverter, followed by a single 'ecu' summary point. Expects
    ``data['actual_total']`` to be set already.
    """
    ecu_id = data['ecu_id']
    ecu_time = data['timestamp']
    points = []

    for inverter in data['inverters'].values():
        uid = inverter['uid']
        points.append({
            'time': utctime,
            'measurement': 'inverter',
            'tags': {'ecu': ecu_id, 'inverter': uid},
            'fields': {
                'ecu_time': ecu_time,
                'online': inverter['online'],
                'frequency': inverter['frequency'],
                'temperature': inverter['temperature'],
            },
        })
        for channel in (0, 1):
            points.append({
                'time': utctime,
                'measurement': 'panel',
                'tags': {'ecu': ecu_id, 'inverter': uid, 'channel': str(channel)},
                'fields': {
                    'ecu_time': ecu_time,
                    'power': inverter['power'][channel],
                    'voltage': inverter['voltage'][channel],
                },
            })

    points.append({
        'time': utctime,
        'measurement': 'ecu',
        'tags': {'ecu': ecu_id},
        'fields': {
            'ecu_total': data['current_power'],
            'ecu_time': ecu_time,
            'actual_total': data['actual_total'],
            'today_energy': data['today_energy'],
            'lifetime_energy': data['lifetime_energy'],
        },
    })
    return points


async def get_data():
    """Poll the ECU every 120 seconds and record each new reading.

    When a reading's timestamp differs from the previously seen one, sums
    the two power channels of every inverter into ``actual_total``,
    publishes the reading in the module-level ``solar_data`` and writes it
    to InfluxDB. Any error in a polling round is logged with its traceback
    and the loop carries on; this coroutine never returns.
    """
    global prev_ecu_timestamp
    global solar_data

    await asyncio.sleep(1)

    while True:
        try:
            logging.debug('Grabbing ECU data...')
            # NOTE(review): query_ecu() and write_points() are called
            # synchronously, so presumably they block the event loop while
            # they run - confirm whether that matters for the proxies.
            data = ecu.query_ecu()
            logging.debug('Good read, timestamp: %s Mountain Time, current power: %s', data['timestamp'], data['current_power'])

            if data['timestamp'] != prev_ecu_timestamp:
                utctime = datetime.now(timezone.utc)

                total = sum(
                    inverter['power'][0] + inverter['power'][1]
                    for inverter in data['inverters'].values()
                )
                data['actual_total'] = total

                solar_data = data
                logging.info('Solar data updated, ecu time: %s Mountain, ecu total: %s, actual total: %s', data['timestamp'], data['current_power'], total)

                points = _build_points(data, utctime)
                client.write_points(points)
                logging.info('Wrote %s points to InfluxDB', len(points))

                # Only remembered after a successful write, so a failed
                # write is retried on the next round.
                prev_ecu_timestamp = data['timestamp']
        except Exception as err:
            # Top-level boundary of the polling loop: keep running, but log
            # the traceback so failures can be diagnosed.
            logging.exception('Error: %s', err)

        await asyncio.sleep(120)
async def index(request):
    """Serve the placeholder landing page as HTML."""
    body = 'hello world'
    return web.Response(text=body, content_type='text/html')
async def data(request):
    """Return the reading currently held in ``solar_data`` as JSON."""
    latest = solar_data
    return web.json_response(latest)
async def display(request):
    """Return the current total power and a brightness level as JSON.

    Responds with ``{"power": <actual_total>, "brightness": 5}`` taken from
    the latest reading in ``solar_data``.

    Raises:
        web.HTTPServiceUnavailable: no reading has been stored yet, so
            ``solar_data`` has no ``'actual_total'`` entry.
    """
    try:
        power = solar_data['actual_total']
    except KeyError:
        # solar_data starts out empty; answer with an explicit 503 rather
        # than letting the KeyError surface as an internal server error.
        raise web.HTTPServiceUnavailable(text='No solar data available yet') from None

    res = dict(power=power, brightness=5)
    return web.json_response(res)
async def history(request):
    """Return the 'ecu' measurements for one local calendar day as JSON.

    The ``date`` path segment must be ``YYYY-MM-DD`` and is interpreted in
    the America/Edmonton time zone. The query covers local midnight of that
    day up to, but not including, local midnight of the following day.

    Raises:
        web.HTTPNotFound: the date segment is not a valid ``YYYY-MM-DD``.
    """
    try:
        day = datetime.strptime(request.match_info['date'], '%Y-%m-%d')
    except ValueError:
        raise web.HTTPNotFound()

    tz = pytz.timezone('America/Edmonton')
    # Localize both midnights separately. Adding a timedelta to a
    # pytz-localized datetime keeps the original UTC offset, which would
    # shift the end of the window by an hour on DST-transition days.
    start = int(tz.localize(day).timestamp())
    end = int(tz.localize(day + timedelta(days=1)).timestamp())

    # start and end are ints, so formatting them into the query is safe.
    q = 'select * from ecu where time >= {}s and time < {}s'.format(start, end)
    rows = list(client.query(q).get_points())

    return web.json_response(rows)
async def _background_tasks(_app):
    """Run the TCP proxies and the ECU poller for the web app's lifetime.

    Registered as an aiohttp cleanup context: the code before ``yield`` runs
    at application startup on the loop that serves the app, and the code
    after it runs at shutdown.
    """
    tasks = [
        asyncio.create_task(run_proxies()),
        asyncio.create_task(get_data()),
    ]
    yield
    for task in tasks:
        task.cancel()
    # Wait for the cancellations to finish; a task that already ended or
    # failed must not abort the shutdown.
    await asyncio.gather(*tasks, return_exceptions=True)


if __name__ == '__main__':
    app.router.add_get('/', index)
    app.router.add_get('/data', data)
    app.router.add_get('/display', display)
    app.router.add_get('/history/{date}', history)

    # Start background work from inside the app's own event loop instead of
    # scheduling it on asyncio.get_event_loop() beforehand: run_app() may
    # run the app on a different loop, in which case tasks created earlier
    # would never execute.
    app.cleanup_ctx.append(_background_tasks)

    web.run_app(app, port=6901)