Files
bashregister/main.py
2026-03-22 01:47:25 +00:00

115 lines
3.0 KiB
Python

import os, logging
# Any non-empty DEBUG environment variable switches logging to debug level;
# when it is unset or empty the level is INFO.
DEBUG = os.environ.get('DEBUG')
logging.basicConfig(
# File logging is currently disabled; uncomment to write to a log file.
#filename='protovax.log', encoding='utf-8',
format='[%(asctime)s] %(levelname)s %(module)s/%(funcName)s - %(message)s',
level=logging.DEBUG if DEBUG else logging.INFO)
import io
import time
import requests
import asyncio
import aiomqtt
from escpos.printer import Usb
from PIL import Image, ImageEnhance
from aiomqtt import Client
from display import StarburstHT16K33
# USB vendor / product IDs handed to the escpos Usb printer constructor.
VENDOR_ID = 0x0416
PRODUCT_ID = 0x5011
# Base URLs that a submitted image filename is appended to. The dev host is
# used when the MQTT topic starts with 'dev_', the production host otherwise.
PROD_STATIC_URL = 'https://static.my.protospace.ca/'
DEV_STATIC_URL = 'https://static.spaceport.dns.t0.vc/'
# NOTE(review): not referenced by the code visible here; presumably the
# printer's printable width in pixels -- confirm before relying on it.
PRINTER_WIDTH = 384
def print_picture(topic, filename, p):
    """Download a submitted drawing and print it.

    The image is fetched from the dev static host when ``topic`` starts
    with ``dev_`` and from the production host otherwise. It is converted
    to a 1-bit image with Floyd-Steinberg dithering, sent to the printer,
    and the paper is cut.

    Download failures and undecodable image data are logged as errors and
    the function returns without printing. Exceptions raised by the printer
    itself are not caught here and propagate to the caller.

    Note that this is a synchronous function: the HTTP request (5 second
    timeout) and the print job block the caller until they finish.

    Args:
        topic: MQTT topic the submission arrived on.
        filename: Image path, appended to the static host base URL.
        p: Printer object providing ``image()`` and ``cut()``.
    """
    logging.info('New picture submitted: %s', filename)

    base_url = DEV_STATIC_URL if topic.startswith('dev_') else PROD_STATIC_URL

    try:
        response = requests.get(base_url + filename, timeout=5)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        logging.error('Error downloading image: %s', e)
        return

    logging.info('New image detected, printing...')

    try:
        img = Image.open(io.BytesIO(response.content))
        # Convert with dithering
        img = img.convert('1', dither=Image.FLOYDSTEINBERG)
    except OSError as e:
        # PIL reports unreadable or truncated image data as OSError
        # (UnidentifiedImageError is a subclass).
        logging.error('Error decoding image %s: %s', filename, e)
        return

    # Print image
    p.image(img)
    p.cut()
async def process_mqtt(message, p):
    """Handle one MQTT message, printing the drawing it announces.

    Only topics containing ``spaceport/drawing/new`` are acted on: the
    payload is treated as the image filename and passed to
    ``print_picture`` together with the topic. Every other topic is
    ignored after a debug log entry.

    Args:
        message: Received MQTT message. ``message.payload`` must provide
            ``decode()`` and ``message.topic.value`` is the topic string.
        p: Printer object, passed through to ``print_picture``.
    """
    topic = message.topic.value
    # Messages can arrive from any topic, so a payload is not guaranteed to
    # be valid UTF-8. Substitute undecodable bytes instead of raising.
    text = message.payload.decode(errors='replace')
    logging.debug('MQTT topic: %s, message: %s', topic, text)

    if 'spaceport/drawing/new' not in topic:
        logging.debug('Invalid topic, returning')
        return

    print_picture(topic, text, p)
async def fetch_mqtt(p):
    """Subscribe to the MQTT broker and dispatch each message to a handler.

    After an initial 3 second delay, connects to the broker, subscribes to
    every topic (``#``) and starts one ``process_mqtt`` task per received
    message. If the connection fails or is lost, the error is logged and
    the connection is retried after a short pause. This coroutine does not
    return.

    Args:
        p: Printer object, passed through to ``process_mqtt``.
    """
    reconnect_delay = 5  # seconds to wait before reconnecting

    # The event loop only keeps weak references to tasks. Hold strong ones
    # here so an in-flight handler cannot be garbage collected mid-run.
    pending = set()

    def _on_done(task):
        """Release a finished handler task and log any exception it raised."""
        pending.discard(task)
        if task.cancelled():
            return
        error = task.exception()
        if error is not None:
            logging.error('MQTT message handler failed', exc_info=error)

    await asyncio.sleep(3)

    while True:
        try:
            async with Client(
                hostname='172.17.17.181',
                port=1883,
            ) as client:
                await client.subscribe('#')
                async for message in client.messages:
                    task = asyncio.create_task(process_mqtt(message, p))
                    pending.add(task)
                    task.add_done_callback(_on_done)
        except aiomqtt.MqttError as e:
            logging.error('MQTT connection error: %s', e)

        # Pause before reconnecting so a dead broker is not hammered.
        logging.info('Reconnecting to MQTT in %s seconds...', reconnect_delay)
        await asyncio.sleep(reconnect_delay)
async def manage_display(disp):
    """Play the welcome message sequence on the display, forever.

    Each pass waits 2 seconds and then calls the display methods listed in
    ``steps`` in order. The display calls themselves are made
    synchronously. This coroutine does not return.

    Args:
        disp: Display object providing ``scroll_text``, ``set_brightness``,
            ``marquee``, ``chase``, ``write_text`` and ``rainbow``.
    """
    logging.info('Starting display loop...')

    # One entry per display call: (method name, positional args, keyword args).
    steps = (
        ('scroll_text', ('WELCOME TO PROTOSPACE!',),
         {'scroll_speed': 2, 'easing': 'inout', 'pulse': True}),
        ('set_brightness', (15,), {}),
        ('marquee', ('PRESENTING...',), {'scroll_speed': 8, 'cycles': 1}),
        ('marquee', ('THE BASH REGISTER',), {'scroll_speed': 2, 'cycles': 1}),
        ('set_brightness', (15,), {}),
        ('chase', (), {'speed': 8}),
        ('write_text', ('SEND TO',), {}),
        ('rainbow', (), {}),
        ('set_brightness', (15,), {}),
        ('marquee', ('PROTOSPACE.CA/SIGN',), {'scroll_speed': 2, 'cycles': 3}),
    )

    while True:
        await asyncio.sleep(2)
        for method_name, args, kwargs in steps:
            getattr(disp, method_name)(*args, **kwargs)
if __name__ == '__main__':
    # Script entry point: open the USB printer and the display, then run the
    # display animation and the MQTT listener on one event loop until the
    # process is stopped.
    logging.info('')
    logging.info('==========================')
    logging.info('Booting up...')

    p = Usb(VENDOR_ID, PRODUCT_ID, interface=0, in_ep=0x81, out_ep=0x03)
    disp = StarburstHT16K33(brightness=10)

    # Create the loop explicitly: calling asyncio.get_event_loop() when no
    # loop is running is deprecated and raises RuntimeError on recent
    # Python versions.
    loop = asyncio.new_event_loop()

    # Keep strong references to the long-running tasks; the loop itself
    # only holds weak references to them.
    background_tasks = [
        loop.create_task(manage_display(disp)),
        loop.create_task(fetch_mqtt(p)),
    ]

    loop.run_forever()