"""Protogram: turn Protospace forum posts into Instagram captions.

The script collects forum topics tagged ``protouse-consent``, downloads the
JPEG photos linked from each topic's first post, asks the OpenAI chat
completions API for a caption, and sends caption plus photos to the admin
over Telegram.  Progress is persisted in ``data/data.json``.
"""

import os
import logging

DEBUG = os.environ.get('DEBUG')

# Logging is configured before the remaining imports, as in the original file.
logging.basicConfig(
    #filename='protogram.log',# encoding='utf-8',
    format='[%(asctime)s] %(levelname)s %(module)s/%(funcName)s - %(message)s',
    level=logging.DEBUG if DEBUG else logging.INFO)

import asyncio
import json
import requests
import time
import pytz
from datetime import datetime
from bs4 import BeautifulSoup
from telethon import TelegramClient, events

# NOTE(review): this must resolve to the project's own secrets.py (it provides
# API_ID, API_HASH, API_TOKEN, OPENAI_KEY, PROTOGRAM_API_KEY and
# ADMIN_TELEGRAM_ID), not the standard-library module of the same name.
import secrets

TIMEZONE_CALGARY = pytz.timezone('America/Edmonton')

# Timeout (seconds) for forum API requests so a stalled connection cannot
# hang the whole process.
FORUM_TIMEOUT_SECONDS = 10

# Delay (seconds) before polling again when there is nothing to process or
# the forum could not be reached.
IDLE_DELAY_SECONDS = 60

DATA_PATH = 'data/data.json'
PHOTO_DIR = 'data/photos'

bot = TelegramClient('data/bot', secrets.API_ID, secrets.API_HASH).start(bot_token=secrets.API_TOKEN)

# NOTE(review): the line breaks inside this template were reconstructed from a
# whitespace-collapsed copy of the file - confirm the intended prompt layout.
CHATGPT_TEMPLATE = (
    'Turn this forum post into an two paragraph instagram caption that tells '
    'about what a member of our makerspace has made. '
    'Add the hashtags: #makerspace #yyc #maker #diy #calgary and several '
    'relevant to the post at the end. '
    'Include a sentence explaining that this was made at Calgary Protospace, '
    "a makerspace that's non-profit and community ran. "
    "Only say the member's name once. "
    'Use no more than 1000 characters. '
    'Write in third person.\n'
    '\n'
    'Title: {}\n'
    '\n'
    'Member: {}\n'
    '\n'
    'Post Body:\n'
    '\n'
    '```\n'
    '{}\n'
    '```'
)

# Load persisted state.  OSError covers a missing or unreadable file;
# ValueError covers malformed JSON and undecodable bytes.
try:
    with open(DATA_PATH) as f:
        data = json.load(f)
except (OSError, ValueError):
    logging.info('data.json missing, initializing data.')
    data = {}

if 'topics' not in data:
    data['topics'] = {}
if 'states' not in data:
    data['states'] = {}


def store_data():
    """Persist the module-level ``data`` dict to ``data/data.json``.

    The JSON is written to a temporary file first and then moved into place,
    so an interrupted write cannot leave a truncated ``data.json`` behind
    (which the loader would replace with empty state on the next start).
    """
    tmp_path = DATA_PATH + '.tmp'
    with open(tmp_path, 'w') as f:
        json.dump(data, f, indent=4)
    os.replace(tmp_path, DATA_PATH)


def get_sorted_category_topic_ids():
    """Return all ``protouse-consent`` topic IDs, highest ID first.

    Pages through the forum tag listing (at most 100 pages) until an empty
    page is returned.

    Returns:
        A list of topic IDs as strings, sorted numerically in descending
        order, or ``False`` if any request failed.
    """
    API_TAG_URL = 'https://forum.protospace.ca/tags/c/18/protouse-consent.json?match_all_tags=true&page={}&tags[]=protouse-consent'

    topic_ids = []

    try:
        for page in range(100):
            r = requests.get(API_TAG_URL.format(page), timeout=FORUM_TIMEOUT_SECONDS)
            r.raise_for_status()
            topics = r.json()['topic_list']['topics']

            ids = [str(t['id']) for t in topics]
            topic_ids.extend(ids)

            logging.info('Got {} topic IDs from page {}.'.format(len(ids), page))

            if not ids:
                break
    except Exception as e:
        logging.error('Problem getting topic IDs: {} - {}'.format(e.__class__.__name__, str(e)))
        return False

    # IDs are stored as strings (they are used as JSON object keys), so they
    # must be compared as integers: plain string sorting puts '999' above
    # '10000'.
    return sorted(topic_ids, key=int, reverse=True)


