You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

140 lines
5.1KB

  1. from base64 import b64decode, b64encode
  2. from binascii import hexlify, unhexlify
  3. from copy import deepcopy
  4. import hashlib
  5. import hmac
  6. import json
  7. import sys
  8. from Crypto.Cipher import AES
  9. from Crypto.Random import random
  10. BITS_PER_HEX_DIGIT = 4
  11. PASS_KEY_LEN = 96
  12. AES_KEY_LEN = 256
  13. AES_BLK_SIZE = 16
  14. AES_STR_KEY_LEN = AES_KEY_LEN // BITS_PER_HEX_DIGIT
  15. AES_IV_LEN = 128
  16. AES_STR_IV_LEN = AES_IV_LEN // BITS_PER_HEX_DIGIT
  17. class EncryptionHelper:
  18. def pure_generate_password_and_key(self, password, pw_salt, pw_cost):
  19. output = hashlib.pbkdf2_hmac(
  20. 'sha512', password.encode(), pw_salt.encode(), pw_cost,
  21. dklen=PASS_KEY_LEN)
  22. output = hexlify(output).decode()
  23. output_length = len(output)
  24. split_length = output_length // 3
  25. pw = output[0 : split_length]
  26. mk = output[split_length : split_length * 2]
  27. ak = output[split_length * 2 : split_length * 3]
  28. return dict(pw=pw, mk=mk, ak=ak)
  29. def encrypt_dirty_items(self, dirty_items, keys):
  30. return [self.pure_encrypt_item(item, keys) for item in dirty_items]
  31. def decrypt_response_items(self, response_items, keys):
  32. return [self.pure_decrypt_item(item, keys) for item in response_items]
  33. def pure_encrypt_item(self, item, keys):
  34. uuid = item['uuid']
  35. content = json.dumps(item['content'])
  36. # all this is to follow the Standard Notes spec
  37. item_key = hex(random.getrandbits(AES_KEY_LEN * 2))
  38. # remove '0x', pad with 0's, then split in half
  39. item_key = item_key[2:].rjust(AES_STR_KEY_LEN * 2, '0')
  40. item_ek = item_key[:AES_STR_KEY_LEN]
  41. item_ak = item_key[AES_STR_KEY_LEN:]
  42. enc_item = deepcopy(item)
  43. enc_item['content'] = self.pure_encrypt_string_002(
  44. content, item_ek, item_ak, uuid)
  45. enc_item['enc_item_key'] = self.pure_encrypt_string_002(
  46. item_key, keys['mk'], keys['ak'], uuid)
  47. return enc_item
  48. def pure_decrypt_item(self, item, keys):
  49. if item['deleted']:
  50. return item
  51. uuid = item['uuid']
  52. content = item['content']
  53. enc_item_key = item['enc_item_key']
  54. if content[:3] == '001':
  55. print('Old encryption protocol detected. This version is not '
  56. 'supported by standardnotes-fs. Please resync all of '
  57. 'your notes by following the instructions here:\n'
  58. 'https://standardnotes.org/help/resync')
  59. sys.exit(1)
  60. elif content[:3] == '002':
  61. item_key = self.pure_decrypt_string_002(
  62. enc_item_key, keys['mk'], keys['ak'], uuid)
  63. item_key_length = len(item_key)
  64. item_ek = item_key[:item_key_length//2]
  65. item_ak = item_key[item_key_length//2:]
  66. dec_content = self.pure_decrypt_string_002(
  67. content, item_ek, item_ak, uuid)
  68. else:
  69. print('Invalid protocol version. This could indicate tampering or '
  70. 'that something is wrong with the server. Exiting.')
  71. sys.exit(1)
  72. dec_item = deepcopy(item)
  73. dec_item['content'] = json.loads(dec_content)
  74. return dec_item
  75. def pure_encrypt_string_002(self, string_to_encrypt, encryption_key,
  76. auth_key, uuid):
  77. IV = hex(random.getrandbits(AES_IV_LEN))
  78. IV = IV[2:].rjust(AES_STR_IV_LEN, '0') # remove '0x', pad with 0's
  79. cipher = AES.new(unhexlify(encryption_key), AES.MODE_CBC, unhexlify(IV))
  80. pt = string_to_encrypt.encode()
  81. pad = AES_BLK_SIZE - len(pt) % AES_BLK_SIZE
  82. padded_pt = pt + pad * bytes([pad])
  83. ciphertext = b64encode(cipher.encrypt(padded_pt)).decode()
  84. string_to_auth = ':'.join(['002', uuid, IV, ciphertext])
  85. auth_hash = hmac.new(
  86. unhexlify(auth_key), string_to_auth.encode(), 'sha256').digest()
  87. auth_hash = hexlify(auth_hash).decode()
  88. result = ':'.join(['002', auth_hash, uuid, IV, ciphertext])
  89. return result
  90. def pure_decrypt_string_002(self, string_to_decrypt, encryption_key,
  91. auth_key, uuid):
  92. components = string_to_decrypt.split(':')
  93. version, auth_hash, local_uuid, IV, ciphertext = components
  94. if local_uuid != uuid:
  95. print('UUID does not match. This could indicate tampering or '
  96. 'that something is wrong with the server. Exiting.')
  97. sys.exit(1)
  98. string_to_auth = ':'.join([version, uuid, IV, ciphertext])
  99. local_auth_hash = hmac.new(
  100. unhexlify(auth_key), string_to_auth.encode(), 'sha256').digest()
  101. local_auth_hash = hexlify(local_auth_hash).decode()
  102. if local_auth_hash != auth_hash:
  103. print('Auth hash does not match. This could indicate tampering or '
  104. 'that something is wrong with the server. Exiting.')
  105. sys.exit(1)
  106. cipher = AES.new(unhexlify(encryption_key), AES.MODE_CBC, unhexlify(IV))
  107. result = cipher.decrypt(b64decode(ciphertext))
  108. result = result[:-result[-1]] # remove PKCS#7 padding
  109. result = result.decode()
  110. return result