@@ -0,0 +1,7 @@ | |||
.coverage | |||
__pycache__ | |||
cryptography.egg-info | |||
pycaenv | |||
venv |
@@ -0,0 +1,16 @@ | |||
VIRTUALENV ?= python3 -m venv | |||
VRITUALENVARGS = | |||
MODULES=cryptography.tests | |||
test: venv pycaenv | |||
find cryptography -name '*.py' | entr make test-noentr | |||
test-noentr: | |||
( . ./pycaenv/bin/activate && cd cryptography && python -m unittest tests) && ( . ./venv/bin/activate && pip install -e . && python -m coverage run -m unittest $(MODULES) && coverage report --omit=p/\* -m -i) | |||
venv: | |||
($(VIRTUALENV) $(VIRTUALENVARGS) venv && . ./venv/bin/activate && pip install -r requirements.txt) | |||
pycaenv: | |||
($(VIRTUALENV) $(VIRTUALENVARGS) pycaenv && . ./pycaenv/bin/activate && pip install cryptography) |
@@ -0,0 +1,33 @@ | |||
pycryptowrap | |||
============ | |||
This is a translation layer to let software written for | |||
[cryptography](https://cryptography.io/en/latest/) to used with | |||
[pycryptodome](https://www.pycryptodome.org/). | |||
It currently only implements a minimal interface to get | |||
[pywebpush](https://github.com/web-push-libs/pywebpush) working. | |||
Currently implemented, tested, and working: | |||
* AES-GCM | |||
* ECC SECP256R1 - DSS and ECDH | |||
It shouldn't be too hard to add some additional ciphers or curves. | |||
Testing | |||
======= | |||
The `Makefile` does the neccessary work to build an environment. To | |||
run the tests: | |||
``` | |||
make test-noentr | |||
``` | |||
This will first run the tests against the cryptography module, and then | |||
run the tests against this module. This makes sure that the tests are | |||
valid, and also makes it easier to add known answers to make sure | |||
you don't end up only self compatible. | |||
To make developing easier, the `test` target has one dependency, | |||
[entr](https://github.com/eradman/entr). This will watch for changes | |||
in the `py` files, and automatically rerun the test. |
@@ -0,0 +1,5 @@ | |||
class InvalidTag(Exception): | |||
pass | |||
class InvalidSignature(Exception): | |||
pass |
@@ -0,0 +1,2 @@ | |||
def default_backend(): | |||
pass |
@@ -0,0 +1,87 @@ | |||
from cryptography.exceptions import InvalidSignature | |||
from Crypto.PublicKey import ECC | |||
from Crypto.Protocol.DH import _compute_ecdh | |||
from Crypto.Signature import DSS | |||
# https://www.pycryptodome.org/src/public_key/ecc# | |||
SECP256R1 = lambda: 'secp256r1' | |||
class ECDSA: | |||
def __init__(self, algorithm): | |||
self._algo = algorithm | |||
class ECDH: | |||
pass | |||
class EllipticCurvePrivateNumbers: | |||
def __init__(self, key): | |||
self._key = key | |||
@property | |||
def public_numbers(self): | |||
return self._key._ecc.pointQ | |||
@property | |||
def private_value(self): | |||
return self._key._ecc.d | |||
class ECCWrapper: | |||
def __init__(self, ecckey): | |||
self._ecc = ecckey | |||
def private_numbers(self): | |||
return EllipticCurvePrivateNumbers(self) | |||
def public_key(self): | |||
return EllipticCurvePublicKey(self._ecc.public_key()) | |||
_format_to_compress = dict(nocompress=False, compress=True) | |||
def public_bytes(self, encoding, format): | |||
return self.public_key()._ecc.export_key(format=encoding, compress=self._format_to_compress[format]) | |||
def private_bytes(self, encoding, format, encryption_algorithm): | |||
return self._ecc.export_key(format=encoding, compress=format) | |||
# https://www.pycryptodome.org/src/protocol/dh | |||
def exchange(self, typ, pubkey): | |||
assert isinstance(typ, ECDH) | |||
return _compute_ecdh(self._ecc, pubkey._ecc) | |||
@classmethod | |||
def generate_private_key(cls, keytype, backend=None): | |||
if callable(keytype): | |||
keytype = keytype() | |||
return cls(ECC.generate(curve=keytype)) | |||
def sign(self, data, signature_algorithm): | |||
h = signature_algorithm._algo.new(data) | |||
signer = DSS.new(self._ecc, 'fips-186-3', 'der') | |||
return signer.sign(h) | |||
def verify(self, signature, data, signature_algorithm): | |||
h = signature_algorithm._algo.new(data) | |||
verifier = DSS.new(self._ecc, 'fips-186-3', 'der') | |||
try: | |||
verifier.verify(h, signature) | |||
except ValueError: | |||
raise InvalidSignature | |||
class EllipticCurvePublicKey(ECCWrapper): | |||
@classmethod | |||
def from_encoded_point(cls, curve, key): | |||
return cls(ECC.import_key(key, curve_name=curve)) | |||
generate_private_key = ECCWrapper.generate_private_key | |||
def load_der_private_key(key, password, backend=None): | |||
if password is not None: | |||
raise ValueError('unsupported') | |||
return ECCWrapper(ECC.import_key(key)) |
@@ -0,0 +1,6 @@ | |||
from Crypto.Util.asn1 import DerSequence | |||
def decode_dss_signature(signature): | |||
obj = DerSequence().decode(der_encoded=signature) | |||
return tuple(obj) |
@@ -0,0 +1,70 @@ | |||
from Crypto.Cipher import AES as ccAES | |||
from cryptography.exceptions import InvalidTag | |||
# https://www.pycryptodome.org/src/cipher/modern#gcm-mode | |||
class CipherEncryptor: | |||
def __init__(self, encor): | |||
self._encor = encor | |||
self.authenticate_additional_data = encor.update | |||
self.update = encor.encrypt | |||
@property | |||
def tag(self): | |||
return self._encor.digest() | |||
def finalize(self): | |||
return b'' | |||
class CipherDecryptor: | |||
def __init__(self, decor, tag=None): | |||
self._decor = decor | |||
self._tag = tag | |||
self.authenticate_additional_data = decor.update | |||
self.update = decor.decrypt | |||
def finalize(self): | |||
try: | |||
#print(repr(self._decor)) | |||
self._decor.verify(self._tag) | |||
except ValueError: | |||
raise InvalidTag('tag mismatch') | |||
return b'' | |||
class Cipher: | |||
def __init__(self, algo, mode, backend=None): | |||
self._algo = algo | |||
self._mode = mode | |||
def _getmode(self): | |||
if isinstance(self._mode, GCM): | |||
return ccAES.MODE_GCM | |||
def _nonce(self): | |||
return self._mode._iv | |||
def encryptor(self): | |||
return CipherEncryptor(ccAES.new(self._algo._key, | |||
self._getmode(), nonce=self._nonce())) | |||
def decryptor(self): | |||
return CipherDecryptor(ccAES.new(self._algo._key, | |||
self._getmode(), nonce=self._nonce()), tag=self._mode._tag) | |||
class AES: | |||
def __init__(self, key): | |||
self._key = key | |||
class algorithms: | |||
AES = AES | |||
class GCM: | |||
def __init__(self, iv, tag=None): | |||
self._iv = iv | |||
self._tag = tag | |||
class modes: | |||
GCM = GCM |
@@ -0,0 +1,4 @@ | |||
def SHA256(): | |||
from Crypto.Hash import SHA256 | |||
return SHA256 |
@@ -0,0 +1,15 @@ | |||
from Crypto.Protocol.KDF import HKDF as baseHKDF | |||
# https://www.pycryptodome.org/src/protocol/kdf#hkdf | |||
# https://cryptography.io/en/latest/hazmat/primitives/key-derivation-functions/#hkdf | |||
class HKDF: | |||
def __init__(self, algorithm, length, salt, info, backend=None): | |||
self._algo = algorithm | |||
self._len = length | |||
self._salt = salt | |||
self._info = info | |||
def derive(self, key): | |||
return baseHKDF(key, self._len, self._salt, self._algo, context=self._info) |
@@ -0,0 +1,15 @@ | |||
from .asymmetric.ec import load_der_private_key | |||
class Encoding: | |||
X962 = 'SEC1' | |||
Raw = 'raw' | |||
DER = 'DER' | |||
class PublicFormat: | |||
UncompressedPoint = 'nocompress' | |||
class PrivateFormat: | |||
PKCS8 = 'pkcs8' | |||
def NoEncryption(): | |||
return None |
@@ -0,0 +1,2 @@ | |||
from .ecc import TestECC | |||
from .aes import TestAES |
@@ -0,0 +1,61 @@ | |||
from cryptography.hazmat.backends import default_backend | |||
from cryptography.hazmat.primitives.ciphers import ( | |||
Cipher, algorithms, modes | |||
) | |||
from cryptography.exceptions import InvalidTag | |||
import random | |||
import unittest | |||
TAG_LENGTH = 16 | |||
class TestAES(unittest.TestCase): | |||
def test_aesgcm_cav(self): | |||
pass | |||
def test_aesgcm(self): | |||
buf = bytes.fromhex('00000000000000000000000000000000') | |||
key = bytes.fromhex('00000000000000000000000000000000') | |||
iv = bytes.fromhex('000000000000000000000000') | |||
origdata = buf | |||
# encryption | |||
encryptor = Cipher(algorithms.AES(key), | |||
modes.GCM(iv), backend=default_backend() | |||
).encryptor() | |||
last = True | |||
#if version == 'aes128gcm': | |||
data = encryptor.update(buf) | |||
self.assertEqual(data, bytes.fromhex('0388dace60b6a392f328c2b971b2fe78')) | |||
data += encryptor.finalize() | |||
self.assertEqual(encryptor.tag, bytes.fromhex('ab6e47d42cec13bdf53a67b21257bddf')) | |||
data += encryptor.tag | |||
encdata = data | |||
# decryption | |||
content = encdata | |||
decryptor = Cipher(algorithms.AES(key), | |||
modes.GCM(iv, tag=content[-TAG_LENGTH:]), | |||
backend=default_backend() | |||
).decryptor() | |||
decdata = decryptor.update(content[:-TAG_LENGTH]) + decryptor.finalize() | |||
self.assertEqual(origdata, decdata) | |||
# decryption | |||
content = encdata | |||
decryptor = Cipher(algorithms.AES(key), | |||
modes.GCM(iv, tag=b'\x00' * TAG_LENGTH), | |||
backend=default_backend() | |||
).decryptor() | |||
decdata = decryptor.update(content[:-TAG_LENGTH]) | |||
self.assertRaises(InvalidTag, decryptor.finalize) |
@@ -0,0 +1,128 @@ | |||
import unittest | |||
from cryptography.exceptions import InvalidSignature | |||
from cryptography.hazmat.backends import default_backend | |||
from cryptography.hazmat.primitives import serialization | |||
from cryptography.hazmat.primitives import hashes | |||
from cryptography.hazmat.primitives.asymmetric import ec | |||
from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature | |||
from cryptography.hazmat.primitives.kdf.hkdf import HKDF | |||
class TestECC(unittest.TestCase): | |||
def test_dh(self): | |||
# slightly modified from: | |||
# https://cryptography.io/en/latest/hazmat/primitives/asymmetric/ec/#elliptic-curve-key-exchange-algorithm | |||
# private keys for use in the exchange. | |||
keya = b'0\x81\x87\x02\x01\x000\x13\x06\x07*\x86H\xce=\x02\x01\x06\x08*\x86H\xce=\x03\x01\x07\x04m0k\x02\x01\x01\x04 \x04\xa3X\xbd\x0e\xff*\x8cw\xf8\x9f\x05BD<\nY\xb3\xf1\xd2\xc1\xb0\r\x1e\xedu\x92]4M?\x01\xa1D\x03B\x00\x04P\xd9y\x92f\t\xa7x\xf3\xcf\x17O\xad\x93\xf9\x18"\t\xd3\x13*]3\xa7#\x8bH$j\xea\xfb\x8a\xd3\xb5\xee\xd9\x0f\x9c\xdb\xcc\xf1\xd7\x10\x88\x10e\x82-\x15CR\x08\xbe\x0c\x1e\x82p\x00C\xb2.O\x17\xd4' | |||
keyb = b'0\x81\x87\x02\x01\x000\x13\x06\x07*\x86H\xce=\x02\x01\x06\x08*\x86H\xce=\x03\x01\x07\x04m0k\x02\x01\x01\x04 \xfb\xf8\xf7\x9f\xa3\xb7\xed\x8cT@`\xf6\x9c\xbbv\x0e?\x87\xb1(\xf6\xa8\xb3`\x91\xb4\x92W\xc6\xaa\xf5~\xa1D\x03B\x00\x04\xb8\xfe\xe4\x8dkukc\xa4^\x87\x98\x9c\xb9\xa8\xec\x86\xf8\xc2\x89\xaeF\xe8q\xb9q\x92I\x98n\xfe\xe3<{\x1c&R\x82\xb1\x94=\xa5h*)m/\x13\xfb\x05\x1d\x98u\xec\x1ew\xdfW\x84\xfe\x9eSl\x83' | |||
#server_private_key = ec.generate_private_key(ec.SECP256R1()) | |||
#print('spk:', repr(server_private_key.private_bytes( | |||
# encoding=serialization.Encoding.DER, | |||
# format=serialization.PublicFormat.UncompressedPoint, | |||
# algorithm=None))) | |||
server_private_key = serialization.load_der_private_key(keya, | |||
None, backend=default_backend()) | |||
pubpoint = server_private_key.private_numbers().public_numbers | |||
self.assertEqual(server_private_key.private_numbers(). \ | |||
private_value, | |||
2097859916579721232322403601989230314767884081400167668022347085958538411777) | |||
self.assertEqual(pubpoint.x, | |||
36569272757924220784927781299997671003354453138026426189464987345196826688394) | |||
self.assertEqual(pubpoint.y, | |||
95759458837377270694950453035845273914475242769728982836658929275588228093908) | |||
self.assertEqual(keya, server_private_key.private_bytes( | |||
encoding=serialization.Encoding.DER, | |||
format=serialization.PrivateFormat.PKCS8, | |||
encryption_algorithm=serialization.NoEncryption())) | |||
# In a real handshake the peer is a remote client. For this | |||
# example we'll generate another local private key though. | |||
peer_private_key = serialization.load_der_private_key(keyb, | |||
None) | |||
shared_key = server_private_key.exchange( | |||
ec.ECDH(), peer_private_key.public_key()) | |||
self.assertEqual(shared_key, | |||
b'\x1a\x9f\x93c\xb0s\xa2\x15]{\xa3\xcc\xcf&Q\xd6g\x83\x86%\x7f\t\xfem@\xcb\xe9:U\x16\x07\x02') | |||
# Perform key derivation. | |||
derived_key = HKDF(algorithm=hashes.SHA256(), | |||
length=32, salt=None, info=b'handshake data', backend=None | |||
).derive(shared_key) | |||
# And now we can demonstrate that the handshake performed in | |||
# the opposite direction gives the same final value | |||
same_shared_key = peer_private_key.exchange( | |||
ec.ECDH(), server_private_key.public_key()) | |||
self.assertEqual(shared_key, same_shared_key) | |||
# Perform key derivation. | |||
same_derived_key = HKDF(algorithm=hashes.SHA256(), | |||
length=32, salt=None, info=b'handshake data', | |||
).derive(same_shared_key) | |||
self.assertEqual(derived_key, | |||
b'\x89\r\xf7\xf0\xa6\xb9Z\xb9\xd7\xd0\x9b\x95y\xe0M\x11,\xb4\xe1Z\xe5\xa2j\xee)\xa0I\xb5Q\x18\x94\xd1') | |||
self.assertEqual(derived_key, same_derived_key) | |||
def test_decode_sig(self): | |||
sig = b"0D\x02 P\x92\xaf\xffoN\xadq\r=\x92\xb5\r\xe0l3\xf2\x80*\xdd|\xfe\xd8'\xb8\\\xe8\x94\xd6\xa1\xdb\xea\x02 \x18\x89j\xa8P\x83jk*\xb8\xa2\x15r&d\xa1\x9e\xf6\xec\xd2\xf4 \xd6\x08\x91bs\x18\xb5\x11/\x04" | |||
res = (36444202250238074078057463719437572015031876874679459625290239663827367091178, 11098302536735876471048588108325227764001515075886106999029234198831298588420) | |||
self.assertEqual(decode_dss_signature(sig), res) | |||
def test_sign(self): | |||
private_key = ec.generate_private_key(ec.SECP256R1()) | |||
data = b"this is some data I'd like to sign" | |||
signature = private_key.sign(data, ec.ECDSA(hashes.SHA256())) | |||
# make sure we can decode our own signatures | |||
decode_dss_signature(signature) | |||
public_key = private_key.public_key() | |||
public_key.verify(signature, data, ec.ECDSA(hashes.SHA256())) | |||
wrongsig = bytearray(signature) | |||
wrongsig[0] ^= 1 | |||
wrongsig[1] ^= 4 | |||
wrongsig = bytes(wrongsig) | |||
self.assertRaises(InvalidSignature, public_key.verify, wrongsig, data, ec.ECDSA(hashes.SHA256())) | |||
def test_misc(self): | |||
keya = b'0\x81\x87\x02\x01\x000\x13\x06\x07*\x86H\xce=\x02\x01\x06\x08*\x86H\xce=\x03\x01\x07\x04m0k\x02\x01\x01\x04 \x04\xa3X\xbd\x0e\xff*\x8cw\xf8\x9f\x05BD<\nY\xb3\xf1\xd2\xc1\xb0\r\x1e\xedu\x92]4M?\x01\xa1D\x03B\x00\x04P\xd9y\x92f\t\xa7x\xf3\xcf\x17O\xad\x93\xf9\x18"\t\xd3\x13*]3\xa7#\x8bH$j\xea\xfb\x8a\xd3\xb5\xee\xd9\x0f\x9c\xdb\xcc\xf1\xd7\x10\x88\x10e\x82-\x15CR\x08\xbe\x0c\x1e\x82p\x00C\xb2.O\x17\xd4' | |||
skey = serialization.load_der_private_key(keya, | |||
None, backend=default_backend()) | |||
ckey = skey.public_key().public_bytes( | |||
encoding=serialization.Encoding.X962, | |||
format=serialization.PublicFormat.UncompressedPoint | |||
) | |||
self.assertEqual(ckey, b'\x04P\xd9y\x92f\t\xa7x\xf3\xcf\x17O\xad\x93\xf9\x18"\t\xd3\x13*]3\xa7#\x8bH$j\xea\xfb\x8a\xd3\xb5\xee\xd9\x0f\x9c\xdb\xcc\xf1\xd7\x10\x88\x10e\x82-\x15CR\x08\xbe\x0c\x1e\x82p\x00C\xb2.O\x17\xd4') | |||
pubkey = ec.EllipticCurvePublicKey.from_encoded_point( | |||
ec.SECP256R1(), ckey) | |||
self.assertTrue(isinstance(pubkey, ec.EllipticCurvePublicKey)) | |||
self.assertEqual(ckey, pubkey.public_bytes( | |||
encoding=serialization.Encoding.X962, | |||
format=serialization.PublicFormat.UncompressedPoint | |||
)) |
@@ -0,0 +1,3 @@ | |||
-e . | |||
-e .[dev] |
@@ -0,0 +1,15 @@ | |||
from setuptools import setup, Extension | |||
setup(name = "cryptography", version = "41.0.5", | |||
description = "Emulate cryptography enough for some needs", | |||
author = "John-Mark Gurney", | |||
author_email = "jmg@funkthat.com", | |||
url = 'about:blank', | |||
packages = [ 'cryptography' ], | |||
extras_require = { | |||
'dev': [ 'coverage' ], | |||
}, | |||
install_requires=[ | |||
'pycryptodome', | |||
], | |||
) |