Protect main OTP against unauthed changes
This is a problem if someone sends: 000000handshake xyz ...because they will have set our OTP to xyz and could guess codes. Fixed by using a separate OTP object for handshaking only.
This commit is contained in:
parent
03e37f5601
commit
47ad8b575a
10
README.md
10
README.md
|
@ -31,11 +31,10 @@ event.respond('000000handshake ' + bwb.handshake(data))
|
|||
On `000000handshake [data]`:
|
||||
|
||||
```text
|
||||
event.respond(bwb.wrap('secret ' + bwb.secret(data)))
|
||||
bwb.set_otp(bwb.init_secret)
|
||||
event.respond(bwb.wrap('secret ' + bwb.secret(data), handshake=True))
|
||||
```
|
||||
|
||||
On _OTP authed_ `123456secret [data]`:
|
||||
On _Handshake OTP authed_ `123456secret [data]`:
|
||||
|
||||
```text
|
||||
bwb.set_secret(data)
|
||||
|
@ -62,6 +61,9 @@ if text.startswith('!'):
|
|||
...
|
||||
elif text.startswith('000000'):
|
||||
text = text[6:]
|
||||
elif bwb.check_auth(text, handshake=True):
|
||||
handshake_authed = True
|
||||
text = text[6:]
|
||||
elif bwb.check_auth(text):
|
||||
authed = True
|
||||
text = text[6:]
|
||||
|
@ -74,7 +76,7 @@ Use `bwb.wrap()` to auth and encode outgoing commands.
|
|||
Params:
|
||||
|
||||
```text
|
||||
wrap(text, target=None, b58=False, enc=False)
|
||||
wrap(text, handshake=False, target=None, b58=False, enc=False)
|
||||
```
|
||||
|
||||
Examples:
|
||||
|
|
|
@ -20,21 +20,11 @@ from cryptography.hazmat.primitives.kdf.hkdf import HKDF
|
|||
|
||||
OTP_LOOKAHEAD = 25
|
||||
|
||||
class common:
|
||||
def __init__(self):
|
||||
self.init_secret = ''
|
||||
self.key = None
|
||||
self.master_pub = None
|
||||
self.otp = None
|
||||
self.otp_secret = ''
|
||||
class OTP:
|
||||
def __init__(self, secret):
|
||||
self.otp = HOTP(secret.encode(), 6, SHA1(), backend=default_backend(), enforce_key_length=False)
|
||||
self.otp_count = 0
|
||||
|
||||
def set_otp(self, otp_secret):
|
||||
self.otp = HOTP(otp_secret.encode(), 6, SHA1(), backend=default_backend(), enforce_key_length=False)
|
||||
self.otp_secret = otp_secret
|
||||
self.otp_count = 0
|
||||
return True
|
||||
|
||||
def get_otp(self, target):
|
||||
if not self.otp:
|
||||
raise RuntimeError('bwb not init yet')
|
||||
|
@ -55,7 +45,7 @@ class common:
|
|||
self.otp_count += 1
|
||||
return code
|
||||
|
||||
def check_otp(self, code):
|
||||
def check_otp(self, code, me):
|
||||
if not self.otp:
|
||||
return False
|
||||
|
||||
|
@ -65,11 +55,20 @@ class common:
|
|||
if otp_at == str(code): # broadcast
|
||||
self.otp_count = count + 1
|
||||
return True
|
||||
elif self.TELEGRAM_ID % int(otp_at) == int(code):
|
||||
elif me % int(otp_at) == int(code):
|
||||
self.otp_count = count + 1
|
||||
return True
|
||||
return False
|
||||
|
||||
class common:
|
||||
def __init__(self):
|
||||
self.key = None
|
||||
self.master_pub = None
|
||||
self.init_secret = ''
|
||||
self.handshake_otp = None
|
||||
self.enc_secret = ''
|
||||
self.otp = None
|
||||
|
||||
def to_b58(self, text):
|
||||
return 'l' + base58.b58encode(text.encode()).decode()
|
||||
|
||||
|
@ -83,7 +82,7 @@ class common:
|
|||
|
||||
def enc(self, text, key=None):
|
||||
if not key:
|
||||
key = self.otp_secret.encode()
|
||||
key = self.enc_secret.encode()
|
||||
if not key:
|
||||
raise RuntimeError('bwb not init yet')
|
||||
|
||||
|
@ -105,7 +104,7 @@ class common:
|
|||
|
||||
def dec(self, ciphertext, key=None):
|
||||
if not key:
|
||||
key = self.otp_secret.encode()
|
||||
key = self.enc_secret.encode()
|
||||
if not key:
|
||||
return False # so we can run every message through
|
||||
if not ciphertext.startswith('I'):
|
||||
|
@ -132,11 +131,13 @@ class common:
|
|||
except BaseException as e:
|
||||
return False
|
||||
|
||||
def wrap(self, text, target=None, b58=False, enc=False):
|
||||
if target:
|
||||
code = self.get_otp(target)
|
||||
def wrap(self, text, handshake=False, target=None, b58=False, enc=False):
|
||||
if handshake:
|
||||
code = self.handshake_otp.get_otp_cast()
|
||||
elif target:
|
||||
code = self.otp.get_otp(target)
|
||||
else:
|
||||
code = self.get_otp_cast()
|
||||
code = self.otp.get_otp_cast()
|
||||
|
||||
text = code + text
|
||||
|
||||
|
@ -150,13 +151,18 @@ class common:
|
|||
def parse(self, text):
|
||||
return self.dec(text) or self.from_b58(text) or text
|
||||
|
||||
def check_auth(self, text):
|
||||
def check_auth(self, text, handshake=False):
|
||||
try:
|
||||
int(text[:6])
|
||||
except ValueError:
|
||||
return False
|
||||
|
||||
if self.check_otp(text[:6]):
|
||||
if handshake:
|
||||
check = self.handshake_otp
|
||||
else:
|
||||
check = self.otp
|
||||
|
||||
if check.check_otp(text[:6], me=self.TELEGRAM_ID):
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
@ -168,18 +174,19 @@ class common:
|
|||
|
||||
def init(self):
|
||||
self.init_secret = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(14))
|
||||
self.otp = OTP(self.init_secret)
|
||||
return self.get_pub()
|
||||
|
||||
def handshake(self, text):
|
||||
self.init_secret = '' # clear it
|
||||
self.master_pub = X25519PublicKey.from_public_bytes(base58.b58decode(text))
|
||||
slave_pub = self.get_pub()
|
||||
self.set_otp(slave_pub)
|
||||
self.handshake_otp = OTP(slave_pub)
|
||||
return slave_pub
|
||||
|
||||
def secret(self, text):
|
||||
if not self.init_secret: return
|
||||
self.set_otp(text)
|
||||
self.handshake_otp = OTP(text)
|
||||
slave_pub = X25519PublicKey.from_public_bytes(base58.b58decode(text))
|
||||
shared_key = self.key.exchange(slave_pub)
|
||||
derived_key = HKDF(SHA256(), length=32, salt=b'Qot.', info=None, backend=default_backend()).derive(shared_key)
|
||||
|
@ -188,5 +195,6 @@ class common:
|
|||
def set_secret(self, text):
|
||||
shared_key = self.key.exchange(self.master_pub)
|
||||
derived_key = HKDF(SHA256(), length=32, salt=b'Qot.', info=None, backend=default_backend()).derive(shared_key)
|
||||
otp_secret = self.dec(text, derived_key)
|
||||
return self.set_otp(otp_secret)
|
||||
secret = self.dec(text, derived_key)
|
||||
self.otp = OTP(secret)
|
||||
return True
|
||||
|
|
Loading…
Reference in New Issue
Block a user