28 #include <ndn-cxx/lp/pit-token.hpp>
29 #include <ndn-cxx/lp/tags.hpp>
39 tlv::sizeOfVarNumber(
sizeof(uint64_t)) +
40 tlv::sizeOfNonNegativeInteger(UINT64_MAX);
44 , m_fragmenter(m_options.fragmenterOptions, this)
45 , m_reassembler(m_options.reassemblerOptions, this)
46 , m_reliability(m_options.reliabilityOptions, this)
48 , m_nextMarkTime(time::steady_clock::time_point::max())
49 , m_nMarkedSinceInMarkingState(0)
85 GenericLinkService::requestIdlePacket()
90 this->sendLpPacket({});
94 GenericLinkService::sendLpPacket(lp::Packet&& pkt)
103 checkCongestionLevel(pkt);
106 auto block = pkt.wireEncode();
107 if (mtu !=
MTU_UNLIMITED && block.size() >
static_cast<size_t>(mtu)) {
116 GenericLinkService::doSendInterest(
const Interest& interest)
118 lp::Packet lpPacket(interest.wireEncode());
120 encodeLpFields(interest, lpPacket);
122 this->sendNetPacket(std::move(lpPacket),
true);
126 GenericLinkService::doSendData(
const Data& data)
128 lp::Packet lpPacket(data.wireEncode());
130 encodeLpFields(data, lpPacket);
132 this->sendNetPacket(std::move(lpPacket),
false);
136 GenericLinkService::doSendNack(
const lp::Nack& nack)
138 lp::Packet lpPacket(nack.getInterest().wireEncode());
139 lpPacket.add<lp::NackField>(nack.getHeader());
141 encodeLpFields(nack, lpPacket);
143 this->sendNetPacket(std::move(lpPacket),
false);
147 GenericLinkService::assignSequences(std::vector<lp::Packet>& pkts)
149 std::for_each(pkts.begin(), pkts.end(), [
this] (lp::Packet& pkt) {
150 pkt.set<lp::SequenceField>(++m_lastSeqNo);
155 GenericLinkService::encodeLpFields(
const ndn::PacketBase& netPkt, lp::Packet& lpPacket)
158 auto incomingFaceIdTag = netPkt.getTag<lp::IncomingFaceIdTag>();
159 if (incomingFaceIdTag !=
nullptr) {
160 lpPacket.add<lp::IncomingFaceIdField>(*incomingFaceIdTag);
164 auto congestionMarkTag = netPkt.getTag<lp::CongestionMarkTag>();
165 if (congestionMarkTag !=
nullptr) {
166 lpPacket.add<lp::CongestionMarkField>(*congestionMarkTag);
170 auto nonDiscoveryTag = netPkt.getTag<lp::NonDiscoveryTag>();
171 if (nonDiscoveryTag !=
nullptr) {
172 lpPacket.add<lp::NonDiscoveryField>(*nonDiscoveryTag);
175 auto prefixAnnouncementTag = netPkt.getTag<lp::PrefixAnnouncementTag>();
176 if (prefixAnnouncementTag !=
nullptr) {
177 lpPacket.add<lp::PrefixAnnouncementField>(*prefixAnnouncementTag);
181 auto pitToken = netPkt.getTag<lp::PitToken>();
182 if (pitToken !=
nullptr) {
183 lpPacket.add<lp::PitTokenField>(*pitToken);
188 GenericLinkService::sendNetPacket(lp::Packet&& pkt,
bool isInterest)
190 std::vector<lp::Packet> frags;
216 frags.push_back(pkt);
219 frags.push_back(std::move(pkt));
223 if (frags.size() == 1) {
226 BOOST_ASSERT(!frags.front().has<lp::FragIndexField>());
227 BOOST_ASSERT(!frags.front().has<lp::FragCountField>());
233 this->assignSequences(frags);
240 for (lp::Packet& frag : frags) {
241 this->sendLpPacket(std::move(frag));
246 GenericLinkService::checkCongestionLevel(lp::Packet& pkt)
250 if (sendQueueLength < 0) {
254 if (sendQueueLength > 0) {
262 const auto now = time::steady_clock::now();
264 if (m_nextMarkTime == time::steady_clock::time_point::max()) {
268 else if (now >= m_nextMarkTime) {
269 pkt.set<lp::CongestionMarkField>(1);
273 ++m_nMarkedSinceInMarkingState;
276 time::nanoseconds interval(
static_cast<time::nanoseconds::rep
>(
278 std::sqrt(m_nMarkedSinceInMarkingState + 1)));
279 m_nextMarkTime += interval;
282 else if (m_nextMarkTime != time::steady_clock::time_point::max()) {
285 m_nextMarkTime = time::steady_clock::time_point::max();
286 m_nMarkedSinceInMarkingState = 0;
291 GenericLinkService::doReceivePacket(
const Block& packet,
const EndpointId& endpoint)
294 lp::Packet pkt(packet);
304 if (!pkt.has<lp::FragmentField>()) {
309 if ((pkt.has<lp::FragIndexField>() || pkt.has<lp::FragCountField>()) &&
315 bool isReassembled =
false;
318 std::tie(isReassembled, netPkt, firstPkt) = m_reassembler.
receiveFragment(endpoint, pkt);
320 this->decodeNetPacket(netPkt, firstPkt, endpoint);
323 catch (
const tlv::Error& e) {
330 GenericLinkService::decodeNetPacket(
const Block& netPkt,
const lp::Packet& firstPkt,
334 switch (netPkt.type()) {
336 if (firstPkt.has<lp::NackField>()) {
337 this->decodeNack(netPkt, firstPkt, endpointId);
340 this->decodeInterest(netPkt, firstPkt, endpointId);
344 this->decodeData(netPkt, firstPkt, endpointId);
348 NFD_LOG_FACE_WARN(
"unrecognized network-layer packet TLV-TYPE " << netPkt.type() <<
": DROP");
352 catch (
const tlv::Error& e) {
359 GenericLinkService::decodeInterest(
const Block& netPkt,
const lp::Packet& firstPkt,
362 BOOST_ASSERT(netPkt.type() == tlv::Interest);
363 BOOST_ASSERT(!firstPkt.has<lp::NackField>());
366 auto interest = make_shared<Interest>(netPkt);
368 if (firstPkt.has<lp::NextHopFaceIdField>()) {
370 interest->setTag(make_shared<lp::NextHopFaceIdTag>(firstPkt.get<lp::NextHopFaceIdField>()));
378 if (firstPkt.has<lp::CachePolicyField>()) {
384 if (firstPkt.has<lp::IncomingFaceIdField>()) {
388 if (firstPkt.has<lp::CongestionMarkField>()) {
389 interest->setTag(make_shared<lp::CongestionMarkTag>(firstPkt.get<lp::CongestionMarkField>()));
392 if (firstPkt.has<lp::NonDiscoveryField>()) {
394 interest->setTag(make_shared<lp::NonDiscoveryTag>(firstPkt.get<lp::NonDiscoveryField>()));
401 if (firstPkt.has<lp::PrefixAnnouncementField>()) {
407 if (firstPkt.has<lp::PitTokenField>()) {
408 interest->setTag(make_shared<lp::PitToken>(firstPkt.get<lp::PitTokenField>()));
415 GenericLinkService::decodeData(
const Block& netPkt,
const lp::Packet& firstPkt,
418 BOOST_ASSERT(netPkt.type() == tlv::Data);
421 auto data = make_shared<Data>(netPkt);
423 if (firstPkt.has<lp::NackField>()) {
429 if (firstPkt.has<lp::NextHopFaceIdField>()) {
435 if (firstPkt.has<lp::CachePolicyField>()) {
439 data->setTag(make_shared<lp::CachePolicyTag>(firstPkt.get<lp::CachePolicyField>()));
442 if (firstPkt.has<lp::IncomingFaceIdField>()) {
446 if (firstPkt.has<lp::CongestionMarkField>()) {
447 data->setTag(make_shared<lp::CongestionMarkTag>(firstPkt.get<lp::CongestionMarkField>()));
450 if (firstPkt.has<lp::NonDiscoveryField>()) {
456 if (firstPkt.has<lp::PrefixAnnouncementField>()) {
458 data->setTag(make_shared<lp::PrefixAnnouncementTag>(firstPkt.get<lp::PrefixAnnouncementField>()));
461 NFD_LOG_FACE_WARN(
"received PrefixAnnouncement, but self-learning disabled: IGNORE");
469 GenericLinkService::decodeNack(
const Block& netPkt,
const lp::Packet& firstPkt,
472 BOOST_ASSERT(netPkt.type() == tlv::Interest);
473 BOOST_ASSERT(firstPkt.has<lp::NackField>());
475 lp::Nack nack((Interest(netPkt)));
476 nack.setHeader(firstPkt.get<lp::NackField>());
478 if (firstPkt.has<lp::NextHopFaceIdField>()) {
484 if (firstPkt.has<lp::CachePolicyField>()) {
490 if (firstPkt.has<lp::IncomingFaceIdField>()) {
494 if (firstPkt.has<lp::CongestionMarkField>()) {
495 nack.setTag(make_shared<lp::CongestionMarkTag>(firstPkt.get<lp::CongestionMarkField>()));
498 if (firstPkt.has<lp::NonDiscoveryField>()) {
504 if (firstPkt.has<lp::PrefixAnnouncementField>()) {
Options that control the behavior of GenericLinkService.
LpReassembler::Options reassemblerOptions
options for reassembly
LpFragmenter::Options fragmenterOptions
options for fragmentation
bool allowReassembly
enables reassembly
bool allowSelfLearning
enables self-learning forwarding support
LpReliability::Options reliabilityOptions
options for reliability
bool allowCongestionMarking
enables send queue congestion detection and marking
bool allowFragmentation
enables fragmentation
ssize_t overrideMtu
overrides MTU provided by Transport
size_t defaultCongestionThreshold
default congestion threshold in bytes
time::nanoseconds baseCongestionMarkingInterval
starting value for congestion marking interval
bool allowLocalFields
enables encoding of IncomingFaceId, and decoding of NextHopFaceId and CachePolicy
PacketCounter nInNetInvalid
count of invalid reassembled network-layer packets dropped
PacketCounter nDuplicateSequence
count of LpPackets dropped due to duplicate Sequence numbers
SizeCounter< LpReassembler > nReassembling
count of network-layer packets currently being reassembled
PacketCounter nFragmentationErrors
count of failed fragmentations
PacketCounter nCongestionMarked
count of outgoing LpPackets that were marked with congestion marks
PacketCounter nOutOverMtu
count of outgoing LpPackets dropped due to exceeding MTU limit
PacketCounter nInLpInvalid
count of invalid LpPackets dropped before reassembly
PacketCounter nReassemblyTimeouts
count of dropped partial network-layer packets due to reassembly timeout
ssize_t getEffectiveMtu() const final
void setOptions(const Options &options)
sets Options used by GenericLinkService
GenericLinkService(const Options &options={})
bool canOverrideMtuTo(ssize_t mtu) const
Whether MTU can be overridden to the specified value.
const Transport * getTransport() const
void receiveNack(const lp::Nack &nack, const EndpointId &endpoint)
delivers received Nack to forwarding
void receiveInterest(const Interest &interest, const EndpointId &endpoint)
delivers received Interest to forwarding
void notifyDroppedInterest(const Interest &packet)
void receiveData(const Data &data, const EndpointId &endpoint)
delivers received Data to forwarding
void sendPacket(const Block &packet)
send a lower-layer packet via Transport
void setOptions(const Options &options)
set options for fragmenter
std::tuple< bool, std::vector< lp::Packet > > fragmentPacket(const lp::Packet &packet, size_t mtu)
fragments a network-layer packet into link-layer packets
signal::Signal< LpReassembler, EndpointId, size_t > beforeTimeout
signals before a partial packet is dropped due to timeout
void setOptions(const Options &options)
set options for reassembler
std::tuple< bool, Block, lp::Packet > receiveFragment(EndpointId remoteEndpoint, const lp::Packet &packet)
adds received fragment to the buffer
signal::Signal< LpReliability, Interest > onDroppedInterest
signals on Interest dropped by reliability system for exceeding allowed number of retx
void piggyback(lp::Packet &pkt, ssize_t mtu)
called by GenericLinkService to attach Acks onto an outgoing LpPacket
static constexpr size_t RESERVED_HEADER_SPACE
TxSequence TLV-TYPE (3 octets) + TLV-LENGTH (1 octet) + lp::Sequence (8 octets)
void handleOutgoing(std::vector< lp::Packet > &frags, lp::Packet &&pkt, bool isInterest)
observe outgoing fragment(s) of a network packet and store for potential retransmission
bool processIncomingPacket(const lp::Packet &pkt)
extract and parse all Acks and add Ack for contained Fragment (if any) to AckQueue
void setOptions(const Options &options)
set options for reliability
virtual ssize_t getSendQueueLength()
#define NFD_LOG_FACE_DEBUG(msg)
Log a message at DEBUG level.
#define NFD_LOG_FACE_WARN(msg)
Log a message at WARN level.
#define NFD_LOG_FACE_TRACE(msg)
Log a message at TRACE level.
#define NFD_LOG_INIT(name)
uint64_t EndpointId
Identifies a remote endpoint on the link.
const ssize_t MIN_MTU
Minimum MTU that may be set.
const ssize_t MTU_UNLIMITED
indicates the transport has no limit on payload size
constexpr size_t CONGESTION_MARK_SIZE
Copyright (c) 2014-2015, Regents of the University of California, Arizona Board of Regents,...
bool isEnabled
enables link-layer reliability