partial-producer.cpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * Copyright (c) 2014-2019, The University of Memphis
4  *
5  * This file is part of PSync.
6  * See AUTHORS.md for complete list of PSync authors and contributors.
7  *
8  * PSync is free software: you can redistribute it and/or modify it under the terms
9  * of the GNU Lesser General Public License as published by the Free Software Foundation,
10  * either version 3 of the License, or (at your option) any later version.
11  *
12  * PSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
13  * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
14  * PURPOSE. See the GNU Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public License along with
17  * PSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
18  **/
19 
21 #include "PSync/detail/state.hpp"
22 
23 #include <ndn-cxx/util/logger.hpp>
24 
25 #include <cstring>
26 #include <limits>
27 
28 namespace psync {
29 
30 NDN_LOG_INIT(psync.PartialProducer);
31 
32 PartialProducer::PartialProducer(size_t expectedNumEntries,
33  ndn::Face& face,
34  const ndn::Name& syncPrefix,
35  const ndn::Name& userPrefix,
36  ndn::time::milliseconds syncReplyFreshness,
37  ndn::time::milliseconds helloReplyFreshness)
38  : ProducerBase(expectedNumEntries, face, syncPrefix,
39  userPrefix, syncReplyFreshness, helloReplyFreshness)
40 {
41  m_registeredPrefix = m_face.registerPrefix(m_syncPrefix,
42  [this] (const ndn::Name& syncPrefix) {
43  m_face.setInterestFilter(ndn::Name(m_syncPrefix).append("hello"),
44  std::bind(&PartialProducer::onHelloInterest, this, _1, _2));
45  m_face.setInterestFilter(ndn::Name(m_syncPrefix).append("sync"),
46  std::bind(&PartialProducer::onSyncInterest, this, _1, _2));
47  },
48  std::bind(&PartialProducer::onRegisterFailed, this, _1, _2));
49 }
50 
51 void
52 PartialProducer::publishName(const ndn::Name& prefix, ndn::optional<uint64_t> seq)
53 {
54  if (m_prefixes.find(prefix) == m_prefixes.end()) {
55  return;
56  }
57 
58  uint64_t newSeq = seq.value_or(m_prefixes[prefix] + 1);
59 
60  NDN_LOG_INFO("Publish: " << prefix << "/" << newSeq);
61 
62  updateSeqNo(prefix, newSeq);
63 
64  satisfyPendingSyncInterests(prefix);
65 }
66 
67 void
68 PartialProducer::onHelloInterest(const ndn::Name& prefix, const ndn::Interest& interest)
69 {
70  if (m_segmentPublisher.replyFromStore(interest.getName())) {
71  return;
72  }
73 
74  // Last component or fourth last component (in case of interest with version and segment)
75  // needs to be hello
76  if (interest.getName().get(interest.getName().size()-1).toUri() != "hello" &&
77  interest.getName().get(interest.getName().size()-4).toUri() != "hello") {
78  return;
79  }
80 
81  NDN_LOG_DEBUG("Hello Interest Received, nonce: " << interest);
82 
83  State state;
84 
85  for (const auto& prefix : m_prefixes) {
86  state.addContent(ndn::Name(prefix.first).appendNumber(prefix.second));
87  }
88  NDN_LOG_DEBUG("sending content p: " << state);
89 
90  ndn::Name helloDataName = prefix;
91  m_iblt.appendToName(helloDataName);
92 
93  m_segmentPublisher.publish(interest.getName(), helloDataName,
95 }
96 
97 void
98 PartialProducer::onSyncInterest(const ndn::Name& prefix, const ndn::Interest& interest)
99 {
100  if (m_segmentPublisher.replyFromStore(interest.getName())) {
101  return;
102  }
103 
104  NDN_LOG_DEBUG("Sync Interest Received, nonce: " << interest.getNonce() <<
105  " hash: " << std::hash<std::string>{}(interest.getName().toUri()));
106 
107  ndn::Name nameWithoutSyncPrefix = interest.getName().getSubName(prefix.size());
108  ndn::Name interestName;
109 
110  if (nameWithoutSyncPrefix.size() == 4) {
111  // Get /<prefix>/BF/IBF/ from /<prefix>/BF/IBF (3 components of BF + 1 for IBF)
112  interestName = interest.getName();
113  }
114  else if (nameWithoutSyncPrefix.size() == 6) {
115  // Get <prefix>/BF/IBF/ from /<prefix>/BF/IBF/<version>/<segment-no>
116  interestName = interest.getName().getPrefix(-2);
117  }
118  else {
119  return;
120  }
121 
122  ndn::name::Component bfName, ibltName;
123  unsigned int projectedCount;
124  double falsePositiveProb;
125  try {
126  projectedCount = interestName.get(interestName.size()-4).toNumber();
127  falsePositiveProb = interestName.get(interestName.size()-3).toNumber()/1000.;
128  bfName = interestName.get(interestName.size()-2);
129 
130  ibltName = interestName.get(interestName.size()-1);
131  }
132  catch (const std::exception& e) {
133  NDN_LOG_ERROR("Cannot extract bloom filter and IBF from sync interest: " << e.what());
134  NDN_LOG_ERROR("Format: /<syncPrefix>/sync/<BF-count>/<BF-false-positive-probability>/<BF>/<IBF>");
135  return;
136  }
137 
138  BloomFilter bf;
140 
141  try {
142  bf = BloomFilter(projectedCount, falsePositiveProb, bfName);
143  iblt.initialize(ibltName);
144  }
145  catch (const std::exception& e) {
146  NDN_LOG_WARN(e.what());
147  return;
148  }
149 
150  // get the difference
151  IBLT diff = m_iblt - iblt;
152 
153  // non-empty positive means we have some elements that the others don't
154  std::set<uint32_t> positive;
155  std::set<uint32_t> negative;
156 
157  NDN_LOG_TRACE("Number elements in IBF: " << m_prefixes.size());
158 
159  bool peel = diff.listEntries(positive, negative);
160 
161  NDN_LOG_TRACE("Result of listEntries on the difference: " << peel);
162 
163  if (!peel) {
164  NDN_LOG_DEBUG("Can't decode the difference, sending application Nack");
165  sendApplicationNack(interestName);
166  return;
167  }
168 
169  // generate content for Sync reply
170  State state;
171  NDN_LOG_TRACE("Size of positive set " << positive.size());
172  NDN_LOG_TRACE("Size of negative set " << negative.size());
173  for (const auto& hash : positive) {
174  ndn::Name prefix = m_hash2prefix[hash];
175  if (bf.contains(prefix.toUri())) {
176  // generate data
177  state.addContent(ndn::Name(prefix).appendNumber(m_prefixes[prefix]));
178  NDN_LOG_DEBUG("Content: " << prefix << " " << std::to_string(m_prefixes[prefix]));
179  }
180  }
181 
182  NDN_LOG_TRACE("m_threshold: " << m_threshold << " Total: " << positive.size() + negative.size());
183 
184  if (positive.size() + negative.size() >= m_threshold || !state.getContent().empty()) {
185 
186  // send back data
187  ndn::Name syncDataName = interestName;
188  m_iblt.appendToName(syncDataName);
189 
190  m_segmentPublisher.publish(interest.getName(), syncDataName,
192  return;
193  }
194 
195  auto& entry = m_pendingEntries.emplace(interestName, PendingEntryInfo{bf, iblt, {}}).first->second;
196  entry.expirationEvent = m_scheduler.scheduleEvent(interest.getInterestLifetime(),
197  [this, interest] {
198  NDN_LOG_TRACE("Erase Pending Interest " << interest.getNonce());
199  m_pendingEntries.erase(interest.getName());
200  });
201 }
202 
203 void
204 PartialProducer::satisfyPendingSyncInterests(const ndn::Name& prefix) {
205  NDN_LOG_TRACE("size of pending interest: " << m_pendingEntries.size());
206 
207  for (auto it = m_pendingEntries.begin(); it != m_pendingEntries.end();) {
208  const PendingEntryInfo& entry = it->second;
209 
210  IBLT diff = m_iblt - entry.iblt;
211  std::set<uint32_t> positive;
212  std::set<uint32_t> negative;
213 
214  bool peel = diff.listEntries(positive, negative);
215 
216  NDN_LOG_TRACE("Result of listEntries on the difference: " << peel);
217 
218  NDN_LOG_TRACE("Number elements in IBF: " << m_prefixes.size());
219  NDN_LOG_TRACE("m_threshold: " << m_threshold << " Total: " << positive.size() + negative.size());
220 
221  if (!peel) {
222  NDN_LOG_TRACE("Decoding of differences with stored IBF unsuccessful, deleting pending interest");
223  m_pendingEntries.erase(it++);
224  continue;
225  }
226 
227  State state;
228  if (entry.bf.contains(prefix.toUri()) || positive.size() + negative.size() >= m_threshold) {
229  if (entry.bf.contains(prefix.toUri())) {
230  state.addContent(ndn::Name(prefix).appendNumber(m_prefixes[prefix]));
231  NDN_LOG_DEBUG("sending sync content " << prefix << " " << std::to_string(m_prefixes[prefix]));
232  }
233  else {
234  NDN_LOG_DEBUG("Sending with empty content to send latest IBF to consumer");
235  }
236 
237  // generate sync data and cancel the event
238  ndn::Name syncDataName = it->first;
239  m_iblt.appendToName(syncDataName);
240 
241  m_segmentPublisher.publish(it->first, syncDataName,
243 
244  m_pendingEntries.erase(it++);
245  }
246  else {
247  ++it;
248  }
249  }
250 }
251 
252 } // namespace psync
void onRegisterFailed(const ndn::Name &prefix, const std::string &msg) const
Logs a message if setting an interest filter fails.
void sendApplicationNack(const ndn::Name &name)
Sends a data packet with content type nack.
void updateSeqNo(const ndn::Name &prefix, uint64_t seq)
Update m_prefixes and IBF with the given prefix and seq.
void initialize(const ndn::name::Component &ibltName)
Populate the hash table using the vector representation of IBLT.
Definition: iblt.cpp:92
void addContent(const ndn::Name &prefix)
Definition: state.cpp:30
std::vector< ndn::Name > getContent() const
Definition: state.hpp:46
const ndn::Block & wireEncode() const
Definition: state.cpp:36
bool listEntries(std::set< uint32_t > &positive, std::set< uint32_t > &negative) const
List all the entries in the IBLT.
Definition: iblt.cpp:138
bool contains(const std::string &key) const
Partial sync logic to publish data names.
Invertible Bloom Lookup Table (Invertible Bloom Filter)
Definition: iblt.hpp:80
void publish(const ndn::Name &interestName, const ndn::Name &dataName, const ndn::Block &block, ndn::time::milliseconds freshness, const ndn::security::SigningInfo &signingInfo=ndn::security::v2::KeyChain::getDefaultSigningInfo())
Put all the segments in memory.
void publishName(const ndn::Name &prefix, ndn::optional< uint64_t > seq=ndn::nullopt)
Publish name to let subscribed consumers know.
bool replyFromStore(const ndn::Name &interestName)
Try to reply from memory, return false if we cannot find the segment.
std::map< uint32_t, ndn::Name > m_hash2prefix
PartialProducer(size_t expectedNumEntries, ndn::Face &face, const ndn::Name &syncPrefix, const ndn::Name &userPrefix, ndn::time::milliseconds helloReplyFreshness=HELLO_REPLY_FRESHNESS, ndn::time::milliseconds syncReplyFreshness=SYNC_REPLY_FRESHNESS)
constructor
ndn::Scheduler m_scheduler
ndn::time::milliseconds m_syncReplyFreshness
SegmentPublisher m_segmentPublisher
void appendToName(ndn::Name &name) const
Appends self to name.
Definition: iblt.cpp:229
Base class for PartialProducer and FullProducer.
ndn::time::milliseconds m_helloReplyFreshness
std::map< ndn::Name, uint64_t > m_prefixes