30#include <boost/asio/io_context.hpp>
31#include <boost/asio/post.hpp>
32#include <boost/lexical_cast.hpp>
33#include <boost/range/adaptor/map.hpp>
44 NDN_THROW(std::invalid_argument(
"maxTimeout must be greater than or equal to 1 millisecond"));
48 NDN_THROW(std::invalid_argument(
"initCwnd must be greater than or equal to 1"));
52 NDN_THROW(std::invalid_argument(
"aiStep must be greater than or equal to 0"));
55 if (mdCoef < 0.0 || mdCoef > 1.0) {
56 NDN_THROW(std::invalid_argument(
"mdCoef must be in range [0, 1]"));
60SegmentFetcher::SegmentFetcher(
Face& face,
65 , m_scheduler(m_face.getIoContext())
66 , m_validator(validator)
67 , m_rttEstimator(make_shared<util::RttEstimator::Options>(options.rttOptions))
68 , m_timeLastSegmentReceived(time::steady_clock::now())
69 , m_cwnd(options.initCwnd)
70 , m_ssthresh(options.initSsthresh)
75shared_ptr<SegmentFetcher>
81 shared_ptr<SegmentFetcher> fetcher(
new SegmentFetcher(face, validator, options));
82 fetcher->m_this = fetcher;
83 fetcher->fetchFirstSegment(baseInterest,
false);
94 m_pendingSegments.clear();
95 boost::asio::post(m_face.
getIoContext(), [self = std::move(m_this)] {});
99SegmentFetcher::shouldStop(
const weak_ptr<SegmentFetcher>& weakSelf)
101 auto self = weakSelf.lock();
102 return self ==
nullptr || self->m_this ==
nullptr;
106SegmentFetcher::fetchFirstSegment(
const Interest& baseInterest,
bool isRetransmission)
108 Interest interest(baseInterest);
109 interest.setCanBePrefix(
true);
110 if (!interest.getName().empty() && interest.getName()[-1].isVersion()) {
111 interest.setMustBeFresh(
false);
117 if (isRetransmission) {
118 interest.refreshNonce();
121 sendInterest(0, interest, isRetransmission);
125SegmentFetcher::fetchSegmentsInWindow(
const Interest& origInterest)
127 if (checkAllSegmentsReceived()) {
129 return finalizeFetch();
132 int64_t availableWindowSize;
134 availableWindowSize = std::min<int64_t>(m_cwnd, m_options.
flowControlWindow - m_segmentBuffer.size());
137 availableWindowSize =
static_cast<int64_t
>(m_cwnd);
139 availableWindowSize -= m_nSegmentsInFlight;
141 std::vector<std::pair<uint64_t, bool>> segmentsToRequest;
143 while (availableWindowSize > 0) {
144 if (!m_retxQueue.empty()) {
145 auto pendingSegmentIt = m_pendingSegments.find(m_retxQueue.front());
147 if (pendingSegmentIt == m_pendingSegments.end()) {
151 BOOST_ASSERT(pendingSegmentIt->second.state == SegmentState::InRetxQueue);
152 segmentsToRequest.emplace_back(pendingSegmentIt->first,
true);
154 else if (m_nSegments == 0 || m_nextSegmentNum <
static_cast<uint64_t
>(m_nSegments)) {
155 if (m_segmentBuffer.count(m_nextSegmentNum) > 0) {
160 segmentsToRequest.emplace_back(m_nextSegmentNum++,
false);
165 availableWindowSize--;
168 for (
const auto& segment : segmentsToRequest) {
170 interest.setName(
Name(m_versionedDataName).appendSegment(segment.first));
171 interest.setCanBePrefix(
false);
172 interest.setMustBeFresh(
false);
174 interest.refreshNonce();
175 sendInterest(segment.first, interest, segment.second);
180SegmentFetcher::sendInterest(uint64_t segNum,
const Interest& interest,
bool isRetransmission)
182 weak_ptr<SegmentFetcher> weakSelf = m_this;
184 ++m_nSegmentsInFlight;
186 [
this, weakSelf] (
const Interest& interest,
const Data& data) {
187 afterSegmentReceivedCb(interest, data, weakSelf);
189 [
this, weakSelf] (
const Interest& interest,
const lp::Nack& nack) {
190 afterNackReceivedCb(interest, nack, weakSelf);
195 auto timeoutEvent = m_scheduler.
schedule(timeout, [
this, interest, weakSelf] {
196 afterTimeoutCb(interest, weakSelf);
199 if (isRetransmission) {
200 updateRetransmittedSegment(segNum, pendingInterest, timeoutEvent);
205 pendingInterest, timeoutEvent};
206 bool isNew = m_pendingSegments.try_emplace(segNum, std::move(pendingSegment)).second;
208 m_highInterest = segNum;
212SegmentFetcher::afterSegmentReceivedCb(
const Interest& origInterest,
const Data& data,
213 const weak_ptr<SegmentFetcher>& weakSelf)
215 if (shouldStop(weakSelf))
218 BOOST_ASSERT(m_nSegmentsInFlight > 0);
219 m_nSegmentsInFlight--;
221 name::Component currentSegmentComponent = data.getName().get(-1);
222 if (!currentSegmentComponent.isSegment()) {
226 uint64_t currentSegment = currentSegmentComponent.toSegment();
229 std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt;
230 if (m_receivedSegments.size() > 0) {
231 pendingSegmentIt = m_pendingSegments.find(currentSegment);
234 pendingSegmentIt = m_pendingSegments.begin();
237 if (pendingSegmentIt == m_pendingSegments.end()) {
241 pendingSegmentIt->second.timeoutEvent.cancel();
246 [=] (
const Data& d) { afterValidationSuccess(d, origInterest, pendingSegmentIt, weakSelf); },
247 [=] (
const Data& d,
const auto& error) { afterValidationFailure(d, error, weakSelf); });
251SegmentFetcher::afterValidationSuccess(
const Data& data,
const Interest& origInterest,
252 std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt,
253 const weak_ptr<SegmentFetcher>& weakSelf)
255 if (shouldStop(weakSelf))
265 uint64_t currentSegment = data.getName().get(-1).toSegment();
266 m_receivedSegments.insert(currentSegment);
269 if (pendingSegmentIt->second.state == SegmentState::FirstInterest) {
270 BOOST_ASSERT(m_nSegmentsInFlight >= 0);
271 m_rttEstimator.
addMeasurement(m_timeLastSegmentReceived - pendingSegmentIt->second.sendTime,
272 static_cast<size_t>(m_nSegmentsInFlight) + 1);
276 m_pendingSegments.erase(pendingSegmentIt);
279 auto receivedSegmentIt = m_segmentBuffer.try_emplace(currentSegment, data.getContent().value_size())
281 std::copy(data.getContent().value_begin(), data.getContent().value_end(),
282 receivedSegmentIt->second.begin());
283 m_nBytesReceived += data.getContent().value_size();
286 if (data.getFinalBlock()) {
287 if (!data.getFinalBlock()->isSegment()) {
289 "Received FinalBlockId did not contain a segment component");
292 if (data.getFinalBlock()->toSegment() + 1 !=
static_cast<uint64_t
>(m_nSegments)) {
293 m_nSegments = data.getFinalBlock()->toSegment() + 1;
294 cancelExcessInFlightSegments();
298 if (m_options.
inOrder && m_nextSegmentInOrder == currentSegment) {
300 onInOrderData(std::make_shared<const Buffer>(m_segmentBuffer[m_nextSegmentInOrder]));
301 m_segmentBuffer.erase(m_nextSegmentInOrder++);
302 }
while (m_segmentBuffer.count(m_nextSegmentInOrder) > 0);
305 if (m_receivedSegments.size() == 1) {
306 m_versionedDataName = data.getName().
getPrefix(-1);
307 if (currentSegment == 0) {
313 if (m_highData < currentSegment) {
314 m_highData = currentSegment;
324 fetchSegmentsInWindow(origInterest);
328SegmentFetcher::afterValidationFailure(
const Data&,
329 const security::ValidationError& error,
330 const weak_ptr<SegmentFetcher>& weakSelf)
332 if (shouldStop(weakSelf))
339SegmentFetcher::afterNackReceivedCb(
const Interest& origInterest,
const lp::Nack& nack,
340 const weak_ptr<SegmentFetcher>& weakSelf)
342 if (shouldStop(weakSelf))
347 BOOST_ASSERT(m_nSegmentsInFlight > 0);
348 m_nSegmentsInFlight--;
350 switch (nack.getReason()) {
353 afterNackOrTimeout(origInterest);
362SegmentFetcher::afterTimeoutCb(
const Interest& origInterest,
363 const weak_ptr<SegmentFetcher>& weakSelf)
365 if (shouldStop(weakSelf))
370 BOOST_ASSERT(m_nSegmentsInFlight > 0);
371 m_nSegmentsInFlight--;
372 afterNackOrTimeout(origInterest);
376SegmentFetcher::afterNackOrTimeout(
const Interest& origInterest)
383 BOOST_ASSERT(!m_pendingSegments.empty());
385 const auto& origName = origInterest.getName();
386 std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt;
387 if (!origName.empty() && origName[-1].isSegment()) {
388 pendingSegmentIt = m_pendingSegments.find(origName[-1].toSegment());
389 BOOST_ASSERT(pendingSegmentIt != m_pendingSegments.end());
392 pendingSegmentIt = m_pendingSegments.begin();
396 pendingSegmentIt->second.timeoutEvent.cancel();
397 pendingSegmentIt->second.state = SegmentState::InRetxQueue;
401 if (m_receivedSegments.empty()) {
403 fetchFirstSegment(origInterest,
true);
407 m_retxQueue.push(pendingSegmentIt->first);
408 fetchSegmentsInWindow(origInterest);
413SegmentFetcher::finalizeFetch()
422 BOOST_ASSERT(m_receivedSegments.size() >=
static_cast<uint64_t
>(m_nSegments));
424 for (int64_t i = 0; i < m_nSegments; i++) {
425 buf.write(m_segmentBuffer[i].get<const char>(), m_segmentBuffer[i].size());
433SegmentFetcher::windowIncrease()
436 BOOST_ASSERT(m_cwnd == m_options.
initCwnd);
440 if (m_cwnd < m_ssthresh) {
441 m_cwnd += m_options.
aiStep;
444 m_cwnd += m_options.
aiStep / std::floor(m_cwnd);
449SegmentFetcher::windowDecrease()
451 if (m_options.
disableCwa || m_highData > m_recPoint) {
452 m_recPoint = m_highInterest;
455 BOOST_ASSERT(m_cwnd == m_options.
initCwnd);
460 m_ssthresh = std::max(MIN_SSTHRESH, m_cwnd * m_options.
mdCoef);
466SegmentFetcher::signalError(uint32_t code,
const std::string& msg)
473SegmentFetcher::updateRetransmittedSegment(uint64_t segmentNum,
474 const PendingInterestHandle& pendingInterest,
475 scheduler::EventId timeoutEvent)
477 auto pendingSegmentIt = m_pendingSegments.find(segmentNum);
478 BOOST_ASSERT(pendingSegmentIt != m_pendingSegments.end());
479 BOOST_ASSERT(pendingSegmentIt->second.state == SegmentState::InRetxQueue);
480 pendingSegmentIt->second.state = SegmentState::Retransmitted;
481 pendingSegmentIt->second.hdl = pendingInterest;
482 pendingSegmentIt->second.timeoutEvent = timeoutEvent;
486SegmentFetcher::cancelExcessInFlightSegments()
488 for (
auto it = m_pendingSegments.begin(); it != m_pendingSegments.end();) {
489 if (it->first >=
static_cast<uint64_t
>(m_nSegments)) {
490 it = m_pendingSegments.erase(it);
491 BOOST_ASSERT(m_nSegmentsInFlight > 0);
492 m_nSegmentsInFlight--;
501SegmentFetcher::checkAllSegmentsReceived()
503 bool haveReceivedAllSegments =
false;
505 if (m_nSegments != 0 && m_nReceived >= m_nSegments) {
506 haveReceivedAllSegments =
true;
508 for (uint64_t i = 0; i < static_cast<uint64_t>(m_nSegments); i++) {
509 if (m_receivedSegments.count(i) == 0) {
511 haveReceivedAllSegments =
false;
516 return haveReceivedAllSegments;
520SegmentFetcher::getEstimatedRto()
525 time::duration_cast<time::milliseconds>(m_rttEstimator.
getEstimatedRto()));
Provide a communication channel with local or remote NDN forwarder.
PendingInterestHandle expressInterest(const Interest &interest, const DataCallback &afterSatisfied, const NackCallback &afterNacked, const TimeoutCallback &afterTimeout)
Express an Interest.
boost::asio::io_context & getIoContext() const noexcept
Returns a reference to the io_context used by this face.
Represents an Interest packet.
PartialName getPrefix(ssize_t nComponents) const
Returns a prefix of the name.
Utility class to fetch a versioned and segmented object.
signal::Signal< SegmentFetcher, Data > afterSegmentValidated
Emitted whenever a received data segment has been successfully validated.
signal::Signal< SegmentFetcher > afterSegmentNacked
Emitted whenever an Interest for a data segment is nacked.
static shared_ptr< SegmentFetcher > start(Face &face, const Interest &baseInterest, security::Validator &validator, const Options &options={})
Initiates segment fetching.
void stop()
Stops fetching.
signal::Signal< SegmentFetcher, ConstBufferPtr > onInOrderData
Emitted after each data segment in segment order has been validated.
signal::Signal< SegmentFetcher, ConstBufferPtr > onComplete
Emitted upon successful retrieval of the complete object (all segments).
signal::Signal< SegmentFetcher > onInOrderComplete
Emitted on successful retrieval of all segments in 'in order' mode.
signal::Signal< SegmentFetcher, Data > afterSegmentReceived
Emitted whenever a data segment received.
@ INTEREST_TIMEOUT
Retrieval timed out because the maximum timeout between the successful receipt of segments was exceed...
@ FINALBLOCKID_NOT_SEGMENT
A received FinalBlockId did not contain a segment component.
@ NACK_ERROR
An unrecoverable Nack was received during retrieval.
@ SEGMENT_VALIDATION_FAIL
One of the retrieved segments failed user-provided validation.
@ DATA_HAS_NO_SEGMENT
One of the retrieved Data packets lacked a segment number in the last Name component (excl....
signal::Signal< SegmentFetcher, uint32_t, std::string > onError
Emitted when the retrieval could not be completed due to an error.
signal::Signal< SegmentFetcher > afterSegmentTimedOut
Emitted whenever an Interest for a data segment times out.
EventId schedule(time::nanoseconds after, EventCallback callback)
Schedule a one-time event after the specified delay.
Interface for validating data and interest packets.
void validate(const Data &data, const DataValidationSuccessCallback &successCb, const DataValidationFailureCallback &failureCb)
Asynchronously validate data.
static time_point now() noexcept
void backoffRto()
Backoff RTO by a factor of Options::rtoBackoffMultiplier.
void addMeasurement(time::nanoseconds rtt, size_t nExpectedSamples=1)
Records a new RTT measurement.
time::nanoseconds getEstimatedRto() const
Returns the estimated RTO value.
::boost::chrono::milliseconds milliseconds
Options for SegmentFetcher.
size_t flowControlWindow
Maximum number of segments stored in the reorder buffer.
double initCwnd
Initial congestion window size.
bool resetCwndToInit
Reduce cwnd to initCwnd when a loss event occurs.
bool probeLatestVersion
Use the first Interest to probe the latest version of the object.
double mdCoef
Multiplicative decrease coefficient.
time::milliseconds maxTimeout
Maximum allowed time between successful receipt of segments.
bool disableCwa
Disable Conservative Window Adaptation.
bool inOrder
Set to true for 'in order' mode, false for 'block' mode.
bool useConstantInterestTimeout
If true, Interest timeout is kept fixed at maxTimeout.
time::milliseconds interestLifetime
Lifetime of sent Interests (independent of Interest timeout)
double aiStep
Additive increase step (in segments)
bool useConstantCwnd
If true, window size is kept fixed at initCwnd.
bool ignoreCongMarks
Disable window decrease after a congestion mark is received.