segment-fetcher.cpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * Copyright (c) 2013-2019 Regents of the University of California,
4  * Colorado State University,
5  * University Pierre & Marie Curie, Sorbonne University.
6  *
7  * This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
8  *
9  * ndn-cxx library is free software: you can redistribute it and/or modify it under the
10  * terms of the GNU Lesser General Public License as published by the Free Software
11  * Foundation, either version 3 of the License, or (at your option) any later version.
12  *
13  * ndn-cxx library is distributed in the hope that it will be useful, but WITHOUT ANY
14  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
15  * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
16  *
17  * You should have received copies of the GNU General Public License and GNU Lesser
18  * General Public License along with ndn-cxx, e.g., in COPYING.md file. If not, see
19  * <http://www.gnu.org/licenses/>.
20  *
21  * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
22  *
23  * @author Shuo Yang
24  * @author Weiwei Liu
25  * @author Chavoosh Ghasemi
26  */
27 
31 #include "ndn-cxx/lp/nack.hpp"
33 
34 #include <boost/asio/io_service.hpp>
35 #include <boost/lexical_cast.hpp>
36 #include <boost/range/adaptor/map.hpp>
37 
38 #include <cmath>
39 
40 namespace ndn {
41 namespace util {
42 
43 constexpr double SegmentFetcher::MIN_SSTHRESH;
44 
45 void
47 {
48  if (maxTimeout < 1_ms) {
49  NDN_THROW(std::invalid_argument("maxTimeout must be greater than or equal to 1 millisecond"));
50  }
51 
52  if (initCwnd < 1.0) {
53  NDN_THROW(std::invalid_argument("initCwnd must be greater than or equal to 1"));
54  }
55 
56  if (aiStep < 0.0) {
57  NDN_THROW(std::invalid_argument("aiStep must be greater than or equal to 0"));
58  }
59 
60  if (mdCoef < 0.0 || mdCoef > 1.0) {
61  NDN_THROW(std::invalid_argument("mdCoef must be in range [0, 1]"));
62  }
63 }
64 
65 SegmentFetcher::SegmentFetcher(Face& face,
66  security::v2::Validator& validator,
67  const SegmentFetcher::Options& options)
68  : m_options(options)
69  , m_face(face)
70  , m_scheduler(m_face.getIoService())
71  , m_validator(validator)
72  , m_rttEstimator(make_shared<RttEstimator::Options>(options.rttOptions))
73  , m_timeLastSegmentReceived(time::steady_clock::now())
74  , m_nextSegmentNum(0)
75  , m_cwnd(options.initCwnd)
76  , m_ssthresh(options.initSsthresh)
77  , m_nSegmentsInFlight(0)
78  , m_nSegments(0)
79  , m_highInterest(0)
80  , m_highData(0)
81  , m_recPoint(0)
82  , m_nReceived(0)
83  , m_nBytesReceived(0)
84 {
85  m_options.validate();
86 }
87 
88 shared_ptr<SegmentFetcher>
90  const Interest& baseInterest,
91  security::v2::Validator& validator,
92  const SegmentFetcher::Options& options)
93 {
94  shared_ptr<SegmentFetcher> fetcher(new SegmentFetcher(face, validator, options));
95  fetcher->m_this = fetcher;
96  fetcher->fetchFirstSegment(baseInterest, false);
97  return fetcher;
98 }
99 
100 void
102 {
103  if (!m_this) {
104  return;
105  }
106 
107  m_pendingSegments.clear(); // cancels pending Interests and timeout events
108  m_face.getIoService().post([self = std::move(m_this)] {});
109 }
110 
111 bool
112 SegmentFetcher::shouldStop(const weak_ptr<SegmentFetcher>& weakSelf)
113 {
114  auto self = weakSelf.lock();
115  return self == nullptr || self->m_this == nullptr;
116 }
117 
118 void
119 SegmentFetcher::fetchFirstSegment(const Interest& baseInterest, bool isRetransmission)
120 {
121  Interest interest(baseInterest);
122  interest.setCanBePrefix(true);
123  interest.setMustBeFresh(true);
124  interest.setInterestLifetime(m_options.interestLifetime);
125  if (isRetransmission) {
126  interest.refreshNonce();
127  }
128 
129  sendInterest(0, interest, isRetransmission);
130 }
131 
132 void
133 SegmentFetcher::fetchSegmentsInWindow(const Interest& origInterest)
134 {
135  if (checkAllSegmentsReceived()) {
136  // All segments have been retrieved
137  return finalizeFetch();
138  }
139 
140  int64_t availableWindowSize = static_cast<int64_t>(m_cwnd) - m_nSegmentsInFlight;
141  std::vector<std::pair<uint64_t, bool>> segmentsToRequest; // The boolean indicates whether a retx or not
142 
143  while (availableWindowSize > 0) {
144  if (!m_retxQueue.empty()) {
145  auto pendingSegmentIt = m_pendingSegments.find(m_retxQueue.front());
146  m_retxQueue.pop();
147  if (pendingSegmentIt == m_pendingSegments.end()) {
148  // Skip re-requesting this segment, since it was received after RTO timeout
149  continue;
150  }
151  BOOST_ASSERT(pendingSegmentIt->second.state == SegmentState::InRetxQueue);
152  segmentsToRequest.emplace_back(pendingSegmentIt->first, true);
153  }
154  else if (m_nSegments == 0 || m_nextSegmentNum < static_cast<uint64_t>(m_nSegments)) {
155  if (m_receivedSegments.count(m_nextSegmentNum) > 0) {
156  // Don't request a segment a second time if received in response to first "discovery" Interest
157  m_nextSegmentNum++;
158  continue;
159  }
160  segmentsToRequest.emplace_back(m_nextSegmentNum++, false);
161  }
162  else {
163  break;
164  }
165  availableWindowSize--;
166  }
167 
168  for (const auto& segment : segmentsToRequest) {
169  Interest interest(origInterest); // to preserve Interest elements
170  interest.setName(Name(m_versionedDataName).appendSegment(segment.first));
171  interest.setCanBePrefix(false);
172  interest.setMustBeFresh(false);
173  interest.setInterestLifetime(m_options.interestLifetime);
174  interest.refreshNonce();
175  sendInterest(segment.first, interest, segment.second);
176  }
177 }
178 
179 void
180 SegmentFetcher::sendInterest(uint64_t segNum, const Interest& interest, bool isRetransmission)
181 {
182  weak_ptr<SegmentFetcher> weakSelf = m_this;
183 
184  ++m_nSegmentsInFlight;
185  auto pendingInterest = m_face.expressInterest(interest,
186  [this, weakSelf] (const Interest& interest, const Data& data) {
187  afterSegmentReceivedCb(interest, data, weakSelf);
188  },
189  [this, weakSelf] (const Interest& interest, const lp::Nack& nack) {
190  afterNackReceivedCb(interest, nack, weakSelf);
191  },
192  nullptr);
193 
194  auto timeout = m_options.useConstantInterestTimeout ? m_options.maxTimeout : getEstimatedRto();
195  auto timeoutEvent = m_scheduler.schedule(timeout, [this, interest, weakSelf] {
196  afterTimeoutCb(interest, weakSelf);
197  });
198 
199  if (isRetransmission) {
200  updateRetransmittedSegment(segNum, pendingInterest, timeoutEvent);
201  return;
202  }
203 
204  PendingSegment pendingSegment{SegmentState::FirstInterest, time::steady_clock::now(),
205  pendingInterest, timeoutEvent};
206  bool isNew = m_pendingSegments.emplace(segNum, std::move(pendingSegment)).second;
207  BOOST_VERIFY(isNew);
208  m_highInterest = segNum;
209 }
210 
211 void
212 SegmentFetcher::afterSegmentReceivedCb(const Interest& origInterest, const Data& data,
213  const weak_ptr<SegmentFetcher>& weakSelf)
214 {
215  if (shouldStop(weakSelf))
216  return;
217 
218  BOOST_ASSERT(m_nSegmentsInFlight > 0);
219  m_nSegmentsInFlight--;
220 
221  name::Component currentSegmentComponent = data.getName().get(-1);
222  if (!currentSegmentComponent.isSegment()) {
223  return signalError(DATA_HAS_NO_SEGMENT, "Data Name has no segment number");
224  }
225 
226  uint64_t currentSegment = currentSegmentComponent.toSegment();
227 
228  // The first received Interest could have any segment ID
229  std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt;
230  if (m_receivedSegments.size() > 0) {
231  pendingSegmentIt = m_pendingSegments.find(currentSegment);
232  }
233  else {
234  pendingSegmentIt = m_pendingSegments.begin();
235  }
236 
237  if (pendingSegmentIt == m_pendingSegments.end()) {
238  return;
239  }
240 
241  pendingSegmentIt->second.timeoutEvent.cancel();
242 
243  afterSegmentReceived(data);
244 
245  m_validator.validate(data,
246  bind(&SegmentFetcher::afterValidationSuccess, this, _1, origInterest,
247  pendingSegmentIt, weakSelf),
248  bind(&SegmentFetcher::afterValidationFailure, this, _1, _2, weakSelf));
249 }
250 
251 void
252 SegmentFetcher::afterValidationSuccess(const Data& data, const Interest& origInterest,
253  std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt,
254  const weak_ptr<SegmentFetcher>& weakSelf)
255 {
256  if (shouldStop(weakSelf))
257  return;
258 
259  // We update the last receive time here instead of in the segment received callback so that the
260  // transfer will not fail to terminate if we only received invalid Data packets.
261  m_timeLastSegmentReceived = time::steady_clock::now();
262 
263  m_nReceived++;
264 
265  // It was verified in afterSegmentReceivedCb that the last Data name component is a segment number
266  uint64_t currentSegment = data.getName().get(-1).toSegment();
267  // Add measurement to RTO estimator (if not retransmission)
268  if (pendingSegmentIt->second.state == SegmentState::FirstInterest) {
269  BOOST_ASSERT(m_nSegmentsInFlight >= 0);
270  m_rttEstimator.addMeasurement(m_timeLastSegmentReceived - pendingSegmentIt->second.sendTime,
271  static_cast<size_t>(m_nSegmentsInFlight) + 1);
272  }
273 
274  // Remove from pending segments map
275  m_pendingSegments.erase(pendingSegmentIt);
276 
277  // Copy data in segment to temporary buffer
278  auto receivedSegmentIt = m_receivedSegments.emplace(std::piecewise_construct,
279  std::forward_as_tuple(currentSegment),
280  std::forward_as_tuple(data.getContent().value_size()));
281  std::copy(data.getContent().value_begin(), data.getContent().value_end(),
282  receivedSegmentIt.first->second.begin());
283  m_nBytesReceived += data.getContent().value_size();
284  afterSegmentValidated(data);
285 
286  if (data.getFinalBlock()) {
287  if (!data.getFinalBlock()->isSegment()) {
288  return signalError(FINALBLOCKID_NOT_SEGMENT,
289  "Received FinalBlockId did not contain a segment component");
290  }
291 
292  if (data.getFinalBlock()->toSegment() + 1 != static_cast<uint64_t>(m_nSegments)) {
293  m_nSegments = data.getFinalBlock()->toSegment() + 1;
294  cancelExcessInFlightSegments();
295  }
296  }
297 
298  if (m_receivedSegments.size() == 1) {
299  m_versionedDataName = data.getName().getPrefix(-1);
300  if (currentSegment == 0) {
301  // We received the first segment in response, so we can increment the next segment number
302  m_nextSegmentNum++;
303  }
304  }
305 
306  if (m_highData < currentSegment) {
307  m_highData = currentSegment;
308  }
309 
310  if (data.getCongestionMark() > 0 && !m_options.ignoreCongMarks) {
311  windowDecrease();
312  }
313  else {
314  windowIncrease();
315  }
316 
317  fetchSegmentsInWindow(origInterest);
318 }
319 
320 void
321 SegmentFetcher::afterValidationFailure(const Data& data,
322  const security::v2::ValidationError& error,
323  const weak_ptr<SegmentFetcher>& weakSelf)
324 {
325  if (shouldStop(weakSelf))
326  return;
327 
328  signalError(SEGMENT_VALIDATION_FAIL, "Segment validation failed: " + boost::lexical_cast<std::string>(error));
329 }
330 
331 void
332 SegmentFetcher::afterNackReceivedCb(const Interest& origInterest, const lp::Nack& nack,
333  const weak_ptr<SegmentFetcher>& weakSelf)
334 {
335  if (shouldStop(weakSelf))
336  return;
337 
339 
340  BOOST_ASSERT(m_nSegmentsInFlight > 0);
341  m_nSegmentsInFlight--;
342 
343  switch (nack.getReason()) {
346  afterNackOrTimeout(origInterest);
347  break;
348  default:
349  signalError(NACK_ERROR, "Nack Error");
350  break;
351  }
352 }
353 
354 void
355 SegmentFetcher::afterTimeoutCb(const Interest& origInterest,
356  const weak_ptr<SegmentFetcher>& weakSelf)
357 {
358  if (shouldStop(weakSelf))
359  return;
360 
362 
363  BOOST_ASSERT(m_nSegmentsInFlight > 0);
364  m_nSegmentsInFlight--;
365  afterNackOrTimeout(origInterest);
366 }
367 
368 void
369 SegmentFetcher::afterNackOrTimeout(const Interest& origInterest)
370 {
371  if (time::steady_clock::now() >= m_timeLastSegmentReceived + m_options.maxTimeout) {
372  // Fail transfer due to exceeding the maximum timeout between the successful receipt of segments
373  return signalError(INTEREST_TIMEOUT, "Timeout exceeded");
374  }
375 
376  name::Component lastNameComponent = origInterest.getName().get(-1);
377  std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt;
378  BOOST_ASSERT(m_pendingSegments.size() > 0);
379  if (lastNameComponent.isSegment()) {
380  BOOST_ASSERT(m_pendingSegments.count(lastNameComponent.toSegment()) > 0);
381  pendingSegmentIt = m_pendingSegments.find(lastNameComponent.toSegment());
382  }
383  else { // First Interest
384  BOOST_ASSERT(m_pendingSegments.size() > 0);
385  pendingSegmentIt = m_pendingSegments.begin();
386  }
387 
388  // Cancel timeout event and set status to InRetxQueue
389  pendingSegmentIt->second.timeoutEvent.cancel();
390  pendingSegmentIt->second.state = SegmentState::InRetxQueue;
391 
392  m_rttEstimator.backoffRto();
393 
394  if (m_receivedSegments.size() == 0) {
395  // Resend first Interest (until maximum receive timeout exceeded)
396  fetchFirstSegment(origInterest, true);
397  }
398  else {
399  windowDecrease();
400  m_retxQueue.push(pendingSegmentIt->first);
401  fetchSegmentsInWindow(origInterest);
402  }
403 }
404 
405 void
406 SegmentFetcher::finalizeFetch()
407 {
408  // Combine segments into final buffer
410  // We may have received more segments than exist in the object.
411  BOOST_ASSERT(m_receivedSegments.size() >= static_cast<uint64_t>(m_nSegments));
412 
413  for (int64_t i = 0; i < m_nSegments; i++) {
414  buf.write(m_receivedSegments[i].get<const char>(), m_receivedSegments[i].size());
415  }
416 
417  onComplete(buf.buf());
418  stop();
419 }
420 
421 void
422 SegmentFetcher::windowIncrease()
423 {
424  if (m_options.useConstantCwnd) {
425  BOOST_ASSERT(m_cwnd == m_options.initCwnd);
426  return;
427  }
428 
429  if (m_cwnd < m_ssthresh) {
430  m_cwnd += m_options.aiStep; // additive increase
431  }
432  else {
433  m_cwnd += m_options.aiStep / std::floor(m_cwnd); // congestion avoidance
434  }
435 }
436 
437 void
438 SegmentFetcher::windowDecrease()
439 {
440  if (m_options.disableCwa || m_highData > m_recPoint) {
441  m_recPoint = m_highInterest;
442 
443  if (m_options.useConstantCwnd) {
444  BOOST_ASSERT(m_cwnd == m_options.initCwnd);
445  return;
446  }
447 
448  // Refer to RFC 5681, Section 3.1 for the rationale behind the code below
449  m_ssthresh = std::max(MIN_SSTHRESH, m_cwnd * m_options.mdCoef); // multiplicative decrease
450  m_cwnd = m_options.resetCwndToInit ? m_options.initCwnd : m_ssthresh;
451  }
452 }
453 
454 void
455 SegmentFetcher::signalError(uint32_t code, const std::string& msg)
456 {
457  onError(code, msg);
458  stop();
459 }
460 
461 void
462 SegmentFetcher::updateRetransmittedSegment(uint64_t segmentNum,
463  const PendingInterestHandle& pendingInterest,
464  scheduler::EventId timeoutEvent)
465 {
466  auto pendingSegmentIt = m_pendingSegments.find(segmentNum);
467  BOOST_ASSERT(pendingSegmentIt != m_pendingSegments.end());
468  BOOST_ASSERT(pendingSegmentIt->second.state == SegmentState::InRetxQueue);
469  pendingSegmentIt->second.state = SegmentState::Retransmitted;
470  pendingSegmentIt->second.hdl = pendingInterest; // cancels previous pending Interest via scoped handle
471  pendingSegmentIt->second.timeoutEvent = timeoutEvent;
472 }
473 
474 void
475 SegmentFetcher::cancelExcessInFlightSegments()
476 {
477  for (auto it = m_pendingSegments.begin(); it != m_pendingSegments.end();) {
478  if (it->first >= static_cast<uint64_t>(m_nSegments)) {
479  it = m_pendingSegments.erase(it); // cancels pending Interest and timeout event
480  BOOST_ASSERT(m_nSegmentsInFlight > 0);
481  m_nSegmentsInFlight--;
482  }
483  else {
484  ++it;
485  }
486  }
487 }
488 
489 bool
490 SegmentFetcher::checkAllSegmentsReceived()
491 {
492  bool haveReceivedAllSegments = false;
493 
494  if (m_nSegments != 0 && m_nReceived >= m_nSegments) {
495  haveReceivedAllSegments = true;
496  // Verify that all segments in window have been received. If not, send Interests for missing segments.
497  for (uint64_t i = 0; i < static_cast<uint64_t>(m_nSegments); i++) {
498  if (m_receivedSegments.count(i) == 0) {
499  m_retxQueue.push(i);
500  haveReceivedAllSegments = false;
501  }
502  }
503  }
504 
505  return haveReceivedAllSegments;
506 }
507 
508 time::milliseconds
509 SegmentFetcher::getEstimatedRto()
510 {
511  // We don't want an Interest timeout greater than the maximum allowed timeout between the
512  // succesful receipt of segments
513  return std::min(m_options.maxTimeout,
514  time::duration_cast<time::milliseconds>(m_rttEstimator.getEstimatedRto()));
515 }
516 
517 } // namespace util
518 } // namespace ndn
PartialName getPrefix(ssize_t nComponents) const
Returns a prefix of the name.
Definition: name.hpp:211
Definition: data.cpp:26
const Name & getName() const
Get name.
Definition: data.hpp:124
Interest & setMustBeFresh(bool mustBeFresh)
Add or remove MustBeFresh element.
Definition: interest.hpp:184
const Block & getContent() const
Get Content.
Definition: data.cpp:232
const Component & get(ssize_t i) const
Returns an immutable reference to the component at the specified index.
Definition: name.hpp:164
static time_point now() noexcept
Definition: time.cpp:80
An unrecoverable Nack was received during retrieval.
void refreshNonce()
Change nonce value.
Definition: interest.cpp:420
Signal< SegmentFetcher, ConstBufferPtr > onComplete
Emits upon successful retrieval of the complete data.
time::milliseconds maxTimeout
maximum allowed time between successful receipt of segments
Utility class to fetch the latest version of a segmented object.
size_t value_size() const noexcept
Return the size of TLV-VALUE, aka TLV-LENGTH.
Definition: block.cpp:308
Represents an Interest packet.
Definition: interest.hpp:43
void stop()
Stops fetching.
A handle for a scheduled event.
Definition: scheduler.hpp:60
represents a Network Nack
Definition: nack.hpp:38
#define NDN_THROW(e)
Definition: exception.hpp:61
NackReason getReason() const
Definition: nack.hpp:90
Buffer::const_iterator value_begin() const
Get begin iterator of TLV-VALUE.
Definition: block.hpp:296
double aiStep
additive increase step (in segments)
uint64_t getCongestionMark() const
get the value of the CongestionMark tag
Definition: packet-base.cpp:28
One of the retrieved segments failed user-provided validation.
A handle of pending Interest.
Definition: face.hpp:545
bool isSegment() const
Check if the component is a segment number per NDN naming conventions.
static shared_ptr< SegmentFetcher > start(Face &face, const Interest &baseInterest, security::v2::Validator &validator, const Options &options=Options())
Initiates segment fetching.
Signal< SegmentFetcher > afterSegmentNacked
Emits whenever an Interest for a data segment is nacked.
Provide a communication channel with local or remote NDN forwarder.
Definition: face.hpp:89
Signal< SegmentFetcher, Data > afterSegmentValidated
Emits whenever a received data segment has been successfully validated.
Retrieval timed out because the maximum timeout between the successful receipt of segments was exceed...
Buffer::const_iterator value_end() const
Get end iterator of TLV-VALUE.
Definition: block.hpp:305
One of the retrieved Data packets lacked a segment number in the last Name component (excl...
double initSsthresh
initial slow start threshold
double initCwnd
initial congestion window size
Represents a name component.
shared_ptr< Buffer > buf()
Flush written data to the stream and return shared pointer to the underlying buffer.
uint64_t toSegment() const
Interpret as segment number component using NDN naming conventions.
Signal< SegmentFetcher, Data > afterSegmentReceived
Emits whenever a data segment received.
Signal< SegmentFetcher > afterSegmentTimedOut
Emits whenever an Interest for a data segment times out.
RttEstimator::Options rttOptions
options for RTT estimator
Validation error code and optional detailed error message.
const Name & getName() const noexcept
Definition: interest.hpp:121
Interest & setInterestLifetime(time::milliseconds lifetime)
Set the Interest&#39;s lifetime.
Definition: interest.cpp:433
const uint8_t * buf
implements an output stream that constructs ndn::Buffer
Signal< SegmentFetcher, uint32_t, std::string > onError
Emits when the retrieval could not be completed due to an error.
Represents a Data packet.
Definition: data.hpp:35
A received FinalBlockId did not contain a segment component.
const optional< name::Component > & getFinalBlock() const
Definition: data.hpp:222
Interface for validating data and interest packets.
Definition: validator.hpp:61
Interest & setCanBePrefix(bool canBePrefix)
Add or remove CanBePrefix element.
Definition: interest.hpp:164
Interest & setName(const Name &name)
Set the Interest&#39;s name.
Definition: interest.cpp:375