add substack.py top sites, replacing webworm.py

master
Jason Schwarzenberger 4 years ago
parent 283a2b1545
commit c5fe5d25a0
  1. 9
      apiserver/feed.py
  2. 160
      apiserver/feeds/substack.py
  3. 122
      apiserver/feeds/webworm.py

@ -7,7 +7,7 @@ import requests
import time import time
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
from feeds import hackernews, reddit, tildes, webworm, manual from feeds import hackernews, reddit, tildes, substack, manual
OUTLINE_API = 'https://api.outline.com/v3/parse_article' OUTLINE_API = 'https://api.outline.com/v3/parse_article'
READ_API = 'http://127.0.0.1:33843' READ_API = 'http://127.0.0.1:33843'
@ -15,12 +15,15 @@ READ_API = 'http://127.0.0.1:33843'
INVALID_DOMAINS = ['youtube.com', 'bloomberg.com', 'wsj.com'] INVALID_DOMAINS = ['youtube.com', 'bloomberg.com', 'wsj.com']
TWO_DAYS = 60*60*24*2 TWO_DAYS = 60*60*24*2
webworm = substack.Publication("https://www.webworm.co")
def list(): def list():
feed = [] feed = []
feed += [(x, 'hackernews') for x in hackernews.feed()[:10]] feed += [(x, 'hackernews') for x in hackernews.feed()[:10]]
feed += [(x, 'tildes') for x in tildes.feed()[:10]] feed += [(x, 'tildes') for x in tildes.feed()[:10]]
feed += [(x, 'reddit') for x in reddit.feed()[:15]]
feed += [(x, 'webworm') for x in webworm.feed()[:15]] feed += [(x, 'webworm') for x in webworm.feed()[:15]]
feed += [(x, 'reddit') for x in reddit.feed()[:15]]
feed += [(x, 'substack') for x in substack.top.feed()[:15]]
return feed return feed
def get_article(url): def get_article(url):
@ -80,6 +83,8 @@ def update_story(story, is_manual=False):
res = tildes.story(story['ref']) res = tildes.story(story['ref'])
elif story['source'] == 'webworm': elif story['source'] == 'webworm':
res = webworm.story(story['ref']) res = webworm.story(story['ref'])
elif story['source'] == 'substack':
res = substack.top.story(story['ref'])
elif story['source'] == 'manual': elif story['source'] == 'manual':
res = manual.story(story['ref']) res = manual.story(story['ref'])

@ -0,0 +1,160 @@
import logging
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.DEBUG)
if __name__ == '__main__':
import sys
sys.path.insert(0,'.')
import requests
from datetime import datetime
from utils import clean
def SUBSTACK_API_TOP_POSTS(ref):
    """Return the URL of Substack's cross-publication top-posts endpoint.

    Takes (and ignores) one argument so it is interchangeable with the
    other route builders passed to api(). Written as a def rather than a
    lambda assignment (PEP 8 E731).
    """
    return "https://substack.com/api/v1/reader/top-posts"
def author_link(author_id, base_url):
    """Build the public profile URL for an author on a publication."""
    return "{}/people/{}".format(base_url, author_id)
def api_comments(post_id, base_url):
    """Build the comments-API URL for a post, best comments first."""
    route = "/api/v1/post/{}/comments?all_comments=true&sort=best_first"
    return base_url + route.format(post_id)
def api_stories(x, base_url):
    """Build the archive-API URL listing the 100 newest posts (*x* is unused)."""
    return "%s/api/v1/archive?sort=new&search=&offset=0&limit=100" % base_url
def unix(date_str):
    """Parse a Substack ISO-8601 timestamp ('2020-01-01T00:00:00.000Z') to a
    unix epoch integer.

    The trailing 'Z' marks the time as UTC, so the parsed datetime is made
    timezone-aware before calling .timestamp(); a naive parse would be
    interpreted in the host's local timezone and shift the epoch.
    """
    from datetime import timezone
    dt = datetime.strptime(date_str, '%Y-%m-%dT%H:%M:%S.%fZ')
    return int(dt.replace(tzinfo=timezone.utc).timestamp())
