# -*- Mode:python; c-file-style:"gnu"; indent-tabs-mode:nil -*- */
#
# Copyright (C) 2014-2019 Regents of the University of California.
# Author: Jeff Thompson <[email protected]>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# A copy of the GNU Lesser General Public License is in the file COPYING.
"""
This module defines the MemoryContentCache class which holds a set of Data
packets and answers an Interest to return the correct Data packet. The cache is
periodically cleaned up to remove each stale Data packet based on its
FreshnessPeriod (if it has one).
Note: This class is an experimental feature. See the API docs for more detail at
http://named-data.net/doc/ndn-ccl-api/memory-content-cache.html .
"""
import logging
import collections
from pyndn.registration_options import RegistrationOptions
from pyndn.interest_filter import InterestFilter
from pyndn.encoding.wire_format import WireFormat
from pyndn.name import Name
from pyndn.util.common import Common
[docs]class MemoryContentCache(object):
"""
Create a new MemoryContentCache to use the given Face.
:param Face face: The Face to use to call registerPrefix and
setInterestFilter, and which will call this object's OnInterest callback.
:param float cleanupIntervalMilliseconds: (optional) The interval in
milliseconds between each check to clean up stale content in the cache. If
omitted, use a default of 1000 milliseconds. If this is a large number,
then effectively the stale content will not be removed from the cache.
"""
def __init__(self, face, cleanupIntervalMilliseconds = None):
if cleanupIntervalMilliseconds == None:
cleanupIntervalMilliseconds = 1000.0
self._face = face
self._cleanupIntervalMilliseconds = cleanupIntervalMilliseconds
self._nextCleanupTime = (Common.getNowMilliseconds() +
cleanupIntervalMilliseconds)
# The map key is the prefix.toUri(). The value is an OnInterest function.
self._onDataNotFoundForPrefix = {}
# elements are int
self._interestFilterIdList = []
# elements are int
self._registeredPrefixIdList = []
# elements are MemoryContentCache._Content
self._noStaleTimeCache = []
# elements are MemoryContentCache.StaleTimeContent
self._staleTimeCache = []
self._emptyComponent = Name.Component()
self._pendingInterestTable = [] # of PendingInterest
self._minimumCacheLifetime = 0.0
[docs] def registerPrefix(
self, prefix, onRegisterFailed, onRegisterSuccess = None,
onDataNotFound = None, registrationOptions = None, wireFormat = None):
"""
Call registerPrefix on the Face given to the constructor so that this
MemoryContentCache will answer interests whose name has the prefix.
Alternatively, if the Face's registerPrefix has already been called,
then you can call this object's setInterestFilter.
:param Name prefix: The Name for the prefix to register. This copies the
Name.
:param onRegisterFailed: If this fails to register the prefix for any
reason, this calls onRegisterFailed(prefix) where prefix is the prefix
given to registerPrefix.
NOTE: The library will log any exceptions raised by this callback, but
for better error handling the callback should catch and properly
handle any exceptions.
:type onRegisterFailed: function object
:param onRegisterSuccess: (optional) This calls
onRegisterSuccess[0](prefix, registeredPrefixId) when this receives a
success message from the forwarder. If onRegisterSuccess is omitted or
[None], this does not use it. (As a special case, this optional
parameter is supplied as a list of one function object, instead of
just a function object, in order to detect when it is used instead of
the following optional onDataNotFound function object.)
NOTE: The library will log any exceptions raised by this callback, but
for better error handling the callback should catch and properly
handle any exceptions.
:type onRegisterSuccess: list of one function object
:param onDataNotFound: (optional) If a data packet for an interest is
not found in the cache, this forwards the interest by calling
onDataNotFound(prefix, interest, face, interestFilterId, filter). Your
callback can find the Data packet for the interest and call
face.putData(data). If your callback cannot find the Data packet, it can
optionally call storePendingInterest(interest, face) to store the
pending interest in this object to be satisfied by a later call to
add(data). If you want to automatically store all pending interests,
you can simply use getStorePendingInterest() for onDataNotFound. If
onDataNotFound is omitted or None, this does not use it.
NOTE: The library will log any exceptions raised by this callback, but
for better error handling the callback should catch and properly
handle any exceptions.
:type onDataNotFound: function object
:param RegistrationOptions registrationOptions: (optional) See Face.registerPrefix.
:param wireFormat: (optional) See Face.registerPrefix.
:type wireFormat: A subclass of WireFormat
"""
arg3 = onRegisterSuccess
arg4 = onDataNotFound
arg5 = registrationOptions
arg6 = wireFormat
# arg3, arg4, arg5, arg6 may be:
# [OnRegisterSuccess], OnDataNotFound, RegistrationOptions, WireFormat
# [OnRegisterSuccess], OnDataNotFound, RegistrationOptions, None
# [OnRegisterSuccess], OnDataNotFound, WireFormat, None
# [OnRegisterSuccess], OnDataNotFound, None, None
# [OnRegisterSuccess], RegistrationOptions, WireFormat, None
# [OnRegisterSuccess], RegistrationOptions, None, None
# [OnRegisterSuccess], WireFormat, None, None
# [OnRegisterSuccess], None, None, None
# OnDataNotFound, RegistrationOptions, WireFormat, None
# OnDataNotFound, RegistrationOptions, None, None
# OnDataNotFound, WireFormat, None, None
# OnDataNotFound, None, None, None
# RegistrationOptions, WireFormat, None, None
# RegistrationOptions, None, None, None
# WireFormat, None, None, None
# None, None, None, None
if type(arg3) is list and len(arg3) == 1:
onRegisterSuccess = arg3[0]
else:
onRegisterSuccess = None
if isinstance(arg3, collections.Callable):
onDataNotFound = arg3
elif isinstance(arg4, collections.Callable):
onDataNotFound = arg4
else:
onDataNotFound = None
if isinstance(arg3, RegistrationOptions):
registrationOptions = arg3
elif isinstance(arg4, RegistrationOptions):
registrationOptions = arg4
elif isinstance(arg5, RegistrationOptions):
registrationOptions = arg5
else:
registrationOptions = RegistrationOptions()
if isinstance(arg3, WireFormat):
wireFormat = arg3
elif isinstance(arg4, WireFormat):
wireFormat = arg4
elif isinstance(arg5, WireFormat):
wireFormat = arg5
elif isinstance(arg6, WireFormat):
wireFormat = arg6
else:
# Don't use a default argument since getDefaultWireFormat can change.
wireFormat = WireFormat.getDefaultWireFormat()
if onDataNotFound != None:
self._onDataNotFoundForPrefix[prefix.toUri()] = onDataNotFound
registeredPrefixId = self._face.registerPrefix(
prefix, self._onInterest, onRegisterFailed, onRegisterSuccess,
registrationOptions, wireFormat)
self._registeredPrefixIdList.append(registeredPrefixId)
[docs] def setInterestFilter(self, filterOrPrefix, onDataNotFound = None):
"""
Call setInterestFilter on the Face given to the constructor so that this
MemoryContentCache will answer interests whose name matches the filter.
There are two forms of setInterestFilter.
The first form uses the exact given InterestFilter:
setInterestFilter(filter, [onDataNotFound]).
The second form creates an InterestFilter from the given prefix Name:
setInterestFilter(prefix, [onDataNotFound]).
:param InterestFilter filter: The InterestFilter with a prefix and
optional regex filter used to match the name of an incoming Interest.
This makes a copy of filter.
:param Name prefix: The Name prefix used to match the name of an
incoming Interest. This makes a copy of the Name.
:param onDataNotFound: (optional) If a data packet for an interest is
not found in the cache, this forwards the interest by calling
onDataNotFound(prefix, interest, face, interestFilterId, filter). Your
callback can find the Data packet for the interest and call
face.putData(data). If your callback cannot find the Data packet, it can
optionally call storePendingInterest(interest, face) to store the
pending interest in this object to be satisfied by a later call to
add(data). If you want to automatically store all pending interests,
you can simply use getStorePendingInterest() for onDataNotFound. If
onDataNotFound is omitted or None, this does not use it.
NOTE: The library will log any exceptions raised by this callback, but
for better error handling the callback should catch and properly
handle any exceptions.
:type onDataNotFound: function object
"""
if onDataNotFound != None:
if isinstance(filterOrPrefix, InterestFilter):
prefix = filterOrPrefix.getPrefix()
else:
prefix = filterOrPrefix
self._onDataNotFoundForPrefix[prefix.toUri()] = onDataNotFound
interestFilterId = self._face.setInterestFilter(
filterOrPrefix, self._onInterest)
self._interestFilterIdList.append(interestFilterId)
[docs] def unregisterAll(self):
"""
Call Face.unsetInterestFilter and Face.removeRegisteredPrefix for all
the prefixes given to the setInterestFilter and registerPrefix method on
this MemoryContentCache object so that it will not receive interests any
more. You can call this if you want to "shut down" this
MemoryContentCache while your application is still running.
"""
for interestFilterId in self._interestFilterIdList:
self._face.unsetInterestFilter(interestFilterId)
self._interestFilterIdList = []
for registeredPrefixId in self._registeredPrefixIdList:
self._face.removeRegisteredPrefix(registeredPrefixId)
self._registeredPrefixIdList = []
# Also clear each onDataNotFoundForPrefix given to registerPrefix.
self._onDataNotFoundForPrefix = {}
[docs] def add(self, data):
"""
Add the Data packet to the cache so that it is available to use to
answer interests. If data.getMetaInfo().getFreshnessPeriod() is not
None, set the staleness time to now plus the maximum of
data.getMetaInfo().getFreshnessPeriod() and minimumCacheLifetime, which
is checked during cleanup to remove stale content.
This also checks if cleanupIntervalMilliseconds
milliseconds have passed and
removes stale content from the cache. After removing stale content,
remove timed-out pending interests from storePendingInterest(), then if
the added Data packet satisfies any interest, send it through the
face and remove the interest from the pending interest table.
:param Data data: The Data packet object to put in the cache. This
copies the fields from the object.
"""
nowMilliseconds = Common.getNowMilliseconds()
self._doCleanup(nowMilliseconds)
if (data.getMetaInfo().getFreshnessPeriod() != None and
data.getMetaInfo().getFreshnessPeriod() >= 0.0):
# The content will go stale, so use staleTimeCache.
content = MemoryContentCache._StaleTimeContent(
data, nowMilliseconds, self._minimumCacheLifetime)
# Insert into _staleTimeCache, sorted on
# content._cacheRemovalTimeMilliseconds.
# Search from the back since we expect it to go there.
i = len(self._staleTimeCache) - 1
while i >= 0:
if (self._staleTimeCache[i]._cacheRemovalTimeMilliseconds <=
content._cacheRemovalTimeMilliseconds):
break
i -= 1
# Element i is the greatest less than or equal to
# content._cacheRemovalTimeMilliseconds, so insert after it.
self._staleTimeCache.insert(i + 1, content)
else:
# The data does not go stale, so use _noStaleTimeCache.
self._noStaleTimeCache.append(MemoryContentCache._Content(data))
# Remove timed-out interests and check if the data packet matches any
# pending interest.
# Go backwards through the list so we can erase entries.
for i in range(len(self._pendingInterestTable) - 1, -1, -1):
pendingInterest = self._pendingInterestTable[i]
if pendingInterest.isTimedOut(nowMilliseconds):
self._pendingInterestTable.pop(i)
continue
if pendingInterest.getInterest().matchesName(data.getName()):
try:
# Send to the same face from the original call to onInterest.
# wireEncode returns the cached encoding if available.
logging.getLogger(__name__).info(
"MemoryContentCache: Reply w/ add Data " + data.getName().toUri())
pendingInterest.getFace().send(data.wireEncode())
except Exception as ex:
logging.getLogger(__name__).error(
"Error in face.send: %s", str(ex))
return
# The pending interest is satisfied, so remove it.
self._pendingInterestTable.pop(i)
[docs] def storePendingInterest(self, interest, face):
"""
Store an interest from an OnInterest callback in the internal pending
interest table (normally because there is no Data packet available yet
to satisfy the interest). add(data) will check if the added Data packet
satisfies any pending interest and send it through the face.
:param Interest interest: The Interest for which we don't have a Data
packet yet. You should not modify the interest after calling this.
:param Face face: The Face with the connection which
received the interest. This comes from the OnInterest callback.
"""
self._pendingInterestTable.append(
self._PendingInterest(interest, face))
[docs] def getStorePendingInterest(self):
"""
Return a callback to use for onDataNotFound in registerPrefix which
simply calls storePendingInterest() to store the interest that doesn't
match a Data packet. add(data) will check if the added Data packet
satisfies any pending interest and send it.
:return: A callback to use for onDataNotFound in registerPrefix().
:rtype: function object
"""
return self._storePendingInterestCallback
[docs] def getMinimumCacheLifetime(self):
"""
Get the minimum lifetime before removing stale content from the cache.
:return: The minimum cache lifetime in milliseconds.
:rtype: float
"""
return self._minimumCacheLifetime
[docs] def setMinimumCacheLifetime(self, minimumCacheLifetime):
"""
Set the minimum lifetime before removing stale content from the cache
which can keep content in the cache longer than the lifetime defined in
the meta info. This can be useful for matching interests where
MustBeFresh is False. The default minimum cache lifetime is zero,
meaning that content is removed when its lifetime expires.
:param float minimumCacheLifetime: The minimum cache lifetime in
milliseconds.
"""
self._minimumCacheLifetime = minimumCacheLifetime
def _storePendingInterestCallback(
self, prefix, interest, face, interestFilterId, filter):
"""
This is a private method to return from getStorePendingInterest(). We
need a separate method because the arguments are different from
storePendingInterest.
"""
self.storePendingInterest(interest, face)
def _onInterest(self, prefix, interest, face, interestFilterId, filter):
"""
This is the OnInterest callback which is called when the library
receives an interest whose name has the prefix given to registerPrefix.
First check if cleanupIntervalMilliseconds milliseconds have passed and
remove stale content from the cache. Then search the cache for the Data
packet, matching any interest selectors including ChildSelector, and
send the Data packet to the face. If no matching Data packet is in
the cache, call the callback in onDataNotFoundForPrefix (if defined).
"""
logging.getLogger(__name__).info(
"MemoryContentCache: Received Interest " + interest.toUri())
nowMilliseconds = Common.getNowMilliseconds()
self._doCleanup(nowMilliseconds)
selectedComponent = None
selectedEncoding = None
# We need to iterate over both arrays.
totalSize = len(self._staleTimeCache) + len(self._noStaleTimeCache)
for i in range(totalSize):
isFresh = True
if i < len(self._staleTimeCache):
content = self._staleTimeCache[i]
isFresh = content.isFresh(nowMilliseconds)
else:
# We have iterated over the first array. Get from the second.
content = self._noStaleTimeCache[i - len(self._staleTimeCache)]
if (interest.matchesName(content.getName()) and
not (interest.getMustBeFresh() and not isFresh)):
if (interest.getChildSelector() == None):
# No child selector, so send the first match that we have found.
logging.getLogger(__name__).info(
"MemoryContentCache: Reply Data " + content.getName().toUri())
face.send(content.getDataEncoding())
return
else:
# Update selectedEncoding based on the child selector.
if (content.getName().size() > interest.getName().size()):
component = content.getName().get(interest.getName().size())
else:
component = self._emptyComponent
gotBetterMatch = False
if selectedEncoding == None:
# Save the first match.
gotBetterMatch = True
else:
if interest.getChildSelector() == 0:
# Leftmost child.
if component.compare(selectedComponent) < 0:
gotBetterMatch = True
else:
# Rightmost child.
if component.compare(selectedComponent) > 0:
gotBetterMatch = True
if gotBetterMatch:
selectedComponent = component
selectedEncoding = content.getDataEncoding()
if selectedEncoding != None:
# We found the leftmost or rightmost child.
logging.getLogger(__name__).info(
"MemoryContentCache: Reply Data to Interest " + interest.toUri())
face.send(selectedEncoding)
else:
logging.getLogger(__name__).info(
"MemoryContentCache: onDataNotFound for " + interest.toUri())
# Call the onDataNotFound callback (if defined).
if prefix.toUri() in self._onDataNotFoundForPrefix:
try:
self._onDataNotFoundForPrefix[prefix.toUri()](
prefix, interest, face, interestFilterId, filter)
except:
logging.exception("Error in onDataNotFound")
def _doCleanup(self, nowMilliseconds):
"""
Check if now is greater than nextCleanupTime and, if so, remove stale
content from staleTimeCache and reset nextCleanupTime based on
cleanupIntervalMilliseconds. Since add(Data) does a sorted insert into
staleTimeCache, the check for stale data is quick and does not require
searching the entire staleTimeCache.
:param float nowMilliseconds: The current time in milliseconds from
Common.getNowMilliseconds().
"""
if nowMilliseconds >= self._nextCleanupTime:
# staleTimeCache is sorted on _cacheRemovalTimeMilliseconds, so we only need
# to erase the stale entries at the front, then quit.
while (len(self._staleTimeCache) > 0 and
self._staleTimeCache[0].isPastRemovalTime(nowMilliseconds)):
del self._staleTimeCache[0]
self._nextCleanupTime = nowMilliseconds + self._cleanupIntervalMilliseconds
"""
_Content is a private class to hold the name and encoding for each entry in
the cache. This base class is for a Data packet without a FreshnessPeriod.
"""
class _Content(object):
"""
Create a new Content entry to hold data's name and wire encoding.
:param Data data: The Data packet whose name and wire encoding are
copied.
"""
def __init__(self, data):
# Copy the name.
self._name = Name(data.getName())
# wireEncode returns the cached encoding if available.
self._dataEncoding = data.wireEncode().buf()
def getName(self):
return self._name
def getDataEncoding(self):
return self._dataEncoding
"""
_StaleTimeContent extends _Content to include the
_cacheRemovalTimeMilliseconds for when this entry should be cleaned up from
the cache.
"""
class _StaleTimeContent(_Content):
"""
Create a new StaleTimeContent to hold data's name and wire encoding
as well as the cacheRemovalTimeMilliseconds_ which is now plus the
maximum of data.getMetaInfo().getFreshnessPeriod() and the
minimumCacheLifetime.
:param Data data: The Data packet whose name and wire encoding are
copied.
:param float nowMilliseconds: The current time in milliseconds from
Common.getNowMilliseconds().
:param float minimumCacheLifetime: The minimum cache lifetime in
milliseconds.
"""
def __init__(self, data, nowMilliseconds, minimumCacheLifetime):
super(MemoryContentCache._StaleTimeContent, self).__init__(data)
# Set up _cacheRemovalTimeMilliseconds which is the time when the
# content becomes stale and should be removed from the cache in
# milliseconds according to Common.getNowMilliseconds().
self._cacheRemovalTimeMilliseconds = nowMilliseconds + max(
data.getMetaInfo().getFreshnessPeriod(), minimumCacheLifetime)
# Set up freshnessExpiryTimeMilliseconds_ which is the time time
# when the freshness period of the content expires (independent of
# when to remove from the cache) in milliseconds according to
# Common.getNowMilliseconds().
self._freshnessExpiryTimeMilliseconds = (nowMilliseconds +
data.getMetaInfo().getFreshnessPeriod())
def isPastRemovalTime(self, nowMilliseconds):
"""
Check if this content is stale and should be removed from the cache,
according to the content freshness period and the
minimumCacheLifetime.
:param float nowMilliseconds: The current time in milliseconds from
Common.getNowMilliseconds().
:return: True if this content is stale, otherwise False.
:rtype: bool
"""
return self._cacheRemovalTimeMilliseconds <= nowMilliseconds
def isFresh(self, nowMilliseconds):
"""
Check if the content is still fresh according to its freshness
period (independent of when to remove from the cache).
:param float nowMilliseconds: The current time in milliseconds from
Common.getNowMilliseconds().
:return: True if the content is still fresh, otherwise False.
:rtype: bool
"""
return self._freshnessExpiryTimeMilliseconds > nowMilliseconds
class _PendingInterest(object):
"""
A PendingInterest holds an interest which onInterest received but could
not satisfy. When we add a new data packet to the cache, we will also
check if it satisfies a pending interest.
Create a new PendingInterest and set the _timeoutTime based on the
current time and the interest lifetime.
:param Interest interest: The interest.
:param Face face: The face from the onInterest callback.
If the interest is satisfied later by a new data packet, we will send
the data packet to the face.
"""
def __init__(self, interest, face):
self._interest = interest
self._face = face
# Set up _timeoutTimeMilliseconds.
interestLifetime = self._interest.getInterestLifetimeMilliseconds()
if interestLifetime is None or interestLifetime < 0.0:
# The InterestLifetime is omitted, so use a default.
interestLifetime = 4000.0
self._timeoutTimeMilliseconds = Common.getNowMilliseconds() + interestLifetime
def getInterest(self):
"""
Return the interest given to the constructor.
"""
return self._interest
def getFace(self):
"""
Return the face given to the constructor.
"""
return self._face
def isTimedOut(self, nowMilliseconds):
"""
Check if this interest is timed out.
:param float nowMilliseconds: The current time in milliseconds from
Common.getNowMilliseconds.
:return: True if this interest timed out, otherwise False.
:rtype: bool
"""
return nowMilliseconds >= self._timeoutTimeMilliseconds