rib-manager.cpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
26 #include "rib-manager.hpp"
27 #include "core/global-io.hpp"
28 #include "core/logger.hpp"
29 #include "core/scheduler.hpp"
30 #include <ndn-cxx/lp/tags.hpp>
31 #include <ndn-cxx/mgmt/nfd/face-status.hpp>
32 #include <ndn-cxx/mgmt/nfd/rib-entry.hpp>
33 
34 namespace nfd {
35 namespace rib {
36 
37 NFD_LOG_INIT("RibManager");
38 
39 const Name RibManager::LOCAL_HOST_TOP_PREFIX = "/localhost/nfd";
40 const Name RibManager::LOCAL_HOP_TOP_PREFIX = "/localhop/nfd";
41 const std::string RibManager::MGMT_MODULE_NAME = "rib";
42 const Name RibManager::FACES_LIST_DATASET_PREFIX = "/localhost/nfd/faces/list";
43 const time::seconds RibManager::ACTIVE_FACE_FETCH_INTERVAL = time::seconds(300);
44 
45 RibManager::RibManager(Dispatcher& dispatcher,
46  ndn::Face& face,
47  ndn::KeyChain& keyChain)
48  : ManagerBase(dispatcher, MGMT_MODULE_NAME)
49  , m_face(face)
50  , m_keyChain(keyChain)
51  , m_nfdController(m_face, m_keyChain)
52  , m_faceMonitor(m_face)
53  , m_localhostValidator(m_face)
54  , m_localhopValidator(m_face)
55  , m_isLocalhopEnabled(false)
56  , m_prefixPropagator(m_nfdController, m_keyChain, m_rib)
57  , m_fibUpdater(m_rib, m_nfdController)
58  , m_addTopPrefix([&dispatcher] (const Name& topPrefix) {
59  dispatcher.addTopPrefix(topPrefix, false);
60  })
61 {
62  registerCommandHandler<ndn::nfd::RibRegisterCommand>("register",
63  bind(&RibManager::registerEntry, this, _2, _3, _4, _5));
64  registerCommandHandler<ndn::nfd::RibUnregisterCommand>("unregister",
65  bind(&RibManager::unregisterEntry, this, _2, _3, _4, _5));
66 
67  registerStatusDatasetHandler("list", bind(&RibManager::listEntries, this, _1, _2, _3));
68 }
69 
71 {
72  scheduler::cancel(m_activeFaceFetchEvent);
73 }
74 
75 void
77 {
78  registerTopPrefix(LOCAL_HOST_TOP_PREFIX);
79 
80  if (m_isLocalhopEnabled) {
81  registerTopPrefix(LOCAL_HOP_TOP_PREFIX);
82  }
83 
84  NFD_LOG_INFO("Start monitoring face create/destroy events");
85  m_faceMonitor.onNotification.connect(bind(&RibManager::onNotification, this, _1));
86  m_faceMonitor.start();
87 
88  scheduleActiveFaceFetch(ACTIVE_FACE_FETCH_INTERVAL);
89 }
90 
91 void
93 {
94  m_nfdController.start<ndn::nfd::FaceEnableLocalControlCommand>(
95  ControlParameters()
96  .setLocalControlFeature(ndn::nfd::LOCAL_CONTROL_FEATURE_INCOMING_FACE_ID),
97  bind(&RibManager::onControlHeaderSuccess, this),
98  bind(&RibManager::onControlHeaderError, this, _1));
99 }
100 
101 void
103 {
104  configFile.addSectionHandler("rib",
105  bind(&RibManager::onConfig, this, _1, _2, _3));
106 }
107 
108 void
110 {
111  NFD_LOG_DEBUG("RIB update succeeded for " << update);
112 }
113 
114 void
115 RibManager::onRibUpdateFailure(const RibUpdate& update, uint32_t code, const std::string& error)
116 {
117  NFD_LOG_DEBUG("RIB update failed for " << update << " (code: " << code
118  << ", error: " << error << ")");
119 
120  // Since the FIB rejected the update, clean up invalid routes
121  scheduleActiveFaceFetch(time::seconds(1));
122 }
123 
124 void
125 RibManager::onConfig(const ConfigSection& configSection,
126  bool isDryRun,
127  const std::string& filename)
128 {
129  bool isAutoPrefixPropagatorEnabled = false;
130 
131  for (const auto& item : configSection) {
132  if (item.first == "localhost_security") {
133  m_localhostValidator.load(item.second, filename);
134  }
135  else if (item.first == "localhop_security") {
136  m_localhopValidator.load(item.second, filename);
137  m_isLocalhopEnabled = true;
138  }
139  else if (item.first == "auto_prefix_propagate") {
140  m_prefixPropagator.loadConfig(item.second);
141  isAutoPrefixPropagatorEnabled = true;
142 
143  // Avoid other actions when isDryRun == true
144  if (isDryRun) {
145  continue;
146  }
147 
148  m_prefixPropagator.enable();
149  }
150  else {
151  BOOST_THROW_EXCEPTION(Error("Unrecognized rib property: " + item.first));
152  }
153  }
154 
155  if (!isAutoPrefixPropagatorEnabled) {
156  m_prefixPropagator.disable();
157  }
158 }
159 
160 void
161 RibManager::registerTopPrefix(const Name& topPrefix)
162 {
163  // register entry to the FIB
164  m_nfdController.start<ndn::nfd::FibAddNextHopCommand>(
165  ControlParameters()
166  .setName(Name(topPrefix).append(MGMT_MODULE_NAME))
167  .setFaceId(0),
168  bind(&RibManager::onCommandPrefixAddNextHopSuccess, this, cref(topPrefix), _1),
169  bind(&RibManager::onCommandPrefixAddNextHopError, this, cref(topPrefix), _1));
170 
171  // add top prefix to the dispatcher
172  m_addTopPrefix(topPrefix);
173 }
174 
175 void
176 RibManager::registerEntry(const Name& topPrefix, const Interest& interest,
177  ControlParameters parameters,
178  const ndn::mgmt::CommandContinuation& done)
179 {
180  setFaceForSelfRegistration(interest, parameters);
181 
182  // Respond since command is valid and authorized
183  done(ControlResponse(200, "Success").setBody(parameters.wireEncode()));
184 
185  Route route;
186  route.faceId = parameters.getFaceId();
187  route.origin = parameters.getOrigin();
188  route.cost = parameters.getCost();
189  route.flags = parameters.getFlags();
190 
191  if (parameters.hasExpirationPeriod() &&
192  parameters.getExpirationPeriod() != time::milliseconds::max())
193  {
194  route.expires = time::steady_clock::now() + parameters.getExpirationPeriod();
195 
196  // Schedule a new event, the old one will be cancelled during rib insertion.
197  scheduler::EventId eventId = scheduler::schedule(parameters.getExpirationPeriod(),
198  bind(&Rib::onRouteExpiration, &m_rib, parameters.getName(), route));
199 
200  NFD_LOG_TRACE("Scheduled unregistration at: " << route.expires <<
201  " with EventId: " << eventId);
202 
203  // Set the NewEventId of this entry
204  route.setExpirationEvent(eventId);
205  }
206  else {
207  route.expires = time::steady_clock::TimePoint::max();
208  }
209 
210  NFD_LOG_INFO("Adding route " << parameters.getName() << " nexthop=" << route.faceId
211  << " origin=" << route.origin
212  << " cost=" << route.cost);
213 
214  RibUpdate update;
215  update.setAction(RibUpdate::REGISTER)
216  .setName(parameters.getName())
217  .setRoute(route);
218 
219  m_rib.beginApplyUpdate(update,
220  bind(&RibManager::onRibUpdateSuccess, this, update),
221  bind(&RibManager::onRibUpdateFailure, this, update, _1, _2));
222 
223  m_registeredFaces.insert(route.faceId);
224 }
225 
226 void
227 RibManager::unregisterEntry(const Name& topPrefix, const Interest& interest,
228  ControlParameters parameters,
229  const ndn::mgmt::CommandContinuation& done)
230 {
231  setFaceForSelfRegistration(interest, parameters);
232 
233  // Respond since command is valid and authorized
234  done(ControlResponse(200, "Success").setBody(parameters.wireEncode()));
235 
236  Route route;
237  route.faceId = parameters.getFaceId();
238  route.origin = parameters.getOrigin();
239 
240  NFD_LOG_INFO("Removing route " << parameters.getName() << " nexthop=" << route.faceId
241  << " origin=" << route.origin);
242 
243  RibUpdate update;
244  update.setAction(RibUpdate::UNREGISTER)
245  .setName(parameters.getName())
246  .setRoute(route);
247 
248  m_rib.beginApplyUpdate(update,
249  bind(&RibManager::onRibUpdateSuccess, this, update),
250  bind(&RibManager::onRibUpdateFailure, this, update, _1, _2));
251 }
252 
253 void
254 RibManager::listEntries(const Name& topPrefix, const Interest& interest,
255  ndn::mgmt::StatusDatasetContext& context)
256 {
257  for (auto&& ribTableEntry : m_rib) {
258  const auto& ribEntry = *ribTableEntry.second;
259  ndn::nfd::RibEntry record;
260 
261  for (auto&& route : ribEntry) {
262  ndn::nfd::Route routeElement;
263  routeElement.setFaceId(route.faceId)
264  .setOrigin(route.origin)
265  .setCost(route.cost)
266  .setFlags(route.flags);
267 
268  if (route.expires < time::steady_clock::TimePoint::max()) {
269  routeElement.setExpirationPeriod(time::duration_cast<time::milliseconds>(
270  route.expires - time::steady_clock::now()));
271  }
272 
273  record.addRoute(routeElement);
274  }
275 
276  record.setName(ribEntry.getName());
277  context.append(record.wireEncode());
278  }
279 
280  context.end();
281 }
282 
283 void
284 RibManager::setFaceForSelfRegistration(const Interest& request, ControlParameters& parameters)
285 {
286  bool isSelfRegistration = (parameters.getFaceId() == 0);
287  if (isSelfRegistration) {
288  shared_ptr<lp::IncomingFaceIdTag> incomingFaceIdTag = request.getTag<lp::IncomingFaceIdTag>();
289  // NDNLPv2 says "application MUST be prepared to receive a packet without IncomingFaceId field",
290  // but it's fine to assert IncomingFaceId is available, because InternalFace lives inside NFD
291  // and is initialized synchronously with IncomingFaceId field enabled.
292  BOOST_ASSERT(incomingFaceIdTag != nullptr);
293  parameters.setFaceId(*incomingFaceIdTag);
294  }
295 }
296 
297 ndn::mgmt::Authorization
298 RibManager::makeAuthorization(const std::string& verb)
299 {
300  return [this] (const Name& prefix, const Interest& interest,
301  const ndn::mgmt::ControlParameters* params,
302  const ndn::mgmt::AcceptContinuation& accept,
303  const ndn::mgmt::RejectContinuation& reject) {
304  BOOST_ASSERT(params != nullptr);
305  BOOST_ASSERT(typeid(*params) == typeid(ndn::nfd::ControlParameters));
306  BOOST_ASSERT(prefix == LOCAL_HOST_TOP_PREFIX || prefix == LOCAL_HOP_TOP_PREFIX);
307 
308  ndn::ValidatorConfig& validator = prefix == LOCAL_HOST_TOP_PREFIX ?
309  m_localhostValidator : m_localhopValidator;
310  validator.validate(interest,
311  bind([&interest, this, accept] { extractRequester(interest, accept); }),
312  bind([reject] { reject(ndn::mgmt::RejectReply::STATUS403); }));
313  };
314 }
315 
316 void
317 RibManager::fetchActiveFaces()
318 {
319  NFD_LOG_DEBUG("Fetching active faces");
320 
321  Interest interest(FACES_LIST_DATASET_PREFIX);
322  interest.setChildSelector(1);
323  interest.setMustBeFresh(true);
324 
325  shared_ptr<ndn::OBufferStream> buffer = make_shared<ndn::OBufferStream>();
326 
327  m_face.expressInterest(interest,
328  bind(&RibManager::fetchSegments, this, _2, buffer),
329  bind(&RibManager::onFetchFaceStatusTimeout, this));
330 }
331 
332 void
333 RibManager::fetchSegments(const Data& data, shared_ptr<ndn::OBufferStream> buffer)
334 {
335  buffer->write(reinterpret_cast<const char*>(data.getContent().value()),
336  data.getContent().value_size());
337 
338  uint64_t currentSegment = data.getName().get(-1).toSegment();
339 
340  const name::Component& finalBlockId = data.getMetaInfo().getFinalBlockId();
341 
342  if (finalBlockId.empty() || finalBlockId.toSegment() > currentSegment) {
343  m_face.expressInterest(data.getName().getPrefix(-1).appendSegment(currentSegment+1),
344  bind(&RibManager::fetchSegments, this, _2, buffer),
345  bind(&RibManager::onFetchFaceStatusTimeout, this));
346  }
347  else {
348  removeInvalidFaces(buffer);
349  }
350 }
351 
352 void
353 RibManager::onFetchFaceStatusTimeout()
354 {
355  std::cerr << "Face Status Dataset request timed out" << std::endl;
356  scheduleActiveFaceFetch(ACTIVE_FACE_FETCH_INTERVAL);
357 }
358 
359 void
360 RibManager::onFaceDestroyedEvent(uint64_t faceId)
361 {
362  m_rib.beginRemoveFace(faceId);
363  m_registeredFaces.erase(faceId);
364 }
365 
366 void
367 RibManager::scheduleActiveFaceFetch(const time::seconds& timeToWait)
368 {
369  scheduler::cancel(m_activeFaceFetchEvent);
370 
371  m_activeFaceFetchEvent = scheduler::schedule(timeToWait,
372  bind(&RibManager::fetchActiveFaces, this));
373 }
374 
375 void
376 RibManager::removeInvalidFaces(shared_ptr<ndn::OBufferStream> buffer)
377 {
378  NFD_LOG_DEBUG("Checking for invalid face registrations");
379 
380  ndn::ConstBufferPtr buf = buffer->buf();
381 
382  Block block;
383  size_t offset = 0;
384  FaceIdSet activeFaces;
385 
386  while (offset < buf->size()) {
387  bool isOk = false;
388  std::tie(isOk, block) = Block::fromBuffer(buf, offset);
389  if (!isOk) {
390  std::cerr << "ERROR: cannot decode FaceStatus TLV" << std::endl;
391  break;
392  }
393 
394  offset += block.size();
395 
396  ndn::nfd::FaceStatus status(block);
397  activeFaces.insert(status.getFaceId());
398  }
399 
400  // Look for face IDs that were registered but not active to find missed
401  // face destroyed events
402  for (auto&& faceId : m_registeredFaces) {
403  if (activeFaces.find(faceId) == activeFaces.end()) {
404  NFD_LOG_DEBUG("Removing invalid face ID: " << faceId);
405 
406  scheduler::schedule(time::seconds(0),
407  bind(&RibManager::onFaceDestroyedEvent, this, faceId));
408  }
409  }
410 
411  // Reschedule the check for future clean up
412  scheduleActiveFaceFetch(ACTIVE_FACE_FETCH_INTERVAL);
413 }
414 
415 void
416 RibManager::onNotification(const FaceEventNotification& notification)
417 {
418  NFD_LOG_TRACE("onNotification: " << notification);
419 
420  if (notification.getKind() == ndn::nfd::FACE_EVENT_DESTROYED) {
421  NFD_LOG_DEBUG("Received notification for destroyed faceId: " << notification.getFaceId());
422 
423  scheduler::schedule(time::seconds(0),
424  bind(&RibManager::onFaceDestroyedEvent, this, notification.getFaceId()));
425  }
426 }
427 
428 void
429 RibManager::onCommandPrefixAddNextHopSuccess(const Name& prefix,
430  const ndn::nfd::ControlParameters& result)
431 {
432  NFD_LOG_DEBUG("Successfully registered " + prefix.toUri() + " with NFD");
433 
434  // Routes must be inserted into the RIB so route flags can be applied
435  Route route;
436  route.faceId = result.getFaceId();
437  route.origin = ndn::nfd::ROUTE_ORIGIN_APP;
438  route.expires = time::steady_clock::TimePoint::max();
439  route.flags = ndn::nfd::ROUTE_FLAG_CHILD_INHERIT;
440 
441  m_rib.insert(prefix, route);
442 
443  m_registeredFaces.insert(route.faceId);
444 }
445 
446 void
447 RibManager::onCommandPrefixAddNextHopError(const Name& name,
448  const ndn::nfd::ControlResponse& response)
449 {
450  BOOST_THROW_EXCEPTION(Error("Error in setting interest filter (" + name.toUri() +
451  "): " + response.getText()));
452 }
453 
454 void
455 RibManager::onControlHeaderSuccess()
456 {
457  NFD_LOG_DEBUG("Local control header enabled");
458 }
459 
460 void
461 RibManager::onControlHeaderError(const ndn::nfd::ControlResponse& response)
462 {
463  std::ostringstream os;
464  os << "Couldn't enable local control header "
465  << "(code: " << response.getCode() << ", info: " << response.getText() << ")";
466  BOOST_THROW_EXCEPTION(Error(os.str()));
467 }
468 
469 } // namespace rib
470 } // namespace nfd
void addSectionHandler(const std::string &sectionName, ConfigSectionHandler subscriber)
setup notification of configuration file sections
Definition: config-file.cpp:79
#define NFD_LOG_DEBUG(expression)
Definition: logger.hpp:161
configuration file parsing utility
Definition: config-file.hpp:50
void cancel(const EventId &eventId)
cancel a scheduled event
Definition: scheduler.cpp:53
void disable()
disable automatic prefix propagation
void enable()
enable automatic prefix propagation
void enableLocalControlHeader()
Definition: rib-manager.cpp:92
a collection of common functions shared by all NFD managers and RIB manager, such as communicating wi...
#define NFD_LOG_INFO(expression)
Definition: logger.hpp:162
Copyright (c) 2014-2015, Regents of the University of California, Arizona Board of Regents...
Definition: algorithm.hpp:32
void onRibUpdateFailure(const RibUpdate &update, uint32_t code, const std::string &error)
void loadConfig(const ConfigSection &configSection)
load the "auto_prefix_propagate" section from config file
boost::property_tree::ptree ConfigSection
Definition: config-file.hpp:35
void extractRequester(const Interest &interest, ndn::mgmt::AcceptContinuation accept)
extract a requester from a ControlCommand request
void setConfigFile(ConfigFile &configFile)
#define NFD_LOG_INIT(name)
Definition: logger.hpp:122
EventId schedule(const time::nanoseconds &after, const Scheduler::Event &event)
schedule an event
Definition: scheduler.cpp:47
#define NFD_LOG_TRACE(expression)
Definition: logger.hpp:160
void onRibUpdateSuccess(const RibUpdate &update)
void onRouteExpiration(const Name &prefix, const Route &route)
Definition: rib.cpp:187
void beginApplyUpdate(const RibUpdate &update, const UpdateSuccessCallback &onSuccess, const UpdateFailureCallback &onFailure)
passes the provided RibUpdateBatch to FibUpdater to calculate and send FibUpdates.
Definition: rib.cpp:338
RibManager(Dispatcher &dispatcher, ndn::Face &face, ndn::KeyChain &keyChain)
Definition: rib-manager.cpp:45