stream-transport-impl.hpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
22 #ifndef NDN_TRANSPORT_STREAM_TRANSPORT_IMPL_HPP
23 #define NDN_TRANSPORT_STREAM_TRANSPORT_IMPL_HPP
24 
25 #include "transport.hpp"
26 
27 #include <boost/asio.hpp>
28 #include <list>
29 
30 namespace ndn {
31 
37 template<typename BaseTransport, typename Protocol>
38 class StreamTransportImpl : public enable_shared_from_this<StreamTransportImpl<BaseTransport, Protocol>>
39 {
40 public:
42  typedef std::list<Block> BlockSequence;
43  typedef std::list<BlockSequence> TransmissionQueue;
44 
45  StreamTransportImpl(BaseTransport& transport, boost::asio::io_service& ioService)
46  : m_transport(transport)
47  , m_socket(ioService)
49  , m_isConnecting(false)
50  , m_connectTimer(ioService)
51  {
52  }
53 
54  void
55  connect(const typename Protocol::endpoint& endpoint)
56  {
57  if (!m_isConnecting) {
58  m_isConnecting = true;
59 
60  // Wait at most 4 seconds to connect
62  m_connectTimer.expires_from_now(boost::posix_time::seconds(4));
63  m_connectTimer.async_wait(bind(&Impl::connectTimeoutHandler, this->shared_from_this(), _1));
64 
65  m_socket.open();
66  m_socket.async_connect(endpoint, bind(&Impl::connectHandler, this->shared_from_this(), _1));
67  }
68  }
69 
70  void
72  {
73  m_isConnecting = false;
74 
75  boost::system::error_code error; // to silently ignore all errors
76  m_connectTimer.cancel(error);
77  m_socket.cancel(error);
78  m_socket.close(error);
79 
80  m_transport.m_isConnected = false;
81  m_transport.m_isReceiving = false;
82  m_transmissionQueue.clear();
83  }
84 
85  void
87  {
88  if (m_isConnecting)
89  return;
90 
91  if (m_transport.m_isReceiving) {
92  m_transport.m_isReceiving = false;
93  m_socket.cancel();
94  }
95  }
96 
97  void
99  {
100  if (m_isConnecting)
101  return;
102 
103  if (!m_transport.m_isReceiving) {
104  m_transport.m_isReceiving = true;
105  m_inputBufferSize = 0;
106  asyncReceive();
107  }
108  }
109 
110  void
111  send(const Block& wire)
112  {
113  BlockSequence sequence;
114  sequence.push_back(wire);
115  send(std::move(sequence));
116  }
117 
118  void
119  send(const Block& header, const Block& payload)
120  {
121  BlockSequence sequence;
122  sequence.push_back(header);
123  sequence.push_back(payload);
124  send(std::move(sequence));
125  }
126 
127 protected:
128  void
129  connectHandler(const boost::system::error_code& error)
130  {
131  m_isConnecting = false;
132  m_connectTimer.cancel();
133 
134  if (!error) {
135  m_transport.m_isConnected = true;
136 
137  if (!m_transmissionQueue.empty()) {
138  resume();
139  asyncWrite();
140  }
141  }
142  else {
143  m_transport.m_isConnected = false;
144  m_transport.close();
145  BOOST_THROW_EXCEPTION(Transport::Error(error, "error while connecting to the forwarder"));
146  }
147  }
148 
149  void
150  connectTimeoutHandler(const boost::system::error_code& error)
151  {
152  if (error) // e.g., cancelled timer
153  return;
154 
155  m_transport.close();
156  BOOST_THROW_EXCEPTION(Transport::Error(error, "error while connecting to the forwarder"));
157  }
158 
159  void
160  send(BlockSequence&& sequence)
161  {
162  m_transmissionQueue.emplace_back(sequence);
163 
164  if (m_transport.m_isConnected && m_transmissionQueue.size() == 1) {
165  asyncWrite();
166  }
167 
168  // if not connected or there is transmission in progress (m_transmissionQueue.size() > 1),
169  // next write will be scheduled either in connectHandler or in asyncWriteHandler
170  }
171 
172  void
174  {
175  BOOST_ASSERT(!m_transmissionQueue.empty());
176  boost::asio::async_write(m_socket, m_transmissionQueue.front(),
177  bind(&Impl::handleAsyncWrite, this->shared_from_this(), _1, m_transmissionQueue.begin()));
178  }
179 
180  void
181  handleAsyncWrite(const boost::system::error_code& error, TransmissionQueue::iterator queueItem)
182  {
183  if (error) {
184  if (error == boost::system::errc::operation_canceled) {
185  // async receive has been explicitly cancelled (e.g., socket close)
186  return;
187  }
188 
189  m_transport.close();
190  BOOST_THROW_EXCEPTION(Transport::Error(error, "error while sending data to socket"));
191  }
192 
193  if (!m_transport.m_isConnected) {
194  return; // queue has been already cleared
195  }
196 
197  m_transmissionQueue.erase(queueItem);
198 
199  if (!m_transmissionQueue.empty()) {
200  asyncWrite();
201  }
202  }
203 
204  void
206  {
207  m_socket.async_receive(boost::asio::buffer(m_inputBuffer + m_inputBufferSize,
209  bind(&Impl::handleAsyncReceive, this->shared_from_this(), _1, _2));
210  }
211 
212  void
213  handleAsyncReceive(const boost::system::error_code& error, std::size_t nBytesRecvd)
214  {
215  if (error) {
216  if (error == boost::system::errc::operation_canceled) {
217  // async receive has been explicitly cancelled (e.g., socket close)
218  return;
219  }
220 
221  m_transport.close();
222  BOOST_THROW_EXCEPTION(Transport::Error(error, "error while receiving data from socket"));
223  }
224 
225  m_inputBufferSize += nBytesRecvd;
226  // do magic
227 
228  std::size_t offset = 0;
229  bool hasProcessedSome = processAllReceived(m_inputBuffer, offset, m_inputBufferSize);
230  if (!hasProcessedSome && m_inputBufferSize == MAX_NDN_PACKET_SIZE && offset == 0) {
231  m_transport.close();
232  BOOST_THROW_EXCEPTION(Transport::Error(boost::system::error_code(),
233  "input buffer full, but a valid TLV cannot be "
234  "decoded"));
235  }
236 
237  if (offset > 0) {
238  if (offset != m_inputBufferSize) {
240  m_inputBufferSize -= offset;
241  }
242  else {
243  m_inputBufferSize = 0;
244  }
245  }
246 
247  asyncReceive();
248  }
249 
250  bool
251  processAllReceived(uint8_t* buffer, size_t& offset, size_t nBytesAvailable)
252  {
253  while (offset < nBytesAvailable) {
254  bool isOk = false;
255  Block element;
256  std::tie(isOk, element) = Block::fromBuffer(buffer + offset, nBytesAvailable - offset);
257  if (!isOk)
258  return false;
259 
260  m_transport.receive(element);
261  offset += element.size();
262  }
263  return true;
264  }
265 
266 protected:
267  BaseTransport& m_transport;
268 
269  typename Protocol::socket m_socket;
272 
273  TransmissionQueue m_transmissionQueue;
275 
276  boost::asio::deadline_timer m_connectTimer;
277 };
278 
279 } // namespace ndn
280 
281 #endif // NDN_TRANSPORT_STREAM_TRANSPORT_IMPL_HPP
bool processAllReceived(uint8_t *buffer, size_t &offset, size_t nBytesAvailable)
void connectTimeoutHandler(const boost::system::error_code &error)
Copyright (c) 2013-2017 Regents of the University of California.
Definition: common.hpp:66
static std::tuple< bool, Block > fromBuffer(ConstBufferPtr buffer, size_t offset)
Try to parse Block from a wire buffer.
Definition: block.cpp:195
void send(BlockSequence &&sequence)
StreamTransportImpl(BaseTransport &transport, boost::asio::io_service &ioService)
Represents a TLV element of NDN packet format.
Definition: block.hpp:42
StreamTransportImpl< BaseTransport, Protocol > Impl
implementation detail of a Boost.Asio-based stream-oriented transport
void handleAsyncReceive(const boost::system::error_code &error, std::size_t nBytesRecvd)
void send(const Block &wire)
size_t size() const
Get size of encoded wire, including Type-Length-Value.
Definition: block.cpp:299
void connect(const typename Protocol::endpoint &endpoint)
void handleAsyncWrite(const boost::system::error_code &error, TransmissionQueue::iterator queueItem)
uint8_t m_inputBuffer[MAX_NDN_PACKET_SIZE]
void send(const Block &header, const Block &payload)
boost::asio::deadline_timer m_connectTimer
std::list< BlockSequence > TransmissionQueue
void connectHandler(const boost::system::error_code &error)
const size_t MAX_NDN_PACKET_SIZE
practical limit of network layer packet size