def api(route, ref=None):
    """GET a Substack API route and return the decoded JSON.

    route -- callable mapping *ref* to a URL.
    ref   -- opaque value forwarded to *route* (post id or None).

    Tries once with a 5s timeout, then retries once with 15s (the original
    duplicated try/except body is collapsed into this loop). Returns False
    when both attempts fail; KeyboardInterrupt always propagates.
    """
    for attempt, timeout in enumerate((5, 15)):
        try:
            r = requests.get(route(ref), timeout=timeout)
            if r.status_code != 200:
                raise Exception('Bad response code ' + str(r.status_code))
            return r.json()
        except KeyboardInterrupt:
            raise
        except BaseException as e:
            if attempt == 0:
                logging.error('Problem hitting Substack API: {}, trying again'.format(str(e)))
            else:
                logging.error('Problem hitting Substack API: {}'.format(str(e)))
    return False
def comment(i):
    """Convert one raw Substack comment dict into the internal shape.

    Returns False for bodyless (deleted) comments so callers can filter
    them out; children are converted recursively and filtered the same way.
    """
    if 'body' not in i:
        return False
    c = {}
    c['date'] = unix(i.get('date'))
    c['author'] = i.get('name', '')
    # 'reactions' may be missing or None; '' is the default reaction key.
    c['score'] = (i.get('reactions') or {}).get('')
    c['text'] = clean(i.get('body', '') or '')
    # 'children' may be absent — the original i['children'] raised KeyError.
    c['comments'] = [comment(j) for j in i.get('children') or []]
    c['comments'] = list(filter(bool, c['comments']))
    return c
class Publication:
    """Scraper for a single Substack publication (e.g. webworm.co).

    Only free posts (audience == "everyone") are exposed; paid posts are
    filtered out of both feed() and story().
    """

    def __init__(self, domain):
        self.BASE_DOMAIN = domain

    def _public_stories(self):
        """Fetch the archive and keep only publicly-readable posts."""
        stories = api(lambda x: api_stories(x, self.BASE_DOMAIN))
        # api() returns False on failure; iterating that raised TypeError
        # in the original — treat failure as an empty archive instead.
        return [i for i in stories or [] if i and i.get("audience") == "everyone"]

    def feed(self):
        """Return the refs (post ids as strings) of current public posts."""
        return [str(i.get("id")) for i in self._public_stories()]

    def story(self, ref):
        """Return the normalized story dict for *ref*, or False when the
        post is unknown, paywalled, or the API call fails."""
        match = [i for i in self._public_stories() if str(i.get('id')) == ref]
        if not match:
            return False
        r = match[0]
        if not r:
            return False

        s = {}
        s['author'] = ''
        s['author_link'] = ''
        s['date'] = unix(r.get('post_date'))
        # 'reactions' may be absent; '' is the default reaction key.
        s['score'] = (r.get('reactions') or {}).get('')
        s['title'] = r.get('title', '')
        s['link'] = r.get('canonical_url', '')
        s['url'] = r.get('canonical_url', '')

        comments = api(lambda x: api_comments(x, self.BASE_DOMAIN), r.get('id'))
        # comments is False when the comments endpoint fails.
        s['comments'] = [comment(i) for i in (comments or {}).get('comments') or []]
        s['comments'] = list(filter(bool, s['comments']))
        s['num_comments'] = r.get('comment_count', 0)

        # 'publishedBylines' may be absent/None — guard before iterating.
        authors = list(filter(None, [self._bylines(byline) for byline in r.get('publishedBylines') or []]))
        if len(authors):
            s['author'] = authors[0].get('name')
            s['author_link'] = authors[0].get('link')
        return s

    def _bylines(self, b):
        """Map a raw byline dict to {name, link}; None when it lacks an id."""
        if 'id' not in b:
            return None
        a = {}
        a['name'] = b.get('name')
        a['link'] = author_link(b.get('id'), self.BASE_DOMAIN)
        return a
