22 #ifndef NDN_CXX_TRANSPORT_DETAIL_STREAM_TRANSPORT_IMPL_HPP
23 #define NDN_CXX_TRANSPORT_DETAIL_STREAM_TRANSPORT_IMPL_HPP
27 #include <boost/asio/steady_timer.hpp>
28 #include <boost/asio/write.hpp>
29 #include <boost/lexical_cast.hpp>
// Implementation detail of a Boost.Asio-based stream-oriented transport,
// parameterized on the owning transport type (BaseTransport) and the
// Asio protocol (Protocol), which supplies the socket and endpoint types.
//
// The class derives from std::enable_shared_from_this so that its
// asynchronous completion handlers can capture a shared_ptr to the object
// (see the `self = this->shared_from_this()` captures below).
43 template<
typename BaseTransport,
typename Protocol>
44 class StreamTransportImpl :
public std::enable_shared_from_this<StreamTransportImpl<BaseTransport, Protocol>>
// Begins connecting to `endpoint`. Two asynchronous operations are started:
// a wait on m_connectTimer and an async_connect on m_socket. Both handlers
// capture `self`, a shared_ptr to this object, so the object stays alive
// while the operations are pending.
// NOTE(review): the lines that store the endpoint, arm the timer and check
// the current state are not visible in this extract.
58 connect(
const typename Protocol::endpoint& endpoint)
// Connect-timer handler.
// NOTE(review): any early return on `ec` (e.g. a cancelled timer) is not
// visible in this extract - confirm against the full file.
70 m_connectTimer.async_wait([
self = this->shared_from_this()] (
const auto& ec) {
// Close the owning transport, then build the "could not connect" message;
// the endpoint is rendered as text via boost::lexical_cast. The statement
// that consumes this message (presumably an error being thrown) starts on
// a line that is not visible here.
74 self->m_transport.close();
76 "could not connect to NDN forwarder at " +
77 boost::lexical_cast<std::string>(self->m_endpoint)));
// Start the connection attempt itself; its result code is forwarded
// unchanged to connectHandler().
80 m_socket.async_connect(
m_endpoint, [
self = this->shared_from_this()] (
const auto& ec) {
81 self->connectHandler(ec);
91 boost::system::error_code error;
// NOTE(review): presumably part of connectHandler(error); the function
// header is not visible in this extract.
// operation_aborted (the error Asio reports for a cancelled operation) is
// tested separately from other error codes; the body of this branch is not
// visible here.
138 if (error == boost::asio::error::operation_aborted) {
// The endpoint is converted to text, apparently as the tail of an error
// message whose beginning is on a line not visible here.
144 boost::lexical_cast<std::string>(
m_endpoint)));
// Completion handler for a socket write (judging by the error message
// below; the initiating call is not visible in this extract).
// `this` is captured for direct member access, and `self` holds a
// shared_ptr copy so the object cannot be destroyed while the handler is
// pending. The second argument (byte count) is intentionally unnamed and
// unused.
161 [
this,
self = this->shared_from_this()] (
const auto& error,
size_t) {
// operation_aborted is tested separately; the body of this branch is not
// visible here. Other errors are reported by throwing Transport::Error
// with the original error code attached.
163 if (error == boost::asio::error::operation_aborted) {
168 NDN_THROW(Transport::Error(error,
"socket write error"));
// Start an asynchronous receive into the unused tail of m_rxBuffer: the
// destination begins right after the m_rxBufferSize bytes already buffered
// and spans the remaining capacity.
187 m_socket.async_receive(boost::asio::buffer(m_rxBuffer.data() + m_rxBufferSize,
188 m_rxBuffer.size() - m_rxBufferSize),
// Receive completion handler. `self` keeps the object alive while the
// operation is pending; nBytesRecvd is the number of bytes just received.
190 [
this,
self = this->shared_from_this()] (
const auto& error,
size_t nBytesRecvd) {
// operation_aborted is tested separately (branch body not visible here);
// other errors are reported by throwing Transport::Error.
192 if (error == boost::asio::error::operation_aborted) {
197 NDN_THROW(Transport::Error(error,
"socket read error"));
// Account for the newly received bytes, then view everything buffered so
// far as a span that shrinks as elements are decoded.
200 m_rxBufferSize += nBytesRecvd;
201 auto unparsedBytes = make_span(m_rxBuffer).first(m_rxBufferSize);
// Decode elements from the front of the span one at a time. Each decoded
// element is removed from the span and handed to the transport's receive
// callback.
// NOTE(review): the handling of a failed decode (`isOk` false) is on
// lines not visible in this extract.
202 while (!unparsedBytes.empty()) {
203 auto [isOk, element] = Block::fromBuffer(unparsedBytes);
207 unparsedBytes = unparsedBytes.subspan(element.size());
208 m_transport.m_receiveCallback(element);
// Post-parse bookkeeping, three cases:
//  1. everything was consumed (branch body not visible here);
211 if (unparsedBytes.empty()) {
//  2. the leftover no longer starts at the buffer start, i.e. at least one
//     element was consumed: move the leftover bytes to the front. The
//     destination begins before the source range, so std::copy is valid
//     even though both lie in the same array;
215 else if (unparsedBytes.data() != m_rxBuffer.data()) {
217 std::copy(unparsedBytes.begin(), unparsedBytes.end(), m_rxBuffer.begin());
218 m_rxBufferSize = unparsedBytes.size();
//  3. nothing was consumed and the leftover fills the entire buffer: no
//     further bytes can be received, so this is reported as an error.
220 else if (unparsedBytes.size() == m_rxBuffer.size()) {
222 NDN_THROW(Transport::Error(
"receive buffer full, but a valid TLV cannot be decoded"));
// Number of bytes currently held at the start of m_rxBuffer that have been
// received but not yet decoded; new data is received immediately after them.
235 size_t m_rxBufferSize = 0;
Represents a TLV element of the NDN packet format.
Implementation detail of a Boost.Asio-based stream-oriented transport.
std::array< uint8_t, MAX_NDN_PACKET_SIZE > m_rxBuffer
Protocol::socket m_socket
TransmissionQueue m_transmissionQueue
void send(const Block &block)
boost::asio::steady_timer m_connectTimer
StreamTransportImpl(BaseTransport &transport, boost::asio::io_context &ioCtx)
std::queue< Block, std::list< Block > > TransmissionQueue
Protocol::endpoint m_endpoint
void connect(const typename Protocol::endpoint &endpoint)
void connectHandler(const boost::system::error_code &error)
BaseTransport & m_transport
Contains implementation details that are not part of the ndn-cxx public API.
::boost::chrono::seconds seconds