23 #include <ndn-cxx/util/logger.hpp> 34 const ndn::Name& syncPrefix,
35 const ndn::Name& userPrefix,
36 ndn::time::milliseconds helloReplyFreshness,
37 ndn::time::milliseconds syncReplyFreshness,
40 userPrefix, syncReplyFreshness, ibltCompression)
41 , m_helloReplyFreshness(helloReplyFreshness)
44 [
this] (
const ndn::Name& syncPrefix) {
46 std::bind(&PartialProducer::onHelloInterest,
this, _1, _2));
48 std::bind(&PartialProducer::onSyncInterest,
this, _1, _2));
// Fragment of the publish path (the enclosing function header is not visible in
// this listing). Picks the sequence number to publish: the caller-supplied `seq`
// when it holds a value, otherwise the stored sequence number for `prefix` plus one.
// NOTE(review): the argument of value_or() is evaluated even when `seq` holds a
// value, and operator[] on the std::map m_prefixes inserts a zero entry for an
// unknown prefix. Confirm that an existence check precedes this statement in the
// lines that are not visible here.
60 uint64_t newSeq = seq.value_or(
m_prefixes[prefix] + 1);
// Record the published name and sequence number in the log.
62 NDN_LOG_INFO(
"Publish: " << prefix <<
"/" << newSeq);
// Re-examine the stored pending sync Interests for this prefix.
66 satisfyPendingSyncInterests(prefix);
// Callback for hello Interests (bound in the constructor via std::bind).
// `prefix` is the prefix the callback is invoked with and is used as the base of
// the reply data name; `interest` is the received Interest.
70 PartialProducer::onHelloInterest(
const ndn::Name& prefix,
const ndn::Interest& interest)
// The condition is true when neither the last nor the fourth-from-last name
// component is "hello". The guarded statement is not visible in this listing
// (presumably an early return -- TODO confirm).
// NOTE(review): `size()-4` is computed on the raw component count; verify how
// Name::get() treats the resulting index for names with fewer than four components.
78 if (interest.getName().get(interest.getName().size()-1).toUri() !=
"hello" &&
79 interest.getName().get(interest.getName().size()-4).toUri() !=
"hello") {
// NOTE(review): the message is labelled "nonce" but the whole Interest is
// streamed; onSyncInterest streams interest.getNonce() instead.
83 NDN_LOG_DEBUG(
"Hello Interest Received, nonce: " << interest);
// Adds one name to `state`: p.first with p.second appended as a number component.
// The loop that supplies `p` is not visible here (presumably iterating m_prefixes).
87 state.
addContent(ndn::Name(p.first).appendNumber(p.second));
89 NDN_LOG_DEBUG(
"sending content p: " << state);
// The reply data name starts as a copy of `prefix`.
91 ndn::Name helloDataName = prefix;
// Callback for sync Interests (bound in the constructor via std::bind).
// From the visible lines: extracts the bloom-filter parameters and the IBF from
// the Interest name, computes the difference between the local IBF (m_iblt) and
// the received one, inspects the decoded hashes against the bloom filter, and
// stores a pending entry with an expiration event.
// `prefix` is the prefix the callback is invoked with; `interest` is the received
// Interest.
99 PartialProducer::onSyncInterest(
const ndn::Name& prefix,
const ndn::Interest& interest)
105 NDN_LOG_DEBUG(
"Sync Interest Received, nonce: " << interest.getNonce() <<
106 " hash: " << std::hash<std::string>{}(interest.getName().toUri()));
// Drop the leading `prefix.size()` components to count what follows the prefix.
108 ndn::Name nameWithoutSyncPrefix = interest.getName().getSubName(prefix.size());
109 ndn::Name interestName;
// Four trailing components: the Interest name is used as is.
111 if (nameWithoutSyncPrefix.size() == 4) {
113 interestName = interest.getName();
// Six trailing components: the last two are stripped
// (presumably version and segment -- TODO confirm).
115 else if (nameWithoutSyncPrefix.size() == 6) {
117 interestName = interest.getName().getPrefix(-2);
123 ndn::name::Component bfName, ibltName;
124 unsigned int projectedCount;
125 double falsePositiveProb;
// The last four components of interestName are read in order: projected count,
// false-positive probability (stored as a number and divided by 1000), bloom
// filter, IBF. This matches the format string logged in the catch block below.
127 projectedCount = interestName.get(interestName.size()-4).toNumber();
128 falsePositiveProb = interestName.get(interestName.size()-3).toNumber()/1000.;
129 bfName = interestName.get(interestName.size()-2);
131 ibltName = interestName.get(interestName.size()-1);
// Any exception raised while extracting the components is logged together with
// the expected name format.
133 catch (
const std::exception& e) {
134 NDN_LOG_ERROR(
"Cannot extract bloom filter and IBF from sync interest: " << e.what());
135 NDN_LOG_ERROR(
"Format: /<syncPrefix>/sync/<BF-count>/<BF-false-positive-probability>/<BF>/<IBF>");
// A second try block (its body is not visible in this listing) reports failures
// at warning level.
145 catch (
const std::exception& e) {
146 NDN_LOG_WARN(e.what());
// Difference between the local IBF and the one received in the Interest.
151 auto diff =
m_iblt - iblt;
// Hash sets for the decoded difference; the call that fills them is not visible
// here.
154 std::set<uint32_t> positive;
155 std::set<uint32_t> negative;
157 NDN_LOG_TRACE(
"Number elements in IBF: " <<
m_prefixes.size());
161 NDN_LOG_TRACE(
"Result of listEntries on the difference: " << peel);
164 NDN_LOG_DEBUG(
"Can't decode the difference, sending application Nack");
171 NDN_LOG_TRACE(
"Size of positive set " << positive.size());
172 NDN_LOG_TRACE(
"Size of negative set " << negative.size());
// For every hash in the positive set, look it up in the left view of m_biMap.
// When found, the bloom filter is tested with the URI of the mapped name minus
// its last component.
173 for (
const auto& hash : positive) {
174 auto nameIt =
m_biMap.left.find(hash);
175 if (nameIt !=
m_biMap.left.end()) {
176 if (bf.
contains(nameIt->second.getPrefix(-1).toUri())) {
179 NDN_LOG_DEBUG(
"Content: " << nameIt->second <<
" " << nameIt->first);
184 NDN_LOG_TRACE(
"m_threshold: " <<
m_threshold <<
" Total: " << positive.size() + negative.size());
// The reply data name starts as a copy of interestName.
189 ndn::Name syncDataName = interestName;
// Store the bloom filter and IBF as a pending entry keyed by interestName, and
// schedule an expiration event after the Interest lifetime.
// NOTE(review): the entry is keyed by `interestName`, but the expiration callback
// below erases by `interest.getName()`. The two differ when the six-component
// branch above was taken, in which case the erase would not remove the entry.
// The lambda capture list is not visible here -- confirm and consider erasing by
// the stored key.
197 auto& entry = m_pendingEntries.emplace(interestName, PendingEntryInfo{bf, iblt, {}}).first->second;
198 entry.expirationEvent =
m_scheduler.schedule(interest.getInterestLifetime(),
200 NDN_LOG_TRACE(
"Erase Pending Interest " << interest.getNonce());
201 m_pendingEntries.erase(interest.getName());
// Walks all stored pending sync Interests after an update to `prefix`. For each
// entry it computes the difference between the local IBF and the entry's stored
// IBF, and decides whether the entry is answered and/or removed.
// `prefix` is the name whose URI is tested against each entry's bloom filter.
206 PartialProducer::satisfyPendingSyncInterests(
const ndn::Name& prefix) {
207 NDN_LOG_TRACE(
"size of pending interest: " << m_pendingEntries.size());
// The loop header has no increment expression: the iterator is advanced inside
// the body, because entries may be erased while iterating.
209 for (
auto it = m_pendingEntries.begin(); it != m_pendingEntries.end();) {
210 const PendingEntryInfo& entry = it->second;
// Difference between the local IBF and the IBF stored with this pending entry.
212 auto diff =
m_iblt - entry.iblt;
213 std::set<uint32_t> positive;
214 std::set<uint32_t> negative;
218 NDN_LOG_TRACE(
"Result of listEntries on the difference: " << peel);
220 NDN_LOG_TRACE(
"Number elements in IBF: " <<
m_prefixes.size());
221 NDN_LOG_TRACE(
"m_threshold: " <<
m_threshold <<
" Total: " << positive.size() + negative.size());
// Decode-failure path: the entry is dropped. The post-increment hands the current
// position to erase() while `it` already refers to the next element.
224 NDN_LOG_TRACE(
"Decoding of differences with stored IBF unsuccessful, deleting pending interest");
225 m_pendingEntries.erase(it++);
// Answer when the entry's bloom filter contains the prefix URI, or when the total
// number of decoded differences has reached m_threshold.
// NOTE(review): bf.contains(prefix.toUri()) is evaluated again in the nested
// check; the result could be computed once.
230 if (entry.bf.contains(prefix.toUri()) || positive.size() + negative.size() >=
m_threshold) {
231 if (entry.bf.contains(prefix.toUri())) {
233 NDN_LOG_DEBUG(
"sending sync content " << prefix <<
" " << std::to_string(
m_prefixes[prefix]));
// Threshold-only case: the log message states that the reply carries no content
// and serves to deliver the latest IBF.
236 NDN_LOG_DEBUG(
"Sending with empty content to send latest IBF to consumer");
// The reply data name starts as a copy of the pending entry's key.
240 ndn::Name syncDataName = it->first;
// Remove the entry using the same erase-and-advance idiom as above.
246 m_pendingEntries.erase(it++);
void appendToName(ndn::Name &name) const
Appends self to name.
void sendApplicationNack(const ndn::Name &name)
Sends a data packet with content type nack.
void updateSeqNo(const ndn::Name &prefix, uint64_t seq)
Update m_prefixes and IBF with the given prefix and seq.
Invertible Bloom Lookup Table (Invertible Bloom Filter)
const ndn::Block & wireEncode() const
NDN_LOG_INIT(psync.Consumer)
Partial sync logic to publish data names.
void publish(const ndn::Name &interestName, const ndn::Name &dataName, const ndn::Block &block, ndn::time::milliseconds freshness, const ndn::security::SigningInfo &signingInfo=ndn::security::SigningInfo())
Put all the segments in memory.
void addContent(const ndn::Name &prefix)
bool listEntries(std::set< uint32_t > &positive, std::set< uint32_t > &negative) const
List all the entries in the IBLT.
CompressionScheme m_ibltCompression
bool contains(const std::string &key) const
void publishName(const ndn::Name &prefix, ndn::optional< uint64_t > seq=ndn::nullopt)
Publish name to let subscribed consumers know.
uint32_t m_expectedNumEntries
bool replyFromStore(const ndn::Name &interestName)
Try to reply from memory, return false if we cannot find the segment.
void initialize(const ndn::name::Component &ibltName)
Populate the hash table using the vector representation of IBLT.
const std::vector< ndn::Name > & getContent() const
void onRegisterFailed(const ndn::Name &prefix, const std::string &msg) const
Logs a message if setting an interest filter fails.
PartialProducer(size_t expectedNumEntries, ndn::Face &face, const ndn::Name &syncPrefix, const ndn::Name &userPrefix, ndn::time::milliseconds helloReplyFreshness=HELLO_REPLY_FRESHNESS, ndn::time::milliseconds syncReplyFreshness=SYNC_REPLY_FRESHNESS, CompressionScheme ibltCompression=CompressionScheme::NONE)
constructor
ndn::Scheduler m_scheduler
ndn::time::milliseconds m_syncReplyFreshness
SegmentPublisher m_segmentPublisher
Base class for PartialProducer and FullProducer.
std::map< ndn::Name, uint64_t > m_prefixes