stream-transport.hpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * Copyright (c) 2014-2024, Regents of the University of California,
4  * Arizona Board of Regents,
5  * Colorado State University,
6  * University Pierre & Marie Curie, Sorbonne University,
7  * Washington University in St. Louis,
8  * Beijing Institute of Technology,
9  * The University of Memphis.
10  *
11  * This file is part of NFD (Named Data Networking Forwarding Daemon).
12  * See AUTHORS.md for complete list of NFD authors and contributors.
13  *
14  * NFD is free software: you can redistribute it and/or modify it under the terms
15  * of the GNU General Public License as published by the Free Software Foundation,
16  * either version 3 of the License, or (at your option) any later version.
17  *
18  * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
19  * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
20  * PURPOSE. See the GNU General Public License for more details.
21  *
22  * You should have received a copy of the GNU General Public License along with
23  * NFD, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
24  */
25 
26 #ifndef NFD_DAEMON_FACE_STREAM_TRANSPORT_HPP
27 #define NFD_DAEMON_FACE_STREAM_TRANSPORT_HPP
28 
29 #include "transport.hpp"
30 #include "socket-utils.hpp"
31 #include "common/global.hpp"
32 
33 #include <array>
34 #include <queue>
35 
36 #include <boost/asio/defer.hpp>
37 #include <boost/asio/write.hpp>
38 
39 namespace nfd::face {
40 
46 template<class Protocol>
47 class StreamTransport : public Transport
48 {
49 public:
50  using protocol = Protocol;
51 
57  explicit
58  StreamTransport(typename protocol::socket&& socket);
59 
60  ssize_t
61  getSendQueueLength() override;
62 
63 protected:
64  void
65  doClose() override;
66 
67  void
69 
70  void
71  doSend(const Block& packet) override;
72 
73  void
75 
76  void
77  handleSend(const boost::system::error_code& error,
78  size_t nBytesSent);
79 
80  void
82 
83  void
84  handleReceive(const boost::system::error_code& error,
85  size_t nBytesReceived);
86 
87  void
88  processErrorCode(const boost::system::error_code& error);
89 
90  virtual void
91  handleError(const boost::system::error_code& error);
92 
93  void
95 
96  void
98 
99  size_t
101 
102 protected:
103  typename protocol::socket m_socket;
104 
106 
107 private:
108  size_t m_sendQueueBytes = 0;
109  std::queue<Block> m_sendQueue;
110  size_t m_receiveBufferSize = 0;
111  std::array<uint8_t, ndn::MAX_NDN_PACKET_SIZE> m_receiveBuffer;
112 };
113 
114 
115 template<class T>
116 StreamTransport<T>::StreamTransport(typename StreamTransport::protocol::socket&& socket)
117  : m_socket(std::move(socket))
118 {
119  // No queue capacity is set because there is no theoretical limit to the size of m_sendQueue.
120  // Therefore, protecting against send queue overflows is less critical than in other transport
121  // types. Instead, we use the default threshold specified in the GenericLinkService options.
122 
123  startReceive();
124 }
125 
126 template<class T>
127 ssize_t
129 {
130  ssize_t queueLength = getTxQueueLength(m_socket.native_handle());
131  if (queueLength == QUEUE_ERROR) {
132  NFD_LOG_FACE_WARN("Failed to obtain send queue length from socket: " << std::strerror(errno));
133  }
134  return getSendQueueBytes() + std::max<ssize_t>(0, queueLength);
135 }
136 
137 template<class T>
138 void
140 {
141  NFD_LOG_FACE_TRACE(__func__);
142 
143  if (m_socket.is_open()) {
144  // Cancel all outstanding operations and shutdown the socket
145  // so that no further sends or receives are possible.
146  // Use the non-throwing variants and ignore errors, if any.
147  boost::system::error_code error;
148  m_socket.cancel(error);
149  m_socket.shutdown(boost::asio::socket_base::shutdown_both, error);
150  }
151 
152  // Ensure that the Transport stays alive at least until
153  // all pending handlers are dispatched
154  boost::asio::defer(getGlobalIoService(), [this] { deferredClose(); });
155 
156  // Some bug or feature of Boost.Asio (see https://redmine.named-data.net/issues/1856):
157  //
158  // When doClose is called from a socket event handler (e.g., from handleReceive),
159  // m_socket.shutdown() does not trigger the cancellation of the handleSend callback.
160  // Instead, handleSend is invoked as nothing bad happened.
161  //
162  // In order to prevent the assertion in handleSend from failing, we clear the queue
163  // and close the socket in deferredClose, i.e., after all callbacks scheduled up to
164  // this point have been executed. If more send operations are scheduled after this
165  // point, they will fail because the socket has been shutdown, and their callbacks
166  // will be invoked with error code == asio::error::shut_down.
167 }
168 
169 template<class T>
170 void
172 {
173  NFD_LOG_FACE_TRACE(__func__);
174 
175  resetSendQueue();
176 
177  // use the non-throwing variant and ignore errors, if any
178  boost::system::error_code error;
179  m_socket.close(error);
180 
181  this->setState(TransportState::CLOSED);
182 }
183 
184 template<class T>
185 void
186 StreamTransport<T>::doSend(const Block& packet)
187 {
188  NFD_LOG_FACE_TRACE(__func__);
189 
190  if (getState() != TransportState::UP)
191  return;
192 
193  bool wasQueueEmpty = m_sendQueue.empty();
194  m_sendQueue.push(packet);
195  m_sendQueueBytes += packet.size();
196 
197  if (wasQueueEmpty)
198  sendFromQueue();
199 }
200 
201 template<class T>
202 void
204 {
205  boost::asio::async_write(m_socket, boost::asio::buffer(m_sendQueue.front()),
206  [this] (auto&&... args) { this->handleSend(std::forward<decltype(args)>(args)...); });
207 }
208 
209 template<class T>
210 void
211 StreamTransport<T>::handleSend(const boost::system::error_code& error,
212  size_t nBytesSent)
213 {
214  if (error)
215  return processErrorCode(error);
216 
217  NFD_LOG_FACE_TRACE("Successfully sent: " << nBytesSent << " bytes");
218 
219  BOOST_ASSERT(!m_sendQueue.empty());
220  BOOST_ASSERT(m_sendQueue.front().size() == nBytesSent);
221  m_sendQueueBytes -= nBytesSent;
222  m_sendQueue.pop();
223 
224  if (!m_sendQueue.empty())
225  sendFromQueue();
226 }
227 
228 template<class T>
229 void
231 {
232  BOOST_ASSERT(getState() == TransportState::UP);
233 
234  m_socket.async_receive(boost::asio::buffer(m_receiveBuffer.data() + m_receiveBufferSize,
235  m_receiveBuffer.size() - m_receiveBufferSize),
236  [this] (auto&&... args) { this->handleReceive(std::forward<decltype(args)>(args)...); });
237 }
238 
239 template<class T>
240 void
241 StreamTransport<T>::handleReceive(const boost::system::error_code& error,
242  size_t nBytesReceived)
243 {
244  if (error)
245  return processErrorCode(error);
246 
247  NFD_LOG_FACE_TRACE("Received: " << nBytesReceived << " bytes");
248 
249  m_receiveBufferSize += nBytesReceived;
250  auto unparsedBytes = ndn::make_span(m_receiveBuffer).first(m_receiveBufferSize);
251  while (!unparsedBytes.empty()) {
252  auto [isOk, element] = Block::fromBuffer(unparsedBytes);
253  if (!isOk)
254  break;
255 
256  unparsedBytes = unparsedBytes.subspan(element.size());
257  this->receive(element);
258  }
259 
260  if (unparsedBytes.empty()) {
261  // nothing left in the receive buffer
262  m_receiveBufferSize = 0;
263  }
264  else if (unparsedBytes.data() != m_receiveBuffer.data()) {
265  // move remaining unparsed bytes to the beginning of the receive buffer
266  std::copy(unparsedBytes.begin(), unparsedBytes.end(), m_receiveBuffer.begin());
267  m_receiveBufferSize = unparsedBytes.size();
268  }
269  else if (unparsedBytes.size() == m_receiveBuffer.size()) {
270  NFD_LOG_FACE_ERROR("Failed to parse incoming packet or packet too large to process");
271  this->setState(TransportState::FAILED);
272  doClose();
273  return;
274  }
275 
276  startReceive();
277 }
278 
279 template<class T>
280 void
281 StreamTransport<T>::processErrorCode(const boost::system::error_code& error)
282 {
283  NFD_LOG_FACE_TRACE(__func__);
284 
285  if (getState() == TransportState::CLOSING ||
286  getState() == TransportState::FAILED ||
287  getState() == TransportState::CLOSED ||
288  error == boost::asio::error::operation_aborted || // when cancel() is called
289  error == boost::asio::error::shut_down) // after shutdown() is called
290  // transport is shutting down, ignore any errors
291  return;
292 
293  handleError(error);
294 }
295 
296 template<class T>
297 void
298 StreamTransport<T>::handleError(const boost::system::error_code& error)
299 {
300  if (error == boost::asio::error::eof) {
301  this->setState(TransportState::CLOSING);
302  }
303  else {
304  NFD_LOG_FACE_ERROR("Send or receive operation failed: " << error.message());
305  this->setState(TransportState::FAILED);
306  }
307  doClose();
308 }
309 
310 template<class T>
311 void
313 {
314  m_receiveBufferSize = 0;
315 }
316 
317 template<class T>
318 void
320 {
321  std::queue<Block> emptyQueue;
322  std::swap(emptyQueue, m_sendQueue);
323  m_sendQueueBytes = 0;
324 }
325 
326 template<class T>
327 size_t
329 {
330  return m_sendQueueBytes;
331 }
332 
333 } // namespace nfd::face
334 
335 #endif // NFD_DAEMON_FACE_STREAM_TRANSPORT_HPP
Implements a Transport for stream-based protocols.
virtual void handleError(const boost::system::error_code &error)
void doClose() override
Performs Transport specific operations to close the transport.
void handleReceive(const boost::system::error_code &error, size_t nBytesReceived)
ssize_t getSendQueueLength() override
Returns the current send queue length of the transport (in octets).
void doSend(const Block &packet) override
Performs Transport specific operations to send a packet.
void processErrorCode(const boost::system::error_code &error)
void handleSend(const boost::system::error_code &error, size_t nBytesSent)
StreamTransport(typename protocol::socket &&socket)
Construct stream transport.
The lower half of a Face.
Definition: transport.hpp:123
#define NFD_LOG_FACE_ERROR(msg)
Log a message at ERROR level.
#define NFD_LOG_FACE_WARN(msg)
Log a message at WARN level.
#define NFD_LOG_FACE_TRACE(msg)
Log a message at TRACE level.
#define NFD_LOG_MEMBER_DECL()
Definition: logger.hpp:32
@ CLOSED
the transport is closed, and can be safely deallocated
@ CLOSING
the transport is being closed gracefully, either by the peer or by a call to close()
@ FAILED
the transport is being closed due to a failure
@ UP
the transport is up and can transmit packets
constexpr ssize_t QUEUE_ERROR
Indicates that the transport was unable to retrieve the queue capacity/length.
Definition: transport.hpp:116
ssize_t getTxQueueLength(int fd)
Obtain send queue length from a specified system socket.
boost::asio::io_context & getGlobalIoService()
Returns the global io_context instance for the calling thread.
Definition: global.cpp:36