ndn-cxx: NDN C++ Library 0.9.0-33-g832ea91d
Loading...
Searching...
No Matches
notification-subscriber.cpp
Go to the documentation of this file.
1/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2/*
3 * Copyright (c) 2014-2024 Regents of the University of California,
4 * Arizona Board of Regents,
5 * Colorado State University,
6 * University Pierre & Marie Curie, Sorbonne University,
7 * Washington University in St. Louis,
8 * Beijing Institute of Technology,
9 * The University of Memphis.
10 *
11 * This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
12 *
13 * ndn-cxx library is free software: you can redistribute it and/or modify it under the
14 * terms of the GNU Lesser General Public License as published by the Free Software
15 * Foundation, either version 3 of the License, or (at your option) any later version.
16 *
17 * ndn-cxx library is distributed in the hope that it will be useful, but WITHOUT ANY
18 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
19 * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
20 *
21 * You should have received copies of the GNU General Public License and GNU Lesser
22 * General Public License along with ndn-cxx, e.g., in COPYING.md file. If not, see
23 * <http://www.gnu.org/licenses/>.
24 *
25 * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
26 */
27
30
31#include <cmath>
32
33namespace ndn::util {
34
36 time::milliseconds interestLifetime)
37 : m_face(face)
38 , m_prefix(prefix)
39 , m_scheduler(face.getIoContext())
40 , m_interestLifetime(interestLifetime)
41{
42}
43
45
46void
48{
49 if (m_isRunning)
50 return;
51
52 m_isRunning = true;
53 sendInitialInterest();
54}
55
56void
58{
59 if (!m_isRunning)
60 return;
61
62 m_isRunning = false;
63 m_lastInterest.cancel();
64}
65
66void
67NotificationSubscriberBase::sendInitialInterest()
68{
69 if (shouldStop())
70 return;
71
72 auto interest = make_shared<Interest>(m_prefix);
73 interest->setCanBePrefix(true);
74 interest->setMustBeFresh(true);
75 interest->setInterestLifetime(m_interestLifetime);
76 sendInterest(*interest);
77}
78
79void
80NotificationSubscriberBase::sendNextInterest()
81{
82 if (shouldStop())
83 return;
84
85 Name nextName = m_prefix;
86 nextName.appendSequenceNumber(m_lastSequenceNum + 1);
87
88 auto interest = make_shared<Interest>(nextName);
89 interest->setInterestLifetime(m_interestLifetime);
90 sendInterest(*interest);
91}
92
93void
94NotificationSubscriberBase::sendInterest(const Interest& interest)
95{
96 m_lastInterest = m_face.expressInterest(interest,
97 [this] (const auto&, const auto& d) { afterReceiveData(d); },
98 [this] (const auto&, const auto& n) { afterReceiveNack(n); },
99 [this] (const auto&) { afterTimeout(); });
100}
101
102bool
103NotificationSubscriberBase::shouldStop()
104{
105 if (!m_isRunning)
106 return true;
107
108 if (!hasSubscriber() && onNack.isEmpty()) {
109 stop();
110 return true;
111 }
112 return false;
113}
114
115void
116NotificationSubscriberBase::afterReceiveData(const Data& data)
117{
118 if (shouldStop())
119 return;
120
121 try {
122 m_lastSequenceNum = data.getName().get(-1).toSequenceNumber();
123 }
124 catch (const tlv::Error&) {
125 onDecodeError(data);
126 sendInitialInterest();
127 return;
128 }
129
130 if (!decodeAndDeliver(data)) {
131 onDecodeError(data);
132 sendInitialInterest();
133 return;
134 }
135
136 sendNextInterest();
137}
138
139void
140NotificationSubscriberBase::afterReceiveNack(const lp::Nack& nack)
141{
142 if (shouldStop())
143 return;
144
145 onNack(nack);
146
147 auto delay = exponentialBackoff(nack);
148 m_nackEvent = m_scheduler.schedule(delay, [this] { sendInitialInterest(); });
149}
150
151void
152NotificationSubscriberBase::afterTimeout()
153{
154 if (shouldStop())
155 return;
156
157 onTimeout();
158 sendInitialInterest();
159}
160
162NotificationSubscriberBase::exponentialBackoff(const lp::Nack& nack)
163{
164 uint64_t nackSequenceNum = 0;
165 try {
166 nackSequenceNum = nack.getInterest().getName().get(-1).toSequenceNumber();
167 }
168 catch (const tlv::Error&) {
169 // pass
170 }
171
172 if (m_lastNackSequenceNum == nackSequenceNum) {
173 ++m_attempts;
174 }
175 else {
176 m_attempts = 1;
177 }
178
179 m_lastNackSequenceNum = nackSequenceNum;
180
181 return time::milliseconds(static_cast<time::milliseconds::rep>(std::pow(2, m_attempts) * 100 +
182 random::generateWord32() % 100));
183}
184
185} // namespace ndn::util
Provide a communication channel with local or remote NDN forwarder.
Definition face.hpp:91
PendingInterestHandle expressInterest(const Interest &interest, const DataCallback &afterSatisfied, const NackCallback &afterNacked, const TimeoutCallback &afterTimeout)
Express an Interest.
Definition face.cpp:151
Represents an absolute name.
Definition name.hpp:45
Name & appendSequenceNumber(uint64_t seqNo)
Append a sequence number component.
Definition name.hpp:476
void cancel()
Cancel the operation.
EventId schedule(time::nanoseconds after, EventCallback callback)
Schedule a one-time event after the specified delay.
Definition scheduler.cpp:78
void start()
Start or resume receiving notifications.
void stop()
Stop receiving notifications.
ndn::signal::Signal< NotificationSubscriberBase > onTimeout
Fires when no Notification is received within getInterestLifetime() period.
NotificationSubscriberBase(Face &face, const Name &prefix, time::milliseconds interestLifetime)
Construct a NotificationSubscriber.
ndn::signal::Signal< NotificationSubscriberBase, lp::Nack > onNack
Fires when a Nack is received.
ndn::signal::Signal< NotificationSubscriberBase, Data > onDecodeError
Fires when a Data packet in the notification stream cannot be decoded as a Notification.
uint32_t generateWord32()
Generate a non-cryptographically-secure random integer in the range [0, 2^32).
Definition random.cpp:68
::boost::chrono::milliseconds milliseconds
Definition time.hpp:52