Compare commits

...

9 Commits

Author SHA1 Message Date
e602c47808 Fixes 2025-12-30 20:53:17 -07:00
6ff5f33ba1 fix: Flatten RGBA images before printing
Co-authored-by: aider (gemini/gemini-2.5-pro) <aider@aider.chat>
2025-12-09 21:34:56 -07:00
259cb0cf1d chore: Remove commented-out image resize logic 2025-12-09 21:34:54 -07:00
6c0be6ed95 fix: Initialize printer hardware before each print
Co-authored-by: aider (gemini/gemini-2.5-pro) <aider@aider.chat>
2025-12-09 21:13:21 -07:00
56574b4c14 refactor: Disable image resizing before dithering 2025-12-09 21:13:19 -07:00
a8132f9653 fix: Replace unsupported p.feed with newlines for paper feed
Co-authored-by: aider (gemini/gemini-2.5-pro) <aider@aider.chat>
2025-12-09 20:58:47 -07:00
c38549c7d7 fix: Feed paper after image for easier tearing
Co-authored-by: aider (gemini/gemini-2.5-pro) <aider@aider.chat>
2025-12-09 20:54:36 -07:00
5fc4811c09 fix: Init printer once, pass object, and remove redundant main block
Co-authored-by: aider (gemini/gemini-2.5-pro) <aider@aider.chat>
2025-12-09 20:48:16 -07:00
a4e0fb9623 feat: Implement MQTT-based image printing 2025-12-09 20:48:11 -07:00

103
main.py
View File

@@ -1,49 +1,80 @@
import os, logging
DEBUG = os.environ.get('DEBUG')
logging.basicConfig(
#filename='protovax.log', encoding='utf-8',
format='[%(asctime)s] %(levelname)s %(module)s/%(funcName)s - %(message)s',
level=logging.DEBUG if DEBUG else logging.INFO)
import io import io
import time import time
import requests import requests
import asyncio
import aiomqtt
from escpos.printer import Usb from escpos.printer import Usb
from PIL import Image, ImageEnhance from PIL import Image, ImageEnhance
from aiomqtt import Client
VENDOR_ID = 0x0416 VENDOR_ID = 0x0416
PRODUCT_ID = 0x5011 PRODUCT_ID = 0x5011
IMAGE_URL = "https://static.spaceport.dns.t0.vc/drawing.png" STATIC_URL = 'https://static.spaceport.dns.t0.vc/'
PRINTER_WIDTH = 384 # Set to your printer's pixel width (common: 384 or 576) PRINTER_WIDTH = 384
def main(): def print_picture(filename, p):
# Initialize printer logging.info('New picture submitted: %s', filename)
try:
p.hw('INIT')
response = requests.get(STATIC_URL + filename, timeout=5)
response.raise_for_status()
print('New image detected, printing...')
last_image_content = response.content
img = Image.open(io.BytesIO(response.content))
# Convert with dithering
img = img.convert('1', dither=Image.FLOYDSTEINBERG)
# Print image
p.image(img)
p.cut()
except requests.exceptions.RequestException as e:
print(f'Error downloading image: {e}')
async def process_mqtt(message, p):
text = message.payload.decode()
topic = message.topic.value
logging.debug('MQTT topic: %s, message: %s', topic, text)
if 'spaceport/drawing/new' in topic:
print_picture(text, p)
else:
logging.debug('Invalid topic, returning')
return
async def fetch_mqtt(p):
await asyncio.sleep(3)
async with Client(
hostname='172.17.17.181',
port=1883,
) as client:
await client.subscribe('#')
async for message in client.messages:
loop = asyncio.get_event_loop()
loop.create_task(process_mqtt(message, p))
if __name__ == '__main__':
logging.info('')
logging.info('==========================')
logging.info('Booting up...')
p = Usb(VENDOR_ID, PRODUCT_ID, interface=0, in_ep=0x81, out_ep=0x03) p = Usb(VENDOR_ID, PRODUCT_ID, interface=0, in_ep=0x81, out_ep=0x03)
last_image_content = None
while True: loop = asyncio.get_event_loop()
try: loop.run_until_complete(fetch_mqtt(p))
response = requests.get(IMAGE_URL, timeout=5)
response.raise_for_status()
if response.content != last_image_content:
print("New image detected, printing...")
last_image_content = response.content
img = Image.open(io.BytesIO(response.content))
# Resize first
wpercent = (PRINTER_WIDTH / float(img.size[0]))
hsize = int((float(img.size[1]) * float(wpercent)))
img = img.resize((PRINTER_WIDTH, hsize), Image.LANCZOS)
# Convert with dithering
img = img.convert('1', dither=Image.FLOYDSTEINBERG)
# Print image
p.image(img)
p.cut()
except requests.exceptions.RequestException as e:
print(f"Error downloading image: {e}")
time.sleep(5)
if __name__ == "__main__":
main()