stream-transport-impl.hpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * Copyright (c) 2013-2022 Regents of the University of California.
4  *
5  * This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
6  *
7  * ndn-cxx library is free software: you can redistribute it and/or modify it under the
8  * terms of the GNU Lesser General Public License as published by the Free Software
9  * Foundation, either version 3 of the License, or (at your option) any later version.
10  *
11  * ndn-cxx library is distributed in the hope that it will be useful, but WITHOUT ANY
12  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
13  * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
14  *
15  * You should have received copies of the GNU General Public License and GNU Lesser
16  * General Public License along with ndn-cxx, e.g., in COPYING.md file. If not, see
17  * <http://www.gnu.org/licenses/>.
18  *
19  * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
20  */
21 
22 #ifndef NDN_CXX_TRANSPORT_DETAIL_STREAM_TRANSPORT_IMPL_HPP
23 #define NDN_CXX_TRANSPORT_DETAIL_STREAM_TRANSPORT_IMPL_HPP
24 
26 
27 #include <boost/asio/steady_timer.hpp>
28 #include <boost/asio/write.hpp>
29 
30 #include <list>
31 #include <queue>
32 
33 namespace ndn {
34 namespace detail {
35 
41 template<typename BaseTransport, typename Protocol>
42 class StreamTransportImpl : public std::enable_shared_from_this<StreamTransportImpl<BaseTransport, Protocol>>
43 {
44 public:
46  using TransmissionQueue = std::queue<Block, std::list<Block>>;
47 
48  StreamTransportImpl(BaseTransport& transport, boost::asio::io_service& ioService)
49  : m_transport(transport)
50  , m_socket(ioService)
51  , m_connectTimer(ioService)
52  {
53  }
54 
55  void
56  connect(const typename Protocol::endpoint& endpoint)
57  {
58  if (m_isConnecting) {
59  return;
60  }
61  m_isConnecting = true;
62 
63  // Wait at most 4 seconds to connect
65  m_connectTimer.expires_from_now(std::chrono::seconds(4));
66  m_connectTimer.async_wait([self = this->shared_from_this()] (const auto& error) {
67  self->connectTimeoutHandler(error);
68  });
69 
70  m_socket.open();
71  m_socket.async_connect(endpoint, [self = this->shared_from_this()] (const auto& error) {
72  self->connectHandler(error);
73  });
74  }
75 
76  void
78  {
79  m_isConnecting = false;
80 
81  boost::system::error_code error; // to silently ignore all errors
82  m_connectTimer.cancel(error);
83  m_socket.cancel(error);
84  m_socket.close(error);
85 
86  m_transport.m_isConnected = false;
87  m_transport.m_isReceiving = false;
88  TransmissionQueue{}.swap(m_transmissionQueue); // clear the queue
89  }
90 
91  void
93  {
94  if (m_isConnecting)
95  return;
96 
97  if (m_transport.m_isReceiving) {
98  m_transport.m_isReceiving = false;
99  m_socket.cancel();
100  }
101  }
102 
103  void
105  {
106  if (m_isConnecting)
107  return;
108 
109  if (!m_transport.m_isReceiving) {
110  m_transport.m_isReceiving = true;
111  m_inputBufferSize = 0;
112  asyncReceive();
113  }
114  }
115 
116  void
117  send(const Block& block)
118  {
119  m_transmissionQueue.push(block);
120 
121  if (m_transport.m_isConnected && m_transmissionQueue.size() == 1) {
122  asyncWrite();
123  }
124  // if not connected or there's another transmission in progress (m_transmissionQueue.size() > 1),
125  // the next write will be scheduled either in connectHandler or in asyncWriteHandler
126  }
127 
128 protected:
129  void
130  connectHandler(const boost::system::error_code& error)
131  {
132  m_isConnecting = false;
133  m_connectTimer.cancel();
134 
135  if (error) {
136  m_transport.m_isConnected = false;
137  m_transport.close();
138  NDN_THROW(Transport::Error(error, "error while connecting to the forwarder"));
139  }
140 
141  m_transport.m_isConnected = true;
142 
143  if (!m_transmissionQueue.empty()) {
144  resume();
145  asyncWrite();
146  }
147  }
148 
149  void
150  connectTimeoutHandler(const boost::system::error_code& error)
151  {
152  if (error) // e.g., cancelled timer
153  return;
154 
155  m_transport.close();
156  NDN_THROW(Transport::Error(error, "error while connecting to the forwarder"));
157  }
158 
159  void
161  {
162  BOOST_ASSERT(!m_transmissionQueue.empty());
163  boost::asio::async_write(m_socket, boost::asio::buffer(m_transmissionQueue.front()),
164  // capture a copy of the shared_ptr to "this" to prevent deallocation
165  [this, self = this->shared_from_this()] (const auto& error, size_t) {
166  if (error) {
167  if (error == boost::system::errc::operation_canceled) {
168  // async receive has been explicitly cancelled (e.g., socket close)
169  return;
170  }
171  m_transport.close();
172  NDN_THROW(Transport::Error(error, "error while writing data to socket"));
173  }
174 
175  if (!m_transport.m_isConnected) {
176  return; // queue has been already cleared
177  }
178 
179  BOOST_ASSERT(!m_transmissionQueue.empty());
180  m_transmissionQueue.pop();
181 
182  if (!m_transmissionQueue.empty()) {
183  asyncWrite();
184  }
185  });
186  }
187 
188  void
190  {
191  m_socket.async_receive(boost::asio::buffer(m_inputBuffer + m_inputBufferSize,
192  MAX_NDN_PACKET_SIZE - m_inputBufferSize), 0,
193  // capture a copy of the shared_ptr to "this" to prevent deallocation
194  [this, self = this->shared_from_this()] (const auto& error, size_t nBytesRecvd) {
195  if (error) {
196  if (error == boost::system::errc::operation_canceled) {
197  // async receive has been explicitly cancelled (e.g., socket close)
198  return;
199  }
200  m_transport.close();
201  NDN_THROW(Transport::Error(error, "error while receiving data from socket"));
202  }
203 
204  m_inputBufferSize += nBytesRecvd;
205  // do magic
206 
207  std::size_t offset = 0;
208  bool hasProcessedSome = processAllReceived(m_inputBuffer, offset, m_inputBufferSize);
209  if (!hasProcessedSome && m_inputBufferSize == MAX_NDN_PACKET_SIZE && offset == 0) {
210  m_transport.close();
211  NDN_THROW(Transport::Error("input buffer full, but a valid TLV cannot be decoded"));
212  }
213 
214  if (offset > 0) {
215  if (offset != m_inputBufferSize) {
216  std::copy(m_inputBuffer + offset, m_inputBuffer + m_inputBufferSize, m_inputBuffer);
217  m_inputBufferSize -= offset;
218  }
219  else {
220  m_inputBufferSize = 0;
221  }
222  }
223 
224  asyncReceive();
225  });
226  }
227 
228  bool
229  processAllReceived(uint8_t* buffer, size_t& offset, size_t nBytesAvailable)
230  {
231  while (offset < nBytesAvailable) {
232  bool isOk = false;
233  Block element;
234  std::tie(isOk, element) = Block::fromBuffer({buffer + offset, nBytesAvailable - offset});
235  if (!isOk)
236  return false;
237 
238  m_transport.m_receiveCallback(element);
239  offset += element.size();
240  }
241  return true;
242  }
243 
244 protected:
245  BaseTransport& m_transport;
246 
247  typename Protocol::socket m_socket;
248  uint8_t m_inputBuffer[MAX_NDN_PACKET_SIZE];
249  size_t m_inputBufferSize = 0;
251  boost::asio::steady_timer m_connectTimer;
252  bool m_isConnecting = false;
253 };
254 
255 } // namespace detail
256 } // namespace ndn
257 
258 #endif // NDN_CXX_TRANSPORT_DETAIL_STREAM_TRANSPORT_IMPL_HPP
Represents a TLV element of the NDN packet format.
Definition: block.hpp:45
size_t size() const
Return the size of the encoded wire, i.e.
Definition: block.cpp:305
static std::tuple< bool, Block > fromBuffer(ConstBufferPtr buffer, size_t offset=0)
Try to parse Block from a wire buffer.
Definition: block.cpp:167
Implementation detail of a Boost.Asio-based stream-oriented transport.
bool processAllReceived(uint8_t *buffer, size_t &offset, size_t nBytesAvailable)
boost::asio::steady_timer m_connectTimer
std::queue< Block, std::list< Block > > TransmissionQueue
void connect(const typename Protocol::endpoint &endpoint)
StreamTransportImpl(BaseTransport &transport, boost::asio::io_service &ioService)
void connectHandler(const boost::system::error_code &error)
void connectTimeoutHandler(const boost::system::error_code &error)
#define NDN_THROW(e)
Definition: exception.hpp:61
boost::chrono::seconds seconds
Definition: time.hpp:47
Definition: data.cpp:25
const size_t MAX_NDN_PACKET_SIZE
Practical size limit of a network-layer packet.
Definition: tlv.hpp:41