Source code for pyndn.security.pib.pib_certificate_container

# -*- Mode:python; c-file-style:"gnu"; indent-tabs-mode:nil -*- */
#
# Copyright (C) 2017-2019 Regents of the University of California.
# Author: Jeff Thompson <[email protected]>
# Author: From ndn-cxx security https://github.com/named-data/ndn-cxx/blob/master/ndn-cxx/security/pib/certificate-container.cpp
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
# A copy of the GNU Lesser General Public License is in the file COPYING.

"""
This module defines the PibCertificateContainer class which is used to
search/enumerate the certificates of a key. (A PibCertificateContainer object
can only be created by PibKey.)
"""

from pyndn.name import Name
from pyndn.security.v2.certificate_v2 import CertificateV2

class PibCertificateContainer(object):
    """
    A container used to search/enumerate the certificates of one key. It keeps
    the set of certificate names reported by the PIB backend plus a cache of
    the certificates which have already been loaded.

    Create a PibCertificateContainer for a key with keyName. This constructor
    should only be called by PibKeyImpl.

    :param Name keyName: The name of the key, which is copied.
    :param PibImpl pibImpl: The PIB backend implementation.
    :raises ValueError: If pibImpl is None.
    """
    def __init__(self, keyName, pibImpl):
        # Fail fast, before any state is built, if there is no backend.
        if pibImpl is None:
            raise ValueError("The pibImpl is None")

        # The cache of loaded certificates. certificateName => CertificateV2.
        self._certificates = {}
        self._keyName = Name(keyName)
        self._pibImpl = pibImpl

        # A set of Name. Query with the stored copy of the key name, the same
        # value which isConsistent() uses.
        self._certificateNames = self._pibImpl.getCertificatesOfKey(
          self._keyName)

    def size(self):
        """
        Get the number of certificates in the container.

        :return: The number of certificates.
        :rtype: int
        """
        return len(self._certificateNames)

    def add(self, certificate):
        """
        Add certificate into the container. If the certificate already exists,
        this replaces it.

        :param CertificateV2 certificate: The certificate to add. This copies
          the object.
        :raises ValueError: If the name of the certificate does not match the
          key name.
        """
        if not self._keyName.equals(certificate.getKeyName()):
            raise ValueError("The certificate name `" +
              certificate.getKeyName().toUri() +
              "` does not match the key name")

        # Write to the backend first. If it raises, the in-memory name set and
        # cache are left untouched and stay in step with the backend.
        self._pibImpl.addCertificate(certificate)

        certificateName = Name(certificate.getName())
        self._certificateNames.add(certificateName)
        # Copy the certificate.
        self._certificates[certificateName] = CertificateV2(certificate)

    def remove(self, certificateName):
        """
        Remove the certificate with name certificateName from the container. If
        the certificate does not exist, do nothing.

        :param Name certificateName: The name of the certificate.
        :raises ValueError: If certificateName does not match the key name.
        """
        self._checkCertificateName(certificateName)

        # Remove from the backend first. If it raises, the in-memory name set
        # and cache are left untouched and stay in step with the backend.
        self._pibImpl.removeCertificate(certificateName)

        # Both of these do nothing if the name is not present.
        self._certificateNames.discard(certificateName)
        self._certificates.pop(certificateName, None)

    def get(self, certificateName):
        """
        Get the certificate with certificateName from the container.

        :param Name certificateName: The name of the certificate.
        :return: A copy of the certificate.
        :rtype: CertificateV2
        :raises ValueError: If certificateName does not match the key name
        :raises Pib.Error: If the certificate does not exist.
        """
        cachedCertificate = self._certificates.get(certificateName)
        if cachedCertificate is not None:
            # Make a copy.
            # TODO: Copy is expensive. Can we just tell the caller not to
            # modify it?
            return CertificateV2(cachedCertificate)

        # Not cached. Validate the name, then get from the PIB and cache.
        self._checkCertificateName(certificateName)
        certificate = self._pibImpl.getCertificate(certificateName)
        # Copy the certificate Name so the cache key is independent of the
        # caller's object.
        self._certificates[Name(certificateName)] = certificate
        # Make a copy.
        # TODO: Copy is expensive. Can we just tell the caller not to modify it?
        return CertificateV2(certificate)

    def isConsistent(self):
        """
        Check if the container is consistent with the backend storage.

        :return: True if the container is consistent, False otherwise.
        :rtype: bool
        :note: This method is heavy-weight and should be used in a debugging
          mode only.
        """
        return (self._certificateNames ==
                self._pibImpl.getCertificatesOfKey(self._keyName))

    def _checkCertificateName(self, certificateName):
        """
        Check that certificateName is a valid certificate name whose key name
        equals the key name of this container.

        :param Name certificateName: The name of the certificate.
        :raises ValueError: If certificateName is invalid or does not match
          the key name.
        """
        if (not CertificateV2.isValidName(certificateName) or
            not CertificateV2.extractKeyNameFromCertName(
              certificateName).equals(self._keyName)):
            raise ValueError("Certificate name `" + certificateName.toUri() +
              "` is invalid or does not match key name")