Source code for pyndn.security.tpm.tpm_private_key

# -*- Mode:python; c-file-style:"gnu"; indent-tabs-mode:nil -*- */
#
# Copyright (C) 2017-2019 Regents of the University of California.
# Author: Jeff Thompson <[email protected]>
# Author: From https://github.com/named-data/ndn-cxx/blob/master/ndn-cxx/security/transform/private-key.cpp
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
# A copy of the GNU Lesser General Public License is in the file COPYING.

"""
This module defines the TpmPrivateKey class which holds an in-memory private key
and provides cryptographic operations such as for signing by the in-memory TPM.
"""

import sys
from random import SystemRandom
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import padding, rsa, ec
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from pyndn.security.security_types import DigestAlgorithm
from pyndn.security.security_types import KeyType
from pyndn.util.blob import Blob
from pyndn.encoding.der.der_node import DerNode, DerInteger
from pyndn.encoding.der.der_node import DerSequence, DerOctetString, DerOid
from pyndn.encoding.der.der_exceptions import DerDecodingException

# The Python documentation says "Use SystemRandom if you require a
#   cryptographically secure pseudo-random number generator."
# http://docs.python.org/2/library/random.html
_systemRandom = SystemRandom()

[docs]class TpmPrivateKey(object): """ Create an uninitialized TpmPrivateKey. You must call a load method to initialize it, such as loadPkcs1. """ def __init__(self): self._keyType = None self._privateKey = None
[docs] class Error(Exception): """ Create a TpmPrivateKey.Error to represents an error in private key processing. :param str message: The error message. """ def __init__(self, message): super(TpmPrivateKey.Error, self).__init__(message)
[docs] def loadPkcs1(self, encoding, keyType = None): """ Load the unencrypted private key from a buffer with the PKCS #1 encoding. This replaces any existing private key in this object. :param encoding: The byte buffer with the private key encoding. :type encoding: str, or an array type with int elements which is converted to str :param KeyType keyType: (optional) The KeyType, such as KeyType.RSA. If omitted or None, then partially decode the private key to determine the key type. :raises TpmPrivateKey.Error: For errors decoding the key. """ if keyType == None: # Try to determine the key type. try: parsedNode = DerNode.parse(Blob(encoding, False).buf()) children = parsedNode.getChildren() # An RsaPrivateKey has integer version 0 and 8 integers. if (len(children) == 9 and isinstance(children[0], DerInteger) and children[0].toVal() == 0 and isinstance(children[1], DerInteger) and isinstance(children[2], DerInteger) and isinstance(children[3], DerInteger) and isinstance(children[4], DerInteger) and isinstance(children[5], DerInteger) and isinstance(children[6], DerInteger) and isinstance(children[7], DerInteger) and isinstance( children[8], DerInteger)): keyType = KeyType.RSA else: # Assume it is an EC key. Try decoding it below. keyType = KeyType.EC except DerDecodingException: # Assume it is an EC key. Try decoding it below. keyType = KeyType.EC if keyType == KeyType.EC or keyType == KeyType.RSA: # serialization can load PKCS #1 directly. self._privateKey = serialization.load_der_private_key( Blob(encoding, False).toBytes(), password = None, backend = default_backend()) else: raise TpmPrivateKey.Error( "loadPkcs1: Unrecognized keyType: " + str(keyType)) self._keyType = keyType
[docs] def loadPkcs8(self, encoding, keyType = None): """ Load the unencrypted private key from a buffer with the PKCS #8 encoding. This replaces any existing private key in this object. :param encoding: The byte buffer with the private key encoding. :type encoding: str, or an array type with int elements which is converted to str :param KeyType keyType: (optional) The KeyType, such as KeyType.RSA. If omitted or None, then partially decode the private key to determine the key type. :raises TpmPrivateKey.Error: For errors decoding the key. """ if keyType == None: # Decode the PKCS #8 DER to find the algorithm OID. oidString = None try: parsedNode = DerNode.parse(Blob(encoding, False).buf()) pkcs8Children = parsedNode.getChildren() algorithmIdChildren = DerNode.getSequence( pkcs8Children, 1).getChildren() oidString = "" + algorithmIdChildren[0].toVal() except Exception as ex: raise TpmPrivateKey.Error( "Cannot decode the PKCS #8 private key: " + str(ex)) if oidString == TpmPrivateKey.EC_ENCRYPTION_OID: keyType = KeyType.EC elif oidString == TpmPrivateKey.RSA_ENCRYPTION_OID: keyType = KeyType.RSA else: raise TpmPrivateKey.Error( "loadPkcs8: Unrecognized private key OID: " + oidString) if keyType == KeyType.EC or keyType == KeyType.RSA: self._privateKey = serialization.load_der_private_key( Blob(encoding, False).toBytes(), password = None, backend = default_backend()) else: raise TpmPrivateKey.Error( "loadPkcs8: Unrecognized keyType: " + str(keyType)) self._keyType = keyType
[docs] def loadEncryptedPkcs8(self, encoding, password): """ Load the encrypted private key from a buffer with the PKCS #8 encoding of the EncryptedPrivateKeyInfo. This replaces any existing private key in this object. This partially decodes the private key to determine the key type. :param encoding: The byte buffer with the private key encoding. :type encoding: str, or an array type with int elements which is converted to str :param password: The password for decrypting the private key, which should have characters in the range of 1 to 127.. :type password: an array which implements the buffer protocol :raises TpmPrivateKey.Error: For errors decoding the key. """ # Decode the PKCS #8 EncryptedPrivateKeyInfo. # See https://tools.ietf.org/html/rfc5208. oidString = None parameters = None encryptedKey = None try: parsedNode = DerNode.parse(Blob(encoding, False).buf()) encryptedPkcs8Children = parsedNode.getChildren() algorithmIdChildren = DerNode.getSequence( encryptedPkcs8Children, 0).getChildren() oidString = algorithmIdChildren[0].toVal() parameters = algorithmIdChildren[1] encryptedKey = encryptedPkcs8Children[1].toVal() except Exception as ex: raise TpmPrivateKey.Error( "Cannot decode the PKCS #8 EncryptedPrivateKeyInfo: " + str(ex)) if oidString == TpmPrivateKey.PBES2_OID: # Decode the PBES2 parameters. See https://www.ietf.org/rfc/rfc2898.txt . keyDerivationOidString = None keyDerivationParameters = None encryptionSchemeOidString = None encryptionSchemeParameters = None try: parametersChildren = parameters.getChildren() keyDerivationAlgorithmIdChildren = DerNode.getSequence( parametersChildren, 0).getChildren() keyDerivationOidString = keyDerivationAlgorithmIdChildren[0].toVal() keyDerivationParameters = keyDerivationAlgorithmIdChildren[1] encryptionSchemeAlgorithmIdChildren = DerNode.getSequence( parametersChildren, 1).getChildren() encryptionSchemeOidString = encryptionSchemeAlgorithmIdChildren[0].toVal() encryptionSchemeParameters = encryptionSchemeAlgorithmIdChildren[1] except Exception as ex: raise TpmPrivateKey.Error( "Cannot decode the PBES2 parameters: " + str(ex)) # Get the derived key from the password. derivedKey = None if keyDerivationOidString == TpmPrivateKey.PBKDF2_OID: # Decode the PBKDF2 parameters. salt = None nIterations = None try: pbkdf2ParametersChildren = keyDerivationParameters.getChildren() salt = pbkdf2ParametersChildren[0].toVal() nIterations = pbkdf2ParametersChildren[1].toVal() except Exception as ex: raise TpmPrivateKey.Error( "Cannot decode the PBES2 parameters: " + str(ex)) # Check the encryption scheme here to get the needed result length. resultLength = None if encryptionSchemeOidString == TpmPrivateKey.DES_EDE3_CBC_OID: resultLength = TpmPrivateKey.DES_EDE3_KEY_LENGTH else: raise TpmPrivateKey.Error( "Unrecognized PBES2 encryption scheme OID: " + encryptionSchemeOidString) else: raise TpmPrivateKey.Error( "Unrecognized PBES2 key derivation OID: " + keyDerivationOidString) pbkdf2 = PBKDF2HMAC(algorithm = hashes.SHA1(), length = resultLength, salt = Blob(salt, False).toBytes(), iterations = nIterations, backend = default_backend()) derivedKey = pbkdf2.derive(Blob(password, False).toBytes()) # Use the derived key to get the unencrypted pkcs8Encoding. if encryptionSchemeOidString == TpmPrivateKey.DES_EDE3_CBC_OID: # Decode the DES-EDE3-CBC parameters. initialVector = None try: initialVector = encryptionSchemeParameters.toVal() except Exception as ex: raise TpmPrivateKey.Error( "Cannot decode the DES-EDE3-CBC parameters: " + str(ex)) try: cipher = Cipher(algorithms.TripleDES( Blob(derivedKey, False).toBytes()), modes.CBC(Blob(initialVector, False).toBytes()), backend = default_backend()) # For the cryptography package, we have to remove the padding. decryptor = cipher.decryptor() resultWithPad = decryptor.update( Blob(encryptedKey, False).toBytes()) + decryptor.finalize() if sys.version_info[0] <= 2: padLength = ord(resultWithPad[-1]) else: padLength = resultWithPad[-1] pkcs8Encoding = resultWithPad[:-padLength] except Exception as ex: raise TpmPrivateKey.Error( "Error decrypting the PKCS #8 key with DES-EDE3-CBC: " + str(ex)) else: raise TpmPrivateKey.Error( "Unrecognized PBES2 encryption scheme OID: " + encryptionSchemeOidString) else: raise TpmPrivateKey.Error( "Unrecognized PKCS #8 EncryptedPrivateKeyInfo OID: " + oidString) self.loadPkcs8(pkcs8Encoding)
[docs] def derivePublicKey(self): """ Get the encoded public key for this private key. :return: The public key encoding Blob. :rtype: Blob :raises TpmPrivateKey.Error: If no private key is loaded, or error converting to a public key. """ if (self._keyType == KeyType.EC or self._keyType == KeyType.RSA): publicKeyDer = self._privateKey.public_key().public_bytes( encoding = serialization.Encoding.DER, format = serialization.PublicFormat.SubjectPublicKeyInfo) return Blob(publicKeyDer, False) else: raise TpmPrivateKey.Error( "derivePublicKey: The private key is not loaded")
[docs] def decrypt(self, cipherText, algorithmType = None): """ Decrypt the cipherText using this private key according the encryption algorithmType. Only RSA encryption is supported for now. :param cipherText: The cipher text byte buffer. :type cipherText: an array which implements the buffer protocol :param EncryptAlgorithmType algorithmType: (optional) This decrypts according to algorithmType. If omitted, use RsaOaep. :return: The decrypted data. :rtype: Blob :raises TpmPrivateKey.Error: If the private key is not loaded, if decryption is not supported for this key type, or for error decrypting. """ # TODO: Fix import loops and import at load time. from pyndn.encrypt.algo.encrypt_params import EncryptAlgorithmType if algorithmType == None: algorithmType = EncryptAlgorithmType.RsaOaep if self._keyType == None: raise TpmPrivateKey.Error("decrypt: The private key is not loaded") if algorithmType == EncryptAlgorithmType.RsaOaep: try: paddingObject = padding.OAEP( mgf = padding.MGF1(algorithm = hashes.SHA1()), algorithm = hashes.SHA1(), label = None) result = self._privateKey.decrypt(cipherText, paddingObject) except Exception as ex: raise TpmPrivateKey.Error("Error in decrypt: " + str(ex)) elif algorithmType == EncryptAlgorithmType.RsaPkcs: try: paddingObject = padding.PKCS1v15() result = self._privateKey.decrypt(cipherText, paddingObject) except Exception as ex: raise TpmPrivateKey.Error("Error in decrypt: " + str(ex)) else: raise TpmPrivateKey.Error("unsupported padding scheme") return Blob(result, False)
[docs] def sign(self, data, digestAlgorithm): """ Sign the data with this private key, returning a signature Blob. :param data: The input byte buffer. :type data: an array which implements the buffer protocol :param digestAlgorithm: The digest algorithm. :type digestAlgorithm: int from DigestAlgorithm :return: The signature Blob, or an isNull Blob if this private key is not initialized. :rtype: Blob :raises TpmPrivateKey.Error: For unrecognized digestAlgorithm or an error in signing. """ if digestAlgorithm != DigestAlgorithm.SHA256: raise TpmPrivateKey.Error( "TpmPrivateKey.sign: Unsupported digest algorithm") if self._keyType == KeyType.RSA: signature = self._privateKey.sign( data, padding.PKCS1v15(), hashes.SHA256()) elif self._keyType == KeyType.EC: signature = self._privateKey.sign(data, ec.ECDSA(hashes.SHA256())) else: return Blob() return Blob(bytearray(signature), False)
[docs] def toPkcs1(self): """ Get the encoded unencrypted private key in PKCS #1. :return: The private key encoding Blob. :rtype: Blob :raises TpmPrivateKey.Error: If no private key is loaded, or error encoding. """ if self._keyType == None: raise TpmPrivateKey.Error("toPkcs1: The private key is not loaded") # Decode the PKCS #8 private key. try: parsedNode = DerNode.parse(self.toPkcs8().buf(), 0) pkcs8Children = parsedNode.getChildren() return pkcs8Children[2].getPayload() except Exception as ex: raise TpmPrivateKey.Error( "Error decoding PKCS #8 private key: " + str(ex))
[docs] def toPkcs8(self): """ Get the encoded unencrypted private key in PKCS #8. :return: The private key encoding Blob. :rtype: Blob :raises TpmPrivateKey.Error: If no private key is loaded, or error encoding. """ if self._keyType == None: raise TpmPrivateKey.Error("toPkcs8: The private key is not loaded") privateKeyDer = self._privateKey.private_bytes( encoding = serialization.Encoding.DER, format = serialization.PrivateFormat.PKCS8, encryption_algorithm = serialization.NoEncryption()) return Blob(privateKeyDer, False)
[docs] def toEncryptedPkcs8(self, password): """ Get the encoded encrypted private key in PKCS #8. :param password: The password for encrypting the private key, which should have characters in the range of 1 to 127.. :type password: an array which implements the buffer protocol :return: The encoding Blob of the EncryptedPrivateKeyInfo. :rtype: Blob :raises TpmPrivateKey.Error: If no private key is loaded, or error encoding. """ if self._keyType == None: raise TpmPrivateKey.Error( "toEncryptedPkcs8: The private key is not loaded") # Create the derivedKey from the password. nIterations = 2048 salt = bytearray(8) for i in range(len(salt)): salt[i] = _systemRandom.randint(0, 0xff) pbkdf2 = PBKDF2HMAC(algorithm = hashes.SHA1(), length = TpmPrivateKey.DES_EDE3_KEY_LENGTH, salt = Blob(salt, False).toBytes(), iterations = nIterations, backend = default_backend()) derivedKey = pbkdf2.derive(Blob(password, False).toBytes()) pkcs8Encoding = self.toPkcs8() # Use the derived key to get the encrypted pkcs8Encoding. encryptedEncoding = None initialVector = bytearray(8) for i in range(len(initialVector)): initialVector[i] = _systemRandom.randint(0, 0xff) try: # For the cryptography package, we have to do the padding. padLength = 16 - (pkcs8Encoding.size() % 16) if sys.version_info[0] <= 2: pad = chr(padLength) * padLength else: pad = bytes([padLength]) * padLength cipher = Cipher(algorithms.TripleDES( Blob(derivedKey, False).toBytes()), modes.CBC(Blob(initialVector, False).toBytes()), backend = default_backend()) encryptor = cipher.encryptor() encryptedEncoding = encryptor.update( pkcs8Encoding.toBytes() + pad) + encryptor.finalize() except Exception as ex: raise TpmPrivateKey.Error( "Error encrypting the PKCS #8 key with DES-EDE3-CBC: " + str(ex)) try: # Encode the PBES2 parameters. See https://www.ietf.org/rfc/rfc2898.txt . keyDerivationParameters = DerSequence() keyDerivationParameters.addChild(DerOctetString(Blob(salt, False))) keyDerivationParameters.addChild(DerInteger(nIterations)) keyDerivationAlgorithmIdentifier = DerSequence() keyDerivationAlgorithmIdentifier.addChild( DerOid(TpmPrivateKey.PBKDF2_OID)) keyDerivationAlgorithmIdentifier.addChild(keyDerivationParameters) encryptionSchemeAlgorithmIdentifier = DerSequence() encryptionSchemeAlgorithmIdentifier.addChild( DerOid(TpmPrivateKey.DES_EDE3_CBC_OID)) encryptionSchemeAlgorithmIdentifier.addChild( DerOctetString(Blob(initialVector, False))) encryptedKeyParameters = DerSequence() encryptedKeyParameters.addChild(keyDerivationAlgorithmIdentifier) encryptedKeyParameters.addChild(encryptionSchemeAlgorithmIdentifier) encryptedKeyAlgorithmIdentifier = DerSequence() encryptedKeyAlgorithmIdentifier.addChild( DerOid(TpmPrivateKey.PBES2_OID)) encryptedKeyAlgorithmIdentifier.addChild(encryptedKeyParameters) # Encode the PKCS #8 EncryptedPrivateKeyInfo. # See https://tools.ietf.org/html/rfc5208. encryptedKey = DerSequence() encryptedKey.addChild(encryptedKeyAlgorithmIdentifier) encryptedKey.addChild(DerOctetString(Blob(encryptedEncoding, False))) return encryptedKey.encode() except Exception as ex: raise TpmPrivateKey.Error( "Error encoding the encryped PKCS #8 private key: " + str(ex))
[docs] @staticmethod def generatePrivateKey(keyParams): """ Generate a key pair according to keyParams and return a new TpmPrivateKey with the private key. You can get the public key with derivePublicKey. :param KeyParams keyParams: The parameters of the key. :return: A new TpmPrivateKey. :rtype: TpmPrivateKey :raises ValueError: If the key type is not supported. :raises TpmPrivateKey.Error: For an invalid key size, or an error generating. """ if (keyParams.getKeyType() == KeyType.RSA or keyParams.getKeyType() == KeyType.EC): if keyParams.getKeyType() == KeyType.RSA: privateKey = rsa.generate_private_key( public_exponent = 65537, key_size = keyParams.getKeySize(), backend = default_backend()) else: privateKey = ec.generate_private_key( TpmPrivateKey._getEcCurve(keyParams.getKeySize()), default_backend()) else: raise ValueError( "Cannot generate a key pair of type " + str(keyParams.getKeyType())) result = TpmPrivateKey() result._privateKey = privateKey result._keyType = keyParams.getKeyType() return result
@staticmethod def _getEcCurve(keySize): """ Get the Elliptic Curve algorithm object for the key size. :param int keySize: The key size. :raises TpmPrivateKey.Error: If the key size is not supported. """ if keySize == 256: return ec.SECP256R1() elif keySize == 384: return ec.SECP384R1() elif keySize == 521: return ec.SECP521R1() else: raise TpmPrivateKey.Error("Unsupported EC key size: " + str(keySize)) RSA_ENCRYPTION_OID = "1.2.840.113549.1.1.1" EC_ENCRYPTION_OID = "1.2.840.10045.2.1" PBES2_OID = "1.2.840.113549.1.5.13" PBKDF2_OID = "1.2.840.113549.1.5.12" DES_EDE3_CBC_OID = "1.2.840.113549.3.7" DES_EDE3_KEY_LENGTH = 24