"""Telegram bot that answers ``!command`` messages.

Supported commands: transcribe, dumpster, dumpstervid, garden, gardenvid,
setscope, scope and dream.  A second handler replies "Cat." to photos in a
few whitelisted chats when the detection service reports a cat.
"""

import os
import logging

DEBUG = os.environ.get('DEBUG')
logging.basicConfig(
        #filename='protovax.log', encoding='utf-8',
        format='[%(asctime)s] %(levelname)s %(module)s/%(funcName)s - %(message)s',
        level=logging.DEBUG if DEBUG else logging.INFO)

import asyncio
import functools
import requests
import random
import json
import base64
import time
import re
import pytz
from PIL import Image
from datetime import datetime, timezone, timedelta
from io import BytesIO

from telethon import TelegramClient, events

# NOTE: this is the project-local secrets.py (TG_API_ID, DREAM_API, ...),
# which shadows the standard-library module of the same name.
import secrets

TANNERCOIN = -1001394158904
SANDBOX = -1001258001846
PROTOSPACE = -1001298858490
JASON = 172033414
TANNER = 79316791
TANNERBOT_MICROSCOPY = -1001886549266

TIMEZONE_TANNER = pytz.timezone('America/Edmonton')

# NOTE(review): HTTP Basic credentials are hard-coded in source; consider
# moving them into the project-local secrets module.
CAMERA_HEADERS = {'Authorization': 'Basic YWRtaW46NXFoaDM0ZHFqMzRx'}
CAMERA_IMAGE_URL = 'http://cameras.dns.t0.vc/image/{}?&w=9999'

CAT_DETECTION_API = 'http://192.168.0.105:32168/v1/vision/detection'

VALID_SIZES = [512, 576, 640, 704, 768, 832, 896, 960, 1024]

# Matches "name:123" / "name=123" options embedded in a !dream prompt.
# NOTE(review): only numeric values match, so the "negative_prompt:text"
# form advertised in the usage text is never picked up by this pattern.
DREAM_OPTION_RE = re.compile(r'((\w+)[:=](\d+))')

DREAM_USAGE = '\n'.join([
    '> usage: !dream [prompt] [options]',
    '> or reply to someone with !dream',
    '> default options:',
    '> steps:30 (20-40)',
    '> height:512 (512-1024)',
    '> width:512 (512-1024)',
    '> seed:number',
    '> negative_prompt:text,words (no spaces)',
    '> favourites: https://t.me/+UyMF3ArBe4pmYzQ5',
])

DREAM_MAXIMUMS = {
    'steps': 40,
    'height': 1024,
    'width': 1024,
}

DREAM_MINIMUMS = {
    'steps': 20,
    'height': 512,
    'width': 512,
}

# Style suffixes appended to comma-free prompts when the seed has four digits.
DREAM_ENHANCEMENTS = [
    ', fantasy, intricate, elegant, digital painting, concept art, sharp focus, illustration by greg rutkowski, 4k',
    ', synthwave style, artstation, detailed, award winning, dramatic lighting, miami vice, oil on canvas',
    ', highly detailed painting by dustin nguyen, akihiko yoshida, greg tocchini, 4 k, trending on artstation, 8 k',
    ', cinematic lighting, insanely detailed, intricate, artstation, cgsociety, painted by simon stalenhag, concept art, illustration, sharp focus',
    ', photorealistic, highly detailed, illustration, lifelike, highly detailed, intricate, octane render, sharp focus, cyberpunk',
    ', by ghibli, makoto shinkai, bright and transparent animation style, anime art wallpaper 4 k, trending on artstation',
    ', painterly, sharp focus, illustration, art by lois royo',
    ', in the style of gta cover art, by stephen bliss, trending on artstation, pixiv, 8 k',
    ', d & d, fantasy concept art, octane render, 8 k',
    ', peter mohrbacher, charlie bowater, artstation, craig mullins, cinematic, digital painting, deviantart, cinematic lighting, 4 k',
    ', tristan eaton, victo ngai, maxfield parrish, artgerm, koons, ryden, intricated details',
    ', by soul bass, kawase hasui, moebius and edward hopper, colorful flat surreal design, xray hd, 8 k',
    ', manga cover art, detailed color portrait, artstation trending, 8 k',
    ', a hacker hologram by penny patricia poppycock, pixabay contest winner, holography, irridescent, photoillustration, maximalist vaporwave - n 9',
    ', breathtaking, epic, stunning, gorgeous, much detail, much wow, cgsociety, wlop, pixiv, behance, deviantart, masterpiece',
    ', sharp focus, dynamic lighting, elegant, harmony, beauty, masterpiece, by roberto ferry, illustration, blue background',
    ', ultradetailed environment, sharp focus, cinematic lighting, by alphonse maria mucha and kim jung gi',
    ', concept art by jama jurabaev, cinematic shot, trending on artstation, high quality, brush stroke, hyperspace, vibrant colors',
    ', hyper detailed, sharp focus, bokeh, unreal engine, ray tracing, cute, fantasy, sci fi, purple lights, tiny, small',
    ', fancy, ultra detailed, brush strokes, digital painting, cinematic, yoshitaka amano, andy warhol, ultra realistic',
    ', by tsuyoshi nagano, illustration, cinematic lighting, hyperdetailed, 8 k, symmetrical, frostbite 3 engine, cryengine, dof, trending on artstation, digital art, crepuscular ray',
    ', retrowave, tonal separation, black background, neon, black, glitch, strong contrast, pinterest, trending on artstation',
    ', basic background, krenz cushart, mucha, ghibli, by joaquin sorolla rhads leyendecker, by ohara koson',
]

