Lock screen on MQTT command

This commit is contained in:
2024-12-03 13:18:56 -07:00
parent a683f9d976
commit f231a4d010
+48 -1
View File
@@ -5,11 +5,58 @@ logging.basicConfig(
level=logging.DEBUG if DEBUG else logging.INFO) level=logging.DEBUG if DEBUG else logging.INFO)
import subprocess import subprocess
import socket
import asyncio
from aiomqtt import Client, TLSParameters
tls_params = TLSParameters(
ca_certs='/etc/ssl/certs/ISRG_Root_X1.pem',
)
HOSTNAME = socket.gethostname()
def lock_screen(): def lock_screen():
subprocess.run(['xdg-screensaver', 'lock']) subprocess.run(['xdg-screensaver', 'lock'])
async def process_lock_command(text):
target = text
logging.info('Lock target: %s', text)
if target == HOSTNAME or target == 'all':
logging.info('Locking screen...')
lock_screen()
async def process_mqtt(message):
text = message.payload.decode()
topic = message.topic.value
logging.debug('MQTT topic: %s, message: %s', topic, text)
if topic.startswith('lock'):
await process_lock_command(text)
else:
logging.debug('Invalid topic, returning')
return
async def fetch_mqtt():
await asyncio.sleep(3)
async with Client(
hostname='mqtt-dev.dns.t0.vc',
port=8081,
#username='reader',
#password=secrets.MQTT_READER_PASSWORD,
tls_params=tls_params,
) as client:
await client.subscribe('#')
async for message in client.messages:
loop = asyncio.get_event_loop()
loop.create_task(process_mqtt(message))
if __name__ == '__main__': if __name__ == '__main__':
lock_screen() logging.info('')
logging.info('==========================')
logging.info('Booting up...')
loop = asyncio.get_event_loop()
loop.run_until_complete(fetch_mqtt())