You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
202 lines
6.4 KiB
202 lines
6.4 KiB
import base58 |
|
import base64 |
|
import os |
|
import random |
|
import string |
|
|
|
from cryptography.hazmat.backends import default_backend |
|
from cryptography.hazmat.primitives.twofactor.hotp import HOTP |
|
from cryptography.hazmat.primitives.hashes import Hash, SHA1, SHA256 |
|
|
|
from cryptography.hazmat.primitives.ciphers import Cipher |
|
from cryptography.hazmat.primitives.ciphers.algorithms import AES |
|
from cryptography.hazmat.primitives.ciphers.modes import CBC |
|
from cryptography.hazmat.primitives.padding import PKCS7 |
|
|
|
from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey, X25519PublicKey |
|
from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat |
|
from cryptography.hazmat.primitives.asymmetric.ec import ECDH |
|
from cryptography.hazmat.primitives.kdf.hkdf import HKDF |
|
|
|
OTP_LOOKAHEAD = 25 |
|
|
|
class OTP: |
|
def __init__(self, secret): |
|
self.otp = HOTP(secret.encode(), 6, SHA1(), backend=default_backend(), enforce_key_length=False) |
|
self.otp_count = 0 |
|
|
|
def get_otp(self, target): |
|
if not self.otp: |
|
raise RuntimeError('bwb not init yet') |
|
if not isinstance(target, int): |
|
raise ValueError('invalid target') |
|
if int(target) <= 999999: |
|
raise ValueError('invalid target') |
|
|
|
code = self.otp.generate(self.otp_count).decode() |
|
self.otp_count += 1 |
|
return str(int(target) % int(code)).zfill(6) |
|
|
|
def get_otp_cast(self): |
|
if not self.otp: |
|
raise RuntimeError('bwb not init yet') |
|
|
|
code = self.otp.generate(self.otp_count).decode() |
|
self.otp_count += 1 |
|
return code |
|
|
|
def check_otp(self, code, me): |
|
if not self.otp: |
|
return False |
|
|
|
for i in range(OTP_LOOKAHEAD): |
|
count = self.otp_count + i |
|
otp_at = self.otp.generate(count).decode() |
|
if otp_at == str(code): # broadcast |
|
self.otp_count = count + 1 |
|
return True |
|
elif me % int(otp_at) == int(code): |
|
self.otp_count = count + 1 |
|
return True |
|
return False |
|
|
|
class bwb: |
|
def __init__(self, tid): |
|
self.TELEGRAM_ID = tid |
|
self.key = None |
|
self.master_pub = None |
|
self.init_secret = '' |
|
self.handshake_otp = None |
|
self.enc_secret = '' |
|
self.otp = None |
|
|
|
def to_b58(self, text): |
|
return 'l' + base58.b58encode(text.encode()).decode() |
|
|
|
def from_b58(self, text): |
|
if not text.startswith('l'): |
|
return False |
|
try: |
|
return base58.b58decode(text[1:]).decode() |
|
except BaseException as e: |
|
return False |
|
|
|
def enc(self, text, key=None): |
|
if not key: |
|
key = self.enc_secret.encode() |
|
if not key: |
|
raise RuntimeError('bwb not init yet') |
|
|
|
padder = PKCS7(AES.block_size).padder() |
|
padded = padder.update(text.encode()) |
|
padded += padder.finalize() |
|
|
|
digest = Hash(SHA256(), default_backend()) |
|
digest.update(key) |
|
key = digest.finalize() |
|
|
|
iv = os.urandom(4) |
|
|
|
cipher = Cipher(AES(key), CBC(iv * 4), default_backend()) |
|
encryptor = cipher.encryptor() |
|
ct = encryptor.update(padded) + encryptor.finalize() |
|
|
|
return 'I' + base58.b58encode(iv + ct).decode() |
|
|
|
def dec(self, ciphertext, key=None): |
|
if not key: |
|
key = self.enc_secret.encode() |
|
if not key: |
|
return False # so we can run every message through |
|
if not ciphertext.startswith('I'): |
|
return False |
|
|
|
try: |
|
ciphertext = base58.b58decode(ciphertext[1:]) |
|
iv = ciphertext[:4] |
|
ct = ciphertext[4:] |
|
|
|
digest = Hash(SHA256(), default_backend()) |
|
digest.update(key) |
|
key = digest.finalize() |
|
|
|
cipher = Cipher(AES(key), CBC(iv * 4), default_backend()) |
|
decryptor = cipher.decryptor() |
|
pt = decryptor.update(ct) + decryptor.finalize() |
|
|
|
unpadder = PKCS7(AES.block_size).unpadder() |
|
unpadded = unpadder.update(pt) |
|
unpadded += unpadder.finalize() |
|
|
|
return unpadded.decode() |
|
except BaseException as e: |
|
return False |
|
|
|
def wrap(self, text, handshake=False, target=None, b58=False, enc=False): |
|
if handshake: |
|
code = self.handshake_otp.get_otp_cast() |
|
elif target: |
|
code = self.otp.get_otp(target) |
|
else: |
|
code = self.otp.get_otp_cast() |
|
|
|
text = code + text |
|
|
|
if enc: |
|
return self.enc(text) |
|
elif b58: |
|
return self.to_b58(text) |
|
else: |
|
return text |
|
|
|
def parse(self, text): |
|
return self.dec(text) or self.from_b58(text) or text |
|
|
|
def check_auth(self, text, handshake=False): |
|
try: |
|
int(text[:6]) |
|
except ValueError: |
|
return False |
|
|
|
if handshake: |
|
check = self.handshake_otp |
|
else: |
|
check = self.otp |
|
|
|
if check.check_otp(text[:6], me=self.TELEGRAM_ID): |
|
return True |
|
else: |
|
return False |
|
|
|
def get_pub(self): |
|
self.key = X25519PrivateKey.generate() |
|
pub_key = self.key.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw) |
|
return base58.b58encode(pub_key).decode() |
|
|
|
def init(self): |
|
self.init_secret = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(14)) |
|
self.enc_secret = self.init_secret |
|
self.otp = OTP(self.init_secret) |
|
return self.get_pub() |
|
|
|
def handshake(self, text): |
|
self.init_secret = '' # clear it |
|
self.master_pub = X25519PublicKey.from_public_bytes(base58.b58decode(text)) |
|
slave_pub = self.get_pub() |
|
self.handshake_otp = OTP(slave_pub) |
|
return slave_pub |
|
|
|
def secret(self, text): |
|
if not self.init_secret: return |
|
self.handshake_otp = OTP(text) |
|
slave_pub = X25519PublicKey.from_public_bytes(base58.b58decode(text)) |
|
shared_key = self.key.exchange(slave_pub) |
|
derived_key = HKDF(SHA256(), length=32, salt=b'Qot.', info=None, backend=default_backend()).derive(shared_key) |
|
return self.enc(self.init_secret, derived_key) |
|
|
|
def set_secret(self, text): |
|
shared_key = self.key.exchange(self.master_pub) |
|
derived_key = HKDF(SHA256(), length=32, salt=b'Qot.', info=None, backend=default_backend()).derive(shared_key) |
|
self.enc_secret = self.dec(text, derived_key) |
|
self.otp = OTP(self.enc_secret) |
|
return True
|
|
|