Finish PoC that messages the result over Telegram

This commit is contained in:
Tanner Collin 2024-01-06 22:50:08 +00:00
parent 4b35ec7700
commit ae416242e2

344
main.py
View File

@ -0,0 +1,344 @@
import os, logging
DEBUG = os.environ.get('DEBUG')
logging.basicConfig(
#filename='protogram.log',# encoding='utf-8',
format='[%(asctime)s] %(levelname)s %(module)s/%(funcName)s - %(message)s',
level=logging.DEBUG if DEBUG else logging.INFO)
import asyncio
import json
import requests
import time
import pytz
from datetime import datetime
from bs4 import BeautifulSoup
from telethon import TelegramClient, events
import secrets
TIMEZONE_CALGARY = pytz.timezone('America/Edmonton')
bot = TelegramClient('data/bot', secrets.API_ID, secrets.API_HASH).start(bot_token=secrets.API_TOKEN)
CHATGPT_TEMPLATE = '''Turn this forum post into an two paragraph instagram
caption that tells about what a member of our makerspace has made. Add the
hashtags: #makerspace #yyc #maker #diy #calgary and several relevant to the post
at the end. Include a sentence explaining that this was made at Calgary
Protospace, a makerspace that's non-profit and community ran. Only say the
member's name once. Use no more than 1000 characters. Write in third person.
Title: {}
Member: {}
Post Body:
```
{}
```'''
try:
data = json.load(open('data/data.json'))
except:
logging.info('data.json missing, initializing data.')
data = {}
if 'topics' not in data: data['topics'] = {}
if 'states' not in data: data['states'] = {}
def store_data():
with open('data/data.json', 'w') as f:
json.dump(data, f, indent=4)
def get_sorted_category_topic_ids():
API_TAG_URL = 'https://forum.protospace.ca/tags/c/18/protouse-consent.json?match_all_tags=true&page={}&tags[]=protouse-consent'
topic_ids = []
try:
for page in range(100):
r = requests.get(API_TAG_URL.format(page))
r.raise_for_status()
r = r.json()
topics = r['topic_list']['topics']
ids = [str(t['id']) for t in topics]
topic_ids.extend(ids)
logging.info('Got {} topic IDs from page {}.'.format(len(ids), page))
if len(ids) == 0:
break
except BaseException as e:
logging.error('Problem getting topic IDs: {} - {}'.format(e.__class__.__name__, str(e)))
return False
return sorted(topic_ids, reverse=True)
def get_topic_details(topic_id):
API_TOPIC_URL = 'https://forum.protospace.ca/t/{}.json'
try:
r = requests.get(API_TOPIC_URL.format(topic_id))
r.raise_for_status()
r = r.json()
logging.info('Got topic ID: {}.'.format(topic_id))
except BaseException as e:
logging.error('Problem getting topic details: {} - {}'.format(e.__class__.__name__, str(e)))
return False
return r
def fetch_missing_topics(topic_ids):
for topic_id in topic_ids:
if topic_id in data['topics']:
continue
time.sleep(1)
topic = get_topic_details(topic_id)
if not topic:
continue
data['topics'][topic_id] = topic
store_data()
logging.info('Fetched topic {}: {}'.format(topic_id, topic['title']))
def api_chatgpt(prompt):
thread = [
dict(role='system', content='You are ChatGPT, a large language model trained by OpenAI. Answer as concisely as possible. Be terse.'),
dict(role='user', content=prompt),
]
data = dict(
messages=thread,
model='gpt-4-1106-preview',
temperature=0.5,
user='protogram',
max_tokens=1000,
)
headers = {'Authorization': 'Bearer ' + secrets.OPENAI_KEY}
start = time.time()
try:
r = requests.post('https://api.openai.com/v1/chat/completions', json=data, headers=headers, timeout=40)
r.raise_for_status()
r = r.json()
gpt_reply = r['choices'][0]['message']['content']
except BaseException as e:
logging.error('Problem with chatgpt: {} - {}'.format(e.__class__.__name__, str(e)))
return False
end = time.time()
logging.info('Got ChatGPT response in {}s:\n{}'.format(str(end - start), gpt_reply))
return gpt_reply
def get_portal_name_from_discourse(username):
try:
params = dict(discourse_username=username)
headers = {'Authorization': 'Bearer ' + secrets.PROTOGRAM_API_KEY}
r = requests.get('https://api.my.protospace.ca/search/discourse/', params=params, headers=headers, timeout=5)
r.raise_for_status()
r = r.json()
return r['member']['preferred_name']
except BaseException as e:
logging.error('Problem with getting member name: {} - {}'.format(e.__class__.__name__, str(e)))
return False
def generate_caption(topic):
title = topic['title']
username = topic['post_stream']['posts'][0]['username']
member = get_portal_name_from_discourse(username)
if not member:
return False
logging.info('Converted discourse username {} -> {}'.format(username, member))
post_html = topic['post_stream']['posts'][0]['cooked']
soup = BeautifulSoup(post_html, 'html.parser')
lines = soup.get_text().split('\n')
filtered_lines = [line for line in lines if 'KB' not in line and 'MB' not in line]
body = '\n'.join(filtered_lines).replace('\n\n\n', '\n\n')
prompt = CHATGPT_TEMPLATE.format(title, member, body)
logging.info('Generating caption for: {}\n{}'.format(title, body))
return api_chatgpt(prompt)
def test_generate_topic_ids(topic_ids):
logging.info('Test generating topic IDs {} with template:\n{}\n\n'.format(str(topic_ids), CHATGPT_TEMPLATE))
for topic_id in topic_ids:
generate_caption(data['topics'][topic_id])
logging.info('Finished topic ID {}\n\n\n'.format(topic_id))
logging.info('Done.')
def save_images_from_topic(topic_id, topic, state):
link_counts = topic['post_stream']['posts'][0]['link_counts']
urls = [x['url'] for x in link_counts if x['url'].endswith('.jpeg')] # skip png for now
if len(urls) == 0:
logging.info('No photos found.')
return False
logging.info('Downloading {} photos...'.format(len(urls)))
count = 0
for url in urls:
logging.info('Downloading photo: {}'.format(url))
try:
r = requests.get(url, timeout=10)
r.raise_for_status()
filename = 'data/photos/{}_{}.jpg'.format(topic_id, count)
with open(filename, 'wb') as f:
f.write(r.content)
state['photos'].append(filename)
store_data()
count += 1
except BaseException as e:
logging.error('Problem downloading photo: {} - {}'.format(e.__class__.__name__, str(e)))
continue
return count
def find_next_valid_topic_id(topic_ids):
for topic_id in topic_ids:
try:
if data['states'][topic_id]['status'] in ['POSTED', 'ERROR']:
continue
except KeyError:
break
else: # for loop
return False
return topic_id
async def send_data_to_admin(state):
try:
await bot.send_message(
secrets.ADMIN_TELEGRAM_ID,
state['caption'],
file=state['photos'],
)
return True
except BaseException as e:
logging.error('Problem sending to admin: {} - {}'.format(e.__class__.__name__, str(e)))
return False
async def process_topics():
while True:
#await asyncio.sleep(60)
FRIDAY = 4
now = datetime.now(TIMEZONE_CALGARY)
#if not (now.weekday() == FRIDAY and now.hour == 18 and now.minute == 15):
# continue
logging.info('Processing topics...')
topic_ids = get_sorted_category_topic_ids()
fetch_missing_topics(topic_ids)
topic_id = find_next_valid_topic_id(topic_ids)
if not topic_id:
logging.info('No next valid topic ID found.')
continue
logging.info('Next valid topic ID: {}'.format(topic_id))
if topic_id not in data['states']:
data['states'][topic_id] = dict(
status='NEW',
photos=[],
caption=None
)
topic = data['topics'][topic_id]
state = data['states'][topic_id]
count = save_images_from_topic(topic_id, topic, state)
if not count:
state['status'] = 'ERROR'
store_data()
continue
caption = generate_caption(topic)
state['caption'] = caption
result = await send_data_to_admin(state)
if result:
state['status'] = 'POSTED'
else:
state['status'] = 'ERROR'
store_data()
print('done')
while True: pass
@bot.on(events.NewMessage(pattern='/start'))
async def start(event):
await event.respond('Hello world')
raise events.StopPropagation
@bot.on(events.NewMessage)
async def new_message(event):
if not event.raw_text:
logging.info('No text found')
return
logging.info('Message: ' + event.raw_text)
if event.sender.id != secrets.ADMIN_TELEGRAM_ID:
logging.info('Message not from Admin')
return
await event.respond(event.text)
def task_died(future):
if os.environ.get('SHELL'):
logging.error('Protogram task died!')
else:
logging.error('Protogram task died! Waiting 60s and exiting...')
try:
#controller_message('Protogram task died! Waiting 60s and exiting...')
pass
except: # we want this to succeed no matter what
pass
time.sleep(60)
exit()
if __name__ == '__main__':
logging.info('===== BOOT UP =====')
#topic_ids = get_sorted_category_topic_ids()
#print(topic_ids)
#fetch_missing_topics(topic_ids)
#print('next valid:', find_next_valid_topic_id(topic_ids))
#generate_caption(data['topics']['5363'])
#test_generate_topic_ids(['5174', '5381', '5363', '5205', '5273'])
loop = asyncio.get_event_loop()
a = loop.create_task(process_topics()).add_done_callback(task_died)
loop.run_forever()