More tests for ChaCha20-Poly1305

This commit is contained in:
Helder Eijs 2018-10-06 22:13:50 +02:00
parent 6edec68cfd
commit 047c700228
5 changed files with 2605 additions and 26 deletions

View file

@ -35,7 +35,7 @@ from Crypto.Hash import Poly1305, BLAKE2s
from Crypto.Random import get_random_bytes
from Crypto.Number.number import long_to_bytes
from Crypto.Util.number import long_to_bytes
from Crypto.Util.py3compat import _copy_bytes, bord
@ -66,10 +66,10 @@ class ChaCha20Poly1305Cipher(object):
self._next = (self.update, self.encrypt, self.decrypt, self.digest,
self.verify)
self._authenticator = Poly1305(key=key, nonce=nonce, cipher=ChaCha20)
self._authenticator = Poly1305.new(key=key, nonce=nonce, cipher=ChaCha20)
self._cipher = ChaCha20(key=key, nonce=nonce)
self._cipher.seek(16) # Block counter starts at 1
self._cipher = ChaCha20.new(key=key, nonce=nonce)
self._cipher.seek(64) # Block counter starts at 1
self._len_aad = 0
self._len_ct = 0
@ -79,11 +79,18 @@ class ChaCha20Poly1305Cipher(object):
def update(self, data):
if self.update not in self._next:
raise ValueError("update() method cannot be called")
raise TypeError("update() method cannot be called")
self._len_aad += len(data)
self._authenticator.update(data)
def _pad_aad(self):
assert(self._status == _CipherStatus.PROCESSING_AUTH_DATA)
if self._len_aad & 0x0F:
self._authenticator.update(b'\x00' * (16 - (self._len_aad & 0x0F)))
self._status = _CipherStatus.PROCESSING_CIPHERTEXT
def encrypt(self, plaintext):
"""Encrypt a piece of data.
@ -97,9 +104,7 @@ class ChaCha20Poly1305Cipher(object):
raise TypeError("encrypt() method cannot be called")
if self._status == _CipherStatus.PROCESSING_AUTH_DATA:
if self._len_aad & 0x0F:
self.update(b'\x00' * (16 - (self.len_aad & 0x0F)))
self._status = _CipherStatus.PROCESSING_CIPHERTEXT
self._pad_aad()
self._next = (self.encrypt, self.digest)
@ -121,33 +126,43 @@ class ChaCha20Poly1305Cipher(object):
raise TypeError("decrypt() method cannot be called")
if self._status == _CipherStatus.PROCESSING_AUTH_DATA:
if self._len_add & 0x0F:
self.update(b'\x00' * (16 - (self.len_add & 0x0F)))
self._status = _CipherStatus.PROCESSING_CIPHERTEXT
self._pad_aad()
self._next = (self.decrypt, self.verify)
self._len_ct += len(ciphertext)
self._authenticator.update(ciphertext)
return self._cipher.decrypt(ciphertext)
def _compute_mac(self):
"""Finalize the cipher (if not done already and return the MAC"""
if self._mac_tag:
assert(self._status == _CipherStatus.PROCESSING_DONE)
return self._mac_tag
assert(self._status != _CipherStatus.PROCESSING_DONE)
if self._status == _CipherStatus.PROCESSING_AUTH_DATA:
self._pad_aad()
if self._len_ct & 0x0F:
self._authenticator.update(b'\x00' * (16 - (self._len_ct & 0x0F)))
self._status = _CipherStatus.PROCESSING_DONE
self._authenticator.update(long_to_bytes(self._len_aad, 8)[::-1])
self._authenticator.update(long_to_bytes(self._len_ct, 8)[::-1])
self._mac_tag = self._authenticator.digest()
return self._mac_tag
def digest(self):
if self.digest not in self._next:
raise TypeError("digest() method cannot be called")
self._next = (self.digest)
self._next = (self.digest,)
if self._mac_tag:
return self._mac_tag
if self._len_ct & 0x0F:
self._authenticator.update(b'\x00' * (16 - (self._len_ct & 0x0F)))
self._status = _CipherStatus.PROCESSING_DONE
self._authenticator.update(long_to_bytes(self._len_aad, 16)[::-1])
self._authenticator.update(long_to_bytes(self._len_ct, 16)[::-1])
self._mac_tag = self._authenticator.digest()
return self._mac_tag
return self._compute_mac()
def hexdigest(self):
"""Compute the *printable* MAC tag.
@ -178,12 +193,14 @@ class ChaCha20Poly1305Cipher(object):
if self.verify not in self._next:
raise TypeError("verify() cannot be called"
" when encrypting a message")
self._next = (self.verify)
self._next = (self.verify,)
secret = get_random_bytes(16)
self._compute_mac()
mac1 = BLAKE2s.new(digest_bits=160, key=secret,
data=self._compute_mac())
data=self._mac_tag)
mac2 = BLAKE2s.new(digest_bits=160, key=secret,
data=received_mac_tag)
@ -275,6 +292,9 @@ def new(**kwargs):
if len(nonce) not in (8, 12):
raise ValueError("Nonce must be 8 or 12 bytes long")
if isinstance(nonce, unicode):
raise TypeError("nonce must be a byte string")
if kwargs:
raise TypeError("Unknown parameters: " + str(kwargs))