generic-link-service.cpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * Copyright (c) 2014-2024, Regents of the University of California,
4  * Arizona Board of Regents,
5  * Colorado State University,
6  * University Pierre & Marie Curie, Sorbonne University,
7  * Washington University in St. Louis,
8  * Beijing Institute of Technology,
9  * The University of Memphis.
10  *
11  * This file is part of NFD (Named Data Networking Forwarding Daemon).
12  * See AUTHORS.md for complete list of NFD authors and contributors.
13  *
14  * NFD is free software: you can redistribute it and/or modify it under the terms
15  * of the GNU General Public License as published by the Free Software Foundation,
16  * either version 3 of the License, or (at your option) any later version.
17  *
18  * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
19  * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
20  * PURPOSE. See the GNU General Public License for more details.
21  *
22  * You should have received a copy of the GNU General Public License along with
23  * NFD, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
24  */
25 
26 #include "generic-link-service.hpp"
27 
28 #include <ndn-cxx/lp/fields.hpp>
29 #include <ndn-cxx/lp/pit-token.hpp>
30 #include <ndn-cxx/lp/tags.hpp>
31 
32 #include <cmath>
33 
34 namespace nfd::face {
35 
36 NFD_LOG_INIT(GenericLinkService);
37 
38 constexpr size_t CONGESTION_MARK_SIZE = tlv::sizeOfVarNumber(lp::tlv::CongestionMark) + // type
39  tlv::sizeOfVarNumber(sizeof(uint64_t)) + // length
40  tlv::sizeOfNonNegativeInteger(UINT64_MAX); // value
41 
43  : m_options(options)
44  , m_fragmenter(m_options.fragmenterOptions, this)
45  , m_reassembler(m_options.reassemblerOptions, this)
46  , m_reliability(m_options.reliabilityOptions, this)
47 {
48  m_reassembler.beforeTimeout.connect([this] (auto&&...) { ++nReassemblyTimeouts; });
49  m_reliability.onDroppedInterest.connect([this] (const auto& i) { notifyDroppedInterest(i); });
50  nReassembling.observe(&m_reassembler);
51 }
52 
53 void
55 {
56  m_options = options;
57  m_fragmenter.setOptions(m_options.fragmenterOptions);
58  m_reassembler.setOptions(m_options.reassemblerOptions);
59  m_reliability.setOptions(m_options.reliabilityOptions);
60 }
61 
62 ssize_t
64 {
65  // Since MTU_UNLIMITED is negative, it will implicitly override any finite override MTU
66  return std::min(m_options.overrideMtu, getTransport()->getMtu());
67 }
68 
69 bool
71 {
72  // Not allowed to override unlimited transport MTU
73  if (getTransport()->getMtu() == MTU_UNLIMITED) {
74  return false;
75  }
76 
77  // Override MTU must be at least MIN_MTU (also implicitly forbids MTU_UNLIMITED and MTU_INVALID)
78  return mtu >= MIN_MTU;
79 }
80 
81 void
82 GenericLinkService::requestIdlePacket()
83 {
84  // No need to request Acks to attach to this packet from LpReliability, as they are already
85  // attached in sendLpPacket
86  NFD_LOG_FACE_TRACE("IDLE packet requested");
87  this->sendLpPacket({});
88 }
89 
90 void
91 GenericLinkService::sendLpPacket(lp::Packet&& pkt)
92 {
93  const ssize_t mtu = getEffectiveMtu();
94 
95  if (m_options.reliabilityOptions.isEnabled) {
96  m_reliability.piggyback(pkt, mtu);
97  }
98 
99  if (m_options.allowCongestionMarking) {
100  checkCongestionLevel(pkt);
101  }
102 
103  auto block = pkt.wireEncode();
104  if (mtu != MTU_UNLIMITED && block.size() > static_cast<size_t>(mtu)) {
105  ++nOutOverMtu;
106  NFD_LOG_FACE_WARN("attempted to send packet over MTU limit");
107  return;
108  }
109  this->sendPacket(block);
110 }
111 
112 void
113 GenericLinkService::doSendInterest(const Interest& interest)
114 {
115  lp::Packet lpPacket(interest.wireEncode());
116 
117  encodeLpFields(interest, lpPacket);
118 
119  this->sendNetPacket(std::move(lpPacket), true);
120 }
121 
122 void
123 GenericLinkService::doSendData(const Data& data)
124 {
125  lp::Packet lpPacket(data.wireEncode());
126 
127  encodeLpFields(data, lpPacket);
128 
129  this->sendNetPacket(std::move(lpPacket), false);
130 }
131 
132 void
133 GenericLinkService::doSendNack(const lp::Nack& nack)
134 {
135  lp::Packet lpPacket(nack.getInterest().wireEncode());
136  lpPacket.add<lp::NackField>(nack.getHeader());
137 
138  encodeLpFields(nack, lpPacket);
139 
140  this->sendNetPacket(std::move(lpPacket), false);
141 }
142 
143 void
144 GenericLinkService::assignSequences(std::vector<lp::Packet>& pkts)
145 {
146  std::for_each(pkts.begin(), pkts.end(), [this] (lp::Packet& pkt) {
147  pkt.set<lp::SequenceField>(++m_lastSeqNo);
148  });
149 }
150 
151 void
152 GenericLinkService::encodeLpFields(const ndn::PacketBase& netPkt, lp::Packet& lpPacket)
153 {
154  if (m_options.allowLocalFields) {
155  auto incomingFaceIdTag = netPkt.getTag<lp::IncomingFaceIdTag>();
156  if (incomingFaceIdTag != nullptr) {
157  lpPacket.add<lp::IncomingFaceIdField>(*incomingFaceIdTag);
158  }
159  }
160 
161  auto congestionMarkTag = netPkt.getTag<lp::CongestionMarkTag>();
162  if (congestionMarkTag != nullptr) {
163  lpPacket.add<lp::CongestionMarkField>(*congestionMarkTag);
164  }
165 
166  if (m_options.allowSelfLearning) {
167  auto nonDiscoveryTag = netPkt.getTag<lp::NonDiscoveryTag>();
168  if (nonDiscoveryTag != nullptr) {
169  lpPacket.add<lp::NonDiscoveryField>(*nonDiscoveryTag);
170  }
171 
172  auto prefixAnnouncementTag = netPkt.getTag<lp::PrefixAnnouncementTag>();
173  if (prefixAnnouncementTag != nullptr) {
174  lpPacket.add<lp::PrefixAnnouncementField>(*prefixAnnouncementTag);
175  }
176  }
177 
178  auto pitToken = netPkt.getTag<lp::PitToken>();
179  if (pitToken != nullptr) {
180  lpPacket.add<lp::PitTokenField>(*pitToken);
181  }
182 }
183 
184 void
185 GenericLinkService::sendNetPacket(lp::Packet&& pkt, bool isInterest)
186 {
187  std::vector<lp::Packet> frags;
188  ssize_t mtu = getEffectiveMtu();
189 
190  // Make space for feature fields in fragments
191  if (m_options.reliabilityOptions.isEnabled && mtu != MTU_UNLIMITED) {
193  }
194 
195  if (m_options.allowCongestionMarking && mtu != MTU_UNLIMITED) {
196  mtu -= CONGESTION_MARK_SIZE;
197  }
198 
199  // An MTU of 0 is allowed but will cause all packets to be dropped before transmission
200  BOOST_ASSERT(mtu == MTU_UNLIMITED || mtu >= 0);
201 
202  if (m_options.allowFragmentation && mtu != MTU_UNLIMITED) {
203  bool isOk = false;
204  std::tie(isOk, frags) = m_fragmenter.fragmentPacket(pkt, mtu);
205  if (!isOk) {
206  // fragmentation failed (warning is logged by LpFragmenter)
208  return;
209  }
210  }
211  else {
212  if (m_options.reliabilityOptions.isEnabled) {
213  frags.push_back(pkt);
214  }
215  else {
216  frags.push_back(std::move(pkt));
217  }
218  }
219 
220  if (frags.size() == 1) {
221  // even if indexed fragmentation is enabled, the fragmenter should not
222  // fragment the packet if it can fit in MTU
223  BOOST_ASSERT(!frags.front().has<lp::FragIndexField>());
224  BOOST_ASSERT(!frags.front().has<lp::FragCountField>());
225  }
226 
227  // Only assign sequences to fragments if reliability enabled or if packet contains >1 fragment
228  if (m_options.reliabilityOptions.isEnabled || frags.size() > 1) {
229  // Assign sequences to all fragments
230  this->assignSequences(frags);
231  }
232 
233  if (m_options.reliabilityOptions.isEnabled && frags.front().has<lp::FragmentField>()) {
234  m_reliability.handleOutgoing(frags, std::move(pkt), isInterest);
235  }
236 
237  for (lp::Packet& frag : frags) {
238  this->sendLpPacket(std::move(frag));
239  }
240 }
241 
242 void
243 GenericLinkService::checkCongestionLevel(lp::Packet& pkt)
244 {
245  ssize_t sendQueueLength = getTransport()->getSendQueueLength();
246  // The transport must support retrieving the current send queue length
247  if (sendQueueLength < 0) {
248  return;
249  }
250 
251  if (sendQueueLength > 0) {
252  NFD_LOG_FACE_TRACE("txqlen=" << sendQueueLength << " threshold=" <<
253  m_options.defaultCongestionThreshold << " capacity=" <<
254  getTransport()->getSendQueueCapacity());
255  }
256 
257  // sendQueue is above target
258  if (static_cast<size_t>(sendQueueLength) > m_options.defaultCongestionThreshold) {
259  const auto now = time::steady_clock::now();
260 
261  if (m_nextMarkTime == time::steady_clock::time_point::max()) {
262  m_nextMarkTime = now + m_options.baseCongestionMarkingInterval;
263  }
264  // Mark packet if sendQueue stays above target for one interval
265  else if (now >= m_nextMarkTime) {
266  pkt.set<lp::CongestionMarkField>(1);
268  NFD_LOG_FACE_DEBUG("LpPacket was marked as congested");
269 
270  ++m_nMarkedSinceInMarkingState;
271  // Decrease the marking interval by the inverse of the square root of the number of packets
272  // marked in this incident of congestion
273  time::nanoseconds interval(static_cast<time::nanoseconds::rep>(
274  m_options.baseCongestionMarkingInterval.count() /
275  std::sqrt(m_nMarkedSinceInMarkingState + 1)));
276  m_nextMarkTime += interval;
277  }
278  }
279  else if (m_nextMarkTime != time::steady_clock::time_point::max()) {
280  // Congestion incident has ended, so reset
281  NFD_LOG_FACE_DEBUG("Send queue length dropped below congestion threshold");
282  m_nextMarkTime = time::steady_clock::time_point::max();
283  m_nMarkedSinceInMarkingState = 0;
284  }
285 }
286 
287 void
288 GenericLinkService::doReceivePacket(const Block& packet, const EndpointId& endpoint)
289 {
290  try {
291  lp::Packet pkt(packet);
292 
293  if (m_options.reliabilityOptions.isEnabled) {
294  if (!m_reliability.processIncomingPacket(pkt)) {
295  NFD_LOG_FACE_TRACE("received duplicate fragment: DROP");
297  return;
298  }
299  }
300 
301  if (!pkt.has<lp::FragmentField>()) {
302  NFD_LOG_FACE_TRACE("received IDLE packet: DROP");
303  return;
304  }
305 
306  if ((pkt.has<lp::FragIndexField>() || pkt.has<lp::FragCountField>()) &&
307  !m_options.allowReassembly) {
308  NFD_LOG_FACE_WARN("received fragment, but reassembly disabled: DROP");
309  return;
310  }
311 
312  auto [isReassembled, netPkt, firstPkt] = m_reassembler.receiveFragment(endpoint, pkt);
313  if (isReassembled) {
314  this->decodeNetPacket(netPkt, firstPkt, endpoint);
315  }
316  }
317  catch (const tlv::Error& e) {
318  ++nInLpInvalid;
319  NFD_LOG_FACE_WARN("packet parse error (" << e.what() << "): DROP");
320  }
321 }
322 
323 void
324 GenericLinkService::decodeNetPacket(const Block& netPkt, const lp::Packet& firstPkt,
325  const EndpointId& endpointId)
326 {
327  try {
328  switch (netPkt.type()) {
329  case tlv::Interest:
330  if (firstPkt.has<lp::NackField>()) {
331  this->decodeNack(netPkt, firstPkt, endpointId);
332  }
333  else {
334  this->decodeInterest(netPkt, firstPkt, endpointId);
335  }
336  break;
337  case tlv::Data:
338  this->decodeData(netPkt, firstPkt, endpointId);
339  break;
340  default:
341  ++nInNetInvalid;
342  NFD_LOG_FACE_WARN("unrecognized network-layer packet TLV-TYPE " << netPkt.type() << ": DROP");
343  return;
344  }
345  }
346  catch (const tlv::Error& e) {
347  ++nInNetInvalid;
348  NFD_LOG_FACE_WARN("packet parse error (" << e.what() << "): DROP");
349  }
350 }
351 
352 void
353 GenericLinkService::decodeInterest(const Block& netPkt, const lp::Packet& firstPkt,
354  const EndpointId& endpointId)
355 {
356  BOOST_ASSERT(netPkt.type() == tlv::Interest);
357  BOOST_ASSERT(!firstPkt.has<lp::NackField>());
358 
359  // forwarding expects Interest to be created with make_shared
360  auto interest = make_shared<Interest>(netPkt);
361 
362  if (firstPkt.has<lp::NextHopFaceIdField>()) {
363  if (m_options.allowLocalFields) {
364  interest->setTag(make_shared<lp::NextHopFaceIdTag>(firstPkt.get<lp::NextHopFaceIdField>()));
365  }
366  else {
367  NFD_LOG_FACE_WARN("received NextHopFaceId, but local fields disabled: DROP");
368  return;
369  }
370  }
371 
372  if (firstPkt.has<lp::CachePolicyField>()) {
373  ++nInNetInvalid;
374  NFD_LOG_FACE_WARN("received CachePolicy with Interest: DROP");
375  return;
376  }
377 
378  if (firstPkt.has<lp::IncomingFaceIdField>()) {
379  NFD_LOG_FACE_WARN("received IncomingFaceId: IGNORE");
380  }
381 
382  if (firstPkt.has<lp::CongestionMarkField>()) {
383  interest->setTag(make_shared<lp::CongestionMarkTag>(firstPkt.get<lp::CongestionMarkField>()));
384  }
385 
386  if (firstPkt.has<lp::NonDiscoveryField>()) {
387  if (m_options.allowSelfLearning) {
388  interest->setTag(make_shared<lp::NonDiscoveryTag>(firstPkt.get<lp::NonDiscoveryField>()));
389  }
390  else {
391  NFD_LOG_FACE_WARN("received NonDiscovery, but self-learning disabled: IGNORE");
392  }
393  }
394 
395  if (firstPkt.has<lp::PrefixAnnouncementField>()) {
396  ++nInNetInvalid;
397  NFD_LOG_FACE_WARN("received PrefixAnnouncement with Interest: DROP");
398  return;
399  }
400 
401  if (firstPkt.has<lp::PitTokenField>()) {
402  interest->setTag(make_shared<lp::PitToken>(firstPkt.get<lp::PitTokenField>()));
403  }
404 
405  this->receiveInterest(*interest, endpointId);
406 }
407 
408 void
409 GenericLinkService::decodeData(const Block& netPkt, const lp::Packet& firstPkt,
410  const EndpointId& endpointId)
411 {
412  BOOST_ASSERT(netPkt.type() == tlv::Data);
413 
414  // forwarding expects Data to be created with make_shared
415  auto data = make_shared<Data>(netPkt);
416 
417  if (firstPkt.has<lp::NackField>()) {
418  ++nInNetInvalid;
419  NFD_LOG_FACE_WARN("received Nack with Data: DROP");
420  return;
421  }
422 
423  if (firstPkt.has<lp::NextHopFaceIdField>()) {
424  ++nInNetInvalid;
425  NFD_LOG_FACE_WARN("received NextHopFaceId with Data: DROP");
426  return;
427  }
428 
429  if (firstPkt.has<lp::CachePolicyField>()) {
430  // CachePolicy is unprivileged and does not require allowLocalFields option.
431  // In case of an invalid CachePolicyType, get<lp::CachePolicyField> will throw,
432  // so it's unnecessary to check here.
433  data->setTag(make_shared<lp::CachePolicyTag>(firstPkt.get<lp::CachePolicyField>()));
434  }
435 
436  if (firstPkt.has<lp::IncomingFaceIdField>()) {
437  NFD_LOG_FACE_WARN("received IncomingFaceId: IGNORE");
438  }
439 
440  if (firstPkt.has<lp::CongestionMarkField>()) {
441  data->setTag(make_shared<lp::CongestionMarkTag>(firstPkt.get<lp::CongestionMarkField>()));
442  }
443 
444  if (firstPkt.has<lp::NonDiscoveryField>()) {
445  ++nInNetInvalid;
446  NFD_LOG_FACE_WARN("received NonDiscovery with Data: DROP");
447  return;
448  }
449 
450  if (firstPkt.has<lp::PrefixAnnouncementField>()) {
451  if (m_options.allowSelfLearning) {
452  data->setTag(make_shared<lp::PrefixAnnouncementTag>(firstPkt.get<lp::PrefixAnnouncementField>()));
453  }
454  else {
455  NFD_LOG_FACE_WARN("received PrefixAnnouncement, but self-learning disabled: IGNORE");
456  }
457  }
458 
459  this->receiveData(*data, endpointId);
460 }
461 
462 void
463 GenericLinkService::decodeNack(const Block& netPkt, const lp::Packet& firstPkt,
464  const EndpointId& endpointId)
465 {
466  BOOST_ASSERT(netPkt.type() == tlv::Interest);
467  BOOST_ASSERT(firstPkt.has<lp::NackField>());
468 
469  lp::Nack nack((Interest(netPkt)));
470  nack.setHeader(firstPkt.get<lp::NackField>());
471 
472  if (firstPkt.has<lp::NextHopFaceIdField>()) {
473  ++nInNetInvalid;
474  NFD_LOG_FACE_WARN("received NextHopFaceId with Nack: DROP");
475  return;
476  }
477 
478  if (firstPkt.has<lp::CachePolicyField>()) {
479  ++nInNetInvalid;
480  NFD_LOG_FACE_WARN("received CachePolicy with Nack: DROP");
481  return;
482  }
483 
484  if (firstPkt.has<lp::IncomingFaceIdField>()) {
485  NFD_LOG_FACE_WARN("received IncomingFaceId: IGNORE");
486  }
487 
488  if (firstPkt.has<lp::CongestionMarkField>()) {
489  nack.setTag(make_shared<lp::CongestionMarkTag>(firstPkt.get<lp::CongestionMarkField>()));
490  }
491 
492  if (firstPkt.has<lp::NonDiscoveryField>()) {
493  ++nInNetInvalid;
494  NFD_LOG_FACE_WARN("received NonDiscovery with Nack: DROP");
495  return;
496  }
497 
498  if (firstPkt.has<lp::PrefixAnnouncementField>()) {
499  ++nInNetInvalid;
500  NFD_LOG_FACE_WARN("received PrefixAnnouncement with Nack: DROP");
501  return;
502  }
503 
504  this->receiveNack(nack, endpointId);
505 }
506 
507 } // namespace nfd::face
void setOptions(const Options &options)
Set options for fragmenter.
std::tuple< bool, std::vector< lp::Packet > > fragmentPacket(const lp::Packet &packet, size_t mtu)
Fragments a network-layer packet into link-layer packets.
std::tuple< bool, Block, lp::Packet > receiveFragment(const EndpointId &remoteEndpoint, const lp::Packet &packet)
Adds received fragment to the buffer.
signal::Signal< LpReassembler, EndpointId, size_t > beforeTimeout
Notifies before a partial packet is dropped due to timeout.
void setOptions(const Options &options)
Set options for reassembler.
signal::Signal< LpReliability, Interest > onDroppedInterest
Called when an Interest is dropped for exceeding the allowed number of retransmissions.
void piggyback(lp::Packet &pkt, ssize_t mtu)
Called by GenericLinkService to attach Acks onto an outgoing LpPacket.
static constexpr size_t RESERVED_HEADER_SPACE
TxSequence TLV-TYPE (3 octets) + TLV-LENGTH (1 octet) + lp::Sequence (8 octets)
void handleOutgoing(std::vector< lp::Packet > &frags, lp::Packet &&pkt, bool isInterest)
Observe outgoing fragment(s) of a network packet and store for potential retransmission.
bool processIncomingPacket(const lp::Packet &pkt)
Extract and parse all Acks and add Ack for contained Fragment (if any) to AckQueue.
void setOptions(const Options &options)
Set options for reliability.
virtual ssize_t getSendQueueLength()
Returns the current send queue length of the transport (in octets).
Definition: transport.hpp:326
#define NFD_LOG_FACE_DEBUG(msg)
Log a message at DEBUG level.
#define NFD_LOG_FACE_WARN(msg)
Log a message at WARN level.
#define NFD_LOG_FACE_TRACE(msg)
Log a message at TRACE level.
#define NFD_LOG_INIT(name)
Definition: logger.hpp:31
std::variant< std::monostate, ethernet::Address, udp::Endpoint > EndpointId
Identifies a remote endpoint on the link.
Definition: face-common.hpp:78
constexpr size_t CONGESTION_MARK_SIZE
constexpr ssize_t MIN_MTU
Minimum MTU that may be set.
Definition: face-common.hpp:66
constexpr ssize_t MTU_UNLIMITED
Indicates that the transport has no limit on payload size.
Definition: transport.hpp:101
bool isEnabled
Enables link-layer reliability.