stream-transport-impl.hpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * Copyright (c) 2013-2024 Regents of the University of California.
4  *
5  * This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
6  *
7  * ndn-cxx library is free software: you can redistribute it and/or modify it under the
8  * terms of the GNU Lesser General Public License as published by the Free Software
9  * Foundation, either version 3 of the License, or (at your option) any later version.
10  *
11  * ndn-cxx library is distributed in the hope that it will be useful, but WITHOUT ANY
12  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
13  * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
14  *
15  * You should have received copies of the GNU General Public License and GNU Lesser
16  * General Public License along with ndn-cxx, e.g., in COPYING.md file. If not, see
17  * <http://www.gnu.org/licenses/>.
18  *
19  * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
20  */
21 
22 #ifndef NDN_CXX_TRANSPORT_DETAIL_STREAM_TRANSPORT_IMPL_HPP
23 #define NDN_CXX_TRANSPORT_DETAIL_STREAM_TRANSPORT_IMPL_HPP
24 
26 
27 #include <boost/asio/steady_timer.hpp>
28 #include <boost/asio/write.hpp>
29 #include <boost/lexical_cast.hpp>
30 
31 #include <array>
32 #include <list>
33 #include <queue>
34 
35 namespace ndn::detail {
36 
43 template<typename BaseTransport, typename Protocol>
44 class StreamTransportImpl : public std::enable_shared_from_this<StreamTransportImpl<BaseTransport, Protocol>>
45 {
46 protected:
47  using TransmissionQueue = std::queue<Block, std::list<Block>>;
48 
49 public:
50  StreamTransportImpl(BaseTransport& transport, boost::asio::io_context& ioCtx)
51  : m_transport(transport)
52  , m_socket(ioCtx)
53  , m_connectTimer(ioCtx)
54  {
55  }
56 
57  void
58  connect(const typename Protocol::endpoint& endpoint)
59  {
60  if (m_transport.getState() == Transport::State::CONNECTING) {
61  return;
62  }
63 
64  m_endpoint = endpoint;
66 
67  // Wait at most 4 seconds to connect
69  m_connectTimer.expires_after(std::chrono::seconds(4));
70  m_connectTimer.async_wait([self = this->shared_from_this()] (const auto& ec) {
71  if (ec) // e.g., cancelled timer
72  return;
73 
74  self->m_transport.close();
75  NDN_THROW(Transport::Error(boost::system::errc::make_error_code(boost::system::errc::timed_out),
76  "could not connect to NDN forwarder at " +
77  boost::lexical_cast<std::string>(self->m_endpoint)));
78  });
79 
80  m_socket.async_connect(m_endpoint, [self = this->shared_from_this()] (const auto& ec) {
81  self->connectHandler(ec);
82  });
83  }
84 
85  void
87  {
89 
90  m_connectTimer.cancel();
91  boost::system::error_code error; // to silently ignore all errors
92  m_socket.cancel(error);
93  m_socket.close(error);
94 
95  TransmissionQueue{}.swap(m_transmissionQueue); // clear the queue
96  }
97 
98  void
100  {
101  if (m_transport.getState() == Transport::State::RUNNING) {
102  m_socket.cancel();
104  }
105  }
106 
107  void
109  {
110  if (m_transport.getState() == Transport::State::PAUSED) {
112  m_rxBufferSize = 0;
113  asyncReceive();
114  }
115  }
116 
117  void
118  send(const Block& block)
119  {
120  m_transmissionQueue.push(block);
121 
122  if (m_transport.getState() != Transport::State::CLOSED &&
124  m_transmissionQueue.size() == 1) {
125  asyncWrite();
126  }
127  // if not connected or there's another transmission in progress (m_transmissionQueue.size() > 1),
128  // the next write will be scheduled either in connectHandler or in asyncWriteHandler
129  }
130 
131 protected:
132  void
133  connectHandler(const boost::system::error_code& error)
134  {
135  m_connectTimer.cancel();
136 
137  if (error) {
138  if (error == boost::asio::error::operation_aborted) {
139  // async_connect was explicitly cancelled (e.g., socket close)
140  return;
141  }
142  m_transport.close();
143  NDN_THROW(Transport::Error(error, "could not connect to NDN forwarder at " +
144  boost::lexical_cast<std::string>(m_endpoint)));
145  }
146 
148 
149  if (!m_transmissionQueue.empty()) {
150  resume();
151  asyncWrite();
152  }
153  }
154 
155  void
157  {
158  BOOST_ASSERT(!m_transmissionQueue.empty());
159  boost::asio::async_write(m_socket, boost::asio::buffer(m_transmissionQueue.front()),
160  // capture a copy of the shared_ptr to "this" to prevent deallocation
161  [this, self = this->shared_from_this()] (const auto& error, size_t) {
162  if (error) {
163  if (error == boost::asio::error::operation_aborted) {
164  // async_write was explicitly cancelled (e.g., socket close)
165  return;
166  }
167  m_transport.close();
168  NDN_THROW(Transport::Error(error, "socket write error"));
169  }
170 
171  if (m_transport.getState() == Transport::State::CLOSED) {
172  return; // queue has already been cleared
173  }
174 
175  BOOST_ASSERT(!m_transmissionQueue.empty());
176  m_transmissionQueue.pop();
177 
178  if (!m_transmissionQueue.empty()) {
179  asyncWrite();
180  }
181  });
182  }
183 
184  void
186  {
187  m_socket.async_receive(boost::asio::buffer(m_rxBuffer.data() + m_rxBufferSize,
188  m_rxBuffer.size() - m_rxBufferSize),
189  // capture a copy of the shared_ptr to "this" to prevent deallocation
190  [this, self = this->shared_from_this()] (const auto& error, size_t nBytesRecvd) {
191  if (error) {
192  if (error == boost::asio::error::operation_aborted) {
193  // async_receive was explicitly cancelled (e.g., socket close)
194  return;
195  }
196  m_transport.close();
197  NDN_THROW(Transport::Error(error, "socket read error"));
198  }
199 
200  m_rxBufferSize += nBytesRecvd;
201  auto unparsedBytes = make_span(m_rxBuffer).first(m_rxBufferSize);
202  while (!unparsedBytes.empty()) {
203  auto [isOk, element] = Block::fromBuffer(unparsedBytes);
204  if (!isOk) {
205  break;
206  }
207  unparsedBytes = unparsedBytes.subspan(element.size());
208  m_transport.m_receiveCallback(element);
209  }
210 
211  if (unparsedBytes.empty()) {
212  // nothing left in the receive buffer
213  m_rxBufferSize = 0;
214  }
215  else if (unparsedBytes.data() != m_rxBuffer.data()) {
216  // move remaining unparsed bytes to the beginning of the receive buffer
217  std::copy(unparsedBytes.begin(), unparsedBytes.end(), m_rxBuffer.begin());
218  m_rxBufferSize = unparsedBytes.size();
219  }
220  else if (unparsedBytes.size() == m_rxBuffer.size()) {
221  m_transport.close();
222  NDN_THROW(Transport::Error("receive buffer full, but a valid TLV cannot be decoded"));
223  }
224 
225  asyncReceive();
226  });
227  }
228 
229 protected:
230  BaseTransport& m_transport;
231  typename Protocol::endpoint m_endpoint;
232  typename Protocol::socket m_socket;
233  boost::asio::steady_timer m_connectTimer;
235  size_t m_rxBufferSize = 0;
236  std::array<uint8_t, MAX_NDN_PACKET_SIZE> m_rxBuffer;
237 };
238 
239 } // namespace ndn::detail
240 
241 #endif // NDN_CXX_TRANSPORT_DETAIL_STREAM_TRANSPORT_IMPL_HPP
Represents a TLV element of the NDN packet format.
Definition: block.hpp:45
Implementation detail of a Boost.Asio-based stream-oriented transport.
std::array< uint8_t, MAX_NDN_PACKET_SIZE > m_rxBuffer
boost::asio::steady_timer m_connectTimer
StreamTransportImpl(BaseTransport &transport, boost::asio::io_context &ioCtx)
std::queue< Block, std::list< Block > > TransmissionQueue
void connect(const typename Protocol::endpoint &endpoint)
void connectHandler(const boost::system::error_code &error)
#define NDN_THROW(e)
Definition: exception.hpp:56
Contains implementation details that are not part of the ndn-cxx public API.
::boost::chrono::seconds seconds
Definition: time.hpp:51