# Module-level state written by !setscope and read by !scope.
scope_subject = ''
scope_zoom = ''

client = TelegramClient('data/dreamer.session', secrets.TG_API_ID, secrets.TG_API_HASH)


def snap_size(size):
    """Return the smallest entry of VALID_SIZES that is >= size.

    Sizes larger than every valid entry are clamped to the largest one
    (previously the function fell through and returned None).
    """
    for s in VALID_SIZES:
        if s >= size:
            return s
    return VALID_SIZES[-1]


def dreamer(prompt, negative_prompt='', steps=30, seed=1234, height=512, width=512):
    """POST a generation request to secrets.DREAM_API.

    Height and width are snapped with snap_size(); 'enable_hr' is set when
    either requested dimension exceeds 512.

    Returns the decoded JSON response, or None if the request or the JSON
    decoding failed (the failure is logged).  This call blocks for up to
    40 seconds, so async callers should run it off the event loop.
    """
    url = secrets.DREAM_API

    sd_settings = dict(
        prompt=prompt,
        negative_prompt=negative_prompt,
        steps=steps,
        seed=seed,
        height=snap_size(height),
        width=snap_size(width),
        save_images=True,
    )

    if width > 512 or height > 512:
        # highres fix
        sd_settings['enable_hr'] = True

    logging.info('Dreaming of: %s', prompt)

    try:
        res = requests.post(url, json=sd_settings, timeout=40)
        res.raise_for_status()
        return res.json()
    except Exception as e:
        # Exception rather than BaseException so that KeyboardInterrupt and
        # SystemExit are not swallowed.
        logging.error('Problem dreaming %s: %s - %s', url, e.__class__.__name__, e)
        return None


