microhard-sms-bridge/main.py

238 lines
6.6 KiB
Python

import os, logging
# Any non-empty value of the DEBUG environment variable enables DEBUG-level
# logging; when it is unset or empty the level is INFO.
DEBUG = os.environ.get('DEBUG')
# The root logger is configured here, ahead of the remaining imports below
# (presumably so that records emitted by those libraries already use this
# format -- confirm before reordering).
logging.basicConfig(
format='[%(asctime)s] %(levelname)s %(module)s/%(funcName)s - %(message)s',
level=logging.DEBUG if DEBUG else logging.INFO)
from telethon import TelegramClient, events
import asyncio
from telnetlib import Telnet
import csv
import json
import re
import settings
# Telegram bot client, created and started at import time using the API
# credentials and bot token from the settings module. 'data/bot' is the
# session name handed to Telethon.
bot = TelegramClient('data/bot', settings.API_ID, settings.API_HASH).start(bot_token=settings.API_TOKEN)
# 0x1d (Ctrl-]); not referenced anywhere in the visible part of this file.
ESCAPE_CHAR = '\x1d'
# 0x1a (Ctrl-Z); appended to the SMS body in send_message() to end the text.
CTRL_Z = '\x1a'
@bot.on(events.NewMessage(pattern='/start'))
async def start(event):
    """Answer a ``/start`` message with a fixed greeting."""
    greeting = 'Hello world'
    await event.reply(greeting)
async def delete_message(mid):
    """Delete one stored SMS from the modem.

    Opens a telnet session to the router at 192.168.168.1, logs in as
    ``admin`` with ``settings.ROUTER_PASS`` and issues ``AT+CMGD=<mid>``.

    Args:
        mid: Index of the stored message, as reported by ``AT+CMGL``.

    Returns:
        True if the modem's response contains ``OK``, otherwise False.
    """
    logging.info('Deleting message ID: %s...', mid)

    with Telnet('192.168.168.1', 23, timeout=10) as session:
        # Log in; each step waits (up to 4 s) for the prompt before answering.
        session.read_until(b'login: ', timeout=4)
        session.write(b'admin\n')
        session.read_until(b'Password: ', timeout=4)
        session.write(settings.ROUTER_PASS.encode() + b'\n')
        session.read_until(b'UserDevice> ', timeout=4)

        # Issue the delete and collect everything up to the next prompt.
        session.write('AT+CMGD={}\r\n'.format(str(mid)).encode())
        reply = session.read_until(b'UserDevice> ', timeout=4).decode()

    logging.debug('Response data: %s', reply)

    if 'OK' not in reply:
        logging.info('Problem deleting: %s', reply)
        return False

    logging.info('Successfully deleted.')
    return True
async def send_message(number, text):
    """Send an SMS through the modem's telnet AT interface.

    Opens a telnet session to the router at 192.168.168.1, logs in as
    ``admin`` with ``settings.ROUTER_PASS``, issues ``AT+CMGS=<number>`` and
    then writes the message body terminated by Ctrl-Z.

    Args:
        number: Destination phone number; formatted into the AT command
            as-is, so callers are responsible for validating it.
        text: Message body. Ctrl-Z (0x1a) and ESC (0x1b) characters are
            removed before sending, see below.

    Returns:
        True if the modem's final response contains ``OK``, otherwise False.

    NOTE(review): the telnet calls are synchronous and not awaited, so the
    event loop is blocked for the duration of the exchange.
    """
    logging.info('Sending to %s message: %s', number, text)

    # Ctrl-Z ends the message body and ESC aborts it. If either appeared
    # inside the text, the SMS would be cut short and whatever followed would
    # reach the modem as AT input, so strip both before sending.
    body = text.replace(CTRL_Z, '').replace('\x1b', '')

    with Telnet('192.168.168.1', 23, timeout=10) as tn:
        tn.read_until(b'login: ', timeout=4)
        tn.write(b'admin\n')
        tn.read_until(b'Password: ', timeout=4)
        tn.write(settings.ROUTER_PASS.encode() + b'\n')
        tn.read_until(b'UserDevice> ', timeout=4)

        command = 'AT+CMGS={}\r\n'.format(str(number))
        tn.write(command.encode())
        # b'whatever' is not expected to appear in the modem output; this read
        # simply collects whatever arrives during the 4 s timeout before the
        # body is written.
        res = tn.read_until(b'whatever', timeout=4).decode()
        logging.debug('Response data: %s', res)

        tn.write((body + CTRL_Z).encode())
        res = tn.read_until(b'UserDevice> ', timeout=6).decode()
        logging.debug('Response data: %s', res)

    if 'OK' in res:
        logging.info('Successfully sent.')
        return True

    logging.info('Problem sending, response: %s,', res)
    return False
