dispatcher.cpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
22 #include "dispatcher.hpp"
23 #include "../lp/tags.hpp"
24 #include "../util/logger.hpp"
25 
27 
28 namespace ndn {
29 namespace mgmt {
30 
31 const time::milliseconds DEFAULT_FRESHNESS_PERIOD = time::milliseconds(1000);
32 
35 {
36  return [] (const Name& prefix,
37  const Interest& interest,
38  const ControlParameters* params,
39  const AcceptContinuation& accept,
40  const RejectContinuation& reject) {
41  accept("");
42  };
43 }
44 
45 Dispatcher::Dispatcher(Face& face, KeyChain& keyChain,
46  const security::SigningInfo& signingInfo,
47  size_t imsCapacity)
48  : m_face(face)
49  , m_keyChain(keyChain)
50  , m_signingInfo(signingInfo)
51  , m_storage(m_face.getIoService(), imsCapacity)
52 {
53 }
54 
56 {
57  std::vector<Name> topPrefixNames;
58 
59  std::transform(m_topLevelPrefixes.begin(),
60  m_topLevelPrefixes.end(),
61  std::back_inserter(topPrefixNames),
62  [] (const std::unordered_map<Name, TopPrefixEntry>::value_type& entry) {
63  return entry.second.topPrefix;
64  });
65 
66  for (auto&& name : topPrefixNames) {
67  removeTopPrefix(name);
68  }
69 }
70 
71 void
73  bool wantRegister,
74  const security::SigningInfo& signingInfo)
75 {
76  bool hasOverlap = std::any_of(m_topLevelPrefixes.begin(),
77  m_topLevelPrefixes.end(),
78  [&] (const std::unordered_map<Name, TopPrefixEntry>::value_type& x) {
79  return x.first.isPrefixOf(prefix) || prefix.isPrefixOf(x.first);
80  });
81  if (hasOverlap) {
82  BOOST_THROW_EXCEPTION(std::out_of_range("Top-level Prefixes overlapped"));
83  }
84 
85  TopPrefixEntry& topPrefixEntry = m_topLevelPrefixes[prefix];;
86  topPrefixEntry.topPrefix = prefix;
87  topPrefixEntry.wantRegister = wantRegister;
88 
89  if (wantRegister) {
90  RegisterPrefixFailureCallback failure = [] (const Name& name, const std::string& reason) {
91  BOOST_THROW_EXCEPTION(std::runtime_error(reason));
92  };
93  topPrefixEntry.registerPrefixId =
94  m_face.registerPrefix(prefix, bind([]{}), failure, signingInfo);
95  }
96 
97  for (auto&& entry : m_handlers) {
98  Name fullPrefix = prefix;
99  fullPrefix.append(entry.first);
100 
101  const InterestFilterId* interestFilterId =
102  m_face.setInterestFilter(fullPrefix, std::bind(entry.second, prefix, _2));
103 
104  topPrefixEntry.interestFilters.push_back(interestFilterId);
105  }
106 }
107 
108 void
110 {
111  auto it = m_topLevelPrefixes.find(prefix);
112  if (it == m_topLevelPrefixes.end()) {
113  return;
114  }
115 
116  const TopPrefixEntry& topPrefixEntry = it->second;
117  if (topPrefixEntry.wantRegister) {
118  m_face.unregisterPrefix(topPrefixEntry.registerPrefixId, bind([]{}), bind([]{}));
119  }
120 
121  for (auto&& filter : topPrefixEntry.interestFilters) {
122  m_face.unsetInterestFilter(filter);
123  }
124 
125  m_topLevelPrefixes.erase(it);
126 }
127 
128 bool
129 Dispatcher::isOverlappedWithOthers(const PartialName& relPrefix)
130 {
131  bool hasOverlapWithHandlers =
132  std::any_of(m_handlers.begin(), m_handlers.end(),
133  [&] (const HandlerMap::value_type& entry) {
134  return entry.first.isPrefixOf(relPrefix) || relPrefix.isPrefixOf(entry.first);
135  });
136  bool hasOverlapWithStreams =
137  std::any_of(m_streams.begin(), m_streams.end(),
138  [&] (const std::unordered_map<PartialName, uint64_t>::value_type& entry) {
139  return entry.first.isPrefixOf(relPrefix) || relPrefix.isPrefixOf(entry.first);
140  });
141 
142  return hasOverlapWithHandlers || hasOverlapWithStreams;
143 }
144 
145 void
146 Dispatcher::afterAuthorizationRejected(RejectReply act, const Interest& interest)
147 {
148  if (act == RejectReply::STATUS403) {
149  sendControlResponse(ControlResponse(403, "authorization rejected"), interest);
150  }
151 }
152 
153 void
154 Dispatcher::queryStorage(const Name& prefix, const Interest& interest,
155  const InterestHandler& missContinuation)
156 {
157  auto data = m_storage.find(interest);
158  if (data == nullptr) {
159  // invoke missContinuation to process this Interest if the query fails.
160  missContinuation(prefix, interest);
161  }
162  else {
163  // send the fetched data through face if query succeeds.
164  sendOnFace(*data);
165  }
166 }
167 
168 void
169 Dispatcher::sendData(const Name& dataName, const Block& content, const MetaInfo& metaInfo,
170  SendDestination option, time::milliseconds imsFresh)
171 {
172  shared_ptr<Data> data = make_shared<Data>(dataName);
173  data->setContent(content).setMetaInfo(metaInfo).setFreshnessPeriod(DEFAULT_FRESHNESS_PERIOD);
174 
175  m_keyChain.sign(*data, m_signingInfo);
176 
177  if (option == SendDestination::IMS || option == SendDestination::FACE_AND_IMS) {
178  lp::CachePolicy policy;
180  data->setTag(make_shared<lp::CachePolicyTag>(policy));
181  m_storage.insert(*data, imsFresh);
182  }
183 
184  if (option == SendDestination::FACE || option == SendDestination::FACE_AND_IMS) {
185  sendOnFace(*data);
186  }
187 }
188 
189 void
190 Dispatcher::sendOnFace(const Data& data)
191 {
192  try {
193  m_face.put(data);
194  }
195  catch (const Face::Error& e) {
196  NDN_LOG_ERROR("sendOnFace: " << e.what());
197  }
198 }
199 
200 void
201 Dispatcher::processControlCommandInterest(const Name& prefix,
202  const Name& relPrefix,
203  const Interest& interest,
204  const ControlParametersParser& parser,
205  const Authorization& authorization,
206  const AuthorizationAcceptedCallback& accepted,
207  const AuthorizationRejectedCallback& rejected)
208 {
209  // /<prefix>/<relPrefix>/<parameters>
210  size_t parametersLoc = prefix.size() + relPrefix.size();
211  const name::Component& pc = interest.getName().get(parametersLoc);
212 
213  shared_ptr<ControlParameters> parameters;
214  try {
215  parameters = parser(pc);
216  }
217  catch (const tlv::Error&) {
218  return;
219  }
220 
221  AcceptContinuation accept = bind(accepted, _1, prefix, interest, parameters);
222  RejectContinuation reject = bind(rejected, _1, interest);
223  authorization(prefix, interest, parameters.get(), accept, reject);
224 }
225 
226 void
227 Dispatcher::processAuthorizedControlCommandInterest(const std::string& requester,
228  const Name& prefix,
229  const Interest& interest,
230  const shared_ptr<ControlParameters>& parameters,
231  const ValidateParameters& validateParams,
232  const ControlCommandHandler& handler)
233 {
234  if (validateParams(*parameters)) {
235  handler(prefix, interest, *parameters,
236  bind(&Dispatcher::sendControlResponse, this, _1, interest, false));
237  }
238  else {
239  sendControlResponse(ControlResponse(400, "failed in validating parameters"), interest);
240  }
241 }
242 
243 void
244 Dispatcher::sendControlResponse(const ControlResponse& resp, const Interest& interest,
245  bool isNack)
246 {
247  MetaInfo metaInfo;
248  if (isNack) {
249  metaInfo.setType(tlv::ContentType_Nack);
250  }
251  // control response is always sent out through the face
252  sendData(interest.getName(), resp.wireEncode(), metaInfo, SendDestination::FACE,
254 }
255 
256 void
258  const Authorization& authorization,
259  const StatusDatasetHandler& handler)
260 {
261  if (!m_topLevelPrefixes.empty()) {
262  BOOST_THROW_EXCEPTION(std::domain_error("one or more top-level prefix has been added"));
263  }
264 
265  if (isOverlappedWithOthers(relPrefix)) {
266  BOOST_THROW_EXCEPTION(std::out_of_range("relPrefix overlapped"));
267  }
268 
269  AuthorizationAcceptedCallback accepted =
270  bind(&Dispatcher::processAuthorizedStatusDatasetInterest, this,
271  _1, _2, _3, handler);
272  AuthorizationRejectedCallback rejected =
273  bind(&Dispatcher::afterAuthorizationRejected, this, _1, _2);
274 
275  // follow the general path if storage is a miss
276  InterestHandler missContinuation = bind(&Dispatcher::processStatusDatasetInterest, this,
277  _1, _2, authorization, accepted, rejected);
278  m_handlers[relPrefix] = bind(&Dispatcher::queryStorage, this, _1, _2, missContinuation);
279 }
280 
281 void
282 Dispatcher::processStatusDatasetInterest(const Name& prefix,
283  const Interest& interest,
284  const Authorization& authorization,
285  const AuthorizationAcceptedCallback& accepted,
286  const AuthorizationRejectedCallback& rejected)
287 {
288  const Name& interestName = interest.getName();
289  bool endsWithVersionOrSegment = interestName.size() >= 1 &&
290  (interestName[-1].isVersion() || interestName[-1].isSegment());
291  if (endsWithVersionOrSegment) {
292  return;
293  }
294 
295  AcceptContinuation accept = bind(accepted, _1, prefix, interest, nullptr);
296  RejectContinuation reject = bind(rejected, _1, interest);
297  authorization(prefix, interest, nullptr, accept, reject);
298 }
299 
300 void
301 Dispatcher::processAuthorizedStatusDatasetInterest(const std::string& requester,
302  const Name& prefix,
303  const Interest& interest,
304  const StatusDatasetHandler& handler)
305 {
306  StatusDatasetContext context(interest,
307  bind(&Dispatcher::sendStatusDatasetSegment, this, _1, _2, _3, _4),
308  bind(&Dispatcher::sendControlResponse, this, _1, interest, true));
309  handler(prefix, interest, context);
310 }
311 
312 void
313 Dispatcher::sendStatusDatasetSegment(const Name& dataName, const Block& content,
314  time::milliseconds imsFresh, bool isFinalBlock)
315 {
316  // the first segment will be sent to both places (the face and the in-memory storage)
317  // other segments will be inserted to the in-memory storage only
318  auto destination = SendDestination::IMS;
319  if (dataName[-1].toSegment() == 0) {
320  destination = SendDestination::FACE_AND_IMS;
321  }
322 
323  MetaInfo metaInfo;
324  if (isFinalBlock) {
325  metaInfo.setFinalBlockId(dataName[-1]);
326  }
327 
328  sendData(dataName, content, metaInfo, destination, imsFresh);
329 }
330 
333 {
334  if (!m_topLevelPrefixes.empty()) {
335  BOOST_THROW_EXCEPTION(std::domain_error("one or more top-level prefix has been added"));
336  }
337 
338  if (isOverlappedWithOthers(relPrefix)) {
339  BOOST_THROW_EXCEPTION(std::out_of_range("relPrefix overlaps with another relPrefix"));
340  }
341 
342  // keep silent if Interest does not match a stored notification
343  InterestHandler missContinuation = bind([]{});
344 
345  // register a handler for the subscriber of this notification stream
346  m_handlers[relPrefix] = bind(&Dispatcher::queryStorage, this, _1, _2, missContinuation);
347  m_streams[relPrefix] = 0;
348  return bind(&Dispatcher::postNotification, this, _1, relPrefix);
349 }
350 
351 void
352 Dispatcher::postNotification(const Block& notification, const PartialName& relPrefix)
353 {
354  if (m_topLevelPrefixes.empty() || m_topLevelPrefixes.size() > 1) {
355  NDN_LOG_WARN("postNotification: no top-level prefix or too many top-level prefixes");
356  return;
357  }
358 
359  Name streamName(m_topLevelPrefixes.begin()->second.topPrefix);
360  streamName.append(relPrefix);
361  streamName.appendSequenceNumber(m_streams[streamName]++);
362 
363  // notification is sent out by the face after inserting into the in-memory storage,
364  // because a request may be pending in the PIT
365  sendData(streamName, notification, MetaInfo(), SendDestination::FACE_AND_IMS,
366  DEFAULT_FRESHNESS_PERIOD);
367 }
368 
369 } // namespace mgmt
370 } // namespace ndn
represents a CachePolicy header field
const Name & getName() const
Definition: interest.hpp:139
Copyright (c) 2013-2017 Regents of the University of California.
Definition: common.hpp:66
indicates a producer generated NACK
std::function< void(const Block &notification)> PostNotification
a function to post a notification
Definition: dispatcher.hpp:123
represents a dispatcher on server side of NFD Management protocol
Definition: dispatcher.hpp:129
RejectReply
indicate how to reply in case authorization is rejected
Definition: dispatcher.hpp:49
const RegisteredPrefixId * setInterestFilter(const InterestFilter &interestFilter, const InterestCallback &onInterest, const RegisterPrefixFailureCallback &onFailure, const security::SigningInfo &signingInfo=security::SigningInfo(), uint64_t flags=nfd::ROUTE_FLAG_CHILD_INHERIT)
Set InterestFilter to dispatch incoming matching interest to onInterest callback and register the fil...
Definition: face.cpp:235
reply with a ControlResponse where StatusCode is 403
CachePolicy & setPolicy(CachePolicyType policy)
set policy type code
Represents a TLV element of NDN packet format.
Definition: block.hpp:42
represents an Interest packet
Definition: interest.hpp:42
std::function< void(RejectReply act)> RejectContinuation
a function to be called if authorization is rejected
Definition: dispatcher.hpp:60
std::function< void(const std::string &requester)> AcceptContinuation
a function to be called if authorization is successful
Definition: dispatcher.hpp:45
Dispatcher(Face &face, KeyChain &keyChain, const security::SigningInfo &signingInfo=security::SigningInfo(), size_t imsCapacity=256)
constructor
Definition: dispatcher.cpp:45
const Block & wireEncode() const
#define NDN_LOG_INIT(name)
declare a log module
Definition: logger.hpp:101
Authorization makeAcceptAllAuthorization()
Definition: dispatcher.cpp:34
Name & append(const Component &component)
Append a component.
Definition: name.hpp:256
Signing parameters passed to KeyChain.
void unregisterPrefix(const RegisteredPrefixId *registeredPrefixId, const UnregisterPrefixSuccessCallback &onSuccess, const UnregisterPrefixFailureCallback &onFailure)
Unregister prefix from RIB.
Definition: face.cpp:304
MetaInfo & setFinalBlockId(const name::Component &finalBlockId)
Definition: meta-info.cpp:66
shared_ptr< const Data > find(const Interest &interest)
Finds the best match Data for an Interest.
MetaInfo & setType(uint32_t type)
set ContentType
Definition: meta-info.cpp:47
mgmt::ControlResponse ControlResponse
Provide a communication channel with local or remote NDN forwarder.
Definition: face.hpp:95
void addTopPrefix(const Name &prefix, bool wantRegister=true, const security::SigningInfo &signingInfo=security::SigningInfo())
add a top-level prefix
Definition: dispatcher.cpp:72
A MetaInfo holds the meta information that is signed inside the Data packet.
Definition: meta-info.hpp:58
size_t size() const
Get number of components.
Definition: name.hpp:154
function< void(const Name &, const std::string &)> RegisterPrefixFailureCallback
Callback invoked when registerPrefix or setInterestFilter command fails.
Definition: face.hpp:80
Name & appendSequenceNumber(uint64_t seqNo)
Append a sequence number component.
Definition: name.hpp:406
Represents an absolute name.
Definition: name.hpp:42
bool isPrefixOf(const Name &other) const
Check if this name is a prefix of another name.
Definition: name.cpp:260
const time::milliseconds DEFAULT_FRESHNESS_PERIOD
Definition: dispatcher.cpp:31
void unsetInterestFilter(const RegisteredPrefixId *registeredPrefixId)
Remove the registered prefix entry with the registeredPrefixId.
Definition: face.cpp:288
base class for a struct that contains ControlCommand parameters
#define NDN_LOG_WARN(expression)
log at WARN level
Definition: logger.hpp:160
Component holds a read-only name component value.
std::function< void(const Name &prefix, const Interest &interest, const ControlParameters &params, const CommandContinuation &done)> ControlCommandHandler
a function to handle an authorized ControlCommand
Definition: dispatcher.hpp:106
void addStatusDataset(const PartialName &relPrefix, const Authorization &authorization, const StatusDatasetHandler &handler)
register a StatusDataset or a prefix under which StatusDatasets can be requested
Definition: dispatcher.cpp:257
std::function< bool(const ControlParameters &params)> ValidateParameters
a function to validate input ControlParameters
Definition: dispatcher.hpp:90
void insert(const Data &data, const time::milliseconds &mustBeFreshProcessingWindow=INFINITE_WINDOW)
Inserts a Data packet.
void put(Data data)
Publish data packet.
Definition: face.cpp:217
void removeTopPrefix(const Name &prefix)
remove a top-level prefix
Definition: dispatcher.cpp:109
ControlCommand response.
provides a context for generating response to a StatusDataset request
Represents a Data packet.
Definition: data.hpp:35
std::function< void(const Name &prefix, const Interest &interest, const ControlParameters *params, const AcceptContinuation &accept, const RejectContinuation &reject)> Authorization
a function that performs authorization
Definition: dispatcher.hpp:77
#define NDN_LOG_ERROR(expression)
log at ERROR level
Definition: logger.hpp:165
const Component & get(ssize_t i) const
Get the component at the given index.
Definition: name.hpp:164
const RegisteredPrefixId * registerPrefix(const Name &prefix, const RegisterPrefixSuccessCallback &onSuccess, const RegisterPrefixFailureCallback &onFailure, const security::SigningInfo &signingInfo=security::SigningInfo(), uint64_t flags=nfd::ROUTE_FLAG_CHILD_INHERIT)
Register prefix with the connected NDN forwarder.
Definition: face.cpp:275
represents an error in TLV encoding or decoding
PostNotification addNotificationStream(const PartialName &relPrefix)
register a NotificationStream
Definition: dispatcher.cpp:332
std::function< void(const Name &prefix, const Interest &interest, StatusDatasetContext &context)> StatusDatasetHandler
a function to handle a StatusDataset request
Definition: dispatcher.hpp:117