Loading...
Searching...
No Matches
asf-strategy.cpp
Go to the documentation of this file.
1/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2/*
3 * Copyright (c) 2014-2024, Regents of the University of California,
4 * Arizona Board of Regents,
5 * Colorado State University,
6 * University Pierre & Marie Curie, Sorbonne University,
7 * Washington University in St. Louis,
8 * Beijing Institute of Technology,
9 * The University of Memphis.
10 *
11 * This file is part of NFD (Named Data Networking Forwarding Daemon).
12 * See AUTHORS.md for complete list of NFD authors and contributors.
13 *
14 * NFD is free software: you can redistribute it and/or modify it under the terms
15 * of the GNU General Public License as published by the Free Software Foundation,
16 * either version 3 of the License, or (at your option) any later version.
17 *
18 * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
19 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
20 * PURPOSE. See the GNU General Public License for more details.
21 *
22 * You should have received a copy of the GNU General Public License along with
23 * NFD, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
24 */
25
26#include "asf-strategy.hpp"
27#include "algorithm.hpp"
28#include "common/logger.hpp"
29#include <boost/lexical_cast.hpp>
30
31namespace nfd::fw::asf {
32
33NFD_LOG_INIT(AsfStrategy);
35
36AsfStrategy::AsfStrategy(Forwarder& forwarder, const Name& name)
37 : Strategy(forwarder)
38 , ProcessNackTraits(this)
39{
41 if (parsed.version && *parsed.version != getStrategyName()[-1].toVersion()) {
42 NDN_THROW(std::invalid_argument("AsfStrategy does not support version " +
43 std::to_string(*parsed.version)));
44 }
45
47 m_retxSuppression = RetxSuppressionExponential::construct(params);
48 auto probingInterval = params.getOrDefault<time::milliseconds::rep>("probing-interval",
49 m_probing.getProbingInterval().count());
50 m_probing.setProbingInterval(time::milliseconds(probingInterval));
51 m_nMaxTimeouts = params.getOrDefault<size_t>("max-timeouts", m_nMaxTimeouts);
52 auto measurementsLifetime = time::milliseconds(params.getOrDefault<time::milliseconds::rep>("measurements-lifetime",
54 if (measurementsLifetime <= m_probing.getProbingInterval()) {
55 NDN_THROW(std::invalid_argument("Measurements lifetime (" + boost::lexical_cast<std::string>(measurementsLifetime) +
56 ") should be greater than the probing interval of " +
57 boost::lexical_cast<std::string>(m_probing.getProbingInterval())));
58 }
59 m_measurements.setMeasurementsLifetime(measurementsLifetime);
60
62
63 NDN_LOG_DEBUG(*m_retxSuppression);
64 NFD_LOG_DEBUG("probing-interval=" << m_probing.getProbingInterval()
65 << " max-timeouts=" << m_nMaxTimeouts
66 << " measurements-lifetime=" << m_measurements.getMeasurementsLifetime());
67}
68
69const Name&
71{
72 static const auto strategyName = Name("/localhost/nfd/strategy/asf").appendVersion(5);
73 return strategyName;
74}
75
76void
77AsfStrategy::afterReceiveInterest(const Interest& interest, const FaceEndpoint& ingress,
78 const shared_ptr<pit::Entry>& pitEntry)
79{
80 const auto& fibEntry = this->lookupFib(*pitEntry);
81
82 // Check if the interest is new and, if so, skip the retx suppression check
83 if (!hasPendingOutRecords(*pitEntry)) {
84 auto faceToUse = getBestFaceForForwarding(interest, ingress.face, fibEntry, pitEntry);
85 if (faceToUse == nullptr) {
86 NFD_LOG_INTEREST_FROM(interest, ingress, "new no-nexthop");
87 sendNoRouteNack(ingress.face, pitEntry);
88 }
89 else {
90 NFD_LOG_INTEREST_FROM(interest, ingress, "new forward-to=" << faceToUse->getId());
91 forwardInterest(interest, *faceToUse, fibEntry, pitEntry);
92 sendProbe(interest, ingress, *faceToUse, fibEntry, pitEntry);
93 }
94 return;
95 }
96
97 auto faceToUse = getBestFaceForForwarding(interest, ingress.face, fibEntry, pitEntry, false);
98 if (faceToUse != nullptr) {
99 auto suppressResult = m_retxSuppression->decidePerUpstream(*pitEntry, *faceToUse);
100 if (suppressResult == RetxSuppressionResult::SUPPRESS) {
101 // Cannot be sent on this face, interest was received within the suppression window
102 NFD_LOG_INTEREST_FROM(interest, ingress, "retx forward-to=" << faceToUse->getId() << " suppressed");
103 }
104 else {
105 // The retx arrived after the suppression period: forward it but don't probe, because
106 // probing was done earlier for this interest when it was newly received
107 NFD_LOG_INTEREST_FROM(interest, ingress, "retx forward-to=" << faceToUse->getId());
108 auto* outRecord = forwardInterest(interest, *faceToUse, fibEntry, pitEntry);
109 if (outRecord && suppressResult == RetxSuppressionResult::FORWARD) {
110 m_retxSuppression->incrementIntervalForOutRecord(*outRecord);
111 }
112 }
113 return;
114 }
115
116 // If all eligible faces have been used (i.e., they all have a pending out-record),
117 // choose the nexthop with the earliest out-record
118 const auto& nexthops = fibEntry.getNextHops();
119 auto it = findEligibleNextHopWithEarliestOutRecord(ingress.face, interest, nexthops, pitEntry);
120 if (it == nexthops.end()) {
121 NFD_LOG_INTEREST_FROM(interest, ingress, "retx no-nexthop");
122 return;
123 }
124 auto& outFace = it->getFace();
125 auto suppressResult = m_retxSuppression->decidePerUpstream(*pitEntry, outFace);
126 if (suppressResult == RetxSuppressionResult::SUPPRESS) {
127 NFD_LOG_INTEREST_FROM(interest, ingress, "retx retry-to=" << outFace.getId() << " suppressed");
128 }
129 else {
130 NFD_LOG_INTEREST_FROM(interest, ingress, "retx retry-to=" << outFace.getId());
131 // sendInterest() is used here instead of forwardInterest() because the measurements info
132 // were already attached to this face in the previous forwarding
133 auto* outRecord = sendInterest(interest, outFace, pitEntry);
134 if (outRecord && suppressResult == RetxSuppressionResult::FORWARD) {
135 m_retxSuppression->incrementIntervalForOutRecord(*outRecord);
136 }
137 }
138}
139
140void
141AsfStrategy::beforeSatisfyInterest(const Data& data, const FaceEndpoint& ingress,
142 const shared_ptr<pit::Entry>& pitEntry)
143{
144 NamespaceInfo* namespaceInfo = m_measurements.getNamespaceInfo(pitEntry->getName());
145 if (namespaceInfo == nullptr) {
146 NFD_LOG_DATA_FROM(data, ingress, "no-measurements");
147 return;
148 }
149
150 // Record the RTT between the Interest out to Data in
151 FaceInfo* faceInfo = namespaceInfo->getFaceInfo(ingress.face.getId());
152 if (faceInfo == nullptr) {
153 NFD_LOG_DATA_FROM(data, ingress, "no-face-info");
154 return;
155 }
156
157 auto outRecord = pitEntry->findOutRecord(ingress.face);
158 if (outRecord == pitEntry->out_end()) {
159 NFD_LOG_DATA_FROM(data, ingress, "no-out-record");
160 }
161 else {
162 faceInfo->recordRtt(time::steady_clock::now() - outRecord->getLastRenewed());
163 NFD_LOG_DATA_FROM(data, ingress, "rtt=" << faceInfo->getLastRtt() << " srtt=" << faceInfo->getSrtt());
164 }
165
166 // Extend lifetime for measurements associated with Face
167 namespaceInfo->extendFaceInfoLifetime(*faceInfo, ingress.face.getId());
168 // Extend PIT entry timer to allow slower probes to arrive
169 this->setExpiryTimer(pitEntry, 50_ms);
170 faceInfo->cancelTimeout(data.getName());
171 faceInfo->setNTimeouts(0);
172}
173
174void
175AsfStrategy::afterReceiveNack(const lp::Nack& nack, const FaceEndpoint& ingress,
176 const shared_ptr<pit::Entry>& pitEntry)
177{
178 NFD_LOG_NACK_FROM(nack, ingress, "");
179 onTimeoutOrNack(pitEntry->getName(), ingress.face.getId(), true);
180 this->processNack(nack, ingress.face, pitEntry);
181}
182
184AsfStrategy::forwardInterest(const Interest& interest, Face& outFace, const fib::Entry& fibEntry,
185 const shared_ptr<pit::Entry>& pitEntry)
186{
187 const auto& interestName = interest.getName();
188 auto faceId = outFace.getId();
189
190 auto* outRecord = sendInterest(interest, outFace, pitEntry);
191
192 FaceInfo& faceInfo = m_measurements.getOrCreateFaceInfo(fibEntry, interestName, faceId);
193
194 // Refresh measurements since Face is being used for forwarding
195 NamespaceInfo& namespaceInfo = m_measurements.getOrCreateNamespaceInfo(fibEntry, interestName);
196 namespaceInfo.extendFaceInfoLifetime(faceInfo, faceId);
197
198 if (!faceInfo.isTimeoutScheduled()) {
199 auto timeout = faceInfo.scheduleTimeout(interestName,
200 [this, name = interestName, faceId] {
201 onTimeoutOrNack(name, faceId, false);
202 });
203 NFD_LOG_TRACE("Scheduled timeout for " << fibEntry.getPrefix() << " to=" << faceId
204 << " in " << time::duration_cast<time::milliseconds>(timeout));
205 }
206
207 return outRecord;
208}
209
210void
211AsfStrategy::sendProbe(const Interest& interest, const FaceEndpoint& ingress, const Face& faceToUse,
212 const fib::Entry& fibEntry, const shared_ptr<pit::Entry>& pitEntry)
213{
214 if (!m_probing.isProbingNeeded(fibEntry, interest.getName()))
215 return;
216
217 Face* faceToProbe = m_probing.getFaceToProbe(ingress.face, interest, fibEntry, faceToUse);
218 if (faceToProbe == nullptr)
219 return;
220
221 Interest probeInterest(interest);
222 probeInterest.refreshNonce();
223 NFD_LOG_DEBUG("Sending probe " << probeInterest.getName() << " nonce=" << probeInterest.getNonce()
224 << " to=" << faceToProbe->getId() << " trigger-nonce=" << interest.getNonce());
225 forwardInterest(probeInterest, *faceToProbe, fibEntry, pitEntry);
226
227 m_probing.afterForwardingProbe(fibEntry, interest.getName());
228}
229
230static auto
232{
233 // The RTT is used to store the status of the face:
234 // - A positive value indicates data was received and is assumed to indicate a working face (group 1),
235 // - RTT_NO_MEASUREMENT indicates a face is unmeasured (group 2),
236 // - RTT_TIMEOUT indicates a face is timed out (group 3).
237 // These groups are defined in the technical report.
238 //
239 // When forwarding, we assume an order where working faces (group 1) are ranked
240 // higher than unmeasured faces (group 2), and unmeasured faces are ranked higher
241 // than timed out faces (group 3). We assign each group a priority value from 1-3
242 // to ensure lowest-to-highest ordering consistent with this logic.
243
244 // Working faces are ranked first in priority; if RTT is not
245 // a special value, we assume the face to be in this group.
246 int priority = 1;
247 if (fs.rtt == FaceInfo::RTT_NO_MEASUREMENT) {
248 priority = 2;
249 }
250 else if (fs.rtt == FaceInfo::RTT_TIMEOUT) {
251 priority = 3;
252 }
253
254 // We set SRTT by default to the max value; if a face is working, we instead set it to the actual value.
255 // Unmeasured and timed out faces are not sorted by SRTT.
256 auto srtt = priority == 1 ? fs.srtt : time::nanoseconds::max();
257
258 // For ranking, group takes the priority over SRTT (if present) or cost, SRTT (if present)
259 // takes priority over cost, and cost takes priority over FaceId.
260 // FaceId is included to ensure all unique entries are included in the ranking (see #5310)
261 return std::tuple(priority, srtt, fs.cost, fs.face->getId());
262}
263
264bool
265AsfStrategy::FaceStatsForwardingCompare::operator()(const FaceStats& lhs, const FaceStats& rhs) const noexcept
266{
268}
269
270Face*
271AsfStrategy::getBestFaceForForwarding(const Interest& interest, const Face& inFace,
272 const fib::Entry& fibEntry, const shared_ptr<pit::Entry>& pitEntry,
273 bool isInterestNew)
274{
275 FaceStatsForwardingSet rankedFaces;
276
277 auto now = time::steady_clock::now();
278 for (const auto& nh : fibEntry.getNextHops()) {
279 if (!isNextHopEligible(inFace, interest, nh, pitEntry, !isInterestNew, now)) {
280 continue;
281 }
282
283 const FaceInfo* info = m_measurements.getFaceInfo(fibEntry, interest.getName(), nh.getFace().getId());
284 if (info == nullptr) {
285 rankedFaces.insert({&nh.getFace(), FaceInfo::RTT_NO_MEASUREMENT,
286 FaceInfo::RTT_NO_MEASUREMENT, nh.getCost()});
287 }
288 else {
289 rankedFaces.insert({&nh.getFace(), info->getLastRtt(), info->getSrtt(), nh.getCost()});
290 }
291 }
292
293 auto it = rankedFaces.begin();
294 return it != rankedFaces.end() ? it->face : nullptr;
295}
296
297void
298AsfStrategy::onTimeoutOrNack(const Name& interestName, FaceId faceId, bool isNack)
299{
300 NamespaceInfo* namespaceInfo = m_measurements.getNamespaceInfo(interestName);
301 if (namespaceInfo == nullptr) {
302 NFD_LOG_TRACE(interestName << " FibEntry has been removed since timeout scheduling");
303 return;
304 }
305
306 FaceInfo* fiPtr = namespaceInfo->getFaceInfo(faceId);
307 if (fiPtr == nullptr) {
308 NFD_LOG_TRACE(interestName << " FaceInfo id=" << faceId << " has been removed since timeout scheduling");
309 return;
310 }
311
312 auto& faceInfo = *fiPtr;
313 size_t nTimeouts = faceInfo.getNTimeouts() + 1;
314 faceInfo.setNTimeouts(nTimeouts);
315
316 if (nTimeouts < m_nMaxTimeouts && !isNack) {
317 NFD_LOG_TRACE(interestName << " face=" << faceId << " timeout-count=" << nTimeouts << " ignoring");
318 // Extend lifetime for measurements associated with Face
319 namespaceInfo->extendFaceInfoLifetime(faceInfo, faceId);
320 faceInfo.cancelTimeout(interestName);
321 }
322 else {
323 NFD_LOG_TRACE(interestName << " face=" << faceId << " timeout-count=" << nTimeouts);
324 faceInfo.recordTimeout(interestName);
325 faceInfo.setNTimeouts(0);
326 }
327}
328
329void
330AsfStrategy::sendNoRouteNack(Face& face, const shared_ptr<pit::Entry>& pitEntry)
331{
332 lp::NackHeader nackHeader;
333 nackHeader.setReason(lp::NackReason::NO_ROUTE);
334 this->sendNack(nackHeader, face, pitEntry);
335 this->rejectPendingInterest(pitEntry);
336}
337
338} // namespace nfd::fw::asf
This file contains common algorithms used by forwarding strategies.
Represents a face-endpoint pair in the forwarder.
Main class of NFD's forwarding engine.
Definition forwarder.hpp:54
Generalization of a network interface.
Definition face.hpp:118
FaceId getId() const noexcept
Returns the face ID.
Definition face.hpp:195
Represents an entry in the FIB.
Definition fib-entry.hpp:91
const Name & getPrefix() const noexcept
Definition fib-entry.hpp:97
void processNack(const lp::Nack &nack, const Face &inFace, const shared_ptr< pit::Entry > &pitEntry)
static std::unique_ptr< RetxSuppressionExponential > construct(const StrategyParameters &params)
Base class of all forwarding strategies.
Definition strategy.hpp:46
void rejectPendingInterest(const shared_ptr< pit::Entry > &pitEntry)
Schedule the PIT entry for immediate deletion.
Definition strategy.hpp:335
const fib::Entry & lookupFib(const pit::Entry &pitEntry) const
Performs a FIB lookup, considering Link object if present.
Definition strategy.cpp:313
bool sendNack(const lp::NackHeader &header, Face &egress, const shared_ptr< pit::Entry > &pitEntry)
Send a Nack packet.
Definition strategy.hpp:351
pit::OutRecord * sendInterest(const Interest &interest, Face &egress, const shared_ptr< pit::Entry > &pitEntry)
Send an Interest packet.
Definition strategy.cpp:236
static ParsedInstanceName parseInstanceName(const Name &input)
Parse a strategy instance name.
Definition strategy.cpp:123
void setExpiryTimer(const shared_ptr< pit::Entry > &pitEntry, time::milliseconds duration)
Schedule the PIT entry to be erased after duration.
Definition strategy.hpp:386
void setInstanceName(const Name &name) noexcept
Set strategy instance name.
Definition strategy.hpp:448
static StrategyParameters parseParameters(const PartialName &params)
Parse strategy parameters encoded in a strategy instance name.
Definition strategy.cpp:144
static Name makeInstanceName(const Name &input, const Name &strategyName)
Construct a strategy instance name.
Definition strategy.cpp:134
std::enable_if_t< std::is_signed_v< T >, T > getOrDefault(const key_type &key, const T &defaultVal) const
Definition strategy.hpp:489
NamespaceInfo & getOrCreateNamespaceInfo(const fib::Entry &fibEntry, const Name &prefix)
static constexpr time::milliseconds DEFAULT_MEASUREMENTS_LIFETIME
void setMeasurementsLifetime(time::milliseconds measurementsLifetime)
time::milliseconds getMeasurementsLifetime() const
FaceInfo & getOrCreateFaceInfo(const fib::Entry &fibEntry, const Name &interestName, FaceId faceId)
FaceInfo * getFaceInfo(const fib::Entry &fibEntry, const Name &interestName, FaceId faceId)
NamespaceInfo * getNamespaceInfo(const Name &prefix)
Adaptive SRTT-based forwarding strategy.
static const Name & getStrategyName()
void afterReceiveNack(const lp::Nack &nack, const FaceEndpoint &ingress, const shared_ptr< pit::Entry > &pitEntry) override
Trigger after a Nack is received.
AsfStrategy(Forwarder &forwarder, const Name &name=getStrategyName())
void afterReceiveInterest(const Interest &interest, const FaceEndpoint &ingress, const shared_ptr< pit::Entry > &pitEntry) override
Trigger after an Interest is received.
void beforeSatisfyInterest(const Data &data, const FaceEndpoint &ingress, const shared_ptr< pit::Entry > &pitEntry) override
Trigger before a PIT entry is satisfied.
Strategy information for each face in a namespace.
static constexpr time::nanoseconds RTT_TIMEOUT
void cancelTimeout(const Name &prefix)
time::nanoseconds scheduleTimeout(const Name &interestName, ndn::scheduler::EventCallback cb)
static constexpr time::nanoseconds RTT_NO_MEASUREMENT
time::nanoseconds getSrtt() const
void setNTimeouts(size_t nTimeouts)
time::nanoseconds getLastRtt() const
void recordTimeout(const Name &interestName)
void recordRtt(time::nanoseconds rtt)
Stores strategy information about each face in this namespace.
FaceInfo * getFaceInfo(FaceId faceId)
void extendFaceInfoLifetime(FaceInfo &info, FaceId faceId)
Face * getFaceToProbe(const Face &inFace, const Interest &interest, const fib::Entry &fibEntry, const Face &faceUsed)
bool isProbingNeeded(const fib::Entry &fibEntry, const Name &interestName)
time::milliseconds getProbingInterval() const
void afterForwardingProbe(const fib::Entry &fibEntry, const Name &interestName)
void setProbingInterval(time::milliseconds probingInterval)
Contains information about an Interest toward an outgoing face.
#define NFD_LOG_INIT(name)
Definition logger.hpp:31
#define NFD_LOG_DEBUG
Definition logger.hpp:38
#define NFD_LOG_TRACE
Definition logger.hpp:37
static auto getFaceRankForForwarding(const FaceStats &fs) noexcept
@ SUPPRESS
Interest is a retransmission and should be suppressed.
@ FORWARD
Interest is a retransmission and should be forwarded.
bool isNextHopEligible(const Face &inFace, const Interest &interest, const fib::NextHop &nexthop, const shared_ptr< pit::Entry > &pitEntry, bool wantUnused, time::steady_clock::time_point now)
Determines whether a NextHop is eligible, i.e., not the same inFace.
fib::NextHopList::const_iterator findEligibleNextHopWithEarliestOutRecord(const Face &inFace, const Interest &interest, const fib::NextHopList &nexthops, const shared_ptr< pit::Entry > &pitEntry)
Pick an eligible NextHop with earliest out-record.
bool hasPendingOutRecords(const pit::Entry &pitEntry)
Determine whether pitEntry has any pending out-records.
Definition algorithm.cpp:85
#define NFD_LOG_INTEREST_FROM(interest, ingress, msg)
Logs the reception of interest on ingress, followed by msg, at DEBUG level.
Definition strategy.hpp:542
#define NFD_LOG_DATA_FROM(data, ingress, msg)
Logs the reception of data on ingress, followed by msg, at DEBUG level.
Definition strategy.hpp:549
#define NFD_LOG_NACK_FROM(nack, ingress, msg)
Logs the reception of nack on ingress, followed by msg, at DEBUG level.
Definition strategy.hpp:555
#define NFD_REGISTER_STRATEGY(S)
Registers a forwarding strategy.
Definition strategy.hpp:531
std::optional< uint64_t > version
The strategy version number, if present.
Definition strategy.hpp:420
PartialName parameters
Parameter components, may be empty.
Definition strategy.hpp:421
Container for ranking-related values.