Make specific to Fuse33 / Fabman

master
Tanner Collin 9 months ago
parent c4998709a9
commit 2d74cb11e5
  1. 199
      main.py
  2. 5
      secrets.py.example

@ -1,61 +1,90 @@
import os
import logging import logging
logging.basicConfig( logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO) level=logging.INFO)
TEST = os.environ.get('TEST', False)
DEBUG = os.environ.get('DEBUG', False)
from multiprocessing import Process, Queue from multiprocessing import Process, Queue
from queue import Empty from queue import Empty
import RPi.GPIO as GPIO
import os
import json import json
import requests import requests
import serial
import time import time
from signal import * from signal import *
if not TEST:
import serial
import RPi.GPIO as GPIO
import secrets import secrets
DEBUG = os.environ.get('DEBUG', False)
RELAY_PIN = 17 RELAY_PIN = 17
RFID_EN_PIN = 27 RFID_EN_PIN = 27
CARDS_FILE = 'card_data.json' CARDS_FILE = 'card_data.json'
OPEN_DURATION = 4 OPEN_DURATION = 4
VALID_PACKAGES = [
API_STATS = 'https://api.my.protospace.ca/stats/' 'Maker',
API_DOOR = 'https://api.my.protospace.ca/door/' 'Maker Plus',
API_SEEN = lambda x: 'https://api.my.protospace.ca/door/{}/seen/'.format(x) 'Maker Pro',
#'Storage bin rental',
#'Backyard Rental Spot',
'IndiCity Laser Space Rental',
#'Shipping container rental',
'Day Pass Holder',
'Access to everything 24/7',
'Barter Membership',
'Loft Member',
]
TEST_PIPE = '/tmp/airlock'
os.remove(TEST_PIPE)
API_MEMBERS = 'https://fabman.io/api/v1/members?limit=1000&embed=key&embed=activePackages&includeKeyToken=true'
ser = None ser = None
def unlock_door(): def unlock_door():
GPIO.output(RELAY_PIN, GPIO.HIGH) logging.info('Unlocking door...')
GPIO.output(RFID_EN_PIN, GPIO.HIGH)
time.sleep(OPEN_DURATION) if not TEST:
GPIO.output(RELAY_PIN, GPIO.HIGH)
GPIO.output(RFID_EN_PIN, GPIO.HIGH)
GPIO.output(RELAY_PIN, GPIO.LOW) time.sleep(OPEN_DURATION)
GPIO.output(RFID_EN_PIN, GPIO.LOW)
GPIO.output(RELAY_PIN, GPIO.LOW)
GPIO.output(RFID_EN_PIN, GPIO.LOW)
def lock_door_on_exit(*args): def lock_door_on_exit(*args):
logging.info('Exiting, locking door...') logging.info('Exiting, locking door...')
GPIO.output(RELAY_PIN, GPIO.LOW)
GPIO.output(RFID_EN_PIN, GPIO.LOW) if not TEST:
GPIO.output(RELAY_PIN, GPIO.LOW)
GPIO.output(RFID_EN_PIN, GPIO.LOW)
os._exit(0) os._exit(0)
def init(): def init():
global ser, cards global ser, cards
GPIO.setwarnings(False) if not TEST:
GPIO.setmode(GPIO.BCM) GPIO.setwarnings(False)
GPIO.setup(RELAY_PIN, GPIO.OUT) GPIO.setmode(GPIO.BCM)
GPIO.output(RELAY_PIN, GPIO.LOW) GPIO.setup(RELAY_PIN, GPIO.OUT)
GPIO.setup(RFID_EN_PIN, GPIO.OUT) GPIO.output(RELAY_PIN, GPIO.LOW)
GPIO.output(RFID_EN_PIN, GPIO.LOW) GPIO.setup(RFID_EN_PIN, GPIO.OUT)
logging.info('GPIO initialized') GPIO.output(RFID_EN_PIN, GPIO.LOW)
logging.info('GPIO initialized')
ser = serial.Serial(port='/dev/ttyAMA0', baudrate=2400, timeout=0.1)
logging.info('Serial initialized') if TEST:
os.mkfifo(TEST_PIPE)
logging.info('Test pipe initialized')
else:
ser = serial.Serial(port='/dev/ttyAMA0', baudrate=2400, timeout=0.1)
logging.info('Serial initialized')
for sig in (SIGABRT, SIGILL, SIGINT, SIGSEGV, SIGTERM): for sig in (SIGABRT, SIGILL, SIGINT, SIGSEGV, SIGTERM):
signal(sig, lock_door_on_exit) signal(sig, lock_door_on_exit)
@ -65,89 +94,127 @@ def reader_thread(card_data_queue):
recent_scans = {} recent_scans = {}
with open(CARDS_FILE, 'r') as f: with open(CARDS_FILE, 'r') as f:
card_data = json.load(f) cards = json.load(f)
logging.info('Read {} card numbers from disk'.format(str(len(card_data)))) logging.info('Read {} cards from disk'.format(len(cards)))
while True: while True:
try: try:
card_data = card_data_queue.get_nowait() cards = card_data_queue.get_nowait()
except Empty: except Empty:
pass pass
card = ser.readline() if TEST:
with open(TEST_PIPE, 'r') as pipe:
card = pipe.readline()
else:
card = ser.readline()
if not card: continue if not card: continue
try: try:
card = card.decode().strip() card = card.decode().strip()
except AttributeError:
card = card.strip()
except UnicodeDecodeError: except UnicodeDecodeError:
continue continue
if len(card) != 10: continue if len(card) != 14: continue
# debounce card scans # debounce card scans
now = time.time() now = time.time()
if card in recent_scans: if card in recent_scans:
if now - recent_scans[card] < 5.0: if now - recent_scans[card] < 5.0:
logging.info('Debounce skipping card scan')
continue continue
recent_scans[card] = now recent_scans[card] = now
logging.info('Read card: ' + card) logging.info('Read card: ' + card)
if card in card_data: if card in cards:
logging.info('Card recognized') logging.info('Card recognized')
else: else:
logging.info('Card not recognized, denying access') logging.info('Card not recognized, denying access')
continue continue
logging.info('DOOR ACCESS - Card: {} | Name: {}'.format( card_data = cards[card]
card, card_data[card],
)) logging.info('Card belongs to: %s', card_data['name'])
if not any(package in card_data['packages'] for package in VALID_PACKAGES):
logging.info('No valid packages found: %s', str(card_data['packages']))
continue
logging.info('DOOR ACCESS GRANTED - Card: %s | Name: %s', card, card_data['name'])
unlock_door() unlock_door()
try: #try:
res = requests.post(API_SEEN(card), timeout=2) # res = requests.post(API_SEEN(card), timeout=2)
res.raise_for_status() # res.raise_for_status()
except BaseException as e: #except BaseException as e:
logging.error('Problem POSTing seen: {} - {}'.format(e.__class__.__name__, str(e))) # logging.error('Problem POSTing seen: {} - {}'.format(e.__class__.__name__, str(e)))
# continue
def get_cards(card_data_queue):
try:
headers = {'Authorization': 'Bearer ' + secrets.FABMAN_API_KEY}
res = requests.get(API_MEMBERS, headers=headers, timeout=10)
res.raise_for_status()
res = res.json()
except BaseException as e:
logging.exception('Problem GETting Fabman API: {} - {}'.format(e.__class__.__name__, str(e)))
return
members = res
cards = {}
logging.info('Got {} members from API'.format(str(len(res))))
for member in members:
if member['state'] != 'active':
continue continue
def update_thread(card_data_queue): packages = []
last_card_change = None
while True: for member_packages in member['_embedded']['memberPackages']:
time.sleep(5) package = member_packages['_embedded']['package']
try: if package['state'] != 'active':
res = requests.get(API_STATS, timeout=5) continue
res.raise_for_status()
res = res.json() packages.append(package['name'])
except BaseException as e:
logging.error('Problem GETting stats: {} - {}'.format(e.__class__.__name__, str(e))) key = member['_embedded']['key']
if not key:
continue continue
if res['last_card_change'] == last_card_change: if key['state'] != 'active':
continue continue
last_card_change = res['last_card_change']
logging.info('Cards changed, pulling update from API') token = key['token']
name = '{} {} ({})'.format(member['firstName'], member['lastName'], member['memberNumber'])
try: cards[token] = dict(name=name, packages=packages)
headers = {'Authorization': 'Bearer ' + secrets.DOOR_API_KEY}
res = requests.get(API_DOOR, headers=headers, timeout=5)
res.raise_for_status() logging.info('Processed {} cards'.format(len(cards)))
res = res.json()
except BaseException as e: card_data_queue.put(cards)
logging.error('Problem GETting door: {} - {}'.format(e.__class__.__name__, str(e)))
last_card_change = None logging.info('Writing data to file')
continue with open(CARDS_FILE, 'w') as f:
json.dump(cards, f, indent=4)
def update_thread(card_data_queue):
if not DEBUG: time.sleep(10)
while True:
logging.info('Updating cards...')
get_cards(card_data_queue)
logging.info('Got {} cards from API'.format(str(len(res)))) time.sleep(300)
card_data_queue.put(res)
logging.info('Writing data to file')
with open(CARDS_FILE, 'w') as f:
json.dump(res, f)
def watchdog_thread(): def watchdog_thread():
while True: while True:

@ -1,3 +1,2 @@
# Door cards API key # Fabman API key
# should be equal to the auth token value set in Spaceport FABMAN_API_KEY = ''
DOOR_API_KEY = ''

Loading…
Cancel
Save