def _parse_sms_entry(entry):
    """Turn one ``+CMGL: `` record into a message dict; None if malformed."""
    # The metadata line and the message text are separated by '\r\r\n'.
    header, separator, body = entry.partition('\r\r\n')
    if not separator:
        return None

    try:
        row = next(csv.reader([header], delimiter=',', quotechar='"'), None)
    except csv.Error:
        return None
    # Expected fields: id, status, sender, <ignored>, date.
    if row is None or len(row) != 5:
        return None

    mid, status, sender, _, date = row
    logging.info('SMS: %s %s %s %s', mid, status, sender, date)

    clean = '\n'.join(line.strip() for line in body.strip().split('\r\n'))
    logging.info(' text: %s', clean)

    return dict(mid=mid, status=status, sender=sender, date=date, text=clean)


async def get_messages():
    """Fetch and parse the SMS messages stored on the Microhard modem.

    Logs in to the router at 192.168.168.1 over telnet, runs ``AT+CMGL`` and
    splits the response into one record per ``+CMGL: `` marker.

    Returns:
        A list of dicts with the keys ``mid``, ``status``, ``sender``,
        ``date`` and ``text``. The list is empty when the modem reports no
        messages. Records that cannot be parsed are logged and skipped so
        that a single malformed record does not block every other message.
    """
    with Telnet('192.168.168.1', 23, timeout=10) as tn:
        tn.read_until(b'login: ', timeout=4)
        tn.write(b'admin\n')
        tn.read_until(b'Password: ', timeout=4)
        tn.write(settings.ROUTER_PASS.encode() + b'\n')
        tn.read_until(b'UserDevice> ', timeout=4)

        tn.write(b'AT+CMGL\r\n')
        res = tn.read_until(b'UserDevice> ', timeout=4).decode()
    logging.debug('Message data: %s', res)

    # Drop the echoed command and the trailing prompt, when present.
    if res.startswith('AT+CMGL\r\n'):
        res = res[len('AT+CMGL\r\n'):]
    if res.endswith('UserDevice> '):
        res = res[:-len('UserDevice> ')]

    # Everything before the first '+CMGL: ' marker is not a record.
    entries = res.split('+CMGL: ')[1:]

    messages = []
    for entry in entries:
        message = _parse_sms_entry(entry)
        if message is None:
            logging.warning('Skipping malformed SMS entry: %r', entry)
            continue
        messages.append(message)
    return messages
async def check_messages():
    """Forward every SMS stored on the modem to Telegram, then delete it.

    Each message is sent to ``settings.TANNER_ID`` first and removed from the
    modem only afterwards, so a message whose forward raises stays on the
    modem and is picked up again by the next call.
    """
    messages = await get_messages()
    if not messages:
        logging.debug('No messages found')
        return

    logging.info('Get messages result:\n%s', json.dumps(messages, indent=4))

    for message in messages:
        # The 'From: ' line is parsed back out by the reply handler
        # (new_message), so keep this layout in sync with it.
        bot_message = '[Receive SMS]\nFrom: {}\nContent: {}'.format(
            message['sender'],
            message['text'],
        )
        tg = await bot.send_message(settings.TANNER_ID, bot_message, link_preview=False)
        logging.info('Sent TG bot message %s', tg.id)

        deleted = await delete_message(message['mid'])
        if not deleted:
            # The SMS is still on the modem and will be forwarded again on the
            # next poll; make that visible instead of failing silently.
            logging.warning(
                'Could not delete message ID %s; it will be forwarded again.',
                message['mid'],
            )
