Files
protovac/utils.py
2026-03-04 16:24:25 -07:00

682 lines
21 KiB
Python

#!/usr/bin/env python
import os, logging
import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)
import requests
import pytz
import re
import os
import time
import json
import textwrap
import random
import qrcode
import urllib.parse
import unicodedata
from PIL import Image, ImageEnhance, ImageFont, ImageDraw
from datetime import datetime, timezone, timedelta
import paho.mqtt.publish as publish
# Optional credentials. ``secrets`` is presumably a project-local module that
# shadows the standard-library one (NOTE(review): confirm). When it is absent,
# the standard-library ``secrets`` is imported instead and simply lacks these
# attributes, so every key falls back to None.
try:
    import secrets
    _secrets = secrets
except Exception:
    # Best effort: a missing or broken secrets module must not stop start-up.
    # ``Exception`` (not a bare ``except``) lets Ctrl-C / SystemExit through.
    _secrets = None

# Each key is looked up on its own so that one missing credential does not
# disable features that only depend on the others.
wa_api_key = getattr(_secrets, 'wa_api_key', None)
openai_key = getattr(_secrets, 'openai_key', None)
FORUM_SEARCH_API_KEY = getattr(_secrets, 'FORUM_SEARCH_API_KEY', None)
MQTT_WRITER_PASSWORD = getattr(_secrets, 'MQTT_WRITER_PASSWORD', None)
# pytz zone for 'America/Edmonton'; used below to convert UTC timestamps to
# local time before formatting them.
TIMEZONE_CALGARY = pytz.timezone('America/Edmonton')
# Optional games: each path is paired with a flag recording whether that file
# exists at import time.
DRUGWARS_LOCATION = '/home/pi/protovac/env/bin/drugwars'
HAS_DRUGWARS = os.path.isfile(DRUGWARS_LOCATION)

NETHACK_LOCATION = '/usr/games/nethack'
HAS_NETHACK = os.path.isfile(NETHACK_LOCATION)

MORIA_LOCATION = '/usr/games/moria'
HAS_MORIA = os.path.isfile(MORIA_LOCATION)

_2048_LOCATION = '/home/pi/2048-cli/2048'
HAS_2048 = os.path.isfile(_2048_LOCATION)

FROTZ_LOCATION = '/usr/games/frotz'
HAS_FROTZ = os.path.isfile(FROTZ_LOCATION)

# Story file loaded through frotz, so it is tracked separately from frotz.
HITCHHIKERS_LOCATION = '/home/pi/frotz/hhgg.z3'
HAS_HITCHHIKERS = os.path.isfile(HITCHHIKERS_LOCATION)

SUDOKU_LOCATION = '/usr/games/nudoku'
HAS_SUDOKU = os.path.isfile(SUDOKU_LOCATION)
# Directory holding this module; the text files read here and the label
# templates opened by the print_* functions are resolved relative to it.
location = os.path.dirname(os.path.realpath(__file__))

with open(os.path.join(location, 'info.txt')) as f:
    PROTO_INFO = f.read()

# The info text must be pure ASCII. Report the 1-based number of the first
# offending line, then re-raise so the import fails.
for num, line in enumerate(PROTO_INFO.split('\n')):
    try:
        line.encode('ascii')
    except UnicodeEncodeError:
        print('non-ascii found in line:', num+1)
        raise

with open(os.path.join(location, 'lastquestion.txt')) as f:
    LAST_QUESTION = f.read()
