Add support for scrypt

scrypt is a robust password-based key derivation function.
These set of changes implements it according to the RFC draft:

http://tools.ietf.org/html/draft-josefsson-scrypt-kdf-01

scrypt is also added to the algorithms understood by PKCS#8
(so that one can protect private keys at rest with it).

Additionally, this patch adds tests cases for PBES functions.
This commit is contained in:
Legrandin 2013-08-10 18:54:04 +02:00
parent f36ee269d4
commit 102cd21c8d
12 changed files with 535 additions and 58 deletions

View file

@ -32,6 +32,9 @@ The following mechanisms are fully supported:
* *PBKDF2WithHMAC-SHA1AndAES192-CBC*
* *PBKDF2WithHMAC-SHA1AndAES256-CBC*
* *PBKDF2WithHMAC-SHA1AndDES-EDE3-CBC*
* *scryptAndAES128-CBC*
* *scryptAndAES192-CBC*
* *scryptAndAES256-CBC*
The following mechanisms are only supported for importing keys.
They are much weaker than the ones listed above, and they are provided
@ -96,13 +99,22 @@ def wrap(private_key, key_oid, passphrase=None, protection=None,
| Key | Description |
+==================+===============================================+
| iteration_count | The KDF algorithm is repeated several times to|
| | slow down brute force attacks on passwords. |
| | The default value is 1 000. |
| | slow down brute force attacks on passwords |
| | (called *N* or CPU/memory cost in scrypt). |
| | |
| | The default value for PBKDF2 is 1 000. |
| | The default value for scrypt is 16 384. |
+------------------+-----------------------------------------------+
| salt_size | Salt is used to thwart dictionary and rainbow |
| | attacks on passwords. The default value is 8 |
| | bytes. |
+------------------+-----------------------------------------------+
| block_size | *(scrypt only)* Memory-cost (r). The default |
| | value is 8. |
+------------------+-----------------------------------------------+
| parallelization | *(scrypt only)* CPU-cost (p). The default |
| | value is 1. |
+------------------+-----------------------------------------------+
key_params : DER object
The algorithm parameters associated to the private key.

View file

@ -25,12 +25,13 @@ if sys.version_info[0] == 2 and sys.version_info[1] == 1:
from Crypto.Util.py3compat import *
from Crypto import Random
from Crypto.Util.asn1 import *
from Crypto.Util.asn1 import DerSequence, DerOctetString,\
DerObjectId, DerInteger, newDerSequence
from Crypto.Util.Padding import pad, unpad
from Crypto.Hash import MD5, SHA1
from Crypto.Cipher import DES, ARC2, DES3, AES
from Crypto.Protocol.KDF import PBKDF1, PBKDF2
from Crypto.Protocol.KDF import PBKDF1, PBKDF2, scrypt
# These are the ASN.1 definitions used by the PBES1/2 logic:
@ -170,13 +171,23 @@ class PBES2(object):
| Key | Description |
+==================+===============================================+
| iteration_count | The KDF algorithm is repeated several times to|
| | slow down brute force attacks on passwords. |
| | The default value is 1 000. |
| | slow down brute force attacks on passwords |
| | (called *N* or CPU/memory cost in scrypt). |
| | |
| | The default value for PBKDF2 is 1 000. |
| | The default value for scrypt is 16 384. |
+------------------+-----------------------------------------------+
| salt_size | Salt is used to thwart dictionary and rainbow |
| | attacks on passwords. The default value is 8 |
| | bytes. |
+------------------+-----------------------------------------------+
| block_size | *(scrypt only)* Memory-cost (r). The default |
| | value is 8. |
+------------------+-----------------------------------------------+
| parallelization | *(scrypt only)* CPU-cost (p). The default |
| | value is 1. |
+------------------+-----------------------------------------------+
randfunc : callable
Random number generation function; it should accept
@ -197,22 +208,25 @@ class PBES2(object):
if protection == 'PBKDF2WithHMAC-SHA1AndDES-EDE3-CBC':
key_size = 24
module = DES3
protection = DES3.MODE_CBC
cipher_mode = DES3.MODE_CBC
enc_oid = "1.2.840.113549.3.7"
elif protection == 'PBKDF2WithHMAC-SHA1AndAES128-CBC':
elif protection in ('PBKDF2WithHMAC-SHA1AndAES128-CBC',
'scryptAndAES128-CBC'):
key_size = 16
module = AES
protection = AES.MODE_CBC
cipher_mode = AES.MODE_CBC
enc_oid = "2.16.840.1.101.3.4.1.2"
elif protection == 'PBKDF2WithHMAC-SHA1AndAES192-CBC':
elif protection in ('PBKDF2WithHMAC-SHA1AndAES192-CBC',
'scryptAndAES192-CBC'):
key_size = 24
module = AES
protection = AES.MODE_CBC
cipher_mode = AES.MODE_CBC
enc_oid = "2.16.840.1.101.3.4.1.22"
elif protection == 'PBKDF2WithHMAC-SHA1AndAES256-CBC':
elif protection in ('PBKDF2WithHMAC-SHA1AndAES256-CBC',
'scryptAndAES256-CBC'):
key_size = 32
module = AES
protection = AES.MODE_CBC
cipher_mode = AES.MODE_CBC
enc_oid = "2.16.840.1.101.3.4.1.42"
else:
raise ValueError("Unknown mode")
@ -222,6 +236,7 @@ class PBES2(object):
salt = randfunc(prot_params.get("salt_size", 8))
# Derive key from password
if protection.startswith('PBKDF2'):
count = prot_params.get("iteration_count", 1000)
key = PBKDF2(passphrase, salt, key_size, count)
key_derivation_func = newDerSequence(
@ -231,9 +246,25 @@ class PBES2(object):
DerInteger(count)
)
)
else:
# It must be scrypt
count = prot_params.get("iteration_count", 16384)
scrypt_r = prot_params.get('block_size', 8)
scrypt_p = prot_params.get('parallelization', 1)
key = scrypt(passphrase, salt, key_size,
count, scrypt_r, scrypt_p)
key_derivation_func = newDerSequence(
DerObjectId("1.3.6.1.4.1.11591.4.11"), # scrypt
newDerSequence(
DerOctetString(salt),
DerInteger(count),
DerInteger(scrypt_r),
DerInteger(scrypt_p)
)
)
# Create cipher and use it
cipher = module.new(key, protection, iv)
cipher = module.new(key, cipher_mode, iv)
encrypted_data = cipher.encrypt(pad(data, cipher.block_size))
encryption_scheme = newDerSequence(
DerObjectId(enc_oid),
@ -292,20 +323,32 @@ class PBES2(object):
key_derivation_func[0]
).value
# For now, we only support PBKDF2
if key_derivation_oid != "1.2.840.113549.1.5.12":
raise ValueError("Unknown KDF")
# We only support PBKDF2 or scrypt
if key_derivation_oid == "1.2.840.113549.1.5.12":
pbkdf2_params = decode_der(DerSequence, key_derivation_func[1])
salt = decode_der(DerOctetString, pbkdf2_params[0]).payload
iteration_count = pbkdf2_params[1]
if len(pbkdf2_params) > 2:
pbkdf2_key_length = pbkdf2_params[2]
kdf_key_length = pbkdf2_params[2]
else:
pbkdf2_key_length = None
kdf_key_length = None
if len(pbkdf2_params) > 3:
raise ValueError("Unsupported PRF for PBKDF2")
elif key_derivation_oid == "1.3.6.1.4.1.11591.4.11":
scrypt_params = decode_der(DerSequence, key_derivation_func[1])
salt = decode_der(DerOctetString, scrypt_params[0]).payload
iteration_count, scrypt_r, scrypt_p = [scrypt_params[x]
for x in (1, 2, 3)]
if len(scrypt_params) > 4:
kdf_key_length = scrypt_params[4]
else:
kdf_key_length = None
else:
raise ValueError("Unknown KDF")
### Cipher selection
encryption_scheme = decode_der(DerSequence, pbes2_params[1])
encryption_oid = decode_der(
@ -332,14 +375,18 @@ class PBES2(object):
else:
raise ValueError("Unsupported cipher")
if pbkdf2_key_length and pbkdf2_key_length != key_size:
raise ValueError("Mismatch between PBKDF2 parameters"
if kdf_key_length and kdf_key_length != key_size:
raise ValueError("Mismatch between KDF parameters"
" and selected cipher")
IV = decode_der(DerOctetString, encryption_scheme[1]).payload
# Create cipher
if key_derivation_oid == "1.2.840.113549.1.5.12": # PBKDF2
key = PBKDF2(passphrase, salt, key_size, iteration_count)
else:
key = scrypt(passphrase, salt, key_size, iteration_count,
scrypt_r, scrypt_p)
cipher = ciphermod.new(key, ciphermod.MODE_CBC, IV)
# Decrypt data

View file

@ -43,9 +43,11 @@ if sys.version_info[0] == 2 and sys.version_info[1] == 1:
from Crypto.Util.py21compat import *
from Crypto.Util.py3compat import *
from Crypto.Hash import SHA1, HMAC, CMAC
from Crypto.Cipher import _Salsa20
from Crypto.Hash import SHA1, SHA256, HMAC, CMAC
from Crypto.Util.strxor import strxor
from Crypto.Util.number import long_to_bytes, bytes_to_long
from Crypto.Util.number import size as bit_size, long_to_bytes, bytes_to_long
from Crypto.Util.number import bytes_to_long_le
def PBKDF1(password, salt, dkLen, count=1000, hashAlgo=None):
"""Derive one key from a password (or passphrase).
@ -91,8 +93,8 @@ def PBKDF1(password, salt, dkLen, count=1000, hashAlgo=None):
def PBKDF2(password, salt, dkLen=16, count=1000, prf=None):
"""Derive one or more keys from a password (or passphrase).
This performs key derivation according to the PKCS#5 standard (v2.0),
by means of the ``PBKDF2`` algorithm.
This function performs key derivation according to
the PKCS#5 standard (v2.0), by means of the ``PBKDF2`` algorithm.
:Parameters:
password : string
@ -126,6 +128,7 @@ def PBKDF2(password, salt, dkLen=16, count=1000, prf=None):
i = i + 1
return key[:dkLen]
class _S2V(object):
"""String-to-vector PRF as defined in `RFC5297`_.
@ -207,3 +210,104 @@ class _S2V(object):
final = strxor(padded, self._double(self._cache))
mac = CMAC.new(self._key, msg=final, ciphermod=self._ciphermod)
return mac.digest()
def _scryptBlockMix(blocks):
"""Hash function for ROMix."""
x = blocks[-1]
core = _Salsa20._salsa20_8_core
result = [None]*len(blocks)
for i in xrange(len(blocks)):
x = core(strxor(x, blocks[i]))
result[i] = x
return [result[i + j] for j in xrange(2)
for i in xrange(0, len(blocks), 2)]
def _scryptROMix(blocks, n):
"""Sequential memory-hard function for scrypt."""
x = [blocks[i:i + 64] for i in xrange(0, len(blocks), 64)]
len_x = len(x)
v = []
for i in xrange(n):
v.append(x)
x = _scryptBlockMix(x)
for i in xrange(n):
j = bytes_to_long_le(x[-1]) & (n - 1)
t = [strxor(x[idx], v[j][idx]) for idx in xrange(len_x)]
x = _scryptBlockMix(t)
return b("").join(x)
def scrypt(password, salt, key_len, N, r, p, num_keys=1):
"""Derive one or more keys from a passphrase.
This function performs key derivation according to
the `scrypt`_ algorithm, introduced in Percival's paper
`"Stronger key derivation via sequential memory-hard functions"`__.
This implementation is based on the `RFC draft`__.
:Parameters:
password : string
The secret pass phrase to generate the keys from.
salt : string
A string to use for better protection from dictionary attacks.
This value does not need to be kept secret,
but it should be randomly chosen for each derivation.
It is recommended to be at least 8 bytes long.
key_len : integer
The length in bytes of every derived key.
N : integer
CPU/Memory cost parameter. It must be a power of 2 and less
than ``2**(16r)``.
r : integer
Block size parameter.
p : integer
Parallelization parameter.
It must be no greater than ``(2**32-1)/(4r)``.
num_keys : integer
The number of keys to derive. Every key is ``key_len`` bytes long.
By default, only 1 key is generated.
The maximum cumulative length of all keys is ``(2**32-1)*32``
(that is, 128TB).
A good choice of parameters *(N, r , p)* was suggested
by Colin Percival in his `presentation in 2009`__:
- *(16384, 8, 1)* for interactive logins (<=100ms)
- *(1048576, 8, 1)* for file encryption (<=5s)
:Return: A byte string or a tuple of byte strings.
.. _scrypt: http://www.tarsnap.com/scrypt.html
.. __: http://www.tarsnap.com/scrypt/scrypt.pdf
.. __: http://tools.ietf.org/html/draft-josefsson-scrypt-kdf-01
.. __: http://www.tarsnap.com/scrypt/scrypt-slides.pdf
"""
if 2 ** (bit_size(N) - 1) != N:
raise ValueError("N must be a power of 2")
if N >= 2L ** (16 * r):
raise ValueError("N is too big (or r is too small)")
if p > divmod((2L ** 32 - 1) * 32, 128 * r)[0]:
raise ValueError("p or r are too big")
prf_hmac_sha256 = lambda p, s: HMAC.new(p, s, SHA256).digest()
blocks = PBKDF2(password, salt, p * 128 * r, 1, prf=prf_hmac_sha256)
blocks = b("").join([_scryptROMix(blocks[x:x + 128 * r], N)
for x in xrange(0, len(blocks), 128 * r)])
dk = PBKDF2(password, blocks, key_len * num_keys, 1,
prf=prf_hmac_sha256)
if num_keys == 1:
return dk
kol = [dk[idx:idx + key_len]
for idx in xrange(0, key_len * num_keys, key_len)]
return kol

View file

@ -24,6 +24,7 @@
def get_tests(config={}):
tests = []
from Crypto.SelfTest.IO import test_PKCS8; tests += test_PKCS8.get_tests(config=config)
from Crypto.SelfTest.IO import test_PBES; tests += test_PBES.get_tests(config=config)
return tests
if __name__ == '__main__':

View file

@ -0,0 +1,86 @@
# -*- coding: utf-8 -*-
#
# SelfTest/IO/test_PBES.py: Self-test for the _PBES module
#
# ===================================================================
# The contents of this file are dedicated to the public domain. To
# the extent that dedication to the public domain is not available,
# everyone is granted a worldwide, perpetual, royalty-free,
# non-exclusive license to exercise all rights associated with the
# contents of this file for any purpose whatsoever.
# No rights are reserved.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# ===================================================================
"""Self-tests for Crypto.IO._PBES module"""
import unittest
import sys
if sys.version_info[0] == 2 and sys.version_info[1] == 1:
from Crypto.Util.py21compat import *
from Crypto.Util.py3compat import *
from Crypto.IO._PBES import PBES2
class TestPBES2(unittest.TestCase):
def setUp(self):
self.ref = b("Test data")
self.passphrase = b("Passphrase")
def test1(self):
ct = PBES2.encrypt(self.ref, self.passphrase,
'PBKDF2WithHMAC-SHA1AndDES-EDE3-CBC')
pt = PBES2.decrypt(ct, self.passphrase)
self.assertEqual(self.ref, pt)
def test2(self):
ct = PBES2.encrypt(self.ref, self.passphrase,
'PBKDF2WithHMAC-SHA1AndAES128-CBC')
pt = PBES2.decrypt(ct, self.passphrase)
self.assertEqual(self.ref, pt)
def test3(self):
ct = PBES2.encrypt(self.ref, self.passphrase,
'PBKDF2WithHMAC-SHA1AndAES192-CBC')
pt = PBES2.decrypt(ct, self.passphrase)
self.assertEqual(self.ref, pt)
def test4(self):
ct = PBES2.encrypt(self.ref, self.passphrase,
'scryptAndAES128-CBC')
pt = PBES2.decrypt(ct, self.passphrase)
self.assertEqual(self.ref, pt)
def test5(self):
ct = PBES2.encrypt(self.ref, self.passphrase,
'scryptAndAES192-CBC')
pt = PBES2.decrypt(ct, self.passphrase)
self.assertEqual(self.ref, pt)
def test6(self):
ct = PBES2.encrypt(self.ref, self.passphrase,
'scryptAndAES256-CBC')
pt = PBES2.decrypt(ct, self.passphrase)
self.assertEqual(self.ref, pt)
def get_tests(config={}):
from Crypto.SelfTest.st_common import list_test_cases
listTests = []
listTests += list_test_cases(TestPBES2)
return listTests
if __name__ == '__main__':
suite = lambda: unittest.TestSuite(get_tests())
unittest.main(defaultTest='suite')

View file

@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
#
# SelfTest/PublicKey/test_PKCS8.py: Self-test for the PKCS8 module
# SelfTest/IO/test_PKCS8.py: Self-test for the PKCS8 module
#
# ===================================================================
# The contents of this file are dedicated to the public domain. To
@ -20,7 +20,7 @@
# SOFTWARE.
# ===================================================================
"""Self-tests for Crypto.PublicKey.PKCS8 module"""
"""Self-tests for Crypto.IO.PKCS8 module"""
__revision__ = "$Id$"

View file

@ -28,12 +28,18 @@ from binascii import unhexlify
from Crypto.Util.py3compat import *
from Crypto.SelfTest.st_common import list_test_cases
from Crypto.Hash import SHA1, HMAC
from Crypto.Hash import SHA1, HMAC, SHA256
from Crypto.Cipher import AES, DES3
from Crypto.Protocol.KDF import PBKDF1, PBKDF2, _S2V
from Crypto.Protocol.KDF import PBKDF1, PBKDF2, _S2V, scrypt
def t2b(t): return unhexlify(b(t))
def t2b(t):
t2 = t.replace(" ", "").replace("\n", "")
return unhexlify(b(t2))
class TestVector():
pass
class PBKDF1_Tests(unittest.TestCase):
@ -88,6 +94,32 @@ class PBKDF2_Tests(unittest.TestCase):
self.assertEqual(res, t2b(v[4]))
self.assertEqual(res, res2)
def test2(self):
"""From draft-josefsson-scrypt-kdf-01, Chapter 10"""
output_1 = t2b("""
55 ac 04 6e 56 e3 08 9f ec 16 91 c2 25 44 b6 05
f9 41 85 21 6d de 04 65 e6 8b 9d 57 c2 0d ac bc
49 ca 9c cc f1 79 b6 45 99 16 64 b3 9d 77 ef 31
7c 71 b8 45 b1 e3 0b d5 09 11 20 41 d3 a1 97 83
""")
output_2 = t2b("""
4d dc d8 f6 0b 98 be 21 83 0c ee 5e f2 27 01 f9
64 1a 44 18 d0 4c 04 14 ae ff 08 87 6b 34 ab 56
a1 d4 25 a1 22 58 33 54 9a db 84 1b 51 c9 b3 17
6a 27 2b de bb a1 d0 78 47 8f 62 b3 97 f3 3c 8d
""")
prf_hmac_sha256 = lambda p, s: HMAC.new(p, s, SHA256).digest()
output = PBKDF2(b("passwd"), b("salt"), 64, 1, prf=prf_hmac_sha256)
self.assertEqual(output, output_1)
output = PBKDF2(b("Password"), b("NaCl"), 64, 80000, prf=prf_hmac_sha256)
self.assertEqual(output, output_2)
class S2V_Tests(unittest.TestCase):
# Sequence of test vectors.
@ -144,11 +176,136 @@ class S2V_Tests(unittest.TestCase):
s2v.update(b("XX"))
self.assertRaises(TypeError, s2v.update, b("YY"))
class scrypt_Tests(unittest.TestCase):
# Test vectors taken from
# http://tools.ietf.org/html/draft-josefsson-scrypt-kdf-00
test_vectors = (
(
"",
"",
16, # 2K
1,
1,
"""
77 d6 57 62 38 65 7b 20 3b 19 ca 42 c1 8a 04 97
f1 6b 48 44 e3 07 4a e8 df df fa 3f ed e2 14 42
fc d0 06 9d ed 09 48 f8 32 6a 75 3a 0f c8 1f 17
e8 d3 e0 fb 2e 0d 36 28 cf 35 e2 0c 38 d1 89 06
"""
),
(
"password",
"NaCl",
1024, # 1M
8,
16,
"""
fd ba be 1c 9d 34 72 00 78 56 e7 19 0d 01 e9 fe
7c 6a d7 cb c8 23 78 30 e7 73 76 63 4b 37 31 62
2e af 30 d9 2e 22 a3 88 6f f1 09 27 9d 98 30 da
c7 27 af b9 4a 83 ee 6d 83 60 cb df a2 cc 06 40
"""
),
(
"pleaseletmein",
"SodiumChloride",
16384, # 16M
8,
1,
"""
70 23 bd cb 3a fd 73 48 46 1c 06 cd 81 fd 38 eb
fd a8 fb ba 90 4f 8e 3e a9 b5 43 f6 54 5d a1 f2
d5 43 29 55 61 3f 0f cf 62 d4 97 05 24 2a 9a f9
e6 1e 85 dc 0d 65 1e 40 df cf 01 7b 45 57 58 87
"""
),
(
"pleaseletmein",
"SodiumChloride",
1048576, # 1G
8,
1,
"""
21 01 cb 9b 6a 51 1a ae ad db be 09 cf 70 f8 81
ec 56 8d 57 4a 2f fd 4d ab e5 ee 98 20 ad aa 47
8e 56 fd 8f 4b a5 d0 9f fa 1c 6d 92 7c 40 f4 c3
37 30 40 49 e8 a9 52 fb cb f4 5c 6f a7 7a 41 a4
"""
),
)
def setUp(self):
new_test_vectors = []
for tv in self.test_vectors:
new_tv = TestVector()
new_tv.P = b(tv[0])
new_tv.S = b(tv[1])
new_tv.N = tv[2]
new_tv.r = tv[3]
new_tv.p = tv[4]
new_tv.output = t2b(tv[5])
new_tv.dkLen = len(new_tv.output)
new_test_vectors.append(new_tv)
self.test_vectors = new_test_vectors
def _test1(self):
b_input = t2b("""
f7 ce 0b 65 3d 2d 72 a4 10 8c f5 ab e9 12 ff dd
77 76 16 db bb 27 a7 0e 82 04 f3 ae 2d 0f 6f ad
89 f6 8f 48 11 d1 e8 7b cc 3b d7 40 0a 9f fd 29
09 4f 01 84 63 95 74 f3 9a e5 a1 31 52 17 bc d7
89 49 91 44 72 13 bb 22 6c 25 b5 4d a8 63 70 fb
cd 98 43 80 37 46 66 bb 8f fc b5 bf 40 c2 54 b0
67 d2 7c 51 ce 4a d5 fe d8 29 c9 0b 50 5a 57 1b
7f 4d 1c ad 6a 52 3c da 77 0e 67 bc ea af 7e 89
""")
b_output = t2b("""
79 cc c1 93 62 9d eb ca 04 7f 0b 70 60 4b f6 b6
2c e3 dd 4a 96 26 e3 55 fa fc 61 98 e6 ea 2b 46
d5 84 13 67 3b 99 b0 29 d6 65 c3 57 60 1f b4 26
a0 b2 f4 bb a2 00 ee 9f 0a 43 d1 9b 57 1a 9c 71
ef 11 42 e6 5d 5a 26 6f dd ca 83 2c e5 9f aa 7c
ac 0b 9c f1 be 2b ff ca 30 0d 01 ee 38 76 19 c4
ae 12 fd 44 38 f2 03 a0 e4 e1 c4 7e c3 14 86 1f
4e 90 87 cb 33 39 6a 68 73 e8 f9 d2 53 9a 4b 8e
""")
from Crypto.Protocol.KDF import _scryptROMix
output = _scryptROMix(b_input, 16)
self.assertEqual(output, b_output)
def test2(self):
for tv in self.test_vectors:
# TODO: add runtime flag to enable test vectors
# with humongous memory usage
if tv.N > 100000:
continue
output = scrypt(tv.P, tv.S, tv.dkLen, tv.N, tv.r, tv.p)
self.assertEqual(output, tv.output)
def test3(self):
ref = scrypt(b("password"), b("salt"), 12, 16, 1, 1)
# Same output, but this time split over 2 keys
key1, key2 = scrypt(b("password"), b("salt"), 6, 16, 1, 1, 2)
self.assertEqual((ref[:6], ref[6:]), (key1, key2))
# Same output, but this time split over 3 keys
key1, key2, key3 = scrypt(b("password"), b("salt"), 4, 16, 1, 1, 3)
self.assertEqual((ref[:4], ref[4:8], ref[8:]), (key1, key2, key3))
def get_tests(config={}):
tests = []
tests += list_test_cases(PBKDF1_Tests)
tests += list_test_cases(PBKDF2_Tests)
tests += list_test_cases(S2V_Tests)
tests += list_test_cases(scrypt_Tests)
return tests
if __name__ == '__main__':

View file

@ -29,6 +29,7 @@ __revision__ = "$Id$"
import sys
if sys.version_info[0] == 2 and sys.version_info[1] == 1:
from Crypto.Util.py21compat import *
from Crypto.Util.py3compat import *
import unittest
@ -279,6 +280,14 @@ class MiscTests(unittest.TestCase):
self.assertEqual(number.size(0xa2ba40),8*3)
self.assertEqual(number.size(0xa2ba40ee07e3b2bd2f02ce227f36a195024486e49c19cb41bbbdfbba98b22b0e577c2eeaffa20d883a76e65e394c69d4b3c05a1e8fadda27edb2a42bc000fe888b9b32c22d15add0cd76b3e7936e19955b220dd17d4ea904b1ec102b2e4de7751222aa99151024c7cb41cc5ea21d00eeb41f7c800834d2c6e06bce3bce7ea9a5L), 1024)
def test_bytes_to_long_le(self):
result = number.bytes_to_long_le(b("\x01"))
self.assertEqual(result, 1)
result = number.bytes_to_long_le(b("\x01\x02"))
self.assertEqual(result, 0x201)
result = number.bytes_to_long_le(b("\x01\x02\x00\x00\xFF"))
self.assertEqual(result, 0xFF00000201L)
class FastmathTests(unittest.TestCase):
def setUp(self):
global number

View file

@ -416,7 +416,7 @@ def long_to_bytes(n, blocksize=0):
def bytes_to_long(s):
"""bytes_to_long(string) : long
Convert a byte string to a long integer.
Convert a byte string to a long integer (big endian).
This is (essentially) the inverse of long_to_bytes().
"""
@ -431,6 +431,18 @@ def bytes_to_long(s):
acc = (acc << 32) + unpack('>I', s[i:i+4])[0]
return acc
def bytes_to_long_le(s):
acc = 0L
unpack = struct.unpack
length = len(s)
if length & 3:
extra = 4 - (length & 3)
s = s + b('\000') * extra
length += extra
for i in range(0, length, 4):
acc = acc | (unpack('<I', s[i:i+4])[0] << (32*(i>>2)))
return acc
# For backwards compatibility...
import warnings
def long2str(n, blocksize=0):

View file

@ -47,6 +47,8 @@ static const char tau[16] = "expand 16-byte k";
#define PLUS(v,w) (U32V((v) + (w)))
#define PLUSONE(v) (PLUS((v),1))
#define _MODULE_CUSTOM_FUNCTION _salsa20_8_core
typedef struct
{
uint32_t input[16];
@ -55,16 +57,11 @@ typedef struct
} stream_state;
static void
_salsa20_block (stream_state *self)
_salsa20_block(int rounds, uint32_t *input, uint8_t *output)
{
uint32_t x0, x1, x2, x3, x4, x5, x6, x7;
uint32_t x8, x9, x10, x11, x12, x13, x14, x15;
uint8_t i;
uint32_t *input;
uint8_t *output;
input = self->input;
output = self->block;
x0 = input[0];
x1 = input[1];
@ -83,7 +80,7 @@ _salsa20_block (stream_state *self)
x14 = input[14];
x15 = input[15];
for (i = ROUNDS; i > 0; i -= 2) {
for (i = rounds; i > 0; i -= 2) {
/* Column round */
x4 = XOR ( x4, ROTATE (PLUS ( x0,x12), 7));
x8 = XOR ( x8, ROTATE (PLUS ( x4, x0), 9));
@ -163,6 +160,45 @@ _salsa20_block (stream_state *self)
}
}
static PyObject *
ALG_salsa20_8_core(PyObject *self, PyObject *args, PyObject *kwdict)
{
PyObject *input_str;
Py_ssize_t len_input;
uint8_t *input_bytes;
uint32_t input_32[16];
PyObject *output;
int i;
output = NULL;
if (!PyArg_ParseTuple(args, "S", &input_str)) {
goto out;
}
len_input = PyBytes_GET_SIZE(input_str);
if (len_input != 64) {
goto out;
}
output = PyBytes_FromStringAndSize(NULL, 64);
if (!output) {
goto out;
}
input_bytes = (uint8_t*)PyBytes_AS_STRING(input_str);
for (i=0; i<16; i++) {
U8TO32_LITTLE(input_32[i], &input_bytes[i*4]);
}
Py_BEGIN_ALLOW_THREADS;
_salsa20_block(8, input_32, (uint8_t*)PyBytes_AS_STRING(output) );
Py_END_ALLOW_THREADS;
out:
return output;
}
static void
stream_init (stream_state *self, unsigned char *key, int keylen,
unsigned char *IV, int IVlen)
@ -220,15 +256,15 @@ stream_init (stream_state *self, unsigned char *key, int keylen,
#define stream_decrypt stream_encrypt
static void
stream_encrypt (stream_state *self, unsigned char *block, int len)
stream_encrypt (stream_state *self, unsigned char *buffer, int len)
{
int i;
for (i = 0; i < len; ++i) {
if (self->blockindex == 64) {
self->blockindex = 0;
_salsa20_block (self);
_salsa20_block(ROUNDS, self->input, self->block);
}
block[i] ^= self->block[self->blockindex];
buffer[i] ^= self->block[self->blockindex];
self->blockindex ++;
}
}

View file

@ -242,6 +242,19 @@ static struct PyMethodDef modulemethods[] =
{
{"new", (PyCFunction) ALGnew,
METH_VARARGS|METH_KEYWORDS, ALGnew__doc__},
#ifdef _MODULE_CUSTOM_FUNCTION
#define XSTR(s) #s
#define XJOIN(a,b) a##b
#define STR(s) XSTR(s)
#define JOIN(a,b) XJOIN(a,b)
{STR(_MODULE_CUSTOM_FUNCTION) , (PyCFunction)
JOIN(ALG,_MODULE_CUSTOM_FUNCTION),
METH_VARARGS|METH_KEYWORDS, NULL},
#undef XSTR
#undef XJOIN
#undef STR
#undef JOIN
#endif
{NULL, NULL} /* sentinel */
};