producer-base.cpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * Copyright (c) 2014-2020, The University of Memphis
4  *
5  * This file is part of PSync.
6  * See AUTHORS.md for complete list of PSync authors and contributors.
7  *
8  * PSync is free software: you can redistribute it and/or modify it under the terms
9  * of the GNU Lesser General Public License as published by the Free Software Foundation,
10  * either version 3 of the License, or (at your option) any later version.
11  *
12  * PSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
13  * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
14  * PURPOSE. See the GNU Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public License along with
17  * PSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
18  **/
19 
20 #include "PSync/producer-base.hpp"
21 #include "PSync/detail/util.hpp"
22 
23 #include <ndn-cxx/util/exception.hpp>
24 #include <ndn-cxx/util/logger.hpp>
25 
26 #include <cstring>
27 #include <limits>
28 
29 namespace psync {
30 
32 
33 ProducerBase::ProducerBase(size_t expectedNumEntries,
34  ndn::Face& face,
35  const ndn::Name& syncPrefix,
36  const ndn::Name& userPrefix,
37  ndn::time::milliseconds syncReplyFreshness,
38  CompressionScheme ibltCompression,
39  CompressionScheme contentCompression)
40  : m_iblt(expectedNumEntries, ibltCompression)
41  , m_expectedNumEntries(expectedNumEntries)
42  , m_threshold(expectedNumEntries / 2)
43  , m_face(face)
44  , m_scheduler(m_face.getIoService())
45  , m_syncPrefix(syncPrefix)
46  , m_userPrefix(userPrefix)
47  , m_syncReplyFreshness(syncReplyFreshness)
48  , m_segmentPublisher(m_face, m_keyChain)
49  , m_rng(ndn::random::getRandomNumberEngine())
50  , m_ibltCompression(ibltCompression)
51  , m_contentCompression(contentCompression)
52 {
53  addUserNode(userPrefix);
54 }
55 
56 bool
57 ProducerBase::addUserNode(const ndn::Name& prefix)
58 {
59  if (m_prefixes.find(prefix) == m_prefixes.end()) {
60  m_prefixes[prefix] = 0;
61  return true;
62  }
63  else {
64  return false;
65  }
66 }
67 
68 void
69 ProducerBase::removeUserNode(const ndn::Name& prefix)
70 {
71  auto it = m_prefixes.find(prefix);
72  if (it != m_prefixes.end()) {
73  uint64_t seqNo = it->second;
74  m_prefixes.erase(it);
75 
76  ndn::Name prefixWithSeq = ndn::Name(prefix).appendNumber(seqNo);
77  auto hashIt = m_biMap.right.find(prefixWithSeq);
78  if (hashIt != m_biMap.right.end()) {
79  m_iblt.erase(hashIt->second);
80  m_biMap.right.erase(hashIt);
81  }
82  }
83 }
84 
85 void
86 ProducerBase::updateSeqNo(const ndn::Name& prefix, uint64_t seq)
87 {
88  NDN_LOG_DEBUG("UpdateSeq: " << prefix << " " << seq);
89 
90  uint64_t oldSeq;
91  auto it = m_prefixes.find(prefix);
92  if (it != m_prefixes.end()) {
93  oldSeq = it->second;
94  }
95  else {
96  NDN_LOG_WARN("Prefix not found in m_prefixes");
97  return;
98  }
99 
100  if (oldSeq >= seq) {
101  NDN_LOG_WARN("Update has lower/equal seq no for prefix, doing nothing!");
102  return;
103  }
104 
105  // Delete the last sequence prefix from the iblt
106  // Because we don't insert zeroth prefix in IBF so no need to delete that
107  if (oldSeq != 0) {
108  ndn::Name prefixWithSeq = ndn::Name(prefix).appendNumber(oldSeq);
109  auto hashIt = m_biMap.right.find(prefixWithSeq);
110  if (hashIt != m_biMap.right.end()) {
111  m_iblt.erase(hashIt->second);
112  m_biMap.right.erase(hashIt);
113  }
114  }
115 
116  // Insert the new seq no in m_prefixes, m_biMap, and m_iblt
117  it->second = seq;
118  ndn::Name prefixWithSeq = ndn::Name(prefix).appendNumber(seq);
119  auto newHash = detail::murmurHash3(detail::N_HASHCHECK, prefixWithSeq.toUri());
120  m_biMap.insert({newHash, prefixWithSeq});
121  m_iblt.insert(newHash);
122 }
123 
124 void
125 ProducerBase::sendApplicationNack(const ndn::Name& name)
126 {
127  NDN_LOG_DEBUG("Sending application nack");
128  ndn::Name dataName(name);
129  m_iblt.appendToName(dataName);
130 
131  dataName.appendSegment(0);
132  ndn::Data data(dataName);
133  data.setFreshnessPeriod(m_syncReplyFreshness);
134  data.setContentType(ndn::tlv::ContentType_Nack);
135  data.setFinalBlock(dataName[-1]);
136  m_keyChain.sign(data);
137  m_face.put(data);
138 }
139 
140 void
141 ProducerBase::onRegisterFailed(const ndn::Name& prefix, const std::string& msg) const
142 {
143  NDN_LOG_ERROR("ProducerBase::onRegisterFailed(" << prefix << "): " << msg);
144  NDN_THROW(Error(msg));
145 }
146 
147 } // namespace psync
void appendToName(ndn::Name &name) const
Appends self to name.
Definition: iblt.cpp:187
void sendApplicationNack(const ndn::Name &name)
Sends a data packet with content type nack.
void updateSeqNo(const ndn::Name &prefix, uint64_t seq)
Update m_prefixes and IBF with the given prefix and seq.
Definition: common.hpp:33
NDN_LOG_INIT(psync.Consumer)
HashNameBiMap m_biMap
void insert(uint32_t key)
Definition: iblt.cpp:125
ProducerBase(size_t expectedNumEntries, ndn::Face &face, const ndn::Name &syncPrefix, const ndn::Name &userPrefix, ndn::time::milliseconds syncReplyFreshness=SYNC_REPLY_FRESHNESS, CompressionScheme ibltCompression=CompressionScheme::NONE, CompressionScheme contentCompression=CompressionScheme::NONE)
constructor
bool addUserNode(const ndn::Name &prefix)
Adds a user node for synchronization.
void removeUserNode(const ndn::Name &prefix)
Remove the user node from synchronization.
CompressionScheme
Definition: common.hpp:35
ndn::KeyChain m_keyChain
void erase(uint32_t key)
Definition: iblt.cpp:131
void onRegisterFailed(const ndn::Name &prefix, const std::string &msg) const
Logs a message if setting an interest filter fails.
const size_t N_HASHCHECK
ndn::time::milliseconds m_syncReplyFreshness
Base class for PartialProducer and FullProducer.
uint32_t murmurHash3(const void *key, size_t len, uint32_t seed)
Definition: util.cpp:61
std::map< ndn::Name, uint64_t > m_prefixes