def get_topic_details(topic_id):
    """Fetch the JSON document of one forum topic.

    Args:
        topic_id: Topic ID, substituted into the topic URL.

    Returns:
        The decoded JSON response, or ``False`` if the request failed.
    """
    API_TOPIC_URL = 'https://forum.protospace.ca/t/{}.json'

    try:
        r = requests.get(API_TOPIC_URL.format(topic_id), timeout=FORUM_TIMEOUT_SECONDS)
        r.raise_for_status()
        r = r.json()
        logging.info('Got topic ID: {}.'.format(topic_id))
    except Exception as e:
        logging.error('Problem getting topic details: {} - {}'.format(e.__class__.__name__, str(e)))
        return False

    return r


def fetch_missing_topics(topic_ids):
    """Download and store every topic that is not yet in ``data['topics']``.

    Sleeps one second before each download and saves the data file after each
    topic.  Topics that fail to download are skipped, so they are attempted
    again on the next call.

    Args:
        topic_ids: Iterable of topic ID strings.
    """
    for topic_id in topic_ids:
        if topic_id in data['topics']:
            continue

        time.sleep(1)

        topic = get_topic_details(topic_id)
        if not topic:
            continue

        data['topics'][topic_id] = topic
        store_data()

        logging.info('Fetched topic {}: {}'.format(topic_id, topic['title']))


def api_chatgpt(prompt):
    """Send ``prompt`` to the OpenAI chat completions API.

    Args:
        prompt: Text of the user message.

    Returns:
        The content of the first choice's message, or ``False`` if the
        request failed or the response lacked the expected fields.
    """
    thread = [
        dict(role='system', content='You are ChatGPT, a large language model trained by OpenAI. Answer as concisely as possible. Be terse.'),
        dict(role='user', content=prompt),
    ]

    # Named ``payload`` so it does not shadow the module-level ``data`` dict.
    payload = dict(
        messages=thread,
        model='gpt-4-1106-preview',
        temperature=0.5,
        user='protogram',
        max_tokens=1000,
    )
    headers = {'Authorization': 'Bearer ' + secrets.OPENAI_KEY}

    start = time.time()

    try:
        r = requests.post('https://api.openai.com/v1/chat/completions', json=payload, headers=headers, timeout=40)
        r.raise_for_status()
        r = r.json()
        gpt_reply = r['choices'][0]['message']['content']
    except Exception as e:
        logging.error('Problem with chatgpt: {} - {}'.format(e.__class__.__name__, str(e)))
        return False

    end = time.time()

    logging.info('Got ChatGPT response in {}s:\n{}'.format(str(end - start), gpt_reply))

    return gpt_reply


def get_portal_name_from_discourse(username):
    """Look up a member's preferred name from their forum username.

    Args:
        username: Discourse username, sent as ``discourse_username``.

    Returns:
        ``member.preferred_name`` from the search response, or ``False`` if
        the request failed or the response lacked that field.
    """
    try:
        params = dict(discourse_username=username)
        headers = {'Authorization': 'Bearer ' + secrets.PROTOGRAM_API_KEY}
        r = requests.get('https://api.my.protospace.ca/search/discourse/', params=params, headers=headers, timeout=5)
        r.raise_for_status()
        r = r.json()
        return r['member']['preferred_name']
    except Exception as e:
        logging.error('Problem with getting member name: {} - {}'.format(e.__class__.__name__, str(e)))
        return False


def generate_caption(topic):
    """Build the prompt for a topic and ask ChatGPT for a caption.

    Args:
        topic: Topic JSON as stored in ``data['topics']``.

    Returns:
        The caption text, or ``False`` if the member name lookup or the
        ChatGPT request failed.
    """
    title = topic['title']
    first_post = topic['post_stream']['posts'][0]

    username = first_post['username']
    member = get_portal_name_from_discourse(username)
    if not member:
        return False

    logging.info('Converted discourse username {} -> {}'.format(username, member))

    soup = BeautifulSoup(first_post['cooked'], 'html.parser')
    lines = soup.get_text().split('\n')

    # Presumably these are the file-size captions of uploaded attachments
    # (e.g. "... 120 KB") - TODO confirm.  Any line containing 'KB' or 'MB'
    # is dropped.
    filtered_lines = [line for line in lines if 'KB' not in line and 'MB' not in line]
    body = '\n'.join(filtered_lines).replace('\n\n\n', '\n\n')

    prompt = CHATGPT_TEMPLATE.format(title, member, body)

    logging.info('Generating caption for: {}\n{}'.format(title, body))

    return api_chatgpt(prompt)


