Switch from pycryptodome to cryptography library
This commit is contained in:
parent
9a7e0bb23c
commit
b95c6593c8
|
@ -1,13 +1,17 @@
|
|||
import base58
|
||||
import base64
|
||||
import pyotp
|
||||
import os
|
||||
import random
|
||||
import string
|
||||
|
||||
from Crypto.Cipher import AES
|
||||
from Crypto.Random import get_random_bytes
|
||||
from Crypto.Hash import SHA256, SHA512, MD5
|
||||
from Crypto.Util.Padding import pad, unpad
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives.twofactor.hotp import HOTP
|
||||
from cryptography.hazmat.primitives.hashes import Hash, SHA1, SHA256
|
||||
|
||||
from cryptography.hazmat.primitives.ciphers import Cipher
|
||||
from cryptography.hazmat.primitives.ciphers.algorithms import AES
|
||||
from cryptography.hazmat.primitives.ciphers.modes import CBC
|
||||
from cryptography.hazmat.primitives.padding import PKCS7
|
||||
|
||||
OTP_LOOKAHEAD = 25
|
||||
|
||||
|
@ -19,9 +23,7 @@ class common:
|
|||
|
||||
def init(self, secret=None):
|
||||
secret = secret or ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(12))
|
||||
encoded = secret.encode()
|
||||
b32 = base64.b32encode(encoded)
|
||||
self.otp = pyotp.HOTP(b32)
|
||||
self.otp = HOTP(secret.encode(), 6, SHA1(), backend=default_backend(), enforce_key_length=False)
|
||||
self.secret = secret
|
||||
self.otp_count = 0
|
||||
return secret
|
||||
|
@ -34,7 +36,7 @@ class common:
|
|||
if int(target) <= 999999:
|
||||
raise ValueError('invalid target')
|
||||
|
||||
code = self.otp.at(self.otp_count)
|
||||
code = self.otp.generate(self.otp_count).decode()
|
||||
self.otp_count += 1
|
||||
return str(int(target) % int(code)).zfill(6)
|
||||
|
||||
|
@ -42,7 +44,7 @@ class common:
|
|||
if not self.otp:
|
||||
raise RuntimeError('bwb not init yet')
|
||||
|
||||
code = self.otp.at(self.otp_count)
|
||||
code = self.otp.generate(self.otp_count).decode()
|
||||
self.otp_count += 1
|
||||
return code
|
||||
|
||||
|
@ -52,10 +54,11 @@ class common:
|
|||
|
||||
for i in range(OTP_LOOKAHEAD):
|
||||
count = self.otp_count + i
|
||||
if self.otp.at(count) == str(code): # broadcast
|
||||
otp_at = self.otp.generate(count).decode()
|
||||
if otp_at == str(code): # broadcast
|
||||
self.otp_count = count + 1
|
||||
return True
|
||||
elif self.TELEGRAM_ID % int(self.otp.at(count)) == int(code):
|
||||
elif self.TELEGRAM_ID % int(otp_at) == int(code):
|
||||
self.otp_count = count + 1
|
||||
return True
|
||||
return False
|
||||
|
@ -75,10 +78,20 @@ class common:
|
|||
if not self.secret:
|
||||
raise RuntimeError('bwb not init yet')
|
||||
|
||||
key = SHA256.new(self.secret.encode()).digest()
|
||||
iv = get_random_bytes(4)
|
||||
cipher = AES.new(key, AES.MODE_CBC, iv * 4)
|
||||
ct = cipher.encrypt(pad(text.encode(), AES.block_size))
|
||||
padder = PKCS7(AES.block_size).padder()
|
||||
padded = padder.update(text.encode())
|
||||
padded += padder.finalize()
|
||||
|
||||
digest = Hash(SHA256(), default_backend())
|
||||
digest.update(self.secret.encode())
|
||||
key = digest.finalize()
|
||||
|
||||
iv = os.urandom(4)
|
||||
|
||||
cipher = Cipher(AES(key), CBC(iv * 4), default_backend())
|
||||
encryptor = cipher.encryptor()
|
||||
ct = encryptor.update(padded) + encryptor.finalize()
|
||||
|
||||
return 'I' + base58.b58encode(iv + ct).decode()
|
||||
|
||||
def dec(self, ciphertext):
|
||||
|
@ -89,12 +102,22 @@ class common:
|
|||
|
||||
try:
|
||||
ciphertext = base58.b58decode(ciphertext[1:])
|
||||
key = SHA256.new(self.secret.encode()).digest()
|
||||
iv = ciphertext[:4]
|
||||
ct = ciphertext[4:]
|
||||
cipher = AES.new(key, AES.MODE_CBC, iv * 4)
|
||||
pt = unpad(cipher.decrypt(ct), AES.block_size)
|
||||
return pt.decode()
|
||||
|
||||
digest = Hash(SHA256(), default_backend())
|
||||
digest.update(self.secret.encode())
|
||||
key = digest.finalize()
|
||||
|
||||
cipher = Cipher(AES(key), CBC(iv * 4), default_backend())
|
||||
decryptor = cipher.decryptor()
|
||||
pt = decryptor.update(ct) + decryptor.finalize()
|
||||
|
||||
unpadder = PKCS7(AES.block_size).unpadder()
|
||||
unpadded = unpadder.update(pt)
|
||||
unpadded += unpadder.finalize()
|
||||
|
||||
return unpadded.decode()
|
||||
except BaseException as e:
|
||||
return False
|
||||
|
||||
|
|
4
setup.py
4
setup.py
|
@ -5,7 +5,7 @@ with io.open('README.md', encoding='utf-8') as fh:
|
|||
long_description = fh.read()
|
||||
|
||||
setuptools.setup(name='bwb',
|
||||
version='1.0.9',
|
||||
version='1.1.0',
|
||||
description='bwb',
|
||||
long_description=long_description,
|
||||
long_description_content_type='text/markdown',
|
||||
|
@ -14,5 +14,5 @@ setuptools.setup(name='bwb',
|
|||
license='QPL.txt',
|
||||
url='https://qotmail.com',
|
||||
packages=setuptools.find_packages(),
|
||||
install_requires=['base58', 'pycryptodome', 'pyotp'],
|
||||
install_requires=['base58', 'cryptography'],
|
||||
)
|
||||
|
|
Loading…
Reference in New Issue
Block a user