Compare commits

..

9 Commits

Author SHA1 Message Date
e602c47808 Fixes 2025-12-30 20:53:17 -07:00
6ff5f33ba1 fix: Flatten RGBA images before printing
Co-authored-by: aider (gemini/gemini-2.5-pro) <aider@aider.chat>
2025-12-09 21:34:56 -07:00
259cb0cf1d chore: Remove commented-out image resize logic 2025-12-09 21:34:54 -07:00
6c0be6ed95 fix: Initialize printer hardware before each print
Co-authored-by: aider (gemini/gemini-2.5-pro) <aider@aider.chat>
2025-12-09 21:13:21 -07:00
56574b4c14 refactor: Disable image resizing before dithering 2025-12-09 21:13:19 -07:00
a8132f9653 fix: Replace unsupported p.feed with newlines for paper feed
Co-authored-by: aider (gemini/gemini-2.5-pro) <aider@aider.chat>
2025-12-09 20:58:47 -07:00
c38549c7d7 fix: Feed paper after image for easier tearing
Co-authored-by: aider (gemini/gemini-2.5-pro) <aider@aider.chat>
2025-12-09 20:54:36 -07:00
5fc4811c09 fix: Init printer once, pass object, and remove redundant main block
Co-authored-by: aider (gemini/gemini-2.5-pro) <aider@aider.chat>
2025-12-09 20:48:16 -07:00
a4e0fb9623 feat: Implement MQTT-based image printing 2025-12-09 20:48:11 -07:00

103
main.py
View File

@@ -1,49 +1,80 @@
import os, logging
DEBUG = os.environ.get('DEBUG')
logging.basicConfig(
#filename='protovax.log', encoding='utf-8',
format='[%(asctime)s] %(levelname)s %(module)s/%(funcName)s - %(message)s',
level=logging.DEBUG if DEBUG else logging.INFO)
import io
import time
import requests
import asyncio
import aiomqtt
from escpos.printer import Usb
from PIL import Image, ImageEnhance
from aiomqtt import Client
VENDOR_ID = 0x0416
PRODUCT_ID = 0x5011
IMAGE_URL = "https://static.spaceport.dns.t0.vc/drawing.png"
PRINTER_WIDTH = 384 # Set to your printer's pixel width (common: 384 or 576)
STATIC_URL = 'https://static.spaceport.dns.t0.vc/'
PRINTER_WIDTH = 384
def main():
# Initialize printer
def print_picture(filename, p):
logging.info('New picture submitted: %s', filename)
try:
p.hw('INIT')
response = requests.get(STATIC_URL + filename, timeout=5)
response.raise_for_status()
print('New image detected, printing...')
last_image_content = response.content
img = Image.open(io.BytesIO(response.content))
# Convert with dithering
img = img.convert('1', dither=Image.FLOYDSTEINBERG)
# Print image
p.image(img)
p.cut()
except requests.exceptions.RequestException as e:
print(f'Error downloading image: {e}')
async def process_mqtt(message, p):
text = message.payload.decode()
topic = message.topic.value
logging.debug('MQTT topic: %s, message: %s', topic, text)
if 'spaceport/drawing/new' in topic:
print_picture(text, p)
else:
logging.debug('Invalid topic, returning')
return
async def fetch_mqtt(p):
await asyncio.sleep(3)
async with Client(
hostname='172.17.17.181',
port=1883,
) as client:
await client.subscribe('#')
async for message in client.messages:
loop = asyncio.get_event_loop()
loop.create_task(process_mqtt(message, p))
if __name__ == '__main__':
logging.info('')
logging.info('==========================')
logging.info('Booting up...')
p = Usb(VENDOR_ID, PRODUCT_ID, interface=0, in_ep=0x81, out_ep=0x03)
last_image_content = None
while True:
try:
response = requests.get(IMAGE_URL, timeout=5)
response.raise_for_status()
if response.content != last_image_content:
print("New image detected, printing...")
last_image_content = response.content
img = Image.open(io.BytesIO(response.content))
# Resize first
wpercent = (PRINTER_WIDTH / float(img.size[0]))
hsize = int((float(img.size[1]) * float(wpercent)))
img = img.resize((PRINTER_WIDTH, hsize), Image.LANCZOS)
# Convert with dithering
img = img.convert('1', dither=Image.FLOYDSTEINBERG)
# Print image
p.image(img)
p.cut()
except requests.exceptions.RequestException as e:
print(f"Error downloading image: {e}")
time.sleep(5)
if __name__ == "__main__":
main()
loop = asyncio.get_event_loop()
loop.run_until_complete(fetch_mqtt(p))