dispatcher.cpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * Copyright (c) 2013-2021 Regents of the University of California.
4  *
5  * This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
6  *
7  * ndn-cxx library is free software: you can redistribute it and/or modify it under the
8  * terms of the GNU Lesser General Public License as published by the Free Software
9  * Foundation, either version 3 of the License, or (at your option) any later version.
10  *
11  * ndn-cxx library is distributed in the hope that it will be useful, but WITHOUT ANY
12  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
13  * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
14  *
15  * You should have received copies of the GNU General Public License and GNU Lesser
16  * General Public License along with ndn-cxx, e.g., in COPYING.md file. If not, see
17  * <http://www.gnu.org/licenses/>.
18  *
19  * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
20  */
21 
23 #include "ndn-cxx/lp/tags.hpp"
24 #include "ndn-cxx/util/logger.hpp"
25 
27 
28 namespace ndn {
29 namespace mgmt {
30 
33 {
34  return [] (const Name& prefix,
35  const Interest& interest,
36  const ControlParameters* params,
37  const AcceptContinuation& accept,
38  const RejectContinuation& reject) {
39  accept("");
40  };
41 }
42 
43 Dispatcher::Dispatcher(Face& face, KeyChain& keyChain,
44  const security::SigningInfo& signingInfo,
45  size_t imsCapacity)
46  : m_face(face)
47  , m_keyChain(keyChain)
48  , m_signingInfo(signingInfo)
49  , m_storage(m_face.getIoService(), imsCapacity)
50 {
51 }
52 
53 Dispatcher::~Dispatcher() = default;
54 
55 void
56 Dispatcher::addTopPrefix(const Name& prefix, bool wantRegister,
57  const security::SigningInfo& signingInfo)
58 {
59  bool hasOverlap = std::any_of(m_topLevelPrefixes.begin(), m_topLevelPrefixes.end(),
60  [&prefix] (const auto& x) {
61  return x.first.isPrefixOf(prefix) || prefix.isPrefixOf(x.first);
62  });
63  if (hasOverlap) {
64  NDN_THROW(std::out_of_range("top-level prefix overlaps"));
65  }
66 
67  TopPrefixEntry& topPrefixEntry = m_topLevelPrefixes[prefix];
68 
69  if (wantRegister) {
70  topPrefixEntry.registeredPrefix = m_face.registerPrefix(prefix,
71  nullptr,
72  [] (const Name&, const std::string& reason) {
73  NDN_THROW(std::runtime_error("prefix registration failed: " + reason));
74  },
75  signingInfo);
76  }
77 
78  for (const auto& entry : m_handlers) {
79  Name fullPrefix = Name(prefix).append(entry.first);
80  auto filterHdl = m_face.setInterestFilter(fullPrefix,
81  [=, cb = entry.second] (const auto&, const auto& interest) {
82  cb(prefix, interest);
83  });
84  topPrefixEntry.interestFilters.emplace_back(std::move(filterHdl));
85  }
86 }
87 
88 void
90 {
91  m_topLevelPrefixes.erase(prefix);
92 }
93 
94 bool
95 Dispatcher::isOverlappedWithOthers(const PartialName& relPrefix) const
96 {
97  bool hasOverlapWithHandlers =
98  std::any_of(m_handlers.begin(), m_handlers.end(),
99  [&] (const auto& entry) {
100  return entry.first.isPrefixOf(relPrefix) || relPrefix.isPrefixOf(entry.first);
101  });
102  bool hasOverlapWithStreams =
103  std::any_of(m_streams.begin(), m_streams.end(),
104  [&] (const auto& entry) {
105  return entry.first.isPrefixOf(relPrefix) || relPrefix.isPrefixOf(entry.first);
106  });
107 
108  return hasOverlapWithHandlers || hasOverlapWithStreams;
109 }
110 
111 void
112 Dispatcher::afterAuthorizationRejected(RejectReply act, const Interest& interest)
113 {
114  if (act == RejectReply::STATUS403) {
115  sendControlResponse(ControlResponse(403, "authorization rejected"), interest);
116  }
117 }
118 
119 void
120 Dispatcher::queryStorage(const Name& prefix, const Interest& interest,
121  const InterestHandler& missContinuation)
122 {
123  auto data = m_storage.find(interest);
124  if (data == nullptr) {
125  // invoke missContinuation to process this Interest if the query fails.
126  if (missContinuation)
127  missContinuation(prefix, interest);
128  }
129  else {
130  // send the fetched data through face if query succeeds.
131  sendOnFace(*data);
132  }
133 }
134 
135 void
136 Dispatcher::sendData(const Name& dataName, const Block& content, const MetaInfo& metaInfo,
137  SendDestination option)
138 {
139  auto data = make_shared<Data>(dataName);
140  data->setContent(content).setMetaInfo(metaInfo).setFreshnessPeriod(1_s);
141 
142  m_keyChain.sign(*data, m_signingInfo);
143 
144  if (option == SendDestination::IMS || option == SendDestination::FACE_AND_IMS) {
145  lp::CachePolicy policy;
146  policy.setPolicy(lp::CachePolicyType::NO_CACHE);
147  data->setTag(make_shared<lp::CachePolicyTag>(policy));
148  m_storage.insert(*data, 1_s);
149  }
150 
151  if (option == SendDestination::FACE || option == SendDestination::FACE_AND_IMS) {
152  sendOnFace(*data);
153  }
154 }
155 
156 void
157 Dispatcher::sendOnFace(const Data& data)
158 {
159  try {
160  m_face.put(data);
161  }
162  catch (const Face::Error& e) {
163  NDN_LOG_ERROR("sendOnFace(" << data.getName() << "): " << e.what());
164  }
165 }
166 
167 void
168 Dispatcher::processControlCommandInterest(const Name& prefix,
169  const Name& relPrefix,
170  const Interest& interest,
171  const ControlParametersParser& parser,
172  const Authorization& authorization,
173  const AuthorizationAcceptedCallback& accepted,
174  const AuthorizationRejectedCallback& rejected)
175 {
176  // /<prefix>/<relPrefix>/<parameters>
177  size_t parametersLoc = prefix.size() + relPrefix.size();
178  const name::Component& pc = interest.getName().get(parametersLoc);
179 
180  shared_ptr<ControlParameters> parameters;
181  try {
182  parameters = parser(pc);
183  }
184  catch (const tlv::Error&) {
185  return;
186  }
187 
188  AcceptContinuation accept = [=] (const auto& req) { accepted(req, prefix, interest, parameters); };
189  RejectContinuation reject = [=] (RejectReply reply) { rejected(reply, interest); };
190  authorization(prefix, interest, parameters.get(), accept, reject);
191 }
192 
193 void
194 Dispatcher::processAuthorizedControlCommandInterest(const std::string& requester,
195  const Name& prefix,
196  const Interest& interest,
197  const shared_ptr<ControlParameters>& parameters,
198  const ValidateParameters& validateParams,
199  const ControlCommandHandler& handler)
200 {
201  if (validateParams(*parameters)) {
202  handler(prefix, interest, *parameters,
203  [=] (const auto& resp) { this->sendControlResponse(resp, interest); });
204  }
205  else {
206  sendControlResponse(ControlResponse(400, "failed in validating parameters"), interest);
207  }
208 }
209 
210 void
211 Dispatcher::sendControlResponse(const ControlResponse& resp, const Interest& interest, bool isNack)
212 {
213  MetaInfo metaInfo;
214  if (isNack) {
215  metaInfo.setType(tlv::ContentType_Nack);
216  }
217 
218  // control response is always sent out through the face
219  sendData(interest.getName(), resp.wireEncode(), metaInfo, SendDestination::FACE);
220 }
221 
222 void
224  Authorization authorize,
225  StatusDatasetHandler handle)
226 {
227  if (!m_topLevelPrefixes.empty()) {
228  NDN_THROW(std::domain_error("one or more top-level prefix has been added"));
229  }
230 
231  if (isOverlappedWithOthers(relPrefix)) {
232  NDN_THROW(std::out_of_range("status dataset name overlaps"));
233  }
234 
235  AuthorizationAcceptedCallback accepted =
236  std::bind(&Dispatcher::processAuthorizedStatusDatasetInterest, this, _2, _3, std::move(handle));
237  AuthorizationRejectedCallback rejected = [this] (auto&&... args) {
238  afterAuthorizationRejected(std::forward<decltype(args)>(args)...);
239  };
240 
241  // follow the general path if storage is a miss
242  InterestHandler missContinuation = std::bind(&Dispatcher::processStatusDatasetInterest, this, _1, _2,
243  std::move(authorize), std::move(accepted), std::move(rejected));
244 
245  m_handlers[relPrefix] = [this, miss = std::move(missContinuation)] (auto&&... args) {
246  this->queryStorage(std::forward<decltype(args)>(args)..., miss);
247  };
248 }
249 
250 void
251 Dispatcher::processStatusDatasetInterest(const Name& prefix,
252  const Interest& interest,
253  const Authorization& authorization,
254  const AuthorizationAcceptedCallback& accepted,
255  const AuthorizationRejectedCallback& rejected)
256 {
257  const Name& interestName = interest.getName();
258  bool endsWithVersionOrSegment = interestName.size() >= 1 &&
259  (interestName[-1].isVersion() || interestName[-1].isSegment());
260  if (endsWithVersionOrSegment) {
261  return;
262  }
263 
264  AcceptContinuation accept = [=] (const auto& req) { accepted(req, prefix, interest, nullptr); };
265  RejectContinuation reject = [=] (RejectReply reply) { rejected(reply, interest); };
266  authorization(prefix, interest, nullptr, accept, reject);
267 }
268 
269 void
270 Dispatcher::processAuthorizedStatusDatasetInterest(const Name& prefix,
271  const Interest& interest,
272  const StatusDatasetHandler& handler)
273 {
274  StatusDatasetContext context(interest,
275  [this] (auto&&... args) {
276  sendStatusDatasetSegment(std::forward<decltype(args)>(args)...);
277  },
278  [this, interest] (auto&&... args) {
279  sendControlResponse(std::forward<decltype(args)>(args)..., interest, true);
280  });
281  handler(prefix, interest, context);
282 }
283 
284 void
285 Dispatcher::sendStatusDatasetSegment(const Name& dataName, const Block& content, bool isFinalBlock)
286 {
287  // the first segment will be sent to both places (the face and the in-memory storage)
288  // other segments will be inserted to the in-memory storage only
289  auto destination = SendDestination::IMS;
290  if (dataName[-1].toSegment() == 0) {
291  destination = SendDestination::FACE_AND_IMS;
292  }
293 
294  MetaInfo metaInfo;
295  if (isFinalBlock) {
296  metaInfo.setFinalBlock(dataName[-1]);
297  }
298 
299  sendData(dataName, content, metaInfo, destination);
300 }
301 
304 {
305  if (!m_topLevelPrefixes.empty()) {
306  NDN_THROW(std::domain_error("one or more top-level prefix has been added"));
307  }
308 
309  if (isOverlappedWithOthers(relPrefix)) {
310  NDN_THROW(std::out_of_range("notification stream name overlaps"));
311  }
312 
313  // register a handler for the subscriber of this notification stream
314  // keep silent if Interest does not match a stored notification
315  m_handlers[relPrefix] = [this] (auto&&... args) {
316  this->queryStorage(std::forward<decltype(args)>(args)..., nullptr);
317  };
318  m_streams[relPrefix] = 0;
319 
320  return [=] (const Block& b) { postNotification(b, relPrefix); };
321 }
322 
323 void
324 Dispatcher::postNotification(const Block& notification, const PartialName& relPrefix)
325 {
326  if (m_topLevelPrefixes.size() != 1) {
327  NDN_LOG_WARN("postNotification: no top-level prefix or too many top-level prefixes");
328  return;
329  }
330 
331  Name streamName(m_topLevelPrefixes.begin()->first);
332  streamName.append(relPrefix);
333  streamName.appendSequenceNumber(m_streams[streamName]++);
334 
335  // notification is sent out via the face after inserting into the in-memory storage,
336  // because a request may be pending in the PIT
337  sendData(streamName, notification, {}, SendDestination::FACE_AND_IMS);
338 }
339 
340 } // namespace mgmt
341 } // namespace ndn
Represents a TLV element of the NDN packet format.
Definition: block.hpp:45
Provide a communication channel with local or remote NDN forwarder.
Definition: face.hpp:91
RegisteredPrefixHandle registerPrefix(const Name &prefix, const RegisterPrefixSuccessCallback &onSuccess, const RegisterPrefixFailureCallback &onFailure, const security::SigningInfo &signingInfo=security::SigningInfo(), uint64_t flags=nfd::ROUTE_FLAG_CHILD_INHERIT)
Register prefix with the connected NDN forwarder.
Definition: face.cpp:246
RegisteredPrefixHandle setInterestFilter(const InterestFilter &filter, const InterestCallback &onInterest, const RegisterPrefixFailureCallback &onFailure, const security::SigningInfo &signingInfo=security::SigningInfo(), uint64_t flags=nfd::ROUTE_FLAG_CHILD_INHERIT)
Set InterestFilter to dispatch incoming matching interest to onInterest callback and register the fil...
Definition: face.cpp:212
void put(Data data)
Publish a Data packet.
Definition: face.cpp:196
void insert(const Data &data, const time::milliseconds &mustBeFreshProcessingWindow=INFINITE_WINDOW)
Inserts a Data packet.
shared_ptr< const Data > find(const Interest &interest)
Finds the best match Data for an Interest.
Represents an Interest packet.
Definition: interest.hpp:50
const Name & getName() const noexcept
Definition: interest.hpp:173
Represents an absolute name.
Definition: name.hpp:44
size_t size() const noexcept
Returns the number of components.
Definition: name.hpp:155
Base class for a struct that contains ControlCommand parameters.
Implements a request dispatcher on server side of NFD Management protocol.
Definition: dispatcher.hpp:131
PostNotification addNotificationStream(const PartialName &relPrefix)
Register a NotificationStream.
Definition: dispatcher.cpp:303
Dispatcher(Face &face, KeyChain &keyChain, const security::SigningInfo &signingInfo=security::SigningInfo(), size_t imsCapacity=256)
Constructor.
Definition: dispatcher.cpp:43
void addStatusDataset(const PartialName &relPrefix, Authorization authorize, StatusDatasetHandler handle)
Register a StatusDataset or a prefix under which StatusDatasets can be requested.
Definition: dispatcher.cpp:223
void addTopPrefix(const Name &prefix, bool wantRegister=true, const security::SigningInfo &signingInfo=security::SigningInfo())
Add a top-level prefix.
Definition: dispatcher.cpp:56
void removeTopPrefix(const Name &prefix)
Remove a top-level prefix.
Definition: dispatcher.cpp:89
Signing parameters passed to KeyChain.
#define NDN_THROW(e)
Definition: exception.hpp:61
#define NDN_LOG_WARN(expression)
Log at WARN level.
Definition: logger.hpp:264
#define NDN_LOG_ERROR(expression)
Log at ERROR level.
Definition: logger.hpp:269
#define NDN_LOG_INIT(name)
Define a non-member log module.
Definition: logger.hpp:163
@ CachePolicy
Definition: tlv.hpp:46
std::function< void(const Name &prefix, const Interest &interest, const ControlParameters *params, const AcceptContinuation &accept, const RejectContinuation &reject)> Authorization
A function that performs authorization.
Definition: dispatcher.hpp:77
std::function< void(const Block &notification)> PostNotification
A function to post a notification.
Definition: dispatcher.hpp:124
std::function< void(const Name &prefix, const Interest &interest, const ControlParameters &params, const CommandContinuation &done)> ControlCommandHandler
A function to handle an authorized ControlCommand.
Definition: dispatcher.hpp:106
std::function< bool(const ControlParameters &params)> ValidateParameters
A function to validate input ControlParameters.
Definition: dispatcher.hpp:90
RejectReply
Indicate how to reply in case authorization is rejected.
Definition: dispatcher.hpp:49
@ STATUS403
Reply with a ControlResponse where StatusCode is 403.
std::function< void(const std::string &requester)> AcceptContinuation
A function to be called if authorization is successful.
Definition: dispatcher.hpp:45
std::function< void(RejectReply reply)> RejectContinuation
A function to be called if authorization is rejected.
Definition: dispatcher.hpp:60
Authorization makeAcceptAllAuthorization()
Return an Authorization that accepts all Interests, with empty string as requester.
Definition: dispatcher.cpp:32
std::function< void(const Name &prefix, const Interest &interest, StatusDatasetContext &context)> StatusDatasetHandler
A function to handle a StatusDataset request.
Definition: dispatcher.hpp:118
mgmt::ControlResponse ControlResponse
@ Name
Definition: tlv.hpp:71
@ Data
Definition: tlv.hpp:69
@ MetaInfo
Definition: tlv.hpp:92
@ Interest
Definition: tlv.hpp:68
@ ContentType_Nack
application-level nack
Definition: tlv.hpp:148
Definition: data.cpp:25