# -*- Mode:python; c-file-style:"gnu"; indent-tabs-mode:nil -*- */
#
# Copyright (C) 2017-2019 Regents of the University of California.
# Author: Jeff Thompson <[email protected]>
# Author: From ndn-cxx security https://github.com/named-data/ndn-cxx/blob/master/ndn-cxx/security/safe-bag.cpp
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# A copy of the GNU Lesser General Public License is in the file COPYING.
"""
This module defines the SafeBag class which represents a container for sensitive
related information such as a certificate and private key.
"""
from pyndn.name import Name
from pyndn.data import Data
from pyndn.meta_info import ContentType
from pyndn.key_locator import KeyLocatorType
from pyndn.sha256_with_ecdsa_signature import Sha256WithEcdsaSignature
from pyndn.sha256_with_rsa_signature import Sha256WithRsaSignature
from pyndn.key_locator import KeyLocator
from pyndn.validity_period import ValidityPeriod
from pyndn.security.security_types import KeyType
from pyndn.security.v2.certificate_v2 import CertificateV2
from pyndn.security.certificate.public_key import PublicKey
from pyndn.security.security_types import DigestAlgorithm
from pyndn.util.common import Common
from pyndn.util.blob import Blob
from pyndn.security.tpm.tpm import Tpm
from pyndn.security.tpm.tpm_back_end_memory import TpmBackEndMemory
from pyndn.encoding.wire_format import WireFormat
from pyndn.encoding.tlv_wire_format import TlvWireFormat
from pyndn.encoding.tlv.tlv_encoder import TlvEncoder
from pyndn.encoding.tlv.tlv_decoder import TlvDecoder
from pyndn.encoding.tlv.tlv import Tlv
[docs]class SafeBag(object):
"""
There are three forms of the SafeBag constructor:
SafeBag(certificate, privateKeyBag) - Create a SafeBag with the given
certificate and private key.
SafeBag(keyName, privateKeyBag, publicKeyEncoding [, password,
digestAlgorithm, wireFormat]) - Create a SafeBag with given private key
and a new self-signed certificate for the given public key.
SafeBag(input) - Create a SafeBag by decoding the input as an NDN-TLV SafeBag.
:param Data certificate: The certificate data packet (used only for
SafeBag(certificate, privateKeyBag)). This copies the object.
:param Blob privateKeyBag: The encoded private key. If encrypted, this is a
PKCS #8 EncryptedPrivateKeyInfo. If not encrypted, this is an unencrypted
PKCS #8 PrivateKeyInfo.
:param password: (optional) The password for decrypting the private key in
order to sign the self-signed certificate, which should have characters in
the range of 1 to 127. If the password is supplied, use it to decrypt the
PKCS #8 EncryptedPrivateKeyInfo. If the password is omitted or None,
privateKeyBag is an unencrypted PKCS #8 PrivateKeyInfo.
:type password: an array which implements the buffer protocol
:param int digestAlgorithm: (optional) The digest algorithm for signing the
self-signed certificate. If omitted, use DigestAlgorithm.SHA256 .
:type digestAlgorithm: int from the DigestAlgorithm enum
:param WireFormat wireFormat: (optional) A WireFormat object used to encode
the self-signed certificate in order to sign it. If omitted, use
WireFormat.getDefaultWireFormat().
:param input: The array with the bytes to decode.
:type input: A Blob or an array type with int elements
"""
def __init__(self, arg1, privateKeyBag = None,
publicKeyEncoding = None, password = None,
digestAlgorithm = DigestAlgorithm.SHA256, wireFormat = None):
if isinstance(arg1, Name):
keyName = arg1
if wireFormat == None:
# Don't use a default argument since getDefaultWireFormat can change.
wireFormat = WireFormat.getDefaultWireFormat()
self._certificate = SafeBag._makeSelfSignedCertificate(
keyName, privateKeyBag, publicKeyEncoding, password,
digestAlgorithm, wireFormat)
self._privateKeyBag = privateKeyBag
elif isinstance(arg1, Data):
# The certificate is supplied.
self._certificate = Data(arg1)
self._privateKeyBag = privateKeyBag
else:
# Assume the first argument is the encoded SafeBag.
self.wireDecode(arg1)
[docs] def getCertificate(self):
"""
Get the certificate data packet.
:return: The certificate as a Data packet. If you need to process it as
a certificate object then you must create a new CertificateV2(data).
:rtype: Data
"""
return self._certificate
[docs] def getPrivateKeyBag(self):
"""
Get the encoded private key.
:return: The encoded private key. If encrypted, this is a PKCS #8
EncryptedPrivateKeyInfo. If not encrypted, this is an unencrypted PKCS
#8 PrivateKeyInfo.
:rtype: Blob
"""
return self._privateKeyBag
[docs] def wireDecode(self, input):
"""
Decode the input as an NDN-TLV SafeBag and update this object.
:param input: The array with the bytes to decode.
:type input: A Blob or an array type with int elements
"""
if isinstance(input, Blob):
input = input.buf()
# Decode directly as TLV. We don't support the WireFormat abstraction
# because this isn't meant to go directly on the wire.
decoder = TlvDecoder(input)
endOffset = decoder.readNestedTlvsStart(Tlv.SafeBag_SafeBag)
# Get the bytes of the certificate and decode.
certificateBeginOffset = decoder.getOffset()
certificateEndOffset = decoder.readNestedTlvsStart(Tlv.Data)
decoder.seek(certificateEndOffset)
self._certificate = Data()
self._certificate.wireDecode(
decoder.getSlice(certificateBeginOffset, certificateEndOffset),
TlvWireFormat.get())
self._privateKeyBag = Blob(
decoder.readBlobTlv(Tlv.SafeBag_EncryptedKeyBag), True)
decoder.finishNestedTlvs(endOffset)
[docs] def wireEncode(self, wireFormat = None):
"""
Encode this as an NDN-TLV SafeBag.
:return: The encoded byte array as a Blob.
:rtype: Blob
"""
# Encode directly as TLV. We don't support the WireFormat abstraction
# because this isn't meant to go directly on the wire.
encoder = TlvEncoder(256)
saveLength = len(encoder)
# Encode backwards.
encoder.writeBlobTlv(
Tlv.SafeBag_EncryptedKeyBag, self._privateKeyBag.buf())
# Add the entire Data packet encoding as is.
encoder.writeBuffer(
self._certificate.wireEncode(TlvWireFormat.get()).buf())
encoder.writeTypeAndLength(
Tlv.SafeBag_SafeBag, len(encoder) - saveLength)
return Blob(encoder.getOutput(), False)
@staticmethod
def _makeSelfSignedCertificate(
keyName, privateKeyBag, publicKeyEncoding, password, digestAlgorithm,
wireFormat):
certificate = CertificateV2()
# Set the name.
now = Common.getNowMilliseconds()
certificateName = Name(keyName)
certificateName.append("self").appendVersion(int(now))
certificate.setName(certificateName)
# Set the MetaInfo.
certificate.getMetaInfo().setType(ContentType.KEY)
# Set a one-hour freshness period.
certificate.getMetaInfo().setFreshnessPeriod(3600 * 1000.0)
# Set the content.
publicKey = PublicKey(publicKeyEncoding)
certificate.setContent(publicKey.getKeyDer())
# Create a temporary in-memory Tpm and import the private key.
tpm = Tpm("", "", TpmBackEndMemory())
tpm._importPrivateKey(keyName, privateKeyBag.toBytes(), password)
# Set the signature info.
if publicKey.getKeyType() == KeyType.RSA:
certificate.setSignature(Sha256WithRsaSignature())
elif publicKey.getKeyType() == KeyType.EC:
certificate.setSignature(Sha256WithEcdsaSignature())
else:
raise ValueError("Unsupported key type")
signatureInfo = certificate.getSignature()
KeyLocator.getFromSignature(signatureInfo).setType(KeyLocatorType.KEYNAME)
KeyLocator.getFromSignature(signatureInfo).setKeyName(keyName)
# Set a 20-year validity period.
ValidityPeriod.getFromSignature(signatureInfo).setPeriod(
now, now + 20 * 365 * 24 * 3600 * 1000.0)
# Encode once to get the signed portion.
encoding = certificate.wireEncode(wireFormat)
signatureBytes = tpm.sign(encoding.toSignedBytes(), keyName,
digestAlgorithm)
signatureInfo.setSignature(signatureBytes)
# Encode again to include the signature.
certificate.wireEncode(wireFormat)
return certificate