Files
bashregister/main.py
2025-12-30 20:53:17 -07:00

81 lines
2.0 KiB
Python

import os, logging
# Setting the DEBUG environment variable to any non-empty value turns on
# debug-level logging; otherwise the level is INFO.
DEBUG = os.getenv('DEBUG')

# Root logger setup. To log to a file instead of the default stream, pass
# filename='protovax.log', encoding='utf-8' as additional arguments.
logging.basicConfig(
    level=logging.INFO if not DEBUG else logging.DEBUG,
    format='[%(asctime)s] %(levelname)s %(module)s/%(funcName)s - %(message)s',
)
import io
import time
import requests
import asyncio
import aiomqtt
from escpos.printer import Usb
from PIL import Image, ImageEnhance
from aiomqtt import Client
# USB vendor and product IDs passed to escpos' Usb() to select the printer.
VENDOR_ID = 0x0416
PRODUCT_ID = 0x5011
# Base URL that print_picture() prefixes to the filename received over MQTT.
STATIC_URL = 'https://static.spaceport.dns.t0.vc/'
# NOTE(review): not referenced in the code shown here; presumably the
# printer's printable width in pixels -- confirm before relying on it.
PRINTER_WIDTH = 384
def print_picture(filename, p):
    """Download a drawing from the static server and print it.

    The image at ``STATIC_URL + filename`` is fetched, converted to 1-bit
    black and white with Floyd-Steinberg dithering, sent to the printer and
    the paper is cut. Download and decode failures are logged and the
    function returns without printing anything.

    Args:
        filename: Image path relative to ``STATIC_URL``.
        p: Printer object providing ``hw()``, ``image()`` and ``cut()``.
    """
    logging.info('New picture submitted: %s', filename)

    # Send the hardware INIT command before the job, ahead of the download
    # (same ordering as the original implementation).
    p.hw('INIT')

    try:
        response = requests.get(STATIC_URL + filename, timeout=5)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        logging.error('Error downloading image: %s', e)
        return

    logging.info('New image detected, printing...')

    try:
        img = Image.open(io.BytesIO(response.content))
        # Convert with dithering
        img = img.convert('1', dither=Image.FLOYDSTEINBERG)
    except OSError as e:
        # Pillow reports unrecognised or truncated image data with OSError
        # subclasses; a bad download must not take down the caller.
        logging.error('Error decoding image %s: %s', filename, e)
        return

    # Print image
    p.image(img)
    p.cut()
async def process_mqtt(message, p):
    """Handle one MQTT message, printing the drawing it names if relevant.

    Messages whose topic contains ``spaceport/drawing/new`` carry an image
    filename as their payload; that filename is handed to ``print_picture``.
    Every other message is ignored.

    Args:
        message: MQTT message exposing ``payload`` (bytes) and ``topic.value``.
        p: Printer object forwarded to ``print_picture``.
    """
    topic = message.topic.value

    try:
        text = message.payload.decode()
    except UnicodeDecodeError:
        # The subscription in this file is the '#' wildcard, so binary
        # payloads from unrelated topics arrive here too. A payload that is
        # not valid UTF-8 cannot be a filename, so drop it instead of raising.
        logging.debug('MQTT topic: %s, undecodable payload, ignoring', topic)
        return

    logging.debug('MQTT topic: %s, message: %s', topic, text)

    if 'spaceport/drawing/new' not in topic:
        logging.debug('Invalid topic, returning')
        return

    # NOTE: print_picture() is synchronous, so the event loop is blocked
    # while the image downloads and prints; jobs therefore run one at a time.
    print_picture(text, p)
async def fetch_mqtt(p, hostname='172.17.17.181', port=1883):
    """Connect to the MQTT broker and dispatch every message to process_mqtt.

    Subscribes to all topics (``#``) and spawns one task per incoming
    message. Runs until the message stream ends or the connection raises.

    Args:
        p: Printer object forwarded to ``process_mqtt``.
        hostname: Broker host; defaults to the previously hard-coded address.
        port: Broker TCP port.
    """
    # NOTE(review): the reason for this start-up delay is not visible in this
    # file -- presumably it gives the network time to come up; confirm.
    await asyncio.sleep(3)

    # Hold strong references to in-flight handler tasks: the event loop only
    # keeps weak references, so an unreferenced task may be garbage collected
    # before it finishes.
    pending = set()

    def _on_done(task):
        pending.discard(task)
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None:
            # Surface handler failures immediately instead of leaving them as
            # "exception was never retrieved" warnings.
            logging.error('MQTT message handler failed', exc_info=exc)

    async with Client(hostname=hostname, port=port) as client:
        await client.subscribe('#')
        async for message in client.messages:
            task = asyncio.create_task(process_mqtt(message, p))
            pending.add(task)
            task.add_done_callback(_on_done)
if __name__ == '__main__':
    # Script entry point: open the USB printer and run the MQTT listener.
    logging.info('')
    logging.info('==========================')
    logging.info('Booting up...')

    p = Usb(VENDOR_ID, PRODUCT_ID, interface=0, in_ep=0x81, out_ep=0x03)

    # asyncio.run() creates, runs and closes the event loop itself. Calling
    # asyncio.get_event_loop() with no running loop is deprecated and fails
    # on newer Python versions.
    asyncio.run(fetch_mqtt(p))