26 #ifndef NFD_DAEMON_FACE_STREAM_TRANSPORT_HPP
27 #define NFD_DAEMON_FACE_STREAM_TRANSPORT_HPP
35 #include <boost/asio/write.hpp>
43 template<
class Protocol>
67 doSend(
const Block& packet)
override;
81 size_t nBytesReceived);
104 uint8_t m_receiveBuffer[ndn::MAX_NDN_PACKET_SIZE];
105 size_t m_receiveBufferSize;
106 std::queue<Block> m_sendQueue;
107 size_t m_sendQueueBytes;
113 : m_socket(std::move(socket))
114 , m_receiveBufferSize(0)
115 , m_sendQueueBytes(0)
130 NFD_LOG_FACE_WARN(
"Failed to obtain send queue length from socket: " << std::strerror(errno));
132 return getSendQueueBytes() + std::max<ssize_t>(0, queueLength);
141 if (m_socket.is_open()) {
145 boost::system::error_code error;
146 m_socket.cancel(error);
147 m_socket.shutdown(protocol::socket::shutdown_both, error);
176 boost::system::error_code error;
177 m_socket.close(error);
191 bool wasQueueEmpty = m_sendQueue.empty();
192 m_sendQueue.push(packet);
193 m_sendQueueBytes += packet.size();
203 boost::asio::async_write(m_socket, boost::asio::buffer(m_sendQueue.front()),
204 [
this] (
auto&&... args) { this->handleSend(std::forward<decltype(args)>(args)...); });
213 return processErrorCode(error);
217 BOOST_ASSERT(!m_sendQueue.empty());
218 BOOST_ASSERT(m_sendQueue.front().size() == nBytesSent);
219 m_sendQueueBytes -= nBytesSent;
222 if (!m_sendQueue.empty())
232 m_socket.async_receive(boost::asio::buffer(m_receiveBuffer + m_receiveBufferSize,
233 ndn::MAX_NDN_PACKET_SIZE - m_receiveBufferSize),
234 [
this] (
auto&&... args) { this->handleReceive(std::forward<decltype(args)>(args)...); });
240 size_t nBytesReceived)
243 return processErrorCode(error);
247 m_receiveBufferSize += nBytesReceived;
248 auto bufferView = ndn::make_span(m_receiveBuffer, m_receiveBufferSize);
251 while (offset < bufferView.size()) {
253 std::tie(isOk, element) = Block::fromBuffer(bufferView.subspan(offset));
257 offset += element.size();
258 BOOST_ASSERT(offset <= bufferView.size());
260 this->receive(element);
263 if (!isOk && m_receiveBufferSize == ndn::MAX_NDN_PACKET_SIZE && offset == 0) {
271 if (offset != m_receiveBufferSize) {
272 std::copy(m_receiveBuffer + offset, m_receiveBuffer + m_receiveBufferSize, m_receiveBuffer);
273 m_receiveBufferSize -= offset;
276 m_receiveBufferSize = 0;
292 error == boost::asio::error::operation_aborted ||
293 error == boost::asio::error::shut_down)
304 if (error == boost::asio::error::eof) {
318 m_receiveBufferSize = 0;
325 std::queue<Block> emptyQueue;
326 std::swap(emptyQueue, m_sendQueue);
327 m_sendQueueBytes = 0;
334 return m_sendQueueBytes;
Implements Transport for stream-based protocols.
virtual void handleError(const boost::system::error_code &error)
void doClose() override
Performs Transport-specific operations to close the transport.
void resetReceiveBuffer()
size_t getSendQueueBytes() const
void handleReceive(const boost::system::error_code &error, size_t nBytesReceived)
protocol::socket m_socket
ssize_t getSendQueueLength() override
Returns the current send queue length of the transport (in octets).
void doSend(const Block &packet) override
Performs Transport-specific operations to send a packet.
void processErrorCode(const boost::system::error_code &error)
void handleSend(const boost::system::error_code &error, size_t nBytesSent)
StreamTransport(typename protocol::socket &&socket)
Constructs a stream transport.
The lower half of a Face.
#define NFD_LOG_FACE_ERROR(msg)
Log a message at ERROR level.
#define NFD_LOG_FACE_WARN(msg)
Log a message at WARN level.
#define NFD_LOG_FACE_TRACE(msg)
Log a message at TRACE level.
#define NFD_LOG_MEMBER_DECL()
@ CLOSED
the transport is closed, and can be safely deallocated
@ CLOSING
the transport is being closed gracefully, either by the peer or by a call to close()
@ FAILED
the transport is being closed due to a failure
@ UP
the transport is up and can transmit packets
constexpr ssize_t QUEUE_ERROR
Indicates that the transport was unable to retrieve the queue capacity/length.
ssize_t getTxQueueLength(int fd)
Obtain send queue length from a specified system socket.
boost::asio::io_service & getGlobalIoService()
Returns the global io_service instance for the calling thread.