All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
stream-transport.hpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil -*- */
7 #ifndef NDN_TRANSPORT_STREAM_TRANSPORT_HPP
8 #define NDN_TRANSPORT_STREAM_TRANSPORT_HPP
9 
10 #include "../common.hpp"
11 
12 namespace ndn {
13 
14 const size_t MAX_LENGTH = 9000;
15 
16 template<class BaseTransport, class Protocol>
18 {
19 public:
20  typedef BaseTransport base_transport;
21  typedef Protocol protocol;
23 
24  StreamTransportImpl(base_transport& transport, boost::asio::io_service& ioService)
25  : m_transport(transport)
26  , m_socket(ioService)
28  , m_connectionInProgress(false)
29  , m_connectTimer(ioService)
30  {
31  }
32 
33  void
34  connectHandler(const boost::system::error_code& error)
35  {
36  m_connectionInProgress = false;
37  m_connectTimer.cancel();
38 
39  if (!error)
40  {
41  resume();
42  m_transport.m_isConnected = true;
43 
44  for (std::list<Block>::iterator i = m_sendQueue.begin(); i != m_sendQueue.end(); ++i)
45  m_socket.async_send(boost::asio::buffer(i->wire(), i->size()),
46  bind(&impl::handle_async_send, this, _1, *i));
47 
48  for (std::list< std::pair<Block,Block> >::iterator i = m_sendPairQueue.begin();
49  i != m_sendPairQueue.end(); ++i)
50  {
51  std::vector<boost::asio::const_buffer> buffer;
52  buffer.reserve(2);
53  buffer.push_back(boost::asio::buffer(i->first.wire(), i->first.size()));
54  buffer.push_back(boost::asio::buffer(i->second.wire(), i->second.size()));
55  m_socket.async_send(buffer,
56  bind(&impl::handle_async_send, this, _1, i->first, i->second));
57  }
58 
59  m_sendQueue.clear();
60  m_sendPairQueue.clear();
61  }
62  else
63  {
64  // may need to throw exception
65  m_transport.m_isConnected = false;
66  m_transport.close();
67  throw Transport::Error(error, "error while connecting to the forwarder");
68  }
69  }
70 
71  void
72  connectTimeoutHandler(const boost::system::error_code& error)
73  {
74  if (error) // e.g., cancelled timer
75  return;
76 
77  m_connectionInProgress = false;
78  m_transport.m_isConnected = false;
79  m_transport.m_isExpectingData = false;
80  m_socket.close();
81  throw Transport::Error(error, "error while connecting to the forwarder");
82  }
83 
84  void
85  connect(const typename protocol::endpoint& endpoint)
86  {
89 
90  // Wait at most 4 time::seconds to connect
92  m_connectTimer.expires_from_now(boost::posix_time::seconds(4));
93  m_connectTimer.async_wait(bind(&impl::connectTimeoutHandler, this, _1));
94 
95  m_socket.open();
96  m_socket.async_connect(endpoint,
97  bind(&impl::connectHandler, this, _1));
98  }
99  }
100 
101  void
103  {
104  m_connectTimer.cancel();
105  m_socket.close();
106  m_transport.m_isConnected = false;
107  m_transport.m_isExpectingData = false;
108  m_sendQueue.clear();
109  m_sendPairQueue.clear();
110  }
111 
112  void
114  {
115  if (m_transport.m_isExpectingData)
116  {
117  m_transport.m_isExpectingData = false;
118  m_socket.cancel();
119  }
120  }
121 
122  void
124  {
125  if (!m_transport.m_isExpectingData)
126  {
127  m_transport.m_isExpectingData = true;
128  m_inputBufferSize = 0;
129  m_socket.async_receive(boost::asio::buffer(m_inputBuffer, MAX_LENGTH), 0,
130  bind(&impl::handle_async_receive, this, _1, _2));
131  }
132  }
133 
134  void
135  send(const Block& wire)
136  {
137  if (!m_transport.m_isConnected)
138  m_sendQueue.push_back(wire);
139  else
140  m_socket.async_send(boost::asio::buffer(wire.wire(), wire.size()),
141  bind(&impl::handle_async_send, this, _1, wire));
142  }
143 
144  void
145  send(const Block& header, const Block& payload)
146  {
147  if (!m_transport.m_isConnected)
148  {
149  m_sendPairQueue.push_back(std::make_pair(header, payload));
150  }
151  else
152  {
153  std::vector<boost::asio::const_buffer> buffers;
154  buffers.reserve(2);
155  buffers.push_back(boost::asio::buffer(header.wire(), header.size()));
156  buffers.push_back(boost::asio::buffer(payload.wire(), payload.size()));
157 
158  m_socket.async_send(buffers,
159  bind(&impl::handle_async_send, this, _1, header, payload));
160  }
161  }
162 
163  inline bool
164  processAll(uint8_t* buffer, size_t& offset, size_t availableSize)
165  {
166  Block element;
167  while(offset < availableSize)
168  {
169  bool ok = Block::fromBuffer(buffer + offset, availableSize - offset, element);
170  if (!ok)
171  return false;
172 
173  m_transport.receive(element);
174  offset += element.size();
175  }
176  return true;
177  }
178 
179  void
180  handle_async_receive(const boost::system::error_code& error, std::size_t bytes_recvd)
181  {
182  if (error)
183  {
184  if (error == boost::system::errc::operation_canceled) {
185  // async receive has been explicitly cancelled (e.g., socket close)
186  return;
187  }
188 
189  m_socket.close(); // closing at this point may not be that necessary
190  m_transport.m_isConnected = true;
191  throw Transport::Error(error, "error while receiving data from socket");
192  }
193 
194  m_inputBufferSize += bytes_recvd;
195  // do magic
196 
197  std::size_t offset = 0;
198  bool ok = processAll(m_inputBuffer, offset, m_inputBufferSize);
199  if (!ok && m_inputBufferSize == MAX_LENGTH && offset == 0)
200  {
201  // very bad... should close connection
202  m_socket.close();
203  m_transport.m_isConnected = false;
204  m_transport.m_isExpectingData = false;
205  throw Transport::Error(boost::system::error_code(),
206  "input buffer full, but a valid TLV cannot be decoded");
207  }
208 
209  if (offset > 0)
210  {
211  if (offset != m_inputBufferSize)
212  {
213  std::copy(m_inputBuffer + offset, m_inputBuffer + m_inputBufferSize,
214  m_inputBuffer);
215  m_inputBufferSize -= offset;
216  }
217  else
218  {
219  m_inputBufferSize = 0;
220  }
221  }
222 
223  m_socket.async_receive(boost::asio::buffer(m_inputBuffer + m_inputBufferSize,
225  bind(&impl::handle_async_receive, this, _1, _2));
226  }
227 
228  void
229  handle_async_send(const boost::system::error_code& error, const Block& wire)
230  {
231  // pass (needed to keep data block alive during the send)
232  }
233 
234  void
235  handle_async_send(const boost::system::error_code& error,
236  const Block& header, const Block& payload)
237  {
238  // pass (needed to keep data blocks alive during the send)
239  }
240 
241 protected:
243 
244  typename protocol::socket m_socket;
247 
248  std::list< Block > m_sendQueue;
249  std::list< std::pair<Block, Block> > m_sendPairQueue;
251 
252  boost::asio::deadline_timer m_connectTimer;
253 };
254 
255 
256 template<class BaseTransport, class Protocol>
257 class StreamTransportWithResolverImpl : public StreamTransportImpl<BaseTransport, Protocol>
258 {
259 public:
260  typedef BaseTransport base_transport;
263 
264  StreamTransportWithResolverImpl(base_transport& transport, boost::asio::io_service& ioService)
265  : StreamTransportImpl<base_transport, protocol>(transport, ioService)
266  {
267  }
268 
269  void
270  resolveHandler(const boost::system::error_code& error,
271  typename protocol::resolver::iterator endpoint,
272  const shared_ptr<typename protocol::resolver>&)
273  {
274  if (error)
275  {
276  if (error == boost::system::errc::operation_canceled)
277  return;
278 
279  throw Transport::Error(error, "Error during resolution of host or port");
280  }
281 
282  typename protocol::resolver::iterator end;
283  if (endpoint == end)
284  {
285  this->m_connectionInProgress = false;
286  this->m_transport.m_isConnected = false;
287  this->m_transport.m_isExpectingData = false;
288  this->m_socket.close();
289  throw Transport::Error(error, "Unable to resolve because host or port");
290  }
291 
292  this->m_socket.async_connect(*endpoint,
293  bind(&impl::connectHandler, this, _1));
294  }
295 
296  void
297  connect(const typename protocol::resolver::query& query)
298  {
299  if (!this->m_connectionInProgress) {
300  this->m_connectionInProgress = true;
301 
302  // Wait at most 4 time::seconds to connect
304  this->m_connectTimer.expires_from_now(boost::posix_time::seconds(4));
305  this->m_connectTimer.async_wait(bind(&impl::connectTimeoutHandler, this, _1));
306 
307  // typename boost::asio::ip::basic_resolver< protocol > resolver;
308  shared_ptr<typename protocol::resolver> resolver =
309  make_shared<typename protocol::resolver>(boost::ref(this->m_socket.get_io_service()));
310 
311  resolver->async_resolve(query, bind(&impl::resolveHandler, this, _1, _2, resolver));
312  }
313  }
314 };
315 
316 
317 } // namespace ndn
318 
319 #endif // NDN_TRANSPORT_STREAM_TRANSPORT_HPP
void connectTimeoutHandler(const boost::system::error_code &error)
const size_t MAX_LENGTH
std::list< Block > m_sendQueue
Class representing wire element of the NDN packet.
Definition: block.hpp:26
void send(const Block &wire)
size_t size() const
Definition: block.hpp:415
StreamTransportImpl< BaseTransport, Protocol > impl
StreamTransportImpl(base_transport &transport, boost::asio::io_service &ioService)
void resolveHandler(const boost::system::error_code &error, typename protocol::resolver::iterator endpoint, const shared_ptr< typename protocol::resolver > &)
bool processAll(uint8_t *buffer, size_t &offset, size_t availableSize)
void handle_async_send(const boost::system::error_code &error, const Block &header, const Block &payload)
const uint8_t * wire() const
Definition: block.hpp:437
void send(const Block &header, const Block &payload)
void connect(const typename protocol::resolver::query &query)
static bool fromBuffer(const ConstBufferPtr &wire, size_t offset, Block &block)
Try to construct block from Buffer, referencing data block pointed by wire.
Definition: block.cpp:209
boost::asio::deadline_timer m_connectTimer
void handle_async_send(const boost::system::error_code &error, const Block &wire)
uint8_t m_inputBuffer[MAX_LENGTH]
StreamTransportWithResolverImpl(base_transport &transport, boost::asio::io_service &ioService)
StreamTransportWithResolverImpl< BaseTransport, Protocol > impl
void handle_async_receive(const boost::system::error_code &error, std::size_t bytes_recvd)
void connectHandler(const boost::system::error_code &error)
void connect(const typename protocol::endpoint &endpoint)
std::list< std::pair< Block, Block > > m_sendPairQueue