Loading...
Searching...
No Matches
partial-producer.cpp
Go to the documentation of this file.
1/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2/*
3 * Copyright (c) 2014-2024, The University of Memphis
4 *
5 * This file is part of PSync.
6 * See AUTHORS.md for complete list of PSync authors and contributors.
7 *
8 * PSync is free software: you can redistribute it and/or modify it under the terms
9 * of the GNU Lesser General Public License as published by the Free Software Foundation,
10 * either version 3 of the License, or (at your option) any later version.
11 *
12 * PSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
13 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
14 * PURPOSE. See the GNU Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public License along with
17 * PSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
18 **/
19
22
23#include <ndn-cxx/util/logger.hpp>
24
25#include <cstring>
26
27namespace psync {
28
29NDN_LOG_INIT(psync.PartialProducer);
30
31const ndn::name::Component HELLO{"hello"};
32const ndn::name::Component SYNC{"sync"};
33
35 ndn::KeyChain& keyChain,
36 const ndn::Name& syncPrefix,
37 const Options& opts)
38 : ProducerBase(face, keyChain, opts.ibfCount, syncPrefix, opts.syncDataFreshness,
39 opts.ibfCompression, CompressionScheme::NONE)
40 , m_helloReplyFreshness(opts.helloDataFreshness)
41{
42 m_registeredPrefix = m_face.registerPrefix(m_syncPrefix,
43 [this] (const auto&) {
44 m_face.setInterestFilter(ndn::Name(m_syncPrefix).append(HELLO),
45 std::bind(&PartialProducer::onHelloInterest, this, _1, _2));
46 m_face.setInterestFilter(ndn::Name(m_syncPrefix).append(SYNC),
47 std::bind(&PartialProducer::onSyncInterest, this, _1, _2));
48 },
49 [] (auto&&... args) { onRegisterFailed(std::forward<decltype(args)>(args)...); });
50}
51
53 ndn::KeyChain& keyChain,
54 size_t expectedNumEntries,
55 const ndn::Name& syncPrefix,
56 const ndn::Name& userPrefix,
57 ndn::time::milliseconds helloReplyFreshness,
58 ndn::time::milliseconds syncReplyFreshness,
59 CompressionScheme ibltCompression)
60 : PartialProducer(face, keyChain, syncPrefix,
61 Options{static_cast<uint32_t>(expectedNumEntries), ibltCompression,
62 helloReplyFreshness, syncReplyFreshness})
63{
64 addUserNode(userPrefix);
65}
66
67void
68PartialProducer::publishName(const ndn::Name& prefix, std::optional<uint64_t> seq)
69{
70 if (m_prefixes.find(prefix) == m_prefixes.end()) {
71 return;
72 }
73
74 uint64_t newSeq = seq.value_or(m_prefixes[prefix] + 1);
75 NDN_LOG_INFO("Publish: " << prefix << "/" << newSeq);
76 updateSeqNo(prefix, newSeq);
77 satisfyPendingSyncInterests(prefix);
78}
79
80void
81PartialProducer::onHelloInterest(const ndn::Name& prefix, const ndn::Interest& interest)
82{
83 const auto& name = interest.getName();
85 return;
86 }
87
88 // Last component or fourth last component (in case of interest with version and segment)
89 // needs to be hello
90 if (name.get(name.size() - 1) != HELLO && name.get(name.size() - 4) != HELLO) {
91 return;
92 }
93
94 NDN_LOG_DEBUG("Hello Interest Received, nonce: " << interest);
95
96 detail::State state;
97 for (const auto& p : m_prefixes) {
98 state.addContent(ndn::Name(p.first).appendNumber(p.second));
99 }
100 NDN_LOG_DEBUG("sending content p: " << state);
101
102 ndn::Name helloDataName = prefix;
103 m_iblt.appendToName(helloDataName);
104
105 m_segmentPublisher.publish(interest.getName(), helloDataName,
106 state.wireEncode(), m_helloReplyFreshness);
107}
108
109void
110PartialProducer::onSyncInterest(const ndn::Name& prefix, const ndn::Interest& interest)
111{
112 if (m_segmentPublisher.replyFromStore(interest.getName())) {
113 return;
114 }
115
116 NDN_LOG_DEBUG("Sync Interest Received, nonce: " << interest.getNonce() <<
117 " hash: " << std::hash<ndn::Name>{}(interest.getName()));
118
119 ndn::Name nameWithoutSyncPrefix = interest.getName().getSubName(prefix.size());
120 ndn::Name interestName;
121
122 if (nameWithoutSyncPrefix.size() == 4) {
123 // Get /<prefix>/BF/IBF/ from /<prefix>/BF/IBF (3 components of BF + 1 for IBF)
124 interestName = interest.getName();
125 }
126 else if (nameWithoutSyncPrefix.size() == 6) {
127 // Get <prefix>/BF/IBF/ from /<prefix>/BF/IBF/<version>/<segment-no>
128 interestName = interest.getName().getPrefix(-2);
129 }
130 else {
131 return;
132 }
133
134 ndn::name::Component bfName, ibltName;
135 unsigned int projectedCount;
136 double falsePositiveProb;
137 try {
138 projectedCount = interestName.get(interestName.size()-4).toNumber();
139 falsePositiveProb = interestName.get(interestName.size()-3).toNumber()/1000.;
140 bfName = interestName.get(interestName.size()-2);
141
142 ibltName = interestName.get(interestName.size()-1);
143 }
144 catch (const std::exception& e) {
145 NDN_LOG_ERROR("Cannot extract bloom filter and IBF from sync interest: " << e.what());
146 NDN_LOG_ERROR("Format: /<syncPrefix>/sync/<BF-count>/<BF-false-positive-probability>/<BF>/<IBF>");
147 return;
148 }
149
150 detail::BloomFilter bf;
151 detail::IBLT iblt(m_expectedNumEntries, m_ibltCompression);
152 try {
153 bf = detail::BloomFilter(projectedCount, falsePositiveProb, bfName);
154 iblt.initialize(ibltName);
155 }
156 catch (const std::exception& e) {
157 NDN_LOG_WARN(e.what());
158 return;
159 }
160
161 // get the difference
162 // non-empty positive means we have some elements that the others don't
163 auto diff = m_iblt - iblt;
164
165 NDN_LOG_TRACE("Number elements in IBF: " << m_prefixes.size());
166
167 NDN_LOG_TRACE("diff.canDecode: " << diff.canDecode);
168
169 if (!diff.canDecode) {
170 NDN_LOG_DEBUG("Can't decode the difference, sending application Nack");
171 sendApplicationNack(interestName);
172 return;
173 }
174
175 // generate content for Sync reply
176 detail::State state;
177 NDN_LOG_TRACE("Size of positive set " << diff.positive.size());
178 NDN_LOG_TRACE("Size of negative set " << diff.negative.size());
179 for (const auto& hash : diff.positive) {
180 auto nameIt = m_biMap.left.find(hash);
181 if (nameIt != m_biMap.left.end()) {
182 if (bf.contains(nameIt->second.getPrefix(-1))) {
183 // generate data
184 state.addContent(nameIt->second);
185 NDN_LOG_DEBUG("Content: " << nameIt->second << " " << nameIt->first);
186 }
187 }
188 }
189
190 NDN_LOG_TRACE("m_threshold: " << m_threshold << " Total: " << diff.positive.size() + diff.negative.size());
191
192 if (diff.positive.size() + diff.negative.size() >= m_threshold || !state.getContent().empty()) {
193
194 // send back data
195 ndn::Name syncDataName = interestName;
196 m_iblt.appendToName(syncDataName);
197
198 m_segmentPublisher.publish(interest.getName(), syncDataName,
199 state.wireEncode(), m_syncReplyFreshness);
200 return;
201 }
202
203 auto& entry = m_pendingEntries.emplace(interestName, PendingEntryInfo{bf, iblt, {}}).first->second;
204 entry.expirationEvent = m_scheduler.schedule(interest.getInterestLifetime(),
205 [this, interest] {
206 NDN_LOG_TRACE("Erase Pending Interest " << interest.getNonce());
207 m_pendingEntries.erase(interest.getName());
208 });
209}
210
211void
212PartialProducer::satisfyPendingSyncInterests(const ndn::Name& prefix) {
213 NDN_LOG_TRACE("size of pending interest: " << m_pendingEntries.size());
214
215 for (auto it = m_pendingEntries.begin(); it != m_pendingEntries.end();) {
216 const PendingEntryInfo& entry = it->second;
217
218 auto diff = m_iblt - entry.iblt;
219
220 NDN_LOG_TRACE("diff.canDecode: " << diff.canDecode);
221
222 NDN_LOG_TRACE("Number elements in IBF: " << m_prefixes.size());
223 NDN_LOG_TRACE("m_threshold: " << m_threshold << " Total: " << diff.positive.size() + diff.negative.size());
224
225 if (!diff.canDecode) {
226 NDN_LOG_TRACE("Decoding of differences with stored IBF unsuccessful, deleting pending interest");
227 m_pendingEntries.erase(it++);
228 continue;
229 }
230
231 detail::State state;
232 if (entry.bf.contains(prefix) || diff.positive.size() + diff.negative.size() >= m_threshold) {
233 if (entry.bf.contains(prefix)) {
234 state.addContent(ndn::Name(prefix).appendNumber(m_prefixes[prefix]));
235 NDN_LOG_DEBUG("sending sync content " << prefix << " " << std::to_string(m_prefixes[prefix]));
236 }
237 else {
238 NDN_LOG_DEBUG("Sending with empty content to send latest IBF to consumer");
239 }
240
241 // generate sync data and cancel the event
242 ndn::Name syncDataName = it->first;
243 m_iblt.appendToName(syncDataName);
244
245 m_segmentPublisher.publish(it->first, syncDataName,
246 state.wireEncode(), m_syncReplyFreshness);
247
248 m_pendingEntries.erase(it++);
249 }
250 else {
251 ++it;
252 }
253 }
254}
255
256} // namespace psync
Partial sync logic to publish data names.
PartialProducer(ndn::Face &face, ndn::KeyChain &keyChain, const ndn::Name &syncPrefix, const Options &opts)
Constructor.
void publishName(const ndn::Name &prefix, std::optional< uint64_t > seq=std::nullopt)
Publish name to let subscribed consumers know.
Base class for PartialProducer and FullProducer.
void sendApplicationNack(const ndn::Name &name)
Sends a data packet with content type nack.
const ndn::Name m_syncPrefix
const ndn::time::milliseconds m_syncReplyFreshness
const size_t m_expectedNumEntries
bool addUserNode(const ndn::Name &prefix)
Adds a user node for synchronization.
std::map< ndn::Name, uint64_t > m_prefixes
ndn::Scheduler m_scheduler
static void onRegisterFailed(const ndn::Name &prefix, const std::string &msg)
Logs a message and throws if setting an interest filter fails.
const CompressionScheme m_ibltCompression
void updateSeqNo(const ndn::Name &prefix, uint64_t seq)
Update m_prefixes and IBF with the given prefix and seq.
SegmentPublisher m_segmentPublisher
bool replyFromStore(const ndn::Name &interestName)
Try to reply from memory, return false if we cannot find the segment.
void publish(const ndn::Name &interestName, const ndn::Name &dataName, ndn::span< const uint8_t > buffer, ndn::time::milliseconds freshness)
Put all the segments in memory.
void appendToName(ndn::Name &name) const
Appends self to name.
Definition iblt.cpp:138
const ndn::name::Component SYNC
CompressionScheme
Definition common.hpp:43
const ndn::name::Component HELLO