From ec6de719abcd524cd0294e279b03b25006b1d8ad Mon Sep 17 00:00:00 2001 From: Tanner Collin Date: Thu, 5 Oct 2017 00:30:11 -0600 Subject: [PATCH] Move crypto stuff to separate file, clean code up --- api.py | 127 ++----------------------------------------------- crypt.py | 105 ++++++++++++++++++++++++++++++++++++++++ itemmanager.py | 4 +- 3 files changed, 110 insertions(+), 126 deletions(-) create mode 100644 crypt.py diff --git a/api.py b/api.py index 6b4d033..dab1a64 100644 --- a/api.py +++ b/api.py @@ -1,10 +1,6 @@ -import hashlib, hmac, json, requests, time -from base64 import b64encode, b64decode -from binascii import hexlify, unhexlify -from Crypto.Cipher import AES -from Crypto.Random import random -from copy import deepcopy +import json, requests, time +from crypt import EncryptionHelper class RESTAPI: def __init__(self, base_url): @@ -17,113 +13,11 @@ class RESTAPI: def post(self, route, data=None): url = self.base_url + route - print(data) - res = requests.post(url, json=data, headers=self.headers) - print(res.text) - return res.json() + return requests.post(url, json=data, headers=self.headers).json() def addHeader(self, header): self.headers.update(header) -class EncryptionHelper: - def pure_generatePasswordAndKey(self, password, pw_salt, pw_cost): - output = hashlib.pbkdf2_hmac('sha512', password.encode(), pw_salt.encode(), pw_cost, dklen=96) - output = hexlify(output).decode() - - output_length = len(output) - split_length = output_length // 3 - pw = output[0 : split_length] - mk = output[split_length : split_length * 2] - ak = output[split_length * 2 : split_length * 3] - return dict(pw=pw, mk=mk, ak=ak) - - def encryptDirtyItems(self, dirty_items, keys): - return [self.pure_encryptItem(item, keys) for item in dirty_items] - - def decryptResponseItems(self, response_items, keys): - return [self.pure_decryptItem(item, keys) for item in response_items] - - def pure_encryptItem(self, item, keys): - uuid = item['uuid'] - content = json.dumps(item['content']) - - item_key = hex(random.getrandbits(512)) - item_key = item_key[2:].rjust(128, '0') # remove '0x', pad to 128 - item_key_length = len(item_key) - item_ek = item_key[:item_key_length//2] - item_ak = item_key[item_key_length//2:] - - enc_item = deepcopy(item) - enc_item['content'] = self.pure_encryptString002(content, item_ek, item_ak, uuid) - enc_item['enc_item_key'] = self.pure_encryptString002(item_key, keys['mk'], keys['ak'], uuid) - return enc_item - - def pure_decryptItem(self, item, keys): - uuid = item['uuid'] - content = item['content'] - enc_item_key = item['enc_item_key'] - - if not content: - return item - - if content[:3] == '002': - item_key = self.pure_decryptString002(enc_item_key, keys['mk'], keys['ak'], uuid) - item_key_length = len(item_key) - item_ek = item_key[:item_key_length//2] - item_ak = item_key[item_key_length//2:] - - dec_content = self.pure_decryptString002(content, item_ek, item_ak, uuid) - else: - print('Invalid protocol version.') - - dec_item = deepcopy(item) - dec_item['content'] = json.loads(dec_content) - return dec_item - - def pure_encryptString002(self, string_to_encrypt, encryption_key, auth_key, uuid): - IV = hex(random.getrandbits(128)) - IV = IV[2:].rjust(32, '0') # remove '0x', pad to 32 - - cipher = AES.new(unhexlify(encryption_key), AES.MODE_CBC, unhexlify(IV)) - pt = string_to_encrypt.encode() - pad = 16 - len(pt) % 16 - padded_pt = pt + pad * bytes([pad]) - ciphertext = b64encode(cipher.encrypt(padded_pt)).decode() - - string_to_auth = ':'.join(['002', uuid, IV, ciphertext]) - auth_hash = hmac.new(unhexlify(auth_key), string_to_auth.encode(), 'sha256').digest() - auth_hash = hexlify(auth_hash).decode() - - result = ':'.join(['002', auth_hash, uuid, IV, ciphertext]) - - return result - - def pure_decryptString002(self, string_to_decrypt, encryption_key, auth_key, uuid): - components = string_to_decrypt.split(':') - version = components[0] - auth_hash = components[1] - local_uuid = components[2] - IV = components[3] - ciphertext = components[4] - - if local_uuid != uuid: - print('UUID does not match.') - return - - string_to_auth = ':'.join([version, uuid, IV, ciphertext]) - local_auth_hash = hmac.new(unhexlify(auth_key), string_to_auth.encode(), 'sha256').digest() - local_auth_hash = hexlify(local_auth_hash).decode() - - if local_auth_hash != auth_hash: - print('Message has been tampered with.') - return - - cipher = AES.new(unhexlify(encryption_key), AES.MODE_CBC, unhexlify(IV)) - result = cipher.decrypt(b64decode(ciphertext)) - result = result[:-result[-1]] # remove PKCS#7 padding - - return result.decode() - class StandardNotesAPI: encryption_helper = EncryptionHelper() base_url = 'https://sync.standardnotes.org' @@ -140,13 +34,7 @@ class StandardNotesAPI: def sync(self, dirty_items): items = self.handleDirtyItems(dirty_items) - if items: - json_items = items - print(json_items) - else: - json_items = [] - response = self.api.post('/items/sync', dict(sync_token=self.sync_token, items=json_items)) - print(json.dumps(response)) + response = self.api.post('/items/sync', dict(sync_token=self.sync_token, items=items)) self.sync_token = response['sync_token'] return self.handleResponseItems(response) @@ -164,10 +52,3 @@ class StandardNotesAPI: self.api = RESTAPI(self.base_url) self.username = username self.signIn(password) - -if __name__ == '__main__': - standard_notes = StandardNotesAPI('tanner@domain.com', 'complexpass') - test_item = standard_notes.encryption_helper.pure_encryptItem(dict(content=dict(hello='world'), uuid='1234'), standard_notes.keys) - print(test_item) - test_item = standard_notes.encryption_helper.pure_decryptItem(test_item, standard_notes.keys) - print(test_item) diff --git a/crypt.py b/crypt.py new file mode 100644 index 0000000..7b4226b --- /dev/null +++ b/crypt.py @@ -0,0 +1,105 @@ +import hashlib, hmac, json +from base64 import b64encode, b64decode +from binascii import hexlify, unhexlify +from Crypto.Cipher import AES +from Crypto.Random import random +from copy import deepcopy + +class EncryptionHelper: + def pure_generatePasswordAndKey(self, password, pw_salt, pw_cost): + output = hashlib.pbkdf2_hmac('sha512', password.encode(), pw_salt.encode(), pw_cost, dklen=96) + output = hexlify(output).decode() + + output_length = len(output) + split_length = output_length // 3 + pw = output[0 : split_length] + mk = output[split_length : split_length * 2] + ak = output[split_length * 2 : split_length * 3] + return dict(pw=pw, mk=mk, ak=ak) + + def encryptDirtyItems(self, dirty_items, keys): + return [self.pure_encryptItem(item, keys) for item in dirty_items] + + def decryptResponseItems(self, response_items, keys): + return [self.pure_decryptItem(item, keys) for item in response_items] + + def pure_encryptItem(self, item, keys): + uuid = item['uuid'] + content = json.dumps(item['content']) + + item_key = hex(random.getrandbits(512)) + item_key = item_key[2:].rjust(128, '0') # remove '0x', pad to 128 + item_key_length = len(item_key) + item_ek = item_key[:item_key_length//2] + item_ak = item_key[item_key_length//2:] + + enc_item = deepcopy(item) + enc_item['content'] = self.pure_encryptString002(content, item_ek, item_ak, uuid) + enc_item['enc_item_key'] = self.pure_encryptString002(item_key, keys['mk'], keys['ak'], uuid) + return enc_item + + def pure_decryptItem(self, item, keys): + uuid = item['uuid'] + content = item['content'] + enc_item_key = item['enc_item_key'] + + if not content: + return item + + if content[:3] == '002': + item_key = self.pure_decryptString002(enc_item_key, keys['mk'], keys['ak'], uuid) + item_key_length = len(item_key) + item_ek = item_key[:item_key_length//2] + item_ak = item_key[item_key_length//2:] + + dec_content = self.pure_decryptString002(content, item_ek, item_ak, uuid) + else: + print('Invalid protocol version.') + + dec_item = deepcopy(item) + dec_item['content'] = json.loads(dec_content) + return dec_item + + def pure_encryptString002(self, string_to_encrypt, encryption_key, auth_key, uuid): + IV = hex(random.getrandbits(128)) + IV = IV[2:].rjust(32, '0') # remove '0x', pad to 32 + + cipher = AES.new(unhexlify(encryption_key), AES.MODE_CBC, unhexlify(IV)) + pt = string_to_encrypt.encode() + pad = 16 - len(pt) % 16 + padded_pt = pt + pad * bytes([pad]) + ciphertext = b64encode(cipher.encrypt(padded_pt)).decode() + + string_to_auth = ':'.join(['002', uuid, IV, ciphertext]) + auth_hash = hmac.new(unhexlify(auth_key), string_to_auth.encode(), 'sha256').digest() + auth_hash = hexlify(auth_hash).decode() + + result = ':'.join(['002', auth_hash, uuid, IV, ciphertext]) + + return result + + def pure_decryptString002(self, string_to_decrypt, encryption_key, auth_key, uuid): + components = string_to_decrypt.split(':') + version = components[0] + auth_hash = components[1] + local_uuid = components[2] + IV = components[3] + ciphertext = components[4] + + if local_uuid != uuid: + print('UUID does not match.') + return + + string_to_auth = ':'.join([version, uuid, IV, ciphertext]) + local_auth_hash = hmac.new(unhexlify(auth_key), string_to_auth.encode(), 'sha256').digest() + local_auth_hash = hexlify(local_auth_hash).decode() + + if local_auth_hash != auth_hash: + print('Message has been tampered with.') + return + + cipher = AES.new(unhexlify(encryption_key), AES.MODE_CBC, unhexlify(IV)) + result = cipher.decrypt(b64decode(ciphertext)) + result = result[:-result[-1]] # remove PKCS#7 padding + + return result.decode() diff --git a/itemmanager.py b/itemmanager.py index f42a727..2506dab 100644 --- a/itemmanager.py +++ b/itemmanager.py @@ -27,13 +27,12 @@ class ItemManager: def syncItems(self): dirty_items = [item for uuid, item in self.items.items() if item['dirty']] - # remove keys (note: this removed them from self.items as well) + # remove keys (note: this removes them from self.items as well) for item in dirty_items: item.pop('dirty', None) item.pop('updated_at', None) response = self.standard_notes.sync(dirty_items) - print('info: ', response) self.mapResponseItemsToLocalItems(response['response_items']) self.mapResponseItemsToLocalItems(response['saved_items'], metadata_only=True) @@ -62,7 +61,6 @@ class ItemManager: def writeNote(self, uuid, text): item = self.items[uuid] - item['content']['text'] = text.strip() item['dirty'] = True self.syncItems()