23 #include <ndn-cxx/util/logger.hpp>
// Name component that onHelloInterest compares against components of an
// incoming Interest name to recognise hello Interests.
31 const ndn::name::Component
HELLO{
"hello"};
// Name component for sync Interests; the name format logged by onSyncInterest
// is /<syncPrefix>/sync/... NOTE(review): the code that matches or registers
// this component is not visible in this listing.
32 const ndn::name::Component
SYNC{
"sync"};
35 ndn::KeyChain& keyChain,
36 const ndn::Name& syncPrefix,
38 :
ProducerBase(face, keyChain, opts.ibfCount, syncPrefix, opts.syncDataFreshness,
40 , m_helloReplyFreshness(opts.helloDataFreshness)
43 [
this] (
const auto&) {
45 std::bind(&PartialProducer::onHelloInterest,
this, _1, _2));
47 std::bind(&PartialProducer::onSyncInterest,
this, _1, _2));
49 [] (
auto&&... args) {
onRegisterFailed(std::forward<decltype(args)>(args)...); });
53 ndn::KeyChain& keyChain,
54 size_t expectedNumEntries,
55 const ndn::Name& syncPrefix,
56 const ndn::Name& userPrefix,
57 ndn::time::milliseconds helloReplyFreshness,
58 ndn::time::milliseconds syncReplyFreshness,
61 Options{static_cast<uint32_t>(expectedNumEntries), ibltCompression,
62 helloReplyFreshness, syncReplyFreshness})
// Fragment of publishName(prefix, seq); statements between the numbered lines
// are not visible in this listing.
//
// Picks the sequence number to publish: the caller-supplied `seq` if present,
// otherwise the stored value for `prefix` plus one.
// value_or() evaluates its argument before the call, and m_prefixes is a
// std::map, so m_prefixes[prefix] is executed even when `seq` has a value and
// value-initialises an entry if `prefix` is absent.
// NOTE(review): confirm an earlier (unseen) check rejects unknown prefixes.
74 uint64_t newSeq = seq.value_or(
m_prefixes[prefix] + 1);
75 NDN_LOG_INFO(
"Publish: " << prefix <<
"/" << newSeq);
// Re-examine the stored sync Interests now that `prefix` has been published.
77 satisfyPendingSyncInterests(prefix);
// Handler for hello Interests (bound with std::bind in the constructor).
// The visible statements validate the Interest name, collect entries into
// `state`, and hand state.wireEncode() together with m_helloReplyFreshness to
// a call whose beginning is not visible in this listing.
//
// prefix   - the prefix passed in by the filter callback; used as the start of
//            the reply data name.
// interest - the incoming Interest.
81 PartialProducer::onHelloInterest(
const ndn::Name& prefix,
const ndn::Interest& interest)
83 const auto& name = interest.getName();
// Enters the branch only when neither the last nor the fourth-from-last
// component equals HELLO; the branch body is not visible here.
// NOTE(review): for names with fewer than four components `name.size() - 4`
// goes below zero (and wraps if size() is unsigned) - confirm get() tolerates
// that index or that shorter names cannot reach this handler.
90 if (name.get(name.size() - 1) !=
HELLO && name.get(name.size() - 4) !=
HELLO) {
// NOTE(review): the message says "nonce" but the whole Interest is streamed;
// onSyncInterest streams interest.getNonce() for its equivalent message.
94 NDN_LOG_DEBUG(
"Hello Interest Received, nonce: " << interest);
// Each entry `p` contributes its first member as a name with its second member
// appended as a number. The loop header is not visible in this listing.
98 state.addContent(ndn::Name(p.first).appendNumber(p.second));
100 NDN_LOG_DEBUG(
"sending content p: " << state);
// The reply data name starts as a copy of `prefix`; anything appended to it
// afterwards is not visible here.
102 ndn::Name helloDataName = prefix;
// Trailing arguments of the reply call: the encoded state and the hello
// freshness period stored by the constructor.
106 state.wireEncode(), m_helloReplyFreshness);
// Handler for sync Interests (bound with std::bind in the constructor).
// Visible flow: normalise the Interest name, parse a bloom filter and an IBF
// out of its last four components, subtract the received IBF from m_iblt,
// collect matching names into `state`, and either reply or store the Interest
// as a pending entry with an expiration event.
// Several statements (braces, try blocks, returns, the reply call) are not
// visible in this listing.
//
// prefix   - the prefix passed in by the filter callback; its size determines
//            how many leading components are stripped.
// interest - the incoming Interest.
110 PartialProducer::onSyncInterest(
const ndn::Name& prefix,
const ndn::Interest& interest)
116 NDN_LOG_DEBUG(
"Sync Interest Received, nonce: " << interest.getNonce() <<
117 " hash: " << std::hash<ndn::Name>{}(interest.getName()));
// Drop the first prefix.size() components to count what follows the prefix.
119 ndn::Name nameWithoutSyncPrefix = interest.getName().getSubName(prefix.size());
120 ndn::Name interestName;
// Four trailing components: use the Interest name as-is.
122 if (nameWithoutSyncPrefix.size() == 4) {
124 interestName = interest.getName();
// Six trailing components: discard the last two.
// NOTE(review): presumably version and segment components - confirm.
126 else if (nameWithoutSyncPrefix.size() == 6) {
128 interestName = interest.getName().getPrefix(-2);
134 ndn::name::Component bfName, ibltName;
135 unsigned int projectedCount;
136 double falsePositiveProb;
// Last four components, in order: projected count, false-positive probability
// (carried as a number and divided by 1000), bloom filter, IBF.
138 projectedCount = interestName.get(interestName.size()-4).toNumber();
139 falsePositiveProb = interestName.get(interestName.size()-3).toNumber()/1000.;
140 bfName = interestName.get(interestName.size()-2);
142 ibltName = interestName.get(interestName.size()-1);
// Parsing failures are logged along with the expected name format.
144 catch (
const std::exception& e) {
145 NDN_LOG_ERROR(
"Cannot extract bloom filter and IBF from sync interest: " << e.what());
146 NDN_LOG_ERROR(
"Format: /<syncPrefix>/sync/<BF-count>/<BF-false-positive-probability>/<BF>/<IBF>");
150 detail::BloomFilter bf;
// Rebuild the bloom filter and the IBF from the parsed components; a failure
// here is logged at WARN level.
153 bf = detail::BloomFilter(projectedCount, falsePositiveProb, bfName);
154 iblt.initialize(ibltName);
156 catch (
const std::exception& e) {
157 NDN_LOG_WARN(e.what());
// Difference between the local IBF and the one received in the Interest.
163 auto diff =
m_iblt - iblt;
// NOTE(review): the message says "elements in IBF" but the value logged is
// m_prefixes.size().
165 NDN_LOG_TRACE(
"Number elements in IBF: " <<
m_prefixes.size());
167 NDN_LOG_TRACE(
"diff.canDecode: " << diff.canDecode);
// Undecodable difference: the log announces an application Nack; the call that
// sends it is not visible in this listing.
169 if (!diff.canDecode) {
170 NDN_LOG_DEBUG(
"Can't decode the difference, sending application Nack");
177 NDN_LOG_TRACE(
"Size of positive set " << diff.positive.size());
178 NDN_LOG_TRACE(
"Size of negative set " << diff.negative.size());
// For every hash in the positive set, look it up in m_biMap.left. A found name
// is added to `state` only if the bloom filter contains that name without its
// last component.
179 for (
const auto& hash : diff.positive) {
180 auto nameIt =
m_biMap.left.find(hash);
181 if (nameIt !=
m_biMap.left.end()) {
182 if (bf.contains(nameIt->second.getPrefix(-1))) {
184 state.addContent(nameIt->second);
185 NDN_LOG_DEBUG(
"Content: " << nameIt->second <<
" " << nameIt->first);
190 NDN_LOG_TRACE(
"m_threshold: " <<
m_threshold <<
" Total: " << diff.positive.size() + diff.negative.size());
// Take this branch when the combined diff size reaches m_threshold or there is
// content to send.
192 if (diff.positive.size() + diff.negative.size() >=
m_threshold || !state.getContent().empty()) {
// The data name is based on the normalised Interest name.
195 ndn::Name syncDataName = interestName;
// Store the Interest as a pending entry keyed by interestName and keep a
// reference to the stored value so the expiration event can be attached.
203 auto& entry = m_pendingEntries.emplace(interestName, PendingEntryInfo{bf, iblt, {}}).first->second;
// Schedule an event after the Interest lifetime; the callback header is not
// visible in this listing, but the two statements below appear to be its body.
204 entry.expirationEvent =
m_scheduler.schedule(interest.getInterestLifetime(),
206 NDN_LOG_TRACE(
"Erase Pending Interest " << interest.getNonce());
// NOTE(review): the entry was inserted under interestName but is erased under
// interest.getName(); these differ in the six-component case above, where
// interestName is getPrefix(-2) of the Interest name - confirm the intended key.
207 m_pendingEntries.erase(interest.getName());
// Walks the pending sync Interests after `prefix` has been published.
// Visible flow per entry: subtract the stored IBF from m_iblt, erase the entry
// if the difference cannot be decoded, otherwise reply (and erase) when the
// stored bloom filter contains `prefix` or the diff size reaches m_threshold.
// Some statements (braces, the reply call, any other iterator advance) are not
// visible in this listing.
//
// prefix - the prefix that was just published.
212 PartialProducer::satisfyPendingSyncInterests(
const ndn::Name& prefix) {
213 NDN_LOG_TRACE(
"size of pending interest: " << m_pendingEntries.size());
// The loop header has no increment: the visible paths advance the iterator via
// erase(it++).
// NOTE(review): confirm that the unseen paths also advance `it`.
215 for (
auto it = m_pendingEntries.begin(); it != m_pendingEntries.end();) {
216 const PendingEntryInfo& entry = it->second;
// Difference between the current local IBF and the IBF stored with the entry.
218 auto diff =
m_iblt - entry.iblt;
220 NDN_LOG_TRACE(
"diff.canDecode: " << diff.canDecode);
// NOTE(review): the message says "elements in IBF" but the value logged is
// m_prefixes.size().
222 NDN_LOG_TRACE(
"Number elements in IBF: " <<
m_prefixes.size());
223 NDN_LOG_TRACE(
"m_threshold: " <<
m_threshold <<
" Total: " << diff.positive.size() + diff.negative.size());
// An entry whose stored IBF can no longer be decoded against m_iblt is dropped.
// Post-increment hands erase() the current position while `it` moves on.
225 if (!diff.canDecode) {
226 NDN_LOG_TRACE(
"Decoding of differences with stored IBF unsuccessful, deleting pending interest");
227 m_pendingEntries.erase(it++);
// Reply when the stored bloom filter contains `prefix` or the combined diff
// size has reached m_threshold.
232 if (entry.bf.contains(prefix) || diff.positive.size() + diff.negative.size() >=
m_threshold) {
// Only a filter match puts content in the reply: `prefix` with its current
// sequence number from m_prefixes appended.
233 if (entry.bf.contains(prefix)) {
234 state.addContent(ndn::Name(prefix).appendNumber(
m_prefixes[prefix]));
235 NDN_LOG_DEBUG(
"sending sync content " << prefix <<
" " << std::to_string(
m_prefixes[prefix]));
// Logged for the threshold-only case.
// NOTE(review): the `else` line is not visible in this listing.
238 NDN_LOG_DEBUG(
"Sending with empty content to send latest IBF to consumer");
// The data name starts from the key under which the pending entry was stored.
242 ndn::Name syncDataName = it->first;
// The entry is removed once it has been handled.
248 m_pendingEntries.erase(it++);
Partial sync logic to publish data names.
PartialProducer(ndn::Face &face, ndn::KeyChain &keyChain, const ndn::Name &syncPrefix, const Options &opts)
Constructor.
void publishName(const ndn::Name &prefix, std::optional< uint64_t > seq=std::nullopt)
Publish name to let subscribed consumers know.
Base class for PartialProducer and FullProducer.
void sendApplicationNack(const ndn::Name &name)
Sends a data packet with content type nack.
const ndn::Name m_syncPrefix
const ndn::time::milliseconds m_syncReplyFreshness
const size_t m_expectedNumEntries
bool addUserNode(const ndn::Name &prefix)
Adds a user node for synchronization.
std::map< ndn::Name, uint64_t > m_prefixes
ndn::Scheduler m_scheduler
static void onRegisterFailed(const ndn::Name &prefix, const std::string &msg)
Logs a message and throws if setting an interest filter fails.
const CompressionScheme m_ibltCompression
void updateSeqNo(const ndn::Name &prefix, uint64_t seq)
Update m_prefixes and IBF with the given prefix and seq.
SegmentPublisher m_segmentPublisher
bool replyFromStore(const ndn::Name &interestName)
Try to reply from memory, return false if we cannot find the segment.
void publish(const ndn::Name &interestName, const ndn::Name &dataName, ndn::span< const uint8_t > buffer, ndn::time::milliseconds freshness)
Put all the segments in memory.
void appendToName(ndn::Name &name) const
Appends self to name.
const ndn::name::Component SYNC
const ndn::name::Component HELLO