30 #include <boost/asio/io_service.hpp> 31 #include <boost/lexical_cast.hpp> 32 #include <boost/range/adaptor/map.hpp> 39 constexpr
double SegmentFetcher::MIN_SSTHRESH;
// Fragment of an option-validation routine; its signature and the first three
// guard conditions are not part of this listing. Each NDN_THROW below reports
// an out-of-range option as std::invalid_argument.
45 NDN_THROW(std::invalid_argument(
"maxTimeout must be greater than or equal to 1 millisecond"));
49 NDN_THROW(std::invalid_argument(
"initCwnd must be greater than or equal to 1"));
53 NDN_THROW(std::invalid_argument(
"aiStep must be greater than or equal to 0"));
// mdCoef is accepted only inside the closed interval [0, 1].
56 if (mdCoef < 0.0 || mdCoef > 1.0) {
57 NDN_THROW(std::invalid_argument(
"mdCoef must be in range [0, 1]"));
// Constructor fragment: binds the scheduler to the io_service of m_face, keeps
// the supplied validator, and builds the RTT estimator from a shared copy of
// options.rttOptions. The remaining parameters and initializers are not part
// of this listing.
61 SegmentFetcher::SegmentFetcher(
Face& face,
66 , m_scheduler(m_face.getIoService())
67 , m_validator(validator)
68 , m_rttEstimator(make_shared<RttEstimator::Options>(options.
rttOptions))
// Factory fragment: allocates the fetcher, stores an owning shared_ptr to the
// fetcher in its own m_this member, and requests the first segment as a
// non-retransmission.
// NOTE(review): plain `new` is used rather than make_shared; presumably the
// constructor is not publicly accessible -- confirm against the header.
76 shared_ptr<SegmentFetcher>
82 shared_ptr<SegmentFetcher> fetcher(
new SegmentFetcher(face, validator, options));
83 fetcher->m_this = fetcher;
84 fetcher->fetchFirstSegment(baseInterest,
false);
// Fragment of the stop logic (signature not in this listing): drops every
// pending-segment record, then moves m_this into an empty handler posted to
// the io_service, leaving m_this null. The moved-out shared_ptr is released
// when that handler is destroyed rather than inside this call.
// NOTE(review): presumably this defers destruction of the fetcher until the
// current call stack has unwound -- confirm.
95 m_pendingSegments.clear();
96 m_face.getIoService().post([
self = std::move(m_this)] {});
// Reports whether the fetcher must do no further work: true when the weak
// reference has expired, or when the fetcher is still alive but its m_this
// member is null.
100 SegmentFetcher::shouldStop(
const weak_ptr<SegmentFetcher>& weakSelf)
102 auto self = weakSelf.lock();
103 return self ==
nullptr ||
self->m_this ==
nullptr;
// Sends the Interest that is tracked as segment number 0, forwarding the
// retransmission flag to sendInterest. Construction of `interest` and the body
// of the isRetransmission branch are not part of this listing.
107 SegmentFetcher::fetchFirstSegment(
const Interest& baseInterest,
bool isRetransmission)
113 if (isRetransmission) {
117 sendInterest(0, interest, isRetransmission);
// Fills the currently available window with Interests: entries from the
// retransmission queue are considered first, then new segment numbers. If
// every segment has already arrived, the fetch is finalized instead.
121 SegmentFetcher::fetchSegmentsInWindow(
const Interest& origInterest)
123 if (checkAllSegmentsReceived()) {
125 return finalizeFetch();
128 int64_t availableWindowSize;
// In-order mode additionally caps the window by the space left in the
// segment buffer.
// NOTE(review): m_segmentBuffer.size() is unsigned; if flowControlWindow is
// unsigned too, this subtraction wraps when the buffer holds more entries
// than flowControlWindow -- confirm the member's type.
129 if (m_options.inOrder) {
130 availableWindowSize = std::min<int64_t>(m_cwnd, m_options.flowControlWindow - m_segmentBuffer.size());
133 availableWindowSize =
static_cast<int64_t
>(m_cwnd);
// Segments already in flight consume part of the window.
135 availableWindowSize -= m_nSegmentsInFlight;
// Each entry is (segment number, is-retransmission), matching the arguments
// later passed to sendInterest.
137 std::vector<std::pair<uint64_t, bool>> segmentsToRequest;
139 while (availableWindowSize > 0) {
// Queued retransmissions are served before any new segment number.
140 if (!m_retxQueue.empty()) {
141 auto pendingSegmentIt = m_pendingSegments.find(m_retxQueue.front());
143 if (pendingSegmentIt == m_pendingSegments.end()) {
147 BOOST_ASSERT(pendingSegmentIt->second.state == SegmentState::InRetxQueue);
148 segmentsToRequest.emplace_back(pendingSegmentIt->first,
true);
// NOTE(review): m_nSegments == 0 presumably means the total is not yet
// known, so new segment numbers are requested without an upper bound.
150 else if (m_nSegments == 0 || m_nextSegmentNum < static_cast<uint64_t>(m_nSegments)) {
151 if (m_segmentBuffer.count(m_nextSegmentNum) > 0) {
156 segmentsToRequest.emplace_back(m_nextSegmentNum++,
false);
161 availableWindowSize--;
// Issue one Interest per collected entry, named with the segment number
// appended to m_versionedDataName.
164 for (
const auto& segment : segmentsToRequest) {
166 interest.
setName(
Name(m_versionedDataName).appendSegment(segment.first));
171 sendInterest(segment.first, interest, segment.second);
// Expresses `interest` on the face and schedules a timeout for it, then either
// refreshes the existing pending record (retransmission) or inserts a new one.
//   segNum           - segment number used as the key in m_pendingSegments
//   interest         - the Interest to express
//   isRetransmission - true when segNum already has a pending record
176 SegmentFetcher::sendInterest(uint64_t segNum,
const Interest& interest,
bool isRetransmission)
// The callbacks carry a weak reference so that each can call shouldStop()
// before touching the fetcher.
178 weak_ptr<SegmentFetcher> weakSelf = m_this;
180 ++m_nSegmentsInFlight;
181 auto pendingInterest = m_face.expressInterest(interest,
182 [
this, weakSelf] (
const Interest& interest,
const Data& data) {
183 afterSegmentReceivedCb(interest, data, weakSelf);
186 afterNackReceivedCb(interest, nack, weakSelf);
// The timeout is either the fixed maxTimeout or the value from getEstimatedRto().
190 auto timeout = m_options.useConstantInterestTimeout ? m_options.maxTimeout : getEstimatedRto();
191 auto timeoutEvent = m_scheduler.schedule(timeout, [
this, interest, weakSelf] {
192 afterTimeoutCb(interest, weakSelf);
// A retransmission updates the record already stored for segNum; the remaining
// visible lines insert a new record and set m_highInterest to segNum (the
// lines separating the two paths are not in this listing).
195 if (isRetransmission) {
196 updateRetransmittedSegment(segNum, pendingInterest, timeoutEvent);
201 pendingInterest, timeoutEvent};
202 bool isNew = m_pendingSegments.emplace(segNum, std::move(pendingSegment)).second;
204 m_highInterest = segNum;
// Data callback: after the stop check it releases one in-flight slot, locates
// the pending record for the segment, cancels that record's timeout and hands
// the Data to the validator.
208 SegmentFetcher::afterSegmentReceivedCb(
const Interest& origInterest,
const Data& data,
209 const weak_ptr<SegmentFetcher>& weakSelf)
211 if (shouldStop(weakSelf))
214 BOOST_ASSERT(m_nSegmentsInFlight > 0);
215 m_nSegmentsInFlight--;
218 if (!currentSegmentComponent.
isSegment()) {
222 uint64_t currentSegment = currentSegmentComponent.
toSegment();
// Once at least one segment has been received the record is looked up by
// segment number; before that, the first pending record is used.
225 std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt;
226 if (m_receivedSegments.size() > 0) {
227 pendingSegmentIt = m_pendingSegments.find(currentSegment);
230 pendingSegmentIt = m_pendingSegments.begin();
233 if (pendingSegmentIt == m_pendingSegments.end()) {
237 pendingSegmentIt->second.timeoutEvent.cancel();
// The validation outcome is delivered to afterValidationSuccess or
// afterValidationFailure; the iterator and weak reference are bound into the
// success callback.
241 m_validator.validate(data,
242 bind(&SegmentFetcher::afterValidationSuccess,
this, _1, origInterest,
243 pendingSegmentIt, weakSelf),
244 bind(&SegmentFetcher::afterValidationFailure,
this, _1, _2, weakSelf));
// Handles a validated segment. The visible lines record it as received, feed
// an RTT sample when appropriate, remove its pending record, create its entry
// in m_segmentBuffer, react to FinalBlockId, deliver in-order data, update
// m_highData and finally request further segments.
248 SegmentFetcher::afterValidationSuccess(
const Data& data,
const Interest& origInterest,
249 std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt,
250 const weak_ptr<SegmentFetcher>& weakSelf)
252 if (shouldStop(weakSelf))
263 m_receivedSegments.insert(currentSegment);
// An RTT measurement is added only for records still in the FirstInterest state.
// NOTE(review): presumably because a retransmitted segment gives an ambiguous
// sample -- confirm.
266 if (pendingSegmentIt->second.state == SegmentState::FirstInterest) {
267 BOOST_ASSERT(m_nSegmentsInFlight >= 0);
268 m_rttEstimator.addMeasurement(m_timeLastSegmentReceived - pendingSegmentIt->second.sendTime,
269 static_cast<size_t>(m_nSegmentsInFlight) + 1);
273 m_pendingSegments.erase(pendingSegmentIt);
276 auto receivedSegmentIt = m_segmentBuffer.emplace(std::piecewise_construct,
277 std::forward_as_tuple(currentSegment),
280 receivedSegmentIt.first->second.begin());
287 "Received FinalBlockId did not contain a segment component");
// When FinalBlockId + 1 differs from m_nSegments, pending requests beyond the
// end are cancelled (the line between these two is not in this listing).
290 if (data.
getFinalBlock()->toSegment() + 1 !=
static_cast<uint64_t
>(m_nSegments)) {
292 cancelExcessInFlightSegments();
// In-order delivery: emit consecutive buffered segments starting at
// m_nextSegmentInOrder, erasing each from the buffer once emitted.
296 if (m_options.inOrder && m_nextSegmentInOrder == currentSegment) {
298 onInOrderData(std::make_shared<const Buffer>(m_segmentBuffer[m_nextSegmentInOrder]));
299 m_segmentBuffer.erase(m_nextSegmentInOrder++);
300 }
while (m_segmentBuffer.count(m_nextSegmentInOrder) > 0);
303 if (m_receivedSegments.size() == 1) {
305 if (currentSegment == 0) {
// m_highData tracks the largest segment number received so far.
311 if (m_highData < currentSegment) {
312 m_highData = currentSegment;
322 fetchSegmentsInWindow(origInterest);
// Validation-failure callback. Only the stop check is visible; the remaining
// parameter and the rest of the body are not part of this listing.
326 SegmentFetcher::afterValidationFailure(
const Data& data,
328 const weak_ptr<SegmentFetcher>& weakSelf)
330 if (shouldStop(weakSelf))
// Nack callback: after the stop check, the visible path releases one in-flight
// slot and delegates to afterNackOrTimeout. The lines that inspect the Nack
// itself are not part of this listing.
337 SegmentFetcher::afterNackReceivedCb(
const Interest& origInterest,
const lp::Nack& nack,
338 const weak_ptr<SegmentFetcher>& weakSelf)
340 if (shouldStop(weakSelf))
345 BOOST_ASSERT(m_nSegmentsInFlight > 0);
346 m_nSegmentsInFlight--;
351 afterNackOrTimeout(origInterest);
// Timeout callback: after the stop check, releases one in-flight slot and
// delegates to afterNackOrTimeout.
360 SegmentFetcher::afterTimeoutCb(
const Interest& origInterest,
361 const weak_ptr<SegmentFetcher>& weakSelf)
363 if (shouldStop(weakSelf))
368 BOOST_ASSERT(m_nSegmentsInFlight > 0);
369 m_nSegmentsInFlight--;
370 afterNackOrTimeout(origInterest);
// Shared recovery path for a Nack or a timeout: finds the pending record of
// the affected segment, cancels its timeout, marks it InRetxQueue and backs
// off the RTO. It then either re-requests the first segment (nothing received
// yet) or queues the segment for retransmission and refills the window.
374 SegmentFetcher::afterNackOrTimeout(
const Interest& origInterest)
// The record is found by the segment number in lastNameComponent, or else the
// first pending record is taken (the selecting condition is not in this listing).
382 std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt;
383 BOOST_ASSERT(m_pendingSegments.size() > 0);
385 BOOST_ASSERT(m_pendingSegments.count(lastNameComponent.
toSegment()) > 0);
386 pendingSegmentIt = m_pendingSegments.find(lastNameComponent.
toSegment());
389 BOOST_ASSERT(m_pendingSegments.size() > 0);
390 pendingSegmentIt = m_pendingSegments.begin();
394 pendingSegmentIt->second.timeoutEvent.cancel();
395 pendingSegmentIt->second.state = SegmentState::InRetxQueue;
397 m_rttEstimator.backoffRto();
399 if (m_receivedSegments.size() == 0) {
401 fetchFirstSegment(origInterest,
true);
405 m_retxQueue.push(pendingSegmentIt->first);
406 fetchSegmentsInWindow(origInterest);
// Completes the fetch. The body of the in-order branch is not part of this
// listing. The visible remainder asserts that at least m_nSegments segments
// were received and writes buffered segments 0 .. m_nSegments-1 into `buf` in
// ascending order.
// NOTE(review): m_segmentBuffer[i] is evaluated twice per iteration; if the
// container is a std::map, operator[] would insert an empty entry for a
// missing key -- confirm the assert above rules that out.
411 SegmentFetcher::finalizeFetch()
413 if (m_options.inOrder) {
420 BOOST_ASSERT(m_receivedSegments.size() >=
static_cast<uint64_t
>(m_nSegments));
422 for (int64_t i = 0; i < m_nSegments; i++) {
423 buf.write(m_segmentBuffer[i].get<const char>(), m_segmentBuffer[i].size());
// Grows the congestion window m_cwnd. With useConstantCwnd the window is
// asserted to still equal initCwnd. Below m_ssthresh each call adds aiStep;
// otherwise each call adds aiStep / floor(m_cwnd).
// NOTE(review): the lines separating these branches are not in this listing;
// presumably the constant-window case returns without changing m_cwnd.
431 SegmentFetcher::windowIncrease()
433 if (m_options.useConstantCwnd) {
434 BOOST_ASSERT(m_cwnd == m_options.initCwnd);
438 if (m_cwnd < m_ssthresh) {
439 m_cwnd += m_options.aiStep;
442 m_cwnd += m_options.aiStep / std::floor(m_cwnd);
// Shrinks the congestion window. When disableCwa is set or m_highData has
// passed m_recPoint, m_recPoint is moved to m_highInterest. The visible
// arithmetic sets m_ssthresh to max(MIN_SSTHRESH, m_cwnd * mdCoef) and m_cwnd
// to initCwnd (resetCwndToInit) or to the new m_ssthresh.
// NOTE(review): closing braces are missing from this listing, so whether the
// later statements are nested inside the first condition cannot be confirmed.
447 SegmentFetcher::windowDecrease()
449 if (m_options.disableCwa || m_highData > m_recPoint) {
450 m_recPoint = m_highInterest;
452 if (m_options.useConstantCwnd) {
453 BOOST_ASSERT(m_cwnd == m_options.initCwnd);
458 m_ssthresh = std::max(MIN_SSTHRESH, m_cwnd * m_options.mdCoef);
459 m_cwnd = m_options.resetCwndToInit ? m_options.initCwnd : m_ssthresh;
// Takes a numeric error code and a message. The body is not part of this listing.
// NOTE(review): presumably emits the onError signal and stops the fetcher --
// confirm against the full source.
464 SegmentFetcher::signalError(uint32_t code,
const std::string& msg)
// Refreshes the pending record of a segment that is being retransmitted. The
// record must exist and be in the InRetxQueue state; it becomes Retransmitted
// and takes over the new pending-Interest handle and timeout event. The
// remaining parameters are not part of this listing.
471 SegmentFetcher::updateRetransmittedSegment(uint64_t segmentNum,
475 auto pendingSegmentIt = m_pendingSegments.find(segmentNum);
476 BOOST_ASSERT(pendingSegmentIt != m_pendingSegments.end());
477 BOOST_ASSERT(pendingSegmentIt->second.state == SegmentState::InRetxQueue);
478 pendingSegmentIt->second.state = SegmentState::Retransmitted;
479 pendingSegmentIt->second.hdl = pendingInterest;
480 pendingSegmentIt->second.timeoutEvent = timeoutEvent;
// Erases every pending record whose segment number is >= m_nSegments and
// decrements the in-flight counter once per erased record.
// NOTE(review): afterNackReceivedCb/afterTimeoutCb already decrement the
// counter and leave the record in the map as InRetxQueue; erasing such a
// record here would decrement a second time -- confirm whether that can occur.
484 SegmentFetcher::cancelExcessInFlightSegments()
486 for (
auto it = m_pendingSegments.begin(); it != m_pendingSegments.end();) {
487 if (it->first >= static_cast<uint64_t>(m_nSegments)) {
488 it = m_pendingSegments.erase(it);
489 BOOST_ASSERT(m_nSegmentsInFlight > 0);
490 m_nSegmentsInFlight--;
// Returns true only when the segment count is non-zero, m_nReceived has
// reached it, and every segment number in [0, m_nSegments) is present in
// m_receivedSegments.
499 SegmentFetcher::checkAllSegmentsReceived()
501 bool haveReceivedAllSegments =
false;
// The counter comparison is a cheap first test; the loop then verifies each
// individual segment number.
503 if (m_nSegments != 0 && m_nReceived >= m_nSegments) {
504 haveReceivedAllSegments =
true;
506 for (uint64_t i = 0; i < static_cast<uint64_t>(m_nSegments); i++) {
507 if (m_receivedSegments.count(i) == 0) {
509 haveReceivedAllSegments =
false;
514 return haveReceivedAllSegments;
// Returns the RTT estimator's RTO converted to milliseconds, capped at
// m_options.maxTimeout. The lines between the signature and the return are
// not part of this listing.
518 SegmentFetcher::getEstimatedRto()
522 return std::min(m_options.maxTimeout,
523 time::duration_cast<time::milliseconds>(m_rttEstimator.getEstimatedRto()));
PartialName getPrefix(ssize_t nComponents) const
Returns a prefix of the name.
Interest & setMustBeFresh(bool mustBeFresh)
Add or remove MustBeFresh element.
const Component & get(ssize_t i) const
Returns an immutable reference to the component at the specified index.
static time_point now() noexcept
An unrecoverable Nack was received during retrieval.
void refreshNonce()
Change nonce value.
Signal< SegmentFetcher, ConstBufferPtr > onComplete
Emitted upon successful retrieval of the complete object (all segments).
time::milliseconds maxTimeout
maximum allowed time between successful receipt of segments
Utility class to fetch the latest version of a segmented object.
size_t value_size() const noexcept
Return the size of TLV-VALUE, aka TLV-LENGTH.
Represents an Interest packet.
Signal< SegmentFetcher > onInOrderComplete
Emitted on successful retrieval of all segments in 'in order' mode.
void stop()
Stops fetching.
A handle for a scheduled event.
Represents a Network Nack.
NackReason getReason() const
Buffer::const_iterator value_begin() const
Get begin iterator of TLV-VALUE.
double aiStep
additive increase step (in segments)
uint64_t getCongestionMark() const
get the value of the CongestionMark tag
One of the retrieved segments failed user-provided validation.
Handle for a pending Interest.
bool isSegment() const
Check if the component is a segment number per NDN naming conventions.
static shared_ptr< SegmentFetcher > start(Face &face, const Interest &baseInterest, security::v2::Validator &validator, const Options &options=Options())
Initiates segment fetching.
Signal< SegmentFetcher > afterSegmentNacked
Emitted whenever an Interest for a data segment is nacked.
Provide a communication channel with local or remote NDN forwarder.
Signal< SegmentFetcher, Data > afterSegmentValidated
Emitted whenever a received data segment has been successfully validated.
Retrieval timed out because the maximum timeout between the successful receipt of segments was exceeded.
Buffer::const_iterator value_end() const
Get end iterator of TLV-VALUE.
One of the retrieved Data packets lacked a segment number in the last Name component (excluding implicit digest).
const Name & getName() const noexcept
Get name.
double initSsthresh
initial slow start threshold
double initCwnd
initial congestion window size
Represents a name component.
shared_ptr< Buffer > buf()
Flush written data to the stream and return shared pointer to the underlying buffer.
uint64_t toSegment() const
Interpret as segment number component using NDN naming conventions.
Signal< SegmentFetcher, Data > afterSegmentReceived
Emitted whenever a data segment is received.
Signal< SegmentFetcher > afterSegmentTimedOut
Emitted whenever an Interest for a data segment times out.
RttEstimator::Options rttOptions
options for RTT estimator
Validation error code and optional detailed error message.
const Block & getContent() const noexcept
Get the Content element.
const Name & getName() const noexcept
Interest & setInterestLifetime(time::milliseconds lifetime)
Set the Interest's lifetime.
implements an output stream that constructs ndn::Buffer
Signal< SegmentFetcher, uint32_t, std::string > onError
Emitted when the retrieval could not be completed due to an error.
Represents a Data packet.
A received FinalBlockId did not contain a segment component.
const optional< name::Component > & getFinalBlock() const
Interface for validating data and interest packets.
Signal< SegmentFetcher, ConstBufferPtr > onInOrderData
Emitted after each data segment in segment order has been validated.
Interest & setCanBePrefix(bool canBePrefix)
Add or remove CanBePrefix element.
Interest & setName(const Name &name)
Set the Interest's name.