Source code for pyndn.security.v2.certificate_cache_v2

# -*- Mode:python; c-file-style:"gnu"; indent-tabs-mode:nil -*- */
#
# Copyright (C) 2017-2019 Regents of the University of California.
# Author: Jeff Thompson <[email protected]>
# Author: From ndn-cxx security https://github.com/named-data/ndn-cxx/blob/master/ndn-cxx/security/v2/certificate-cache.cpp
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
# A copy of the GNU Lesser General Public License is in the file COPYING.

"""
This module defines the CertificateCacheV2 class which holds other user's
verified certificates in security v2 format CertificateV2. A certificate is
removed no later than its NotAfter time, or maxLifetime after it has been added
to the cache.
"""

import sys
import logging
import bisect
from pyndn.name import Name
from pyndn.security.v2.certificate_v2 import CertificateV2
from pyndn.encrypt.schedule import Schedule
from pyndn.util.common import Common

[docs]class CertificateCacheV2(object): """ Create a CertificateCacheV2. :param float maxLifetimeMilliseconds: (optional) The maximum time that certificates can live inside the cache, in milliseconds. If omitted use getDefaultLifetime() """ def __init__(self, maxLifetimeMilliseconds = None): if maxLifetimeMilliseconds == None: maxLifetimeMilliseconds = CertificateCacheV2.getDefaultLifetime() # Name => CertificateCacheV2._Entry. self._certificatesByName = {} # The keys of _certificatesByName in sorted order, kept in sync with it. # (We don't use OrderedDict because it doesn't sort keys on insert.) self._certificatesByNameKeys = [] self._nextRefreshTime = sys.float_info.max self._maxLifetimeMilliseconds = maxLifetimeMilliseconds self._nowOffsetMilliseconds = 0
[docs] def insert(self, certificate): """ Insert the certificate into the cache. The inserted certificate will be removed no later than its NotAfter time, or maxLifetimeMilliseconds given to the constructor. :param CertificateV2 certificate: The certificate object, which is copied. """ notAfterTime = certificate.getValidityPeriod().getNotAfter() # _nowOffsetMilliseconds is only used for testing. now = Common.getNowMilliseconds() + self._nowOffsetMilliseconds if notAfterTime < now: logging.getLogger(__name__).info("Not adding " + certificate.getName().toUri() + ": already expired at " + Schedule.toIsoString(notAfterTime)) return removalTime = min(notAfterTime, now + self._maxLifetimeMilliseconds) if removalTime < self._nextRefreshTime: # We need to run refresh() sooner. self._nextRefreshTime = removalTime logging.getLogger(__name__).info("Adding " + certificate.getName().toUri() + ", will remove in " + str((removalTime - now) / (3600 * 1000.0)) + " hours") certificateCopy = CertificateV2(certificate) certificateName = certificateCopy.getName() if certificateName in self._certificatesByName: # A duplicate name. Simply replace. self._certificatesByName[certificateName]._certificate = certificateCopy self._certificatesByName[certificateName]._removalTime = removalTime else: # Insert into _certificatesByNameKeys sorted. # Keep it sync with _certificatesByName. self._certificatesByName[certificateName] = CertificateCacheV2._Entry( certificateCopy, removalTime) bisect.insort(self._certificatesByNameKeys, certificateName)
[docs] def find(self, certificatePrefixOrInterest): """ Find the certificate by the given key name or interest. :param certificatePrefixOrInterest: If a Name, it is the certificate prefix for searching for the certificate. If an Interest, it is the input interest object. :type certificatePrefixOrInterest: Name or Interest :return: The found certificate which matches the interest, or None if not found. You must not modify the returned object. If you need to modify it, then make a copy. :note: If searching by Interest, the ChildSelector is not supported. """ if isinstance(certificatePrefixOrInterest, Name): certificatePrefix = certificatePrefixOrInterest if (certificatePrefix.size() > 0 and certificatePrefix.get(-1).isImplicitSha256Digest()): logging.getLogger(__name__).error( "Certificate search using a name with an implicit digest is not yet supported") self._refresh() # Find the first that is greater than or equal to certificatePrefix. i = bisect.bisect_left(self._certificatesByNameKeys, certificatePrefix) if (i >= len(self._certificatesByNameKeys) or not certificatePrefix.isPrefixOf(self._certificatesByNameKeys[i])): return None return self._certificatesByName[self._certificatesByNameKeys[i]]._certificate else: interest = certificatePrefixOrInterest if interest.getChildSelector() != None: logging.getLogger(__name__).error( "Certificate search using a ChildSelector is not supported. Searching as if this selector not specified") if (interest.getName().size() > 0 and interest.getName().get(-1).isImplicitSha256Digest()): logging.getLogger(__name__).error( "Certificate search using a name with an implicit digest is not yet supported") self._refresh() # Find the first that is greater than or equal to interest.getName(). i = bisect.bisect_left(self._certificatesByNameKeys, interest.getName()) if i >= len(self._certificatesByNameKeys): return None while i < len(self._certificatesByNameKeys): key = self._certificatesByNameKeys[i] certificate = self._certificatesByName[key]._certificate if not interest.getName().isPrefixOf(certificate.getName()): break if interest.matchesData(certificate): return certificate i += 1 return None
[docs] def deleteCertificate(self, certificateName): """ Remove the certificate whose name equals the given name. If no such certificate is in the cache, do nothing. :param Name certificateName: The name of the certificate. """ try: del self._certificatesByName[certificateName] except KeyError: # Do nothing if it doesn't exist. pass try: self._certificatesByNameKeys.remove(certificateName) except ValueError: # Do nothing if it doesn't exist. pass
# This may be the certificate to be removed at _nextRefreshTime by # _refresh(), but just allow _refresh() to run instead of updating # _nextRefreshTime now.
[docs] def clear(self): """ Clear all certificates from the cache. """ self._certificatesByName = {} self._certificatesByNameKeys = [] self._nextRefreshTime = sys.float_info.max
[docs] @staticmethod def getDefaultLifetime(): """ Get the default maximum lifetime (1 hour). :return: The lifetime in milliseconds. :rtype: float """ return 3600.0 * 1000
def _setNowOffsetMilliseconds(self, nowOffsetMilliseconds): """ Set the offset when insert() and _refresh() get the current time, which should only be used for testing. :param float nowOffsetMilliseconds: The offset in milliseconds. """ self._nowOffsetMilliseconds = nowOffsetMilliseconds class _Entry(object): """ CertificateCacheV2._Entry is the value of the _certificatesByName map. Create a new CertificateCacheV2.Entry with the given values. :param CertificateV2 certificate: The certificate. :param float removalTime: The removal time for this entry as milliseconds since Jan 1, 1970 UTC. """ def __init__(self, certificate, removalTime): self._certificate = certificate self._removalTime = removalTime def _refresh(self): """ Remove all outdated certificate entries. """ # _nowOffsetMilliseconds is only used for testing. now = Common.getNowMilliseconds() + self._nowOffsetMilliseconds if now < self._nextRefreshTime: return # We recompute _nextRefreshTime. nextRefreshTime = sys.float_info.max # Go backwards through the list so we can erase entries. i = len(self._certificatesByNameKeys) - 1 while i >= 0: entry = self._certificatesByName[self._certificatesByNameKeys[i]] if entry._removalTime <= now: del self._certificatesByName[self._certificatesByNameKeys[i]] self._certificatesByNameKeys.pop(i) else: nextRefreshTime = min(nextRefreshTime, entry._removalTime) i -= 1 self._nextRefreshTime = nextRefreshTime