34 #include <boost/asio/io_service.hpp> 35 #include <boost/lexical_cast.hpp> 36 #include <boost/range/adaptor/map.hpp> 43 constexpr
double SegmentFetcher::MIN_SSTHRESH;
49 NDN_THROW(std::invalid_argument(
"maxTimeout must be greater than or equal to 1 millisecond"));
53 NDN_THROW(std::invalid_argument(
"initCwnd must be greater than or equal to 1"));
57 NDN_THROW(std::invalid_argument(
"aiStep must be greater than or equal to 0"));
60 if (mdCoef < 0.0 || mdCoef > 1.0) {
61 NDN_THROW(std::invalid_argument(
"mdCoef must be in range [0, 1]"));
65 SegmentFetcher::SegmentFetcher(
Face& face,
70 , m_scheduler(m_face.getIoService())
71 , m_validator(validator)
72 , m_rttEstimator(make_shared<RttEstimator::Options>(options.
rttOptions))
77 , m_nSegmentsInFlight(0)
88 shared_ptr<SegmentFetcher>
94 shared_ptr<SegmentFetcher> fetcher(
new SegmentFetcher(face, validator, options));
95 fetcher->m_this = fetcher;
96 fetcher->fetchFirstSegment(baseInterest,
false);
107 m_pendingSegments.clear();
108 m_face.getIoService().post([
self =
std::move(m_this)] {});
112 SegmentFetcher::shouldStop(
const weak_ptr<SegmentFetcher>& weakSelf)
114 auto self = weakSelf.lock();
115 return self ==
nullptr ||
self->m_this ==
nullptr;
119 SegmentFetcher::fetchFirstSegment(
const Interest& baseInterest,
bool isRetransmission)
125 if (isRetransmission) {
129 sendInterest(0, interest, isRetransmission);
133 SegmentFetcher::fetchSegmentsInWindow(
const Interest& origInterest)
135 if (checkAllSegmentsReceived()) {
137 return finalizeFetch();
140 int64_t availableWindowSize =
static_cast<int64_t
>(m_cwnd) - m_nSegmentsInFlight;
141 std::vector<std::pair<uint64_t, bool>> segmentsToRequest;
143 while (availableWindowSize > 0) {
144 if (!m_retxQueue.empty()) {
145 auto pendingSegmentIt = m_pendingSegments.find(m_retxQueue.front());
147 if (pendingSegmentIt == m_pendingSegments.end()) {
151 BOOST_ASSERT(pendingSegmentIt->second.state == SegmentState::InRetxQueue);
152 segmentsToRequest.emplace_back(pendingSegmentIt->first,
true);
154 else if (m_nSegments == 0 || m_nextSegmentNum < static_cast<uint64_t>(m_nSegments)) {
155 if (m_receivedSegments.count(m_nextSegmentNum) > 0) {
160 segmentsToRequest.emplace_back(m_nextSegmentNum++,
false);
165 availableWindowSize--;
168 for (
const auto& segment : segmentsToRequest) {
170 interest.
setName(
Name(m_versionedDataName).appendSegment(segment.first));
175 sendInterest(segment.first, interest, segment.second);
180 SegmentFetcher::sendInterest(uint64_t segNum,
const Interest& interest,
bool isRetransmission)
182 weak_ptr<SegmentFetcher> weakSelf = m_this;
184 ++m_nSegmentsInFlight;
185 auto pendingInterest = m_face.expressInterest(interest,
186 [
this, weakSelf] (
const Interest& interest,
const Data& data) {
187 afterSegmentReceivedCb(interest, data, weakSelf);
190 afterNackReceivedCb(interest, nack, weakSelf);
194 auto timeout = m_options.useConstantInterestTimeout ? m_options.maxTimeout : getEstimatedRto();
195 auto timeoutEvent = m_scheduler.schedule(timeout, [
this, interest, weakSelf] {
196 afterTimeoutCb(interest, weakSelf);
199 if (isRetransmission) {
200 updateRetransmittedSegment(segNum, pendingInterest, timeoutEvent);
205 pendingInterest, timeoutEvent};
206 bool isNew = m_pendingSegments.emplace(segNum,
std::move(pendingSegment)).second;
208 m_highInterest = segNum;
212 SegmentFetcher::afterSegmentReceivedCb(
const Interest& origInterest,
const Data& data,
213 const weak_ptr<SegmentFetcher>& weakSelf)
215 if (shouldStop(weakSelf))
218 BOOST_ASSERT(m_nSegmentsInFlight > 0);
219 m_nSegmentsInFlight--;
222 if (!currentSegmentComponent.
isSegment()) {
226 uint64_t currentSegment = currentSegmentComponent.
toSegment();
229 std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt;
230 if (m_receivedSegments.size() > 0) {
231 pendingSegmentIt = m_pendingSegments.find(currentSegment);
234 pendingSegmentIt = m_pendingSegments.begin();
237 if (pendingSegmentIt == m_pendingSegments.end()) {
241 pendingSegmentIt->second.timeoutEvent.cancel();
245 m_validator.validate(data,
246 bind(&SegmentFetcher::afterValidationSuccess,
this, _1, origInterest,
247 pendingSegmentIt, weakSelf),
248 bind(&SegmentFetcher::afterValidationFailure,
this, _1, _2, weakSelf));
252 SegmentFetcher::afterValidationSuccess(
const Data& data,
const Interest& origInterest,
253 std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt,
254 const weak_ptr<SegmentFetcher>& weakSelf)
256 if (shouldStop(weakSelf))
268 if (pendingSegmentIt->second.state == SegmentState::FirstInterest) {
269 BOOST_ASSERT(m_nSegmentsInFlight >= 0);
270 m_rttEstimator.addMeasurement(m_timeLastSegmentReceived - pendingSegmentIt->second.sendTime,
271 static_cast<size_t>(m_nSegmentsInFlight) + 1);
275 m_pendingSegments.erase(pendingSegmentIt);
278 auto receivedSegmentIt = m_receivedSegments.emplace(std::piecewise_construct,
279 std::forward_as_tuple(currentSegment),
282 receivedSegmentIt.first->second.begin());
289 "Received FinalBlockId did not contain a segment component");
292 if (data.
getFinalBlock()->toSegment() + 1 !=
static_cast<uint64_t
>(m_nSegments)) {
294 cancelExcessInFlightSegments();
298 if (m_receivedSegments.size() == 1) {
300 if (currentSegment == 0) {
306 if (m_highData < currentSegment) {
307 m_highData = currentSegment;
317 fetchSegmentsInWindow(origInterest);
321 SegmentFetcher::afterValidationFailure(
const Data& data,
323 const weak_ptr<SegmentFetcher>& weakSelf)
325 if (shouldStop(weakSelf))
332 SegmentFetcher::afterNackReceivedCb(
const Interest& origInterest,
const lp::Nack& nack,
333 const weak_ptr<SegmentFetcher>& weakSelf)
335 if (shouldStop(weakSelf))
340 BOOST_ASSERT(m_nSegmentsInFlight > 0);
341 m_nSegmentsInFlight--;
346 afterNackOrTimeout(origInterest);
355 SegmentFetcher::afterTimeoutCb(
const Interest& origInterest,
356 const weak_ptr<SegmentFetcher>& weakSelf)
358 if (shouldStop(weakSelf))
363 BOOST_ASSERT(m_nSegmentsInFlight > 0);
364 m_nSegmentsInFlight--;
365 afterNackOrTimeout(origInterest);
369 SegmentFetcher::afterNackOrTimeout(
const Interest& origInterest)
377 std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt;
378 BOOST_ASSERT(m_pendingSegments.size() > 0);
380 BOOST_ASSERT(m_pendingSegments.count(lastNameComponent.
toSegment()) > 0);
381 pendingSegmentIt = m_pendingSegments.find(lastNameComponent.
toSegment());
384 BOOST_ASSERT(m_pendingSegments.size() > 0);
385 pendingSegmentIt = m_pendingSegments.begin();
389 pendingSegmentIt->second.timeoutEvent.cancel();
390 pendingSegmentIt->second.state = SegmentState::InRetxQueue;
392 m_rttEstimator.backoffRto();
394 if (m_receivedSegments.size() == 0) {
396 fetchFirstSegment(origInterest,
true);
400 m_retxQueue.push(pendingSegmentIt->first);
401 fetchSegmentsInWindow(origInterest);
406 SegmentFetcher::finalizeFetch()
411 BOOST_ASSERT(m_receivedSegments.size() >=
static_cast<uint64_t
>(m_nSegments));
413 for (int64_t i = 0; i < m_nSegments; i++) {
414 buf.write(m_receivedSegments[i].get<const char>(), m_receivedSegments[i].size());
422 SegmentFetcher::windowIncrease()
424 if (m_options.useConstantCwnd) {
425 BOOST_ASSERT(m_cwnd == m_options.initCwnd);
429 if (m_cwnd < m_ssthresh) {
430 m_cwnd += m_options.aiStep;
433 m_cwnd += m_options.aiStep / std::floor(m_cwnd);
438 SegmentFetcher::windowDecrease()
440 if (m_options.disableCwa || m_highData > m_recPoint) {
441 m_recPoint = m_highInterest;
443 if (m_options.useConstantCwnd) {
444 BOOST_ASSERT(m_cwnd == m_options.initCwnd);
449 m_ssthresh = std::max(MIN_SSTHRESH, m_cwnd * m_options.mdCoef);
450 m_cwnd = m_options.resetCwndToInit ? m_options.initCwnd : m_ssthresh;
455 SegmentFetcher::signalError(uint32_t code,
const std::string& msg)
462 SegmentFetcher::updateRetransmittedSegment(uint64_t segmentNum,
466 auto pendingSegmentIt = m_pendingSegments.find(segmentNum);
467 BOOST_ASSERT(pendingSegmentIt != m_pendingSegments.end());
468 BOOST_ASSERT(pendingSegmentIt->second.state == SegmentState::InRetxQueue);
469 pendingSegmentIt->second.state = SegmentState::Retransmitted;
470 pendingSegmentIt->second.hdl = pendingInterest;
471 pendingSegmentIt->second.timeoutEvent = timeoutEvent;
475 SegmentFetcher::cancelExcessInFlightSegments()
477 for (
auto it = m_pendingSegments.begin(); it != m_pendingSegments.end();) {
478 if (it->first >= static_cast<uint64_t>(m_nSegments)) {
479 it = m_pendingSegments.erase(it);
480 BOOST_ASSERT(m_nSegmentsInFlight > 0);
481 m_nSegmentsInFlight--;
490 SegmentFetcher::checkAllSegmentsReceived()
492 bool haveReceivedAllSegments =
false;
494 if (m_nSegments != 0 && m_nReceived >= m_nSegments) {
495 haveReceivedAllSegments =
true;
497 for (uint64_t i = 0; i < static_cast<uint64_t>(m_nSegments); i++) {
498 if (m_receivedSegments.count(i) == 0) {
500 haveReceivedAllSegments =
false;
505 return haveReceivedAllSegments;
509 SegmentFetcher::getEstimatedRto()
513 return std::min(m_options.maxTimeout,
514 time::duration_cast<time::milliseconds>(m_rttEstimator.getEstimatedRto()));
PartialName getPrefix(ssize_t nComponents) const
Returns a prefix of the name.
const Name & getName() const
Get name.
Interest & setMustBeFresh(bool mustBeFresh)
Add or remove MustBeFresh element.
const Block & getContent() const
Get Content.
const Component & get(ssize_t i) const
Returns an immutable reference to the component at the specified index.
static time_point now() noexcept
An unrecoverable Nack was received during retrieval.
void refreshNonce()
Change nonce value.
Signal< SegmentFetcher, ConstBufferPtr > onComplete
Emits upon successful retrieval of the complete data.
time::milliseconds maxTimeout
maximum allowed time between successful receipt of segments
Utility class to fetch the latest version of a segmented object.
size_t value_size() const noexcept
Return the size of TLV-VALUE, aka TLV-LENGTH.
Represents an Interest packet.
void stop()
Stops fetching.
A handle for a scheduled event.
represents a Network Nack
NackReason getReason() const
Buffer::const_iterator value_begin() const
Get begin iterator of TLV-VALUE.
double aiStep
additive increase step (in segments)
uint64_t getCongestionMark() const
get the value of the CongestionMark tag
One of the retrieved segments failed user-provided validation.
A handle of pending Interest.
bool isSegment() const
Check if the component is a segment number per NDN naming conventions.
static shared_ptr< SegmentFetcher > start(Face &face, const Interest &baseInterest, security::v2::Validator &validator, const Options &options=Options())
Initiates segment fetching.
Signal< SegmentFetcher > afterSegmentNacked
Emits whenever an Interest for a data segment is nacked.
Provide a communication channel with local or remote NDN forwarder.
Signal< SegmentFetcher, Data > afterSegmentValidated
Emits whenever a received data segment has been successfully validated.
Retrieval timed out because the maximum timeout between the successful receipt of segments was exceed...
Buffer::const_iterator value_end() const
Get end iterator of TLV-VALUE.
One of the retrieved Data packets lacked a segment number in the last Name component (excl...
double initSsthresh
initial slow start threshold
double initCwnd
initial congestion window size
Represents a name component.
shared_ptr< Buffer > buf()
Flush written data to the stream and return shared pointer to the underlying buffer.
uint64_t toSegment() const
Interpret as segment number component using NDN naming conventions.
Signal< SegmentFetcher, Data > afterSegmentReceived
Emits whenever a data segment received.
Signal< SegmentFetcher > afterSegmentTimedOut
Emits whenever an Interest for a data segment times out.
RttEstimator::Options rttOptions
options for RTT estimator
Validation error code and optional detailed error message.
const Name & getName() const noexcept
Interest & setInterestLifetime(time::milliseconds lifetime)
Set the Interest's lifetime.
implements an output stream that constructs ndn::Buffer
Signal< SegmentFetcher, uint32_t, std::string > onError
Emits when the retrieval could not be completed due to an error.
Represents a Data packet.
A received FinalBlockId did not contain a segment component.
const optional< name::Component > & getFinalBlock() const
Interface for validating data and interest packets.
Interest & setCanBePrefix(bool canBePrefix)
Add or remove CanBePrefix element.
Interest & setName(const Name &name)
Set the Interest's name.