feat: Implement MQTT-based image printing

This commit is contained in:
2025-12-09 20:48:11 -07:00
committed by Tanner Collin (aider)
parent 85f56db64a
commit a4e0fb9623

53
main.py
View File

@@ -1,26 +1,33 @@
import os, logging
DEBUG = os.environ.get('DEBUG')
logging.basicConfig(
#filename='protovax.log', encoding='utf-8',
format='[%(asctime)s] %(levelname)s %(module)s/%(funcName)s - %(message)s',
level=logging.DEBUG if DEBUG else logging.INFO)
import io import io
import time import time
import requests import requests
import asyncio
import aiomqtt
from escpos.printer import Usb from escpos.printer import Usb
from PIL import Image, ImageEnhance from PIL import Image, ImageEnhance
from aiomqtt import Client
VENDOR_ID = 0x0416 VENDOR_ID = 0x0416
PRODUCT_ID = 0x5011 PRODUCT_ID = 0x5011
IMAGE_URL = "https://static.spaceport.dns.t0.vc/drawing.png" STATIC_URL = "https://static.spaceport.dns.t0.vc/"
PRINTER_WIDTH = 384 # Set to your printer's pixel width (common: 384 or 576) PRINTER_WIDTH = 384 # Set to your printer's pixel width (common: 384 or 576)
def main(): def print_picture(filename):
# Initialize printer # Initialize printer
p = Usb(VENDOR_ID, PRODUCT_ID, interface=0, in_ep=0x81, out_ep=0x03) p = Usb(VENDOR_ID, PRODUCT_ID, interface=0, in_ep=0x81, out_ep=0x03)
last_image_content = None
while True:
try: try:
response = requests.get(IMAGE_URL, timeout=5) response = requests.get(STATIC_URL + filename, timeout=5)
response.raise_for_status() response.raise_for_status()
if response.content != last_image_content:
print("New image detected, printing...") print("New image detected, printing...")
last_image_content = response.content last_image_content = response.content
@@ -41,7 +48,39 @@ def main():
except requests.exceptions.RequestException as e: except requests.exceptions.RequestException as e:
print(f"Error downloading image: {e}") print(f"Error downloading image: {e}")
time.sleep(5)
async def process_mqtt(message):
text = message.payload.decode()
topic = message.topic.value
logging.debug('MQTT topic: %s, message: %s', topic, text)
if 'spaceport/drawing/new' in topic:
print_picture(text)
else:
logging.debug('Invalid topic, returning')
return
async def fetch_mqtt():
await asyncio.sleep(3)
async with Client(
hostname='172.17.17.181',
port=1883,
) as client:
await client.subscribe('#')
async for message in client.messages:
loop = asyncio.get_event_loop()
loop.create_task(process_mqtt(message))
if __name__ == '__main__':
logging.info('')
logging.info('==========================')
logging.info('Booting up...')
loop = asyncio.get_event_loop()
loop.run_until_complete(fetch_mqtt())
if __name__ == "__main__": if __name__ == "__main__":