import os, logging DEBUG = os.environ.get('DEBUG') logging.basicConfig( #filename='protovax.log', encoding='utf-8', format='[%(asctime)s] %(levelname)s %(module)s/%(funcName)s - %(message)s', level=logging.DEBUG if DEBUG else logging.INFO) import io import time import requests import asyncio import aiomqtt from escpos.printer import Usb from PIL import Image, ImageEnhance from aiomqtt import Client VENDOR_ID = 0x0416 PRODUCT_ID = 0x5011 STATIC_URL = "https://static.spaceport.dns.t0.vc/" PRINTER_WIDTH = 384 # Set to your printer's pixel width (common: 384 or 576) def print_picture(filename, p): try: response = requests.get(STATIC_URL + filename, timeout=5) response.raise_for_status() print("New image detected, printing...") last_image_content = response.content img = Image.open(io.BytesIO(response.content)) # Resize first wpercent = (PRINTER_WIDTH / float(img.size[0])) hsize = int((float(img.size[1]) * float(wpercent))) img = img.resize((PRINTER_WIDTH, hsize), Image.LANCZOS) # Convert with dithering img = img.convert('1', dither=Image.FLOYDSTEINBERG) # Print image p.image(img) p.text('\n\n\n') p.cut() except requests.exceptions.RequestException as e: print(f"Error downloading image: {e}") async def process_mqtt(message, p): text = message.payload.decode() topic = message.topic.value logging.debug('MQTT topic: %s, message: %s', topic, text) if 'spaceport/drawing/new' in topic: print_picture(text, p) else: logging.debug('Invalid topic, returning') return async def fetch_mqtt(p): await asyncio.sleep(3) async with Client( hostname='172.17.17.181', port=1883, ) as client: await client.subscribe('#') async for message in client.messages: loop = asyncio.get_event_loop() loop.create_task(process_mqtt(message, p)) if __name__ == '__main__': logging.info('') logging.info('==========================') logging.info('Booting up...') p = Usb(VENDOR_ID, PRODUCT_ID, interface=0, in_ep=0x81, out_ep=0x03) loop = asyncio.get_event_loop() loop.run_until_complete(fetch_mqtt(p))