|
|
|
@ -2,6 +2,8 @@ import hashlib, hmac, json, requests, time |
|
|
|
|
from base64 import b64encode, b64decode |
|
|
|
|
from binascii import hexlify, unhexlify |
|
|
|
|
from Crypto.Cipher import AES |
|
|
|
|
from Crypto.Random import random |
|
|
|
|
from copy import deepcopy |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class RESTAPI: |
|
|
|
@ -15,7 +17,10 @@ class RESTAPI: |
|
|
|
|
|
|
|
|
|
def post(self, route, data=None): |
|
|
|
|
url = self.base_url + route |
|
|
|
|
return requests.post(url, data, headers=self.headers).json() |
|
|
|
|
print(data) |
|
|
|
|
res = requests.post(url, json=data, headers=self.headers) |
|
|
|
|
print(res.text) |
|
|
|
|
return res.json() |
|
|
|
|
|
|
|
|
|
def addHeader(self, header): |
|
|
|
|
self.headers.update(header) |
|
|
|
@ -30,11 +35,29 @@ class EncryptionHelper: |
|
|
|
|
pw = output[0 : split_length] |
|
|
|
|
mk = output[split_length : split_length * 2] |
|
|
|
|
ak = output[split_length * 2 : split_length * 3] |
|
|
|
|
return {'pw': pw, 'mk': mk, 'ak': ak} |
|
|
|
|
return dict(pw=pw, mk=mk, ak=ak) |
|
|
|
|
|
|
|
|
|
def pure_decryptResponseItems(self, response_items, keys): |
|
|
|
|
def encryptDirtyItems(self, dirty_items, keys): |
|
|
|
|
return [self.pure_encryptItem(item, keys) for item in dirty_items] |
|
|
|
|
|
|
|
|
|
def decryptResponseItems(self, response_items, keys): |
|
|
|
|
return [self.pure_decryptItem(item, keys) for item in response_items] |
|
|
|
|
|
|
|
|
|
def pure_encryptItem(self, item, keys): |
|
|
|
|
uuid = item['uuid'] |
|
|
|
|
content = json.dumps(item['content']) |
|
|
|
|
|
|
|
|
|
item_key = hex(random.getrandbits(512)) |
|
|
|
|
item_key = item_key[2:].rjust(128, '0') # remove '0x', pad to 128 |
|
|
|
|
item_key_length = len(item_key) |
|
|
|
|
item_ek = item_key[:item_key_length//2] |
|
|
|
|
item_ak = item_key[item_key_length//2:] |
|
|
|
|
|
|
|
|
|
enc_item = deepcopy(item) |
|
|
|
|
enc_item['content'] = self.pure_encryptString002(content, item_ek, item_ak, uuid) |
|
|
|
|
enc_item['enc_item_key'] = self.pure_encryptString002(item_key, keys['mk'], keys['ak'], uuid) |
|
|
|
|
return enc_item |
|
|
|
|
|
|
|
|
|
def pure_decryptItem(self, item, keys): |
|
|
|
|
uuid = item['uuid'] |
|
|
|
|
content = item['content'] |
|
|
|
@ -53,10 +76,28 @@ class EncryptionHelper: |
|
|
|
|
else: |
|
|
|
|
print('Invalid protocol version.') |
|
|
|
|
|
|
|
|
|
dec_item = item |
|
|
|
|
dec_item = deepcopy(item) |
|
|
|
|
dec_item['content'] = json.loads(dec_content) |
|
|
|
|
return dec_item |
|
|
|
|
|
|
|
|
|
def pure_encryptString002(self, string_to_encrypt, encryption_key, auth_key, uuid): |
|
|
|
|
IV = hex(random.getrandbits(128)) |
|
|
|
|
IV = IV[2:].rjust(32, '0') # remove '0x', pad to 32 |
|
|
|
|
|
|
|
|
|
cipher = AES.new(unhexlify(encryption_key), AES.MODE_CBC, unhexlify(IV)) |
|
|
|
|
pt = string_to_encrypt.encode() |
|
|
|
|
pad = 16 - len(pt) % 16 |
|
|
|
|
padded_pt = pt + pad * bytes([pad]) |
|
|
|
|
ciphertext = b64encode(cipher.encrypt(padded_pt)).decode() |
|
|
|
|
|
|
|
|
|
string_to_auth = ':'.join(['002', uuid, IV, ciphertext]) |
|
|
|
|
auth_hash = hmac.new(unhexlify(auth_key), string_to_auth.encode(), 'sha256').digest() |
|
|
|
|
auth_hash = hexlify(auth_hash).decode() |
|
|
|
|
|
|
|
|
|
result = ':'.join(['002', auth_hash, uuid, IV, ciphertext]) |
|
|
|
|
|
|
|
|
|
return result |
|
|
|
|
|
|
|
|
|
def pure_decryptString002(self, string_to_decrypt, encryption_key, auth_key, uuid): |
|
|
|
|
components = string_to_decrypt.split(':') |
|
|
|
|
version = components[0] |
|
|
|
@ -89,26 +130,44 @@ class StandardNotesAPI: |
|
|
|
|
sync_token = None |
|
|
|
|
|
|
|
|
|
def getAuthParamsForEmail(self): |
|
|
|
|
return self.api.get('/auth/params', {'email': self.username}) |
|
|
|
|
return self.api.get('/auth/params', dict(email=self.username)) |
|
|
|
|
|
|
|
|
|
def signIn(self, password): |
|
|
|
|
pw_info = self.getAuthParamsForEmail() |
|
|
|
|
self.keys = self.encryption_helper.pure_generatePasswordAndKey(password, pw_info['pw_salt'], pw_info['pw_cost']) |
|
|
|
|
res = self.api.post('/auth/sign_in', {'email': self.username, 'password': self.keys['pw']}) |
|
|
|
|
self.api.addHeader({'Authorization': 'Bearer ' + res['token']}) |
|
|
|
|
res = self.api.post('/auth/sign_in', dict(email=self.username, password=self.keys['pw'])) |
|
|
|
|
self.api.addHeader(dict(Authorization='Bearer ' + res['token'])) |
|
|
|
|
|
|
|
|
|
def sync(self, dirty_items): |
|
|
|
|
res = self.api.post('/items/sync', {'sync_token': self.sync_token}) |
|
|
|
|
print(json.dumps(res)) |
|
|
|
|
items = self.handleDirtyItems(dirty_items) |
|
|
|
|
if items: |
|
|
|
|
json_items = items |
|
|
|
|
print(json_items) |
|
|
|
|
else: |
|
|
|
|
json_items = [] |
|
|
|
|
response = self.api.post('/items/sync', dict(sync_token=self.sync_token, items=json_items)) |
|
|
|
|
print(json.dumps(response)) |
|
|
|
|
|
|
|
|
|
self.sync_token = response['sync_token'] |
|
|
|
|
return self.handleResponseItems(response) |
|
|
|
|
|
|
|
|
|
self.sync_token = res['sync_token'] |
|
|
|
|
return self.handleResponseItems(res['retrieved_items']) |
|
|
|
|
def handleDirtyItems(self, dirty_items): |
|
|
|
|
items = self.encryption_helper.encryptDirtyItems(dirty_items, self.keys) |
|
|
|
|
return items |
|
|
|
|
|
|
|
|
|
def handleResponseItems(self, response_items): |
|
|
|
|
decrypted_items = self.encryption_helper.pure_decryptResponseItems(response_items, self.keys) |
|
|
|
|
return decrypted_items |
|
|
|
|
def handleResponseItems(self, response): |
|
|
|
|
response_items = self.encryption_helper.decryptResponseItems(response['retrieved_items'], self.keys) |
|
|
|
|
saved_items = self.encryption_helper.decryptResponseItems(response['saved_items'], self.keys) |
|
|
|
|
return dict(response_items=response_items, saved_items=saved_items) |
|
|
|
|
|
|
|
|
|
def __init__(self, username, password): |
|
|
|
|
self.api = RESTAPI(self.base_url) |
|
|
|
|
self.username = username |
|
|
|
|
self.signIn(password) |
|
|
|
|
|
|
|
|
|
if __name__ == '__main__': |
|
|
|
|
standard_notes = StandardNotesAPI('tanner@domain.com', 'complexpass') |
|
|
|
|
test_item = standard_notes.encryption_helper.pure_encryptItem(dict(content=dict(hello='world'), uuid='1234'), standard_notes.keys) |
|
|
|
|
print(test_item) |
|
|
|
|
test_item = standard_notes.encryption_helper.pure_decryptItem(test_item, standard_notes.keys) |
|
|
|
|
print(test_item) |
|
|
|
|