notification-subscriber.cpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * Copyright (c) 2014-2019 Regents of the University of California,
4  * Arizona Board of Regents,
5  * Colorado State University,
6  * University Pierre & Marie Curie, Sorbonne University,
7  * Washington University in St. Louis,
8  * Beijing Institute of Technology,
9  * The University of Memphis.
10  *
11  * This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
12  *
13  * ndn-cxx library is free software: you can redistribute it and/or modify it under the
14  * terms of the GNU Lesser General Public License as published by the Free Software
15  * Foundation, either version 3 of the License, or (at your option) any later version.
16  *
17  * ndn-cxx library is distributed in the hope that it will be useful, but WITHOUT ANY
18  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
19  * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
20  *
21  * You should have received copies of the GNU General Public License and GNU Lesser
22  * General Public License along with ndn-cxx, e.g., in COPYING.md file. If not, see
23  * <http://www.gnu.org/licenses/>.
24  *
25  * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
26  */
27 
29 #include "ndn-cxx/util/random.hpp"
30 
31 #include <cmath>
32 
33 namespace ndn {
34 namespace util {
35 
37  time::milliseconds interestLifetime)
38  : m_face(face)
39  , m_prefix(prefix)
40  , m_isRunning(false)
41  , m_lastSequenceNum(std::numeric_limits<uint64_t>::max())
42  , m_lastNackSequenceNum(std::numeric_limits<uint64_t>::max())
43  , m_attempts(1)
44  , m_scheduler(face.getIoService())
45  , m_interestLifetime(interestLifetime)
46 {
47 }
48 
50 
51 void
53 {
54  if (m_isRunning) // already running
55  return;
56  m_isRunning = true;
57 
58  sendInitialInterest();
59 }
60 
61 void
63 {
64  if (!m_isRunning) // not running
65  return;
66  m_isRunning = false;
67 
68  m_lastInterest.cancel();
69 }
70 
71 void
72 NotificationSubscriberBase::sendInitialInterest()
73 {
74  if (shouldStop())
75  return;
76 
77  auto interest = make_shared<Interest>(m_prefix);
78  interest->setCanBePrefix(true);
79  interest->setMustBeFresh(true);
80  interest->setInterestLifetime(m_interestLifetime);
81  sendInterest(*interest);
82 }
83 
84 void
85 NotificationSubscriberBase::sendNextInterest()
86 {
87  if (shouldStop())
88  return;
89 
90  Name nextName = m_prefix;
91  nextName.appendSequenceNumber(m_lastSequenceNum + 1);
92 
93  auto interest = make_shared<Interest>(nextName);
94  interest->setCanBePrefix(false);
95  interest->setInterestLifetime(m_interestLifetime);
96  sendInterest(*interest);
97 }
98 
99 void
100 NotificationSubscriberBase::sendInterest(const Interest& interest)
101 {
102  m_lastInterest = m_face.expressInterest(interest,
103  [this] (const auto&, const auto& d) { this->afterReceiveData(d); },
104  [this] (const auto&, const auto& n) { this->afterReceiveNack(n); },
105  [this] (const auto&) { this->afterTimeout(); });
106 }
107 
108 bool
109 NotificationSubscriberBase::shouldStop()
110 {
111  if (!m_isRunning)
112  return true;
113 
114  if (!hasSubscriber() && onNack.isEmpty()) {
115  stop();
116  return true;
117  }
118  return false;
119 }
120 
121 void
122 NotificationSubscriberBase::afterReceiveData(const Data& data)
123 {
124  if (shouldStop())
125  return;
126 
127  try {
128  m_lastSequenceNum = data.getName().get(-1).toSequenceNumber();
129  }
130  catch (const tlv::Error&) {
131  onDecodeError(data);
132  sendInitialInterest();
133  return;
134  }
135 
136  if (!decodeAndDeliver(data)) {
137  onDecodeError(data);
138  sendInitialInterest();
139  return;
140  }
141 
142  sendNextInterest();
143 }
144 
145 void
146 NotificationSubscriberBase::afterReceiveNack(const lp::Nack& nack)
147 {
148  if (shouldStop())
149  return;
150 
151  onNack(nack);
152 
153  time::milliseconds delay = exponentialBackoff(nack);
154  m_nackEvent = m_scheduler.schedule(delay, [this] { sendInitialInterest(); });
155 }
156 
157 void
158 NotificationSubscriberBase::afterTimeout()
159 {
160  if (shouldStop())
161  return;
162 
163  onTimeout();
164 
165  sendInitialInterest();
166 }
167 
169 NotificationSubscriberBase::exponentialBackoff(lp::Nack nack)
170 {
171  uint64_t nackSequenceNum;
172  try {
173  nackSequenceNum = nack.getInterest().getName().get(-1).toSequenceNumber();
174  }
175  catch (const tlv::Error&) {
176  nackSequenceNum = 0;
177  }
178 
179  if (m_lastNackSequenceNum == nackSequenceNum) {
180  ++m_attempts;
181  }
182  else {
183  m_attempts = 1;
184  }
185 
186  m_lastNackSequenceNum = nackSequenceNum;
187 
188  return time::milliseconds(static_cast<time::milliseconds::rep>(std::pow(2, m_attempts) * 100 +
189  random::generateWord32() % 100));
190 }
191 
192 } // namespace util
193 } // namespace ndn
Provide a communication channel with local or remote NDN forwarder.
Definition: face.hpp:91
PendingInterestHandle expressInterest(const Interest &interest, const DataCallback &afterSatisfied, const NackCallback &afterNacked, const TimeoutCallback &afterTimeout)
Express Interest.
Definition: face.cpp:164
Represents an absolute name.
Definition: name.hpp:46
Name & appendSequenceNumber(uint64_t seqNo)
Append a sequence number component.
Definition: name.hpp:458
void cancel()
Cancel the operation.
EventId schedule(time::nanoseconds after, EventCallback callback)
Schedule a one-time event after the specified delay.
Definition: scheduler.cpp:92
void start()
start or resume receiving notifications
signal::Signal< NotificationSubscriberBase, Data > onDecodeError
fires when a Data packet in the Notification Stream cannot be decoded as Notification
void stop()
stop receiving notifications
signal::Signal< NotificationSubscriberBase > onTimeout
fires when no Notification is received within .getInterestLifetime period
NotificationSubscriberBase(Face &face, const Name &prefix, time::milliseconds interestLifetime)
construct a NotificationSubscriber
signal::Signal< NotificationSubscriberBase, lp::Nack > onNack
fires when a NACK is received
uint32_t generateWord32()
Generate a non-cryptographically-secure random integer in the range [0, 2^32)
Definition: random.cpp:66
boost::chrono::milliseconds milliseconds
Definition: time.hpp:48
@ Data
Definition: tlv.hpp:66
@ Interest
Definition: tlv.hpp:65
Definition: data.cpp:25