segment-fetcher.cpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * Copyright (c) 2013-2024 Regents of the University of California,
4  * Colorado State University,
5  * University Pierre & Marie Curie, Sorbonne University.
6  *
7  * This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
8  *
9  * ndn-cxx library is free software: you can redistribute it and/or modify it under the
10  * terms of the GNU Lesser General Public License as published by the Free Software
11  * Foundation, either version 3 of the License, or (at your option) any later version.
12  *
13  * ndn-cxx library is distributed in the hope that it will be useful, but WITHOUT ANY
14  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
15  * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
16  *
17  * You should have received copies of the GNU General Public License and GNU Lesser
18  * General Public License along with ndn-cxx, e.g., in COPYING.md file. If not, see
19  * <http://www.gnu.org/licenses/>.
20  *
21  * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
22  */
23 
27 #include "ndn-cxx/lp/nack.hpp"
29 
30 #include <boost/asio/io_context.hpp>
31 #include <boost/asio/post.hpp>
32 #include <boost/lexical_cast.hpp>
33 #include <boost/range/adaptor/map.hpp>
34 
35 #include <algorithm>
36 #include <cmath>
37 
38 namespace ndn {
39 
// Checks the option values tested below and throws std::invalid_argument (through
// NDN_THROW) for the first one that is out of range:
//   - maxTimeout must be at least 1 millisecond
//   - initCwnd must be at least 1
//   - aiStep must be non-negative
//   - mdCoef must lie in the closed range [0, 1]
// NOTE(review): the embedded listing numbering jumps from 40 to 42, so the line that
// carries the qualified function name is missing from this listing. Presumably this is
// the validate() that the SegmentFetcher constructor calls on m_options -- confirm
// against the original source file.
40 void
42 {
43  if (maxTimeout < 1_ms) {
44  NDN_THROW(std::invalid_argument("maxTimeout must be greater than or equal to 1 millisecond"));
45  }
46 
47  if (initCwnd < 1.0) {
48  NDN_THROW(std::invalid_argument("initCwnd must be greater than or equal to 1"));
49  }
50 
51  if (aiStep < 0.0) {
52  NDN_THROW(std::invalid_argument("aiStep must be greater than or equal to 0"));
53  }
54 
55  if (mdCoef < 0.0 || mdCoef > 1.0) {
56  NDN_THROW(std::invalid_argument("mdCoef must be in range [0, 1]"));
57  }
58 }
59 
// Constructs a fetcher that sends Interests through `face` and checks received Data
// with `validator`.
//
// The constructor keeps its own copy of `options`, creates the scheduler on the face's
// io_context, builds the RTT estimator from options.rttOptions, and seeds the
// congestion window and slow-start threshold from options.initCwnd and
// options.initSsthresh. m_timeLastSegmentReceived starts at the construction time; it
// is the reference point later compared against maxTimeout in afterNackOrTimeout().
//
// The copied options are validated in the body, so any exception raised by
// validate() propagates out of the constructor.
60 SegmentFetcher::SegmentFetcher(Face& face,
61  security::Validator& validator,
62  const SegmentFetcher::Options& options)
63  : m_options(options)
64  , m_face(face)
65  , m_scheduler(m_face.getIoContext())
66  , m_validator(validator)
67  , m_rttEstimator(make_shared<util::RttEstimator::Options>(options.rttOptions))
68  , m_timeLastSegmentReceived(time::steady_clock::now())
69  , m_cwnd(options.initCwnd)
70  , m_ssthresh(options.initSsthresh)
71 {
72  m_options.validate();
73 }
74 
// Creates a SegmentFetcher, stores a shared_ptr to the new object in its own m_this
// member, sends the first Interest derived from baseInterest (not marked as a
// retransmission), and returns the fetcher to the caller.
// Because m_this holds a strong reference, the fetcher stays alive even if the caller
// drops the returned pointer, until m_this is released.
// NOTE(review): the embedded listing numbering jumps from 75 to 77, so the line with
// the function name and its first parameter is missing from this listing; the body
// uses a `face` argument that is not declared in the visible lines.
75 shared_ptr<SegmentFetcher>
77  const Interest& baseInterest,
78  security::Validator& validator,
79  const SegmentFetcher::Options& options)
80 {
81  shared_ptr<SegmentFetcher> fetcher(new SegmentFetcher(face, validator, options));
82  fetcher->m_this = fetcher;
83  fetcher->fetchFirstSegment(baseInterest, false);
84  return fetcher;
85 }
86 
// Stops the fetch. Does nothing when m_this is already empty (i.e. a previous call
// already released the self-reference).
// NOTE(review): the embedded listing numbering jumps from 87 to 89, so the line that
// carries the qualified function name is missing from this listing.
87 void
89 {
90  if (!m_this) {
91  return;
92  }
93 
94  m_pendingSegments.clear(); // cancels pending Interests and timeout events
  // Move the self-reference into an empty handler posted to the face's io_context:
  // m_this becomes null immediately (so shouldStop() reports true from now on), while
  // the reference itself is only dropped once the io_context disposes of the handler --
  // presumably so the object is not destroyed while this member function, or a caller
  // further up the stack, is still executing.
95  boost::asio::post(m_face.getIoContext(), [self = std::move(m_this)] {});
96 }
97 
98 bool
99 SegmentFetcher::shouldStop(const weak_ptr<SegmentFetcher>& weakSelf)
100 {
101  auto self = weakSelf.lock();
102  return self == nullptr || self->m_this == nullptr;
103 }
104 
105 void
106 SegmentFetcher::fetchFirstSegment(const Interest& baseInterest, bool isRetransmission)
107 {
108  Interest interest(baseInterest);
109  interest.setCanBePrefix(true);
110  if (!interest.getName().empty() && interest.getName()[-1].isVersion()) {
111  interest.setMustBeFresh(false);
112  }
113  else {
114  interest.setMustBeFresh(m_options.probeLatestVersion);
115  }
116  interest.setInterestLifetime(m_options.interestLifetime);
117  if (isRetransmission) {
118  interest.refreshNonce();
119  }
120 
121  sendInterest(0, interest, isRetransmission);
122 }
123 
124 void
125 SegmentFetcher::fetchSegmentsInWindow(const Interest& origInterest)
126 {
127  if (checkAllSegmentsReceived()) {
128  // All segments have been retrieved
129  return finalizeFetch();
130  }
131 
132  int64_t availableWindowSize;
133  if (m_options.inOrder) {
134  availableWindowSize = std::min<int64_t>(m_cwnd, m_options.flowControlWindow - m_segmentBuffer.size());
135  }
136  else {
137  availableWindowSize = static_cast<int64_t>(m_cwnd);
138  }
139  availableWindowSize -= m_nSegmentsInFlight;
140 
141  std::vector<std::pair<uint64_t, bool>> segmentsToRequest; // The boolean indicates whether a retx or not
142 
143  while (availableWindowSize > 0) {
144  if (!m_retxQueue.empty()) {
145  auto pendingSegmentIt = m_pendingSegments.find(m_retxQueue.front());
146  m_retxQueue.pop();
147  if (pendingSegmentIt == m_pendingSegments.end()) {
148  // Skip re-requesting this segment, since it was received after RTO timeout
149  continue;
150  }
151  BOOST_ASSERT(pendingSegmentIt->second.state == SegmentState::InRetxQueue);
152  segmentsToRequest.emplace_back(pendingSegmentIt->first, true);
153  }
154  else if (m_nSegments == 0 || m_nextSegmentNum < static_cast<uint64_t>(m_nSegments)) {
155  if (m_segmentBuffer.count(m_nextSegmentNum) > 0) {
156  // Don't request a segment a second time if received in response to first "discovery" Interest
157  m_nextSegmentNum++;
158  continue;
159  }
160  segmentsToRequest.emplace_back(m_nextSegmentNum++, false);
161  }
162  else {
163  break;
164  }
165  availableWindowSize--;
166  }
167 
168  for (const auto& segment : segmentsToRequest) {
169  Interest interest(origInterest); // to preserve Interest elements
170  interest.setName(Name(m_versionedDataName).appendSegment(segment.first));
171  interest.setCanBePrefix(false);
172  interest.setMustBeFresh(false);
173  interest.setInterestLifetime(m_options.interestLifetime);
174  interest.refreshNonce();
175  sendInterest(segment.first, interest, segment.second);
176  }
177 }
178 
179 void
180 SegmentFetcher::sendInterest(uint64_t segNum, const Interest& interest, bool isRetransmission)
181 {
182  weak_ptr<SegmentFetcher> weakSelf = m_this;
183 
184  ++m_nSegmentsInFlight;
185  auto pendingInterest = m_face.expressInterest(interest,
186  [this, weakSelf] (const Interest& interest, const Data& data) {
187  afterSegmentReceivedCb(interest, data, weakSelf);
188  },
189  [this, weakSelf] (const Interest& interest, const lp::Nack& nack) {
190  afterNackReceivedCb(interest, nack, weakSelf);
191  },
192  nullptr);
193 
194  auto timeout = m_options.useConstantInterestTimeout ? m_options.maxTimeout : getEstimatedRto();
195  auto timeoutEvent = m_scheduler.schedule(timeout, [this, interest, weakSelf] {
196  afterTimeoutCb(interest, weakSelf);
197  });
198 
199  if (isRetransmission) {
200  updateRetransmittedSegment(segNum, pendingInterest, timeoutEvent);
201  return;
202  }
203 
204  PendingSegment pendingSegment{SegmentState::FirstInterest, time::steady_clock::now(),
205  pendingInterest, timeoutEvent};
206  bool isNew = m_pendingSegments.try_emplace(segNum, std::move(pendingSegment)).second;
207  BOOST_VERIFY(isNew);
208  m_highInterest = segNum;
209 }
210 
211 void
212 SegmentFetcher::afterSegmentReceivedCb(const Interest& origInterest, const Data& data,
213  const weak_ptr<SegmentFetcher>& weakSelf)
214 {
215  if (shouldStop(weakSelf))
216  return;
217 
218  BOOST_ASSERT(m_nSegmentsInFlight > 0);
219  m_nSegmentsInFlight--;
220 
221  name::Component currentSegmentComponent = data.getName().get(-1);
222  if (!currentSegmentComponent.isSegment()) {
223  return signalError(DATA_HAS_NO_SEGMENT, "Data Name has no segment number");
224  }
225 
226  uint64_t currentSegment = currentSegmentComponent.toSegment();
227 
228  // The first received Interest could have any segment ID
229  std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt;
230  if (m_receivedSegments.size() > 0) {
231  pendingSegmentIt = m_pendingSegments.find(currentSegment);
232  }
233  else {
234  pendingSegmentIt = m_pendingSegments.begin();
235  }
236 
237  if (pendingSegmentIt == m_pendingSegments.end()) {
238  return;
239  }
240 
241  pendingSegmentIt->second.timeoutEvent.cancel();
242 
243  afterSegmentReceived(data);
244 
245  m_validator.validate(data,
246  [=] (const Data& d) { afterValidationSuccess(d, origInterest, pendingSegmentIt, weakSelf); },
247  [=] (const Data& d, const auto& error) { afterValidationFailure(d, error, weakSelf); });
248 }
249 
// Validator success callback: accepts one validated segment and advances the fetch.
//
// In order, this function:
//   1. returns immediately if the fetcher has been stopped or destroyed;
//   2. records the receive time, increments m_nReceived and adds the segment number to
//      m_receivedSegments;
//   3. feeds an RTT sample to the estimator, but only if the segment was answered by its
//      first Interest (state FirstInterest);
//   4. erases the pending entry, copies the content into m_segmentBuffer, updates
//      m_nBytesReceived and emits afterSegmentValidated;
//   5. if the Data carries a FinalBlockId, fails with FINALBLOCKID_NOT_SEGMENT when it is
//      not a segment component, otherwise updates m_nSegments and cancels pending
//      Interests beyond the new end;
//   6. in 'in order' mode, emits onInOrderData for each consecutive buffered segment
//      starting at m_nextSegmentInOrder and removes it from the buffer;
//   7. on the very first accepted segment, remembers the versioned name prefix;
//   8. updates m_highData, adjusts the congestion window (decrease on a congestion mark
//      unless ignoreCongMarks is set, increase otherwise) and requests further segments.
//
// NOTE(review): pendingSegmentIt is obtained in afterSegmentReceivedCb() before the
// validator runs; this code assumes the entry has not been erased from
// m_pendingSegments in the meantime -- confirm for asynchronous validators.
250 void
251 SegmentFetcher::afterValidationSuccess(const Data& data, const Interest& origInterest,
252  std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt,
253  const weak_ptr<SegmentFetcher>& weakSelf)
254 {
255  if (shouldStop(weakSelf))
256  return;
257 
258  // We update the last receive time here instead of in the segment received callback so that the
259  // transfer will not fail to terminate if we only received invalid Data packets.
260  m_timeLastSegmentReceived = time::steady_clock::now();
261 
262  m_nReceived++;
263 
264  // It was verified in afterSegmentReceivedCb that the last Data name component is a segment number
265  uint64_t currentSegment = data.getName().get(-1).toSegment();
266  m_receivedSegments.insert(currentSegment);
267 
268  // Add measurement to RTO estimator (if not retransmission)
269  if (pendingSegmentIt->second.state == SegmentState::FirstInterest) {
270  BOOST_ASSERT(m_nSegmentsInFlight >= 0);
  // The second argument is the number of in-flight Interests plus this one; it is
  // passed as the estimator's nExpectedSamples parameter.
271  m_rttEstimator.addMeasurement(m_timeLastSegmentReceived - pendingSegmentIt->second.sendTime,
272  static_cast<size_t>(m_nSegmentsInFlight) + 1);
273  }
274 
275  // Remove from pending segments map
276  m_pendingSegments.erase(pendingSegmentIt);
277 
278  // Copy data in segment to temporary buffer
  // try_emplace creates a buffer of the content's size only when no entry exists yet
  // for this segment number; an existing entry is left as it is.
279  auto receivedSegmentIt = m_segmentBuffer.try_emplace(currentSegment, data.getContent().value_size())
280  .first;
281  std::copy(data.getContent().value_begin(), data.getContent().value_end(),
282  receivedSegmentIt->second.begin());
283  m_nBytesReceived += data.getContent().value_size();
284  afterSegmentValidated(data);
285 
286  if (data.getFinalBlock()) {
287  if (!data.getFinalBlock()->isSegment()) {
288  return signalError(FINALBLOCKID_NOT_SEGMENT,
289  "Received FinalBlockId did not contain a segment component");
290  }
291 
  // The segment count is the final segment number plus one; act only when it changes.
292  if (data.getFinalBlock()->toSegment() + 1 != static_cast<uint64_t>(m_nSegments)) {
293  m_nSegments = data.getFinalBlock()->toSegment() + 1;
294  cancelExcessInFlightSegments();
295  }
296  }
297 
  // Deliver this segment and every directly following segment that is already buffered.
298  if (m_options.inOrder && m_nextSegmentInOrder == currentSegment) {
299  do {
300  onInOrderData(std::make_shared<const Buffer>(m_segmentBuffer[m_nextSegmentInOrder]));
301  m_segmentBuffer.erase(m_nextSegmentInOrder++);
302  } while (m_segmentBuffer.count(m_nextSegmentInOrder) > 0);
303  }
304 
  // First accepted segment: its name without the segment component becomes the prefix
  // used to build all later segment Interests.
305  if (m_receivedSegments.size() == 1) {
306  m_versionedDataName = data.getName().getPrefix(-1);
307  if (currentSegment == 0) {
308  // We received the first segment in response, so we can increment the next segment number
309  m_nextSegmentNum++;
310  }
311  }
312 
313  if (m_highData < currentSegment) {
314  m_highData = currentSegment;
315  }
316 
317  if (data.getCongestionMark() > 0 && !m_options.ignoreCongMarks) {
318  windowDecrease();
319  }
320  else {
321  windowIncrease();
322  }
323 
324  fetchSegmentsInWindow(origInterest);
325 }
326 
327 void
328 SegmentFetcher::afterValidationFailure(const Data&,
329  const security::ValidationError& error,
330  const weak_ptr<SegmentFetcher>& weakSelf)
331 {
332  if (shouldStop(weakSelf))
333  return;
334 
335  signalError(SEGMENT_VALIDATION_FAIL, "Segment validation failed: " + boost::lexical_cast<std::string>(error));
336 }
337 
// Face callback invoked when an Interest sent by the fetcher is answered with a Nack.
// Returns immediately if the fetcher has been stopped or destroyed; otherwise the
// in-flight counter is decremented and the Nack reason decides between the shared
// nack/timeout handling (afterNackOrTimeout) and a fatal NACK_ERROR.
// NOTE(review): the embedded listing numbering has gaps at 344 -> 346 and 350 -> 353,
// so lines are missing from this listing. In particular the `case` labels that select
// the afterNackOrTimeout() branch are not visible, so which Nack reasons are treated
// as recoverable cannot be determined from here.
338 void
339 SegmentFetcher::afterNackReceivedCb(const Interest& origInterest, const lp::Nack& nack,
340  const weak_ptr<SegmentFetcher>& weakSelf)
341 {
342  if (shouldStop(weakSelf))
343  return;
344 
346 
347  BOOST_ASSERT(m_nSegmentsInFlight > 0);
348  m_nSegmentsInFlight--;
349 
350  switch (nack.getReason()) {
353  afterNackOrTimeout(origInterest);
354  break;
355  default:
  // Any reason not handled above aborts the whole fetch.
356  signalError(NACK_ERROR, "Nack Error");
357  break;
358  }
359 }
360 
// Scheduler callback invoked when the timeout event created in sendInterest() fires.
// Returns immediately if the fetcher has been stopped or destroyed; otherwise the
// in-flight counter is decremented and the shared nack/timeout handling runs.
// NOTE(review): the embedded listing numbering jumps from 367 to 369, so one line of
// this function is missing from this listing.
361 void
362 SegmentFetcher::afterTimeoutCb(const Interest& origInterest,
363  const weak_ptr<SegmentFetcher>& weakSelf)
364 {
365  if (shouldStop(weakSelf))
366  return;
367 
369 
370  BOOST_ASSERT(m_nSegmentsInFlight > 0);
371  m_nSegmentsInFlight--;
372  afterNackOrTimeout(origInterest);
373 }
374 
375 void
376 SegmentFetcher::afterNackOrTimeout(const Interest& origInterest)
377 {
378  if (time::steady_clock::now() >= m_timeLastSegmentReceived + m_options.maxTimeout) {
379  // Fail transfer due to exceeding the maximum timeout between the successful receipt of segments
380  return signalError(INTEREST_TIMEOUT, "Timeout exceeded");
381  }
382 
383  BOOST_ASSERT(!m_pendingSegments.empty());
384 
385  const auto& origName = origInterest.getName();
386  std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt;
387  if (!origName.empty() && origName[-1].isSegment()) {
388  pendingSegmentIt = m_pendingSegments.find(origName[-1].toSegment());
389  BOOST_ASSERT(pendingSegmentIt != m_pendingSegments.end());
390  }
391  else { // First Interest
392  pendingSegmentIt = m_pendingSegments.begin();
393  }
394 
395  // Cancel timeout event and set status to InRetxQueue
396  pendingSegmentIt->second.timeoutEvent.cancel();
397  pendingSegmentIt->second.state = SegmentState::InRetxQueue;
398 
399  m_rttEstimator.backoffRto();
400 
401  if (m_receivedSegments.empty()) {
402  // Resend first Interest (until maximum receive timeout exceeded)
403  fetchFirstSegment(origInterest, true);
404  }
405  else {
406  windowDecrease();
407  m_retxQueue.push(pendingSegmentIt->first);
408  fetchSegmentsInWindow(origInterest);
409  }
410 }
411 
// Completes a fetch once all segments have been received, then stops the fetcher.
// In 'block' mode (inOrder == false) the buffered segments 0 .. m_nSegments-1 are
// concatenated into one buffer that is emitted through onComplete.
// NOTE(review): the embedded listing numbering jumps from 415 to 417, so the body of
// the inOrder branch is missing from this listing.
412 void
413 SegmentFetcher::finalizeFetch()
414 {
415  if (m_options.inOrder) {
417  }
418  else {
419  // Combine segments into final buffer
420  OBufferStream buf;
421  // We may have received more segments than exist in the object.
422  BOOST_ASSERT(m_receivedSegments.size() >= static_cast<uint64_t>(m_nSegments));
423 
  // Only segments below m_nSegments are written; any extra received ones are ignored.
424  for (int64_t i = 0; i < m_nSegments; i++) {
425  buf.write(m_segmentBuffer[i].get<const char>(), m_segmentBuffer[i].size());
426  }
427  onComplete(buf.buf());
428  }
429  stop();
430 }
431 
432 void
433 SegmentFetcher::windowIncrease()
434 {
435  if (m_options.useConstantCwnd) {
436  BOOST_ASSERT(m_cwnd == m_options.initCwnd);
437  return;
438  }
439 
440  if (m_cwnd < m_ssthresh) {
441  m_cwnd += m_options.aiStep; // additive increase
442  }
443  else {
444  m_cwnd += m_options.aiStep / std::floor(m_cwnd); // congestion avoidance
445  }
446 }
447 
448 void
449 SegmentFetcher::windowDecrease()
450 {
451  if (m_options.disableCwa || m_highData > m_recPoint) {
452  m_recPoint = m_highInterest;
453 
454  if (m_options.useConstantCwnd) {
455  BOOST_ASSERT(m_cwnd == m_options.initCwnd);
456  return;
457  }
458 
459  // Refer to RFC 5681, Section 3.1 for the rationale behind the code below
460  m_ssthresh = std::max(MIN_SSTHRESH, m_cwnd * m_options.mdCoef); // multiplicative decrease
461  m_cwnd = m_options.resetCwndToInit ? m_options.initCwnd : m_ssthresh;
462  }
463 }
464 
465 void
466 SegmentFetcher::signalError(uint32_t code, const std::string& msg)
467 {
468  onError(code, msg);
469  stop();
470 }
471 
472 void
473 SegmentFetcher::updateRetransmittedSegment(uint64_t segmentNum,
474  const PendingInterestHandle& pendingInterest,
475  scheduler::EventId timeoutEvent)
476 {
477  auto pendingSegmentIt = m_pendingSegments.find(segmentNum);
478  BOOST_ASSERT(pendingSegmentIt != m_pendingSegments.end());
479  BOOST_ASSERT(pendingSegmentIt->second.state == SegmentState::InRetxQueue);
480  pendingSegmentIt->second.state = SegmentState::Retransmitted;
481  pendingSegmentIt->second.hdl = pendingInterest; // cancels previous pending Interest via scoped handle
482  pendingSegmentIt->second.timeoutEvent = timeoutEvent;
483 }
484 
485 void
486 SegmentFetcher::cancelExcessInFlightSegments()
487 {
488  for (auto it = m_pendingSegments.begin(); it != m_pendingSegments.end();) {
489  if (it->first >= static_cast<uint64_t>(m_nSegments)) {
490  it = m_pendingSegments.erase(it); // cancels pending Interest and timeout event
491  BOOST_ASSERT(m_nSegmentsInFlight > 0);
492  m_nSegmentsInFlight--;
493  }
494  else {
495  ++it;
496  }
497  }
498 }
499 
500 bool
501 SegmentFetcher::checkAllSegmentsReceived()
502 {
503  bool haveReceivedAllSegments = false;
504 
505  if (m_nSegments != 0 && m_nReceived >= m_nSegments) {
506  haveReceivedAllSegments = true;
507  // Verify that all segments in window have been received. If not, send Interests for missing segments.
508  for (uint64_t i = 0; i < static_cast<uint64_t>(m_nSegments); i++) {
509  if (m_receivedSegments.count(i) == 0) {
510  m_retxQueue.push(i);
511  haveReceivedAllSegments = false;
512  }
513  }
514  }
515 
516  return haveReceivedAllSegments;
517 }
518 
// Returns the timeout to use for the next Interest: the RTT estimator's current RTO,
// converted to milliseconds and capped at m_options.maxTimeout.
// NOTE(review): the embedded listing numbering jumps from 518 to 520, so the line that
// carries the return type is missing from this listing.
520 SegmentFetcher::getEstimatedRto()
521 {
522  // We don't want an Interest timeout greater than the maximum allowed timeout between the
523  // successful receipt of segments
524  return std::min(m_options.maxTimeout,
525  time::duration_cast<time::milliseconds>(m_rttEstimator.getEstimatedRto()));
526 }
527 
528 } // namespace ndn
Provide a communication channel with local or remote NDN forwarder.
Definition: face.hpp:91
boost::asio::io_context & getIoContext() const noexcept
Returns a reference to the io_context used by this face.
Definition: face.hpp:422
PendingInterestHandle expressInterest(const Interest &interest, const DataCallback &afterSatisfied, const NackCallback &afterNacked, const TimeoutCallback &afterTimeout)
Express an Interest.
Definition: face.cpp:151
Represents an Interest packet.
Definition: interest.hpp:50
PartialName getPrefix(ssize_t nComponents) const
Returns a prefix of the name.
Definition: name.hpp:241
Utility class to fetch a versioned and segmented object.
signal::Signal< SegmentFetcher, Data > afterSegmentValidated
Emitted whenever a received data segment has been successfully validated.
signal::Signal< SegmentFetcher > afterSegmentNacked
Emitted whenever an Interest for a data segment is nacked.
static shared_ptr< SegmentFetcher > start(Face &face, const Interest &baseInterest, security::Validator &validator, const Options &options={})
Initiates segment fetching.
void stop()
Stops fetching.
signal::Signal< SegmentFetcher, ConstBufferPtr > onInOrderData
Emitted after each data segment in segment order has been validated.
signal::Signal< SegmentFetcher, ConstBufferPtr > onComplete
Emitted upon successful retrieval of the complete object (all segments).
signal::Signal< SegmentFetcher > onInOrderComplete
Emitted on successful retrieval of all segments in 'in order' mode.
signal::Signal< SegmentFetcher, Data > afterSegmentReceived
Emitted whenever a data segment is received.
@ INTEREST_TIMEOUT
Retrieval timed out because the maximum timeout between the successful receipt of segments was exceeded.
@ FINALBLOCKID_NOT_SEGMENT
A received FinalBlockId did not contain a segment component.
@ NACK_ERROR
An unrecoverable Nack was received during retrieval.
@ SEGMENT_VALIDATION_FAIL
One of the retrieved segments failed user-provided validation.
@ DATA_HAS_NO_SEGMENT
One of the retrieved Data packets lacked a segment number in the last Name component (excl....
signal::Signal< SegmentFetcher, uint32_t, std::string > onError
Emitted when the retrieval could not be completed due to an error.
signal::Signal< SegmentFetcher > afterSegmentTimedOut
Emitted whenever an Interest for a data segment times out.
EventId schedule(time::nanoseconds after, EventCallback callback)
Schedule a one-time event after the specified delay.
Definition: scheduler.cpp:78
Interface for validating data and interest packets.
Definition: validator.hpp:61
void validate(const Data &data, const DataValidationSuccessCallback &successCb, const DataValidationFailureCallback &failureCb)
Asynchronously validate data.
Definition: validator.cpp:47
static time_point now() noexcept
Definition: time.cpp:79
void backoffRto()
Backoff RTO by a factor of Options::rtoBackoffMultiplier.
void addMeasurement(time::nanoseconds rtt, size_t nExpectedSamples=1)
Records a new RTT measurement.
time::nanoseconds getEstimatedRto() const
Returns the estimated RTO value.
#define NDN_THROW(e)
Definition: exception.hpp:56
::boost::chrono::milliseconds milliseconds
Definition: time.hpp:52
@ Name
Definition: tlv.hpp:71
@ Data
Definition: tlv.hpp:69
@ Interest
Definition: tlv.hpp:68
Definition: data.cpp:25
Options for SegmentFetcher.
size_t flowControlWindow
Maximum number of segments stored in the reorder buffer.
double initCwnd
Initial congestion window size.
bool resetCwndToInit
Reduce cwnd to initCwnd when a loss event occurs.
bool probeLatestVersion
Use the first Interest to probe the latest version of the object.
double mdCoef
Multiplicative decrease coefficient.
time::milliseconds maxTimeout
Maximum allowed time between successful receipt of segments.
bool disableCwa
Disable Conservative Window Adaptation.
bool inOrder
Set to true for 'in order' mode, false for 'block' mode.
bool useConstantInterestTimeout
If true, Interest timeout is kept fixed at maxTimeout.
time::milliseconds interestLifetime
Lifetime of sent Interests (independent of Interest timeout)
double aiStep
Additive increase step (in segments)
bool useConstantCwnd
If true, window size is kept fixed at initCwnd.
bool ignoreCongMarks
Disable window decrease after a congestion mark is received.