30 #include <boost/asio/io_service.hpp> 
   31 #include <boost/lexical_cast.hpp> 
   32 #include <boost/range/adaptor/map.hpp> 
   39 constexpr 
double SegmentFetcher::MIN_SSTHRESH;
 
   45     NDN_THROW(std::invalid_argument(
"maxTimeout must be greater than or equal to 1 millisecond"));
 
   49     NDN_THROW(std::invalid_argument(
"initCwnd must be greater than or equal to 1"));
 
   53     NDN_THROW(std::invalid_argument(
"aiStep must be greater than or equal to 0"));
 
   56   if (mdCoef < 0.0 || mdCoef > 1.0) {
 
   57     NDN_THROW(std::invalid_argument(
"mdCoef must be in range [0, 1]"));
 
   61 SegmentFetcher::SegmentFetcher(
Face& face,
 
   66   , m_scheduler(m_face.getIoService())
 
   67   , m_validator(validator)
 
   69   , m_timeLastSegmentReceived(time::steady_clock::now())
 
   70   , m_cwnd(options.initCwnd)
 
   71   , m_ssthresh(options.initSsthresh)
 
   76 shared_ptr<SegmentFetcher>
 
   82   shared_ptr<SegmentFetcher> fetcher(
new SegmentFetcher(face, validator, options));
 
   83   fetcher->m_this = fetcher;
 
   84   fetcher->fetchFirstSegment(baseInterest, 
false);
 
   95   m_pendingSegments.clear(); 
 
   96   m_face.
getIoService().post([
self = std::move(m_this)] {});
 
  100 SegmentFetcher::shouldStop(
const weak_ptr<SegmentFetcher>& weakSelf)
 
  102   auto self = weakSelf.lock();
 
  103   return self == 
nullptr || 
self->m_this == 
nullptr;
 
  107 SegmentFetcher::fetchFirstSegment(
const Interest& baseInterest, 
bool isRetransmission)
 
  110   interest.setCanBePrefix(
true);
 
  111   interest.setMustBeFresh(
true);
 
  113   if (isRetransmission) {
 
  114     interest.refreshNonce();
 
  117   sendInterest(0, interest, isRetransmission);
 
  121 SegmentFetcher::fetchSegmentsInWindow(
const Interest& origInterest)
 
  123   if (checkAllSegmentsReceived()) {
 
  125     return finalizeFetch();
 
  128   int64_t availableWindowSize;
 
  130     availableWindowSize = std::min<int64_t>(m_cwnd, m_options.
flowControlWindow - m_segmentBuffer.size());
 
  133     availableWindowSize = 
static_cast<int64_t
>(m_cwnd);
 
  135   availableWindowSize -= m_nSegmentsInFlight;
 
  137   std::vector<std::pair<uint64_t, bool>> segmentsToRequest; 
 
  139   while (availableWindowSize > 0) {
 
  140     if (!m_retxQueue.empty()) {
 
  141       auto pendingSegmentIt = m_pendingSegments.find(m_retxQueue.front());
 
  143       if (pendingSegmentIt == m_pendingSegments.end()) {
 
  147       BOOST_ASSERT(pendingSegmentIt->second.state == SegmentState::InRetxQueue);
 
  148       segmentsToRequest.emplace_back(pendingSegmentIt->first, 
true);
 
  150     else if (m_nSegments == 0 || m_nextSegmentNum < 
static_cast<uint64_t
>(m_nSegments)) {
 
  151       if (m_segmentBuffer.count(m_nextSegmentNum) > 0) {
 
  156       segmentsToRequest.emplace_back(m_nextSegmentNum++, 
false);
 
  161     availableWindowSize--;
 
  164   for (
const auto& segment : segmentsToRequest) {
 
  166     interest.setName(
Name(m_versionedDataName).appendSegment(segment.first));
 
  167     interest.setCanBePrefix(
false);
 
  168     interest.setMustBeFresh(
false);
 
  170     interest.refreshNonce();
 
  171     sendInterest(segment.first, interest, segment.second);
 
  176 SegmentFetcher::sendInterest(uint64_t segNum, 
const Interest& interest, 
bool isRetransmission)
 
  178   weak_ptr<SegmentFetcher> weakSelf = m_this;
 
  180   ++m_nSegmentsInFlight;
 
  182     [
this, weakSelf] (
const Interest& interest, 
const Data& data) {
 
  183       afterSegmentReceivedCb(interest, data, weakSelf);
 
  186       afterNackReceivedCb(interest, nack, weakSelf);
 
  191   auto timeoutEvent = m_scheduler.
schedule(timeout, [
this, interest, weakSelf] {
 
  192     afterTimeoutCb(interest, weakSelf);
 
  195   if (isRetransmission) {
 
  196     updateRetransmittedSegment(segNum, pendingInterest, timeoutEvent);
 
  201                                 pendingInterest, timeoutEvent};
 
  202   bool isNew = m_pendingSegments.emplace(segNum, std::move(pendingSegment)).second;
 
  204   m_highInterest = segNum;
 
  208 SegmentFetcher::afterSegmentReceivedCb(
const Interest& origInterest, 
const Data& data,
 
  209                                        const weak_ptr<SegmentFetcher>& weakSelf)
 
  211   if (shouldStop(weakSelf))
 
  214   BOOST_ASSERT(m_nSegmentsInFlight > 0);
 
  215   m_nSegmentsInFlight--;
 
  217   name::Component currentSegmentComponent = data.getName().get(-1);
 
  218   if (!currentSegmentComponent.isSegment()) {
 
  222   uint64_t currentSegment = currentSegmentComponent.toSegment();
 
  225   std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt;
 
  226   if (m_receivedSegments.size() > 0) {
 
  227     pendingSegmentIt = m_pendingSegments.find(currentSegment);
 
  230     pendingSegmentIt = m_pendingSegments.begin();
 
  233   if (pendingSegmentIt == m_pendingSegments.end()) {
 
  237   pendingSegmentIt->second.timeoutEvent.cancel();
 
  242     [=] (
const Data& d) { afterValidationSuccess(d, origInterest, pendingSegmentIt, weakSelf); },
 
  243     [=] (
const Data& d, 
const auto& error) { afterValidationFailure(d, error, weakSelf); });
 
  247 SegmentFetcher::afterValidationSuccess(
const Data& data, 
const Interest& origInterest,
 
  248                                        std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt,
 
  249                                        const weak_ptr<SegmentFetcher>& weakSelf)
 
  251   if (shouldStop(weakSelf))
 
  261   uint64_t currentSegment = data.getName().get(-1).toSegment();
 
  262   m_receivedSegments.insert(currentSegment);
 
  265   if (pendingSegmentIt->second.state == SegmentState::FirstInterest) {
 
  266     BOOST_ASSERT(m_nSegmentsInFlight >= 0);
 
  267     m_rttEstimator.
addMeasurement(m_timeLastSegmentReceived - pendingSegmentIt->second.sendTime,
 
  268                                   static_cast<size_t>(m_nSegmentsInFlight) + 1);
 
  272   m_pendingSegments.erase(pendingSegmentIt);
 
  275   auto receivedSegmentIt = m_segmentBuffer.emplace(std::piecewise_construct,
 
  276                                                    std::forward_as_tuple(currentSegment),
 
  277                                                    std::forward_as_tuple(data.getContent().value_size()));
 
  278   std::copy(data.getContent().value_begin(), data.getContent().value_end(),
 
  279             receivedSegmentIt.first->second.begin());
 
  280   m_nBytesReceived += data.getContent().value_size();
 
  283   if (data.getFinalBlock()) {
 
  284     if (!data.getFinalBlock()->isSegment()) {
 
  286                          "Received FinalBlockId did not contain a segment component");
 
  289     if (data.getFinalBlock()->toSegment() + 1 != 
static_cast<uint64_t
>(m_nSegments)) {
 
  290       m_nSegments = data.getFinalBlock()->toSegment() + 1;
 
  291       cancelExcessInFlightSegments();
 
  295   if (m_options.
inOrder && m_nextSegmentInOrder == currentSegment) {
 
  297       onInOrderData(std::make_shared<const Buffer>(m_segmentBuffer[m_nextSegmentInOrder]));
 
  298       m_segmentBuffer.erase(m_nextSegmentInOrder++);
 
  299     } 
while (m_segmentBuffer.count(m_nextSegmentInOrder) > 0);
 
  302   if (m_receivedSegments.size() == 1) {
 
  303     m_versionedDataName = data.getName().
getPrefix(-1);
 
  304     if (currentSegment == 0) {
 
  310   if (m_highData < currentSegment) {
 
  311     m_highData = currentSegment;
 
  321   fetchSegmentsInWindow(origInterest);
 
  325 SegmentFetcher::afterValidationFailure(
const Data&,
 
  326                                        const security::ValidationError& error,
 
  327                                        const weak_ptr<SegmentFetcher>& weakSelf)
 
  329   if (shouldStop(weakSelf))
 
  336 SegmentFetcher::afterNackReceivedCb(
const Interest& origInterest, 
const lp::Nack& nack,
 
  337                                     const weak_ptr<SegmentFetcher>& weakSelf)
 
  339   if (shouldStop(weakSelf))
 
  344   BOOST_ASSERT(m_nSegmentsInFlight > 0);
 
  345   m_nSegmentsInFlight--;
 
  347   switch (nack.getReason()) {
 
  350       afterNackOrTimeout(origInterest);
 
  359 SegmentFetcher::afterTimeoutCb(
const Interest& origInterest,
 
  360                                const weak_ptr<SegmentFetcher>& weakSelf)
 
  362   if (shouldStop(weakSelf))
 
  367   BOOST_ASSERT(m_nSegmentsInFlight > 0);
 
  368   m_nSegmentsInFlight--;
 
  369   afterNackOrTimeout(origInterest);
 
  373 SegmentFetcher::afterNackOrTimeout(
const Interest& origInterest)
 
  380   name::Component lastNameComponent = origInterest.getName().get(-1);
 
  381   std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt;
 
  382   BOOST_ASSERT(m_pendingSegments.size() > 0);
 
  383   if (lastNameComponent.isSegment()) {
 
  384     BOOST_ASSERT(m_pendingSegments.count(lastNameComponent.toSegment()) > 0);
 
  385     pendingSegmentIt = m_pendingSegments.find(lastNameComponent.toSegment());
 
  388     BOOST_ASSERT(m_pendingSegments.size() > 0);
 
  389     pendingSegmentIt = m_pendingSegments.begin();
 
  393   pendingSegmentIt->second.timeoutEvent.cancel();
 
  394   pendingSegmentIt->second.state = SegmentState::InRetxQueue;
 
  398   if (m_receivedSegments.size() == 0) {
 
  400     fetchFirstSegment(origInterest, 
true);
 
  404     m_retxQueue.push(pendingSegmentIt->first);
 
  405     fetchSegmentsInWindow(origInterest);
 
  410 SegmentFetcher::finalizeFetch()
 
  419     BOOST_ASSERT(m_receivedSegments.size() >= 
static_cast<uint64_t
>(m_nSegments));
 
  421     for (int64_t i = 0; i < m_nSegments; i++) {
 
  422       buf.write(m_segmentBuffer[i].get<const char>(), m_segmentBuffer[i].size());
 
  430 SegmentFetcher::windowIncrease()
 
  433     BOOST_ASSERT(m_cwnd == m_options.
initCwnd);
 
  437   if (m_cwnd < m_ssthresh) {
 
  438     m_cwnd += m_options.
aiStep; 
 
  441     m_cwnd += m_options.
aiStep / std::floor(m_cwnd); 
 
  446 SegmentFetcher::windowDecrease()
 
  448   if (m_options.
disableCwa || m_highData > m_recPoint) {
 
  449     m_recPoint = m_highInterest;
 
  452       BOOST_ASSERT(m_cwnd == m_options.
initCwnd);
 
  457     m_ssthresh = std::max(MIN_SSTHRESH, m_cwnd * m_options.
mdCoef); 
 
  463 SegmentFetcher::signalError(uint32_t code, 
const std::string& msg)
 
  470 SegmentFetcher::updateRetransmittedSegment(uint64_t segmentNum,
 
  471                                            const PendingInterestHandle& pendingInterest,
 
  472                                            scheduler::EventId timeoutEvent)
 
  474   auto pendingSegmentIt = m_pendingSegments.find(segmentNum);
 
  475   BOOST_ASSERT(pendingSegmentIt != m_pendingSegments.end());
 
  476   BOOST_ASSERT(pendingSegmentIt->second.state == SegmentState::InRetxQueue);
 
  477   pendingSegmentIt->second.state = SegmentState::Retransmitted;
 
  478   pendingSegmentIt->second.hdl = pendingInterest; 
 
  479   pendingSegmentIt->second.timeoutEvent = timeoutEvent;
 
  483 SegmentFetcher::cancelExcessInFlightSegments()
 
  485   for (
auto it = m_pendingSegments.begin(); it != m_pendingSegments.end();) {
 
  486     if (it->first >= 
static_cast<uint64_t
>(m_nSegments)) {
 
  487       it = m_pendingSegments.erase(it); 
 
  488       BOOST_ASSERT(m_nSegmentsInFlight > 0);
 
  489       m_nSegmentsInFlight--;
 
  498 SegmentFetcher::checkAllSegmentsReceived()
 
  500   bool haveReceivedAllSegments = 
false;
 
  502   if (m_nSegments != 0 && m_nReceived >= m_nSegments) {
 
  503     haveReceivedAllSegments = 
true;
 
  505     for (uint64_t i = 0; i < static_cast<uint64_t>(m_nSegments); i++) {
 
  506       if (m_receivedSegments.count(i) == 0) {
 
  508         haveReceivedAllSegments = 
false;
 
  513   return haveReceivedAllSegments;
 
  517 SegmentFetcher::getEstimatedRto()
 
  522                   time::duration_cast<time::milliseconds>(m_rttEstimator.
getEstimatedRto()));
 
Provide a communication channel with local or remote NDN forwarder.
 
boost::asio::io_service & getIoService()
Returns a reference to the io_service used by this face.
 
PendingInterestHandle expressInterest(const Interest &interest, const DataCallback &afterSatisfied, const NackCallback &afterNacked, const TimeoutCallback &afterTimeout)
Express Interest.
 
Represents an Interest packet.
 
PartialName getPrefix(ssize_t nComponents) const
Returns a prefix of the name.
 
EventId schedule(time::nanoseconds after, EventCallback callback)
Schedule a one-time event after the specified delay.
 
Interface for validating data and interest packets.
 
void validate(const Data &data, const DataValidationSuccessCallback &successCb, const DataValidationFailureCallback &failureCb)
Asynchronously validate data.
 
static time_point now() noexcept
 
void backoffRto()
Backoff RTO by a factor of Options::rtoBackoffMultiplier.
 
void addMeasurement(time::nanoseconds rtt, size_t nExpectedSamples=1)
Records a new RTT measurement.
 
time::nanoseconds getEstimatedRto() const
Returns the estimated RTO value.
 
bool useConstantInterestTimeout
if true, Interest timeout is kept at maxTimeout
 
double mdCoef
multiplicative decrease coefficient
 
double aiStep
additive increase step (in segments)
 
double initCwnd
initial congestion window size
 
bool disableCwa
disable Conservative Window Adaptation
 
bool resetCwndToInit
reduce cwnd to initCwnd when loss event occurs
 
time::milliseconds interestLifetime
lifetime of sent Interests - independent of Interest timeout
 
bool useConstantCwnd
if true, window size is kept at initCwnd
 
bool inOrder
true for 'in order' mode, false for 'block' mode
 
size_t flowControlWindow
maximum number of segments stored in the reorder buffer
 
bool ignoreCongMarks
disable window decrease after congestion mark received
 
time::milliseconds maxTimeout
maximum allowed time between successful receipt of segments
 
Utility class to fetch the latest version of a segmented object.
 
Signal< SegmentFetcher > onInOrderComplete
Emitted on successful retrieval of all segments in 'in order' mode.
 
static shared_ptr< SegmentFetcher > start(Face &face, const Interest &baseInterest, security::Validator &validator, const Options &options=Options())
Initiates segment fetching.
 
Signal< SegmentFetcher, ConstBufferPtr > onComplete
Emitted upon successful retrieval of the complete object (all segments).
 
Signal< SegmentFetcher, ConstBufferPtr > onInOrderData
Emitted after each data segment in segment order has been validated.
 
Signal< SegmentFetcher, Data > afterSegmentValidated
Emitted whenever a received data segment has been successfully validated.
 
@ INTEREST_TIMEOUT
Retrieval timed out because the maximum timeout between the successful receipt of segments was exceed...
 
@ DATA_HAS_NO_SEGMENT
One of the retrieved Data packets lacked a segment number in the last Name component (excl....
 
@ FINALBLOCKID_NOT_SEGMENT
A received FinalBlockId did not contain a segment component.
 
@ SEGMENT_VALIDATION_FAIL
One of the retrieved segments failed user-provided validation.
 
@ NACK_ERROR
An unrecoverable Nack was received during retrieval.
 
Signal< SegmentFetcher > afterSegmentNacked
Emitted whenever an Interest for a data segment is nacked.
 
void stop()
Stops fetching.
 
Signal< SegmentFetcher, uint32_t, std::string > onError
Emitted when the retrieval could not be completed due to an error.
 
Signal< SegmentFetcher, Data > afterSegmentReceived
Emitted whenever a data segment received.
 
Signal< SegmentFetcher > afterSegmentTimedOut
Emitted whenever an Interest for a data segment times out.
 
boost::chrono::milliseconds milliseconds