import os import logging logging.basicConfig( format='[%(asctime)s] %(levelname)s %(module)s/%(funcName)s: - %(message)s', level=logging.DEBUG if os.environ.get('DEBUG') else logging.INFO) logging.getLogger('aiohttp').setLevel(logging.DEBUG if os.environ.get('DEBUG') else logging.WARNING) import asyncio from aiohttp import web from datetime import datetime, timedelta, timezone import pytz from APSystemsECUR import APSystemsECUR ECU_IP = '192.168.69.153' LISTEN_IP = '192.168.69.106' ecu = APSystemsECUR(ECU_IP) app = web.Application() prev_ecu_timestamp = None solar_data = {} from influxdb import InfluxDBClient client = InfluxDBClient('localhost', 8086, database='solar2') async def proxy(reader, writer): message = await reader.read(1024) addr = writer.get_extra_info('peername') try: logging.debug('Recvd from {}: {}'.format(addr[0], message)) offset = 32 date_time_obj = datetime.strptime(str(message)[offset:offset+14], '%Y%m%d%H%M%S') + timedelta(minutes=-5) send_str = '101' + datetime.strftime(date_time_obj, '%Y%m%d%H%M%S') send_data = send_str.encode() logging.debug('Sending to {}: {}'.format(addr[0], send_data)) writer.write(send_data) await writer.drain() except ValueError: logging.debug('Ignored unnecessary data') writer.close() async def run_proxies(): for port in [8995, 8996, 8997]: server = await asyncio.start_server(proxy, LISTEN_IP, port) logging.info('Started TCP listener server on %s:%s', LISTEN_IP, port) task = asyncio.create_task(server.serve_forever()) async def get_data(): global prev_ecu_timestamp global solar_data await asyncio.sleep(1) while True: try: logging.debug('Grabbing ECU data...') data = ecu.query_ecu() logging.debug('Good read, timestamp: %s Mountain Time, current power: %s', data['timestamp'], data['current_power']) if data['timestamp'] != prev_ecu_timestamp: total = 0 utctime = datetime.now(timezone.utc) for i in data['inverters'].values(): total += i['power'][0] total += i['power'][1] data['actual_total'] = total solar_data = data logging.info('Solar data updated, ecu time: %s Mountain, ecu total: %s, actual total: %s', data['timestamp'], data['current_power'], total) points = [] for i in data['inverters'].values(): points.append({ 'time': utctime, 'measurement': 'inverter', 'tags': {'ecu': data['ecu_id'], 'inverter': i['uid']}, 'fields': { 'ecu_time': data['timestamp'], 'online': i['online'], 'frequency': i['frequency'], 'temperature': i['temperature'], } }) points.append({ 'time': utctime, 'measurement': 'panel', 'tags': {'ecu': data['ecu_id'], 'inverter': i['uid'], 'channel': '0'}, 'fields': { 'ecu_time': data['timestamp'], 'power': i['power'][0], 'voltage': i['voltage'][0] } }) points.append({ 'time': utctime, 'measurement': 'panel', 'tags': {'ecu': data['ecu_id'], 'inverter': i['uid'], 'channel': '1'}, 'fields': { 'ecu_time': data['timestamp'], 'power': i['power'][1], 'voltage': i['voltage'][1] } }) points.append({ 'time': utctime, 'measurement': 'ecu', 'tags': {'ecu': data['ecu_id']}, 'fields': { 'ecu_total': data['current_power'], 'ecu_time': data['timestamp'], 'actual_total': data['actual_total'], 'today_energy': data['today_energy'], 'lifetime_energy': data['lifetime_energy'], } }) client.write_points(points) logging.info('Wrote %s points to InfluxDB', len(points)) prev_ecu_timestamp = data['timestamp'] except Exception as err: logging.error('Error: ' + str(err)) await asyncio.sleep(120) async def index(request): return web.Response(text='hello world', content_type='text/html') async def data(request): return web.json_response(solar_data) async def display(request): res = dict(power=solar_data['actual_total'], brightness=5) return web.json_response(res) async def history(request): try: date = datetime.strptime(request.match_info['date'], '%Y-%m-%d') tz = pytz.timezone('America/Edmonton') date = tz.localize(date) except ValueError: raise web.HTTPNotFound start = int(date.timestamp()) end = date + timedelta(days=1) end = int(end.timestamp()) q = 'select * from ecu where time >= {}s and time < {}s'.format(start, end) history = list(client.query(q).get_points()) return web.json_response(history) if __name__ == '__main__': app.router.add_get('/', index) app.router.add_get('/data', data) app.router.add_get('/display', display) app.router.add_get('/history/{date}', history) loop = asyncio.get_event_loop() a = loop.create_task(run_proxies()) b = loop.create_task(get_data()) web.run_app(app, port=6901)