34 #include <boost/asio/io_service.hpp> 35 #include <boost/lexical_cast.hpp> 36 #include <boost/range/adaptor/map.hpp> 43 constexpr
double SegmentFetcher::MIN_SSTHRESH;
49 NDN_THROW(std::invalid_argument(
"maxTimeout must be greater than or equal to 1 millisecond"));
53 NDN_THROW(std::invalid_argument(
"initCwnd must be greater than or equal to 1"));
57 NDN_THROW(std::invalid_argument(
"aiStep must be greater than or equal to 0"));
60 if (mdCoef < 0.0 || mdCoef > 1.0) {
61 NDN_THROW(std::invalid_argument(
"mdCoef must be in range [0, 1]"));
65 SegmentFetcher::SegmentFetcher(
Face& face,
70 , m_scheduler(m_face.getIoService())
71 , m_validator(validator)
77 , m_nSegmentsInFlight(0)
88 shared_ptr<SegmentFetcher>
94 shared_ptr<SegmentFetcher> fetcher(
new SegmentFetcher(face, validator, options));
95 fetcher->m_this = fetcher;
96 fetcher->fetchFirstSegment(baseInterest,
false);
107 m_pendingSegments.clear();
108 m_face.getIoService().post([
self =
std::move(m_this)] {});
112 SegmentFetcher::shouldStop(
const weak_ptr<SegmentFetcher>& weakSelf)
114 auto self = weakSelf.lock();
115 return self ==
nullptr ||
self->m_this ==
nullptr;
119 SegmentFetcher::fetchFirstSegment(
const Interest& baseInterest,
bool isRetransmission)
125 if (isRetransmission) {
129 sendInterest(0, interest, isRetransmission);
133 SegmentFetcher::fetchSegmentsInWindow(
const Interest& origInterest)
135 if (checkAllSegmentsReceived()) {
137 return finalizeFetch();
140 int64_t availableWindowSize =
static_cast<int64_t
>(m_cwnd) - m_nSegmentsInFlight;
141 std::vector<std::pair<uint64_t, bool>> segmentsToRequest;
143 while (availableWindowSize > 0) {
144 if (!m_retxQueue.empty()) {
145 auto pendingSegmentIt = m_pendingSegments.find(m_retxQueue.front());
147 if (pendingSegmentIt == m_pendingSegments.end()) {
151 BOOST_ASSERT(pendingSegmentIt->second.state == SegmentState::InRetxQueue);
152 segmentsToRequest.emplace_back(pendingSegmentIt->first,
true);
154 else if (m_nSegments == 0 || m_nextSegmentNum < static_cast<uint64_t>(m_nSegments)) {
155 if (m_receivedSegments.count(m_nextSegmentNum) > 0) {
160 segmentsToRequest.emplace_back(m_nextSegmentNum++,
false);
165 availableWindowSize--;
168 for (
const auto& segment : segmentsToRequest) {
170 interest.
setName(
Name(m_versionedDataName).appendSegment(segment.first));
175 sendInterest(segment.first, interest, segment.second);
180 SegmentFetcher::sendInterest(uint64_t segNum,
const Interest& interest,
bool isRetransmission)
182 weak_ptr<SegmentFetcher> weakSelf = m_this;
184 ++m_nSegmentsInFlight;
185 auto pendingInterest = m_face.expressInterest(interest,
186 [
this, weakSelf] (
const Interest& interest,
const Data& data) {
187 afterSegmentReceivedCb(interest, data, weakSelf);
190 afterNackReceivedCb(interest, nack, weakSelf);
194 auto timeout = m_options.useConstantInterestTimeout ? m_options.maxTimeout : getEstimatedRto();
195 auto timeoutEvent = m_scheduler.schedule(timeout, [
this, interest, weakSelf] {
196 afterTimeoutCb(interest, weakSelf);
199 if (isRetransmission) {
200 updateRetransmittedSegment(segNum, pendingInterest, timeoutEvent);
205 pendingInterest, timeoutEvent};
206 bool isNew = m_pendingSegments.emplace(segNum,
std::move(pendingSegment)).second;
208 m_highInterest = segNum;
212 SegmentFetcher::afterSegmentReceivedCb(
const Interest& origInterest,
const Data& data,
213 const weak_ptr<SegmentFetcher>& weakSelf)
215 if (shouldStop(weakSelf))
218 BOOST_ASSERT(m_nSegmentsInFlight > 0);
219 m_nSegmentsInFlight--;
222 if (!currentSegmentComponent.
isSegment()) {
226 uint64_t currentSegment = currentSegmentComponent.
toSegment();
229 std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt;
230 if (m_receivedSegments.size() > 0) {
231 pendingSegmentIt = m_pendingSegments.find(currentSegment);
234 pendingSegmentIt = m_pendingSegments.begin();
237 if (pendingSegmentIt == m_pendingSegments.end()) {
241 pendingSegmentIt->second.timeoutEvent.cancel();
245 m_validator.validate(data,
246 bind(&SegmentFetcher::afterValidationSuccess,
this, _1, origInterest,
247 pendingSegmentIt, weakSelf),
248 bind(&SegmentFetcher::afterValidationFailure,
this, _1, _2, weakSelf));
252 SegmentFetcher::afterValidationSuccess(
const Data& data,
const Interest& origInterest,
253 std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt,
254 const weak_ptr<SegmentFetcher>& weakSelf)
256 if (shouldStop(weakSelf))
268 if (pendingSegmentIt->second.state == SegmentState::FirstInterest) {
269 m_rttEstimator.addMeasurement(m_timeLastSegmentReceived - pendingSegmentIt->second.sendTime,
270 std::max<int64_t>(m_nSegmentsInFlight + 1, 1));
274 m_pendingSegments.erase(pendingSegmentIt);
277 auto receivedSegmentIt = m_receivedSegments.emplace(std::piecewise_construct,
278 std::forward_as_tuple(currentSegment),
281 receivedSegmentIt.first->second.begin());
288 "Received FinalBlockId did not contain a segment component");
291 if (data.
getFinalBlock()->toSegment() + 1 !=
static_cast<uint64_t
>(m_nSegments)) {
293 cancelExcessInFlightSegments();
297 if (m_receivedSegments.size() == 1) {
299 if (currentSegment == 0) {
305 if (m_highData < currentSegment) {
306 m_highData = currentSegment;
316 fetchSegmentsInWindow(origInterest);
320 SegmentFetcher::afterValidationFailure(
const Data& data,
322 const weak_ptr<SegmentFetcher>& weakSelf)
324 if (shouldStop(weakSelf))
331 SegmentFetcher::afterNackReceivedCb(
const Interest& origInterest,
const lp::Nack& nack,
332 const weak_ptr<SegmentFetcher>& weakSelf)
334 if (shouldStop(weakSelf))
339 BOOST_ASSERT(m_nSegmentsInFlight > 0);
340 m_nSegmentsInFlight--;
345 afterNackOrTimeout(origInterest);
354 SegmentFetcher::afterTimeoutCb(
const Interest& origInterest,
355 const weak_ptr<SegmentFetcher>& weakSelf)
357 if (shouldStop(weakSelf))
362 BOOST_ASSERT(m_nSegmentsInFlight > 0);
363 m_nSegmentsInFlight--;
364 afterNackOrTimeout(origInterest);
368 SegmentFetcher::afterNackOrTimeout(
const Interest& origInterest)
376 std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt;
377 BOOST_ASSERT(m_pendingSegments.size() > 0);
379 BOOST_ASSERT(m_pendingSegments.count(lastNameComponent.
toSegment()) > 0);
380 pendingSegmentIt = m_pendingSegments.find(lastNameComponent.
toSegment());
383 BOOST_ASSERT(m_pendingSegments.size() > 0);
384 pendingSegmentIt = m_pendingSegments.begin();
388 pendingSegmentIt->second.timeoutEvent.cancel();
389 pendingSegmentIt->second.state = SegmentState::InRetxQueue;
391 m_rttEstimator.backoffRto();
393 if (m_receivedSegments.size() == 0) {
395 fetchFirstSegment(origInterest,
true);
399 m_retxQueue.push(pendingSegmentIt->first);
400 fetchSegmentsInWindow(origInterest);
405 SegmentFetcher::finalizeFetch()
410 BOOST_ASSERT(m_receivedSegments.size() >=
static_cast<uint64_t
>(m_nSegments));
412 for (int64_t i = 0; i < m_nSegments; i++) {
413 buf.write(m_receivedSegments[i].get<const char>(), m_receivedSegments[i].size());
421 SegmentFetcher::windowIncrease()
423 if (m_options.useConstantCwnd) {
424 BOOST_ASSERT(m_cwnd == m_options.initCwnd);
428 if (m_cwnd < m_ssthresh) {
429 m_cwnd += m_options.aiStep;
432 m_cwnd += m_options.aiStep / std::floor(m_cwnd);
437 SegmentFetcher::windowDecrease()
439 if (m_options.disableCwa || m_highData > m_recPoint) {
440 m_recPoint = m_highInterest;
442 if (m_options.useConstantCwnd) {
443 BOOST_ASSERT(m_cwnd == m_options.initCwnd);
448 m_ssthresh = std::max(MIN_SSTHRESH, m_cwnd * m_options.mdCoef);
449 m_cwnd = m_options.resetCwndToInit ? m_options.initCwnd : m_ssthresh;
454 SegmentFetcher::signalError(uint32_t code,
const std::string& msg)
461 SegmentFetcher::updateRetransmittedSegment(uint64_t segmentNum,
465 auto pendingSegmentIt = m_pendingSegments.find(segmentNum);
466 BOOST_ASSERT(pendingSegmentIt != m_pendingSegments.end());
467 BOOST_ASSERT(pendingSegmentIt->second.state == SegmentState::InRetxQueue);
468 pendingSegmentIt->second.state = SegmentState::Retransmitted;
469 pendingSegmentIt->second.hdl = pendingInterest;
470 pendingSegmentIt->second.timeoutEvent = timeoutEvent;
474 SegmentFetcher::cancelExcessInFlightSegments()
476 for (
auto it = m_pendingSegments.begin(); it != m_pendingSegments.end();) {
477 if (it->first >= static_cast<uint64_t>(m_nSegments)) {
478 it = m_pendingSegments.erase(it);
479 BOOST_ASSERT(m_nSegmentsInFlight > 0);
480 m_nSegmentsInFlight--;
489 SegmentFetcher::checkAllSegmentsReceived()
491 bool haveReceivedAllSegments =
false;
493 if (m_nSegments != 0 && m_nReceived >= m_nSegments) {
494 haveReceivedAllSegments =
true;
496 for (uint64_t i = 0; i < static_cast<uint64_t>(m_nSegments); i++) {
497 if (m_receivedSegments.count(i) == 0) {
499 haveReceivedAllSegments =
false;
504 return haveReceivedAllSegments;
508 SegmentFetcher::getEstimatedRto()
512 return std::min(m_options.maxTimeout,
513 time::duration_cast<time::milliseconds>(m_rttEstimator.getEstimatedRto()));
const Name & getName() const
uint64_t getCongestionMark() const
Get the value of the CongestionMark tag.
Interest & setMustBeFresh(bool mustBeFresh)
Add or remove MustBeFresh element.
static time_point now() noexcept
An unrecoverable Nack was received during retrieval.
void refreshNonce()
Change nonce value.
Signal< SegmentFetcher, ConstBufferPtr > onComplete
Emits upon successful retrieval of the complete data.
time::milliseconds maxTimeout
maximum allowed time between successful receipt of segments
Utility class to fetch the latest version of a segmented object.
size_t value_size() const noexcept
Return the size of TLV-VALUE, aka TLV-LENGTH.
Represents an Interest packet.
const optional< name::Component > & getFinalBlock() const
void stop()
Stops fetching.
Buffer::const_iterator value_begin() const
Get begin iterator of TLV-VALUE.
Buffer::const_iterator value_end() const
Get end iterator of TLV-VALUE.
Represents a Network Nack.
NackReason getReason() const
double aiStep
additive increase step (in segments)
bool isSegment() const
Check if the component is a segment number per NDN naming conventions.
One of the retrieved segments failed user-provided validation.
uint64_t toSegment() const
Interpret as segment number component using NDN naming conventions.
A handle of pending Interest.
Interest & setName(const Name &name)
static shared_ptr< SegmentFetcher > start(Face &face, const Interest &baseInterest, security::v2::Validator &validator, const Options &options=Options())
Initiates segment fetching.
Signal< SegmentFetcher > afterSegmentNacked
Emits whenever an Interest for a data segment is nacked.
Provide a communication channel with local or remote NDN forwarder.
Signal< SegmentFetcher, Data > afterSegmentValidated
Emits whenever a received data segment has been successfully validated.
Retrieval timed out because the maximum timeout between the successful receipt of segments was exceeded.
One of the retrieved Data packets lacked a segment number in the last Name component (excluding the implicit digest).
double initSsthresh
initial slow start threshold
double initCwnd
initial congestion window size
const Name & getName() const
Get name.
Represents a name component.
shared_ptr< Buffer > buf()
Flush written data to the stream and return shared pointer to the underlying buffer.
Signal< SegmentFetcher, Data > afterSegmentReceived
Emits whenever a data segment is received.
Signal< SegmentFetcher > afterSegmentTimedOut
Emits whenever an Interest for a data segment times out.
RttEstimator::Options rttOptions
options for RTT estimator
Validation error code and optional detailed error message.
const Block & getContent() const
Get Content.
PartialName getPrefix(ssize_t nComponents) const
Extract a prefix of the name.
Interest & setInterestLifetime(time::milliseconds lifetime)
Set Interest's lifetime.
Implements an output stream that constructs an ndn::Buffer.
Signal< SegmentFetcher, uint32_t, std::string > onError
Emits when the retrieval could not be completed due to an error.
A handle of scheduled event.
Represents a Data packet.
A received FinalBlockId did not contain a segment component.
const Component & get(ssize_t i) const
Get the component at the given index.
Interface for validating data and interest packets.
Interest & setCanBePrefix(bool canBePrefix)
Add or remove CanBePrefix element.