partial-producer.cpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * Copyright (c) 2014-2024, The University of Memphis
4  *
5  * This file is part of PSync.
6  * See AUTHORS.md for complete list of PSync authors and contributors.
7  *
8  * PSync is free software: you can redistribute it and/or modify it under the terms
9  * of the GNU Lesser General Public License as published by the Free Software Foundation,
10  * either version 3 of the License, or (at your option) any later version.
11  *
12  * PSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
13  * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
14  * PURPOSE. See the GNU Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public License along with
17  * PSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
18  **/
19 
21 #include "PSync/detail/state.hpp"
22 
23 #include <ndn-cxx/util/logger.hpp>
24 
25 #include <cstring>
26 
27 namespace psync {
28 
29 NDN_LOG_INIT(psync.PartialProducer);
30 
31 const ndn::name::Component HELLO{"hello"};
32 const ndn::name::Component SYNC{"sync"};
33 
35  ndn::KeyChain& keyChain,
36  const ndn::Name& syncPrefix,
37  const Options& opts)
38  : ProducerBase(face, keyChain, opts.ibfCount, syncPrefix, opts.syncDataFreshness,
39  opts.ibfCompression, CompressionScheme::NONE)
40  , m_helloReplyFreshness(opts.helloDataFreshness)
41 {
42  m_registeredPrefix = m_face.registerPrefix(m_syncPrefix,
43  [this] (const auto&) {
44  m_face.setInterestFilter(ndn::Name(m_syncPrefix).append(HELLO),
45  std::bind(&PartialProducer::onHelloInterest, this, _1, _2));
46  m_face.setInterestFilter(ndn::Name(m_syncPrefix).append(SYNC),
47  std::bind(&PartialProducer::onSyncInterest, this, _1, _2));
48  },
49  [] (auto&&... args) { onRegisterFailed(std::forward<decltype(args)>(args)...); });
50 }
51 
53  ndn::KeyChain& keyChain,
54  size_t expectedNumEntries,
55  const ndn::Name& syncPrefix,
56  const ndn::Name& userPrefix,
57  ndn::time::milliseconds helloReplyFreshness,
58  ndn::time::milliseconds syncReplyFreshness,
59  CompressionScheme ibltCompression)
60  : PartialProducer(face, keyChain, syncPrefix,
61  Options{static_cast<uint32_t>(expectedNumEntries), ibltCompression,
62  helloReplyFreshness, syncReplyFreshness})
63 {
64  addUserNode(userPrefix);
65 }
66 
67 void
68 PartialProducer::publishName(const ndn::Name& prefix, std::optional<uint64_t> seq)
69 {
70  if (m_prefixes.find(prefix) == m_prefixes.end()) {
71  return;
72  }
73 
74  uint64_t newSeq = seq.value_or(m_prefixes[prefix] + 1);
75  NDN_LOG_INFO("Publish: " << prefix << "/" << newSeq);
76  updateSeqNo(prefix, newSeq);
77  satisfyPendingSyncInterests(prefix);
78 }
79 
80 void
81 PartialProducer::onHelloInterest(const ndn::Name& prefix, const ndn::Interest& interest)
82 {
83  const auto& name = interest.getName();
85  return;
86  }
87 
88  // Last component or fourth last component (in case of interest with version and segment)
89  // needs to be hello
90  if (name.get(name.size() - 1) != HELLO && name.get(name.size() - 4) != HELLO) {
91  return;
92  }
93 
94  NDN_LOG_DEBUG("Hello Interest Received, nonce: " << interest);
95 
96  detail::State state;
97  for (const auto& p : m_prefixes) {
98  state.addContent(ndn::Name(p.first).appendNumber(p.second));
99  }
100  NDN_LOG_DEBUG("sending content p: " << state);
101 
102  ndn::Name helloDataName = prefix;
103  m_iblt.appendToName(helloDataName);
104 
105  m_segmentPublisher.publish(interest.getName(), helloDataName,
106  state.wireEncode(), m_helloReplyFreshness);
107 }
108 
109 void
110 PartialProducer::onSyncInterest(const ndn::Name& prefix, const ndn::Interest& interest)
111 {
112  if (m_segmentPublisher.replyFromStore(interest.getName())) {
113  return;
114  }
115 
116  NDN_LOG_DEBUG("Sync Interest Received, nonce: " << interest.getNonce() <<
117  " hash: " << std::hash<ndn::Name>{}(interest.getName()));
118 
119  ndn::Name nameWithoutSyncPrefix = interest.getName().getSubName(prefix.size());
120  ndn::Name interestName;
121 
122  if (nameWithoutSyncPrefix.size() == 4) {
123  // Get /<prefix>/BF/IBF/ from /<prefix>/BF/IBF (3 components of BF + 1 for IBF)
124  interestName = interest.getName();
125  }
126  else if (nameWithoutSyncPrefix.size() == 6) {
127  // Get <prefix>/BF/IBF/ from /<prefix>/BF/IBF/<version>/<segment-no>
128  interestName = interest.getName().getPrefix(-2);
129  }
130  else {
131  return;
132  }
133 
134  ndn::name::Component bfName, ibltName;
135  unsigned int projectedCount;
136  double falsePositiveProb;
137  try {
138  projectedCount = interestName.get(interestName.size()-4).toNumber();
139  falsePositiveProb = interestName.get(interestName.size()-3).toNumber()/1000.;
140  bfName = interestName.get(interestName.size()-2);
141 
142  ibltName = interestName.get(interestName.size()-1);
143  }
144  catch (const std::exception& e) {
145  NDN_LOG_ERROR("Cannot extract bloom filter and IBF from sync interest: " << e.what());
146  NDN_LOG_ERROR("Format: /<syncPrefix>/sync/<BF-count>/<BF-false-positive-probability>/<BF>/<IBF>");
147  return;
148  }
149 
150  detail::BloomFilter bf;
151  detail::IBLT iblt(m_expectedNumEntries, m_ibltCompression);
152  try {
153  bf = detail::BloomFilter(projectedCount, falsePositiveProb, bfName);
154  iblt.initialize(ibltName);
155  }
156  catch (const std::exception& e) {
157  NDN_LOG_WARN(e.what());
158  return;
159  }
160 
161  // get the difference
162  // non-empty positive means we have some elements that the others don't
163  auto diff = m_iblt - iblt;
164 
165  NDN_LOG_TRACE("Number elements in IBF: " << m_prefixes.size());
166 
167  NDN_LOG_TRACE("diff.canDecode: " << diff.canDecode);
168 
169  if (!diff.canDecode) {
170  NDN_LOG_DEBUG("Can't decode the difference, sending application Nack");
171  sendApplicationNack(interestName);
172  return;
173  }
174 
175  // generate content for Sync reply
176  detail::State state;
177  NDN_LOG_TRACE("Size of positive set " << diff.positive.size());
178  NDN_LOG_TRACE("Size of negative set " << diff.negative.size());
179  for (const auto& hash : diff.positive) {
180  auto nameIt = m_biMap.left.find(hash);
181  if (nameIt != m_biMap.left.end()) {
182  if (bf.contains(nameIt->second.getPrefix(-1))) {
183  // generate data
184  state.addContent(nameIt->second);
185  NDN_LOG_DEBUG("Content: " << nameIt->second << " " << nameIt->first);
186  }
187  }
188  }
189 
190  NDN_LOG_TRACE("m_threshold: " << m_threshold << " Total: " << diff.positive.size() + diff.negative.size());
191 
192  if (diff.positive.size() + diff.negative.size() >= m_threshold || !state.getContent().empty()) {
193 
194  // send back data
195  ndn::Name syncDataName = interestName;
196  m_iblt.appendToName(syncDataName);
197 
198  m_segmentPublisher.publish(interest.getName(), syncDataName,
199  state.wireEncode(), m_syncReplyFreshness);
200  return;
201  }
202 
203  auto& entry = m_pendingEntries.emplace(interestName, PendingEntryInfo{bf, iblt, {}}).first->second;
204  entry.expirationEvent = m_scheduler.schedule(interest.getInterestLifetime(),
205  [this, interest] {
206  NDN_LOG_TRACE("Erase Pending Interest " << interest.getNonce());
207  m_pendingEntries.erase(interest.getName());
208  });
209 }
210 
211 void
212 PartialProducer::satisfyPendingSyncInterests(const ndn::Name& prefix) {
213  NDN_LOG_TRACE("size of pending interest: " << m_pendingEntries.size());
214 
215  for (auto it = m_pendingEntries.begin(); it != m_pendingEntries.end();) {
216  const PendingEntryInfo& entry = it->second;
217 
218  auto diff = m_iblt - entry.iblt;
219 
220  NDN_LOG_TRACE("diff.canDecode: " << diff.canDecode);
221 
222  NDN_LOG_TRACE("Number elements in IBF: " << m_prefixes.size());
223  NDN_LOG_TRACE("m_threshold: " << m_threshold << " Total: " << diff.positive.size() + diff.negative.size());
224 
225  if (!diff.canDecode) {
226  NDN_LOG_TRACE("Decoding of differences with stored IBF unsuccessful, deleting pending interest");
227  m_pendingEntries.erase(it++);
228  continue;
229  }
230 
231  detail::State state;
232  if (entry.bf.contains(prefix) || diff.positive.size() + diff.negative.size() >= m_threshold) {
233  if (entry.bf.contains(prefix)) {
234  state.addContent(ndn::Name(prefix).appendNumber(m_prefixes[prefix]));
235  NDN_LOG_DEBUG("sending sync content " << prefix << " " << std::to_string(m_prefixes[prefix]));
236  }
237  else {
238  NDN_LOG_DEBUG("Sending with empty content to send latest IBF to consumer");
239  }
240 
241  // generate sync data and cancel the event
242  ndn::Name syncDataName = it->first;
243  m_iblt.appendToName(syncDataName);
244 
245  m_segmentPublisher.publish(it->first, syncDataName,
246  state.wireEncode(), m_syncReplyFreshness);
247 
248  m_pendingEntries.erase(it++);
249  }
250  else {
251  ++it;
252  }
253  }
254 }
255 
256 } // namespace psync
Partial sync logic to publish data names.
PartialProducer(ndn::Face &face, ndn::KeyChain &keyChain, const ndn::Name &syncPrefix, const Options &opts)
Constructor.
void publishName(const ndn::Name &prefix, std::optional< uint64_t > seq=std::nullopt)
Publish name to let subscribed consumers know.
Base class for PartialProducer and FullProducer.
void sendApplicationNack(const ndn::Name &name)
Sends a data packet with content type nack.
const ndn::Name m_syncPrefix
const ndn::time::milliseconds m_syncReplyFreshness
const size_t m_expectedNumEntries
bool addUserNode(const ndn::Name &prefix)
Adds a user node for synchronization.
std::map< ndn::Name, uint64_t > m_prefixes
const size_t m_threshold
HashNameBiMap m_biMap
ndn::Scheduler m_scheduler
static void onRegisterFailed(const ndn::Name &prefix, const std::string &msg)
Logs a message and throws if setting an interest filter fails.
const CompressionScheme m_ibltCompression
void updateSeqNo(const ndn::Name &prefix, uint64_t seq)
Update m_prefixes and IBF with the given prefix and seq.
SegmentPublisher m_segmentPublisher
bool replyFromStore(const ndn::Name &interestName)
Try to reply from memory, return false if we cannot find the segment.
void publish(const ndn::Name &interestName, const ndn::Name &dataName, ndn::span< const uint8_t > buffer, ndn::time::milliseconds freshness)
Put all the segments in memory.
void appendToName(ndn::Name &name) const
Appends self to name.
Definition: iblt.cpp:138
Definition: common.hpp:34
const ndn::name::Component SYNC
CompressionScheme
Definition: common.hpp:43
const ndn::name::Component HELLO