Finish prototype bridge that can check text messages

This commit is contained in:
Tanner Collin 2024-10-01 19:24:18 +00:00
parent 2c2f7e60db
commit 552f8c3c21
2 changed files with 150 additions and 0 deletions

143
main.py Normal file
View File

@ -0,0 +1,143 @@
import os, logging
DEBUG = os.environ.get('DEBUG')
logging.basicConfig(
format='[%(asctime)s] %(levelname)s %(module)s/%(funcName)s - %(message)s',
level=logging.DEBUG if DEBUG else logging.INFO)
from telethon import TelegramClient, events
import asyncio
from telnetlib import Telnet
import csv
import json
import settings
bot = TelegramClient('data/bot', settings.API_ID, settings.API_HASH).start(bot_token=settings.API_TOKEN)
ESCAPE_CHAR = '\x1d'
@bot.on(events.NewMessage(pattern='/start'))
async def start(event):
await event.reply('Hello world')
async def delete_message(mid):
logging.info('Deleting message ID: %s...', mid)
with Telnet('192.168.168.1', 23, timeout=10) as tn:
tn.read_until(b'login: ')
tn.write(b'admin\n')
tn.read_until(b'Password: ')
tn.write(settings.ROUTER_PASS.encode() + b'\n')
tn.read_until(b'UserDevice> ')
command = 'AT+CMGD={}\r\n'.format(str(mid))
tn.write(command.encode())
res = tn.read_until(b'UserDevice> ', timeout=4).decode()
logging.debug('Response data: %s', res)
if 'OK' in res:
logging.info('Successfully deleted.')
return True
else:
logging.info('Problem deleting: %s', res)
return False
async def get_messages():
# get list of SMS messages from the Microhard modem
# parse them into a list and return
# handles no messages (empty response) fine
with Telnet('192.168.168.1', 23, timeout=10) as tn:
tn.read_until(b'login: ')
tn.write(b'admin\n')
tn.read_until(b'Password: ')
tn.write(settings.ROUTER_PASS.encode() + b'\n')
tn.read_until(b'UserDevice> ')
tn.write(b'AT+CMGL\r\n')
res = tn.read_until(b'UserDevice> ', timeout=4).decode()
logging.debug('Message data: %s', res)
if res.startswith('AT+CMGL\r\n'):
res = res[len('AT+CMGL\r\n'):]
if res.endswith('UserDevice> '):
res = res[:-len('UserDevice> ')]
data = res.split('+CMGL: ')
# remove first blank entry
data = data[1:]
messages = []
for entry in data:
metadata, text = entry.split('\r\r\n', 1)
x = csv.reader([metadata], delimiter=',', quotechar='"')
mid, status, sender, _, date = list(x)[0]
logging.info('SMS: %s %s %s %s', mid, status, sender, date)
lines = text.strip().split('\r\n')
clean = '\n'.join([x.strip() for x in lines])
logging.info(' text: %s', clean)
message = dict(mid=mid, status=status, sender=sender, date=date, text=clean)
messages.append(message)
return messages
async def check_messages():
messages = await get_messages()
if not len(messages):
logging.debug('No messages found')
return
logging.info('Get messages result:\n%s', json.dumps(messages, indent=4))
for message in messages:
bot_message = '[Receive SMS]\nFrom: {}\nContent: {}'.format(
message['sender'],
message['text'],
)
tg = await bot.send_message(settings.TANNER_ID, bot_message, link_preview=False)
logging.info('Sent TG bot message %s', tg.id)
await delete_message(message['mid'])
async def main():
await asyncio.sleep(1)
while True:
await asyncio.sleep(10)
try:
logging.debug('Checking for messages...')
await check_messages()
except KeyboardInterrupt:
raise
except BaseException as e:
logging.error('Problem checking messages: {} - {}'.format(e.__class__.__name__, str(e)))
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.create_task(main())
bot.run_until_disconnected()

7
settings.py.example Normal file
View File

@ -0,0 +1,7 @@
API_ID = 0
API_HASH = ''
API_TOKEN = ''
TANNER_ID = 0
ROUTER_PASS = ''