NFD: Named Data Networking Forwarding Daemon 24.07-28-gdcc0e6e0
Loading...
Searching...
No Matches
stream-transport.hpp
Go to the documentation of this file.
1/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2/*
3 * Copyright (c) 2014-2024, Regents of the University of California,
4 * Arizona Board of Regents,
5 * Colorado State University,
6 * University Pierre & Marie Curie, Sorbonne University,
7 * Washington University in St. Louis,
8 * Beijing Institute of Technology,
9 * The University of Memphis.
10 *
11 * This file is part of NFD (Named Data Networking Forwarding Daemon).
12 * See AUTHORS.md for complete list of NFD authors and contributors.
13 *
14 * NFD is free software: you can redistribute it and/or modify it under the terms
15 * of the GNU General Public License as published by the Free Software Foundation,
16 * either version 3 of the License, or (at your option) any later version.
17 *
18 * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
19 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
20 * PURPOSE. See the GNU General Public License for more details.
21 *
22 * You should have received a copy of the GNU General Public License along with
23 * NFD, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
24 */
25
26#ifndef NFD_DAEMON_FACE_STREAM_TRANSPORT_HPP
27#define NFD_DAEMON_FACE_STREAM_TRANSPORT_HPP
28
29#include "transport.hpp"
30#include "socket-utils.hpp"
31#include "common/global.hpp"
32
33#include <array>
34#include <queue>
35
36#include <boost/asio/defer.hpp>
37#include <boost/asio/write.hpp>
38
39namespace nfd::face {
40
46template<class Protocol>
48{
49public:
50 using protocol = Protocol;
51
57 explicit
58 StreamTransport(typename protocol::socket&& socket);
59
60 ssize_t
62
63protected:
64 void
65 doClose() override;
66
67 void
69
70 void
71 doSend(const Block& packet) override;
72
73 void
75
76 void
77 handleSend(const boost::system::error_code& error,
78 size_t nBytesSent);
79
80 void
82
83 void
84 handleReceive(const boost::system::error_code& error,
85 size_t nBytesReceived);
86
87 void
88 processErrorCode(const boost::system::error_code& error);
89
90 virtual void
91 handleError(const boost::system::error_code& error);
92
93 void
95
96 void
98
99 size_t
101
102protected:
103 typename protocol::socket m_socket;
104
106
107private:
108 size_t m_sendQueueBytes = 0;
109 std::queue<Block> m_sendQueue;
110 size_t m_receiveBufferSize = 0;
111 std::array<uint8_t, ndn::MAX_NDN_PACKET_SIZE> m_receiveBuffer;
112};
113
114
115template<class T>
116StreamTransport<T>::StreamTransport(typename StreamTransport::protocol::socket&& socket)
117 : m_socket(std::move(socket))
118{
119 // No queue capacity is set because there is no theoretical limit to the size of m_sendQueue.
120 // Therefore, protecting against send queue overflows is less critical than in other transport
121 // types. Instead, we use the default threshold specified in the GenericLinkService options.
122
123 startReceive();
124}
125
126template<class T>
127ssize_t
129{
130 ssize_t queueLength = getTxQueueLength(m_socket.native_handle());
131 if (queueLength == QUEUE_ERROR) {
132 NFD_LOG_FACE_WARN("Failed to obtain send queue length from socket: " << std::strerror(errno));
133 }
134 return getSendQueueBytes() + std::max<ssize_t>(0, queueLength);
135}
136
137template<class T>
138void
140{
141 NFD_LOG_FACE_TRACE(__func__);
142
143 if (m_socket.is_open()) {
144 // Cancel all outstanding operations and shutdown the socket
145 // so that no further sends or receives are possible.
146 // Use the non-throwing variants and ignore errors, if any.
147 boost::system::error_code error;
148 m_socket.cancel(error);
149 m_socket.shutdown(boost::asio::socket_base::shutdown_both, error);
150 }
151
152 // Ensure that the Transport stays alive at least until
153 // all pending handlers are dispatched
154 boost::asio::defer(getGlobalIoService(), [this] { deferredClose(); });
155
156 // Some bug or feature of Boost.Asio (see https://redmine.named-data.net/issues/1856):
157 //
158 // When doClose is called from a socket event handler (e.g., from handleReceive),
159 // m_socket.shutdown() does not trigger the cancellation of the handleSend callback.
160 // Instead, handleSend is invoked as nothing bad happened.
161 //
162 // In order to prevent the assertion in handleSend from failing, we clear the queue
163 // and close the socket in deferredClose, i.e., after all callbacks scheduled up to
164 // this point have been executed. If more send operations are scheduled after this
165 // point, they will fail because the socket has been shutdown, and their callbacks
166 // will be invoked with error code == asio::error::shut_down.
167}
168
169template<class T>
170void
172{
173 NFD_LOG_FACE_TRACE(__func__);
174
175 resetSendQueue();
176
177 // use the non-throwing variant and ignore errors, if any
178 boost::system::error_code error;
179 m_socket.close(error);
180
181 this->setState(TransportState::CLOSED);
182}
183
184template<class T>
185void
186StreamTransport<T>::doSend(const Block& packet)
187{
188 NFD_LOG_FACE_TRACE(__func__);
189
190 if (getState() != TransportState::UP)
191 return;
192
193 bool wasQueueEmpty = m_sendQueue.empty();
194 m_sendQueue.push(packet);
195 m_sendQueueBytes += packet.size();
196
197 if (wasQueueEmpty)
198 sendFromQueue();
199}
200
201template<class T>
202void
204{
205 boost::asio::async_write(m_socket, boost::asio::buffer(m_sendQueue.front()),
206 [this] (auto&&... args) { this->handleSend(std::forward<decltype(args)>(args)...); });
207}
208
209template<class T>
210void
211StreamTransport<T>::handleSend(const boost::system::error_code& error,
212 size_t nBytesSent)
213{
214 if (error)
215 return processErrorCode(error);
216
217 NFD_LOG_FACE_TRACE("Successfully sent: " << nBytesSent << " bytes");
218
219 BOOST_ASSERT(!m_sendQueue.empty());
220 BOOST_ASSERT(m_sendQueue.front().size() == nBytesSent);
221 m_sendQueueBytes -= nBytesSent;
222 m_sendQueue.pop();
223
224 if (!m_sendQueue.empty())
225 sendFromQueue();
226}
227
228template<class T>
229void
231{
232 BOOST_ASSERT(getState() == TransportState::UP);
233
234 m_socket.async_receive(boost::asio::buffer(m_receiveBuffer.data() + m_receiveBufferSize,
235 m_receiveBuffer.size() - m_receiveBufferSize),
236 [this] (auto&&... args) { this->handleReceive(std::forward<decltype(args)>(args)...); });
237}
238
239template<class T>
240void
241StreamTransport<T>::handleReceive(const boost::system::error_code& error,
242 size_t nBytesReceived)
243{
244 if (error)
245 return processErrorCode(error);
246
247 NFD_LOG_FACE_TRACE("Received: " << nBytesReceived << " bytes");
248
249 m_receiveBufferSize += nBytesReceived;
250 auto unparsedBytes = ndn::make_span(m_receiveBuffer).first(m_receiveBufferSize);
251 while (!unparsedBytes.empty()) {
252 auto [isOk, element] = Block::fromBuffer(unparsedBytes);
253 if (!isOk)
254 break;
255
256 unparsedBytes = unparsedBytes.subspan(element.size());
257 this->receive(element);
258 }
259
260 if (unparsedBytes.empty()) {
261 // nothing left in the receive buffer
262 m_receiveBufferSize = 0;
263 }
264 else if (unparsedBytes.data() != m_receiveBuffer.data()) {
265 // move remaining unparsed bytes to the beginning of the receive buffer
266 std::copy(unparsedBytes.begin(), unparsedBytes.end(), m_receiveBuffer.begin());
267 m_receiveBufferSize = unparsedBytes.size();
268 }
269 else if (unparsedBytes.size() == m_receiveBuffer.size()) {
270 NFD_LOG_FACE_ERROR("Failed to parse incoming packet or packet too large to process");
271 this->setState(TransportState::FAILED);
272 doClose();
273 return;
274 }
275
276 startReceive();
277}
278
279template<class T>
280void
281StreamTransport<T>::processErrorCode(const boost::system::error_code& error)
282{
283 NFD_LOG_FACE_TRACE(__func__);
284
285 if (getState() == TransportState::CLOSING ||
286 getState() == TransportState::FAILED ||
287 getState() == TransportState::CLOSED ||
288 error == boost::asio::error::operation_aborted || // when cancel() is called
289 error == boost::asio::error::shut_down) // after shutdown() is called
290 // transport is shutting down, ignore any errors
291 return;
292
293 handleError(error);
294}
295
296template<class T>
297void
298StreamTransport<T>::handleError(const boost::system::error_code& error)
299{
300 if (error == boost::asio::error::eof) {
301 this->setState(TransportState::CLOSING);
302 }
303 else {
304 NFD_LOG_FACE_ERROR("Send or receive operation failed: " << error.message());
305 this->setState(TransportState::FAILED);
306 }
307 doClose();
308}
309
310template<class T>
311void
313{
314 m_receiveBufferSize = 0;
315}
316
317template<class T>
318void
320{
321 std::queue<Block> emptyQueue;
322 std::swap(emptyQueue, m_sendQueue);
323 m_sendQueueBytes = 0;
324}
325
326template<class T>
327size_t
329{
330 return m_sendQueueBytes;
331}
332
333} // namespace nfd::face
334
335#endif // NFD_DAEMON_FACE_STREAM_TRANSPORT_HPP
Implements a Transport for stream-based protocols.
virtual void handleError(const boost::system::error_code &error)
void doClose() override
Performs Transport specific operations to close the transport.
void handleReceive(const boost::system::error_code &error, size_t nBytesReceived)
ssize_t getSendQueueLength() override
Returns the current send queue length of the transport (in octets).
void doSend(const Block &packet) override
Performs Transport specific operations to send a packet.
void processErrorCode(const boost::system::error_code &error)
void handleSend(const boost::system::error_code &error, size_t nBytesSent)
StreamTransport(typename protocol::socket &&socket)
Construct stream transport.
The lower half of a Face.
#define NFD_LOG_FACE_ERROR(msg)
Log a message at ERROR level.
#define NFD_LOG_FACE_WARN(msg)
Log a message at WARN level.
#define NFD_LOG_FACE_TRACE(msg)
Log a message at TRACE level.
#define NFD_LOG_MEMBER_DECL()
Definition logger.hpp:32
@ CLOSED
the transport is closed, and can be safely deallocated
@ CLOSING
the transport is being closed gracefully, either by the peer or by a call to close()
@ FAILED
the transport is being closed due to a failure
@ UP
the transport is up and can transmit packets
constexpr ssize_t QUEUE_ERROR
Indicates that the transport was unable to retrieve the queue capacity/length.
ssize_t getTxQueueLength(int fd)
Obtain send queue length from a specified system socket.
boost::asio::io_context & getGlobalIoService()
Returns the global io_context instance for the calling thread.
Definition global.cpp:36