30 #include <boost/asio/io_service.hpp>
31 #include <boost/lexical_cast.hpp>
32 #include <boost/range/adaptor/map.hpp>
39 constexpr
double SegmentFetcher::MIN_SSTHRESH;
45 NDN_THROW(std::invalid_argument(
"maxTimeout must be greater than or equal to 1 millisecond"));
49 NDN_THROW(std::invalid_argument(
"initCwnd must be greater than or equal to 1"));
53 NDN_THROW(std::invalid_argument(
"aiStep must be greater than or equal to 0"));
56 if (mdCoef < 0.0 || mdCoef > 1.0) {
57 NDN_THROW(std::invalid_argument(
"mdCoef must be in range [0, 1]"));
61 SegmentFetcher::SegmentFetcher(
Face& face,
66 , m_scheduler(m_face.getIoService())
67 , m_validator(validator)
69 , m_timeLastSegmentReceived(time::steady_clock::now())
70 , m_cwnd(options.initCwnd)
71 , m_ssthresh(options.initSsthresh)
76 shared_ptr<SegmentFetcher>
82 shared_ptr<SegmentFetcher> fetcher(
new SegmentFetcher(face, validator, options));
83 fetcher->m_this = fetcher;
84 fetcher->fetchFirstSegment(baseInterest,
false);
95 m_pendingSegments.clear();
96 m_face.
getIoService().post([
self = std::move(m_this)] {});
100 SegmentFetcher::shouldStop(
const weak_ptr<SegmentFetcher>& weakSelf)
102 auto self = weakSelf.lock();
103 return self ==
nullptr ||
self->m_this ==
nullptr;
107 SegmentFetcher::fetchFirstSegment(
const Interest& baseInterest,
bool isRetransmission)
110 interest.setCanBePrefix(
true);
111 interest.setMustBeFresh(
true);
113 if (isRetransmission) {
114 interest.refreshNonce();
117 sendInterest(0, interest, isRetransmission);
121 SegmentFetcher::fetchSegmentsInWindow(
const Interest& origInterest)
123 if (checkAllSegmentsReceived()) {
125 return finalizeFetch();
128 int64_t availableWindowSize;
130 availableWindowSize = std::min<int64_t>(m_cwnd, m_options.
flowControlWindow - m_segmentBuffer.size());
133 availableWindowSize =
static_cast<int64_t
>(m_cwnd);
135 availableWindowSize -= m_nSegmentsInFlight;
137 std::vector<std::pair<uint64_t, bool>> segmentsToRequest;
139 while (availableWindowSize > 0) {
140 if (!m_retxQueue.empty()) {
141 auto pendingSegmentIt = m_pendingSegments.find(m_retxQueue.front());
143 if (pendingSegmentIt == m_pendingSegments.end()) {
147 BOOST_ASSERT(pendingSegmentIt->second.state == SegmentState::InRetxQueue);
148 segmentsToRequest.emplace_back(pendingSegmentIt->first,
true);
150 else if (m_nSegments == 0 || m_nextSegmentNum <
static_cast<uint64_t
>(m_nSegments)) {
151 if (m_segmentBuffer.count(m_nextSegmentNum) > 0) {
156 segmentsToRequest.emplace_back(m_nextSegmentNum++,
false);
161 availableWindowSize--;
164 for (
const auto& segment : segmentsToRequest) {
166 interest.setName(
Name(m_versionedDataName).appendSegment(segment.first));
167 interest.setCanBePrefix(
false);
168 interest.setMustBeFresh(
false);
170 interest.refreshNonce();
171 sendInterest(segment.first, interest, segment.second);
176 SegmentFetcher::sendInterest(uint64_t segNum,
const Interest& interest,
bool isRetransmission)
178 weak_ptr<SegmentFetcher> weakSelf = m_this;
180 ++m_nSegmentsInFlight;
182 [
this, weakSelf] (
const Interest& interest,
const Data& data) {
183 afterSegmentReceivedCb(interest, data, weakSelf);
186 afterNackReceivedCb(interest, nack, weakSelf);
191 auto timeoutEvent = m_scheduler.
schedule(timeout, [
this, interest, weakSelf] {
192 afterTimeoutCb(interest, weakSelf);
195 if (isRetransmission) {
196 updateRetransmittedSegment(segNum, pendingInterest, timeoutEvent);
201 pendingInterest, timeoutEvent};
202 bool isNew = m_pendingSegments.emplace(segNum, std::move(pendingSegment)).second;
204 m_highInterest = segNum;
208 SegmentFetcher::afterSegmentReceivedCb(
const Interest& origInterest,
const Data& data,
209 const weak_ptr<SegmentFetcher>& weakSelf)
211 if (shouldStop(weakSelf))
214 BOOST_ASSERT(m_nSegmentsInFlight > 0);
215 m_nSegmentsInFlight--;
217 name::Component currentSegmentComponent = data.getName().get(-1);
218 if (!currentSegmentComponent.isSegment()) {
222 uint64_t currentSegment = currentSegmentComponent.toSegment();
225 std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt;
226 if (m_receivedSegments.size() > 0) {
227 pendingSegmentIt = m_pendingSegments.find(currentSegment);
230 pendingSegmentIt = m_pendingSegments.begin();
233 if (pendingSegmentIt == m_pendingSegments.end()) {
237 pendingSegmentIt->second.timeoutEvent.cancel();
242 [=] (
const Data& d) { afterValidationSuccess(d, origInterest, pendingSegmentIt, weakSelf); },
243 [=] (
const Data& d,
const auto& error) { afterValidationFailure(d, error, weakSelf); });
247 SegmentFetcher::afterValidationSuccess(
const Data& data,
const Interest& origInterest,
248 std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt,
249 const weak_ptr<SegmentFetcher>& weakSelf)
251 if (shouldStop(weakSelf))
261 uint64_t currentSegment = data.getName().get(-1).toSegment();
262 m_receivedSegments.insert(currentSegment);
265 if (pendingSegmentIt->second.state == SegmentState::FirstInterest) {
266 BOOST_ASSERT(m_nSegmentsInFlight >= 0);
267 m_rttEstimator.
addMeasurement(m_timeLastSegmentReceived - pendingSegmentIt->second.sendTime,
268 static_cast<size_t>(m_nSegmentsInFlight) + 1);
272 m_pendingSegments.erase(pendingSegmentIt);
275 auto receivedSegmentIt = m_segmentBuffer.emplace(std::piecewise_construct,
276 std::forward_as_tuple(currentSegment),
277 std::forward_as_tuple(data.getContent().value_size()));
278 std::copy(data.getContent().value_begin(), data.getContent().value_end(),
279 receivedSegmentIt.first->second.begin());
280 m_nBytesReceived += data.getContent().value_size();
283 if (data.getFinalBlock()) {
284 if (!data.getFinalBlock()->isSegment()) {
286 "Received FinalBlockId did not contain a segment component");
289 if (data.getFinalBlock()->toSegment() + 1 !=
static_cast<uint64_t
>(m_nSegments)) {
290 m_nSegments = data.getFinalBlock()->toSegment() + 1;
291 cancelExcessInFlightSegments();
295 if (m_options.
inOrder && m_nextSegmentInOrder == currentSegment) {
297 onInOrderData(std::make_shared<const Buffer>(m_segmentBuffer[m_nextSegmentInOrder]));
298 m_segmentBuffer.erase(m_nextSegmentInOrder++);
299 }
while (m_segmentBuffer.count(m_nextSegmentInOrder) > 0);
302 if (m_receivedSegments.size() == 1) {
303 m_versionedDataName = data.getName().
getPrefix(-1);
304 if (currentSegment == 0) {
310 if (m_highData < currentSegment) {
311 m_highData = currentSegment;
321 fetchSegmentsInWindow(origInterest);
325 SegmentFetcher::afterValidationFailure(
const Data&,
326 const security::ValidationError& error,
327 const weak_ptr<SegmentFetcher>& weakSelf)
329 if (shouldStop(weakSelf))
336 SegmentFetcher::afterNackReceivedCb(
const Interest& origInterest,
const lp::Nack& nack,
337 const weak_ptr<SegmentFetcher>& weakSelf)
339 if (shouldStop(weakSelf))
344 BOOST_ASSERT(m_nSegmentsInFlight > 0);
345 m_nSegmentsInFlight--;
347 switch (nack.getReason()) {
350 afterNackOrTimeout(origInterest);
359 SegmentFetcher::afterTimeoutCb(
const Interest& origInterest,
360 const weak_ptr<SegmentFetcher>& weakSelf)
362 if (shouldStop(weakSelf))
367 BOOST_ASSERT(m_nSegmentsInFlight > 0);
368 m_nSegmentsInFlight--;
369 afterNackOrTimeout(origInterest);
373 SegmentFetcher::afterNackOrTimeout(
const Interest& origInterest)
380 name::Component lastNameComponent = origInterest.getName().get(-1);
381 std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt;
382 BOOST_ASSERT(m_pendingSegments.size() > 0);
383 if (lastNameComponent.isSegment()) {
384 BOOST_ASSERT(m_pendingSegments.count(lastNameComponent.toSegment()) > 0);
385 pendingSegmentIt = m_pendingSegments.find(lastNameComponent.toSegment());
388 BOOST_ASSERT(m_pendingSegments.size() > 0);
389 pendingSegmentIt = m_pendingSegments.begin();
393 pendingSegmentIt->second.timeoutEvent.cancel();
394 pendingSegmentIt->second.state = SegmentState::InRetxQueue;
398 if (m_receivedSegments.size() == 0) {
400 fetchFirstSegment(origInterest,
true);
404 m_retxQueue.push(pendingSegmentIt->first);
405 fetchSegmentsInWindow(origInterest);
410 SegmentFetcher::finalizeFetch()
419 BOOST_ASSERT(m_receivedSegments.size() >=
static_cast<uint64_t
>(m_nSegments));
421 for (int64_t i = 0; i < m_nSegments; i++) {
422 buf.write(m_segmentBuffer[i].get<const char>(), m_segmentBuffer[i].size());
430 SegmentFetcher::windowIncrease()
433 BOOST_ASSERT(m_cwnd == m_options.
initCwnd);
437 if (m_cwnd < m_ssthresh) {
438 m_cwnd += m_options.
aiStep;
441 m_cwnd += m_options.
aiStep / std::floor(m_cwnd);
446 SegmentFetcher::windowDecrease()
448 if (m_options.
disableCwa || m_highData > m_recPoint) {
449 m_recPoint = m_highInterest;
452 BOOST_ASSERT(m_cwnd == m_options.
initCwnd);
457 m_ssthresh = std::max(MIN_SSTHRESH, m_cwnd * m_options.
mdCoef);
463 SegmentFetcher::signalError(uint32_t code,
const std::string& msg)
470 SegmentFetcher::updateRetransmittedSegment(uint64_t segmentNum,
471 const PendingInterestHandle& pendingInterest,
472 scheduler::EventId timeoutEvent)
474 auto pendingSegmentIt = m_pendingSegments.find(segmentNum);
475 BOOST_ASSERT(pendingSegmentIt != m_pendingSegments.end());
476 BOOST_ASSERT(pendingSegmentIt->second.state == SegmentState::InRetxQueue);
477 pendingSegmentIt->second.state = SegmentState::Retransmitted;
478 pendingSegmentIt->second.hdl = pendingInterest;
479 pendingSegmentIt->second.timeoutEvent = timeoutEvent;
483 SegmentFetcher::cancelExcessInFlightSegments()
485 for (
auto it = m_pendingSegments.begin(); it != m_pendingSegments.end();) {
486 if (it->first >=
static_cast<uint64_t
>(m_nSegments)) {
487 it = m_pendingSegments.erase(it);
488 BOOST_ASSERT(m_nSegmentsInFlight > 0);
489 m_nSegmentsInFlight--;
498 SegmentFetcher::checkAllSegmentsReceived()
500 bool haveReceivedAllSegments =
false;
502 if (m_nSegments != 0 && m_nReceived >= m_nSegments) {
503 haveReceivedAllSegments =
true;
505 for (uint64_t i = 0; i < static_cast<uint64_t>(m_nSegments); i++) {
506 if (m_receivedSegments.count(i) == 0) {
508 haveReceivedAllSegments =
false;
513 return haveReceivedAllSegments;
517 SegmentFetcher::getEstimatedRto()
522 time::duration_cast<time::milliseconds>(m_rttEstimator.
getEstimatedRto()));
Provide a communication channel with local or remote NDN forwarder.
boost::asio::io_service & getIoService()
Returns a reference to the io_service used by this face.
PendingInterestHandle expressInterest(const Interest &interest, const DataCallback &afterSatisfied, const NackCallback &afterNacked, const TimeoutCallback &afterTimeout)
Express Interest.
Represents an Interest packet.
PartialName getPrefix(ssize_t nComponents) const
Returns a prefix of the name.
EventId schedule(time::nanoseconds after, EventCallback callback)
Schedule a one-time event after the specified delay.
Interface for validating data and interest packets.
void validate(const Data &data, const DataValidationSuccessCallback &successCb, const DataValidationFailureCallback &failureCb)
Asynchronously validate data.
static time_point now() noexcept
void backoffRto()
Backoff RTO by a factor of Options::rtoBackoffMultiplier.
void addMeasurement(time::nanoseconds rtt, size_t nExpectedSamples=1)
Records a new RTT measurement.
time::nanoseconds getEstimatedRto() const
Returns the estimated RTO value.
bool useConstantInterestTimeout
if true, Interest timeout is kept at maxTimeout
double mdCoef
multiplicative decrease coefficient
double aiStep
additive increase step (in segments)
double initCwnd
initial congestion window size
bool disableCwa
disable Conservative Window Adaptation
bool resetCwndToInit
reduce cwnd to initCwnd when loss event occurs
time::milliseconds interestLifetime
lifetime of sent Interests - independent of Interest timeout
bool useConstantCwnd
if true, window size is kept at initCwnd
bool inOrder
true for 'in order' mode, false for 'block' mode
size_t flowControlWindow
maximum number of segments stored in the reorder buffer
bool ignoreCongMarks
disable window decrease after congestion mark received
time::milliseconds maxTimeout
maximum allowed time between successful receipt of segments
Utility class to fetch the latest version of a segmented object.
Signal< SegmentFetcher > onInOrderComplete
Emitted on successful retrieval of all segments in 'in order' mode.
static shared_ptr< SegmentFetcher > start(Face &face, const Interest &baseInterest, security::Validator &validator, const Options &options=Options())
Initiates segment fetching.
Signal< SegmentFetcher, ConstBufferPtr > onComplete
Emitted upon successful retrieval of the complete object (all segments).
Signal< SegmentFetcher, ConstBufferPtr > onInOrderData
Emitted after each data segment in segment order has been validated.
Signal< SegmentFetcher, Data > afterSegmentValidated
Emitted whenever a received data segment has been successfully validated.
@ INTEREST_TIMEOUT
Retrieval timed out because the maximum timeout between the successful receipt of segments was exceed...
@ DATA_HAS_NO_SEGMENT
One of the retrieved Data packets lacked a segment number in the last Name component (excl....
@ FINALBLOCKID_NOT_SEGMENT
A received FinalBlockId did not contain a segment component.
@ SEGMENT_VALIDATION_FAIL
One of the retrieved segments failed user-provided validation.
@ NACK_ERROR
An unrecoverable Nack was received during retrieval.
Signal< SegmentFetcher > afterSegmentNacked
Emitted whenever an Interest for a data segment is nacked.
void stop()
Stops fetching.
Signal< SegmentFetcher, uint32_t, std::string > onError
Emitted when the retrieval could not be completed due to an error.
Signal< SegmentFetcher, Data > afterSegmentReceived
Emitted whenever a data segment received.
Signal< SegmentFetcher > afterSegmentTimedOut
Emitted whenever an Interest for a data segment times out.
boost::chrono::milliseconds milliseconds