def test_generate_topic_ids(topic_ids):
    """Generate captions for the given stored topics (manual testing aid).

    The captions are not stored; they only appear in the log output of the
    functions that produce them.

    Args:
        topic_ids: Iterable of topic ID strings present in ``data['topics']``.
    """
    logging.info('Test generating topic IDs {} with template:\n{}\n\n'.format(str(topic_ids), CHATGPT_TEMPLATE))

    for topic_id in topic_ids:
        generate_caption(data['topics'][topic_id])
        logging.info('Finished topic ID {}\n\n\n'.format(topic_id))

    logging.info('Done.')


def save_images_from_topic(topic_id, topic, state):
    """Download the JPEG photos linked from a topic's first post.

    Each photo is saved as ``data/photos/<topic_id>_<n>.jpg``, its path is
    appended to ``state['photos']`` and the data file is saved.  Photos that
    fail to download are skipped.

    Args:
        topic_id: Topic ID used in the file names.
        topic: Topic JSON as stored in ``data['topics']``.
        state: The topic's state dict; its ``photos`` list is extended.

    Returns:
        The number of photos saved (possibly 0), or ``False`` if the post
        links to no ``.jpeg`` URLs.
    """
    # 'link_counts' is read defensively so a post without it is reported as
    # having no photos rather than raising KeyError.
    link_counts = topic['post_stream']['posts'][0].get('link_counts') or []
    urls = [x['url'] for x in link_counts if x['url'].endswith('.jpeg')]  # skip png for now

    if not urls:
        logging.info('No photos found.')
        return False

    logging.info('Downloading {} photos...'.format(len(urls)))

    os.makedirs(PHOTO_DIR, exist_ok=True)

    count = 0
    for url in urls:
        logging.info('Downloading photo: {}'.format(url))

        try:
            r = requests.get(url, timeout=10)
            r.raise_for_status()

            filename = '{}/{}_{}.jpg'.format(PHOTO_DIR, topic_id, count)
            with open(filename, 'wb') as f:
                f.write(r.content)

            state['photos'].append(filename)
            store_data()

            # Only successful downloads advance the counter, so file names
            # stay consecutive.
            count += 1
        except Exception as e:
            logging.error('Problem downloading photo: {} - {}'.format(e.__class__.__name__, str(e)))
            continue

    return count


def find_next_valid_topic_id(topic_ids):
    """Return the first topic ID that has no usable state entry yet.

    Args:
        topic_ids: Topic ID strings in processing order.

    Returns:
        The first ID for which ``data['states'][id]['status']`` cannot be
        looked up, or ``False`` if there is none.
    """
    for topic_id in topic_ids:
        try:
            if data['states'][topic_id]['status'] in ['POSTED', 'ERROR']:
                continue
            # NOTE(review): a topic whose status is anything else (e.g. 'NEW'
            # left behind by an interrupted run) also falls through to the
            # next iteration and is therefore never selected - confirm that
            # this is intended.
        except KeyError:
            break
    else:  # for loop finished without finding a topic lacking state
        return False

    return topic_id


async def send_data_to_admin(state):
    """Send a topic's caption and photos to the admin over Telegram.

    Args:
        state: Topic state dict with ``caption`` and ``photos`` entries.

    Returns:
        ``True`` on success, ``False`` if sending raised an exception.
    """
    try:
        await bot.send_message(
            secrets.ADMIN_TELEGRAM_ID,
            state['caption'],
            file=state['photos'],
        )
        return True
    # Exception rather than BaseException, so task cancellation and
    # interpreter shutdown are not swallowed.
    except Exception as e:
        logging.error('Problem sending to admin: {} - {}'.format(e.__class__.__name__, str(e)))
        return False