class Top:
    """Scraper for Substack's cross-publication "top posts" feed."""

    def _public_stories(self):
        """Fetch the top-posts list and keep only publicly-readable posts."""
        stories = api(SUBSTACK_API_TOP_POSTS)
        # api() returns False on failure; iterating that raised TypeError
        # in the original — treat failure as an empty list instead.
        return [i for i in stories or [] if i and i.get("audience") == "everyone"]

    def feed(self):
        """Return the refs (post ids as strings) of the current top posts."""
        return [str(i.get("id")) for i in self._public_stories()]

    def story(self, ref):
        """Return the normalized story dict for *ref*, or False when the
        post is unknown, paywalled, or the API call fails."""
        match = [i for i in self._public_stories() if str(i.get('id')) == ref]
        if not match:
            return False
        r = match[0]
        if not r:
            return False

        s = {}
        # Top-posts entries embed their source publication under 'pub'.
        pub = r.get('pub') or {}
        base_url = pub.get('base_url')
        s['author'] = pub.get('author_name')
        s['author_link'] = author_link(pub.get('author_id'), base_url)
        s['date'] = unix(r.get('post_date'))
        s['score'] = r.get('score')
        s['title'] = r.get('title', '')
        s['link'] = r.get('canonical_url', '')
        s['url'] = r.get('canonical_url', '')

        comments = api(lambda x: api_comments(x, base_url), r.get('id'))
        # comments is False when the comments endpoint fails.
        s['comments'] = [comment(i) for i in (comments or {}).get('comments') or []]
        s['comments'] = list(filter(bool, s['comments']))
        s['num_comments'] = r.get('comment_count', 0)
        return s
# Module-level singleton scraper for the cross-publication top-posts feed
# (imported elsewhere as substack.top).
top = Top()
# scratchpad so I can quickly develop the parser
if __name__ == '__main__':
    refs = top.feed()
    print(top.story(refs[0]))

    webworm = Publication("https://www.webworm.co/")
    webworm_refs = webworm.feed()
    print(webworm_refs[:1])
    print(webworm.story(webworm_refs[0]))

@ -1,122 +0,0 @@
import logging
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.DEBUG)
if __name__ == '__main__':
import sys
sys.path.insert(0,'.')
import requests
from datetime import datetime
from utils import clean
# Base URL of the Webworm publication (a Substack site).
WEBWORM_DOMAIN = "https://www.webworm.co"

def API_STORIES(x):
    """Archive endpoint listing the 100 newest posts (*x* is unused)."""
    return f"{WEBWORM_DOMAIN}/api/v1/archive?sort=new&search=&offset=0&limit=100"

def API_ITEM_COMMENTS(x):
    """Comments endpoint for post id *x*, best comments first."""
    return f"{WEBWORM_DOMAIN}/api/v1/post/{x}/comments?all_comments=true&sort=best_first"

def SITE_LINK(x):
    """Public page URL for post slug *x*."""
    return f"{WEBWORM_DOMAIN}/p/{x}"

def SITE_AUTHOR_LINK(x):
    """Public profile URL for author id *x*."""
    return f"{WEBWORM_DOMAIN}/people/{x}"
def api(route, ref=None):
    """GET a Substack API route and return the decoded JSON.

    route -- callable mapping *ref* to a URL.
    ref   -- opaque value forwarded to *route* (post id or None).

    Tries once with a 5s timeout, then retries once with 15s (the original
    duplicated try/except body is collapsed into this loop). Returns False
    when both attempts fail; KeyboardInterrupt always propagates.
    """
    for attempt, timeout in enumerate((5, 15)):
        try:
            r = requests.get(route(ref), timeout=timeout)
            if r.status_code != 200:
                raise Exception('Bad response code ' + str(r.status_code))
            return r.json()
        except KeyboardInterrupt:
            raise
        except BaseException as e:
            if attempt == 0:
                logging.error('Problem hitting Substack API: {}, trying again'.format(str(e)))
            else:
                logging.error('Problem hitting Substack API: {}'.format(str(e)))
    return False
