You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

122 lines
3.6 KiB

import logging
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.DEBUG)
if __name__ == '__main__':
import sys
sys.path.insert(0,'.')
import requests
from datetime import datetime
from utils import clean
WEBWORM_DOMAIN = "https://www.webworm.co"
API_STORIES = lambda x: f"{WEBWORM_DOMAIN}/api/v1/archive?sort=new&search=&offset=0&limit=100"
#API_ITEM = lambda x : f'https://hn.algolia.com/api/v1/items/{x}'
API_ITEM_COMMENTS = lambda x: f"{WEBWORM_DOMAIN}/api/v1/post/{x}/comments?all_comments=true&sort=best_first"
SITE_LINK = lambda x: f"{WEBWORM_DOMAIN}/p/{x}"
SITE_AUTHOR_LINK = lambda x : f"{WEBWORM_DOMAIN}/people/{x}"
def api(route, ref=None):
try:
r = requests.get(route(ref), timeout=5)
if r.status_code != 200:
raise Exception('Bad response code ' + str(r.status_code))
return r.json()
except KeyboardInterrupt:
raise
except BaseException as e:
logging.error('Problem hitting Substack API: {}, trying again'.format(str(e)))
try:
r = requests.get(route(ref), timeout=15)
if r.status_code != 200:
raise Exception('Bad response code ' + str(r.status_code))
return r.json()
except KeyboardInterrupt:
raise
except BaseException as e:
logging.error('Problem hitting Substack API: {}'.format(str(e)))
return False
def feed():
stories = api(API_STORIES)
stories = list(filter(None, [None if i.get("audience") == "only_paid" else i for i in stories]))
return [str(i.get("id")) for i in stories or []]
def bylines(b):
if 'id' not in b:
return None
a = {}
a['name'] = b.get('name')
a['link'] = SITE_AUTHOR_LINK(b.get('id'))
return a
def unix(date_str):
return int(datetime.strptime(date_str, '%Y-%m-%dT%H:%M:%S.%fZ').timestamp())
def comment(i):
if 'body' not in i:
return False
c = {}
c['date'] = unix(i.get('date'))
c['author'] = i.get('name', '')
c['score'] = i.get('reactions').get('')
c['text'] = clean(i.get('body', '') or '')
c['comments'] = [comment(j) for j in i['children']]
c['comments'] = list(filter(bool, c['comments']))
return c
def comment_count(i):
alive = 1 if i['author'] else 0
return sum([comment_count(c) for c in i['comments']]) + alive
def story(ref):
stories = api(API_STORIES)
stories = list(filter(None, [None if i.get("audience") == "only_paid" else i for i in stories]))
stories = list(filter(None, [i if str(i.get('id')) == ref else None for i in stories]))
if len(stories) == 0:
print("no items")
return False
r = stories[0]
if not r:
print("not r")
return False
s = {}
s['author'] = ''
s['author_link'] = ''
s['date'] = unix(r.get('post_date'))
s['score'] = r.get('reactions').get('')
s['title'] = r.get('title', '')
s['link'] = r.get('canonical_url', '')
s['url'] = r.get('canonical_url', '')
comments = api(API_ITEM_COMMENTS, r.get('id'))
s['comments'] = [comment(i) for i in comments.get('comments')]
s['comments'] = list(filter(bool, s['comments']))
s['num_comments'] = r.get('comment_count', 0)
authors = list(filter(None, [bylines(byline) for byline in r.get('publishedBylines')]))
if len(authors):
s['author'] = authors[0].get('name')
s['author_link'] = authors[0].get('link')
if 'text' in r and r['text']:
s['text'] = clean(r['text'] or '')
return s
# scratchpad so I can quickly develop the parser
if __name__ == '__main__':
stories = feed()
print(stories)
print(story(stories[0]))