Source code for pyndn.security.v2.dynamic_trust_anchor_group
# -*- Mode:python; c-file-style:"gnu"; indent-tabs-mode:nil -*- */
#
# Copyright (C) 2018-2019 Regents of the University of California.
# Author: Jeff Thompson <[email protected]>
# Author: From ndn-cxx security https://github.com/named-data/ndn-cxx/blob/master/ndn-cxx/security/v2/trust-anchor-group.cpp
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# A copy of the GNU Lesser General Public License is in the file COPYING.
"""
This module defines the DynamicTrustAnchorGroup class which extends
TrustAnchorGroup to implement a dynamic trust anchor group.
"""
import os
import logging
from pyndn.util.common import Common
from pyndn.security.v2.trust_anchor_group import TrustAnchorGroup
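class DynamicTrustAnchorGroup(TrustAnchorGroup):
    """
    Create a DynamicTrustAnchorGroup to use an existing container.

    The anchors are reloaded from path at most once per refreshPeriod. In
    directory mode, every regular file in the directory is passed to
    TrustAnchorGroup.readCertificate, and each certificate that is returned
    becomes a trust anchor of this group. Write access to path is therefore
    equivalent to the ability to add or remove trust anchors, so the file or
    directory (and its parent directories) should be writable only by the
    account that administers trust.

    :param CertificateContainer certificateContainer: The existing certificate
      container which implements the CertificateContainer interface.
    :param str id: The group ID.
    :param str path: The file path for trust anchor(s), which could be a
      directory or a file. If it is a directory, all the certificates in the
      directory will be loaded.
    :param float refreshPeriod: The refresh time in milliseconds for the
      anchors under path. This must be positive.
    :param bool isDirectory: If True, then path is a directory. If False, it
      is a single file.
    :raises: ValueError If refreshPeriod is not positive.
    :raises: RuntimeError If isDirectory is True and the directory cannot be
      listed during the initial refresh.
    """
    def __init__(self, certificateContainer, id, path, refreshPeriod, isDirectory):
        super(DynamicTrustAnchorGroup, self).__init__(certificateContainer, id)

        self._isDirectory = isDirectory
        self._path = path
        self._refreshPeriod = refreshPeriod
        # Zero is always in the past, so the first refresh() call loads.
        self._expireTime = 0.0

        if refreshPeriod <= 0.0:
            raise ValueError(
              "Refresh period for the dynamic group must be positive")

        logging.getLogger(__name__).info(
          "Create a dynamic trust anchor group " + str(id) + " for file/dir " +
          path + " with refresh time " + str(refreshPeriod))
        self.refresh()

    def refresh(self):
        """
        Request a certificate refresh.

        This returns immediately if the refresh period has not yet passed
        since the previous reload. Otherwise the anchors are reloaded from
        the configured path, and every anchor that was in the group before
        the reload but was not found again is removed from both the group
        and the certificate container.

        :raises: RuntimeError If the group is a directory group and the
          directory cannot be listed. No anchor is removed in that case, so
          the anchors from the previous load stay in place.
        """
        now = Common.getNowMilliseconds()
        if self._expireTime > now:
            return

        # The deadline is advanced before loading, so a reload that fails is
        # not retried until another refresh period has passed.
        self._expireTime = now + self._refreshPeriod
        logging.getLogger(__name__).info(
          "Reloading the dynamic trust anchor group")

        # Save a copy of _anchorNames. _loadCertificate takes out each name
        # that it finds again, so what is left afterwards is stale.
        oldAnchorNames = set(self._anchorNames)

        if not self._isDirectory:
            self._loadCertificate(self._path, oldAnchorNames)
        else:
            try:
                # os.path.isfile follows symbolic links, so a link in the
                # directory that resolves to a regular file is loaded too.
                allFiles = [f for f in os.listdir(self._path)
                  if os.path.isfile(os.path.join(self._path, f))]
            except Exception:
                # Not a bare except: KeyboardInterrupt and SystemExit must
                # not be reported as a directory listing failure.
                raise RuntimeError("Cannot list files in directory " + self._path)

            for f in allFiles:
                self._loadCertificate(os.path.join(self._path, f), oldAnchorNames)

        # Remove old certificates. An anchor whose file has gone, or for
        # which readCertificate returned None on this pass, is dropped here
        # and is no longer trusted through this group.
        for name in oldAnchorNames:
            self._anchorNames.remove(name)
            self._certificates.remove(name)

    def _loadCertificate(self, file, oldAnchorNames):
        """
        Read one certificate file and record it as a current anchor.

        A certificate whose name is not yet in the group is added to the
        group and to the certificate container. A certificate whose name is
        already in the group is only marked as still present by taking its
        name out of oldAnchorNames. If readCertificate returns None, nothing
        changes.

        :param str file: The path of the certificate file to read.
        :param oldAnchorNames: The names that were anchors before this
          refresh and have not been seen again yet. This is modified in place.
        :type oldAnchorNames: set of Name
        """
        certificate = TrustAnchorGroup.readCertificate(file)
        if certificate is None:
            return

        name = certificate.getName()
        if name not in self._anchorNames:
            self._anchorNames.add(name)
            self._certificates.add(certificate)
        else:
            # discard, not remove: the name is absent from oldAnchorNames when
            # a second file in the same pass carries a certificate name that
            # was already handled (a duplicate or backup copy in the
            # directory). set.remove would raise KeyError and abort the
            # refresh before the stale anchors are removed.
            oldAnchorNames.discard(name)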