Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 92 additions & 21 deletions attic/crypto.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@ from libc.stdlib cimport malloc, free

API_VERSION = 2

TAG_SIZE = 16 # bytes; 128 bits is the maximum allowed value. see "hack" below.
IV_SIZE = 16 # bytes; 128 bits

cdef extern from "openssl/rand.h":
int RAND_bytes(unsigned char *buf,int num)
int RAND_bytes(unsigned char *buf, int num)


cdef extern from "openssl/evp.h":
Expand All @@ -22,19 +25,28 @@ cdef extern from "openssl/evp.h":
ctypedef struct ENGINE:
pass
const EVP_MD *EVP_sha256()
const EVP_CIPHER *EVP_aes_256_ctr()
const EVP_CIPHER *EVP_aes_256_gcm()
void EVP_CIPHER_CTX_init(EVP_CIPHER_CTX *a)
void EVP_CIPHER_CTX_cleanup(EVP_CIPHER_CTX *a)

int EVP_EncryptInit_ex(EVP_CIPHER_CTX *ctx,const EVP_CIPHER *cipher, ENGINE *impl,
int EVP_EncryptInit_ex(EVP_CIPHER_CTX *ctx, const EVP_CIPHER *cipher, ENGINE *impl,
const unsigned char *key, const unsigned char *iv)
int EVP_EncryptUpdate(EVP_CIPHER_CTX *ctx, unsigned char *out,
int *outl, const unsigned char *in_, int inl)

int EVP_DecryptInit_ex(EVP_CIPHER_CTX *ctx, const EVP_CIPHER *cipher, ENGINE *impl,
const unsigned char *key, const unsigned char *iv)
int EVP_EncryptUpdate(EVP_CIPHER_CTX *ctx, unsigned char *out, int *outl,
const unsigned char *in_, int inl)
int EVP_DecryptUpdate(EVP_CIPHER_CTX *ctx, unsigned char *out, int *outl,
const unsigned char *in_, int inl)
int EVP_EncryptFinal_ex(EVP_CIPHER_CTX *ctx, unsigned char *out, int *outl)
int EVP_DecryptFinal_ex(EVP_CIPHER_CTX *ctx, unsigned char *out, int *outl)
int EVP_CIPHER_CTX_ctrl(EVP_CIPHER_CTX *ctx, int type, int arg, unsigned char *ptr)
int PKCS5_PBKDF2_HMAC(const char *password, int passwordlen,
const unsigned char *salt, int saltlen, int iter,
const EVP_MD *digest,
int keylen, unsigned char *out)
int EVP_CTRL_GCM_GET_TAG
int EVP_CTRL_GCM_SET_TAG
int EVP_CTRL_GCM_SET_IVLEN

import struct

Expand All @@ -47,7 +59,8 @@ long_to_bytes = lambda x: _long.pack(x)


def num_aes_blocks(length):
"""Return the number of AES blocks required to encrypt/decrypt *length* bytes of data
"""Return the number of AES blocks required to encrypt/decrypt *length* bytes of data.
Note: this is only correct for modes without padding, like AES-CTR.
"""
return (length + 15) // 16

Expand Down Expand Up @@ -85,11 +98,19 @@ cdef class AES:
"""A thin wrapper around the OpenSSL EVP cipher API
"""
cdef EVP_CIPHER_CTX ctx
cdef int is_encrypt

def __cinit__(self, key, iv=None):
def __cinit__(self, is_encrypt, key, iv=None):
EVP_CIPHER_CTX_init(&self.ctx)
if not EVP_EncryptInit_ex(&self.ctx, EVP_aes_256_ctr(), NULL, NULL, NULL):
raise Exception('EVP_EncryptInit_ex failed')
self.is_encrypt = is_encrypt
# Set cipher type and mode
cipher_mode = EVP_aes_256_gcm()
if self.is_encrypt:
if not EVP_EncryptInit_ex(&self.ctx, cipher_mode, NULL, NULL, NULL):
raise Exception('EVP_EncryptInit_ex failed')
else: # decrypt
if not EVP_DecryptInit_ex(&self.ctx, cipher_mode, NULL, NULL, NULL):
raise Exception('EVP_DecryptInit_ex failed')
self.reset(key, iv)

def __dealloc__(self):
Expand All @@ -102,24 +123,74 @@ cdef class AES:
key2 = key
if iv:
iv2 = iv
if not EVP_EncryptInit_ex(&self.ctx, NULL, NULL, key2, iv2):
raise Exception('EVP_EncryptInit_ex failed')

@property
def iv(self):
return self.ctx.iv[:16]
# Set IV length (bytes)
if not EVP_CIPHER_CTX_ctrl(&self.ctx, EVP_CTRL_GCM_SET_IVLEN, IV_SIZE, NULL):
raise Exception('EVP_CIPHER_CTX_ctrl SET IVLEN failed')
# Initialise key and IV
if self.is_encrypt:
if not EVP_EncryptInit_ex(&self.ctx, NULL, NULL, key2, iv2):
raise Exception('EVP_EncryptInit_ex failed')
else: # decrypt
if not EVP_DecryptInit_ex(&self.ctx, NULL, NULL, key2, iv2):
raise Exception('EVP_DecryptInit_ex failed')

def add(self, aad):
cdef int aadl = len(aad)
cdef int outl
# Zero or more calls to specify any AAD
if self.is_encrypt:
if not EVP_EncryptUpdate(&self.ctx, NULL, &outl, aad, aadl):
raise Exception('EVP_EncryptUpdate failed')
else: # decrypt
if not EVP_DecryptUpdate(&self.ctx, NULL, &outl, aad, aadl):
raise Exception('EVP_DecryptUpdate failed')

def encrypt(self, data):
def compute_tag_and_encrypt(self, data):
cdef int inl = len(data)
cdef int outl
cdef unsigned char *out = <unsigned char *>malloc(inl)
cdef int ctl = 0
cdef int outl = 0
# note: modes that use padding, need up to one extra AES block (16B)
cdef unsigned char *out = <unsigned char *>malloc(inl+16)
cdef unsigned char *tag = <unsigned char *>malloc(TAG_SIZE)
if not out:
raise MemoryError
try:
if not EVP_EncryptUpdate(&self.ctx, out, &outl, data, inl):
raise Exception('EVP_EncryptUpdate failed')
return out[:inl]
ctl = outl
if not EVP_EncryptFinal_ex(&self.ctx, out+ctl, &outl):
raise Exception('EVP_EncryptFinal failed')
ctl += outl
# Get tag
if not EVP_CIPHER_CTX_ctrl(&self.ctx, EVP_CTRL_GCM_GET_TAG, TAG_SIZE, tag):
raise Exception('EVP_CIPHER_CTX_ctrl GET TAG failed')
# hack: caller wants 32B tags (256b), so we give back that amount
return (tag[:TAG_SIZE] + b'\x00'*16), out[:ctl]
finally:
free(tag)
free(out)
decrypt = encrypt

def check_tag_and_decrypt(self, tag, data):
cdef int inl = len(data)
cdef int ptl = 0
cdef int outl = 0
# note: modes that use padding, need up to one extra AES block (16B).
# This is what the openssl docs say. I am not sure this is correct,
# but OTOH it will not cause any harm if our buffer is a little bigger.
cdef unsigned char *out = <unsigned char *>malloc(inl+16)
if not out:
raise MemoryError
try:
if not EVP_DecryptUpdate(&self.ctx, out, &outl, data, inl):
raise Exception('EVP_DecryptUpdate failed')
ptl = outl
# Set expected tag value.
if not EVP_CIPHER_CTX_ctrl(&self.ctx, EVP_CTRL_GCM_SET_TAG, TAG_SIZE, tag):
raise Exception('EVP_CIPHER_CTX_ctrl SET TAG failed')
if EVP_DecryptFinal_ex(&self.ctx, out+ptl, &outl) <= 0:
# a failure here means corrupted / tampered tag or data
raise Exception('EVP_DecryptFinal failed')
ptl += outl
return out[:ptl]
finally:
free(out)
77 changes: 50 additions & 27 deletions attic/key.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def __init__(self):
self.TYPE_STR = bytes([self.TYPE])

def id_hash(self, data):
"""Return HMAC hash using the "id" HMAC key
"""Return a HASH (no id_key) or a MAC (using the "id_key" key)
"""

def encrypt(self, data):
Expand Down Expand Up @@ -92,9 +92,9 @@ def decrypt(self, id, data):
class AESKeyBase(KeyBase):
"""Common base class shared by KeyfileKey and PassphraseKey

Chunks are encrypted using 256bit AES in Counter Mode (CTR)
Chunks are encrypted using 256bit AES in Galois Counter Mode (GCM)

Payload layout: TYPE(1) + HMAC(32) + NONCE(8) + CIPHERTEXT
Payload layout: TYPE(1) + TAG(32) + NONCE(8) + CIPHERTEXT

To reduce payload size only 8 bytes of the 16 bytes nonce is saved
in the payload, the first 8 bytes are always zeros. This does not
Expand All @@ -105,47 +105,70 @@ class AESKeyBase(KeyBase):
PAYLOAD_OVERHEAD = 1 + 32 + 8 # TYPE + HMAC + NONCE

def id_hash(self, data):
"""Return HMAC hash using the "id" HMAC key
"""
return HMAC(self.id_key, data, sha256).digest()
Return GMAC using the "id_key" GMAC key

XXX do we need a cryptographic hash function here or is a keyed hash
function like GMAC / GHASH good enough? See NIST SP 800-38D.

IMPORTANT: in 1 repo, there should be only 1 kind of id_hash, otherwise
data hashed/maced with one id_hash might result in same ID as already
exists in the repo for other data created with another id_hash method.
somehow unlikely considering 128 or 256bits, but still.
"""
mac_cipher = AES(is_encrypt=True, key=self.id_key, iv=b'\0'*16) # XXX do we need an IV here?
# GMAC = aes-gcm with all data as AAD, no data as to-be-encrypted data
mac_cipher.add(bytes(data))
tag, _ = mac_cipher.compute_tag_and_encrypt(b'')
return tag

def encrypt(self, data):
data = zlib.compress(data)
self.enc_cipher.reset()
data = b''.join((self.enc_cipher.iv[8:], self.enc_cipher.encrypt(data)))
hmac = HMAC(self.enc_hmac_key, data, sha256).digest()
return b''.join((self.TYPE_STR, hmac, data))
self.enc_cipher.reset(iv=self.enc_iv)
iv_last8 = self.enc_iv[8:]
self.enc_cipher.add(iv_last8)
tag, data = self.enc_cipher.compute_tag_and_encrypt(data)
# increase the IV (counter) value so same value is never used twice
current_iv = bytes_to_long(iv_last8)
self.enc_iv = PREFIX + long_to_bytes(current_iv + num_aes_blocks(len(data)))
return b''.join((self.TYPE_STR, tag, iv_last8, data))

def decrypt(self, id, data):
if data[0] != self.TYPE:
raise IntegrityError('Invalid encryption envelope')
hmac = memoryview(data)[1:33]
if memoryview(HMAC(self.enc_hmac_key, memoryview(data)[33:], sha256).digest()) != hmac:
iv_last8 = data[1+32:1+40]
iv = PREFIX + iv_last8
self.dec_cipher.reset(iv=iv)
self.dec_cipher.add(iv_last8)
tag, data = data[1:1+32], data[1+40:]
try:
data = self.dec_cipher.check_tag_and_decrypt(tag, data)
except Exception:
raise IntegrityError('Encryption envelope checksum mismatch')
self.dec_cipher.reset(iv=PREFIX + data[33:41])
data = zlib.decompress(self.dec_cipher.decrypt(data[41:])) # should use memoryview
if id and HMAC(self.id_key, data, sha256).digest() != id:
data = zlib.decompress(data)
if id and self.id_hash(data) != id:
raise IntegrityError('Chunk id verification failed')
return data

def extract_nonce(self, payload):
if payload[0] != self.TYPE:
raise IntegrityError('Invalid encryption envelope')
raise IntegrityError('Invalid encryption envelope')
nonce = bytes_to_long(payload[33:41])
return nonce

def init_from_random_data(self, data):
self.enc_key = data[0:32]
self.enc_hmac_key = data[32:64]
self.enc_hmac_key = data[32:64] # XXX enc_hmac_key not used for AES-GCM
self.id_key = data[64:96]
self.chunk_seed = bytes_to_int(data[96:100])
# Convert to signed int32
if self.chunk_seed & 0x80000000:
self.chunk_seed = self.chunk_seed - 0xffffffff - 1

def init_ciphers(self, enc_iv=b''):
self.enc_cipher = AES(self.enc_key, enc_iv)
self.dec_cipher = AES(self.enc_key)
def init_ciphers(self, enc_iv=PREFIX * 2): # default IV = 16B zero
self.enc_iv = enc_iv
self.enc_cipher = AES(is_encrypt=True, key=self.enc_key, iv=enc_iv)
self.dec_cipher = AES(is_encrypt=False, key=self.enc_key)


class PassphraseKey(AESKeyBase):
Expand Down Expand Up @@ -242,25 +265,25 @@ def load(self, filename, passphrase):
def decrypt_key_file(self, data, passphrase):
d = msgpack.unpackb(data)
assert d[b'version'] == 1
assert d[b'algorithm'] == b'sha256'
assert d[b'algorithm'] == b'gmac'
key = pbkdf2_sha256(passphrase.encode('utf-8'), d[b'salt'], d[b'iterations'], 32)
data = AES(key).decrypt(d[b'data'])
if HMAC(key, data, sha256).digest() != d[b'hash']:
try:
data = AES(is_encrypt=False, key=key, iv=b'\0'*16).check_tag_and_decrypt(d[b'hash'], d[b'data'])
return data
except Exception:
return None
return data

def encrypt_key_file(self, data, passphrase):
salt = get_random_bytes(32)
iterations = 100000
key = pbkdf2_sha256(passphrase.encode('utf-8'), salt, iterations, 32)
hash = HMAC(key, data, sha256).digest()
cdata = AES(key).encrypt(data)
tag, cdata = AES(is_encrypt=True, key=key, iv=b'\0'*16).compute_tag_and_encrypt(data)
d = {
'version': 1,
'salt': salt,
'iterations': iterations,
'algorithm': 'sha256',
'hash': hash,
'algorithm': 'gmac',
'hash': tag,
'data': cdata,
}
return msgpack.packb(d)
Expand Down
24 changes: 15 additions & 9 deletions attic/testsuite/crypto.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,20 @@ def test_get_random_bytes(self):
self.assert_equal(len(bytes2), 10)
self.assert_not_equal(bytes, bytes2)

def test_aes(self):
def test_aes_gcm(self):
key = b'X' * 32
iv = b'A' * 16
data = b'foo' * 10
aes = AES(key)
self.assert_equal(bytes_to_long(aes.iv, 8), 0)
cdata = aes.encrypt(data)
self.assert_equal(hexlify(cdata), b'c6efb702de12498f34a2c2bbc8149e759996d08bf6dc5c610aefc0c3a466')
self.assert_equal(bytes_to_long(aes.iv, 8), 2)
self.assert_not_equal(data, aes.decrypt(cdata))
aes.reset(iv=b'\0' * 16)
self.assert_equal(data, aes.decrypt(cdata))
# encrypt
aes = AES(is_encrypt=True, key=key, iv=iv)
tag, cdata = aes.compute_tag_and_encrypt(data)
self.assert_equal(hexlify(tag), b'c98aa10eb6b7031bcc2160878d9438fb00000000000000000000000000000000')
self.assert_equal(hexlify(cdata), b'841bcce405df769d22ee9f7f012edf5dc7fb2594d924c7400ffd050f2741')
# decrypt (correct tag/cdata)
aes = AES(is_encrypt=False, key=key, iv=iv)
pdata = aes.check_tag_and_decrypt(tag, cdata)
self.assert_equal(data, pdata)
# decrypt (incorrect tag/cdata)
aes = AES(is_encrypt=False, key=key, iv=iv)
cdata = b'x' + cdata[1:] # corrupt cdata
self.assertRaises(Exception, aes.check_tag_and_decrypt, tag, cdata)
Loading