asf-strategy.cpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * Copyright (c) 2014-2024, Regents of the University of California,
4  * Arizona Board of Regents,
5  * Colorado State University,
6  * University Pierre & Marie Curie, Sorbonne University,
7  * Washington University in St. Louis,
8  * Beijing Institute of Technology,
9  * The University of Memphis.
10  *
11  * This file is part of NFD (Named Data Networking Forwarding Daemon).
12  * See AUTHORS.md for complete list of NFD authors and contributors.
13  *
14  * NFD is free software: you can redistribute it and/or modify it under the terms
15  * of the GNU General Public License as published by the Free Software Foundation,
16  * either version 3 of the License, or (at your option) any later version.
17  *
18  * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
19  * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
20  * PURPOSE. See the GNU General Public License for more details.
21  *
22  * You should have received a copy of the GNU General Public License along with
23  * NFD, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
24  */
25 
26 #include "asf-strategy.hpp"
27 #include "algorithm.hpp"
28 #include "common/logger.hpp"
29 
30 namespace nfd::fw::asf {
31 
32 NFD_LOG_INIT(AsfStrategy);
34 
35 AsfStrategy::AsfStrategy(Forwarder& forwarder, const Name& name)
36  : Strategy(forwarder)
37  , ProcessNackTraits(this)
38 {
40  if (parsed.version && *parsed.version != getStrategyName()[-1].toVersion()) {
41  NDN_THROW(std::invalid_argument("AsfStrategy does not support version " +
42  std::to_string(*parsed.version)));
43  }
44 
46  m_retxSuppression = RetxSuppressionExponential::construct(params);
47  auto probingInterval = params.getOrDefault<time::milliseconds::rep>("probing-interval",
48  m_probing.getProbingInterval().count());
49  m_probing.setProbingInterval(time::milliseconds(probingInterval));
50  m_nMaxTimeouts = params.getOrDefault<size_t>("max-timeouts", m_nMaxTimeouts);
51 
53 
54  NDN_LOG_DEBUG(*m_retxSuppression);
55  NFD_LOG_DEBUG("probing-interval=" << m_probing.getProbingInterval()
56  << " max-timeouts=" << m_nMaxTimeouts);
57 }
58 
59 const Name&
61 {
62  static const auto strategyName = Name("/localhost/nfd/strategy/asf").appendVersion(5);
63  return strategyName;
64 }
65 
66 void
67 AsfStrategy::afterReceiveInterest(const Interest& interest, const FaceEndpoint& ingress,
68  const shared_ptr<pit::Entry>& pitEntry)
69 {
70  const auto& fibEntry = this->lookupFib(*pitEntry);
71 
72  // Check if the interest is new and, if so, skip the retx suppression check
73  if (!hasPendingOutRecords(*pitEntry)) {
74  auto faceToUse = getBestFaceForForwarding(interest, ingress.face, fibEntry, pitEntry);
75  if (faceToUse == nullptr) {
76  NFD_LOG_INTEREST_FROM(interest, ingress, "new no-nexthop");
77  sendNoRouteNack(ingress.face, pitEntry);
78  }
79  else {
80  NFD_LOG_INTEREST_FROM(interest, ingress, "new forward-to=" << faceToUse->getId());
81  forwardInterest(interest, *faceToUse, fibEntry, pitEntry);
82  sendProbe(interest, ingress, *faceToUse, fibEntry, pitEntry);
83  }
84  return;
85  }
86 
87  auto faceToUse = getBestFaceForForwarding(interest, ingress.face, fibEntry, pitEntry, false);
88  if (faceToUse != nullptr) {
89  auto suppressResult = m_retxSuppression->decidePerUpstream(*pitEntry, *faceToUse);
90  if (suppressResult == RetxSuppressionResult::SUPPRESS) {
91  // Cannot be sent on this face, interest was received within the suppression window
92  NFD_LOG_INTEREST_FROM(interest, ingress, "retx forward-to=" << faceToUse->getId() << " suppressed");
93  }
94  else {
95  // The retx arrived after the suppression period: forward it but don't probe, because
96  // probing was done earlier for this interest when it was newly received
97  NFD_LOG_INTEREST_FROM(interest, ingress, "retx forward-to=" << faceToUse->getId());
98  auto* outRecord = forwardInterest(interest, *faceToUse, fibEntry, pitEntry);
99  if (outRecord && suppressResult == RetxSuppressionResult::FORWARD) {
100  m_retxSuppression->incrementIntervalForOutRecord(*outRecord);
101  }
102  }
103  return;
104  }
105 
106  // If all eligible faces have been used (i.e., they all have a pending out-record),
107  // choose the nexthop with the earliest out-record
108  const auto& nexthops = fibEntry.getNextHops();
109  auto it = findEligibleNextHopWithEarliestOutRecord(ingress.face, interest, nexthops, pitEntry);
110  if (it == nexthops.end()) {
111  NFD_LOG_INTEREST_FROM(interest, ingress, "retx no-nexthop");
112  return;
113  }
114  auto& outFace = it->getFace();
115  auto suppressResult = m_retxSuppression->decidePerUpstream(*pitEntry, outFace);
116  if (suppressResult == RetxSuppressionResult::SUPPRESS) {
117  NFD_LOG_INTEREST_FROM(interest, ingress, "retx retry-to=" << outFace.getId() << " suppressed");
118  }
119  else {
120  NFD_LOG_INTEREST_FROM(interest, ingress, "retx retry-to=" << outFace.getId());
121  // sendInterest() is used here instead of forwardInterest() because the measurements info
122  // were already attached to this face in the previous forwarding
123  auto* outRecord = sendInterest(interest, outFace, pitEntry);
124  if (outRecord && suppressResult == RetxSuppressionResult::FORWARD) {
125  m_retxSuppression->incrementIntervalForOutRecord(*outRecord);
126  }
127  }
128 }
129 
130 void
131 AsfStrategy::beforeSatisfyInterest(const Data& data, const FaceEndpoint& ingress,
132  const shared_ptr<pit::Entry>& pitEntry)
133 {
134  NamespaceInfo* namespaceInfo = m_measurements.getNamespaceInfo(pitEntry->getName());
135  if (namespaceInfo == nullptr) {
136  NFD_LOG_DATA_FROM(data, ingress, "no-measurements");
137  return;
138  }
139 
140  // Record the RTT between the Interest out to Data in
141  FaceInfo* faceInfo = namespaceInfo->getFaceInfo(ingress.face.getId());
142  if (faceInfo == nullptr) {
143  NFD_LOG_DATA_FROM(data, ingress, "no-face-info");
144  return;
145  }
146 
147  auto outRecord = pitEntry->findOutRecord(ingress.face);
148  if (outRecord == pitEntry->out_end()) {
149  NFD_LOG_DATA_FROM(data, ingress, "no-out-record");
150  }
151  else {
152  faceInfo->recordRtt(time::steady_clock::now() - outRecord->getLastRenewed());
153  NFD_LOG_DATA_FROM(data, ingress, "rtt=" << faceInfo->getLastRtt() << " srtt=" << faceInfo->getSrtt());
154  }
155 
156  // Extend lifetime for measurements associated with Face
157  namespaceInfo->extendFaceInfoLifetime(*faceInfo, ingress.face.getId());
158  // Extend PIT entry timer to allow slower probes to arrive
159  this->setExpiryTimer(pitEntry, 50_ms);
160  faceInfo->cancelTimeout(data.getName());
161  faceInfo->setNTimeouts(0);
162 }
163 
164 void
165 AsfStrategy::afterReceiveNack(const lp::Nack& nack, const FaceEndpoint& ingress,
166  const shared_ptr<pit::Entry>& pitEntry)
167 {
168  NFD_LOG_NACK_FROM(nack, ingress, "");
169  onTimeoutOrNack(pitEntry->getName(), ingress.face.getId(), true);
170  this->processNack(nack, ingress.face, pitEntry);
171 }
172 
174 AsfStrategy::forwardInterest(const Interest& interest, Face& outFace, const fib::Entry& fibEntry,
175  const shared_ptr<pit::Entry>& pitEntry)
176 {
177  const auto& interestName = interest.getName();
178  auto faceId = outFace.getId();
179 
180  auto* outRecord = sendInterest(interest, outFace, pitEntry);
181 
182  FaceInfo& faceInfo = m_measurements.getOrCreateFaceInfo(fibEntry, interestName, faceId);
183 
184  // Refresh measurements since Face is being used for forwarding
185  NamespaceInfo& namespaceInfo = m_measurements.getOrCreateNamespaceInfo(fibEntry, interestName);
186  namespaceInfo.extendFaceInfoLifetime(faceInfo, faceId);
187 
188  if (!faceInfo.isTimeoutScheduled()) {
189  auto timeout = faceInfo.scheduleTimeout(interestName,
190  [this, name = interestName, faceId] {
191  onTimeoutOrNack(name, faceId, false);
192  });
193  NFD_LOG_TRACE("Scheduled timeout for " << fibEntry.getPrefix() << " to=" << faceId
194  << " in " << time::duration_cast<time::milliseconds>(timeout));
195  }
196 
197  return outRecord;
198 }
199 
200 void
201 AsfStrategy::sendProbe(const Interest& interest, const FaceEndpoint& ingress, const Face& faceToUse,
202  const fib::Entry& fibEntry, const shared_ptr<pit::Entry>& pitEntry)
203 {
204  if (!m_probing.isProbingNeeded(fibEntry, interest.getName()))
205  return;
206 
207  Face* faceToProbe = m_probing.getFaceToProbe(ingress.face, interest, fibEntry, faceToUse);
208  if (faceToProbe == nullptr)
209  return;
210 
211  Interest probeInterest(interest);
212  probeInterest.refreshNonce();
213  NFD_LOG_DEBUG("Sending probe " << probeInterest.getName() << " nonce=" << probeInterest.getNonce()
214  << " to=" << faceToProbe->getId() << " trigger-nonce=" << interest.getNonce());
215  forwardInterest(probeInterest, *faceToProbe, fibEntry, pitEntry);
216 
217  m_probing.afterForwardingProbe(fibEntry, interest.getName());
218 }
219 
220 static auto
222 {
223  // The RTT is used to store the status of the face:
224  // - A positive value indicates data was received and is assumed to indicate a working face (group 1),
225  // - RTT_NO_MEASUREMENT indicates a face is unmeasured (group 2),
226  // - RTT_TIMEOUT indicates a face is timed out (group 3).
227  // These groups are defined in the technical report.
228  //
229  // When forwarding, we assume an order where working faces (group 1) are ranked
230  // higher than unmeasured faces (group 2), and unmeasured faces are ranked higher
231  // than timed out faces (group 3). We assign each group a priority value from 1-3
232  // to ensure lowest-to-highest ordering consistent with this logic.
233 
234  // Working faces are ranked first in priority; if RTT is not
235  // a special value, we assume the face to be in this group.
236  int priority = 1;
237  if (fs.rtt == FaceInfo::RTT_NO_MEASUREMENT) {
238  priority = 2;
239  }
240  else if (fs.rtt == FaceInfo::RTT_TIMEOUT) {
241  priority = 3;
242  }
243 
244  // We set SRTT by default to the max value; if a face is working, we instead set it to the actual value.
245  // Unmeasured and timed out faces are not sorted by SRTT.
246  auto srtt = priority == 1 ? fs.srtt : time::nanoseconds::max();
247 
248  // For ranking, group takes the priority over SRTT (if present) or cost, SRTT (if present)
249  // takes priority over cost, and cost takes priority over FaceId.
250  // FaceId is included to ensure all unique entries are included in the ranking (see #5310)
251  return std::tuple(priority, srtt, fs.cost, fs.face->getId());
252 }
253 
254 bool
255 AsfStrategy::FaceStatsForwardingCompare::operator()(const FaceStats& lhs, const FaceStats& rhs) const noexcept
256 {
258 }
259 
260 Face*
261 AsfStrategy::getBestFaceForForwarding(const Interest& interest, const Face& inFace,
262  const fib::Entry& fibEntry, const shared_ptr<pit::Entry>& pitEntry,
263  bool isInterestNew)
264 {
265  FaceStatsForwardingSet rankedFaces;
266 
267  auto now = time::steady_clock::now();
268  for (const auto& nh : fibEntry.getNextHops()) {
269  if (!isNextHopEligible(inFace, interest, nh, pitEntry, !isInterestNew, now)) {
270  continue;
271  }
272 
273  const FaceInfo* info = m_measurements.getFaceInfo(fibEntry, interest.getName(), nh.getFace().getId());
274  if (info == nullptr) {
275  rankedFaces.insert({&nh.getFace(), FaceInfo::RTT_NO_MEASUREMENT,
276  FaceInfo::RTT_NO_MEASUREMENT, nh.getCost()});
277  }
278  else {
279  rankedFaces.insert({&nh.getFace(), info->getLastRtt(), info->getSrtt(), nh.getCost()});
280  }
281  }
282 
283  auto it = rankedFaces.begin();
284  return it != rankedFaces.end() ? it->face : nullptr;
285 }
286 
287 void
288 AsfStrategy::onTimeoutOrNack(const Name& interestName, FaceId faceId, bool isNack)
289 {
290  NamespaceInfo* namespaceInfo = m_measurements.getNamespaceInfo(interestName);
291  if (namespaceInfo == nullptr) {
292  NFD_LOG_TRACE(interestName << " FibEntry has been removed since timeout scheduling");
293  return;
294  }
295 
296  FaceInfo* fiPtr = namespaceInfo->getFaceInfo(faceId);
297  if (fiPtr == nullptr) {
298  NFD_LOG_TRACE(interestName << " FaceInfo id=" << faceId << " has been removed since timeout scheduling");
299  return;
300  }
301 
302  auto& faceInfo = *fiPtr;
303  size_t nTimeouts = faceInfo.getNTimeouts() + 1;
304  faceInfo.setNTimeouts(nTimeouts);
305 
306  if (nTimeouts < m_nMaxTimeouts && !isNack) {
307  NFD_LOG_TRACE(interestName << " face=" << faceId << " timeout-count=" << nTimeouts << " ignoring");
308  // Extend lifetime for measurements associated with Face
309  namespaceInfo->extendFaceInfoLifetime(faceInfo, faceId);
310  faceInfo.cancelTimeout(interestName);
311  }
312  else {
313  NFD_LOG_TRACE(interestName << " face=" << faceId << " timeout-count=" << nTimeouts);
314  faceInfo.recordTimeout(interestName);
315  faceInfo.setNTimeouts(0);
316  }
317 }
318 
319 void
320 AsfStrategy::sendNoRouteNack(Face& face, const shared_ptr<pit::Entry>& pitEntry)
321 {
322  lp::NackHeader nackHeader;
323  nackHeader.setReason(lp::NackReason::NO_ROUTE);
324  this->sendNack(nackHeader, face, pitEntry);
325  this->rejectPendingInterest(pitEntry);
326 }
327 
328 } // namespace nfd::fw::asf
This file contains common algorithms used by forwarding strategies.
Represents a face-endpoint pair in the forwarder.
Main class of NFD's forwarding engine.
Definition: forwarder.hpp:54
Generalization of a network interface.
Definition: face.hpp:118
FaceId getId() const noexcept
Returns the face ID.
Definition: face.hpp:195
Represents an entry in the FIB.
Definition: fib-entry.hpp:91
const NextHopList & getNextHops() const noexcept
Definition: fib-entry.hpp:103
const Name & getPrefix() const noexcept
Definition: fib-entry.hpp:97
void processNack(const lp::Nack &nack, const Face &inFace, const shared_ptr< pit::Entry > &pitEntry)
static std::unique_ptr< RetxSuppressionExponential > construct(const StrategyParameters &params)
Base class of all forwarding strategies.
Definition: strategy.hpp:46
void rejectPendingInterest(const shared_ptr< pit::Entry > &pitEntry)
Schedule the PIT entry for immediate deletion.
Definition: strategy.hpp:335
const fib::Entry & lookupFib(const pit::Entry &pitEntry) const
Performs a FIB lookup, considering Link object if present.
Definition: strategy.cpp:313
bool sendNack(const lp::NackHeader &header, Face &egress, const shared_ptr< pit::Entry > &pitEntry)
Send a Nack packet.
Definition: strategy.hpp:351
pit::OutRecord * sendInterest(const Interest &interest, Face &egress, const shared_ptr< pit::Entry > &pitEntry)
Send an Interest packet.
Definition: strategy.cpp:236
static ParsedInstanceName parseInstanceName(const Name &input)
Parse a strategy instance name.
Definition: strategy.cpp:123
void setExpiryTimer(const shared_ptr< pit::Entry > &pitEntry, time::milliseconds duration)
Schedule the PIT entry to be erased after duration.
Definition: strategy.hpp:386
void setInstanceName(const Name &name) noexcept
Set strategy instance name.
Definition: strategy.hpp:448
static StrategyParameters parseParameters(const PartialName &params)
Parse strategy parameters encoded in a strategy instance name.
Definition: strategy.cpp:144
static Name makeInstanceName(const Name &input, const Name &strategyName)
Construct a strategy instance name.
Definition: strategy.cpp:134
std::enable_if_t< std::is_signed_v< T >, T > getOrDefault(const key_type &key, const T &defaultVal) const
Definition: strategy.hpp:489
NamespaceInfo & getOrCreateNamespaceInfo(const fib::Entry &fibEntry, const Name &prefix)
FaceInfo & getOrCreateFaceInfo(const fib::Entry &fibEntry, const Name &interestName, FaceId faceId)
FaceInfo * getFaceInfo(const fib::Entry &fibEntry, const Name &interestName, FaceId faceId)
NamespaceInfo * getNamespaceInfo(const Name &prefix)
Adaptive SRTT-based forwarding strategy.
static const Name & getStrategyName()
void afterReceiveNack(const lp::Nack &nack, const FaceEndpoint &ingress, const shared_ptr< pit::Entry > &pitEntry) override
Trigger after a Nack is received.
AsfStrategy(Forwarder &forwarder, const Name &name=getStrategyName())
void afterReceiveInterest(const Interest &interest, const FaceEndpoint &ingress, const shared_ptr< pit::Entry > &pitEntry) override
Trigger after an Interest is received.
void beforeSatisfyInterest(const Data &data, const FaceEndpoint &ingress, const shared_ptr< pit::Entry > &pitEntry) override
Trigger before a PIT entry is satisfied.
Strategy information for each face in a namespace.
static constexpr time::nanoseconds RTT_TIMEOUT
void cancelTimeout(const Name &prefix)
time::nanoseconds scheduleTimeout(const Name &interestName, ndn::scheduler::EventCallback cb)
static constexpr time::nanoseconds RTT_NO_MEASUREMENT
time::nanoseconds getSrtt() const
void setNTimeouts(size_t nTimeouts)
time::nanoseconds getLastRtt() const
void recordTimeout(const Name &interestName)
void recordRtt(time::nanoseconds rtt)
Stores strategy information about each face in this namespace.
FaceInfo * getFaceInfo(FaceId faceId)
void extendFaceInfoLifetime(FaceInfo &info, FaceId faceId)
Face * getFaceToProbe(const Face &inFace, const Interest &interest, const fib::Entry &fibEntry, const Face &faceUsed)
bool isProbingNeeded(const fib::Entry &fibEntry, const Name &interestName)
time::milliseconds getProbingInterval() const
void afterForwardingProbe(const fib::Entry &fibEntry, const Name &interestName)
void setProbingInterval(time::milliseconds probingInterval)
Contains information about an Interest toward an outgoing face.
Definition: pit-entry.hpp:129
#define NFD_LOG_INIT(name)
Definition: logger.hpp:31
#define NFD_LOG_DEBUG
Definition: logger.hpp:38
#define NFD_LOG_TRACE
Definition: logger.hpp:37
uint64_t FaceId
Identifies a face.
Definition: face-common.hpp:48
static auto getFaceRankForForwarding(const FaceStats &fs) noexcept
@ SUPPRESS
Interest is a retransmission and should be suppressed.
@ FORWARD
Interest is a retransmission and should be forwarded.
bool isNextHopEligible(const Face &inFace, const Interest &interest, const fib::NextHop &nexthop, const shared_ptr< pit::Entry > &pitEntry, bool wantUnused, time::steady_clock::time_point now)
Determines whether a NextHop is eligible, i.e., not the same inFace.
Definition: algorithm.cpp:131
fib::NextHopList::const_iterator findEligibleNextHopWithEarliestOutRecord(const Face &inFace, const Interest &interest, const fib::NextHopList &nexthops, const shared_ptr< pit::Entry > &pitEntry)
Pick an eligible NextHop with earliest out-record.
Definition: algorithm.cpp:109
NFD_REGISTER_STRATEGY(SelfLearningStrategy)
bool hasPendingOutRecords(const pit::Entry &pitEntry)
Determine whether pitEntry has any pending out-records.
Definition: algorithm.cpp:85
#define NFD_LOG_INTEREST_FROM(interest, ingress, msg)
Logs the reception of interest on ingress, followed by msg, at DEBUG level.
Definition: strategy.hpp:542
#define NFD_LOG_DATA_FROM(data, ingress, msg)
Logs the reception of data on ingress, followed by msg, at DEBUG level.
Definition: strategy.hpp:549
#define NFD_LOG_NACK_FROM(nack, ingress, msg)
Logs the reception of nack on ingress, followed by msg, at DEBUG level.
Definition: strategy.hpp:555
std::optional< uint64_t > version
The strategy version number, if present.
Definition: strategy.hpp:420
PartialName parameters
Parameter components, may be empty.
Definition: strategy.hpp:421
Container for ranking-related values.