From 247715a76e8544c8ee9d126e2702679814e25ec4 Mon Sep 17 00:00:00 2001 From: Jason Schwarzenberger Date: Wed, 25 Nov 2020 12:34:46 +1300 Subject: [PATCH] adjust feed thread. --- apiserver/server.py | 47 +++++++++++++++++++++++++++++++++++---------- 1 file changed, 37 insertions(+), 10 deletions(-) diff --git a/apiserver/server.py b/apiserver/server.py index f96203f..b3aedc4 100644 --- a/apiserver/server.py +++ b/apiserver/server.py @@ -13,6 +13,7 @@ import json import threading import traceback import time +from datetime import datetime, timedelta from urllib.parse import urlparse, parse_qs import settings @@ -154,10 +155,10 @@ def _add_new_refs(): try: nid = new_id() database.put_ref(ref, nid, source, urlref) - added.append(ref) logging.info('Added ref ' + ref) + added.append(ref) except database.IntegrityError: - logging.info('Unable to add ref ' + ref) + #logging.info('Unable to add ref ' + ref) continue return added @@ -180,34 +181,60 @@ def _update_current_story(item): database.del_ref(item['ref']) logging.info('Removed ref {}'.format(item['ref'])) + def feed_thread(): - ref_list = [] + new_refs = [] + update_refs = [] + last_check = datetime.now() - timedelta(minutes=20) try: while True: # onboard new stories - if not len(ref_list): + time_since_check = datetime.now() - last_check + if not len(new_refs) and time_since_check > timedelta(minutes=15): added = _add_new_refs() ref_list = database.get_reflist() - ref_list.sort(key=lambda i: i['ref'] in added, reverse=True) + new_refs = list(filter(None, [i if i['ref'] in added else None for i in ref_list])) + update_queue = list(filter(None, [i if i['ref'] not in added else None for i in ref_list])) + current_queue_refs = [i['ref'] for i in update_refs] + update_queue = list(filter(None, [i if i['ref'] not in current_queue_refs else None for i in update_queue])) + update_refs += update_queue + logging.info('Added {} new refs'.format(len(added))) + logging.info('Have {} refs in update queue'.format(len(current_queue_refs))) + logging.info('Fetched {} refs for update queue'.format(len(update_queue))) + last_check = datetime.now() + gevent.sleep(1) + + # update new stories + if len(new_refs): + item = new_refs.pop(0) + logging.info('Processing new story ref {}'.format(item['ref'])) + _update_current_story(item) + gevent.sleep(1) # update current stories - if len(ref_list): - item = ref_list.pop(0) + if len(update_refs): + item = update_refs.pop(0) + logging.info('Processing existing story ref {}'.format(item['ref'])) _update_current_story(item) + gevent.sleep(1) - gevent.sleep(6) + gevent.sleep(1) except KeyboardInterrupt: logging.info('Ending feed thread...') except ValueError as e: logging.error('feed_thread error: {} {}'.format(e.__class__.__name__, e)) - http_server.stop() + + http_server.stop() + gevent.kill(feed_thread_ref) + print('Starting Feed thread...') -gevent.spawn(feed_thread) +feed_thread_ref = gevent.spawn(feed_thread) print('Starting HTTP thread...') try: http_server.serve_forever() except KeyboardInterrupt: + gevent.kill(feed_thread_ref) logging.info('Exiting...')