Add motion control server

This commit is contained in:
Tanner 2025-02-11 14:30:05 -07:00
parent 9f394947a3
commit 9d37ff6128

93
main.py Normal file
View File

@ -0,0 +1,93 @@
import os
import logging
logging.basicConfig(
format='[%(asctime)s] %(levelname)s %(module)s/%(funcName)s - %(message)s',
level=logging.DEBUG if os.environ.get('DEBUG') else logging.INFO)
logging.info('')
logging.info('BOOT UP')
import asyncio
import aiomqtt
import dyn4
dmm = None
async def process_mqtt(message):
try:
text = message.payload.decode()
except UnicodeDecodeError:
return
topic = message.topic.value
logging.debug('MQTT topic: %s, message: %s', topic, text)
if topic == 'motion/set_rpm':
rpm = int(float(text))
dmm.set_speed(rpm)
async def monitor_mqtt():
await asyncio.sleep(1)
# from https://sbtinstruments.github.io/aiomqtt/reconnection.html
# modified to make new client since their code didn't work
# https://github.com/sbtinstruments/aiomqtt/issues/269
while True:
try:
async with aiomqtt.Client('localhost') as client:
await client.subscribe('motion/#')
async for message in client.messages:
await process_mqtt(message)
except aiomqtt.MqttError:
logging.info('MQTT connection lost, reconnecting in 5 seconds...')
await asyncio.sleep(5)
async def monitor_dyn4():
while True:
await asyncio.sleep(0.1)
if not dmm:
continue
pos = dmm.read_AbsPos32()
encoder_ppr = 65536
revolutions = pos / 65536
async with aiomqtt.Client('localhost') as client:
message = str(revolutions)
topic = 'server/position'
await client.publish(topic, payload=message.encode())
async def init():
global dmm
dmm = dyn4.DMMDrive('/dev/ttyUSB0', 0)
dmm.set_speed(0)
def task_died(future):
if os.environ.get('SHELL'):
logging.error('Motion control task died!')
else:
logging.error('Motion control task died! Waiting 60s and exiting...')
time.sleep(60)
exit()
def main():
loop = asyncio.get_event_loop()
a = loop.create_task(monitor_dyn4()).add_done_callback(task_died)
b = loop.create_task(monitor_mqtt()).add_done_callback(task_died)
z = loop.create_task(init())
loop.run_forever()
if __name__ == '__main__':
main()