async def _run_blocking(func, *args, **kwargs):
    """Run a blocking callable in the default executor and await its result.

    Keeps slow HTTP calls from stalling the event loop the client runs on.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, functools.partial(func, *args, **kwargs))


async def _fetch_camera_image(camera):
    """Download the current frame of the named camera into tmp.jpg.

    Returns the file name written.  Raises on HTTP or I/O failure.
    """
    url = CAMERA_IMAGE_URL.format(camera)
    res = await _run_blocking(requests.get, url, headers=CAMERA_HEADERS, timeout=4)
    res.raise_for_status()
    with open('tmp.jpg', 'wb') as f:
        f.write(res.content)
    return 'tmp.jpg'


def _garden_caption():
    """Build the caption shared by !garden and !gardenvid."""
    # pytz zones must be attached with localize(); passing one as tzinfo=
    # to the datetime constructor picks the zone's LMT offset instead.
    start = TIMEZONE_TANNER.localize(datetime(2023, 3, 5, 19, 0))
    delta = datetime.now(pytz.utc) - start
    reply = '> secret garden #3 day {}'.format(delta.days + 1)
    reply += '\n> iceberg lettuce'
    return reply


def _render_scope_frame():
    """Save the frame for one second ago (UTC) to tmp.jpg, labelled if zoom is set.

    The frame file is chosen by the seconds field of that timestamp.
    Returns the file name written.
    """
    now = datetime.now(tz=timezone.utc) - timedelta(seconds=1)
    file_name = 'frames/' + now.strftime('image%S.jpg')

    # Context managers close the underlying image files once saved.
    with Image.open(file_name) as frame:
        if scope_zoom:
            with Image.open(scope_zoom + '_label.png') as label:
                frame.paste(label, (0, 0), label)
        frame.save('tmp.jpg')

    return 'tmp.jpg'


def _parse_dream_options(to_send):
    """Strip "name:number" options out of a prompt and collect settings.

    Returns (prompt_text, settings, error_reply).  error_reply is '' unless
    an option is outside DREAM_MINIMUMS / DREAM_MAXIMUMS, in which case
    parsing stops at that option.
    """
    settings = {
        'steps': 30,
        'height': 512,
        'width': 512,
        'negative_prompt': 'text,words',
        'seed': random.randint(11111, 19999),
    }

    for whole, setting, raw_value in DREAM_OPTION_RE.findall(to_send):
        value = int(raw_value)
        # Collapse the double space left behind where the option was removed.
        to_send = to_send.replace(whole, '').replace('  ', ' ')

        if setting in DREAM_MAXIMUMS and value > DREAM_MAXIMUMS[setting]:
            return to_send, settings, '> {} should be <= {}'.format(setting, DREAM_MAXIMUMS[setting])

        if setting in DREAM_MINIMUMS and value < DREAM_MINIMUMS[setting]:
            return to_send, settings, '> {} should be >= {}'.format(setting, DREAM_MINIMUMS[setting])

        if setting in settings:
            settings[setting] = value

    return to_send, settings, ''


async def _cmd_transcribe(event, sender, name, data):
    """!transcribe: send the replied-to audio to the Whisper API and reply with the text."""
    if not event.message.is_reply:
        await event.reply('> reply to the voice message you want transcribed', link_preview=False)
        return

    reply_message = await event.get_reply_message()
    document = getattr(getattr(reply_message, 'media', None), 'document', None)
    mime_type = getattr(document, 'mime_type', None) or ''

    audio = None
    if 'audio' in mime_type:
        try:
            audio = BytesIO()
            await client.download_media(reply_message, file=audio)
        except Exception:
            logging.exception('Problem downloading audio')
            audio = None

    if audio is None:
        await event.reply('> reply to a voice message', link_preview=False)
        return

    try:
        res = await _run_blocking(
            requests.post,
            secrets.WHISPER_API,
            files={'audio': audio.getvalue()},
            timeout=30,
        )
        res.raise_for_status()
        res = res.json()
        logging.info('Whisper response: %s', str(res))
        reply = '> audio transcription: "{}"'.format(res['text'].strip())
        await event.reply(reply, link_preview=False)
    except Exception:
        logging.exception('Problem transcribing audio')
        await event.reply('> error', link_preview=False)


async def _cmd_dumpster(event, sender, name, data):
    """!dumpster: reply with the current "Dump" camera image."""
    try:
        image = await _fetch_camera_image('Dump')
        await event.reply('', file=image, link_preview=False)
    except Exception:
        logging.exception('Problem fetching dumpster image')
        await event.reply('> error', link_preview=False)


async def _cmd_dumpstervid(event, sender, name, data):
    """!dumpstervid: reply with the local dumpster.mp4 file."""
    reply_file = await client.upload_file('dumpster.mp4')
    await event.reply('', file=reply_file, link_preview=False)


async def _cmd_garden(event, sender, name, data):
    """!garden: reply with the current "G-1" camera image and the garden caption."""
    try:
        image = await _fetch_camera_image('G-1')
        await event.reply(_garden_caption(), file=image, link_preview=False)
    except Exception:
        logging.exception('Problem fetching garden image')
        await event.reply('> error', link_preview=False)


async def _cmd_gardenvid(event, sender, name, data):
    """!gardenvid: reply with the local garden.mp4 file and the garden caption."""
    reply_file = await client.upload_file('garden.mp4')
    await event.reply(_garden_caption(), file=reply_file, link_preview=False)


async def _cmd_setscope(event, sender, name, data):
    """!setscope: (TANNER only) set the scope subject/zoom, reply with a frame and forward it."""
    global scope_subject, scope_zoom

    if sender.id != TANNER:
        await event.reply('> do !scope to look under the microscope')
        return

    scope_subject = data
    for zoom in ['4x', '10x', '40x']:
        if zoom in scope_subject:
            scope_zoom = zoom
            scope_subject = scope_subject.replace(zoom, '')

    reply = '> ' + scope_subject
    image = _render_scope_frame()
    reply_msg = await event.reply(reply, file=image, link_preview=False)
    await reply_msg.forward_to(TANNERBOT_MICROSCOPY)


async def _cmd_scope(event, sender, name, data):
    """!scope: reply with a recent frame, captioned with the scope subject if one is set."""
    reply = ''
    if scope_subject:
        reply = '> ' + scope_subject

    image = _render_scope_frame()
    await event.reply(reply, file=image, link_preview=False)


async def _generate_dream(to_send, settings, name):
    """Run the image generation and return (reply_text, image_bytes_or_None)."""
    try:
        to_send = to_send.replace('@', '')
        to_send = to_send.replace('!', '')
        to_send = to_send.strip()
        prompt = to_send

        # Only four-digit seeds get a style suffix; the random default seed
        # has five digits, so in practice this needs a user-supplied seed.
        if len(str(settings['seed'])) == 4 and ',' not in prompt:
            prompt += DREAM_ENHANCEMENTS[settings['seed'] % len(DREAM_ENHANCEMENTS)]

        start = time.time()
        dream_res = await _run_blocking(dreamer, prompt, **settings)
        finish = time.time()

        if not dream_res:
            # dreamer() has already logged the failure.
            return '> error', None

        b64_str = dream_res['images'][0]
        reply_file = base64.decodebytes(bytes(b64_str, 'utf-8'))

        latency = finish - start
        reply = '> generated "{}" for {} in {} s with seed:{} steps:{}'.format(
            to_send,
            name,
            '{:.1f}'.format(latency),
            settings['seed'],
            settings['steps'],
        )
        return reply, reply_file
    except Exception as e:
        logging.exception(e)
        return '> error', None


async def _cmd_dream(event, sender, name, data):
    """!dream: generate an image from the command text or the replied-to message."""
    reply = ''
    reply_file = None
    to_send = ''

    if event.chat_id == PROTOSPACE:
        reply = '> please join {} to dream'.format(secrets.PROTODREAM_URL)

    #if not reply and sender.id == JASON:
    #    reply = '> no.'

    if not reply:
        to_send = data
        if not to_send and event.message.is_reply:
            reply_msg = await event.get_reply_message()
            # The replied-to message may no longer be retrievable.
            to_send = reply_msg.raw_text if reply_msg else ''

        if '> ' in to_send:
            reply = '> sounds like a bad !dream'
        elif len(to_send) > 500:
            reply = '> impressive. very nice. let\'s see Paul Allen\'s prompt.'
        elif not to_send:
            reply = DREAM_USAGE

    settings = None
    if not reply:
        to_send, settings, reply = _parse_dream_options(to_send)

    if not reply:
        reply, reply_file = await _generate_dream(to_send, settings, name)

    await event.reply(reply, file=reply_file, link_preview=False)


COMMAND_HANDLERS = {
    'transcribe': _cmd_transcribe,
    'dumpster': _cmd_dumpster,
    'dumpstervid': _cmd_dumpstervid,
    'garden': _cmd_garden,
    'gardenvid': _cmd_gardenvid,
    'setscope': _cmd_setscope,
    'scope': _cmd_scope,
    'dream': _cmd_dream,
}


async def message_handler(event):
    """Parse a "!command data" message and dispatch it to its handler.

    Ignored: messages without a sender, forwarded messages, senders without
    a first_name, and text that does not start with '!'.  A reply containing
    "plz type it" is treated as !transcribe.
    """
    text = event.raw_text
    sender = await event.get_sender()

    if not event.sender:
        return

    if event.message.fwd_from:
        return

    name = getattr(sender, 'first_name', None)
    if not name:
        return

    if event.message.is_reply and 'plz type it' in text.lower():
        text = '!transcribe'

    if text.startswith('! '):
        text = text[2:]
    elif text.startswith('!'):
        text = text[1:]
    else:
        return

    command, _, data = text.partition(' ')
    command = command.lower()

    if command in ['chat', 'yarr', 'hwhat']:
        await client.send_read_acknowledge(event.chat, message=event.message, clear_mentions=True)
        return

    handler = COMMAND_HANDLERS.get(command)
    if handler is None:
        return

    # !setscope without a subject is silently ignored.
    if command == 'setscope' and not data:
        return

    logging.info('Chat: {:<14} | ID: {:<6} | User: {} {} | Command: {} | Data: {}'.format(
        event.chat_id, event.id, name, sender.id, command, data or 'None'))

    await handler(event, sender, name, data)


@client.on(events.MessageEdited)
async def message_edited(event):
    """Handle edited messages the same way as new ones."""
    await message_handler(event)


@client.on(events.NewMessage)
async def new_message(event):
    """Handle newly received messages."""
    await message_handler(event)


@client.on(events.NewMessage)
async def detect_cats(event):
    """Reply 'Cat.' to photos in whitelisted chats when a cat is detected.

    The photo is posted to CAT_DETECTION_API; a prediction labelled 'cat'
    with confidence above 0.70 triggers the reply.  Failures are logged and
    otherwise ignored.
    """
    if event.chat_id not in [PROTOSPACE, TANNERCOIN, SANDBOX]:
        return

    if not event.photo:
        return

    try:
        photo = BytesIO()
        await client.download_media(event.photo, photo)
        photo.seek(0)

        response = await _run_blocking(
            requests.post,
            CAT_DETECTION_API,
            files={'image': photo},
            timeout=30,
        )
        res = response.json()
        logging.info('Detection response: %s', res)

        for prediction in res['predictions']:
            if prediction['label'] == 'cat' and prediction['confidence'] > 0.70:
                logging.info('Found a cat!')
                await event.reply('Cat.')
                return
    except Exception:
        logging.exception('Failed to try and find a cat.')


client.start()
client.run_until_disconnected()