|
|
|
@ -13,20 +13,27 @@ from cryptography.hazmat.primitives.ciphers.algorithms import AES |
|
|
|
|
from cryptography.hazmat.primitives.ciphers.modes import CBC |
|
|
|
|
from cryptography.hazmat.primitives.padding import PKCS7 |
|
|
|
|
|
|
|
|
|
from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey, X25519PublicKey |
|
|
|
|
from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat |
|
|
|
|
from cryptography.hazmat.primitives.asymmetric.ec import ECDH |
|
|
|
|
from cryptography.hazmat.primitives.kdf.hkdf import HKDF |
|
|
|
|
|
|
|
|
|
OTP_LOOKAHEAD = 25 |
|
|
|
|
|
|
|
|
|
class common: |
|
|
|
|
def __init__(self): |
|
|
|
|
self.init_secret = '' |
|
|
|
|
self.key = None |
|
|
|
|
self.master_pub = None |
|
|
|
|
self.otp = None |
|
|
|
|
self.secret = '' |
|
|
|
|
self.otp_secret = '' |
|
|
|
|
self.otp_count = 0 |
|
|
|
|
|
|
|
|
|
def init(self, secret=None): |
|
|
|
|
secret = secret or ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(12)) |
|
|
|
|
self.otp = HOTP(secret.encode(), 6, SHA1(), backend=default_backend(), enforce_key_length=False) |
|
|
|
|
self.secret = secret |
|
|
|
|
def set_otp(self, otp_secret): |
|
|
|
|
self.otp = HOTP(otp_secret.encode(), 6, SHA1(), backend=default_backend(), enforce_key_length=False) |
|
|
|
|
self.otp_secret = otp_secret |
|
|
|
|
self.otp_count = 0 |
|
|
|
|
return secret |
|
|
|
|
return True |
|
|
|
|
|
|
|
|
|
def get_otp(self, target): |
|
|
|
|
if not self.otp: |
|
|
|
@ -64,7 +71,7 @@ class common: |
|
|
|
|
return False |
|
|
|
|
|
|
|
|
|
def to_b58(self, text): |
|
|
|
|
return 'l' + base58.b58encode(text.encode()).decode() |
|
|
|
|
return 'l' + base58.b58encode(text).decode() |
|
|
|
|
|
|
|
|
|
def from_b58(self, text): |
|
|
|
|
if not text.startswith('l'): |
|
|
|
@ -74,8 +81,10 @@ class common: |
|
|
|
|
except BaseException as e: |
|
|
|
|
return False |
|
|
|
|
|
|
|
|
|
def enc(self, text): |
|
|
|
|
if not self.secret: |
|
|
|
|
def enc(self, text, key=None): |
|
|
|
|
if not key: |
|
|
|
|
key = self.otp_secret.encode() |
|
|
|
|
if not key: |
|
|
|
|
raise RuntimeError('bwb not init yet') |
|
|
|
|
|
|
|
|
|
padder = PKCS7(AES.block_size).padder() |
|
|
|
@ -83,7 +92,7 @@ class common: |
|
|
|
|
padded += padder.finalize() |
|
|
|
|
|
|
|
|
|
digest = Hash(SHA256(), default_backend()) |
|
|
|
|
digest.update(self.secret.encode()) |
|
|
|
|
digest.update(key) |
|
|
|
|
key = digest.finalize() |
|
|
|
|
|
|
|
|
|
iv = os.urandom(4) |
|
|
|
@ -94,8 +103,10 @@ class common: |
|
|
|
|
|
|
|
|
|
return 'I' + base58.b58encode(iv + ct).decode() |
|
|
|
|
|
|
|
|
|
def dec(self, ciphertext): |
|
|
|
|
if not self.secret: |
|
|
|
|
def dec(self, ciphertext, key=None): |
|
|
|
|
if not key: |
|
|
|
|
key = self.otp_secret.encode() |
|
|
|
|
if not key: |
|
|
|
|
return False # so we can run every message through |
|
|
|
|
if not ciphertext.startswith('I'): |
|
|
|
|
return False |
|
|
|
@ -106,7 +117,7 @@ class common: |
|
|
|
|
ct = ciphertext[4:] |
|
|
|
|
|
|
|
|
|
digest = Hash(SHA256(), default_backend()) |
|
|
|
|
digest.update(self.secret.encode()) |
|
|
|
|
digest.update(key) |
|
|
|
|
key = digest.finalize() |
|
|
|
|
|
|
|
|
|
cipher = Cipher(AES(key), CBC(iv * 4), default_backend()) |
|
|
|
@ -119,6 +130,7 @@ class common: |
|
|
|
|
|
|
|
|
|
return unpadded.decode() |
|
|
|
|
except BaseException as e: |
|
|
|
|
print(e) |
|
|
|
|
return False |
|
|
|
|
|
|
|
|
|
def wrap(self, text, target=None, b58=False, enc=False): |
|
|
|
@ -149,3 +161,32 @@ class common: |
|
|
|
|
return True |
|
|
|
|
else: |
|
|
|
|
return False |
|
|
|
|
|
|
|
|
|
def get_pub(self): |
|
|
|
|
self.key = X25519PrivateKey.generate() |
|
|
|
|
pub_key = self.key.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw) |
|
|
|
|
return base58.b58encode(pub_key).decode() |
|
|
|
|
|
|
|
|
|
def init(self): |
|
|
|
|
self.init_secret = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(14)) |
|
|
|
|
return self.get_pub() |
|
|
|
|
|
|
|
|
|
def handshake(self, text): |
|
|
|
|
self.master_pub = X25519PublicKey.from_public_bytes(base58.b58decode(text)) |
|
|
|
|
slave_pub = self.get_pub() |
|
|
|
|
self.set_otp(slave_pub) |
|
|
|
|
return slave_pub |
|
|
|
|
|
|
|
|
|
def secret(self, text): |
|
|
|
|
if not self.init_secret: return |
|
|
|
|
self.set_otp(text) |
|
|
|
|
slave_pub = X25519PublicKey.from_public_bytes(base58.b58decode(text)) |
|
|
|
|
shared_key = self.key.exchange(slave_pub) |
|
|
|
|
derived_key = HKDF(SHA256(), length=32, salt=b'Qot.', info=None, backend=default_backend()).derive(shared_key) |
|
|
|
|
return self.enc(self.init_secret, derived_key) |
|
|
|
|
|
|
|
|
|
def set_secret(self, text): |
|
|
|
|
shared_key = self.key.exchange(self.master_pub) |
|
|
|
|
derived_key = HKDF(SHA256(), length=32, salt=b'Qot.', info=None, backend=default_backend()).derive(shared_key) |
|
|
|
|
otp_secret = self.dec(text, derived_key) |
|
|
|
|
return self.set_otp(otp_secret) |
|
|
|
|