26 #ifndef NFD_DAEMON_FACE_STREAM_TRANSPORT_HPP
27 #define NFD_DAEMON_FACE_STREAM_TRANSPORT_HPP
36 #include <boost/asio/defer.hpp>
37 #include <boost/asio/write.hpp>
46 template<
class Protocol>
71 doSend(
const Block& packet)
override;
85 size_t nBytesReceived);
108 size_t m_sendQueueBytes = 0;
109 std::queue<Block> m_sendQueue;
110 size_t m_receiveBufferSize = 0;
111 std::array<uint8_t, ndn::MAX_NDN_PACKET_SIZE> m_receiveBuffer;
117 : m_socket(std::move(socket))
132 NFD_LOG_FACE_WARN(
"Failed to obtain send queue length from socket: " << std::strerror(errno));
134 return getSendQueueBytes() + std::max<ssize_t>(0, queueLength);
143 if (m_socket.is_open()) {
147 boost::system::error_code error;
148 m_socket.cancel(error);
149 m_socket.shutdown(boost::asio::socket_base::shutdown_both, error);
178 boost::system::error_code error;
179 m_socket.close(error);
193 bool wasQueueEmpty = m_sendQueue.empty();
194 m_sendQueue.push(packet);
195 m_sendQueueBytes += packet.size();
205 boost::asio::async_write(m_socket, boost::asio::buffer(m_sendQueue.front()),
206 [
this] (
auto&&... args) { this->handleSend(std::forward<decltype(args)>(args)...); });
215 return processErrorCode(error);
219 BOOST_ASSERT(!m_sendQueue.empty());
220 BOOST_ASSERT(m_sendQueue.front().size() == nBytesSent);
221 m_sendQueueBytes -= nBytesSent;
224 if (!m_sendQueue.empty())
234 m_socket.async_receive(boost::asio::buffer(m_receiveBuffer.data() + m_receiveBufferSize,
235 m_receiveBuffer.size() - m_receiveBufferSize),
236 [
this] (
auto&&... args) { this->handleReceive(std::forward<decltype(args)>(args)...); });
242 size_t nBytesReceived)
245 return processErrorCode(error);
249 m_receiveBufferSize += nBytesReceived;
250 auto unparsedBytes = ndn::make_span(m_receiveBuffer).first(m_receiveBufferSize);
251 while (!unparsedBytes.empty()) {
252 auto [isOk, element] = Block::fromBuffer(unparsedBytes);
256 unparsedBytes = unparsedBytes.subspan(element.size());
257 this->receive(element);
260 if (unparsedBytes.empty()) {
262 m_receiveBufferSize = 0;
264 else if (unparsedBytes.data() != m_receiveBuffer.data()) {
266 std::copy(unparsedBytes.begin(), unparsedBytes.end(), m_receiveBuffer.begin());
267 m_receiveBufferSize = unparsedBytes.size();
269 else if (unparsedBytes.size() == m_receiveBuffer.size()) {
288 error == boost::asio::error::operation_aborted ||
289 error == boost::asio::error::shut_down)
300 if (error == boost::asio::error::eof) {
314 m_receiveBufferSize = 0;
321 std::queue<Block> emptyQueue;
322 std::swap(emptyQueue, m_sendQueue);
323 m_sendQueueBytes = 0;
330 return m_sendQueueBytes;
Implements a Transport for stream-based protocols.
virtual void handleError(const boost::system::error_code &error)
void doClose() override
Performs Transport-specific operations to close the transport.
void resetReceiveBuffer()
size_t getSendQueueBytes() const
void handleReceive(const boost::system::error_code &error, size_t nBytesReceived)
protocol::socket m_socket
ssize_t getSendQueueLength() override
Returns the current send queue length of the transport (in octets).
void doSend(const Block &packet) override
Performs Transport-specific operations to send a packet.
void processErrorCode(const boost::system::error_code &error)
void handleSend(const boost::system::error_code &error, size_t nBytesSent)
StreamTransport(typename protocol::socket &&socket)
Constructs a stream transport.
The lower half of a Face.
#define NFD_LOG_FACE_ERROR(msg)
Log a message at ERROR level.
#define NFD_LOG_FACE_WARN(msg)
Log a message at WARN level.
#define NFD_LOG_FACE_TRACE(msg)
Log a message at TRACE level.
#define NFD_LOG_MEMBER_DECL()
@ CLOSED
the transport is closed, and can be safely deallocated
@ CLOSING
the transport is being closed gracefully, either by the peer or by a call to close()
@ FAILED
the transport is being closed due to a failure
@ UP
the transport is up and can transmit packets
constexpr ssize_t QUEUE_ERROR
Indicates that the transport was unable to retrieve the queue capacity/length.
ssize_t getTxQueueLength(int fd)
Obtain send queue length from a specified system socket.
boost::asio::io_context & getGlobalIoService()
Returns the global io_context instance for the calling thread.