All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
face.cpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil -*- */
8 #include "common.hpp"
9 
10 #include "face.hpp"
11 
13 
14 #include "util/time.hpp"
15 #include "util/random.hpp"
16 #include "util/config-file.hpp"
17 #include <cstdlib>
18 
22 
23 namespace ndn {
24 
26 {
27  const std::string socketName = UnixTransport::getDefaultSocketName(m_config);
28  construct(shared_ptr<Transport>(new UnixTransport(socketName)),
29  make_shared<boost::asio::io_service>());
30 }
31 
32 Face::Face(const shared_ptr<boost::asio::io_service>& ioService)
33 {
34  const std::string socketName = UnixTransport::getDefaultSocketName(m_config);
35  construct(shared_ptr<Transport>(new UnixTransport(socketName)),
36  ioService);
37 }
38 
40 {
41 public:
42  void
43  operator()(boost::asio::io_service*)
44  {
45  }
46 };
47 
48 Face::Face(boost::asio::io_service& ioService)
49 {
50  const std::string socketName = UnixTransport::getDefaultSocketName(m_config);
51  construct(shared_ptr<Transport>(new UnixTransport(socketName)),
52  shared_ptr<boost::asio::io_service>(&ioService, NullIoDeleter()));
53 }
54 
55 Face::Face(const std::string& host, const std::string& port/* = "6363"*/)
56 {
57  construct(shared_ptr<Transport>(new TcpTransport(host, port)),
58  make_shared<boost::asio::io_service>());
59 }
60 
61 Face::Face(const shared_ptr<Transport>& transport)
62 {
63  construct(transport,
64  make_shared<boost::asio::io_service>());
65 }
66 
67 Face::Face(const shared_ptr<Transport>& transport,
68  boost::asio::io_service& ioService)
69 {
70  construct(transport,
71  shared_ptr<boost::asio::io_service>(&ioService, NullIoDeleter()));
72 }
73 
74 void
75 Face::setController(const shared_ptr<Controller>& controller)
76 {
77  m_fwController = controller;
78 }
79 
80 void
81 Face::construct(const shared_ptr<Transport>& transport,
82  const shared_ptr<boost::asio::io_service>& ioService)
83 {
84  m_pitTimeoutCheckTimerActive = false;
85  m_transport = transport;
86  m_ioService = ioService;
87 
88  m_pitTimeoutCheckTimer = make_shared<monotonic_deadline_timer>(boost::ref(*m_ioService));
89  m_processEventsTimeoutTimer = make_shared<monotonic_deadline_timer>(boost::ref(*m_ioService));
90 
91  std::string protocol = "nrd-0.1";
92 
93  try
94  {
95  protocol = m_config.getParsedConfiguration().get<std::string>("protocol");
96  }
97  catch (boost::property_tree::ptree_bad_path& error)
98  {
99  // protocol not specified
100  }
101  catch (boost::property_tree::ptree_bad_data& error)
102  {
103  throw ConfigFile::Error(error.what());
104  }
105 
106  if (isSupportedNrdProtocol(protocol))
107  {
108  m_fwController = make_shared<nrd::Controller>(boost::ref(*this));
109  }
110  else if (isSupportedNfdProtocol(protocol))
111  {
112  m_fwController = make_shared<nfd::Controller>(boost::ref(*this));
113  }
114  else if (isSupportedNdndProtocol(protocol))
115  {
116  m_fwController = make_shared<ndnd::Controller>(boost::ref(*this));
117  }
118  else
119  {
120  throw Face::Error("Cannot create controller for unsupported protocol \"" + protocol + "\"");
121  }
122 }
123 
124 const PendingInterestId*
125 Face::expressInterest(const Interest& interest, const OnData& onData, const OnTimeout& onTimeout)
126 {
127  if (!m_transport->isConnected())
128  m_transport->connect(*m_ioService,
129  bind(&Face::onReceiveElement, this, _1));
130 
131  shared_ptr<const Interest> interestToExpress(new Interest(interest));
132 
133  // If the same ioService thread, dispatch directly calls the method
134  m_ioService->dispatch(bind(&Face::asyncExpressInterest, this,
135  interestToExpress, onData, onTimeout));
136 
137  return reinterpret_cast<const PendingInterestId*>(interestToExpress.get());
138 }
139 
140 const PendingInterestId*
142  const Interest& tmpl,
143  const OnData& onData, const OnTimeout& onTimeout/* = OnTimeout()*/)
144 {
145  return expressInterest(Interest(name,
146  tmpl.getMinSuffixComponents(),
147  tmpl.getMaxSuffixComponents(),
148  tmpl.getExclude(),
149  tmpl.getChildSelector(),
150  tmpl.getMustBeFresh(),
151  tmpl.getScope(),
152  tmpl.getInterestLifetime()),
153  onData, onTimeout);
154 }
155 
156 void
157 Face::asyncExpressInterest(const shared_ptr<const Interest>& interest,
158  const OnData& onData, const OnTimeout& onTimeout)
159 {
160  if (!m_transport->isExpectingData())
161  m_transport->resume();
162 
163  m_pendingInterestTable.push_back(shared_ptr<PendingInterest>(new PendingInterest
164  (interest, onData, onTimeout)));
165 
166  if (!interest->getLocalControlHeader().empty(false, true))
167  {
168  // encode only NextHopFaceId towards the forwarder
169  m_transport->send(interest->getLocalControlHeader().wireEncode(*interest, false, true),
170  interest->wireEncode());
171  }
172  else
173  {
174  m_transport->send(interest->wireEncode());
175  }
176 
177  if (!m_pitTimeoutCheckTimerActive) {
178  m_pitTimeoutCheckTimerActive = true;
179  m_pitTimeoutCheckTimer->expires_from_now(time::milliseconds(100));
180  m_pitTimeoutCheckTimer->async_wait(bind(&Face::checkPitExpire, this));
181  }
182 }
183 
184 void
185 Face::put(const Data& data)
186 {
187  if (!m_transport->isConnected())
188  m_transport->connect(*m_ioService,
189  bind(&Face::onReceiveElement, this, _1));
190 
191  if (!data.getLocalControlHeader().empty(false, true))
192  {
193  m_transport->send(data.getLocalControlHeader().wireEncode(data, false, true),
194  data.wireEncode());
195  }
196  else
197  {
198  m_transport->send(data.wireEncode());
199  }
200 }
201 
202 void
203 Face::removePendingInterest(const PendingInterestId* pendingInterestId)
204 {
205  m_ioService->post(bind(&Face::asyncRemovePendingInterest, this, pendingInterestId));
206 }
207 
208 
209 void
210 Face::asyncRemovePendingInterest(const PendingInterestId* pendingInterestId)
211 {
212  m_pendingInterestTable.remove_if(MatchPendingInterestId(pendingInterestId));
213 }
214 
215 const RegisteredPrefixId*
217  const OnInterest& onInterest,
218  const OnSetInterestFilterFailed& onSetInterestFilterFailed)
219 {
220  shared_ptr<RegisteredPrefix> prefixToRegister(new RegisteredPrefix(prefix, onInterest));
221 
222  m_fwController->selfRegisterPrefix(prefixToRegister->getPrefix(),
223  bind(&RegisteredPrefixTable::push_back,
224  &m_registeredPrefixTable, prefixToRegister),
225  bind(onSetInterestFilterFailed,
226  prefixToRegister->getPrefix(), _1));
227 
228  return reinterpret_cast<const RegisteredPrefixId*>(prefixToRegister.get());
229 }
230 
231 void
232 Face::unsetInterestFilter(const RegisteredPrefixId* registeredPrefixId)
233 {
234  m_ioService->post(bind(&Face::asyncUnsetInterestFilter, this, registeredPrefixId));
235 }
236 
237 void
238 Face::asyncUnsetInterestFilter(const RegisteredPrefixId* registeredPrefixId)
239 {
240  RegisteredPrefixTable::iterator i = std::find_if(m_registeredPrefixTable.begin(),
241  m_registeredPrefixTable.end(),
242  MatchRegisteredPrefixId(registeredPrefixId));
243  if (i != m_registeredPrefixTable.end())
244  {
245  m_fwController->selfDeregisterPrefix((*i)->getPrefix(),
246  bind(&Face::finalizeUnsetInterestFilter, this, i),
248  }
249 
250  // there cannot be two registered prefixes with the same id
251 }
252 
253 void
254 Face::finalizeUnsetInterestFilter(RegisteredPrefixTable::iterator item)
255 {
256  m_registeredPrefixTable.erase(item);
257 
258  if (!m_pitTimeoutCheckTimerActive && m_registeredPrefixTable.empty())
259  {
260  m_transport->pause();
261  if (!m_ioServiceWork) {
262  m_processEventsTimeoutTimer->cancel();
263  }
264  }
265 }
266 
267 void
268 Face::processEvents(const time::milliseconds& timeout/* = time::milliseconds::zero()*/,
269  bool keepThread/* = false*/)
270 {
271  try
272  {
273  if (timeout < time::milliseconds::zero())
274  {
275  // do not block if timeout is negative, but process pending events
276  m_ioService->poll();
277  return;
278  }
279 
280  if (timeout > time::milliseconds::zero())
281  {
282  m_processEventsTimeoutTimer->expires_from_now(time::milliseconds(timeout));
283  m_processEventsTimeoutTimer->async_wait(&fireProcessEventsTimeout);
284  }
285 
286  if (keepThread) {
287  // work will ensure that m_ioService is running until work object exists
288  m_ioServiceWork = make_shared<boost::asio::io_service::work>(boost::ref(*m_ioService));
289  }
290 
291  m_ioService->run();
292  m_ioService->reset(); // so it is possible to run processEvents again (if necessary)
293  }
294  catch (Face::ProcessEventsTimeout&)
295  {
296  // break
297  m_ioService->reset();
298  }
299  catch (std::exception&)
300  {
301  m_ioService->reset();
302  m_pendingInterestTable.clear();
303  m_registeredPrefixTable.clear();
304  throw;
305  }
306 }
307 
308 void
310 {
311  m_ioService->post(bind(&Face::asyncShutdown, this));
312 }
313 
314 void
315 Face::asyncShutdown()
316 {
317  m_pendingInterestTable.clear();
318  m_registeredPrefixTable.clear();
319 
320  m_transport->close();
321  m_pitTimeoutCheckTimer->cancel();
322  m_processEventsTimeoutTimer->cancel();
323  m_pitTimeoutCheckTimerActive = false;
324 }
325 
326 void
327 Face::fireProcessEventsTimeout(const boost::system::error_code& error)
328 {
329  if (!error) // can fire for some other reason, e.g., cancelled
330  throw Face::ProcessEventsTimeout();
331 }
332 
333 void
334 Face::checkPitExpire()
335 {
336  // Check for PIT entry timeouts.
337  time::steady_clock::TimePoint now = time::steady_clock::now();
338 
339  PendingInterestTable::iterator i = m_pendingInterestTable.begin();
340  while (i != m_pendingInterestTable.end())
341  {
342  if ((*i)->isTimedOut(now))
343  {
344  // Save the PendingInterest and remove it from the PIT. Then call the callback.
345  shared_ptr<PendingInterest> pendingInterest = *i;
346 
347  i = m_pendingInterestTable.erase(i);
348 
349  pendingInterest->callTimeout();
350  }
351  else
352  ++i;
353  }
354 
355  if (!m_pendingInterestTable.empty()) {
356  m_pitTimeoutCheckTimerActive = true;
357 
358  m_pitTimeoutCheckTimer->expires_from_now(time::milliseconds(100));
359  m_pitTimeoutCheckTimer->async_wait(bind(&Face::checkPitExpire, this));
360  }
361  else {
362  m_pitTimeoutCheckTimerActive = false;
363 
364  if (m_registeredPrefixTable.empty()) {
365  m_transport->pause();
366  if (!m_ioServiceWork) {
367  m_processEventsTimeoutTimer->cancel();
368  }
369  }
370  }
371 }
372 
373 
374 void
375 Face::onReceiveElement(const Block& blockFromDaemon)
376 {
377  const Block& block = nfd::LocalControlHeader::getPayload(blockFromDaemon);
378 
379  if (block.type() == Tlv::Interest)
380  {
381  shared_ptr<Interest> interest(new Interest());
382  interest->wireDecode(block);
383  if (&block != &blockFromDaemon)
384  interest->getLocalControlHeader().wireDecode(blockFromDaemon);
385 
386  processInterestFilters(*interest);
387  }
388  else if (block.type() == Tlv::Data)
389  {
390  shared_ptr<Data> data(new Data());
391  data->wireDecode(block);
392  if (&block != &blockFromDaemon)
393  data->getLocalControlHeader().wireDecode(blockFromDaemon);
394 
395  satisfyPendingInterests(*data);
396 
397  if (m_pendingInterestTable.empty()) {
398  m_pitTimeoutCheckTimer->cancel(); // this will cause checkPitExpire invocation
399  }
400  }
401  // ignore any other type
402 }
403 
404 void
405 Face::satisfyPendingInterests(Data& data)
406 {
407  for (PendingInterestTable::iterator i = m_pendingInterestTable.begin ();
408  i != m_pendingInterestTable.end();
409  )
410  {
411  if ((*i)->getInterest()->matchesData(data))
412  {
413  // Copy pointers to the objects and remove the PIT entry before calling the callback.
414  OnData onData = (*i)->getOnData();
415  shared_ptr<const Interest> interest = (*i)->getInterest();
416 
417  PendingInterestTable::iterator next = i;
418  ++next;
419  m_pendingInterestTable.erase(i);
420  i = next;
421 
422  if (static_cast<bool>(onData)) {
423  onData(*interest, data);
424  }
425  }
426  else
427  ++i;
428  }
429 }
430 
431 void
432 Face::processInterestFilters(Interest& interest)
433 {
434  for (RegisteredPrefixTable::iterator i = m_registeredPrefixTable.begin();
435  i != m_registeredPrefixTable.end();
436  ++i)
437  {
438  if ((*i)->getPrefix().isPrefixOf(interest.getName()))
439  {
440  (*i)->getOnInterest()((*i)->getPrefix(), interest);
441  }
442  }
443 }
444 
445 } // namespace ndn
int getMinSuffixComponents() const
Definition: interest.hpp:320
time_point TimePoint
Definition: time.hpp:84
int getMaxSuffixComponents() const
Definition: interest.hpp:336
static std::string getDefaultSocketName(const ConfigFile &config)
Determine the default NFD unix socket.
const Parsed & getParsedConfiguration() const
Definition: config-file.hpp:94
An Interest holds a Name and other fields for an interest.
Definition: interest.hpp:24
function< void(const Interest &)> OnTimeout
An OnTimeout function object is used to pass a callback to expressInterest.
Definition: face.hpp:37
function< void(const Name &, const Interest &)> OnInterest
An OnInterest function object is used to pass a callback to registerPrefix.
Definition: face.hpp:42
const time::milliseconds & getInterestLifetime() const
Definition: interest.hpp:230
int getChildSelector() const
Definition: interest.hpp:384
size_t wireEncode(EncodingImpl< T > &block, bool unsignedPortion=false) const
Fast encoding or block size estimation.
Definition: data.hpp:242
Face()
Create a new Face using the default transport (UnixTransport)
Definition: face.cpp:25
void operator()(boost::asio::io_service *)
Definition: face.cpp:43
static const Block & getPayload(const Block &wire)
void setController(const shared_ptr< Controller > &controller)
Set controller used for prefix registration.
Definition: face.cpp:75
int getMustBeFresh() const
Definition: interest.hpp:400
function< void(const std::string &)> FailCallback
Definition: controller.hpp:24
const RegisteredPrefixId * setInterestFilter(const Name &prefix, const OnInterest &onInterest, const OnSetInterestFilterFailed &onSetInterestFilterFailed)
Register prefix with the connected NDN hub and call onInterest when a matching interest is received...
Definition: face.cpp:216
shared_ptr< boost::asio::io_service > ioService()
Get shared_ptr of the IO service object.
Definition: face.hpp:279
void shutdown()
Shutdown face operations.
Definition: face.cpp:309
function< void(const Name &, const std::string &)> OnSetInterestFilterFailed
An OnSetInterestFilterFailed function object is used to report when setInterestFilter fails.
Definition: face.hpp:47
const Exclude & getExclude() const
Definition: interest.hpp:368
Functor to match pending interests against PendingInterestId.
A Name holds an array of Name::Component and represents an NDN name.
Definition: name.hpp:26
const PendingInterestId * expressInterest(const Interest &interest, const OnData &onData, const OnTimeout &onTimeout=OnTimeout())
Express Interest.
Definition: face.cpp:125
void unsetInterestFilter(const RegisteredPrefixId *registeredPrefixId)
Remove the registered prefix entry with the registeredPrefixId from the registered prefix table...
Definition: face.cpp:232
bool empty(bool encodeIncomingFaceId, bool encodeNextHopFaceId) const
int getScope() const
Definition: interest.hpp:214
function< void(const Interest &, Data &)> OnData
An OnData function object is used to pass a callback to expressInterest.
Definition: face.hpp:27
Functor to match pending interests against PendingInterestId.
Block wireEncode(const U &payload, bool encodeIncomingFaceId, bool encodeNextHopFaceId) const
Create wire encoding with options LocalControlHeader and the supplied item.
void processEvents(const time::milliseconds &timeout=time::milliseconds::zero(), bool keepThread=false)
Process any data to receive or call timeout callbacks.
Definition: face.cpp:268
nfd::LocalControlHeader & getLocalControlHeader()
Definition: data.hpp:472
void removePendingInterest(const PendingInterestId *pendingInterestId)
Remove the pending interest entry with the pendingInterestId from the pending interest table...
Definition: face.cpp:203
void put(const Data &data)
Publish data packet.
Definition: face.cpp:185