segment-fetcher.cpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * Copyright (c) 2013-2020 Regents of the University of California,
4  * Colorado State University,
5  * University Pierre & Marie Curie, Sorbonne University.
6  *
7  * This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
8  *
9  * ndn-cxx library is free software: you can redistribute it and/or modify it under the
10  * terms of the GNU Lesser General Public License as published by the Free Software
11  * Foundation, either version 3 of the License, or (at your option) any later version.
12  *
13  * ndn-cxx library is distributed in the hope that it will be useful, but WITHOUT ANY
14  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
15  * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
16  *
17  * You should have received copies of the GNU General Public License and GNU Lesser
18  * General Public License along with ndn-cxx, e.g., in COPYING.md file. If not, see
19  * <http://www.gnu.org/licenses/>.
20  *
21  * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
22  */
23 
27 #include "ndn-cxx/lp/nack.hpp"
29 
30 #include <boost/asio/io_service.hpp>
31 #include <boost/lexical_cast.hpp>
32 #include <boost/range/adaptor/map.hpp>
33 
34 #include <cmath>
35 
36 namespace ndn {
37 namespace util {
38 
// Namespace-scope definition of the static constexpr data member. windowDecrease() passes
// MIN_SSTHRESH to std::max, which takes its arguments by reference (an odr-use), so before
// C++17 inline variables this definition is needed in addition to the in-class declaration.
39 constexpr double SegmentFetcher::MIN_SSTHRESH;
40 
41 void
43 {
44  if (maxTimeout < 1_ms) {
45  NDN_THROW(std::invalid_argument("maxTimeout must be greater than or equal to 1 millisecond"));
46  }
47 
48  if (initCwnd < 1.0) {
49  NDN_THROW(std::invalid_argument("initCwnd must be greater than or equal to 1"));
50  }
51 
52  if (aiStep < 0.0) {
53  NDN_THROW(std::invalid_argument("aiStep must be greater than or equal to 0"));
54  }
55 
56  if (mdCoef < 0.0 || mdCoef > 1.0) {
57  NDN_THROW(std::invalid_argument("mdCoef must be in range [0, 1]"));
58  }
59 }
60 
// Constructs a fetcher bound to `face` and `validator`, taking a private copy of `options`.
// The scheduler is created on the face's io_service, the RTT estimator is configured from
// options.rttOptions, and the congestion window / slow-start threshold start at
// options.initCwnd / options.initSsthresh. No Interest is sent here.
61 SegmentFetcher::SegmentFetcher(Face& face,
62  security::v2::Validator& validator,
63  const SegmentFetcher::Options& options)
64  : m_options(options)
65  , m_face(face)
66  , m_scheduler(m_face.getIoService())
67  , m_validator(validator)
68  , m_rttEstimator(make_shared<RttEstimator::Options>(options.rttOptions))
69  , m_timeLastSegmentReceived(time::steady_clock::now())
70  , m_cwnd(options.initCwnd)
71  , m_ssthresh(options.initSsthresh)
72 {
// The copied options are range-checked only after all members have been initialized from them.
73  m_options.validate();
74 }
75 
76 shared_ptr<SegmentFetcher>
78  const Interest& baseInterest,
79  security::v2::Validator& validator,
80  const SegmentFetcher::Options& options)
81 {
82  shared_ptr<SegmentFetcher> fetcher(new SegmentFetcher(face, validator, options));
83  fetcher->m_this = fetcher;
84  fetcher->fetchFirstSegment(baseInterest, false);
85  return fetcher;
86 }
87 
88 void
90 {
91  if (!m_this) {
92  return;
93  }
94 
95  m_pendingSegments.clear(); // cancels pending Interests and timeout events
96  m_face.getIoService().post([self = std::move(m_this)] {});
97 }
98 
99 bool
100 SegmentFetcher::shouldStop(const weak_ptr<SegmentFetcher>& weakSelf)
101 {
102  auto self = weakSelf.lock();
103  return self == nullptr || self->m_this == nullptr;
104 }
105 
106 void
107 SegmentFetcher::fetchFirstSegment(const Interest& baseInterest, bool isRetransmission)
108 {
109  Interest interest(baseInterest);
110  interest.setCanBePrefix(true);
111  interest.setMustBeFresh(true);
112  interest.setInterestLifetime(m_options.interestLifetime);
113  if (isRetransmission) {
114  interest.refreshNonce();
115  }
116 
117  sendInterest(0, interest, isRetransmission);
118 }
119 
120 void
121 SegmentFetcher::fetchSegmentsInWindow(const Interest& origInterest)
122 {
123  if (checkAllSegmentsReceived()) {
124  // All segments have been retrieved
125  return finalizeFetch();
126  }
127 
128  int64_t availableWindowSize;
129  if (m_options.inOrder) {
130  availableWindowSize = std::min<int64_t>(m_cwnd, m_options.flowControlWindow - m_segmentBuffer.size());
131  }
132  else {
133  availableWindowSize = static_cast<int64_t>(m_cwnd);
134  }
135  availableWindowSize -= m_nSegmentsInFlight;
136 
137  std::vector<std::pair<uint64_t, bool>> segmentsToRequest; // The boolean indicates whether a retx or not
138 
139  while (availableWindowSize > 0) {
140  if (!m_retxQueue.empty()) {
141  auto pendingSegmentIt = m_pendingSegments.find(m_retxQueue.front());
142  m_retxQueue.pop();
143  if (pendingSegmentIt == m_pendingSegments.end()) {
144  // Skip re-requesting this segment, since it was received after RTO timeout
145  continue;
146  }
147  BOOST_ASSERT(pendingSegmentIt->second.state == SegmentState::InRetxQueue);
148  segmentsToRequest.emplace_back(pendingSegmentIt->first, true);
149  }
150  else if (m_nSegments == 0 || m_nextSegmentNum < static_cast<uint64_t>(m_nSegments)) {
151  if (m_segmentBuffer.count(m_nextSegmentNum) > 0) {
152  // Don't request a segment a second time if received in response to first "discovery" Interest
153  m_nextSegmentNum++;
154  continue;
155  }
156  segmentsToRequest.emplace_back(m_nextSegmentNum++, false);
157  }
158  else {
159  break;
160  }
161  availableWindowSize--;
162  }
163 
164  for (const auto& segment : segmentsToRequest) {
165  Interest interest(origInterest); // to preserve Interest elements
166  interest.setName(Name(m_versionedDataName).appendSegment(segment.first));
167  interest.setCanBePrefix(false);
168  interest.setMustBeFresh(false);
169  interest.setInterestLifetime(m_options.interestLifetime);
170  interest.refreshNonce();
171  sendInterest(segment.first, interest, segment.second);
172  }
173 }
174 
175 void
176 SegmentFetcher::sendInterest(uint64_t segNum, const Interest& interest, bool isRetransmission)
177 {
178  weak_ptr<SegmentFetcher> weakSelf = m_this;
179 
180  ++m_nSegmentsInFlight;
181  auto pendingInterest = m_face.expressInterest(interest,
182  [this, weakSelf] (const Interest& interest, const Data& data) {
183  afterSegmentReceivedCb(interest, data, weakSelf);
184  },
185  [this, weakSelf] (const Interest& interest, const lp::Nack& nack) {
186  afterNackReceivedCb(interest, nack, weakSelf);
187  },
188  nullptr);
189 
190  auto timeout = m_options.useConstantInterestTimeout ? m_options.maxTimeout : getEstimatedRto();
191  auto timeoutEvent = m_scheduler.schedule(timeout, [this, interest, weakSelf] {
192  afterTimeoutCb(interest, weakSelf);
193  });
194 
195  if (isRetransmission) {
196  updateRetransmittedSegment(segNum, pendingInterest, timeoutEvent);
197  return;
198  }
199 
200  PendingSegment pendingSegment{SegmentState::FirstInterest, time::steady_clock::now(),
201  pendingInterest, timeoutEvent};
202  bool isNew = m_pendingSegments.emplace(segNum, std::move(pendingSegment)).second;
203  BOOST_VERIFY(isNew);
204  m_highInterest = segNum;
205 }
206 
207 void
208 SegmentFetcher::afterSegmentReceivedCb(const Interest& origInterest, const Data& data,
209  const weak_ptr<SegmentFetcher>& weakSelf)
210 {
211  if (shouldStop(weakSelf))
212  return;
213 
214  BOOST_ASSERT(m_nSegmentsInFlight > 0);
215  m_nSegmentsInFlight--;
216 
217  name::Component currentSegmentComponent = data.getName().get(-1);
218  if (!currentSegmentComponent.isSegment()) {
219  return signalError(DATA_HAS_NO_SEGMENT, "Data Name has no segment number");
220  }
221 
222  uint64_t currentSegment = currentSegmentComponent.toSegment();
223 
224  // The first received Interest could have any segment ID
225  std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt;
226  if (m_receivedSegments.size() > 0) {
227  pendingSegmentIt = m_pendingSegments.find(currentSegment);
228  }
229  else {
230  pendingSegmentIt = m_pendingSegments.begin();
231  }
232 
233  if (pendingSegmentIt == m_pendingSegments.end()) {
234  return;
235  }
236 
237  pendingSegmentIt->second.timeoutEvent.cancel();
238 
239  afterSegmentReceived(data);
240 
241  m_validator.validate(data,
242  bind(&SegmentFetcher::afterValidationSuccess, this, _1, origInterest,
243  pendingSegmentIt, weakSelf),
244  bind(&SegmentFetcher::afterValidationFailure, this, _1, _2, weakSelf));
245 }
246 
247 void
248 SegmentFetcher::afterValidationSuccess(const Data& data, const Interest& origInterest,
249  std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt,
250  const weak_ptr<SegmentFetcher>& weakSelf)
251 {
252  if (shouldStop(weakSelf))
253  return;
254 
255  // We update the last receive time here instead of in the segment received callback so that the
256  // transfer will not fail to terminate if we only received invalid Data packets.
257  m_timeLastSegmentReceived = time::steady_clock::now();
258 
259  m_nReceived++;
260 
261  // It was verified in afterSegmentReceivedCb that the last Data name component is a segment number
262  uint64_t currentSegment = data.getName().get(-1).toSegment();
263  m_receivedSegments.insert(currentSegment);
264 
265  // Add measurement to RTO estimator (if not retransmission)
266  if (pendingSegmentIt->second.state == SegmentState::FirstInterest) {
267  BOOST_ASSERT(m_nSegmentsInFlight >= 0);
268  m_rttEstimator.addMeasurement(m_timeLastSegmentReceived - pendingSegmentIt->second.sendTime,
269  static_cast<size_t>(m_nSegmentsInFlight) + 1);
270  }
271 
272  // Remove from pending segments map
273  m_pendingSegments.erase(pendingSegmentIt);
274 
275  // Copy data in segment to temporary buffer
276  auto receivedSegmentIt = m_segmentBuffer.emplace(std::piecewise_construct,
277  std::forward_as_tuple(currentSegment),
278  std::forward_as_tuple(data.getContent().value_size()));
279  std::copy(data.getContent().value_begin(), data.getContent().value_end(),
280  receivedSegmentIt.first->second.begin());
281  m_nBytesReceived += data.getContent().value_size();
282  afterSegmentValidated(data);
283 
284  if (data.getFinalBlock()) {
285  if (!data.getFinalBlock()->isSegment()) {
286  return signalError(FINALBLOCKID_NOT_SEGMENT,
287  "Received FinalBlockId did not contain a segment component");
288  }
289 
290  if (data.getFinalBlock()->toSegment() + 1 != static_cast<uint64_t>(m_nSegments)) {
291  m_nSegments = data.getFinalBlock()->toSegment() + 1;
292  cancelExcessInFlightSegments();
293  }
294  }
295 
296  if (m_options.inOrder && m_nextSegmentInOrder == currentSegment) {
297  do {
298  onInOrderData(std::make_shared<const Buffer>(m_segmentBuffer[m_nextSegmentInOrder]));
299  m_segmentBuffer.erase(m_nextSegmentInOrder++);
300  } while (m_segmentBuffer.count(m_nextSegmentInOrder) > 0);
301  }
302 
303  if (m_receivedSegments.size() == 1) {
304  m_versionedDataName = data.getName().getPrefix(-1);
305  if (currentSegment == 0) {
306  // We received the first segment in response, so we can increment the next segment number
307  m_nextSegmentNum++;
308  }
309  }
310 
311  if (m_highData < currentSegment) {
312  m_highData = currentSegment;
313  }
314 
315  if (data.getCongestionMark() > 0 && !m_options.ignoreCongMarks) {
316  windowDecrease();
317  }
318  else {
319  windowIncrease();
320  }
321 
322  fetchSegmentsInWindow(origInterest);
323 }
324 
325 void
326 SegmentFetcher::afterValidationFailure(const Data& data,
327  const security::v2::ValidationError& error,
328  const weak_ptr<SegmentFetcher>& weakSelf)
329 {
330  if (shouldStop(weakSelf))
331  return;
332 
333  signalError(SEGMENT_VALIDATION_FAIL, "Segment validation failed: " + boost::lexical_cast<std::string>(error));
334 }
335 
336 void
337 SegmentFetcher::afterNackReceivedCb(const Interest& origInterest, const lp::Nack& nack,
338  const weak_ptr<SegmentFetcher>& weakSelf)
339 {
340  if (shouldStop(weakSelf))
341  return;
342 
344 
345  BOOST_ASSERT(m_nSegmentsInFlight > 0);
346  m_nSegmentsInFlight--;
347 
348  switch (nack.getReason()) {
351  afterNackOrTimeout(origInterest);
352  break;
353  default:
354  signalError(NACK_ERROR, "Nack Error");
355  break;
356  }
357 }
358 
359 void
360 SegmentFetcher::afterTimeoutCb(const Interest& origInterest,
361  const weak_ptr<SegmentFetcher>& weakSelf)
362 {
363  if (shouldStop(weakSelf))
364  return;
365 
367 
368  BOOST_ASSERT(m_nSegmentsInFlight > 0);
369  m_nSegmentsInFlight--;
370  afterNackOrTimeout(origInterest);
371 }
372 
373 void
374 SegmentFetcher::afterNackOrTimeout(const Interest& origInterest)
375 {
376  if (time::steady_clock::now() >= m_timeLastSegmentReceived + m_options.maxTimeout) {
377  // Fail transfer due to exceeding the maximum timeout between the successful receipt of segments
378  return signalError(INTEREST_TIMEOUT, "Timeout exceeded");
379  }
380 
381  name::Component lastNameComponent = origInterest.getName().get(-1);
382  std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt;
383  BOOST_ASSERT(m_pendingSegments.size() > 0);
384  if (lastNameComponent.isSegment()) {
385  BOOST_ASSERT(m_pendingSegments.count(lastNameComponent.toSegment()) > 0);
386  pendingSegmentIt = m_pendingSegments.find(lastNameComponent.toSegment());
387  }
388  else { // First Interest
389  BOOST_ASSERT(m_pendingSegments.size() > 0);
390  pendingSegmentIt = m_pendingSegments.begin();
391  }
392 
393  // Cancel timeout event and set status to InRetxQueue
394  pendingSegmentIt->second.timeoutEvent.cancel();
395  pendingSegmentIt->second.state = SegmentState::InRetxQueue;
396 
397  m_rttEstimator.backoffRto();
398 
399  if (m_receivedSegments.size() == 0) {
400  // Resend first Interest (until maximum receive timeout exceeded)
401  fetchFirstSegment(origInterest, true);
402  }
403  else {
404  windowDecrease();
405  m_retxQueue.push(pendingSegmentIt->first);
406  fetchSegmentsInWindow(origInterest);
407  }
408 }
409 
410 void
411 SegmentFetcher::finalizeFetch()
412 {
413  if (m_options.inOrder) {
415  }
416  else {
417  // Combine segments into final buffer
418  OBufferStream buf;
419  // We may have received more segments than exist in the object.
420  BOOST_ASSERT(m_receivedSegments.size() >= static_cast<uint64_t>(m_nSegments));
421 
422  for (int64_t i = 0; i < m_nSegments; i++) {
423  buf.write(m_segmentBuffer[i].get<const char>(), m_segmentBuffer[i].size());
424  }
425  onComplete(buf.buf());
426  }
427  stop();
428 }
429 
430 void
431 SegmentFetcher::windowIncrease()
432 {
433  if (m_options.useConstantCwnd) {
434  BOOST_ASSERT(m_cwnd == m_options.initCwnd);
435  return;
436  }
437 
438  if (m_cwnd < m_ssthresh) {
439  m_cwnd += m_options.aiStep; // additive increase
440  }
441  else {
442  m_cwnd += m_options.aiStep / std::floor(m_cwnd); // congestion avoidance
443  }
444 }
445 
446 void
447 SegmentFetcher::windowDecrease()
448 {
449  if (m_options.disableCwa || m_highData > m_recPoint) {
450  m_recPoint = m_highInterest;
451 
452  if (m_options.useConstantCwnd) {
453  BOOST_ASSERT(m_cwnd == m_options.initCwnd);
454  return;
455  }
456 
457  // Refer to RFC 5681, Section 3.1 for the rationale behind the code below
458  m_ssthresh = std::max(MIN_SSTHRESH, m_cwnd * m_options.mdCoef); // multiplicative decrease
459  m_cwnd = m_options.resetCwndToInit ? m_options.initCwnd : m_ssthresh;
460  }
461 }
462 
// Reports a failure to subscribers and ends the fetch: emits onError with `code` and `msg`,
// then calls stop(). The signal is emitted before stop() runs.
463 void
464 SegmentFetcher::signalError(uint32_t code, const std::string& msg)
465 {
466  onError(code, msg);
467  stop();
468 }
469 
470 void
471 SegmentFetcher::updateRetransmittedSegment(uint64_t segmentNum,
472  const PendingInterestHandle& pendingInterest,
473  scheduler::EventId timeoutEvent)
474 {
475  auto pendingSegmentIt = m_pendingSegments.find(segmentNum);
476  BOOST_ASSERT(pendingSegmentIt != m_pendingSegments.end());
477  BOOST_ASSERT(pendingSegmentIt->second.state == SegmentState::InRetxQueue);
478  pendingSegmentIt->second.state = SegmentState::Retransmitted;
479  pendingSegmentIt->second.hdl = pendingInterest; // cancels previous pending Interest via scoped handle
480  pendingSegmentIt->second.timeoutEvent = timeoutEvent;
481 }
482 
483 void
484 SegmentFetcher::cancelExcessInFlightSegments()
485 {
486  for (auto it = m_pendingSegments.begin(); it != m_pendingSegments.end();) {
487  if (it->first >= static_cast<uint64_t>(m_nSegments)) {
488  it = m_pendingSegments.erase(it); // cancels pending Interest and timeout event
489  BOOST_ASSERT(m_nSegmentsInFlight > 0);
490  m_nSegmentsInFlight--;
491  }
492  else {
493  ++it;
494  }
495  }
496 }
497 
498 bool
499 SegmentFetcher::checkAllSegmentsReceived()
500 {
501  bool haveReceivedAllSegments = false;
502 
503  if (m_nSegments != 0 && m_nReceived >= m_nSegments) {
504  haveReceivedAllSegments = true;
505  // Verify that all segments in window have been received. If not, send Interests for missing segments.
506  for (uint64_t i = 0; i < static_cast<uint64_t>(m_nSegments); i++) {
507  if (m_receivedSegments.count(i) == 0) {
508  m_retxQueue.push(i);
509  haveReceivedAllSegments = false;
510  }
511  }
512  }
513 
514  return haveReceivedAllSegments;
515 }
516 
517 time::milliseconds
518 SegmentFetcher::getEstimatedRto()
519 {
520  // We don't want an Interest timeout greater than the maximum allowed timeout between the
521  // succesful receipt of segments
522  return std::min(m_options.maxTimeout,
523  time::duration_cast<time::milliseconds>(m_rttEstimator.getEstimatedRto()));
524 }
525 
526 } // namespace util
527 } // namespace ndn
PartialName getPrefix(ssize_t nComponents) const
Returns a prefix of the name.
Definition: name.hpp:212
Definition: data.cpp:26
Interest & setMustBeFresh(bool mustBeFresh)
Add or remove MustBeFresh element.
Definition: interest.hpp:237
const Component & get(ssize_t i) const
Returns an immutable reference to the component at the specified index.
Definition: name.hpp:165
static time_point now() noexcept
Definition: time.cpp:80
An unrecoverable Nack was received during retrieval.
void refreshNonce()
Change nonce value.
Definition: interest.cpp:430
Signal< SegmentFetcher, ConstBufferPtr > onComplete
Emitted upon successful retrieval of the complete object (all segments).
time::milliseconds maxTimeout
maximum allowed time between successful receipt of segments
Utility class to fetch the latest version of a segmented object.
size_t value_size() const noexcept
Return the size of TLV-VALUE, aka TLV-LENGTH.
Definition: block.cpp:308
Represents an Interest packet.
Definition: interest.hpp:50
Signal< SegmentFetcher > onInOrderComplete
Emitted on successful retrieval of all segments in 'in order' mode.
void stop()
Stops fetching.
A handle for a scheduled event.
Definition: scheduler.hpp:60
represents a Network Nack
Definition: nack.hpp:38
#define NDN_THROW(e)
Definition: exception.hpp:61
NackReason getReason() const
Definition: nack.hpp:90
Buffer::const_iterator value_begin() const
Get begin iterator of TLV-VALUE.
Definition: block.hpp:283
double aiStep
additive increase step (in segments)
uint64_t getCongestionMark() const
get the value of the CongestionMark tag
Definition: packet-base.cpp:28
One of the retrieved segments failed user-provided validation.
Handle for a pending Interest.
Definition: face.hpp:490
bool isSegment() const
Check if the component is a segment number per NDN naming conventions.
static shared_ptr< SegmentFetcher > start(Face &face, const Interest &baseInterest, security::v2::Validator &validator, const Options &options=Options())
Initiates segment fetching.
Signal< SegmentFetcher > afterSegmentNacked
Emitted whenever an Interest for a data segment is nacked.
Provide a communication channel with local or remote NDN forwarder.
Definition: face.hpp:90
Signal< SegmentFetcher, Data > afterSegmentValidated
Emitted whenever a received data segment has been successfully validated.
Retrieval timed out because the maximum timeout between the successful receipt of segments was exceed...
Buffer::const_iterator value_end() const
Get end iterator of TLV-VALUE.
Definition: block.hpp:292
One of the retrieved Data packets lacked a segment number in the last Name component (excl...
const Name & getName() const noexcept
Get name.
Definition: data.hpp:126
double initSsthresh
initial slow start threshold
double initCwnd
initial congestion window size
Represents a name component.
shared_ptr< Buffer > buf()
Flush written data to the stream and return shared pointer to the underlying buffer.
uint64_t toSegment() const
Interpret as segment number component using NDN naming conventions.
Signal< SegmentFetcher, Data > afterSegmentReceived
Emitted whenever a data segment received.
Signal< SegmentFetcher > afterSegmentTimedOut
Emitted whenever an Interest for a data segment times out.
RttEstimator::Options rttOptions
options for RTT estimator
Validation error code and optional detailed error message.
const Block & getContent() const noexcept
Get the Content element.
Definition: data.hpp:172
const Name & getName() const noexcept
Definition: interest.hpp:174
Interest & setInterestLifetime(time::milliseconds lifetime)
Set the Interest's lifetime.
Definition: interest.cpp:443
implements an output stream that constructs ndn::Buffer
Signal< SegmentFetcher, uint32_t, std::string > onError
Emitted when the retrieval could not be completed due to an error.
Represents a Data packet.
Definition: data.hpp:39
A received FinalBlockId did not contain a segment component.
const optional< name::Component > & getFinalBlock() const
Definition: data.hpp:295
Interface for validating data and interest packets.
Definition: validator.hpp:61
Signal< SegmentFetcher, ConstBufferPtr > onInOrderData
Emitted after each data segment in segment order has been validated.
Interest & setCanBePrefix(bool canBePrefix)
Add or remove CanBePrefix element.
Definition: interest.hpp:217
Interest & setName(const Name &name)
Set the Interest's name.
Definition: interest.cpp:375