async def process_topics():
    """Main worker: pick the next topic, build its post and send it.

    Each iteration refreshes the topic list, fetches topics that are not
    stored yet, selects the next topic without state, downloads its photos,
    generates a caption and sends both to the admin.  The outcome is recorded
    as ``POSTED`` or ``ERROR`` in ``data['states']``.

    NOTE(review): the helpers called here use blocking ``requests`` calls and
    ``time.sleep``, so the Telegram handlers are stalled while they run.
    """
    while True:
        # Weekly schedule gate, disabled in the original source.  Kept for
        # reference until it is re-enabled:
        #await asyncio.sleep(60)
        #FRIDAY = 4
        #now = datetime.now(TIMEZONE_CALGARY)
        #if not (now.weekday() == FRIDAY and now.hour == 18 and now.minute == 15):
        #    continue

        logging.info('Processing topics...')

        topic_ids = get_sorted_category_topic_ids()
        if topic_ids is False:
            # The failure was already logged; wait instead of passing False
            # on to fetch_missing_topics(), which would raise TypeError.
            await asyncio.sleep(IDLE_DELAY_SECONDS)
            continue

        fetch_missing_topics(topic_ids)

        topic_id = find_next_valid_topic_id(topic_ids)
        if not topic_id:
            logging.info('No next valid topic ID found.')
            # Wait before polling again so the forum API is not hammered in
            # a tight loop.
            await asyncio.sleep(IDLE_DELAY_SECONDS)
            continue

        logging.info('Next valid topic ID: {}'.format(topic_id))

        if topic_id not in data['topics']:
            # Its details could not be fetched this round; try again later.
            logging.error('Topic {} has not been fetched yet, retrying later.'.format(topic_id))
            await asyncio.sleep(IDLE_DELAY_SECONDS)
            continue

        if topic_id not in data['states']:
            data['states'][topic_id] = dict(
                status='NEW',
                photos=[],
                caption=None
            )

        topic = data['topics'][topic_id]
        state = data['states'][topic_id]

        count = save_images_from_topic(topic_id, topic, state)
        if not count:
            state['status'] = 'ERROR'
            store_data()
            continue

        caption = generate_caption(topic)
        if not caption:
            # Do not send photos with a missing caption.
            state['status'] = 'ERROR'
            store_data()
            continue

        state['caption'] = caption

        result = await send_data_to_admin(state)
        state['status'] = 'POSTED' if result else 'ERROR'
        store_data()

        print('done')

        # The original halts here after the first processed topic.  Sleeping
        # keeps that behaviour without spinning the CPU or blocking the
        # event loop, so the Telegram handlers keep working.
        while True:
            await asyncio.sleep(3600)


@bot.on(events.NewMessage(pattern='/start'))
async def start(event):
    """Reply to ``/start`` and stop further handlers from running."""
    await event.respond('Hello world')
    raise events.StopPropagation


@bot.on(events.NewMessage)
async def new_message(event):
    """Echo text messages sent by the admin; ignore everything else."""
    if not event.raw_text:
        logging.info('No text found')
        return

    logging.info('Message: ' + event.raw_text)

    # sender_id is used instead of sender.id so a missing sender entity
    # cannot raise AttributeError.
    if event.sender_id != secrets.ADMIN_TELEGRAM_ID:
        logging.info('Message not from Admin')
        return

    await event.respond(event.text)


def task_died(future):
    """Done-callback for the worker task: log the failure and exit.

    When the ``SHELL`` environment variable is set the process exits right
    away; otherwise it waits 60 seconds first.

    NOTE(review): the indentation of ``exit()`` was lost in the reviewed copy
    of the file; it is assumed to run in both branches - confirm.

    Args:
        future: The finished task.
    """
    try:
        error = future.exception()
    except asyncio.CancelledError:
        error = None

    if os.environ.get('SHELL'):
        logging.error('Protogram task died!', exc_info=error)
    else:
        logging.error('Protogram task died! Waiting 60s and exiting...', exc_info=error)
        #controller_message('Protogram task died! Waiting 60s and exiting...')
        time.sleep(60)
    exit()


if __name__ == '__main__':
    logging.info('===== BOOT UP =====')

    #topic_ids = get_sorted_category_topic_ids()
    #print(topic_ids)
    #fetch_missing_topics(topic_ids)
    #print('next valid:', find_next_valid_topic_id(topic_ids))
    #generate_caption(data['topics']['5363'])
    #test_generate_topic_ids(['5174', '5381', '5363', '5205', '5273'])

    loop = asyncio.get_event_loop()
    # Keep a reference to the task: the event loop only holds a weak
    # reference, and add_done_callback() returns None.
    worker = loop.create_task(process_topics())
    worker.add_done_callback(task_died)
    loop.run_forever()