@bot.on(events.NewMessage(pattern='/send'))
async def send(event):
    """Handle ``/send [number] [message]`` by sending an SMS.

    Replies with a usage hint when the arguments are missing or the message
    is empty, refuses messages that start with a digit (which suggests the
    phone number was typed with a space in it), and validates the number
    format before handing it to send_message().

    NOTE(review): there is no check on who sent the command -- confirm
    whether access should be restricted to a known chat.
    """
    logging.info('Send command: %s', event.raw_text)

    parts = event.raw_text.split(' ', 2)
    # An empty message (e.g. "/send 123 " with a trailing space) is treated
    # as bad arguments too; indexing message[0] on it would raise IndexError.
    if len(parts) != 3 or not parts[2].strip():
        logging.info('Bad command args')
        await event.reply('Bad command args, usage: /send [number] [message]')
        return
    _, number, message = parts

    if message[0].isdigit():
        logging.info('Message starts with digit, possible spaces in number.')
        await event.reply('Phone number may have a space, aborting.')
        return

    if not re.match(r'^\+?[1-9]\d{1,14}$', number):
        logging.info('Bad number: %s', number)
        await event.reply('Bad number character. Format: +1231231234 or 1231231234')
        return

    res = await send_message(number, message)
    if res:
        await event.reply('ok')
    else:
        await event.reply('Error sending SMS.')
@bot.on(events.NewMessage(pattern=r'^(?!\/).*$'))
async def new_message(event):
    """Send a Telegram reply back as an SMS to the original sender.

    Handles messages that do not start with ``/``. When the message is a
    reply, the replied-to message is fetched, the first token after
    ``From: `` in its text is taken as the destination number, and the reply
    text is sent there via send_message(). Messages that are not replies are
    ignored. Nothing is sent back to Telegram on success.
    """
    reply_id = event.message.reply_to_msg_id
    # event.sender can be None, so do not dereference it directly.
    sender_name = getattr(event.sender, 'first_name', None)
    logging.info(
        '=> Telegram - id: %s, reply_id: %s, sender: %s (%s), text: %s',
        event.message.id, reply_id, sender_name, event.sender_id, event.raw_text,
    )

    if not reply_id:
        logging.info(' Not a reply, ignoring')
        return

    if not event.raw_text:
        logging.info(' No text, aborting.')
        await event.reply('Error: No text found. Media not supported yet.', silent=True)
        return

    original = await event.message.get_reply_message()
    if original is None:
        # The replied-to message could not be fetched (e.g. it was deleted).
        logging.info(' Original message not found, aborting.')
        await event.reply('Error: Original message not found.', silent=True)
        return
    logging.info('Original message contents:\n%s', original.raw_text)

    # Forwarded SMS notifications carry the sender on a 'From: ' line; a
    # reply to any other message has no number to send to.
    _, marker, remainder = (original.raw_text or '').partition('From: ')
    tokens = remainder.split()
    if not marker or not tokens:
        logging.info(' No From field in original message, aborting.')
        await event.reply('Error: Original message is not a received SMS.', silent=True)
        return
    from_field = tokens[0]
    logging.info('Original number found: %s', from_field)

    res = await send_message(from_field, event.raw_text)
    if not res:
        await event.reply('Error sending SMS.')
async def main():
    """Poll the modem for new SMS messages every 10 seconds, forever.

    Errors raised while checking messages are logged and the loop carries
    on. Task cancellation, KeyboardInterrupt and SystemExit are not
    swallowed, so the task can be stopped cleanly.
    """
    await asyncio.sleep(1)
    while True:
        await asyncio.sleep(10)
        try:
            logging.debug('Checking for messages...')
            await check_messages()
        except asyncio.CancelledError:
            # Explicit re-raise: on Python < 3.8 CancelledError derives from
            # Exception and would otherwise be caught below.
            raise
        except Exception as e:
            logging.error('Problem checking messages: %s - %s', e.__class__.__name__, e)
if __name__ == '__main__':
    # Schedule the SMS polling task on the event loop, then let Telethon run
    # that loop until the bot disconnects.
    asyncio.get_event_loop().create_task(main())
    bot.run_until_disconnected()