# Source code for pyndn.security.v2.validator_config.config_rule
# -*- Mode:python; c-file-style:"gnu"; indent-tabs-mode:nil -*- */
#
# Copyright (C) 2018-2019 Regents of the University of California.
# Author: Jeff Thompson <[email protected]>
# Author: From ndn-cxx security https://github.com/named-data/ndn-cxx/blob/master/ndn-cxx/security/v2/validator-config/rule.cpp
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# A copy of the GNU Lesser General Public License is in the file COPYING.
"""
This module defines the ConfigRule class which represents a rule configuration
section, used by ConfigValidator.
"""
import logging
from pyndn.security.validator_config_error import ValidatorConfigError
from pyndn.security.v2.validator_config.config_filter import ConfigFilter
from pyndn.security.v2.validator_config.config_checker import ConfigChecker
class ConfigRule(object):
    """
    A ConfigRule holds one rule section of a validator configuration: the rule
    ID, the packet type the rule is for, the filters used by match() and the
    checkers used by check().

    Create a ConfigRule with empty filters and checkers.

    :param str id: The rule ID from the configuration section.
    :param bool isForInterest: True if the rule is for an Interest packet,
      False if it is for a Data packet.
    """
    def __init__(self, id, isForInterest):
        # The parameter name "id" shadows the builtin, but it is part of the
        # public signature so it is kept for callers using keyword arguments.
        self._id = id
        self._isForInterest = isForInterest
        self._filters = []  # of ConfigFilter
        self._checkers = []  # of ConfigChecker

    def getId(self):
        """
        Get the rule ID.

        :return: The rule ID.
        :rtype: str
        """
        return self._id

    def getIsForInterest(self):
        """
        Get the isForInterest flag.

        :return: True if the rule is for an Interest packet, False if it is for
          a Data packet.
        :rtype: bool
        """
        return self._isForInterest

    def addFilter(self, filter):
        """
        Add the ConfigFilter to the list of filters.

        :param ConfigFilter filter: The ConfigFilter.
        """
        self._filters.append(filter)

    def addChecker(self, checker):
        """
        Add the ConfigChecker to the list of checkers.

        :param ConfigChecker checker: The ConfigChecker.
        """
        self._checkers.append(checker)

    def match(self, isForInterest, packetName):
        """
        Check if the packet name matches the rule's filter. If no filters were
        added, the rule matches everything.

        :param bool isForInterest: True if packetName is for an Interest, False
          if for a Data packet.
        :param Name packetName: The packet name. For a signed interest, the last
          two components are skipped but not removed.
        :return: True if at least one filter matches the packet name, False if
          none of the filters match the packet name.
        :rtype: bool
        :raises: ValidatorConfigError if the supplied isForInterest doesn't
          match the one for which the rule is designed.
        """
        logging.getLogger(__name__).info(
          "Trying to match " + packetName.toUri())
        self._checkPacketType(isForInterest)

        # any() of an empty sequence is False, so the "no filters matches
        # everything" case must be handled explicitly.
        if not self._filters:
            return True

        # any() stops at the first filter which matches.
        return any(configFilter.match(isForInterest, packetName)
                   for configFilter in self._filters)

    def check(self, isForInterest, packetName, keyLocatorName, state):
        """
        Check if the packet satisfies the rule's condition. The checkers are
        tried in the order they were added, stopping at the first one which
        does not accept the packet.

        :param bool isForInterest: True if packetName is for an Interest, False
          if for a Data packet.
        :param Name packetName: The packet name. For a signed interest, the last
          two components are skipped but not removed.
        :param Name keyLocatorName: The KeyLocator's name.
        :param ValidationState state: This calls state.fail() if the packet is
          invalid.
        :return: True if further signature verification is needed, or False if
          the packet is immediately determined to be invalid in which case this
          calls state.fail() with the proper code and message. If the rule has
          no checkers, this returns False without consulting state (create()
          never returns such a rule).
        :rtype: bool
        :raises: ValidatorConfigError if the supplied isForInterest doesn't
          match the one for which the rule is designed.
        """
        logging.getLogger(__name__).info(
          "Trying to check " + packetName.toUri() +
          " with keyLocator " + keyLocatorName.toUri())
        self._checkPacketType(isForInterest)

        hasPendingResult = False
        for checker in self._checkers:
            result = checker.check(
              isForInterest, packetName, keyLocatorName, state)
            if not result:
                # The checker rejected the packet; skip the remaining checkers.
                return result
            hasPendingResult = True

        return hasPendingResult

    @staticmethod
    def create(configSection):
        """
        Create a rule from configuration section.

        :param BoostInfoTree configSection: The section containing the
          definition of the rule, e.g. one of "validator.rule".
        :return: A new ConfigRule created from the configuration.
        :rtype: ConfigRule
        :raises: ValidatorConfigError if the section has no "id", has no "for"
          or an unrecognized "for" value, or has no "checker" subsection.
        """
        # Get rule.id .
        ruleId = configSection.getFirstValue("id")
        if ruleId is None:
            raise ValidatorConfigError("Expecting <rule.id>")

        # Get rule.for . The comparison is case-insensitive.
        usage = configSection.getFirstValue("for")
        if usage is None:
            raise ValidatorConfigError(
              "Expecting <rule.for> in rule: " + ruleId)

        usageLower = usage.lower()
        if usageLower == "data":
            isForInterest = False
        elif usageLower == "interest":
            isForInterest = True
        else:
            raise ValidatorConfigError(
              "Unrecognized <rule.for>: " + usage + " in rule: " + ruleId)

        rule = ConfigRule(ruleId, isForInterest)

        # Get rule.filter(s)
        for filterSection in configSection["filter"]:
            rule.addFilter(ConfigFilter.create(filterSection))

        # Get rule.checker(s)
        checkerList = configSection["checker"]
        for checkerSection in checkerList:
            rule.addChecker(ConfigChecker.create(checkerSection))

        # Check other stuff. A rule must have at least one checker.
        if len(checkerList) == 0:
            raise ValidatorConfigError(
              "No <rule.checker> is specified in rule: " + ruleId)

        return rule

    def _checkPacketType(self, isForInterest):
        """
        Raise an exception if isForInterest is not the packet type for which
        this rule was created. This is shared by match() and check().

        :param bool isForInterest: The packet type supplied by the caller.
        :raises: ValidatorConfigError if isForInterest differs from the flag
          given to the constructor.
        """
        if isForInterest != self._isForInterest:
            raise ValidatorConfigError(
              "Invalid packet type supplied ( " +
              ("interest" if isForInterest else "data") + " != " +
              ("interest" if self._isForInterest else "data") + ")")