ndn-cxx: NDN C++ Library 0.9.0-33-g832ea91d
Loading...
Searching...
No Matches
stream-transport-impl.hpp
Go to the documentation of this file.
1/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2/*
3 * Copyright (c) 2013-2024 Regents of the University of California.
4 *
5 * This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
6 *
7 * ndn-cxx library is free software: you can redistribute it and/or modify it under the
8 * terms of the GNU Lesser General Public License as published by the Free Software
9 * Foundation, either version 3 of the License, or (at your option) any later version.
10 *
11 * ndn-cxx library is distributed in the hope that it will be useful, but WITHOUT ANY
12 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
13 * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
14 *
15 * You should have received copies of the GNU General Public License and GNU Lesser
16 * General Public License along with ndn-cxx, e.g., in COPYING.md file. If not, see
17 * <http://www.gnu.org/licenses/>.
18 *
19 * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
20 */
21
22#ifndef NDN_CXX_TRANSPORT_DETAIL_STREAM_TRANSPORT_IMPL_HPP
23#define NDN_CXX_TRANSPORT_DETAIL_STREAM_TRANSPORT_IMPL_HPP
24
26
27#include <boost/asio/steady_timer.hpp>
28#include <boost/asio/write.hpp>
29#include <boost/lexical_cast.hpp>
30
31#include <array>
32#include <list>
33#include <queue>
34
35namespace ndn::detail {
36
43template<typename BaseTransport, typename Protocol>
44class StreamTransportImpl : public std::enable_shared_from_this<StreamTransportImpl<BaseTransport, Protocol>>
45{
46protected:
47 using TransmissionQueue = std::queue<Block, std::list<Block>>;
48
49public:
50 StreamTransportImpl(BaseTransport& transport, boost::asio::io_context& ioCtx)
51 : m_transport(transport)
52 , m_socket(ioCtx)
53 , m_connectTimer(ioCtx)
54 {
55 }
56
57 void
58 connect(const typename Protocol::endpoint& endpoint)
59 {
60 if (m_transport.getState() == Transport::State::CONNECTING) {
61 return;
62 }
63
64 m_endpoint = endpoint;
66
67 // Wait at most 4 seconds to connect
69 m_connectTimer.expires_after(std::chrono::seconds(4));
70 m_connectTimer.async_wait([self = this->shared_from_this()] (const auto& ec) {
71 if (ec) // e.g., cancelled timer
72 return;
73
74 self->m_transport.close();
75 NDN_THROW(Transport::Error(boost::system::errc::make_error_code(boost::system::errc::timed_out),
76 "could not connect to NDN forwarder at " +
77 boost::lexical_cast<std::string>(self->m_endpoint)));
78 });
79
80 m_socket.async_connect(m_endpoint, [self = this->shared_from_this()] (const auto& ec) {
81 self->connectHandler(ec);
82 });
83 }
84
85 void
87 {
89
90 m_connectTimer.cancel();
91 boost::system::error_code error; // to silently ignore all errors
92 m_socket.cancel(error);
93 m_socket.close(error);
94
95 TransmissionQueue{}.swap(m_transmissionQueue); // clear the queue
96 }
97
98 void
100 {
101 if (m_transport.getState() == Transport::State::RUNNING) {
102 m_socket.cancel();
104 }
105 }
106
107 void
109 {
110 if (m_transport.getState() == Transport::State::PAUSED) {
112 m_rxBufferSize = 0;
113 asyncReceive();
114 }
115 }
116
117 void
118 send(const Block& block)
119 {
120 m_transmissionQueue.push(block);
121
122 if (m_transport.getState() != Transport::State::CLOSED &&
124 m_transmissionQueue.size() == 1) {
125 asyncWrite();
126 }
127 // if not connected or there's another transmission in progress (m_transmissionQueue.size() > 1),
128 // the next write will be scheduled either in connectHandler or in asyncWriteHandler
129 }
130
131protected:
132 void
133 connectHandler(const boost::system::error_code& error)
134 {
135 m_connectTimer.cancel();
136
137 if (error) {
138 if (error == boost::asio::error::operation_aborted) {
139 // async_connect was explicitly cancelled (e.g., socket close)
140 return;
141 }
142 m_transport.close();
143 NDN_THROW(Transport::Error(error, "could not connect to NDN forwarder at " +
144 boost::lexical_cast<std::string>(m_endpoint)));
145 }
146
148
149 if (!m_transmissionQueue.empty()) {
150 resume();
151 asyncWrite();
152 }
153 }
154
155 void
157 {
158 BOOST_ASSERT(!m_transmissionQueue.empty());
159 boost::asio::async_write(m_socket, boost::asio::buffer(m_transmissionQueue.front()),
160 // capture a copy of the shared_ptr to "this" to prevent deallocation
161 [this, self = this->shared_from_this()] (const auto& error, size_t) {
162 if (error) {
163 if (error == boost::asio::error::operation_aborted) {
164 // async_write was explicitly cancelled (e.g., socket close)
165 return;
166 }
167 m_transport.close();
168 NDN_THROW(Transport::Error(error, "socket write error"));
169 }
170
171 if (m_transport.getState() == Transport::State::CLOSED) {
172 return; // queue has already been cleared
173 }
174
175 BOOST_ASSERT(!m_transmissionQueue.empty());
177
178 if (!m_transmissionQueue.empty()) {
179 asyncWrite();
180 }
181 });
182 }
183
184 void
186 {
187 m_socket.async_receive(boost::asio::buffer(m_rxBuffer.data() + m_rxBufferSize,
188 m_rxBuffer.size() - m_rxBufferSize),
189 // capture a copy of the shared_ptr to "this" to prevent deallocation
190 [this, self = this->shared_from_this()] (const auto& error, size_t nBytesRecvd) {
191 if (error) {
192 if (error == boost::asio::error::operation_aborted) {
193 // async_receive was explicitly cancelled (e.g., socket close)
194 return;
195 }
196 m_transport.close();
197 NDN_THROW(Transport::Error(error, "socket read error"));
198 }
199
200 m_rxBufferSize += nBytesRecvd;
201 auto unparsedBytes = make_span(m_rxBuffer).first(m_rxBufferSize);
202 while (!unparsedBytes.empty()) {
203 auto [isOk, element] = Block::fromBuffer(unparsedBytes);
204 if (!isOk) {
205 break;
206 }
207 unparsedBytes = unparsedBytes.subspan(element.size());
208 m_transport.m_receiveCallback(element);
209 }
210
211 if (unparsedBytes.empty()) {
212 // nothing left in the receive buffer
213 m_rxBufferSize = 0;
214 }
215 else if (unparsedBytes.data() != m_rxBuffer.data()) {
216 // move remaining unparsed bytes to the beginning of the receive buffer
217 std::copy(unparsedBytes.begin(), unparsedBytes.end(), m_rxBuffer.begin());
218 m_rxBufferSize = unparsedBytes.size();
219 }
220 else if (unparsedBytes.size() == m_rxBuffer.size()) {
221 m_transport.close();
222 NDN_THROW(Transport::Error("receive buffer full, but a valid TLV cannot be decoded"));
223 }
224
225 asyncReceive();
226 });
227 }
228
229protected:
230 BaseTransport& m_transport;
231 typename Protocol::endpoint m_endpoint;
232 typename Protocol::socket m_socket;
233 boost::asio::steady_timer m_connectTimer;
235 size_t m_rxBufferSize = 0;
236 std::array<uint8_t, MAX_NDN_PACKET_SIZE> m_rxBuffer;
237};
238
239} // namespace ndn::detail
240
241#endif // NDN_CXX_TRANSPORT_DETAIL_STREAM_TRANSPORT_IMPL_HPP
Represents a TLV element of the NDN packet format.
Definition block.hpp:45
static std::tuple< bool, Block > fromBuffer(ConstBufferPtr buffer, size_t offset=0)
Try to parse Block from a wire buffer.
Definition block.cpp:165
Implementation detail of a Boost.Asio-based stream-oriented transport.
std::array< uint8_t, MAX_NDN_PACKET_SIZE > m_rxBuffer
StreamTransportImpl(BaseTransport &transport, boost::asio::io_context &ioCtx)
std::queue< Block, std::list< Block > > TransmissionQueue
void connect(const typename Protocol::endpoint &endpoint)
void connectHandler(const boost::system::error_code &error)
#define NDN_THROW(e)
Definition exception.hpp:56
Contains implementation details that are not part of the ndn-cxx public API.