From 47ad8b575abf627f441a4ab3fb832dba1194e591 Mon Sep 17 00:00:00 2001 From: Tanner Collin Date: Mon, 23 Sep 2019 04:52:24 +0000 Subject: [PATCH] Protect main OTP against unauthed changes This is a problem if someone sends: 000000handshake xyz ...because they will have set our OTP to xyz and could guess codes. Fixed by using a separate OTP object for handshaking only. --- README.md | 10 +++++---- bwb/common.py | 62 +++++++++++++++++++++++++++++---------------------- setup.py | 2 +- 3 files changed, 42 insertions(+), 32 deletions(-) diff --git a/README.md b/README.md index eae0e09..95145fb 100644 --- a/README.md +++ b/README.md @@ -31,11 +31,10 @@ event.respond('000000handshake ' + bwb.handshake(data)) On `000000handshake [data]`: ```text -event.respond(bwb.wrap('secret ' + bwb.secret(data))) -bwb.set_otp(bwb.init_secret) +event.respond(bwb.wrap('secret ' + bwb.secret(data), handshake=True)) ``` -On _OTP authed_ `123456secret [data]`: +On _Handshake OTP authed_ `123456secret [data]`: ```text bwb.set_secret(data) @@ -62,6 +61,9 @@ if text.startswith('!'): ... elif text.startswith('000000'): text = text[6:] +elif bwb.check_auth(text, handshake=True): + handshake_authed = True + text = text[6:] elif bwb.check_auth(text): authed = True text = text[6:] @@ -74,7 +76,7 @@ Use `bwb.wrap()` to auth and encode outgoing commands. Params: ```text -wrap(text, target=None, b58=False, enc=False) +wrap(text, handshake=False, target=None, b58=False, enc=False) ``` Examples: diff --git a/bwb/common.py b/bwb/common.py index 25345f4..ab562d0 100644 --- a/bwb/common.py +++ b/bwb/common.py @@ -20,21 +20,11 @@ from cryptography.hazmat.primitives.kdf.hkdf import HKDF OTP_LOOKAHEAD = 25 -class common: - def __init__(self): - self.init_secret = '' - self.key = None - self.master_pub = None - self.otp = None - self.otp_secret = '' +class OTP: + def __init__(self, secret): + self.otp = HOTP(secret.encode(), 6, SHA1(), backend=default_backend(), enforce_key_length=False) self.otp_count = 0 - def set_otp(self, otp_secret): - self.otp = HOTP(otp_secret.encode(), 6, SHA1(), backend=default_backend(), enforce_key_length=False) - self.otp_secret = otp_secret - self.otp_count = 0 - return True - def get_otp(self, target): if not self.otp: raise RuntimeError('bwb not init yet') @@ -55,7 +45,7 @@ class common: self.otp_count += 1 return code - def check_otp(self, code): + def check_otp(self, code, me): if not self.otp: return False @@ -65,11 +55,20 @@ class common: if otp_at == str(code): # broadcast self.otp_count = count + 1 return True - elif self.TELEGRAM_ID % int(otp_at) == int(code): + elif me % int(otp_at) == int(code): self.otp_count = count + 1 return True return False +class common: + def __init__(self): + self.key = None + self.master_pub = None + self.init_secret = '' + self.handshake_otp = None + self.enc_secret = '' + self.otp = None + def to_b58(self, text): return 'l' + base58.b58encode(text.encode()).decode() @@ -83,7 +82,7 @@ class common: def enc(self, text, key=None): if not key: - key = self.otp_secret.encode() + key = self.enc_secret.encode() if not key: raise RuntimeError('bwb not init yet') @@ -105,7 +104,7 @@ class common: def dec(self, ciphertext, key=None): if not key: - key = self.otp_secret.encode() + key = self.enc_secret.encode() if not key: return False # so we can run every message through if not ciphertext.startswith('I'): @@ -132,11 +131,13 @@ class common: except BaseException as e: return False - def wrap(self, text, target=None, b58=False, enc=False): - if target: - code = self.get_otp(target) + def wrap(self, text, handshake=False, target=None, b58=False, enc=False): + if handshake: + code = self.handshake_otp.get_otp_cast() + elif target: + code = self.otp.get_otp(target) else: - code = self.get_otp_cast() + code = self.otp.get_otp_cast() text = code + text @@ -150,13 +151,18 @@ class common: def parse(self, text): return self.dec(text) or self.from_b58(text) or text - def check_auth(self, text): + def check_auth(self, text, handshake=False): try: int(text[:6]) except ValueError: return False - if self.check_otp(text[:6]): + if handshake: + check = self.handshake_otp + else: + check = self.otp + + if check.check_otp(text[:6], me=self.TELEGRAM_ID): return True else: return False @@ -168,18 +174,19 @@ class common: def init(self): self.init_secret = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(14)) + self.otp = OTP(self.init_secret) return self.get_pub() def handshake(self, text): self.init_secret = '' # clear it self.master_pub = X25519PublicKey.from_public_bytes(base58.b58decode(text)) slave_pub = self.get_pub() - self.set_otp(slave_pub) + self.handshake_otp = OTP(slave_pub) return slave_pub def secret(self, text): if not self.init_secret: return - self.set_otp(text) + self.handshake_otp = OTP(text) slave_pub = X25519PublicKey.from_public_bytes(base58.b58decode(text)) shared_key = self.key.exchange(slave_pub) derived_key = HKDF(SHA256(), length=32, salt=b'Qot.', info=None, backend=default_backend()).derive(shared_key) @@ -188,5 +195,6 @@ class common: def set_secret(self, text): shared_key = self.key.exchange(self.master_pub) derived_key = HKDF(SHA256(), length=32, salt=b'Qot.', info=None, backend=default_backend()).derive(shared_key) - otp_secret = self.dec(text, derived_key) - return self.set_otp(otp_secret) + secret = self.dec(text, derived_key) + self.otp = OTP(secret) + return True diff --git a/setup.py b/setup.py index ec88e1a..54e0566 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ with io.open('README.md', encoding='utf-8') as fh: long_description = fh.read() setuptools.setup(name='bwb', - version='2.0.3', + version='2.1.0', description='bwb', long_description=long_description, long_description_content_type='text/markdown',