asf-strategy.cpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * Copyright (c) 2014-2024, Regents of the University of California,
4  * Arizona Board of Regents,
5  * Colorado State University,
6  * University Pierre & Marie Curie, Sorbonne University,
7  * Washington University in St. Louis,
8  * Beijing Institute of Technology,
9  * The University of Memphis.
10  *
11  * This file is part of NFD (Named Data Networking Forwarding Daemon).
12  * See AUTHORS.md for complete list of NFD authors and contributors.
13  *
14  * NFD is free software: you can redistribute it and/or modify it under the terms
15  * of the GNU General Public License as published by the Free Software Foundation,
16  * either version 3 of the License, or (at your option) any later version.
17  *
18  * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
19  * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
20  * PURPOSE. See the GNU General Public License for more details.
21  *
22  * You should have received a copy of the GNU General Public License along with
23  * NFD, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
24  */
25 
26 #include "asf-strategy.hpp"
27 #include "algorithm.hpp"
28 #include "common/logger.hpp"
29 #include <boost/lexical_cast.hpp>
30 
31 namespace nfd::fw::asf {
32 
33 NFD_LOG_INIT(AsfStrategy);
35 
36 AsfStrategy::AsfStrategy(Forwarder& forwarder, const Name& name)
37  : Strategy(forwarder)
38  , ProcessNackTraits(this)
39 {
41  if (parsed.version && *parsed.version != getStrategyName()[-1].toVersion()) {
42  NDN_THROW(std::invalid_argument("AsfStrategy does not support version " +
43  std::to_string(*parsed.version)));
44  }
45 
47  m_retxSuppression = RetxSuppressionExponential::construct(params);
48  auto probingInterval = params.getOrDefault<time::milliseconds::rep>("probing-interval",
49  m_probing.getProbingInterval().count());
50  m_probing.setProbingInterval(time::milliseconds(probingInterval));
51  m_nMaxTimeouts = params.getOrDefault<size_t>("max-timeouts", m_nMaxTimeouts);
52  auto measurementsLifetime = time::milliseconds(params.getOrDefault<time::milliseconds::rep>("measurements-lifetime",
54  if (measurementsLifetime <= m_probing.getProbingInterval()) {
55  NDN_THROW(std::invalid_argument("Measurements lifetime (" + boost::lexical_cast<std::string>(measurementsLifetime) +
56  ") should be greater than the probing interval of " +
57  boost::lexical_cast<std::string>(m_probing.getProbingInterval())));
58  }
59  m_measurements.setMeasurementsLifetime(measurementsLifetime);
60 
62 
63  NDN_LOG_DEBUG(*m_retxSuppression);
64  NFD_LOG_DEBUG("probing-interval=" << m_probing.getProbingInterval()
65  << " max-timeouts=" << m_nMaxTimeouts
66  << " measurements-lifetime=" << m_measurements.getMeasurementsLifetime());
67 }
68 
69 const Name&
71 {
72  static const auto strategyName = Name("/localhost/nfd/strategy/asf").appendVersion(5);
73  return strategyName;
74 }
75 
76 void
77 AsfStrategy::afterReceiveInterest(const Interest& interest, const FaceEndpoint& ingress,
78  const shared_ptr<pit::Entry>& pitEntry)
79 {
80  const auto& fibEntry = this->lookupFib(*pitEntry);
81 
82  // Check if the interest is new and, if so, skip the retx suppression check
83  if (!hasPendingOutRecords(*pitEntry)) {
84  auto faceToUse = getBestFaceForForwarding(interest, ingress.face, fibEntry, pitEntry);
85  if (faceToUse == nullptr) {
86  NFD_LOG_INTEREST_FROM(interest, ingress, "new no-nexthop");
87  sendNoRouteNack(ingress.face, pitEntry);
88  }
89  else {
90  NFD_LOG_INTEREST_FROM(interest, ingress, "new forward-to=" << faceToUse->getId());
91  forwardInterest(interest, *faceToUse, fibEntry, pitEntry);
92  sendProbe(interest, ingress, *faceToUse, fibEntry, pitEntry);
93  }
94  return;
95  }
96 
97  auto faceToUse = getBestFaceForForwarding(interest, ingress.face, fibEntry, pitEntry, false);
98  if (faceToUse != nullptr) {
99  auto suppressResult = m_retxSuppression->decidePerUpstream(*pitEntry, *faceToUse);
100  if (suppressResult == RetxSuppressionResult::SUPPRESS) {
101  // Cannot be sent on this face, interest was received within the suppression window
102  NFD_LOG_INTEREST_FROM(interest, ingress, "retx forward-to=" << faceToUse->getId() << " suppressed");
103  }
104  else {
105  // The retx arrived after the suppression period: forward it but don't probe, because
106  // probing was done earlier for this interest when it was newly received
107  NFD_LOG_INTEREST_FROM(interest, ingress, "retx forward-to=" << faceToUse->getId());
108  auto* outRecord = forwardInterest(interest, *faceToUse, fibEntry, pitEntry);
109  if (outRecord && suppressResult == RetxSuppressionResult::FORWARD) {
110  m_retxSuppression->incrementIntervalForOutRecord(*outRecord);
111  }
112  }
113  return;
114  }
115 
116  // If all eligible faces have been used (i.e., they all have a pending out-record),
117  // choose the nexthop with the earliest out-record
118  const auto& nexthops = fibEntry.getNextHops();
119  auto it = findEligibleNextHopWithEarliestOutRecord(ingress.face, interest, nexthops, pitEntry);
120  if (it == nexthops.end()) {
121  NFD_LOG_INTEREST_FROM(interest, ingress, "retx no-nexthop");
122  return;
123  }
124  auto& outFace = it->getFace();
125  auto suppressResult = m_retxSuppression->decidePerUpstream(*pitEntry, outFace);
126  if (suppressResult == RetxSuppressionResult::SUPPRESS) {
127  NFD_LOG_INTEREST_FROM(interest, ingress, "retx retry-to=" << outFace.getId() << " suppressed");
128  }
129  else {
130  NFD_LOG_INTEREST_FROM(interest, ingress, "retx retry-to=" << outFace.getId());
131  // sendInterest() is used here instead of forwardInterest() because the measurements info
132  // were already attached to this face in the previous forwarding
133  auto* outRecord = sendInterest(interest, outFace, pitEntry);
134  if (outRecord && suppressResult == RetxSuppressionResult::FORWARD) {
135  m_retxSuppression->incrementIntervalForOutRecord(*outRecord);
136  }
137  }
138 }
139 
140 void
141 AsfStrategy::beforeSatisfyInterest(const Data& data, const FaceEndpoint& ingress,
142  const shared_ptr<pit::Entry>& pitEntry)
143 {
144  NamespaceInfo* namespaceInfo = m_measurements.getNamespaceInfo(pitEntry->getName());
145  if (namespaceInfo == nullptr) {
146  NFD_LOG_DATA_FROM(data, ingress, "no-measurements");
147  return;
148  }
149 
150  // Record the RTT between the Interest out to Data in
151  FaceInfo* faceInfo = namespaceInfo->getFaceInfo(ingress.face.getId());
152  if (faceInfo == nullptr) {
153  NFD_LOG_DATA_FROM(data, ingress, "no-face-info");
154  return;
155  }
156 
157  auto outRecord = pitEntry->findOutRecord(ingress.face);
158  if (outRecord == pitEntry->out_end()) {
159  NFD_LOG_DATA_FROM(data, ingress, "no-out-record");
160  }
161  else {
162  faceInfo->recordRtt(time::steady_clock::now() - outRecord->getLastRenewed());
163  NFD_LOG_DATA_FROM(data, ingress, "rtt=" << faceInfo->getLastRtt() << " srtt=" << faceInfo->getSrtt());
164  }
165 
166  // Extend lifetime for measurements associated with Face
167  namespaceInfo->extendFaceInfoLifetime(*faceInfo, ingress.face.getId());
168  // Extend PIT entry timer to allow slower probes to arrive
169  this->setExpiryTimer(pitEntry, 50_ms);
170  faceInfo->cancelTimeout(data.getName());
171  faceInfo->setNTimeouts(0);
172 }
173 
174 void
175 AsfStrategy::afterReceiveNack(const lp::Nack& nack, const FaceEndpoint& ingress,
176  const shared_ptr<pit::Entry>& pitEntry)
177 {
178  NFD_LOG_NACK_FROM(nack, ingress, "");
179  onTimeoutOrNack(pitEntry->getName(), ingress.face.getId(), true);
180  this->processNack(nack, ingress.face, pitEntry);
181 }
182 
184 AsfStrategy::forwardInterest(const Interest& interest, Face& outFace, const fib::Entry& fibEntry,
185  const shared_ptr<pit::Entry>& pitEntry)
186 {
187  const auto& interestName = interest.getName();
188  auto faceId = outFace.getId();
189 
190  auto* outRecord = sendInterest(interest, outFace, pitEntry);
191 
192  FaceInfo& faceInfo = m_measurements.getOrCreateFaceInfo(fibEntry, interestName, faceId);
193 
194  // Refresh measurements since Face is being used for forwarding
195  NamespaceInfo& namespaceInfo = m_measurements.getOrCreateNamespaceInfo(fibEntry, interestName);
196  namespaceInfo.extendFaceInfoLifetime(faceInfo, faceId);
197 
198  if (!faceInfo.isTimeoutScheduled()) {
199  auto timeout = faceInfo.scheduleTimeout(interestName,
200  [this, name = interestName, faceId] {
201  onTimeoutOrNack(name, faceId, false);
202  });
203  NFD_LOG_TRACE("Scheduled timeout for " << fibEntry.getPrefix() << " to=" << faceId
204  << " in " << time::duration_cast<time::milliseconds>(timeout));
205  }
206 
207  return outRecord;
208 }
209 
210 void
211 AsfStrategy::sendProbe(const Interest& interest, const FaceEndpoint& ingress, const Face& faceToUse,
212  const fib::Entry& fibEntry, const shared_ptr<pit::Entry>& pitEntry)
213 {
214  if (!m_probing.isProbingNeeded(fibEntry, interest.getName()))
215  return;
216 
217  Face* faceToProbe = m_probing.getFaceToProbe(ingress.face, interest, fibEntry, faceToUse);
218  if (faceToProbe == nullptr)
219  return;
220 
221  Interest probeInterest(interest);
222  probeInterest.refreshNonce();
223  NFD_LOG_DEBUG("Sending probe " << probeInterest.getName() << " nonce=" << probeInterest.getNonce()
224  << " to=" << faceToProbe->getId() << " trigger-nonce=" << interest.getNonce());
225  forwardInterest(probeInterest, *faceToProbe, fibEntry, pitEntry);
226 
227  m_probing.afterForwardingProbe(fibEntry, interest.getName());
228 }
229 
230 static auto
232 {
233  // The RTT is used to store the status of the face:
234  // - A positive value indicates data was received and is assumed to indicate a working face (group 1),
235  // - RTT_NO_MEASUREMENT indicates a face is unmeasured (group 2),
236  // - RTT_TIMEOUT indicates a face is timed out (group 3).
237  // These groups are defined in the technical report.
238  //
239  // When forwarding, we assume an order where working faces (group 1) are ranked
240  // higher than unmeasured faces (group 2), and unmeasured faces are ranked higher
241  // than timed out faces (group 3). We assign each group a priority value from 1-3
242  // to ensure lowest-to-highest ordering consistent with this logic.
243 
244  // Working faces are ranked first in priority; if RTT is not
245  // a special value, we assume the face to be in this group.
246  int priority = 1;
247  if (fs.rtt == FaceInfo::RTT_NO_MEASUREMENT) {
248  priority = 2;
249  }
250  else if (fs.rtt == FaceInfo::RTT_TIMEOUT) {
251  priority = 3;
252  }
253 
254  // We set SRTT by default to the max value; if a face is working, we instead set it to the actual value.
255  // Unmeasured and timed out faces are not sorted by SRTT.
256  auto srtt = priority == 1 ? fs.srtt : time::nanoseconds::max();
257 
258  // For ranking, group takes the priority over SRTT (if present) or cost, SRTT (if present)
259  // takes priority over cost, and cost takes priority over FaceId.
260  // FaceId is included to ensure all unique entries are included in the ranking (see #5310)
261  return std::tuple(priority, srtt, fs.cost, fs.face->getId());
262 }
263 
264 bool
265 AsfStrategy::FaceStatsForwardingCompare::operator()(const FaceStats& lhs, const FaceStats& rhs) const noexcept
266 {
268 }
269 
270 Face*
271 AsfStrategy::getBestFaceForForwarding(const Interest& interest, const Face& inFace,
272  const fib::Entry& fibEntry, const shared_ptr<pit::Entry>& pitEntry,
273  bool isInterestNew)
274 {
275  FaceStatsForwardingSet rankedFaces;
276 
277  auto now = time::steady_clock::now();
278  for (const auto& nh : fibEntry.getNextHops()) {
279  if (!isNextHopEligible(inFace, interest, nh, pitEntry, !isInterestNew, now)) {
280  continue;
281  }
282 
283  const FaceInfo* info = m_measurements.getFaceInfo(fibEntry, interest.getName(), nh.getFace().getId());
284  if (info == nullptr) {
285  rankedFaces.insert({&nh.getFace(), FaceInfo::RTT_NO_MEASUREMENT,
286  FaceInfo::RTT_NO_MEASUREMENT, nh.getCost()});
287  }
288  else {
289  rankedFaces.insert({&nh.getFace(), info->getLastRtt(), info->getSrtt(), nh.getCost()});
290  }
291  }
292 
293  auto it = rankedFaces.begin();
294  return it != rankedFaces.end() ? it->face : nullptr;
295 }
296 
297 void
298 AsfStrategy::onTimeoutOrNack(const Name& interestName, FaceId faceId, bool isNack)
299 {
300  NamespaceInfo* namespaceInfo = m_measurements.getNamespaceInfo(interestName);
301  if (namespaceInfo == nullptr) {
302  NFD_LOG_TRACE(interestName << " FibEntry has been removed since timeout scheduling");
303  return;
304  }
305 
306  FaceInfo* fiPtr = namespaceInfo->getFaceInfo(faceId);
307  if (fiPtr == nullptr) {
308  NFD_LOG_TRACE(interestName << " FaceInfo id=" << faceId << " has been removed since timeout scheduling");
309  return;
310  }
311 
312  auto& faceInfo = *fiPtr;
313  size_t nTimeouts = faceInfo.getNTimeouts() + 1;
314  faceInfo.setNTimeouts(nTimeouts);
315 
316  if (nTimeouts < m_nMaxTimeouts && !isNack) {
317  NFD_LOG_TRACE(interestName << " face=" << faceId << " timeout-count=" << nTimeouts << " ignoring");
318  // Extend lifetime for measurements associated with Face
319  namespaceInfo->extendFaceInfoLifetime(faceInfo, faceId);
320  faceInfo.cancelTimeout(interestName);
321  }
322  else {
323  NFD_LOG_TRACE(interestName << " face=" << faceId << " timeout-count=" << nTimeouts);
324  faceInfo.recordTimeout(interestName);
325  faceInfo.setNTimeouts(0);
326  }
327 }
328 
329 void
330 AsfStrategy::sendNoRouteNack(Face& face, const shared_ptr<pit::Entry>& pitEntry)
331 {
332  lp::NackHeader nackHeader;
333  nackHeader.setReason(lp::NackReason::NO_ROUTE);
334  this->sendNack(nackHeader, face, pitEntry);
335  this->rejectPendingInterest(pitEntry);
336 }
337 
338 } // namespace nfd::fw::asf
This file contains common algorithms used by forwarding strategies.
Represents a face-endpoint pair in the forwarder.
Main class of NFD's forwarding engine.
Definition: forwarder.hpp:54
Generalization of a network interface.
Definition: face.hpp:118
FaceId getId() const noexcept
Returns the face ID.
Definition: face.hpp:195
Represents an entry in the FIB.
Definition: fib-entry.hpp:91
const NextHopList & getNextHops() const noexcept
Definition: fib-entry.hpp:103
const Name & getPrefix() const noexcept
Definition: fib-entry.hpp:97
void processNack(const lp::Nack &nack, const Face &inFace, const shared_ptr< pit::Entry > &pitEntry)
static std::unique_ptr< RetxSuppressionExponential > construct(const StrategyParameters &params)
Base class of all forwarding strategies.
Definition: strategy.hpp:46
void rejectPendingInterest(const shared_ptr< pit::Entry > &pitEntry)
Schedule the PIT entry for immediate deletion.
Definition: strategy.hpp:335
const fib::Entry & lookupFib(const pit::Entry &pitEntry) const
Performs a FIB lookup, considering Link object if present.
Definition: strategy.cpp:313
bool sendNack(const lp::NackHeader &header, Face &egress, const shared_ptr< pit::Entry > &pitEntry)
Send a Nack packet.
Definition: strategy.hpp:351
pit::OutRecord * sendInterest(const Interest &interest, Face &egress, const shared_ptr< pit::Entry > &pitEntry)
Send an Interest packet.
Definition: strategy.cpp:236
static ParsedInstanceName parseInstanceName(const Name &input)
Parse a strategy instance name.
Definition: strategy.cpp:123
void setExpiryTimer(const shared_ptr< pit::Entry > &pitEntry, time::milliseconds duration)
Schedule the PIT entry to be erased after duration.
Definition: strategy.hpp:386
void setInstanceName(const Name &name) noexcept
Set strategy instance name.
Definition: strategy.hpp:448
static StrategyParameters parseParameters(const PartialName &params)
Parse strategy parameters encoded in a strategy instance name.
Definition: strategy.cpp:144
static Name makeInstanceName(const Name &input, const Name &strategyName)
Construct a strategy instance name.
Definition: strategy.cpp:134
std::enable_if_t< std::is_signed_v< T >, T > getOrDefault(const key_type &key, const T &defaultVal) const
Definition: strategy.hpp:489
NamespaceInfo & getOrCreateNamespaceInfo(const fib::Entry &fibEntry, const Name &prefix)
static constexpr time::milliseconds DEFAULT_MEASUREMENTS_LIFETIME
void setMeasurementsLifetime(time::milliseconds measurementsLifetime)
time::milliseconds getMeasurementsLifetime() const
FaceInfo & getOrCreateFaceInfo(const fib::Entry &fibEntry, const Name &interestName, FaceId faceId)
FaceInfo * getFaceInfo(const fib::Entry &fibEntry, const Name &interestName, FaceId faceId)
NamespaceInfo * getNamespaceInfo(const Name &prefix)
Adaptive SRTT-based forwarding strategy.
static const Name & getStrategyName()
void afterReceiveNack(const lp::Nack &nack, const FaceEndpoint &ingress, const shared_ptr< pit::Entry > &pitEntry) override
Trigger after a Nack is received.
AsfStrategy(Forwarder &forwarder, const Name &name=getStrategyName())
void afterReceiveInterest(const Interest &interest, const FaceEndpoint &ingress, const shared_ptr< pit::Entry > &pitEntry) override
Trigger after an Interest is received.
void beforeSatisfyInterest(const Data &data, const FaceEndpoint &ingress, const shared_ptr< pit::Entry > &pitEntry) override
Trigger before a PIT entry is satisfied.
Strategy information for each face in a namespace.
static constexpr time::nanoseconds RTT_TIMEOUT
void cancelTimeout(const Name &prefix)
time::nanoseconds scheduleTimeout(const Name &interestName, ndn::scheduler::EventCallback cb)
static constexpr time::nanoseconds RTT_NO_MEASUREMENT
time::nanoseconds getSrtt() const
void setNTimeouts(size_t nTimeouts)
time::nanoseconds getLastRtt() const
void recordTimeout(const Name &interestName)
void recordRtt(time::nanoseconds rtt)
Stores strategy information about each face in this namespace.
FaceInfo * getFaceInfo(FaceId faceId)
void extendFaceInfoLifetime(FaceInfo &info, FaceId faceId)
Face * getFaceToProbe(const Face &inFace, const Interest &interest, const fib::Entry &fibEntry, const Face &faceUsed)
bool isProbingNeeded(const fib::Entry &fibEntry, const Name &interestName)
time::milliseconds getProbingInterval() const
void afterForwardingProbe(const fib::Entry &fibEntry, const Name &interestName)
void setProbingInterval(time::milliseconds probingInterval)
Contains information about an Interest toward an outgoing face.
Definition: pit-entry.hpp:129
#define NFD_LOG_INIT(name)
Definition: logger.hpp:31
#define NFD_LOG_DEBUG
Definition: logger.hpp:38
#define NFD_LOG_TRACE
Definition: logger.hpp:37
uint64_t FaceId
Identifies a face.
Definition: face-common.hpp:48
static auto getFaceRankForForwarding(const FaceStats &fs) noexcept
@ SUPPRESS
Interest is a retransmission and should be suppressed.
@ FORWARD
Interest is a retransmission and should be forwarded.
bool isNextHopEligible(const Face &inFace, const Interest &interest, const fib::NextHop &nexthop, const shared_ptr< pit::Entry > &pitEntry, bool wantUnused, time::steady_clock::time_point now)
Determines whether a NextHop is eligible, i.e., not the same inFace.
Definition: algorithm.cpp:131
fib::NextHopList::const_iterator findEligibleNextHopWithEarliestOutRecord(const Face &inFace, const Interest &interest, const fib::NextHopList &nexthops, const shared_ptr< pit::Entry > &pitEntry)
Pick an eligible NextHop with earliest out-record.
Definition: algorithm.cpp:109
NFD_REGISTER_STRATEGY(SelfLearningStrategy)
bool hasPendingOutRecords(const pit::Entry &pitEntry)
Determine whether pitEntry has any pending out-records.
Definition: algorithm.cpp:85
#define NFD_LOG_INTEREST_FROM(interest, ingress, msg)
Logs the reception of interest on ingress, followed by msg, at DEBUG level.
Definition: strategy.hpp:542
#define NFD_LOG_DATA_FROM(data, ingress, msg)
Logs the reception of data on ingress, followed by msg, at DEBUG level.
Definition: strategy.hpp:549
#define NFD_LOG_NACK_FROM(nack, ingress, msg)
Logs the reception of nack on ingress, followed by msg, at DEBUG level.
Definition: strategy.hpp:555
std::optional< uint64_t > version
The strategy version number, if present.
Definition: strategy.hpp:420
PartialName parameters
Parameter components, may be empty.
Definition: strategy.hpp:421
Container for ranking-related values.