# -*- Mode:python; c-file-style:"gnu"; indent-tabs-mode:nil -*- */
#
# Copyright (C) 2017-2019 Regents of the University of California.
# Author: Jeff Thompson <[email protected]>
# Author: From ndn-cxx security https://github.com/named-data/ndn-cxx/blob/master/ndn-cxx/security/pib/pib-memory.cpp
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# A copy of the GNU Lesser General Public License is in the file COPYING.
"""
This module defines the PibMemory class which extends PibImpl and is used by
the Pib class as an in-memory implementation of a PIB. All the contents in the
PIB are stored in memory and have the same lifetime as the PibMemory instance.
"""
from pyndn.name import Name
from pyndn.util.blob import Blob
from pyndn.security.pib.pib import Pib
from pyndn.security.pib.pib_key import PibKey
from pyndn.security.v2.certificate_v2 import CertificateV2
from pyndn.security.pib.pib_impl import PibImpl
[docs]class PibMemory(PibImpl):
"""
Create an empty PibMemory.
"""
def __init__(self):
super(PibMemory, self).__init__()
self._tpmLocator = ""
self._defaultIdentityName = None
# Set of Name.
self._identityNames = set()
# identityName => default key Name.
self._defaultKeyNames = {}
# keyName => keyBits Blob.
self._keys = {}
# keyName => default certificate Name.
self._defaultCertificateNames = {}
# certificateName => CertificateV2.
self._certificates = {}
[docs] @staticmethod
def getScheme():
return "pib-memory"
# TpmLocator management.
[docs] def setTpmLocator(self, tpmLocator):
"""
Set the corresponding TPM information to tpmLocator. This method does not
reset the contents of the PIB.
:param str tpmLocator: The TPM locator string.
"""
self._tpmLocator = tpmLocator
[docs] def getTpmLocator(self):
"""
Get the TPM Locator.
:return: The TPM locator string.
:rtype: str
"""
return self._tpmLocator
# Identity management.
[docs] def hasIdentity(self, identityName):
"""
Check for the existence of an identity.
:param Name identityName: The name of the identity.
:return: True if the identity exists, otherwise False.
:rtype: bool
"""
return identityName in self._identityNames
[docs] def addIdentity(self, identityName):
"""
Add the identity. If the identity already exists, do nothing. If no
default identity has been set, set the added identity as the default.
:param Name identityName: The name of the identity to add. This copies
the name.
"""
identityNameCopy = Name(identityName)
self._identityNames.add(identityNameCopy)
if self._defaultIdentityName == None:
self._defaultIdentityName = identityNameCopy
[docs] def removeIdentity(self, identityName):
"""
Remove the identity and its related keys and certificates. If the
default identity is being removed, no default identity will be selected.
If the identity does not exist, do nothing.
:param Name identityName: The name of the identity to remove.
"""
try:
self._identityNames.remove(identityName)
except KeyError:
# Do nothing if it doesn't exist.
pass
if (self._defaultIdentityName != None and
identityName.equals(self._defaultIdentityName)):
self._defaultIdentityName = None
for keyName in self.getKeysOfIdentity(identityName):
self.removeKey(keyName)
[docs] def clearIdentities(self):
"""
Erase all certificates, keys, and identities.
"""
self._defaultIdentityName = None
self._identityNames = set()
self._defaultKeyNames = {}
self._keys = {}
self._defaultCertificateNames = {}
self._certificates = {}
[docs] def getIdentities(self):
"""
Get the names of all the identities.
:return: A fresh set of identity names. The Name objects are fresh
copies.
:rtype: set of Name
"""
# Copy the Name objects.
result = set()
for name in self._identityNames:
result.add(Name(name))
return result
[docs] def setDefaultIdentity(self, identityName):
"""
Set the identity with the identityName as the default identity. If the
identity with identityName does not exist, then it will be created.
:param Name identityName: The name for the default identity. This copies
the name.
"""
self.addIdentity(identityName)
# Copy the name.
self._defaultIdentityName = Name(identityName)
[docs] def getDefaultIdentity(self):
"""
Get the default identity.
:return: The name of the default identity, as a fresh copy.
:rtype: Name
:raises Pib.Error: For no default identity.
"""
if self._defaultIdentityName != None:
# Copy the name.
return Name(self._defaultIdentityName)
raise Pib.Error("No default identity")
# Key management.
[docs] def hasKey(self, keyName):
"""
Check for the existence of a key with keyName.
:param Name keyName: The name of the key.
:return: True if the key exists, otherwise False. Return False if the
identity does not exist.
:rtype: bool
"""
return keyName in self._keys
[docs] def addKey(self, identityName, keyName, key):
"""
Add the key. If a key with the same name already exists, overwrite the
key. If the identity does not exist, it will be created. If no default
key for the identity has been set, then set the added key as the default
for the identity. If no default identity has been set, identity becomes
the default.
:param Name identityName: The name of the identity that the key belongs
to. This copies the name.
:param Name keyName: The name of the key. This copies the name.
:param key: The public key bits. This copies the array.
:type key: an array which implements the buffer protocol
"""
self.addIdentity(identityName)
keyNameCopy = Name(keyName)
self._keys[keyNameCopy] = Blob(key, True)
if not identityName in self._defaultKeyNames:
# Copy the identityName.
self._defaultKeyNames[Name(identityName)] = keyNameCopy
[docs] def removeKey(self, keyName):
"""
Remove the key with keyName and its related certificates. If the key
does not exist, do nothing.
:param Name keyName: The name of the key.
"""
identityName = PibKey.extractIdentityFromKeyName(keyName)
try:
del self._keys[keyName]
except KeyError:
# Do nothing if it doesn't exist.
pass
try:
del self._defaultKeyNames[identityName]
except KeyError:
# Do nothing if it doesn't exist.
pass
for certificateName in self.getCertificatesOfKey(keyName):
self.removeCertificate(certificateName)
[docs] def getKeyBits(self, keyName):
"""
Get the key bits of a key with name keyName.
:param Name keyName: The name of the key.
:return: The key bits.
:rtype: Blob
:raises Pib.Error: If the key does not exist.
"""
if not self.hasKey(keyName):
raise Pib.Error("Key `" + keyName.toUri() + "` not found")
key = self._keys[keyName]
return key
[docs] def getKeysOfIdentity(self, identityName):
"""
Get all the key names of the identity with the name identityName. The
returned key names can be used to create a KeyContainer. With a key name
and a backend implementation, one can create a Key front end instance.
:param Name identityName: The name of the identity.
:return: The set of key names. The Name objects are fresh copies. If the
identity does not exist, return an empty set.
:rtype: set of Name
"""
ids = set()
for keyName in self._keys:
if identityName.equals(PibKey.extractIdentityFromKeyName(keyName)):
# Copy the name.
ids.add(Name(keyName))
return ids
[docs] def setDefaultKeyOfIdentity(self, identityName, keyName):
"""
Set the key with keyName as the default key for the identity with name
identityName.
:param Name identityName: The name of the identity. This copies the name.
:param Name keyName: The name of the key. This copies the name.
:raises Pib.Error: If the key does not exist.
"""
if not self.hasKey(keyName):
raise Pib.Error("Key `" + keyName.toUri() + "` not found")
# Copy the Names.
self._defaultKeyNames[Name(identityName)] = Name(keyName)
[docs] def getDefaultKeyOfIdentity(self, identityName):
"""
Get the name of the default key for the identity with name identityName.
:param Name identityName: The name of the identity.
:return: The name of the default key, as a fresh copy.
:rtype: Name
:raises Pib.Error: If there is no default key or if the identity does
not exist.
"""
try:
defaultKey = self._defaultKeyNames[identityName]
except KeyError:
raise Pib.Error(
"No default key for identity `" + identityName.toUri() + "`")
# Copy the name.
return Name(defaultKey)
# Certificate management.
[docs] def hasCertificate(self, certificateName):
"""
Check for the existence of a certificate with name certificateName.
:param Name certificateName: The name of the certificate.
:return: True if the certificate exists, otherwise False.
:rtype: bool
"""
return certificateName in self._certificates
[docs] def addCertificate(self, certificate):
"""
Add the certificate. If a certificate with the same name (without
implicit digest) already exists, then overwrite the certificate. If the
key or identity does not exist, they will be created. If no default
certificate for the key has been set, then set the added certificate as
the default for the key. If no default key was set for the identity, it
will be set as the default key for the identity. If no default identity
was selected, the certificate's identity becomes the default.
:param CertificateV2 certificate: The certificate to add. This copies
the object.
"""
certificateNameCopy = Name(certificate.getName())
# getKeyName already makes a new Name.
keyNameCopy = certificate.getKeyName()
identity = certificate.getIdentity()
self.addKey(identity, keyNameCopy, certificate.getContent().toBytes())
self._certificates[certificateNameCopy] = CertificateV2(certificate)
if not (keyNameCopy in self._defaultCertificateNames):
self._defaultCertificateNames[keyNameCopy] = certificateNameCopy
[docs] def removeCertificate(self, certificateName):
"""
Remove the certificate with name certificateName. If the certificate
does not exist, do nothing.
:param Name certificateName: The name of the certificate.
"""
try:
del self._certificates[certificateName]
except KeyError:
# Do nothing if it doesn't exist.
pass
keyName = CertificateV2.extractKeyNameFromCertName(certificateName)
try:
defaultCertificateName = self._defaultCertificateNames[keyName]
except KeyError:
defaultCertificateName = None
if (defaultCertificateName != None and
defaultCertificateName.equals(certificateName)):
del self._defaultCertificateNames[keyName]
[docs] def getCertificate(self, certificateName):
"""
Get the certificate with name certificateName.
:param Name certificateName: The name of the certificate.
:return: A copy of the certificate.
:rtype: CertificateV2
:raises Pib.Error: If the certificate does not exist.
"""
if not self.hasCertificate(certificateName):
raise Pib.Error(
"Certificate `" + certificateName.toUri() + "` does not exist")
return CertificateV2(self._certificates[certificateName])
[docs] def getCertificatesOfKey(self, keyName):
"""
Get a list of certificate names of the key with id keyName. The returned
certificate names can be used to create a PibCertificateContainer. With a
certificate name and a backend implementation, one can obtain the
certificate.
:param Name keyName: The name of the key.
:return: The set of certificate names. The Name objects are fresh
copies. If the key does not exist, return an empty set.
:rtype: set of Name
"""
certificateNames = set()
for certificateName in self._certificates:
if (CertificateV2.extractKeyNameFromCertName
(self._certificates[certificateName].getName()).equals(keyName)):
# Copy the Name.
certificateNames.add(Name(certificateName))
return certificateNames
[docs] def setDefaultCertificateOfKey(self, keyName, certificateName):
"""
Set the cert with name certificateName as the default for the key with
keyName.
:param Name keyName: The name of the key.
:param Name certificateName: The name of the certificate. This copies
the name.
:raises Pib.Error: If the certificate with name certificateName does not
exist.
"""
if not self.hasCertificate(certificateName):
raise Pib.Error(
"Certificate `" + certificateName.toUri() + "` does not exist")
# Copy the Names.
self._defaultCertificateNames[Name(keyName)] = Name(certificateName)
[docs] def getDefaultCertificateOfKey(self, keyName):
"""
Get the default certificate for the key with eyName.
:param Name keyName: The name of the key.
:return: A copy of the default certificate.
:rtype: CertificateV2
:raises Pib.Error: If the default certificate does not exist.
"""
try:
certificateName = self._defaultCertificateNames[keyName]
except KeyError:
raise Pib.Error(
"No default certificate for key `" + keyName.toUri() + "`")
certificate = self._certificates[certificateName]
return CertificateV2(certificate)