ndn-cxx: NDN C++ Library 0.9.0-33-g832ea91d
Loading...
Searching...
No Matches
segment-fetcher.cpp
Go to the documentation of this file.
1/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2/*
3 * Copyright (c) 2013-2024 Regents of the University of California,
4 * Colorado State University,
5 * University Pierre & Marie Curie, Sorbonne University.
6 *
7 * This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
8 *
9 * ndn-cxx library is free software: you can redistribute it and/or modify it under the
10 * terms of the GNU Lesser General Public License as published by the Free Software
11 * Foundation, either version 3 of the License, or (at your option) any later version.
12 *
13 * ndn-cxx library is distributed in the hope that it will be useful, but WITHOUT ANY
14 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
15 * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
16 *
17 * You should have received copies of the GNU General Public License and GNU Lesser
18 * General Public License along with ndn-cxx, e.g., in COPYING.md file. If not, see
19 * <http://www.gnu.org/licenses/>.
20 *
21 * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
22 */
23
27#include "ndn-cxx/lp/nack.hpp"
29
30#include <boost/asio/io_context.hpp>
31#include <boost/asio/post.hpp>
32#include <boost/lexical_cast.hpp>
33#include <boost/range/adaptor/map.hpp>
34
35#include <algorithm>
36#include <cmath>
37
38namespace ndn {
39
40void
42{
43 if (maxTimeout < 1_ms) {
44 NDN_THROW(std::invalid_argument("maxTimeout must be greater than or equal to 1 millisecond"));
45 }
46
47 if (initCwnd < 1.0) {
48 NDN_THROW(std::invalid_argument("initCwnd must be greater than or equal to 1"));
49 }
50
51 if (aiStep < 0.0) {
52 NDN_THROW(std::invalid_argument("aiStep must be greater than or equal to 0"));
53 }
54
55 if (mdCoef < 0.0 || mdCoef > 1.0) {
56 NDN_THROW(std::invalid_argument("mdCoef must be in range [0, 1]"));
57 }
58}
59
60SegmentFetcher::SegmentFetcher(Face& face,
61 security::Validator& validator,
62 const SegmentFetcher::Options& options)
63 : m_options(options)
64 , m_face(face)
65 , m_scheduler(m_face.getIoContext())
66 , m_validator(validator)
67 , m_rttEstimator(make_shared<util::RttEstimator::Options>(options.rttOptions))
68 , m_timeLastSegmentReceived(time::steady_clock::now())
69 , m_cwnd(options.initCwnd)
70 , m_ssthresh(options.initSsthresh)
71{
72 m_options.validate();
73}
74
75shared_ptr<SegmentFetcher>
77 const Interest& baseInterest,
78 security::Validator& validator,
79 const SegmentFetcher::Options& options)
80{
81 shared_ptr<SegmentFetcher> fetcher(new SegmentFetcher(face, validator, options));
82 fetcher->m_this = fetcher;
83 fetcher->fetchFirstSegment(baseInterest, false);
84 return fetcher;
85}
86
87void
89{
90 if (!m_this) {
91 return;
92 }
93
94 m_pendingSegments.clear(); // cancels pending Interests and timeout events
95 boost::asio::post(m_face.getIoContext(), [self = std::move(m_this)] {});
96}
97
98bool
99SegmentFetcher::shouldStop(const weak_ptr<SegmentFetcher>& weakSelf)
100{
101 auto self = weakSelf.lock();
102 return self == nullptr || self->m_this == nullptr;
103}
104
105void
106SegmentFetcher::fetchFirstSegment(const Interest& baseInterest, bool isRetransmission)
107{
108 Interest interest(baseInterest);
109 interest.setCanBePrefix(true);
110 if (!interest.getName().empty() && interest.getName()[-1].isVersion()) {
111 interest.setMustBeFresh(false);
112 }
113 else {
114 interest.setMustBeFresh(m_options.probeLatestVersion);
115 }
116 interest.setInterestLifetime(m_options.interestLifetime);
117 if (isRetransmission) {
118 interest.refreshNonce();
119 }
120
121 sendInterest(0, interest, isRetransmission);
122}
123
124void
125SegmentFetcher::fetchSegmentsInWindow(const Interest& origInterest)
126{
127 if (checkAllSegmentsReceived()) {
128 // All segments have been retrieved
129 return finalizeFetch();
130 }
131
132 int64_t availableWindowSize;
133 if (m_options.inOrder) {
134 availableWindowSize = std::min<int64_t>(m_cwnd, m_options.flowControlWindow - m_segmentBuffer.size());
135 }
136 else {
137 availableWindowSize = static_cast<int64_t>(m_cwnd);
138 }
139 availableWindowSize -= m_nSegmentsInFlight;
140
141 std::vector<std::pair<uint64_t, bool>> segmentsToRequest; // The boolean indicates whether a retx or not
142
143 while (availableWindowSize > 0) {
144 if (!m_retxQueue.empty()) {
145 auto pendingSegmentIt = m_pendingSegments.find(m_retxQueue.front());
146 m_retxQueue.pop();
147 if (pendingSegmentIt == m_pendingSegments.end()) {
148 // Skip re-requesting this segment, since it was received after RTO timeout
149 continue;
150 }
151 BOOST_ASSERT(pendingSegmentIt->second.state == SegmentState::InRetxQueue);
152 segmentsToRequest.emplace_back(pendingSegmentIt->first, true);
153 }
154 else if (m_nSegments == 0 || m_nextSegmentNum < static_cast<uint64_t>(m_nSegments)) {
155 if (m_segmentBuffer.count(m_nextSegmentNum) > 0) {
156 // Don't request a segment a second time if received in response to first "discovery" Interest
157 m_nextSegmentNum++;
158 continue;
159 }
160 segmentsToRequest.emplace_back(m_nextSegmentNum++, false);
161 }
162 else {
163 break;
164 }
165 availableWindowSize--;
166 }
167
168 for (const auto& segment : segmentsToRequest) {
169 Interest interest(origInterest); // to preserve Interest elements
170 interest.setName(Name(m_versionedDataName).appendSegment(segment.first));
171 interest.setCanBePrefix(false);
172 interest.setMustBeFresh(false);
173 interest.setInterestLifetime(m_options.interestLifetime);
174 interest.refreshNonce();
175 sendInterest(segment.first, interest, segment.second);
176 }
177}
178
179void
180SegmentFetcher::sendInterest(uint64_t segNum, const Interest& interest, bool isRetransmission)
181{
182 weak_ptr<SegmentFetcher> weakSelf = m_this;
183
184 ++m_nSegmentsInFlight;
185 auto pendingInterest = m_face.expressInterest(interest,
186 [this, weakSelf] (const Interest& interest, const Data& data) {
187 afterSegmentReceivedCb(interest, data, weakSelf);
188 },
189 [this, weakSelf] (const Interest& interest, const lp::Nack& nack) {
190 afterNackReceivedCb(interest, nack, weakSelf);
191 },
192 nullptr);
193
194 auto timeout = m_options.useConstantInterestTimeout ? m_options.maxTimeout : getEstimatedRto();
195 auto timeoutEvent = m_scheduler.schedule(timeout, [this, interest, weakSelf] {
196 afterTimeoutCb(interest, weakSelf);
197 });
198
199 if (isRetransmission) {
200 updateRetransmittedSegment(segNum, pendingInterest, timeoutEvent);
201 return;
202 }
203
204 PendingSegment pendingSegment{SegmentState::FirstInterest, time::steady_clock::now(),
205 pendingInterest, timeoutEvent};
206 bool isNew = m_pendingSegments.try_emplace(segNum, std::move(pendingSegment)).second;
207 BOOST_VERIFY(isNew);
208 m_highInterest = segNum;
209}
210
211void
212SegmentFetcher::afterSegmentReceivedCb(const Interest& origInterest, const Data& data,
213 const weak_ptr<SegmentFetcher>& weakSelf)
214{
215 if (shouldStop(weakSelf))
216 return;
217
218 BOOST_ASSERT(m_nSegmentsInFlight > 0);
219 m_nSegmentsInFlight--;
220
221 name::Component currentSegmentComponent = data.getName().get(-1);
222 if (!currentSegmentComponent.isSegment()) {
223 return signalError(DATA_HAS_NO_SEGMENT, "Data Name has no segment number");
224 }
225
226 uint64_t currentSegment = currentSegmentComponent.toSegment();
227
228 // The first received Interest could have any segment ID
229 std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt;
230 if (m_receivedSegments.size() > 0) {
231 pendingSegmentIt = m_pendingSegments.find(currentSegment);
232 }
233 else {
234 pendingSegmentIt = m_pendingSegments.begin();
235 }
236
237 if (pendingSegmentIt == m_pendingSegments.end()) {
238 return;
239 }
240
241 pendingSegmentIt->second.timeoutEvent.cancel();
242
244
245 m_validator.validate(data,
246 [=] (const Data& d) { afterValidationSuccess(d, origInterest, pendingSegmentIt, weakSelf); },
247 [=] (const Data& d, const auto& error) { afterValidationFailure(d, error, weakSelf); });
248}
249
250void
251SegmentFetcher::afterValidationSuccess(const Data& data, const Interest& origInterest,
252 std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt,
253 const weak_ptr<SegmentFetcher>& weakSelf)
254{
255 if (shouldStop(weakSelf))
256 return;
257
258 // We update the last receive time here instead of in the segment received callback so that the
259 // transfer will not fail to terminate if we only received invalid Data packets.
260 m_timeLastSegmentReceived = time::steady_clock::now();
261
262 m_nReceived++;
263
264 // It was verified in afterSegmentReceivedCb that the last Data name component is a segment number
265 uint64_t currentSegment = data.getName().get(-1).toSegment();
266 m_receivedSegments.insert(currentSegment);
267
268 // Add measurement to RTO estimator (if not retransmission)
269 if (pendingSegmentIt->second.state == SegmentState::FirstInterest) {
270 BOOST_ASSERT(m_nSegmentsInFlight >= 0);
271 m_rttEstimator.addMeasurement(m_timeLastSegmentReceived - pendingSegmentIt->second.sendTime,
272 static_cast<size_t>(m_nSegmentsInFlight) + 1);
273 }
274
275 // Remove from pending segments map
276 m_pendingSegments.erase(pendingSegmentIt);
277
278 // Copy data in segment to temporary buffer
279 auto receivedSegmentIt = m_segmentBuffer.try_emplace(currentSegment, data.getContent().value_size())
280 .first;
281 std::copy(data.getContent().value_begin(), data.getContent().value_end(),
282 receivedSegmentIt->second.begin());
283 m_nBytesReceived += data.getContent().value_size();
285
286 if (data.getFinalBlock()) {
287 if (!data.getFinalBlock()->isSegment()) {
288 return signalError(FINALBLOCKID_NOT_SEGMENT,
289 "Received FinalBlockId did not contain a segment component");
290 }
291
292 if (data.getFinalBlock()->toSegment() + 1 != static_cast<uint64_t>(m_nSegments)) {
293 m_nSegments = data.getFinalBlock()->toSegment() + 1;
294 cancelExcessInFlightSegments();
295 }
296 }
297
298 if (m_options.inOrder && m_nextSegmentInOrder == currentSegment) {
299 do {
300 onInOrderData(std::make_shared<const Buffer>(m_segmentBuffer[m_nextSegmentInOrder]));
301 m_segmentBuffer.erase(m_nextSegmentInOrder++);
302 } while (m_segmentBuffer.count(m_nextSegmentInOrder) > 0);
303 }
304
305 if (m_receivedSegments.size() == 1) {
306 m_versionedDataName = data.getName().getPrefix(-1);
307 if (currentSegment == 0) {
308 // We received the first segment in response, so we can increment the next segment number
309 m_nextSegmentNum++;
310 }
311 }
312
313 if (m_highData < currentSegment) {
314 m_highData = currentSegment;
315 }
316
317 if (data.getCongestionMark() > 0 && !m_options.ignoreCongMarks) {
318 windowDecrease();
319 }
320 else {
321 windowIncrease();
322 }
323
324 fetchSegmentsInWindow(origInterest);
325}
326
327void
328SegmentFetcher::afterValidationFailure(const Data&,
329 const security::ValidationError& error,
330 const weak_ptr<SegmentFetcher>& weakSelf)
331{
332 if (shouldStop(weakSelf))
333 return;
334
335 signalError(SEGMENT_VALIDATION_FAIL, "Segment validation failed: " + boost::lexical_cast<std::string>(error));
336}
337
338void
339SegmentFetcher::afterNackReceivedCb(const Interest& origInterest, const lp::Nack& nack,
340 const weak_ptr<SegmentFetcher>& weakSelf)
341{
342 if (shouldStop(weakSelf))
343 return;
344
346
347 BOOST_ASSERT(m_nSegmentsInFlight > 0);
348 m_nSegmentsInFlight--;
349
350 switch (nack.getReason()) {
353 afterNackOrTimeout(origInterest);
354 break;
355 default:
356 signalError(NACK_ERROR, "Nack Error");
357 break;
358 }
359}
360
361void
362SegmentFetcher::afterTimeoutCb(const Interest& origInterest,
363 const weak_ptr<SegmentFetcher>& weakSelf)
364{
365 if (shouldStop(weakSelf))
366 return;
367
369
370 BOOST_ASSERT(m_nSegmentsInFlight > 0);
371 m_nSegmentsInFlight--;
372 afterNackOrTimeout(origInterest);
373}
374
375void
376SegmentFetcher::afterNackOrTimeout(const Interest& origInterest)
377{
378 if (time::steady_clock::now() >= m_timeLastSegmentReceived + m_options.maxTimeout) {
379 // Fail transfer due to exceeding the maximum timeout between the successful receipt of segments
380 return signalError(INTEREST_TIMEOUT, "Timeout exceeded");
381 }
382
383 BOOST_ASSERT(!m_pendingSegments.empty());
384
385 const auto& origName = origInterest.getName();
386 std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt;
387 if (!origName.empty() && origName[-1].isSegment()) {
388 pendingSegmentIt = m_pendingSegments.find(origName[-1].toSegment());
389 BOOST_ASSERT(pendingSegmentIt != m_pendingSegments.end());
390 }
391 else { // First Interest
392 pendingSegmentIt = m_pendingSegments.begin();
393 }
394
395 // Cancel timeout event and set status to InRetxQueue
396 pendingSegmentIt->second.timeoutEvent.cancel();
397 pendingSegmentIt->second.state = SegmentState::InRetxQueue;
398
399 m_rttEstimator.backoffRto();
400
401 if (m_receivedSegments.empty()) {
402 // Resend first Interest (until maximum receive timeout exceeded)
403 fetchFirstSegment(origInterest, true);
404 }
405 else {
406 windowDecrease();
407 m_retxQueue.push(pendingSegmentIt->first);
408 fetchSegmentsInWindow(origInterest);
409 }
410}
411
412void
413SegmentFetcher::finalizeFetch()
414{
415 if (m_options.inOrder) {
417 }
418 else {
419 // Combine segments into final buffer
420 OBufferStream buf;
421 // We may have received more segments than exist in the object.
422 BOOST_ASSERT(m_receivedSegments.size() >= static_cast<uint64_t>(m_nSegments));
423
424 for (int64_t i = 0; i < m_nSegments; i++) {
425 buf.write(m_segmentBuffer[i].get<const char>(), m_segmentBuffer[i].size());
426 }
427 onComplete(buf.buf());
428 }
429 stop();
430}
431
432void
433SegmentFetcher::windowIncrease()
434{
435 if (m_options.useConstantCwnd) {
436 BOOST_ASSERT(m_cwnd == m_options.initCwnd);
437 return;
438 }
439
440 if (m_cwnd < m_ssthresh) {
441 m_cwnd += m_options.aiStep; // additive increase
442 }
443 else {
444 m_cwnd += m_options.aiStep / std::floor(m_cwnd); // congestion avoidance
445 }
446}
447
448void
449SegmentFetcher::windowDecrease()
450{
451 if (m_options.disableCwa || m_highData > m_recPoint) {
452 m_recPoint = m_highInterest;
453
454 if (m_options.useConstantCwnd) {
455 BOOST_ASSERT(m_cwnd == m_options.initCwnd);
456 return;
457 }
458
459 // Refer to RFC 5681, Section 3.1 for the rationale behind the code below
460 m_ssthresh = std::max(MIN_SSTHRESH, m_cwnd * m_options.mdCoef); // multiplicative decrease
461 m_cwnd = m_options.resetCwndToInit ? m_options.initCwnd : m_ssthresh;
462 }
463}
464
465void
466SegmentFetcher::signalError(uint32_t code, const std::string& msg)
467{
468 onError(code, msg);
469 stop();
470}
471
472void
473SegmentFetcher::updateRetransmittedSegment(uint64_t segmentNum,
474 const PendingInterestHandle& pendingInterest,
475 scheduler::EventId timeoutEvent)
476{
477 auto pendingSegmentIt = m_pendingSegments.find(segmentNum);
478 BOOST_ASSERT(pendingSegmentIt != m_pendingSegments.end());
479 BOOST_ASSERT(pendingSegmentIt->second.state == SegmentState::InRetxQueue);
480 pendingSegmentIt->second.state = SegmentState::Retransmitted;
481 pendingSegmentIt->second.hdl = pendingInterest; // cancels previous pending Interest via scoped handle
482 pendingSegmentIt->second.timeoutEvent = timeoutEvent;
483}
484
485void
486SegmentFetcher::cancelExcessInFlightSegments()
487{
488 for (auto it = m_pendingSegments.begin(); it != m_pendingSegments.end();) {
489 if (it->first >= static_cast<uint64_t>(m_nSegments)) {
490 it = m_pendingSegments.erase(it); // cancels pending Interest and timeout event
491 BOOST_ASSERT(m_nSegmentsInFlight > 0);
492 m_nSegmentsInFlight--;
493 }
494 else {
495 ++it;
496 }
497 }
498}
499
500bool
501SegmentFetcher::checkAllSegmentsReceived()
502{
503 bool haveReceivedAllSegments = false;
504
505 if (m_nSegments != 0 && m_nReceived >= m_nSegments) {
506 haveReceivedAllSegments = true;
507 // Verify that all segments in window have been received. If not, send Interests for missing segments.
508 for (uint64_t i = 0; i < static_cast<uint64_t>(m_nSegments); i++) {
509 if (m_receivedSegments.count(i) == 0) {
510 m_retxQueue.push(i);
511 haveReceivedAllSegments = false;
512 }
513 }
514 }
515
516 return haveReceivedAllSegments;
517}
518
520SegmentFetcher::getEstimatedRto()
521{
522 // We don't want an Interest timeout greater than the maximum allowed timeout between the
523 // succesful receipt of segments
524 return std::min(m_options.maxTimeout,
525 time::duration_cast<time::milliseconds>(m_rttEstimator.getEstimatedRto()));
526}
527
528} // namespace ndn
Provide a communication channel with local or remote NDN forwarder.
Definition face.hpp:91
PendingInterestHandle expressInterest(const Interest &interest, const DataCallback &afterSatisfied, const NackCallback &afterNacked, const TimeoutCallback &afterTimeout)
Express an Interest.
Definition face.cpp:151
boost::asio::io_context & getIoContext() const noexcept
Returns a reference to the io_context used by this face.
Definition face.hpp:422
Represents an Interest packet.
Definition interest.hpp:50
PartialName getPrefix(ssize_t nComponents) const
Returns a prefix of the name.
Definition name.hpp:241
Utility class to fetch a versioned and segmented object.
signal::Signal< SegmentFetcher, Data > afterSegmentValidated
Emitted whenever a received data segment has been successfully validated.
signal::Signal< SegmentFetcher > afterSegmentNacked
Emitted whenever an Interest for a data segment is nacked.
static shared_ptr< SegmentFetcher > start(Face &face, const Interest &baseInterest, security::Validator &validator, const Options &options={})
Initiates segment fetching.
void stop()
Stops fetching.
signal::Signal< SegmentFetcher, ConstBufferPtr > onInOrderData
Emitted after each data segment in segment order has been validated.
signal::Signal< SegmentFetcher, ConstBufferPtr > onComplete
Emitted upon successful retrieval of the complete object (all segments).
signal::Signal< SegmentFetcher > onInOrderComplete
Emitted on successful retrieval of all segments in 'in order' mode.
signal::Signal< SegmentFetcher, Data > afterSegmentReceived
Emitted whenever a data segment received.
@ INTEREST_TIMEOUT
Retrieval timed out because the maximum timeout between the successful receipt of segments was exceeded.
@ FINALBLOCKID_NOT_SEGMENT
A received FinalBlockId did not contain a segment component.
@ NACK_ERROR
An unrecoverable Nack was received during retrieval.
@ SEGMENT_VALIDATION_FAIL
One of the retrieved segments failed user-provided validation.
@ DATA_HAS_NO_SEGMENT
One of the retrieved Data packets lacked a segment number in the last Name component (excl....
signal::Signal< SegmentFetcher, uint32_t, std::string > onError
Emitted when the retrieval could not be completed due to an error.
signal::Signal< SegmentFetcher > afterSegmentTimedOut
Emitted whenever an Interest for a data segment times out.
EventId schedule(time::nanoseconds after, EventCallback callback)
Schedule a one-time event after the specified delay.
Definition scheduler.cpp:78
Interface for validating data and interest packets.
Definition validator.hpp:61
void validate(const Data &data, const DataValidationSuccessCallback &successCb, const DataValidationFailureCallback &failureCb)
Asynchronously validate data.
Definition validator.cpp:47
static time_point now() noexcept
Definition time.cpp:79
void backoffRto()
Backoff RTO by a factor of Options::rtoBackoffMultiplier.
void addMeasurement(time::nanoseconds rtt, size_t nExpectedSamples=1)
Records a new RTT measurement.
time::nanoseconds getEstimatedRto() const
Returns the estimated RTO value.
#define NDN_THROW(e)
Definition exception.hpp:56
::boost::chrono::milliseconds milliseconds
Definition time.hpp:52
@ Name
Definition tlv.hpp:71
@ Data
Definition tlv.hpp:69
@ Interest
Definition tlv.hpp:68
Definition data.cpp:25
Options for SegmentFetcher.
size_t flowControlWindow
Maximum number of segments stored in the reorder buffer.
double initCwnd
Initial congestion window size.
bool resetCwndToInit
Reduce cwnd to initCwnd when a loss event occurs.
bool probeLatestVersion
Use the first Interest to probe the latest version of the object.
double mdCoef
Multiplicative decrease coefficient.
time::milliseconds maxTimeout
Maximum allowed time between successful receipt of segments.
bool disableCwa
Disable Conservative Window Adaptation.
bool inOrder
Set to true for 'in order' mode, false for 'block' mode.
bool useConstantInterestTimeout
If true, Interest timeout is kept fixed at maxTimeout.
time::milliseconds interestLifetime
Lifetime of sent Interests (independent of Interest timeout)
double aiStep
Additive increase step (in segments)
bool useConstantCwnd
If true, window size is kept fixed at initCwnd.
bool ignoreCongMarks
Disable window decrease after a congestion mark is received.