34 #include <boost/asio/io_service.hpp>    35 #include <boost/lexical_cast.hpp>    36 #include <boost/range/adaptor/map.hpp>    43 constexpr 
double SegmentFetcher::MIN_SSTHRESH;
    49     NDN_THROW(std::invalid_argument(
"maxTimeout must be greater than or equal to 1 millisecond"));
    53     NDN_THROW(std::invalid_argument(
"initCwnd must be greater than or equal to 1"));
    57     NDN_THROW(std::invalid_argument(
"aiStep must be greater than or equal to 0"));
    60   if (mdCoef < 0.0 || mdCoef > 1.0) {
    61     NDN_THROW(std::invalid_argument(
"mdCoef must be in range [0, 1]"));
    65 SegmentFetcher::SegmentFetcher(
Face& face,
    70   , m_scheduler(m_face.getIoService())
    71   , m_validator(validator)
    72   , m_rttEstimator(make_shared<RttEstimator::Options>(options.
rttOptions))
    77   , m_nSegmentsInFlight(0)
    88 shared_ptr<SegmentFetcher>
    94   shared_ptr<SegmentFetcher> fetcher(
new SegmentFetcher(face, validator, options));
    95   fetcher->m_this = fetcher;
    96   fetcher->fetchFirstSegment(baseInterest, 
false);
   107   m_pendingSegments.clear(); 
   108   m_face.getIoService().post([
self = 
std::move(m_this)] {});
   112 SegmentFetcher::shouldStop(
const weak_ptr<SegmentFetcher>& weakSelf)
   114   auto self = weakSelf.lock();
   115   return self == 
nullptr || 
self->m_this == 
nullptr;
   119 SegmentFetcher::fetchFirstSegment(
const Interest& baseInterest, 
bool isRetransmission)
   125   if (isRetransmission) {
   129   sendInterest(0, interest, isRetransmission);
   133 SegmentFetcher::fetchSegmentsInWindow(
const Interest& origInterest)
   135   if (checkAllSegmentsReceived()) {
   137     return finalizeFetch();
   140   int64_t availableWindowSize = 
static_cast<int64_t
>(m_cwnd) - m_nSegmentsInFlight;
   141   std::vector<std::pair<uint64_t, bool>> segmentsToRequest; 
   143   while (availableWindowSize > 0) {
   144     if (!m_retxQueue.empty()) {
   145       auto pendingSegmentIt = m_pendingSegments.find(m_retxQueue.front());
   147       if (pendingSegmentIt == m_pendingSegments.end()) {
   151       BOOST_ASSERT(pendingSegmentIt->second.state == SegmentState::InRetxQueue);
   152       segmentsToRequest.emplace_back(pendingSegmentIt->first, 
true);
   154     else if (m_nSegments == 0 || m_nextSegmentNum < static_cast<uint64_t>(m_nSegments)) {
   155       if (m_receivedSegments.count(m_nextSegmentNum) > 0) {
   160       segmentsToRequest.emplace_back(m_nextSegmentNum++, 
false);
   165     availableWindowSize--;
   168   for (
const auto& segment : segmentsToRequest) {
   170     interest.
setName(
Name(m_versionedDataName).appendSegment(segment.first));
   175     sendInterest(segment.first, interest, segment.second);
   180 SegmentFetcher::sendInterest(uint64_t segNum, 
const Interest& interest, 
bool isRetransmission)
   182   weak_ptr<SegmentFetcher> weakSelf = m_this;
   184   ++m_nSegmentsInFlight;
   185   auto pendingInterest = m_face.expressInterest(interest,
   186     [
this, weakSelf] (
const Interest& interest, 
const Data& data) {
   187       afterSegmentReceivedCb(interest, data, weakSelf);
   190       afterNackReceivedCb(interest, nack, weakSelf);
   194   auto timeout = m_options.useConstantInterestTimeout ? m_options.maxTimeout : getEstimatedRto();
   195   auto timeoutEvent = m_scheduler.schedule(timeout, [
this, interest, weakSelf] {
   196     afterTimeoutCb(interest, weakSelf);
   199   if (isRetransmission) {
   200     updateRetransmittedSegment(segNum, pendingInterest, timeoutEvent);
   205                                 pendingInterest, timeoutEvent};
   206   bool isNew = m_pendingSegments.emplace(segNum, 
std::move(pendingSegment)).second;
   208   m_highInterest = segNum;
   212 SegmentFetcher::afterSegmentReceivedCb(
const Interest& origInterest, 
const Data& data,
   213                                        const weak_ptr<SegmentFetcher>& weakSelf)
   215   if (shouldStop(weakSelf))
   218   BOOST_ASSERT(m_nSegmentsInFlight > 0);
   219   m_nSegmentsInFlight--;
   222   if (!currentSegmentComponent.
isSegment()) {
   226   uint64_t currentSegment = currentSegmentComponent.
toSegment();
   229   std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt;
   230   if (m_receivedSegments.size() > 0) {
   231     pendingSegmentIt = m_pendingSegments.find(currentSegment);
   234     pendingSegmentIt = m_pendingSegments.begin();
   237   if (pendingSegmentIt == m_pendingSegments.end()) {
   241   pendingSegmentIt->second.timeoutEvent.cancel();
   245   m_validator.validate(data,
   246                        bind(&SegmentFetcher::afterValidationSuccess, 
this, _1, origInterest,
   247                             pendingSegmentIt, weakSelf),
   248                        bind(&SegmentFetcher::afterValidationFailure, 
this, _1, _2, weakSelf));
   252 SegmentFetcher::afterValidationSuccess(
const Data& data, 
const Interest& origInterest,
   253                                        std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt,
   254                                        const weak_ptr<SegmentFetcher>& weakSelf)
   256   if (shouldStop(weakSelf))
   268   if (pendingSegmentIt->second.state == SegmentState::FirstInterest) {
   269     BOOST_ASSERT(m_nSegmentsInFlight >= 0);
   270     m_rttEstimator.addMeasurement(m_timeLastSegmentReceived - pendingSegmentIt->second.sendTime,
   271                                   static_cast<size_t>(m_nSegmentsInFlight) + 1);
   275   m_pendingSegments.erase(pendingSegmentIt);
   278   auto receivedSegmentIt = m_receivedSegments.emplace(std::piecewise_construct,
   279                                                       std::forward_as_tuple(currentSegment),
   282             receivedSegmentIt.first->second.begin());
   289                          "Received FinalBlockId did not contain a segment component");
   292     if (data.
getFinalBlock()->toSegment() + 1 != 
static_cast<uint64_t
>(m_nSegments)) {
   294       cancelExcessInFlightSegments();
   298   if (m_receivedSegments.size() == 1) {
   300     if (currentSegment == 0) {
   306   if (m_highData < currentSegment) {
   307     m_highData = currentSegment;
   317   fetchSegmentsInWindow(origInterest);
   321 SegmentFetcher::afterValidationFailure(
const Data& data,
   323                                        const weak_ptr<SegmentFetcher>& weakSelf)
   325   if (shouldStop(weakSelf))
   332 SegmentFetcher::afterNackReceivedCb(
const Interest& origInterest, 
const lp::Nack& nack,
   333                                     const weak_ptr<SegmentFetcher>& weakSelf)
   335   if (shouldStop(weakSelf))
   340   BOOST_ASSERT(m_nSegmentsInFlight > 0);
   341   m_nSegmentsInFlight--;
   346       afterNackOrTimeout(origInterest);
   355 SegmentFetcher::afterTimeoutCb(
const Interest& origInterest,
   356                                const weak_ptr<SegmentFetcher>& weakSelf)
   358   if (shouldStop(weakSelf))
   363   BOOST_ASSERT(m_nSegmentsInFlight > 0);
   364   m_nSegmentsInFlight--;
   365   afterNackOrTimeout(origInterest);
   369 SegmentFetcher::afterNackOrTimeout(
const Interest& origInterest)
   377   std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt;
   378   BOOST_ASSERT(m_pendingSegments.size() > 0);
   380     BOOST_ASSERT(m_pendingSegments.count(lastNameComponent.
toSegment()) > 0);
   381     pendingSegmentIt = m_pendingSegments.find(lastNameComponent.
toSegment());
   384     BOOST_ASSERT(m_pendingSegments.size() > 0);
   385     pendingSegmentIt = m_pendingSegments.begin();
   389   pendingSegmentIt->second.timeoutEvent.cancel();
   390   pendingSegmentIt->second.state = SegmentState::InRetxQueue;
   392   m_rttEstimator.backoffRto();
   394   if (m_receivedSegments.size() == 0) {
   396     fetchFirstSegment(origInterest, 
true);
   400     m_retxQueue.push(pendingSegmentIt->first);
   401     fetchSegmentsInWindow(origInterest);
   406 SegmentFetcher::finalizeFetch()
   411   BOOST_ASSERT(m_receivedSegments.size() >= 
static_cast<uint64_t
>(m_nSegments));
   413   for (int64_t i = 0; i < m_nSegments; i++) {
   414     buf.write(m_receivedSegments[i].get<const char>(), m_receivedSegments[i].size());
   422 SegmentFetcher::windowIncrease()
   424   if (m_options.useConstantCwnd) {
   425     BOOST_ASSERT(m_cwnd == m_options.initCwnd);
   429   if (m_cwnd < m_ssthresh) {
   430     m_cwnd += m_options.aiStep; 
   433     m_cwnd += m_options.aiStep / std::floor(m_cwnd); 
   438 SegmentFetcher::windowDecrease()
   440   if (m_options.disableCwa || m_highData > m_recPoint) {
   441     m_recPoint = m_highInterest;
   443     if (m_options.useConstantCwnd) {
   444       BOOST_ASSERT(m_cwnd == m_options.initCwnd);
   449     m_ssthresh = std::max(MIN_SSTHRESH, m_cwnd * m_options.mdCoef); 
   450     m_cwnd = m_options.resetCwndToInit ? m_options.initCwnd : m_ssthresh;
   455 SegmentFetcher::signalError(uint32_t code, 
const std::string& msg)
   462 SegmentFetcher::updateRetransmittedSegment(uint64_t segmentNum,
   466   auto pendingSegmentIt = m_pendingSegments.find(segmentNum);
   467   BOOST_ASSERT(pendingSegmentIt != m_pendingSegments.end());
   468   BOOST_ASSERT(pendingSegmentIt->second.state == SegmentState::InRetxQueue);
   469   pendingSegmentIt->second.state = SegmentState::Retransmitted;
   470   pendingSegmentIt->second.hdl = pendingInterest; 
   471   pendingSegmentIt->second.timeoutEvent = timeoutEvent;
   475 SegmentFetcher::cancelExcessInFlightSegments()
   477   for (
auto it = m_pendingSegments.begin(); it != m_pendingSegments.end();) {
   478     if (it->first >= static_cast<uint64_t>(m_nSegments)) {
   479       it = m_pendingSegments.erase(it); 
   480       BOOST_ASSERT(m_nSegmentsInFlight > 0);
   481       m_nSegmentsInFlight--;
   490 SegmentFetcher::checkAllSegmentsReceived()
   492   bool haveReceivedAllSegments = 
false;
   494   if (m_nSegments != 0 && m_nReceived >= m_nSegments) {
   495     haveReceivedAllSegments = 
true;
   497     for (uint64_t i = 0; i < static_cast<uint64_t>(m_nSegments); i++) {
   498       if (m_receivedSegments.count(i) == 0) {
   500         haveReceivedAllSegments = 
false;
   505   return haveReceivedAllSegments;
   509 SegmentFetcher::getEstimatedRto()
   513   return std::min(m_options.maxTimeout,
   514                   time::duration_cast<time::milliseconds>(m_rttEstimator.getEstimatedRto()));
 PartialName getPrefix(ssize_t nComponents) const
Returns a prefix of the name. 
const Name & getName() const
Get name. 
Interest & setMustBeFresh(bool mustBeFresh)
Add or remove MustBeFresh element. 
const Block & getContent() const
Get Content. 
const Component & get(ssize_t i) const
Returns an immutable reference to the component at the specified index. 
static time_point now() noexcept
An unrecoverable Nack was received during retrieval. 
void refreshNonce()
Change nonce value. 
Signal< SegmentFetcher, ConstBufferPtr > onComplete
Emits upon successful retrieval of the complete data. 
time::milliseconds maxTimeout
maximum allowed time between successful receipt of segments 
Utility class to fetch the latest version of a segmented object. 
size_t value_size() const noexcept
Return the size of TLV-VALUE, aka TLV-LENGTH. 
Represents an Interest packet. 
void stop()
Stops fetching. 
A handle for a scheduled event. 
represents a Network Nack 
NackReason getReason() const
Buffer::const_iterator value_begin() const
Get begin iterator of TLV-VALUE. 
double aiStep
additive increase step (in segments) 
uint64_t getCongestionMark() const
get the value of the CongestionMark tag 
One of the retrieved segments failed user-provided validation. 
A handle of pending Interest. 
bool isSegment() const
Check if the component is a segment number per NDN naming conventions. 
static shared_ptr< SegmentFetcher > start(Face &face, const Interest &baseInterest, security::v2::Validator &validator, const Options &options=Options())
Initiates segment fetching. 
Signal< SegmentFetcher > afterSegmentNacked
Emits whenever an Interest for a data segment is nacked. 
Provide a communication channel with local or remote NDN forwarder. 
Signal< SegmentFetcher, Data > afterSegmentValidated
Emits whenever a received data segment has been successfully validated. 
Retrieval timed out because the maximum timeout between the successful receipt of segments was exceeded.
Buffer::const_iterator value_end() const
Get end iterator of TLV-VALUE. 
One of the retrieved Data packets lacked a segment number in the last Name component (excluding implicit digest).
double initSsthresh
initial slow start threshold 
double initCwnd
initial congestion window size 
Represents a name component. 
shared_ptr< Buffer > buf()
Flush written data to the stream and return shared pointer to the underlying buffer. 
uint64_t toSegment() const
Interpret as segment number component using NDN naming conventions. 
Signal< SegmentFetcher, Data > afterSegmentReceived
Emits whenever a data segment is received. 
Signal< SegmentFetcher > afterSegmentTimedOut
Emits whenever an Interest for a data segment times out. 
RttEstimator::Options rttOptions
options for RTT estimator 
Validation error code and optional detailed error message. 
const Name & getName() const noexcept
Interest & setInterestLifetime(time::milliseconds lifetime)
Set the Interest's lifetime. 
implements an output stream that constructs ndn::Buffer 
Signal< SegmentFetcher, uint32_t, std::string > onError
Emits when the retrieval could not be completed due to an error. 
Represents a Data packet. 
A received FinalBlockId did not contain a segment component. 
const optional< name::Component > & getFinalBlock() const
Interface for validating data and interest packets. 
Interest & setCanBePrefix(bool canBePrefix)
Add or remove CanBePrefix element. 
Interest & setName(const Name &name)
Set the Interest's name.