def format_date(datestr):
    """Render a UTC timestamp string as a human-readable local date.

    ``datestr`` must match ``%Y-%m-%dT%H:%M:%SZ`` (e.g. ``2024-01-31T18:30:00Z``).
    A falsy value (``None``, ``''``) yields the literal string ``'None'``.
    """
    if not datestr:
        return 'None'

    parsed_utc = datetime.strptime(datestr, '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=pytz.UTC)
    local_time = parsed_utc.astimezone(TIMEZONE_CALGARY)
    # %-d and %-I drop zero padding; this is a platform strftime extension.
    return local_time.strftime('%a %b %-d, %Y %-I:%M %p')
def normalize_to_ascii(s):
    """Return ``s`` reduced to ASCII.

    The string is NFKD-decomposed so accented letters split into a base
    letter plus combining marks; anything that still is not ASCII is dropped.
    """
    decomposed = unicodedata.normalize('NFKD', s)
    ascii_bytes = decomposed.encode('ascii', 'ignore')
    return ascii_bytes.decode('ascii')
def truncate_string(s, max_length):
    """Return ``s`` shortened to at most ``max_length`` characters.

    Strings that already fit are returned unchanged. Longer strings are cut
    and end in ``'...'``. When ``max_length`` is below 3 there is no room for
    the ellipsis, so the string is hard-cut instead; a negative slice end
    would otherwise produce a result longer than ``max_length``.
    """
    if len(s) <= max_length:
        return s
    if max_length < 3:
        return s[:max(max_length, 0)]
    return s[:max_length - 3] + '...'
def sign_send(to_send):
    """POST ``to_send`` to the Protospace sign endpoint on behalf of protovac.

    Returns ``'Success!'`` when the request completes without an HTTP error
    status, otherwise ``'Error'`` (the failure is logged with its traceback).
    """
    try:
        logging.info('Sending to sign: %s', to_send)
        data = dict(sign=to_send, on_behalf_of='protovac')
        r = requests.post('https://api.my.protospace.ca/stats/sign/', data=data, timeout=5)
        r.raise_for_status()
        return 'Success!'
    except Exception as e:
        # Exception, not BaseException: Ctrl-C and SystemExit must propagate.
        logging.exception(e)
        return 'Error'
def protovac_sign_color(color):
    """Set the protovac sign to ``color`` at full brightness.

    ``color`` is sent as the first entry of the segment's ``col`` list, with
    ``[0, 0, 0]`` as the second entry.
    NOTE(review): presumably an ``[r, g, b]`` list; confirm against callers.

    Returns ``'Success!'`` or, on any failure, ``'Error'`` (logged).
    """
    try:
        logging.info('Sending color to protovac sign: %s', color)
        data = dict(on=True, bri=255, seg=[dict(col=[color, [0,0,0]])])
        r = requests.post('http://10.139.251.5/json', json=data, timeout=3)
        r.raise_for_status()
        return 'Success!'
    except Exception as e:
        # Exception, not BaseException: Ctrl-C and SystemExit must propagate.
        logging.exception(e)
        return 'Error'
def protovac_sign_effect(effect):
    """Switch the protovac sign to effect ``effect`` at full brightness.

    ``effect`` is sent as the segment's ``fx`` value.

    Returns ``'Success!'`` or, on any failure, ``'Error'`` (logged).
    """
    try:
        logging.info('Sending effect to protovac sign: %s', effect)
        data = dict(on=True, bri=255, seg=[dict(fx=effect)])
        r = requests.post('http://10.139.251.5/json', json=data, timeout=3)
        r.raise_for_status()
        return 'Success!'
    except Exception as e:
        # Exception, not BaseException: Ctrl-C and SystemExit must propagate.
        logging.exception(e)
        return 'Error'
def fetch_stats():
    """Fetch the stats document from the Protospace API.

    Returns the decoded JSON body, or the string ``'Error'`` when the request
    or decoding fails (the failure is logged with its traceback).
    """
    try:
        logging.info('Fetching status...')
        r = requests.get('https://api.my.protospace.ca/stats/', timeout=5)
        r.raise_for_status()
        return r.json()
    except Exception as e:
        # Exception, not BaseException: Ctrl-C and SystemExit must propagate.
        logging.exception(e)
        return 'Error'
def fetch_classes():
    """Fetch class sessions from the Protospace API.

    Returns the ``'results'`` entry of the decoded JSON body, or the string
    ``'Error'`` when the request fails or the body has no such entry (the
    failure is logged with its traceback).
    """
    try:
        logging.info('Fetching classes...')
        r = requests.get('https://api.my.protospace.ca/sessions/', timeout=5)
        r.raise_for_status()
        return r.json()['results']
    except Exception as e:
        # Exception, not BaseException: Ctrl-C and SystemExit must propagate.
        logging.exception(e)
        return 'Error'
def fetch_protocoin():
    """Fetch the protocoin transactions document from the Protospace API.

    Returns the decoded JSON body, or the string ``'Error'`` when the request
    or decoding fails (the failure is logged with its traceback).
    """
    try:
        logging.info('Fetching protocoin...')
        r = requests.get('https://api.my.protospace.ca/protocoin/transactions/', timeout=5)
        r.raise_for_status()
        return r.json()
    except Exception as e:
        # Exception, not BaseException: Ctrl-C and SystemExit must propagate.
        logging.exception(e)
        return 'Error'
def mqtt_publish(topic, message):
    """Publish ``str(message)`` to ``topic`` on the MQTT broker.

    Returns ``False`` without publishing when ``MQTT_WRITER_PASSWORD`` is not
    configured. Otherwise returns ``None`` whether or not the publish worked;
    failures are only logged.

    NOTE(review): the password only gates the call and is never passed to
    ``publish.single`` (no ``auth=`` argument). Confirm the broker accepts
    unauthenticated writes, or supply the credentials.
    """
    if not MQTT_WRITER_PASSWORD:
        return False

    try:
        publish.single(
            topic,
            str(message),
            hostname='172.17.17.181',
            port=1883,
            client_id='protovac',
            keepalive=5,  # timeout
        )
    except Exception as e:
        # Exception, not BaseException: Ctrl-C and SystemExit must propagate.
        logging.error('Problem sending MQTT message: %s', e)
# Tag lines printed under the name on member nametags.
QUOTES = [
    'THEY MADE ME WEAR THIS',
    'ASK ME ABOUT TOAST',
    'ASK ME ABOUT BIKESHEDDING',
    'ASK ME ABOUT VETTING',
    'ASK ME ABOUT MAGNETS',
    'ASK ME ABOUT SPACE',
    'ASK ME ABOUT COUNTING',
    'EXPERT WITNESS',
    'AS SEEN ON TV',
    'CONTAINS MEAT',
    'PROTOCOIN ECONOMIST',
    'EXPERT ON ALIENS',
    'EXPERT ON WARP DRIVES',
    'CHIEF OF STARFLEET OPERATIONS',
    'ALIEN DOCTOR',
    'NASA ASTROLOGIST',
    'PINBALL WIZARD',
    'JEDI KNIGHT',
    'GHOSTBUSTER',
    'DOUBLE AGENT',
    'POKEMON TRAINER',
    'POKEMON GYM LEADER',
    'ASSISTANT TO THE REGIONAL MANAGER',
    'BOUNTY HUNTER',
    'I\'M NOT A DOCTOR',
    'SPACE PIRATE',
    'BATTERIES NOT INCLUDED',
    'QUANTUM MECHANIC',
    'PROTO SPACEX PILOT',
    'EARTHBENDER',
    'AIRBENDER',
    'WATERBENDER',
    'FIREBENDER',
    '01001000 01101001',
    'CURRENT EBAY BID: $8.51',
    'MADE YOU LOOK!',
    '(OR SIMILAR PRODUCT)',
    'BATTERY MAY EXPLODE OR LEAK',
    'CONNECT GROUND WIRE TO AVOID SHOCK',
    'COOK THOROUGHLY',
    'CURRENT AT TIME OF PRINTING',
    'DO NOT BLEACH',
    'DO NOT LEAVE UNATTENDED',
    'DO NOT REMOVE TAG UNDER PENALTY OF LAW',
    'DROP IN ANY MAILBOX',
    'EDITED FOR TELEVISION',
    'FOR A LIMITED TIME ONLY',
    'FOR INDOOR OR OUTDOOR USE ONLY',
    'KEEP AWAY FROM FIRE OR FLAMES',
    'KEEP AWAY FROM SUNLIGHT',
    'MADE FROM 100% RECYCLED ELECTRONS',
    'LIFEGUARD ON DUTY',
    'NOT DISHWASHER SAFE',
    'NOT TO BE COMBINED WITH OTHER RADIOISOTOPES',
    'NOT TO BE USED AS A PERSONAL FLOTATION DEVICE',
    'PEEL FROM PAPER BACKING BEFORE EATING',
    'STORE IN A COOL, DRY PLACE',
    'VOID WHERE PROHIBITED',
    'THE FUTURE IS NOW',
    'MASTER OF DISGUISE',
    'YOUR PERSONAL TIME TRAVEL GUIDE',
    'OFFICIAL TASTE TESTER',
    'INTERGALACTIC AMBASSADOR',
    'VIRTUAL REALITY PIONEER',
    'PARANORMAL INVESTIGATOR',
    'UNDERCOVER SUPERHERO',
    'THE COSMIC CHEF',
    'THE ROBOT WHISPERER',
    'THE DREAM WEAVER',
    'USE AT YOUR OWN RISK',
    'RESULTS MAY VARY',
    'READ INSTRUCTIONS CAREFULLY',
    'KEEP OUT OF REACH OF PETS',
    'USE ONLY AS DIRECTED',
    'NOT INTENDED FOR MEDICAL USE',
    'DO NOT USE IF SEAL IS BROKEN',
    'PRODUCT SOLD AS-IS',
    'NO WARRANTIES, EXPRESS OR IMPLIED',
    'USE CAUTION WHEN HANDLING',
    'CRASH OVERRIDE',
    'ACID BURN',
    'CEREAL KILLER',
    'ZERO COOL',
    'ASK ME HOW I SAVED 15% ON MY CAR INSURANCE',
    'TELL YOUR CAT I SAID PSP PSP PSP',
    'I\'M BEGGING YOU, DON\'T SAY HI',
    'BACK 2 BACK HOTDOG EATING WORLD CHAMPION',
    'I ATE A BROWNIE ONCE',
    'THIS ISN\'T ACTUALLY MY NAME',
    'I OFTEN WONDER HOW I EVEN GOT HERE',
    'YOU READ THIS, NOW WE MUST DUEL',
    'DAVE\'S ARCH NEMESIS',
    'EXCEPTIONALLY MID',
    'VOTE ME 4 PREZADENT',
    'PREVALENT IN OTHER WAYS',
    'SCINTILLATING CHATOYANCY',
    'TRAIN EXPERT',
    'ASSISTANT MANAGER',
    'WOW, ANOTHER TAG LINE',
    'I SURVIVED VETTING AND ALL I GOT WAS THIS LOUSY NAME-TAG',
    'NOT GUEST',
    'WHEN WAS THE LAST TIME YOU TOOK OUT A GARBAGE?',
    'YOU HAD ME AT BATMAN',
    'DAD?',
    'LEAD PROJECT UN-FINISHER',
    'NOODLE CONNOISSEUR',
    'GARTH\'S #1 FAN',
    'UNVETTABLE',
    'B-',
    'CLOWN CAR DRIVER',
    'IS A NAME TAG ON BREAD CONSIDERED A SANDWICH?',
    'IS A NAME TAG FOLDED IN HALF CONSIDERED A TACO?',
    'DISHWASHER SAFE',
    'WET NOODLE',
    'I\'M HERE FOR THE SIMULATION',
    'BIRDS ARE NOT REAL',
]

# Shuffle once at import so each run hands the quotes out in a new order.
random.shuffle(QUOTES)

# Index of the next unassigned quote; wraps around the list via modulo.
quote_count = 0

# Maps a name key (see print_nametag) to the quote it was first given, so
# reprints during the same run reuse that quote.
assigned_quotes = {}
def print_nametag(name, guest=False):
    """Render a nametag for ``name`` onto label.png and send it to the printer.

    Guests get the fixed tag line 'GUEST'. Members get a quote from QUOTES;
    the quote is remembered under the first four lower-cased characters of
    the name, so the same key gets the same quote for the rest of the run.

    NOTE(review): ``ImageDraw.textsize`` is deprecated and removed in newer
    Pillow releases (``textbbox`` replaces it); confirm the pinned version.
    """
    global quote_count

    font_path = '/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf'

    if guest:
        quote_size = 120
        quote = 'GUEST'
        logging.info('Printing guest nametag for: %s', name)
    else:
        quote_size = 80
        name_lookup = name.lower()[:4]
        if name_lookup not in assigned_quotes:
            # First time this key is seen: take the next quote in rotation.
            assigned_quotes[name_lookup] = QUOTES[quote_count % len(QUOTES)]
            quote_count += 1
        quote = assigned_quotes[name_lookup]
        logging.info('Printing member nametag for: %s, quote: %s', name, quote)

    label = Image.open(location + '/label.png')
    label_w, label_h = label.size
    canvas = ImageDraw.Draw(label)

    # Shrink the name in 5pt steps (first attempt is 300) until it is at
    # most 1084px wide, then centre it slightly above the middle.
    name_size = 305
    while True:
        name_size -= 5
        name_font = ImageFont.truetype(font_path, name_size)
        text_w, text_h = canvas.textsize(name, font=name_font)
        if text_w <= 1084:
            break
    name_pos = ((label_w - text_w) / 2, ((label_h - text_h) / 2) - 20)
    canvas.text(name_pos, name, font=name_font, fill='black')

    # Same shrinking for the quote (limit 1200px), anchored 30px above the
    # bottom edge.
    while True:
        quote_size -= 5
        quote_font = ImageFont.truetype(font_path, quote_size)
        text_w, text_h = canvas.textsize(quote, font=quote_font)
        if text_w <= 1200:
            break
    quote_pos = ((label_w - text_w) / 2, label_h - text_h - 30)
    canvas.text(quote_pos, quote, font=quote_font, fill='black')

    label.save('tmp.png')
    os.system('lp -d dymo tmp.png > /dev/null 2>&1')
def print_tool_label(wiki_num):
    """Download the label image for tool ``wiki_num`` and print it.

    The image is fetched from labels.protospace.ca, resized to 1280x640,
    pasted centred onto blank.png, and sent to the 'dymo' printer.

    Raises whatever ``requests`` raises on a network failure or HTTP error
    status; nothing is caught here.
    """
    im = Image.open(location + '/blank.png')
    w1, h1 = im.size

    logging.info('Printing tool label for ID: %s', wiki_num)

    params = {'id': str(wiki_num), 'size': '4'}
    res = requests.get('https://labels.protospace.ca/', stream=True, params=params, timeout=5)
    try:
        res.raise_for_status()
        label = Image.open(res.raw)
        new_size = (1280, 640)
        # LANCZOS is the same filter as the ANTIALIAS alias, which newer
        # Pillow releases removed. resize() also forces the pixel data to be
        # read, so the stream can be closed afterwards.
        label = label.resize(new_size, Image.LANCZOS)
    finally:
        # Release the streamed connection even if decoding fails.
        res.close()

    w2, h2 = label.size
    x, y = int((w1 - w2) / 2), int((h1 - h2) / 2)
    im.paste(label, (x, y))

    im.save('tmp.png')
    os.system('lp -d dymo tmp.png > /dev/null 2>&1')
def print_sheet_label(name, contact):
    """Print a label carrying ``name``, ``contact``, today's date and an expiry.

    The expiry date is 90 days after the moment of printing. Both dates are
    formatted in the TIMEZONE_CALGARY zone.
    """
    def local_date_in(days):
        # Date ``days`` from now, formatted like 'Jan 5, 2024'.
        moment = datetime.now(tz=timezone.utc) + timedelta(days=days)
        return moment.astimezone(TIMEZONE_CALGARY).strftime('%b %-d, %Y')

    def font_of(size):
        return ImageFont.truetype('/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf', size)

    logging.info('Printing sheet label for: %s, contact: %s', name, contact)

    label = Image.open(location + '/label.png')
    canvas = ImageDraw.Draw(label)

    # Every line starts 20px from the left edge at a fixed vertical offset.
    canvas.text((20, 300), name, font=font_of(85), fill='black')
    canvas.text((20, 425), contact, font=font_of(65), fill='black')

    date_font = font_of(65)
    canvas.text((20, 590), 'Printed: ' + local_date_in(0), font=date_font, fill='black')
    canvas.text((20, 680), 'EXPIRES: ' + local_date_in(90), font=date_font, fill='black')

    label.save('tmp.png')
    os.system('lp -d dymo tmp.png > /dev/null 2>&1')
def _fit_text(text, font_size, draw, max_w, max_h):
    """Try to wrap ``text`` at ``font_size`` so it fits in ``max_w`` x ``max_h``.

    Wrap widths from 100 columns down to 4 (step 4) are tried in turn; words
    are never split. Returns ``(True, lines, total_height)`` for the first
    width that fits, where the height includes 5px between lines, or
    ``(False, [], 0)`` when none does.

    NOTE(review): ``ImageDraw.textsize`` is deprecated and removed in newer
    Pillow releases (``textbbox`` replaces it); confirm the pinned version.
    """
    font = ImageFont.truetype('/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf', font_size)
    pad = 5

    for cols in range(100, 1, -4):
        lines = textwrap.wrap(text, width=cols, break_long_words=False)
        sizes = [draw.textsize(line, font=font) for line in lines]

        widest = max([0] + [line_w for line_w, _ in sizes])
        # One pad follows each line; the one after the last line is removed.
        block_h = sum(line_h + pad for _, line_h in sizes) - pad

        if widest <= max_w and block_h < max_h:
            return True, lines, block_h

    return False, [], 0
def print_generic_label(text):
    """Print ``text`` on a label at the largest font size that fits.

    The text is wrapped and centred inside the printable area, which is the
    label minus a 50px margin on every side.

    Raises:
        ValueError: if the text does not fit at any font size tried. This
            path previously failed with an UnboundLocalError.
    """
    MARGIN = 50
    MAX_W, MAX_H, PAD = 1285 - (MARGIN*2), 635 - (MARGIN*2), 5

    logging.info('Printing generic label: %s', text)

    im = Image.open(location + '/label.png')
    height = im.size[1]
    draw = ImageDraw.Draw(im)

    # Best layout found so far; stays None until some size fits.
    good_size = None
    paragraph = []
    total_h = 0

    font_size_range = [1, 500]

    # Thanks to Alex (UDIA) for the binary search algorithm
    while abs(font_size_range[0] - font_size_range[1]) > 1:
        font_size = sum(font_size_range) // 2
        image_fit, check_para, check_h = _fit_text(text, font_size, draw, MAX_W, MAX_H)

        if image_fit:
            # Fits: remember it and search the larger half.
            font_size_range = [font_size, font_size_range[1]]
            good_size = font_size
            paragraph = check_para
            total_h = check_h
        else:
            font_size_range = [font_size_range[0], font_size]

    if good_size is None:
        raise ValueError('Text does not fit on the label at any font size')

    font = ImageFont.truetype('/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf', good_size)

    # Centre the block of lines vertically inside the printable area.
    offset = height - MAX_H - MARGIN
    start_h = (MAX_H - total_h) // 2 + offset

    current_h = start_h
    for line in paragraph:
        w, h = draw.textsize(line, font=font)
        x, y = (MAX_W - w) / 2, current_h
        draw.text((x+MARGIN, y), line, font=font, fill='black')
        current_h += h + PAD

    im.save('tmp.png')
    os.system('lp -d dymo tmp.png > /dev/null 2>&1')
def print_consumable_label(item):
    """Print an "out of stock?" label for the consumable ``item``.

    The label shows the item name, a prompt, and a QR code linking to the
    out-of-stock page with the URL-quoted item name as the ``item`` query
    parameter.
    """
    font_path = '/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf'

    label = Image.open(location + '/label.png')
    label_w, label_h = label.size
    canvas = ImageDraw.Draw(label)

    logging.info('Printing consumable label item: %s', item)

    report_url = 'https://my.protospace.ca/out-of-stock?item=' + urllib.parse.quote(item)
    qr_image = qrcode.make(report_url, version=6, box_size=9)
    label.paste(qr_image, (840, 325))

    # Shrink the item name in 5pt steps (first attempt is 145) until it is
    # at most 1200px wide, then centre it in the upper part of the label.
    item_size = 150
    while True:
        item_size -= 5
        item_font = ImageFont.truetype(font_path, item_size)
        text_w, text_h = canvas.textsize(item, font=item_font)
        if text_w <= 1200:
            break
    item_pos = ((label_w - text_w) / 2, ((label_h - text_h) / 2) - 140)
    canvas.text(item_pos, item, font=item_font, fill='black')

    prompt_font = ImageFont.truetype(font_path, 100)
    canvas.text((100, 410), 'Out of stock?', font=prompt_font, fill='black')
    canvas.text((150, 560), 'Scan here:', font=prompt_font, fill='black')

    label.save('tmp.png')
    os.system('lp -d dymo tmp.png > /dev/null 2>&1')
def print_forum_label(thread):
    """Print a label linking to a forum thread.

    ``thread`` is a mapping with ``'id'`` and ``'title'`` keys. The label
    carries a QR code for the thread URL, the URL as text, a 'Forum Thread'
    heading, and the title wrapped at the largest font size that fits.

    Raises:
        ValueError: if the title does not fit at any font size tried. This
            path previously failed with an UnboundLocalError.
    """
    im = Image.open(location + '/label.png')
    width, height = im.size
    draw = ImageDraw.Draw(im)

    logging.info('Printing forum thread ID: %s, title: %s', thread['id'], thread['title'])

    url = 'https://forum.protospace.ca/t/{}/'.format(thread['id'])
    qr = qrcode.make(url, version=6, box_size=9)
    im.paste(qr, (840, 150))

    # Shrink the URL text in 5pt steps until it is at most 1200px wide.
    item_size = 150
    w = 9999
    while w > 1200:
        item_size -= 5
        font = ImageFont.truetype('/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf', item_size)
        w, h = draw.textsize(url, font=font)

    x, y = (width - w) / 2, ((height - h) / 2) + 300
    draw.text((x, y), url, font=font, fill='black')

    font = ImageFont.truetype('/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf', 80)
    draw.text((120, 150), 'Forum Thread', font=font, fill='black')

    text = thread['title']

    MARGIN = 50
    MAX_W, MAX_H, PAD = 900 - (MARGIN*2), 450 - (MARGIN*2), 5

    # Best layout found so far; stays None until some size fits.
    good_size = None
    paragraph = []

    font_size_range = [1, 500]

    # Thanks to Alex (UDIA) for the binary search algorithm
    while abs(font_size_range[0] - font_size_range[1]) > 1:
        font_size = sum(font_size_range) // 2
        image_fit, check_para, check_h = _fit_text(text, font_size, draw, MAX_W, MAX_H)

        if image_fit:
            # Fits: remember it and search the larger half.
            font_size_range = [font_size, font_size_range[1]]
            good_size = font_size
            paragraph = check_para
        else:
            font_size_range = [font_size_range[0], font_size]

    if good_size is None:
        raise ValueError('Thread title does not fit on the label at any font size')

    font = ImageFont.truetype('/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf', good_size)

    # The title block starts at a fixed position, not vertically centred.
    offset = height - MAX_H - MARGIN
    start_h = -100 + offset

    current_h = start_h
    for line in paragraph:
        w, h = draw.textsize(line, font=font)
        x, y = (MAX_W - w) / 2, current_h
        draw.text((x+MARGIN, y), line, font=font, fill='black')
        current_h += h + PAD

    im.save('tmp.png')
    os.system('lp -d dymo tmp.png > /dev/null 2>&1')
def search_forum_thread(search):
    """Search forum thread titles for ``search``.

    Returns up to seven ``{'id', 'title'}`` dicts, highest id first, with
    titles reduced to ASCII. Raises whatever ``requests`` raises on a network
    failure or HTTP error status; nothing is caught here.
    """
    query = {'q': search + ' in:title'}
    auth_headers = {'Api-Key': FORUM_SEARCH_API_KEY, 'Api-Username': 'system'}

    response = requests.get(
        'https://forum.protospace.ca/search.json',
        params=query,
        headers=auth_headers,
        timeout=5,
    )
    response.raise_for_status()

    topics = response.json().get('topics', [])
    matches = [
        {'id': topic['id'], 'title': normalize_to_ascii(topic['title'])}
        for topic in topics
    ]
    matches.sort(key=lambda match: match['id'], reverse=True)
    return matches[:7]
def message_protovac(thread):
    """Send the chat ``thread`` to the OpenAI chat API and return the reply.

    ``thread`` is a list of ``{'role', 'content'}`` message dicts; the last
    entry is the newest message. If the first attempt hits a read timeout,
    the request is retried once with a longer timeout and a "be terse"
    prefix on the last message. The prefix goes on a copy, so the caller's
    ``thread`` is never modified.

    Returns the reply message dict. On any failure it returns a fallback
    assistant message instead of raising; the failure is logged.
    """
    api_url = 'https://api.openai.com/v1/chat/completions'

    try:
        logging.info('Message to Protovac: %s', thread[-1]['content'])

        data = dict(
            messages=thread,
            model='gpt-3.5-turbo',
            temperature=0.5,
            user='Protovac',
            max_tokens=1000,
        )
        headers = {'Authorization': 'Bearer ' + openai_key}

        try:
            res = requests.post(api_url, json=data, headers=headers, timeout=4)
        except requests.ReadTimeout:
            logging.info('Got timeout, modifying prompt...')
            # Build the terse prompt on a copy so the caller's history keeps
            # the message exactly as the user typed it.
            terse_last = dict(thread[-1])
            terse_last['content'] = 'Be terse in your response to this: ' + terse_last['content']
            data['messages'] = list(thread[:-1]) + [terse_last]
            res = requests.post(api_url, json=data, headers=headers, timeout=20)

        res.raise_for_status()
        res = res.json()
        gpt_reply = res['choices'][0]['message']

        logging.info('Message reply: %s', gpt_reply['content'])
        return gpt_reply
    except Exception as e:
        # Exception, not BaseException: Ctrl-C and SystemExit must propagate.
        logging.exception(e)
        return dict(role='assistant', content='INSUFFICIENT DATA FOR A MEANINGFUL ANSWER.')
# Wolfram|Alpha client. The wolframalpha package is imported only when an API
# key is configured; otherwise wa_client stays None.
wa_client = None
if wa_api_key:
    import wolframalpha
    wa_client = wolframalpha.Client(wa_api_key)
def think_send(query):
    """Ask Wolfram|Alpha ``query`` and boil the response down to short text.

    Returns a fixed message when no client is configured, ``'Network error'``
    when the API call fails, and otherwise a result string that is at most
    500 characters plus a truncation marker. The function calls itself to
    follow a "did you mean" suggestion and to turn fractional results into
    decimals.
    """
    if not wa_client:
        return 'WOLFRAM|ALPHA API KEY NOT CONFIGURED'

    result = ''

    try:
        res = wa_client.query(query, timeout=10)
    except Exception as e:
        # Exception, not BaseException: Ctrl-C and SystemExit must propagate.
        logging.error('Error hitting W|A API: {} - {}\n'.format(e.__class__.__name__, e))
        return 'Network error'

    if 'didyoumeans' in res:
        # 'didyoumean' is either a single mapping or a list of them.
        try:
            guess = res['didyoumeans']['didyoumean']['#text']
        except TypeError:
            guess = res['didyoumeans']['didyoumean'][0]['#text']
        next_result = think_send(guess)
        result += 'Confused, using \'' + guess + '\'\n' + next_result
    elif 'pod' in res:
        pods = res['pod'] if isinstance(res['pod'], list) else [res['pod']]
        for pod in pods:
            title = pod['@title']
            subpods = pod['subpod'] if isinstance(pod['subpod'], list) else [pod['subpod']]
            plaintexts = [subpod['plaintext'] for subpod in subpods if subpod['plaintext']]
            plaintext = '; '.join(plaintexts)

            if any(x in title.lower() for x in ['input', 'conversion', 'corresponding', 'comparison', 'interpretation']):
                # Pods that only restate or reinterpret the query are skipped.
                pass
            elif 'definition' in title.lower():
                if not plaintext:
                    # Nothing to report; indexing plaintext[0] here used to
                    # raise IndexError.
                    continue
                first_line = plaintext.split('\n')[0]
                if plaintext[0] == '1' and ' | ' in first_line:
                    # Numbered list: keep the first entry's text after the
                    # first ' | ' separator.
                    definition = first_line.split(' | ', 1)[1]
                else:
                    definition = plaintext
                result += 'Definition: ' + definition + '\n'
                # No break: later pods may still add to the answer.
            elif 'result' in title.lower():
                if re.match(r'^\d+/\d+$', plaintext):
                    # Bare fraction: append what the decimal query returns.
                    plaintext += '\n' + think_send(plaintext + '.0')
                if 'base' in query.lower() and '_' in plaintext:
                    plaintext = '(Base conversion) "' + plaintext + '"'
                if '(irreducible)' in plaintext and '/' in plaintext:
                    # Re-ask with a float suffix to get a decimal answer.
                    result = think_send(query + '.0')
                else:
                    result += 'Result: ' + plaintext + '\n'
                break
            elif plaintext:
                result += title + ': ' + plaintext + '\n'
                break
    else:
        result = 'Error'

    result = result.strip()

    if len(result) > 500:
        result = result[:500] + '... truncated.'
    elif len(result) == 0:
        result = 'Error'

    result = result.replace('Wolfram|Alpha', 'Protovac')
    result = result.replace('Stephen Wolfram', 'Tanner') # lol
    result = result.replace('and his team', '')

    # Cut trailing parentheticals such as "(according to ...)".
    for word in ['according to', 'asked', 'although', 'approximately']:
        idx = result.lower().find('('+word)
        if idx > 0:
            result = result[:idx-1]

    if result == 'Error':
        result = 'INSUFFICIENT DATA FOR A MEANINGFUL ANSWER.'

    return result