26 #ifndef NFD_DAEMON_FACE_STREAM_TRANSPORT_HPP
27 #define NFD_DAEMON_FACE_STREAM_TRANSPORT_HPP
42 template<
class Protocol>
66 doSend(
const Block& packet)
override;
80 size_t nBytesReceived);
103 uint8_t m_receiveBuffer[ndn::MAX_NDN_PACKET_SIZE];
104 size_t m_receiveBufferSize;
105 std::queue<Block> m_sendQueue;
106 size_t m_sendQueueBytes;
112 : m_socket(std::move(socket))
113 , m_receiveBufferSize(0)
114 , m_sendQueueBytes(0)
129 NFD_LOG_FACE_WARN(
"Failed to obtain send queue length from socket: " << std::strerror(errno));
131 return getSendQueueBytes() + std::max<ssize_t>(0, queueLength);
140 if (m_socket.is_open()) {
144 boost::system::error_code error;
145 m_socket.cancel(error);
146 m_socket.shutdown(protocol::socket::shutdown_both, error);
175 boost::system::error_code error;
176 m_socket.close(error);
190 bool wasQueueEmpty = m_sendQueue.empty();
191 m_sendQueue.push(packet);
192 m_sendQueueBytes += packet.size();
202 boost::asio::async_write(m_socket, boost::asio::buffer(m_sendQueue.front()),
203 [
this] (
auto&&... args) { this->handleSend(std::forward<decltype(args)>(args)...); });
212 return processErrorCode(error);
216 BOOST_ASSERT(!m_sendQueue.empty());
217 BOOST_ASSERT(m_sendQueue.front().size() == nBytesSent);
218 m_sendQueueBytes -= nBytesSent;
221 if (!m_sendQueue.empty())
231 m_socket.async_receive(boost::asio::buffer(m_receiveBuffer + m_receiveBufferSize,
232 ndn::MAX_NDN_PACKET_SIZE - m_receiveBufferSize),
233 [
this] (
auto&&... args) { this->handleReceive(std::forward<decltype(args)>(args)...); });
239 size_t nBytesReceived)
242 return processErrorCode(error);
246 m_receiveBufferSize += nBytesReceived;
247 auto bufferView = ndn::make_span(m_receiveBuffer, m_receiveBufferSize);
250 while (offset < bufferView.size()) {
252 std::tie(isOk, element) = Block::fromBuffer(bufferView.subspan(offset));
256 offset += element.size();
257 BOOST_ASSERT(offset <= bufferView.size());
259 this->receive(element);
262 if (!isOk && m_receiveBufferSize == ndn::MAX_NDN_PACKET_SIZE && offset == 0) {
270 if (offset != m_receiveBufferSize) {
271 std::copy(m_receiveBuffer + offset, m_receiveBuffer + m_receiveBufferSize, m_receiveBuffer);
272 m_receiveBufferSize -= offset;
275 m_receiveBufferSize = 0;
291 error == boost::asio::error::operation_aborted ||
292 error == boost::asio::error::shut_down)
303 if (error == boost::asio::error::eof) {
317 m_receiveBufferSize = 0;
324 std::queue<Block> emptyQueue;
325 std::swap(emptyQueue, m_sendQueue);
326 m_sendQueueBytes = 0;
333 return m_sendQueueBytes;
Implements Transport for stream-based protocols.
virtual void handleError(const boost::system::error_code &error)
void doClose() override
performs Transport-specific operations to close the transport
void resetReceiveBuffer()
size_t getSendQueueBytes() const
void handleReceive(const boost::system::error_code &error, size_t nBytesReceived)
protocol::socket m_socket
ssize_t getSendQueueLength() override
void doSend(const Block &packet) override
performs Transport-specific operations to send a packet
void processErrorCode(const boost::system::error_code &error)
void handleSend(const boost::system::error_code &error, size_t nBytesSent)
StreamTransport(typename protocol::socket &&socket)
Construct stream transport.
The lower half of a Face.
#define NFD_LOG_FACE_ERROR(msg)
Log a message at ERROR level.
#define NFD_LOG_FACE_WARN(msg)
Log a message at WARN level.
#define NFD_LOG_FACE_TRACE(msg)
Log a message at TRACE level.
#define NFD_LOG_MEMBER_DECL()
@ CLOSED
the transport is closed, and can be safely deallocated
@ CLOSING
the transport is being closed gracefully, either by the peer or by a call to close()
@ FAILED
the transport is being closed due to a failure
@ UP
the transport is up and can transmit packets
const ssize_t QUEUE_ERROR
indicates that the transport was unable to retrieve the queue capacity/length
ssize_t getTxQueueLength(int fd)
obtain send queue length from a specified system socket
Copyright (c) 2014-2015, Regents of the University of California, Arizona Board of Regents,...
boost::asio::io_service & getGlobalIoService()
Returns the global io_service instance for the calling thread.