notification-subscriber.cpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * Copyright (c) 2014-2024 Regents of the University of California,
4  * Arizona Board of Regents,
5  * Colorado State University,
6  * University Pierre & Marie Curie, Sorbonne University,
7  * Washington University in St. Louis,
8  * Beijing Institute of Technology,
9  * The University of Memphis.
10  *
11  * This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
12  *
13  * ndn-cxx library is free software: you can redistribute it and/or modify it under the
14  * terms of the GNU Lesser General Public License as published by the Free Software
15  * Foundation, either version 3 of the License, or (at your option) any later version.
16  *
17  * ndn-cxx library is distributed in the hope that it will be useful, but WITHOUT ANY
18  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
19  * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
20  *
21  * You should have received copies of the GNU General Public License and GNU Lesser
22  * General Public License along with ndn-cxx, e.g., in COPYING.md file. If not, see
23  * <http://www.gnu.org/licenses/>.
24  *
25  * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
26  */
27 
29 #include "ndn-cxx/util/random.hpp"
30 
31 #include <cmath>
32 
33 namespace ndn::util {
34 
36  time::milliseconds interestLifetime)
37  : m_face(face)
38  , m_prefix(prefix)
39  , m_scheduler(face.getIoContext())
40  , m_interestLifetime(interestLifetime)
41 {
42 }
43 
45 
46 void
48 {
49  if (m_isRunning)
50  return;
51 
52  m_isRunning = true;
53  sendInitialInterest();
54 }
55 
56 void
58 {
59  if (!m_isRunning)
60  return;
61 
62  m_isRunning = false;
63  m_lastInterest.cancel();
64 }
65 
66 void
67 NotificationSubscriberBase::sendInitialInterest()
68 {
69  if (shouldStop())
70  return;
71 
72  auto interest = make_shared<Interest>(m_prefix);
73  interest->setCanBePrefix(true);
74  interest->setMustBeFresh(true);
75  interest->setInterestLifetime(m_interestLifetime);
76  sendInterest(*interest);
77 }
78 
79 void
80 NotificationSubscriberBase::sendNextInterest()
81 {
82  if (shouldStop())
83  return;
84 
85  Name nextName = m_prefix;
86  nextName.appendSequenceNumber(m_lastSequenceNum + 1);
87 
88  auto interest = make_shared<Interest>(nextName);
89  interest->setInterestLifetime(m_interestLifetime);
90  sendInterest(*interest);
91 }
92 
93 void
94 NotificationSubscriberBase::sendInterest(const Interest& interest)
95 {
96  m_lastInterest = m_face.expressInterest(interest,
97  [this] (const auto&, const auto& d) { afterReceiveData(d); },
98  [this] (const auto&, const auto& n) { afterReceiveNack(n); },
99  [this] (const auto&) { afterTimeout(); });
100 }
101 
102 bool
103 NotificationSubscriberBase::shouldStop()
104 {
105  if (!m_isRunning)
106  return true;
107 
108  if (!hasSubscriber() && onNack.isEmpty()) {
109  stop();
110  return true;
111  }
112  return false;
113 }
114 
115 void
116 NotificationSubscriberBase::afterReceiveData(const Data& data)
117 {
118  if (shouldStop())
119  return;
120 
121  try {
122  m_lastSequenceNum = data.getName().get(-1).toSequenceNumber();
123  }
124  catch (const tlv::Error&) {
125  onDecodeError(data);
126  sendInitialInterest();
127  return;
128  }
129 
130  if (!decodeAndDeliver(data)) {
131  onDecodeError(data);
132  sendInitialInterest();
133  return;
134  }
135 
136  sendNextInterest();
137 }
138 
139 void
140 NotificationSubscriberBase::afterReceiveNack(const lp::Nack& nack)
141 {
142  if (shouldStop())
143  return;
144 
145  onNack(nack);
146 
147  auto delay = exponentialBackoff(nack);
148  m_nackEvent = m_scheduler.schedule(delay, [this] { sendInitialInterest(); });
149 }
150 
151 void
152 NotificationSubscriberBase::afterTimeout()
153 {
154  if (shouldStop())
155  return;
156 
157  onTimeout();
158  sendInitialInterest();
159 }
160 
162 NotificationSubscriberBase::exponentialBackoff(const lp::Nack& nack)
163 {
164  uint64_t nackSequenceNum = 0;
165  try {
166  nackSequenceNum = nack.getInterest().getName().get(-1).toSequenceNumber();
167  }
168  catch (const tlv::Error&) {
169  // pass
170  }
171 
172  if (m_lastNackSequenceNum == nackSequenceNum) {
173  ++m_attempts;
174  }
175  else {
176  m_attempts = 1;
177  }
178 
179  m_lastNackSequenceNum = nackSequenceNum;
180 
181  return time::milliseconds(static_cast<time::milliseconds::rep>(std::pow(2, m_attempts) * 100 +
182  random::generateWord32() % 100));
183 }
184 
185 } // namespace ndn::util
Provide a communication channel with local or remote NDN forwarder.
Definition: face.hpp:91
PendingInterestHandle expressInterest(const Interest &interest, const DataCallback &afterSatisfied, const NackCallback &afterNacked, const TimeoutCallback &afterTimeout)
Express an Interest.
Definition: face.cpp:151
Represents an absolute name.
Definition: name.hpp:45
Name & appendSequenceNumber(uint64_t seqNo)
Append a sequence number component.
Definition: name.hpp:476
void cancel()
Cancel the operation.
EventId schedule(time::nanoseconds after, EventCallback callback)
Schedule a one-time event after the specified delay.
Definition: scheduler.cpp:78
void start()
Start or resume receiving notifications.
void stop()
Stop receiving notifications.
ndn::signal::Signal< NotificationSubscriberBase > onTimeout
Fires when no Notification is received within getInterestLifetime() period.
NotificationSubscriberBase(Face &face, const Name &prefix, time::milliseconds interestLifetime)
Construct a NotificationSubscriber.
ndn::signal::Signal< NotificationSubscriberBase, lp::Nack > onNack
Fires when a Nack is received.
ndn::signal::Signal< NotificationSubscriberBase, Data > onDecodeError
Fires when a Data packet in the notification stream cannot be decoded as a Notification.
uint32_t generateWord32()
Generate a non-cryptographically-secure random integer in the range [0, 2^32).
Definition: random.cpp:68
::boost::chrono::milliseconds milliseconds
Definition: time.hpp:52
@ Data
Definition: tlv.hpp:69
@ Interest
Definition: tlv.hpp:68