30 #include <boost/asio/io_context.hpp>
31 #include <boost/asio/post.hpp>
32 #include <boost/lexical_cast.hpp>
33 #include <boost/range/adaptor/map.hpp>
44 NDN_THROW(std::invalid_argument(
"maxTimeout must be greater than or equal to 1 millisecond"));
48 NDN_THROW(std::invalid_argument(
"initCwnd must be greater than or equal to 1"));
52 NDN_THROW(std::invalid_argument(
"aiStep must be greater than or equal to 0"));
55 if (mdCoef < 0.0 || mdCoef > 1.0) {
56 NDN_THROW(std::invalid_argument(
"mdCoef must be in range [0, 1]"));
60 SegmentFetcher::SegmentFetcher(
Face& face,
65 , m_scheduler(m_face.getIoContext())
66 , m_validator(validator)
67 , m_rttEstimator(make_shared<util::RttEstimator::Options>(options.rttOptions))
68 , m_timeLastSegmentReceived(time::steady_clock::now())
69 , m_cwnd(options.initCwnd)
70 , m_ssthresh(options.initSsthresh)
75 shared_ptr<SegmentFetcher>
81 shared_ptr<SegmentFetcher> fetcher(
new SegmentFetcher(face, validator, options));
82 fetcher->m_this = fetcher;
83 fetcher->fetchFirstSegment(baseInterest,
false);
94 m_pendingSegments.clear();
95 boost::asio::post(m_face.
getIoContext(), [
self = std::move(m_this)] {});
99 SegmentFetcher::shouldStop(
const weak_ptr<SegmentFetcher>& weakSelf)
101 auto self = weakSelf.lock();
102 return self ==
nullptr ||
self->m_this ==
nullptr;
106 SegmentFetcher::fetchFirstSegment(
const Interest& baseInterest,
bool isRetransmission)
109 interest.setCanBePrefix(
true);
110 if (!interest.getName().empty() && interest.getName()[-1].isVersion()) {
111 interest.setMustBeFresh(
false);
117 if (isRetransmission) {
118 interest.refreshNonce();
121 sendInterest(0, interest, isRetransmission);
125 SegmentFetcher::fetchSegmentsInWindow(
const Interest& origInterest)
127 if (checkAllSegmentsReceived()) {
129 return finalizeFetch();
132 int64_t availableWindowSize;
134 availableWindowSize = std::min<int64_t>(m_cwnd, m_options.
flowControlWindow - m_segmentBuffer.size());
137 availableWindowSize =
static_cast<int64_t
>(m_cwnd);
139 availableWindowSize -= m_nSegmentsInFlight;
141 std::vector<std::pair<uint64_t, bool>> segmentsToRequest;
143 while (availableWindowSize > 0) {
144 if (!m_retxQueue.empty()) {
145 auto pendingSegmentIt = m_pendingSegments.find(m_retxQueue.front());
147 if (pendingSegmentIt == m_pendingSegments.end()) {
151 BOOST_ASSERT(pendingSegmentIt->second.state == SegmentState::InRetxQueue);
152 segmentsToRequest.emplace_back(pendingSegmentIt->first,
true);
154 else if (m_nSegments == 0 || m_nextSegmentNum <
static_cast<uint64_t
>(m_nSegments)) {
155 if (m_segmentBuffer.count(m_nextSegmentNum) > 0) {
160 segmentsToRequest.emplace_back(m_nextSegmentNum++,
false);
165 availableWindowSize--;
168 for (
const auto& segment : segmentsToRequest) {
170 interest.setName(
Name(m_versionedDataName).appendSegment(segment.first));
171 interest.setCanBePrefix(
false);
172 interest.setMustBeFresh(
false);
174 interest.refreshNonce();
175 sendInterest(segment.first, interest, segment.second);
180 SegmentFetcher::sendInterest(uint64_t segNum,
const Interest& interest,
bool isRetransmission)
182 weak_ptr<SegmentFetcher> weakSelf = m_this;
184 ++m_nSegmentsInFlight;
186 [
this, weakSelf] (
const Interest& interest,
const Data& data) {
187 afterSegmentReceivedCb(interest, data, weakSelf);
190 afterNackReceivedCb(interest, nack, weakSelf);
195 auto timeoutEvent = m_scheduler.
schedule(timeout, [
this, interest, weakSelf] {
196 afterTimeoutCb(interest, weakSelf);
199 if (isRetransmission) {
200 updateRetransmittedSegment(segNum, pendingInterest, timeoutEvent);
205 pendingInterest, timeoutEvent};
206 bool isNew = m_pendingSegments.try_emplace(segNum, std::move(pendingSegment)).second;
208 m_highInterest = segNum;
212 SegmentFetcher::afterSegmentReceivedCb(
const Interest& origInterest,
const Data& data,
213 const weak_ptr<SegmentFetcher>& weakSelf)
215 if (shouldStop(weakSelf))
218 BOOST_ASSERT(m_nSegmentsInFlight > 0);
219 m_nSegmentsInFlight--;
221 name::Component currentSegmentComponent = data.getName().get(-1);
222 if (!currentSegmentComponent.isSegment()) {
226 uint64_t currentSegment = currentSegmentComponent.toSegment();
229 std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt;
230 if (m_receivedSegments.size() > 0) {
231 pendingSegmentIt = m_pendingSegments.find(currentSegment);
234 pendingSegmentIt = m_pendingSegments.begin();
237 if (pendingSegmentIt == m_pendingSegments.end()) {
241 pendingSegmentIt->second.timeoutEvent.cancel();
246 [=] (
const Data& d) { afterValidationSuccess(d, origInterest, pendingSegmentIt, weakSelf); },
247 [=] (
const Data& d,
const auto& error) { afterValidationFailure(d, error, weakSelf); });
251 SegmentFetcher::afterValidationSuccess(
const Data& data,
const Interest& origInterest,
252 std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt,
253 const weak_ptr<SegmentFetcher>& weakSelf)
255 if (shouldStop(weakSelf))
265 uint64_t currentSegment = data.getName().get(-1).toSegment();
266 m_receivedSegments.insert(currentSegment);
269 if (pendingSegmentIt->second.state == SegmentState::FirstInterest) {
270 BOOST_ASSERT(m_nSegmentsInFlight >= 0);
271 m_rttEstimator.
addMeasurement(m_timeLastSegmentReceived - pendingSegmentIt->second.sendTime,
272 static_cast<size_t>(m_nSegmentsInFlight) + 1);
276 m_pendingSegments.erase(pendingSegmentIt);
279 auto receivedSegmentIt = m_segmentBuffer.try_emplace(currentSegment, data.getContent().value_size())
281 std::copy(data.getContent().value_begin(), data.getContent().value_end(),
282 receivedSegmentIt->second.begin());
283 m_nBytesReceived += data.getContent().value_size();
286 if (data.getFinalBlock()) {
287 if (!data.getFinalBlock()->isSegment()) {
289 "Received FinalBlockId did not contain a segment component");
292 if (data.getFinalBlock()->toSegment() + 1 !=
static_cast<uint64_t
>(m_nSegments)) {
293 m_nSegments = data.getFinalBlock()->toSegment() + 1;
294 cancelExcessInFlightSegments();
298 if (m_options.
inOrder && m_nextSegmentInOrder == currentSegment) {
300 onInOrderData(std::make_shared<const Buffer>(m_segmentBuffer[m_nextSegmentInOrder]));
301 m_segmentBuffer.erase(m_nextSegmentInOrder++);
302 }
while (m_segmentBuffer.count(m_nextSegmentInOrder) > 0);
305 if (m_receivedSegments.size() == 1) {
306 m_versionedDataName = data.getName().
getPrefix(-1);
307 if (currentSegment == 0) {
313 if (m_highData < currentSegment) {
314 m_highData = currentSegment;
324 fetchSegmentsInWindow(origInterest);
328 SegmentFetcher::afterValidationFailure(
const Data&,
329 const security::ValidationError& error,
330 const weak_ptr<SegmentFetcher>& weakSelf)
332 if (shouldStop(weakSelf))
339 SegmentFetcher::afterNackReceivedCb(
const Interest& origInterest,
const lp::Nack& nack,
340 const weak_ptr<SegmentFetcher>& weakSelf)
342 if (shouldStop(weakSelf))
347 BOOST_ASSERT(m_nSegmentsInFlight > 0);
348 m_nSegmentsInFlight--;
350 switch (nack.getReason()) {
353 afterNackOrTimeout(origInterest);
362 SegmentFetcher::afterTimeoutCb(
const Interest& origInterest,
363 const weak_ptr<SegmentFetcher>& weakSelf)
365 if (shouldStop(weakSelf))
370 BOOST_ASSERT(m_nSegmentsInFlight > 0);
371 m_nSegmentsInFlight--;
372 afterNackOrTimeout(origInterest);
376 SegmentFetcher::afterNackOrTimeout(
const Interest& origInterest)
383 BOOST_ASSERT(!m_pendingSegments.empty());
385 const auto& origName = origInterest.getName();
386 std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt;
387 if (!origName.empty() && origName[-1].isSegment()) {
388 pendingSegmentIt = m_pendingSegments.find(origName[-1].toSegment());
389 BOOST_ASSERT(pendingSegmentIt != m_pendingSegments.end());
392 pendingSegmentIt = m_pendingSegments.begin();
396 pendingSegmentIt->second.timeoutEvent.cancel();
397 pendingSegmentIt->second.state = SegmentState::InRetxQueue;
401 if (m_receivedSegments.empty()) {
403 fetchFirstSegment(origInterest,
true);
407 m_retxQueue.push(pendingSegmentIt->first);
408 fetchSegmentsInWindow(origInterest);
413 SegmentFetcher::finalizeFetch()
422 BOOST_ASSERT(m_receivedSegments.size() >=
static_cast<uint64_t
>(m_nSegments));
424 for (int64_t i = 0; i < m_nSegments; i++) {
425 buf.write(m_segmentBuffer[i].get<const char>(), m_segmentBuffer[i].size());
433 SegmentFetcher::windowIncrease()
436 BOOST_ASSERT(m_cwnd == m_options.
initCwnd);
440 if (m_cwnd < m_ssthresh) {
441 m_cwnd += m_options.
aiStep;
444 m_cwnd += m_options.
aiStep / std::floor(m_cwnd);
449 SegmentFetcher::windowDecrease()
451 if (m_options.
disableCwa || m_highData > m_recPoint) {
452 m_recPoint = m_highInterest;
455 BOOST_ASSERT(m_cwnd == m_options.
initCwnd);
460 m_ssthresh = std::max(MIN_SSTHRESH, m_cwnd * m_options.
mdCoef);
466 SegmentFetcher::signalError(uint32_t code,
const std::string& msg)
473 SegmentFetcher::updateRetransmittedSegment(uint64_t segmentNum,
474 const PendingInterestHandle& pendingInterest,
475 scheduler::EventId timeoutEvent)
477 auto pendingSegmentIt = m_pendingSegments.find(segmentNum);
478 BOOST_ASSERT(pendingSegmentIt != m_pendingSegments.end());
479 BOOST_ASSERT(pendingSegmentIt->second.state == SegmentState::InRetxQueue);
480 pendingSegmentIt->second.state = SegmentState::Retransmitted;
481 pendingSegmentIt->second.hdl = pendingInterest;
482 pendingSegmentIt->second.timeoutEvent = timeoutEvent;
486 SegmentFetcher::cancelExcessInFlightSegments()
488 for (
auto it = m_pendingSegments.begin(); it != m_pendingSegments.end();) {
489 if (it->first >=
static_cast<uint64_t
>(m_nSegments)) {
490 it = m_pendingSegments.erase(it);
491 BOOST_ASSERT(m_nSegmentsInFlight > 0);
492 m_nSegmentsInFlight--;
501 SegmentFetcher::checkAllSegmentsReceived()
503 bool haveReceivedAllSegments =
false;
505 if (m_nSegments != 0 && m_nReceived >= m_nSegments) {
506 haveReceivedAllSegments =
true;
508 for (uint64_t i = 0; i < static_cast<uint64_t>(m_nSegments); i++) {
509 if (m_receivedSegments.count(i) == 0) {
511 haveReceivedAllSegments =
false;
516 return haveReceivedAllSegments;
520 SegmentFetcher::getEstimatedRto()
525 time::duration_cast<time::milliseconds>(m_rttEstimator.
getEstimatedRto()));
Provides a communication channel with a local or remote NDN forwarder.
boost::asio::io_context & getIoContext() const noexcept
Returns a reference to the io_context used by this face.
PendingInterestHandle expressInterest(const Interest &interest, const DataCallback &afterSatisfied, const NackCallback &afterNacked, const TimeoutCallback &afterTimeout)
Express an Interest.
Represents an Interest packet.
PartialName getPrefix(ssize_t nComponents) const
Returns a prefix of the name.
Utility class to fetch a versioned and segmented object.
signal::Signal< SegmentFetcher, Data > afterSegmentValidated
Emitted whenever a received data segment has been successfully validated.
signal::Signal< SegmentFetcher > afterSegmentNacked
Emitted whenever an Interest for a data segment is nacked.
static shared_ptr< SegmentFetcher > start(Face &face, const Interest &baseInterest, security::Validator &validator, const Options &options={})
Initiates segment fetching.
void stop()
Stops fetching.
signal::Signal< SegmentFetcher, ConstBufferPtr > onInOrderData
Emitted after each data segment in segment order has been validated.
signal::Signal< SegmentFetcher, ConstBufferPtr > onComplete
Emitted upon successful retrieval of the complete object (all segments).
signal::Signal< SegmentFetcher > onInOrderComplete
Emitted on successful retrieval of all segments in 'in order' mode.
signal::Signal< SegmentFetcher, Data > afterSegmentReceived
Emitted whenever a data segment is received.
@ INTEREST_TIMEOUT
Retrieval timed out because the maximum timeout between the successful receipt of segments was exceeded.
@ FINALBLOCKID_NOT_SEGMENT
A received FinalBlockId did not contain a segment component.
@ NACK_ERROR
An unrecoverable Nack was received during retrieval.
@ SEGMENT_VALIDATION_FAIL
One of the retrieved segments failed user-provided validation.
@ DATA_HAS_NO_SEGMENT
One of the retrieved Data packets lacked a segment number in the last Name component (excluding the implicit digest).
signal::Signal< SegmentFetcher, uint32_t, std::string > onError
Emitted when the retrieval could not be completed due to an error.
signal::Signal< SegmentFetcher > afterSegmentTimedOut
Emitted whenever an Interest for a data segment times out.
EventId schedule(time::nanoseconds after, EventCallback callback)
Schedule a one-time event after the specified delay.
Interface for validating data and interest packets.
void validate(const Data &data, const DataValidationSuccessCallback &successCb, const DataValidationFailureCallback &failureCb)
Asynchronously validate data.
static time_point now() noexcept
void backoffRto()
Back off the RTO by a factor of Options::rtoBackoffMultiplier.
void addMeasurement(time::nanoseconds rtt, size_t nExpectedSamples=1)
Records a new RTT measurement.
time::nanoseconds getEstimatedRto() const
Returns the estimated RTO value.
::boost::chrono::milliseconds milliseconds
Options for SegmentFetcher.
size_t flowControlWindow
Maximum number of segments stored in the reorder buffer.
double initCwnd
Initial congestion window size.
bool resetCwndToInit
Reduce cwnd to initCwnd when a loss event occurs.
bool probeLatestVersion
Use the first Interest to probe the latest version of the object.
double mdCoef
Multiplicative decrease coefficient.
time::milliseconds maxTimeout
Maximum allowed time between successful receipt of segments.
bool disableCwa
Disable Conservative Window Adaptation.
bool inOrder
Set to true for 'in order' mode, false for 'block' mode.
bool useConstantInterestTimeout
If true, Interest timeout is kept fixed at maxTimeout.
time::milliseconds interestLifetime
Lifetime of sent Interests (independent of Interest timeout)
double aiStep
Additive increase step (in segments)
bool useConstantCwnd
If true, window size is kept fixed at initCwnd.
bool ignoreCongMarks
Disable window decrease after a congestion mark is received.