producer-base.cpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * Copyright (c) 2014-2024, The University of Memphis
4  *
5  * This file is part of PSync.
6  * See AUTHORS.md for complete list of PSync authors and contributors.
7  *
8  * PSync is free software: you can redistribute it and/or modify it under the terms
9  * of the GNU Lesser General Public License as published by the Free Software Foundation,
10  * either version 3 of the License, or (at your option) any later version.
11  *
12  * PSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
13  * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
14  * PURPOSE. See the GNU Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public License along with
17  * PSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
18  */
19 
20 #include "PSync/producer-base.hpp"
21 #include "PSync/detail/util.hpp"
22 
23 #include <ndn-cxx/util/exception.hpp>
24 #include <ndn-cxx/util/logger.hpp>
25 
26 namespace psync {
27 
28 NDN_LOG_INIT(psync.ProducerBase);
29 
31  ndn::KeyChain& keyChain,
32  size_t expectedNumEntries,
33  const ndn::Name& syncPrefix,
34  ndn::time::milliseconds syncReplyFreshness,
35  CompressionScheme ibltCompression,
36  CompressionScheme contentCompression)
37  : m_face(face)
38  , m_keyChain(keyChain)
39  , m_scheduler(m_face.getIoContext())
40  , m_rng(ndn::random::getRandomNumberEngine())
41  , m_iblt(expectedNumEntries, ibltCompression)
42  , m_segmentPublisher(m_face, m_keyChain)
43  , m_expectedNumEntries(expectedNumEntries)
44  , m_threshold(expectedNumEntries / 2)
45  , m_syncPrefix(syncPrefix)
46  , m_syncReplyFreshness(syncReplyFreshness)
47  , m_ibltCompression(ibltCompression)
48  , m_contentCompression(contentCompression)
49 {
50 }
51 
52 bool
53 ProducerBase::addUserNode(const ndn::Name& prefix)
54 {
55  if (m_prefixes.find(prefix) == m_prefixes.end()) {
56  m_prefixes[prefix] = 0;
57  return true;
58  }
59  else {
60  return false;
61  }
62 }
63 
64 void
65 ProducerBase::removeUserNode(const ndn::Name& prefix)
66 {
67  auto it = m_prefixes.find(prefix);
68  if (it != m_prefixes.end()) {
69  uint64_t seqNo = it->second;
70  m_prefixes.erase(it);
71 
72  ndn::Name prefixWithSeq = ndn::Name(prefix).appendNumber(seqNo);
73  auto hashIt = m_biMap.right.find(prefixWithSeq);
74  if (hashIt != m_biMap.right.end()) {
75  m_iblt.erase(hashIt->second);
76  m_biMap.right.erase(hashIt);
77  }
78  }
79 }
80 
81 void
82 ProducerBase::updateSeqNo(const ndn::Name& prefix, uint64_t seq)
83 {
84  NDN_LOG_DEBUG("UpdateSeq: " << prefix << " " << seq);
85 
86  uint64_t oldSeq;
87  auto it = m_prefixes.find(prefix);
88  if (it != m_prefixes.end()) {
89  oldSeq = it->second;
90  }
91  else {
92  NDN_LOG_WARN("Prefix not found in m_prefixes");
93  return;
94  }
95 
96  if (oldSeq >= seq) {
97  NDN_LOG_WARN("Update has lower/equal seq no for prefix, doing nothing!");
98  return;
99  }
100 
101  // Delete the last sequence prefix from the iblt
102  // Because we don't insert zeroth prefix in IBF so no need to delete that
103  if (oldSeq != 0) {
104  ndn::Name prefixWithSeq = ndn::Name(prefix).appendNumber(oldSeq);
105  auto hashIt = m_biMap.right.find(prefixWithSeq);
106  if (hashIt != m_biMap.right.end()) {
107  m_iblt.erase(hashIt->second);
108  m_biMap.right.erase(hashIt);
109  }
110  }
111 
112  // Insert the new seq no in m_prefixes, m_biMap, and m_iblt
113  it->second = seq;
114  ndn::Name prefixWithSeq = ndn::Name(prefix).appendNumber(seq);
115  auto newHash = detail::murmurHash3(detail::N_HASHCHECK, prefixWithSeq);
116  m_biMap.insert({newHash, prefixWithSeq});
117  m_iblt.insert(newHash);
118 
119  m_numOwnElements += (seq - oldSeq);
120 }
121 
122 void
123 ProducerBase::sendApplicationNack(const ndn::Name& name)
124 {
125  NDN_LOG_DEBUG("Sending application nack");
126 
127  ndn::Name dataName(name);
128  m_iblt.appendToName(dataName);
129  dataName.appendSegment(0);
130  ndn::Data data(dataName);
131  data.setContentType(ndn::tlv::ContentType_Nack)
132  .setFreshnessPeriod(m_syncReplyFreshness)
133  .setFinalBlock(dataName[-1]);
134 
135  m_keyChain.sign(data);
136  m_face.put(data);
137 }
138 
139 void
140 ProducerBase::onRegisterFailed(const ndn::Name& prefix, const std::string& msg)
141 {
142  NDN_LOG_ERROR("onRegisterFailed(" << prefix << "): " << msg);
143  NDN_THROW(Error(msg));
144 }
145 
146 } // namespace psync
Base class for PartialProducer and FullProducer.
void sendApplicationNack(const ndn::Name &name)
Sends a data packet with content type nack.
const ndn::time::milliseconds m_syncReplyFreshness
void removeUserNode(const ndn::Name &prefix)
Remove the user node from synchronization.
bool addUserNode(const ndn::Name &prefix)
Adds a user node for synchronization.
std::map< ndn::Name, uint64_t > m_prefixes
HashNameBiMap m_biMap
static void onRegisterFailed(const ndn::Name &prefix, const std::string &msg)
Logs a message and throws if setting an interest filter fails.
ProducerBase(ndn::Face &face, ndn::KeyChain &keyChain, size_t expectedNumEntries, const ndn::Name &syncPrefix, ndn::time::milliseconds syncReplyFreshness=SYNC_REPLY_FRESHNESS, CompressionScheme ibltCompression=CompressionScheme::NONE, CompressionScheme contentCompression=CompressionScheme::NONE)
Constructor.
void updateSeqNo(const ndn::Name &prefix, uint64_t seq)
Update m_prefixes and IBF with the given prefix and seq.
ndn::KeyChain & m_keyChain
void insert(uint32_t key)
Definition: iblt.cpp:126
void appendToName(ndn::Name &name) const
Appends self to name.
Definition: iblt.cpp:138
void erase(uint32_t key)
Definition: iblt.cpp:132
uint32_t murmurHash3(const void *key, size_t len, uint32_t seed)
Definition: util.cpp:58
constexpr size_t N_HASHCHECK
Definition: iblt.hpp:87
Definition: common.hpp:34
CompressionScheme
Definition: common.hpp:43