partial-producer.cpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * Copyright (c) 2014-2022, The University of Memphis
4  *
5  * This file is part of PSync.
6  * See AUTHORS.md for complete list of PSync authors and contributors.
7  *
8  * PSync is free software: you can redistribute it and/or modify it under the terms
9  * of the GNU Lesser General Public License as published by the Free Software Foundation,
10  * either version 3 of the License, or (at your option) any later version.
11  *
12  * PSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
13  * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
14  * PURPOSE. See the GNU Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public License along with
17  * PSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
18  **/
19 
21 #include "PSync/detail/state.hpp"
22 
23 #include <ndn-cxx/util/logger.hpp>
24 
25 #include <cstring>
26 
27 namespace psync {
28 
29 NDN_LOG_INIT(psync.PartialProducer);
30 
31 const ndn::name::Component HELLO{"hello"};
32 const ndn::name::Component SYNC{"sync"};
33 
35  ndn::KeyChain& keyChain,
36  size_t expectedNumEntries,
37  const ndn::Name& syncPrefix,
38  const ndn::Name& userPrefix,
39  ndn::time::milliseconds helloReplyFreshness,
40  ndn::time::milliseconds syncReplyFreshness,
41  CompressionScheme ibltCompression)
42  : ProducerBase(face, keyChain, expectedNumEntries, syncPrefix, userPrefix,
43  syncReplyFreshness, ibltCompression, CompressionScheme::NONE)
44  , m_helloReplyFreshness(helloReplyFreshness)
45 {
46  m_registeredPrefix = m_face.registerPrefix(m_syncPrefix,
47  [this] (const auto&) {
48  m_face.setInterestFilter(ndn::Name(m_syncPrefix).append(HELLO),
49  std::bind(&PartialProducer::onHelloInterest, this, _1, _2));
50  m_face.setInterestFilter(ndn::Name(m_syncPrefix).append(SYNC),
51  std::bind(&PartialProducer::onSyncInterest, this, _1, _2));
52  },
53  [] (auto&&... args) { onRegisterFailed(std::forward<decltype(args)>(args)...); });
54 }
55 
56 void
57 PartialProducer::publishName(const ndn::Name& prefix, std::optional<uint64_t> seq)
58 {
59  if (m_prefixes.find(prefix) == m_prefixes.end()) {
60  return;
61  }
62 
63  uint64_t newSeq = seq.value_or(m_prefixes[prefix] + 1);
64  NDN_LOG_INFO("Publish: " << prefix << "/" << newSeq);
65  updateSeqNo(prefix, newSeq);
66  satisfyPendingSyncInterests(prefix);
67 }
68 
69 void
70 PartialProducer::onHelloInterest(const ndn::Name& prefix, const ndn::Interest& interest)
71 {
72  const auto& name = interest.getName();
74  return;
75  }
76 
77  // Last component or fourth last component (in case of interest with version and segment)
78  // needs to be hello
79  if (name.get(name.size() - 1) != HELLO && name.get(name.size() - 4) != HELLO) {
80  return;
81  }
82 
83  NDN_LOG_DEBUG("Hello Interest Received, nonce: " << interest);
84 
85  detail::State state;
86  for (const auto& p : m_prefixes) {
87  state.addContent(ndn::Name(p.first).appendNumber(p.second));
88  }
89  NDN_LOG_DEBUG("sending content p: " << state);
90 
91  ndn::Name helloDataName = prefix;
92  m_iblt.appendToName(helloDataName);
93 
94  m_segmentPublisher.publish(interest.getName(), helloDataName,
95  state.wireEncode(), m_helloReplyFreshness);
96 }
97 
98 void
99 PartialProducer::onSyncInterest(const ndn::Name& prefix, const ndn::Interest& interest)
100 {
101  if (m_segmentPublisher.replyFromStore(interest.getName())) {
102  return;
103  }
104 
105  NDN_LOG_DEBUG("Sync Interest Received, nonce: " << interest.getNonce() <<
106  " hash: " << std::hash<ndn::Name>{}(interest.getName()));
107 
108  ndn::Name nameWithoutSyncPrefix = interest.getName().getSubName(prefix.size());
109  ndn::Name interestName;
110 
111  if (nameWithoutSyncPrefix.size() == 4) {
112  // Get /<prefix>/BF/IBF/ from /<prefix>/BF/IBF (3 components of BF + 1 for IBF)
113  interestName = interest.getName();
114  }
115  else if (nameWithoutSyncPrefix.size() == 6) {
116  // Get <prefix>/BF/IBF/ from /<prefix>/BF/IBF/<version>/<segment-no>
117  interestName = interest.getName().getPrefix(-2);
118  }
119  else {
120  return;
121  }
122 
123  ndn::name::Component bfName, ibltName;
124  unsigned int projectedCount;
125  double falsePositiveProb;
126  try {
127  projectedCount = interestName.get(interestName.size()-4).toNumber();
128  falsePositiveProb = interestName.get(interestName.size()-3).toNumber()/1000.;
129  bfName = interestName.get(interestName.size()-2);
130 
131  ibltName = interestName.get(interestName.size()-1);
132  }
133  catch (const std::exception& e) {
134  NDN_LOG_ERROR("Cannot extract bloom filter and IBF from sync interest: " << e.what());
135  NDN_LOG_ERROR("Format: /<syncPrefix>/sync/<BF-count>/<BF-false-positive-probability>/<BF>/<IBF>");
136  return;
137  }
138 
139  detail::BloomFilter bf;
140  detail::IBLT iblt(m_expectedNumEntries, m_ibltCompression);
141  try {
142  bf = detail::BloomFilter(projectedCount, falsePositiveProb, bfName);
143  iblt.initialize(ibltName);
144  }
145  catch (const std::exception& e) {
146  NDN_LOG_WARN(e.what());
147  return;
148  }
149 
150  // get the difference
151  auto diff = m_iblt - iblt;
152 
153  // non-empty positive means we have some elements that the others don't
154  std::set<uint32_t> positive;
155  std::set<uint32_t> negative;
156 
157  NDN_LOG_TRACE("Number elements in IBF: " << m_prefixes.size());
158 
159  bool peel = diff.listEntries(positive, negative);
160 
161  NDN_LOG_TRACE("Result of listEntries on the difference: " << peel);
162 
163  if (!peel) {
164  NDN_LOG_DEBUG("Can't decode the difference, sending application Nack");
165  sendApplicationNack(interestName);
166  return;
167  }
168 
169  // generate content for Sync reply
170  detail::State state;
171  NDN_LOG_TRACE("Size of positive set " << positive.size());
172  NDN_LOG_TRACE("Size of negative set " << negative.size());
173  for (const auto& hash : positive) {
174  auto nameIt = m_biMap.left.find(hash);
175  if (nameIt != m_biMap.left.end()) {
176  if (bf.contains(nameIt->second.getPrefix(-1))) {
177  // generate data
178  state.addContent(nameIt->second);
179  NDN_LOG_DEBUG("Content: " << nameIt->second << " " << nameIt->first);
180  }
181  }
182  }
183 
184  NDN_LOG_TRACE("m_threshold: " << m_threshold << " Total: " << positive.size() + negative.size());
185 
186  if (positive.size() + negative.size() >= m_threshold || !state.getContent().empty()) {
187 
188  // send back data
189  ndn::Name syncDataName = interestName;
190  m_iblt.appendToName(syncDataName);
191 
192  m_segmentPublisher.publish(interest.getName(), syncDataName,
193  state.wireEncode(), m_syncReplyFreshness);
194  return;
195  }
196 
197  auto& entry = m_pendingEntries.emplace(interestName, PendingEntryInfo{bf, iblt, {}}).first->second;
198  entry.expirationEvent = m_scheduler.schedule(interest.getInterestLifetime(),
199  [this, interest] {
200  NDN_LOG_TRACE("Erase Pending Interest " << interest.getNonce());
201  m_pendingEntries.erase(interest.getName());
202  });
203 }
204 
205 void
206 PartialProducer::satisfyPendingSyncInterests(const ndn::Name& prefix) {
207  NDN_LOG_TRACE("size of pending interest: " << m_pendingEntries.size());
208 
209  for (auto it = m_pendingEntries.begin(); it != m_pendingEntries.end();) {
210  const PendingEntryInfo& entry = it->second;
211 
212  auto diff = m_iblt - entry.iblt;
213  std::set<uint32_t> positive;
214  std::set<uint32_t> negative;
215 
216  bool peel = diff.listEntries(positive, negative);
217 
218  NDN_LOG_TRACE("Result of listEntries on the difference: " << peel);
219 
220  NDN_LOG_TRACE("Number elements in IBF: " << m_prefixes.size());
221  NDN_LOG_TRACE("m_threshold: " << m_threshold << " Total: " << positive.size() + negative.size());
222 
223  if (!peel) {
224  NDN_LOG_TRACE("Decoding of differences with stored IBF unsuccessful, deleting pending interest");
225  m_pendingEntries.erase(it++);
226  continue;
227  }
228 
229  detail::State state;
230  if (entry.bf.contains(prefix) || positive.size() + negative.size() >= m_threshold) {
231  if (entry.bf.contains(prefix)) {
232  state.addContent(ndn::Name(prefix).appendNumber(m_prefixes[prefix]));
233  NDN_LOG_DEBUG("sending sync content " << prefix << " " << std::to_string(m_prefixes[prefix]));
234  }
235  else {
236  NDN_LOG_DEBUG("Sending with empty content to send latest IBF to consumer");
237  }
238 
239  // generate sync data and cancel the event
240  ndn::Name syncDataName = it->first;
241  m_iblt.appendToName(syncDataName);
242 
243  m_segmentPublisher.publish(it->first, syncDataName,
244  state.wireEncode(), m_syncReplyFreshness);
245 
246  m_pendingEntries.erase(it++);
247  }
248  else {
249  ++it;
250  }
251  }
252 }
253 
254 } // namespace psync
Partial sync logic to publish data names.
PartialProducer(ndn::Face &face, ndn::KeyChain &keyChain, size_t expectedNumEntries, const ndn::Name &syncPrefix, const ndn::Name &userPrefix, ndn::time::milliseconds helloReplyFreshness=HELLO_REPLY_FRESHNESS, ndn::time::milliseconds syncReplyFreshness=SYNC_REPLY_FRESHNESS, CompressionScheme ibltCompression=CompressionScheme::NONE)
Constructor.
void publishName(const ndn::Name &prefix, std::optional< uint64_t > seq=std::nullopt)
Publish name to let subscribed consumers know.
Base class for PartialProducer and FullProducer.
void sendApplicationNack(const ndn::Name &name)
Sends a data packet with content type nack.
const ndn::Name m_syncPrefix
const ndn::time::milliseconds m_syncReplyFreshness
const size_t m_expectedNumEntries
std::map< ndn::Name, uint64_t > m_prefixes
const size_t m_threshold
HashNameBiMap m_biMap
ndn::Scheduler m_scheduler
static void onRegisterFailed(const ndn::Name &prefix, const std::string &msg)
Logs a message and throws if setting an interest filter fails.
const CompressionScheme m_ibltCompression
void updateSeqNo(const ndn::Name &prefix, uint64_t seq)
Update m_prefixes and IBF with the given prefix and seq.
SegmentPublisher m_segmentPublisher
bool replyFromStore(const ndn::Name &interestName)
Try to reply from memory, return false if we cannot find the segment.
void publish(const ndn::Name &interestName, const ndn::Name &dataName, ndn::span< const uint8_t > buffer, ndn::time::milliseconds freshness)
Put all the segments in memory.
bool listEntries(std::set< uint32_t > &positive, std::set< uint32_t > &negative) const
List all the entries in the IBLT.
Definition: iblt.cpp:133
void appendToName(ndn::Name &name) const
Appends self to name.
Definition: iblt.cpp:183
Definition: common.hpp:34
const ndn::name::Component SYNC
CompressionScheme
Definition: common.hpp:43
const ndn::name::Component HELLO