producer-base.cpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * Copyright (c) 2014-2022, The University of Memphis
4  *
5  * This file is part of PSync.
6  * See AUTHORS.md for complete list of PSync authors and contributors.
7  *
8  * PSync is free software: you can redistribute it and/or modify it under the terms
9  * of the GNU Lesser General Public License as published by the Free Software Foundation,
10  * either version 3 of the License, or (at your option) any later version.
11  *
12  * PSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
13  * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
14  * PURPOSE. See the GNU Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public License along with
17  * PSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
18  **/
19 
20 #include "PSync/producer-base.hpp"
21 #include "PSync/detail/util.hpp"
22 
23 #include <ndn-cxx/util/exception.hpp>
24 #include <ndn-cxx/util/logger.hpp>
25 
26 namespace psync {
27 
28 NDN_LOG_INIT(psync.ProducerBase);
29 
31  ndn::KeyChain& keyChain,
32  size_t expectedNumEntries,
33  const ndn::Name& syncPrefix,
34  const ndn::Name& userPrefix,
35  ndn::time::milliseconds syncReplyFreshness,
36  CompressionScheme ibltCompression,
37  CompressionScheme contentCompression)
38  : m_face(face)
39  , m_keyChain(keyChain)
40  , m_scheduler(m_face.getIoService())
41  , m_rng(ndn::random::getRandomNumberEngine())
42  , m_iblt(expectedNumEntries, ibltCompression)
43  , m_segmentPublisher(m_face, m_keyChain)
44  , m_expectedNumEntries(expectedNumEntries)
45  , m_threshold(expectedNumEntries / 2)
46  , m_syncPrefix(syncPrefix)
47  , m_userPrefix(userPrefix)
48  , m_syncReplyFreshness(syncReplyFreshness)
49  , m_ibltCompression(ibltCompression)
50  , m_contentCompression(contentCompression)
51 {
52  addUserNode(userPrefix);
53 }
54 
55 bool
56 ProducerBase::addUserNode(const ndn::Name& prefix)
57 {
58  if (m_prefixes.find(prefix) == m_prefixes.end()) {
59  m_prefixes[prefix] = 0;
60  return true;
61  }
62  else {
63  return false;
64  }
65 }
66 
67 void
68 ProducerBase::removeUserNode(const ndn::Name& prefix)
69 {
70  auto it = m_prefixes.find(prefix);
71  if (it != m_prefixes.end()) {
72  uint64_t seqNo = it->second;
73  m_prefixes.erase(it);
74 
75  ndn::Name prefixWithSeq = ndn::Name(prefix).appendNumber(seqNo);
76  auto hashIt = m_biMap.right.find(prefixWithSeq);
77  if (hashIt != m_biMap.right.end()) {
78  m_iblt.erase(hashIt->second);
79  m_biMap.right.erase(hashIt);
80  }
81  }
82 }
83 
84 void
85 ProducerBase::updateSeqNo(const ndn::Name& prefix, uint64_t seq)
86 {
87  NDN_LOG_DEBUG("UpdateSeq: " << prefix << " " << seq);
88 
89  uint64_t oldSeq;
90  auto it = m_prefixes.find(prefix);
91  if (it != m_prefixes.end()) {
92  oldSeq = it->second;
93  }
94  else {
95  NDN_LOG_WARN("Prefix not found in m_prefixes");
96  return;
97  }
98 
99  if (oldSeq >= seq) {
100  NDN_LOG_WARN("Update has lower/equal seq no for prefix, doing nothing!");
101  return;
102  }
103 
104  // Delete the last sequence prefix from the iblt
105  // Because we don't insert zeroth prefix in IBF so no need to delete that
106  if (oldSeq != 0) {
107  ndn::Name prefixWithSeq = ndn::Name(prefix).appendNumber(oldSeq);
108  auto hashIt = m_biMap.right.find(prefixWithSeq);
109  if (hashIt != m_biMap.right.end()) {
110  m_iblt.erase(hashIt->second);
111  m_biMap.right.erase(hashIt);
112  }
113  }
114 
115  // Insert the new seq no in m_prefixes, m_biMap, and m_iblt
116  it->second = seq;
117  ndn::Name prefixWithSeq = ndn::Name(prefix).appendNumber(seq);
118  auto newHash = detail::murmurHash3(detail::N_HASHCHECK, prefixWithSeq);
119  m_biMap.insert({newHash, prefixWithSeq});
120  m_iblt.insert(newHash);
121 }
122 
123 void
124 ProducerBase::sendApplicationNack(const ndn::Name& name)
125 {
126  NDN_LOG_DEBUG("Sending application nack");
127 
128  ndn::Name dataName(name);
129  m_iblt.appendToName(dataName);
130  dataName.appendSegment(0);
131  ndn::Data data(dataName);
132  data.setContentType(ndn::tlv::ContentType_Nack)
133  .setFreshnessPeriod(m_syncReplyFreshness)
134  .setFinalBlock(dataName[-1]);
135 
136  m_keyChain.sign(data);
137  m_face.put(data);
138 }
139 
140 void
141 ProducerBase::onRegisterFailed(const ndn::Name& prefix, const std::string& msg)
142 {
143  NDN_LOG_ERROR("onRegisterFailed(" << prefix << "): " << msg);
144  NDN_THROW(Error(msg));
145 }
146 
147 } // namespace psync
Base class for PartialProducer and FullProducer.
void sendApplicationNack(const ndn::Name &name)
Sends a data packet with content type nack.
const ndn::time::milliseconds m_syncReplyFreshness
void removeUserNode(const ndn::Name &prefix)
Remove the user node from synchronization.
bool addUserNode(const ndn::Name &prefix)
Adds a user node for synchronization.
std::map< ndn::Name, uint64_t > m_prefixes
ProducerBase(ndn::Face &face, ndn::KeyChain &keyChain, size_t expectedNumEntries, const ndn::Name &syncPrefix, const ndn::Name &userPrefix, ndn::time::milliseconds syncReplyFreshness=SYNC_REPLY_FRESHNESS, CompressionScheme ibltCompression=CompressionScheme::NONE, CompressionScheme contentCompression=CompressionScheme::NONE)
Constructor.
HashNameBiMap m_biMap
static void onRegisterFailed(const ndn::Name &prefix, const std::string &msg)
Logs a message and throws if setting an interest filter fails.
void updateSeqNo(const ndn::Name &prefix, uint64_t seq)
Update m_prefixes and IBF with the given prefix and seq.
ndn::KeyChain & m_keyChain
void insert(uint32_t key)
Definition: iblt.cpp:121
void appendToName(ndn::Name &name) const
Appends self to name.
Definition: iblt.cpp:183
void erase(uint32_t key)
Definition: iblt.cpp:127
uint32_t murmurHash3(const void *key, size_t len, uint32_t seed)
Definition: util.cpp:58
constexpr size_t N_HASHCHECK
Definition: iblt.hpp:73
Definition: common.hpp:34
CompressionScheme
Definition: common.hpp:43