def feed():
    """Return refs (post ids as strings) of posts that are not paywalled."""
    stories = api(API_STORIES)
    # api() returns False on failure; iterating that raised TypeError in
    # the original — treat failure as an empty archive instead.
    public = [i for i in stories or [] if i and i.get("audience") != "only_paid"]
    return [str(i.get("id")) for i in public]
def bylines(b):
    """Map a raw byline dict to {name, link}; None when it lacks an id."""
    if 'id' not in b:
        return None
    return {
        'name': b.get('name'),
        'link': SITE_AUTHOR_LINK(b.get('id')),
    }
def unix(date_str):
    """Parse a Substack ISO-8601 timestamp ('2020-01-01T00:00:00.000Z') to a
    unix epoch integer.

    The trailing 'Z' marks the time as UTC, so the parsed datetime is made
    timezone-aware before calling .timestamp(); a naive parse would be
    interpreted in the host's local timezone and shift the epoch.
    """
    from datetime import timezone
    dt = datetime.strptime(date_str, '%Y-%m-%dT%H:%M:%S.%fZ')
    return int(dt.replace(tzinfo=timezone.utc).timestamp())
def comment(i):
    """Convert one raw Substack comment dict into the internal shape.

    Returns False for bodyless (deleted) comments so callers can filter
    them out; children are converted recursively and filtered the same way.
    """
    if 'body' not in i:
        return False
    c = {}
    c['date'] = unix(i.get('date'))
    c['author'] = i.get('name', '')
    # 'reactions' may be missing or None; '' is the default reaction key.
    c['score'] = (i.get('reactions') or {}).get('')
    c['text'] = clean(i.get('body', '') or '')
    # 'children' may be absent — the original i['children'] raised KeyError.
    c['comments'] = [comment(j) for j in i.get('children') or []]
    c['comments'] = list(filter(bool, c['comments']))
    return c
def comment_count(i):
    """Count comments in the tree rooted at *i* that have a non-empty author."""
    total = 1 if i['author'] else 0
    for child in i['comments']:
        total += comment_count(child)
    return total
def story(ref):
    """Return the normalized story dict for post id *ref*, or False when the
    post is unknown, paywalled, or the API call fails.

    Debug print()s from the original are removed; failures are signalled
    purely by the False return value.
    """
    stories = api(API_STORIES)
    # api() returns False on failure; iterating that raised TypeError in
    # the original — treat failure as an empty archive instead.
    public = [i for i in stories or [] if i and i.get("audience") != "only_paid"]
    match = [i for i in public if str(i.get('id')) == ref]
    if not match:
        return False
    r = match[0]
    if not r:
        return False

    s = {}
    s['author'] = ''
    s['author_link'] = ''
    s['date'] = unix(r.get('post_date'))
    # 'reactions' may be absent; '' is the default reaction key.
    s['score'] = (r.get('reactions') or {}).get('')
    s['title'] = r.get('title', '')
    s['link'] = r.get('canonical_url', '')
    s['url'] = r.get('canonical_url', '')

    comments = api(API_ITEM_COMMENTS, r.get('id'))
    # comments is False when the comments endpoint fails.
    s['comments'] = [comment(i) for i in (comments or {}).get('comments') or []]
    s['comments'] = list(filter(bool, s['comments']))
    s['num_comments'] = r.get('comment_count', 0)

    # 'publishedBylines' may be absent/None — guard before iterating.
    authors = list(filter(None, [bylines(b) for b in r.get('publishedBylines') or []]))
    if authors:
        s['author'] = authors[0].get('name')
        s['author_link'] = authors[0].get('link')

    if 'text' in r and r['text']:
        s['text'] = clean(r['text'] or '')
    return s
# scratchpad so I can quickly develop the parser
if __name__ == '__main__':
    refs = feed()
    print(refs)
    print(story(refs[0]))
Loading…
Cancel
Save