Files
bashregister/main.py
2026-03-28 17:41:33 -06:00

121 lines
3.3 KiB
Python

import os, logging
DEBUG = os.environ.get('DEBUG')
logging.basicConfig(
#filename='protovax.log', encoding='utf-8',
format='[%(asctime)s] %(levelname)s %(module)s/%(funcName)s - %(message)s',
level=logging.DEBUG if DEBUG else logging.INFO)
import io
import time
import requests
import asyncio
import aiomqtt
from escpos.printer import Usb
from PIL import Image, ImageEnhance
from aiomqtt import Client
from display import StarburstHT16K33
VENDOR_ID = 0x0416
PRODUCT_ID = 0x5011
PROD_STATIC_URL = 'https://static.my.protospace.ca/'
DEV_STATIC_URL = 'https://static.spaceport.dns.t0.vc/'
PRINTER_WIDTH = 384
def print_picture(topic, filename, p):
logging.info('New picture submitted: %s', filename)
if topic.startswith('dev_'):
url = DEV_STATIC_URL
else:
url = PROD_STATIC_URL
try:
#p.hw('INIT')
response = requests.get(url + filename, timeout=5)
response.raise_for_status()
logging.info('New image detected, printing...')
last_image_content = response.content
img = Image.open(io.BytesIO(response.content))
# Convert with dithering
img = img.convert('1', dither=Image.FLOYDSTEINBERG)
# Print image
p.image(img)
p.cut()
except requests.exceptions.RequestException as e:
logging.info(f'Error downloading image: {e}')
async def process_mqtt(message, p):
text = message.payload.decode()
topic = message.topic.value
logging.debug('MQTT topic: %s, message: %s', topic, text)
if 'spaceport/drawing/new' in topic:
print_picture(topic, text, p)
else:
logging.debug('Invalid topic, returning')
return
async def fetch_mqtt(p):
await asyncio.sleep(3)
while True:
try:
async with Client(
hostname='172.17.17.181',
port=1883,
) as client:
logging.info('MQTT client connected')
await client.subscribe('#')
async for message in client.messages:
loop = asyncio.get_event_loop()
loop.create_task(process_mqtt(message, p))
except aiomqtt.MqttError as e:
logging.warning('MQTT error: %s. Reconnecting in 5 seconds...', e)
await asyncio.sleep(5)
async def manage_display(disp):
logging.info('Starting display loop...')
while True:
await asyncio.sleep(2)
await disp.scroll_text('WELCOME TO PROTOSPACE!', scroll_speed=2, easing='inout', pulse=True)
disp.set_brightness(15)
await disp.marquee('PRESENTING...', scroll_speed=8, cycles=1)
await disp.marquee('THE BASH REGISTER', scroll_speed=2, cycles=1)
disp.set_brightness(15)
await disp.chase(speed=8)
disp.write_text('SEND TO')
await disp.rainbow()
disp.set_brightness(15)
await disp.marquee('PROTOSPACE.CA/SIGN', scroll_speed=2, cycles=3)
if __name__ == '__main__':
logging.info('')
logging.info('==========================')
logging.info('Booting up...')
p = Usb(VENDOR_ID, PRODUCT_ID, interface=0, in_ep=0x81, out_ep=0x03)
disp = StarburstHT16K33(brightness=10)
loop = asyncio.get_event_loop()
loop.create_task(manage_display(disp))
loop.create_task(fetch_mqtt(p))
loop.run_forever()