Loading...
Searching...
No Matches
producer-base.cpp
Go to the documentation of this file.
1/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2/*
3 * Copyright (c) 2014-2024, The University of Memphis
4 *
5 * This file is part of PSync.
6 * See AUTHORS.md for complete list of PSync authors and contributors.
7 *
8 * PSync is free software: you can redistribute it and/or modify it under the terms
9 * of the GNU Lesser General Public License as published by the Free Software Foundation,
10 * either version 3 of the License, or (at your option) any later version.
11 *
12 * PSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
13 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
14 * PURPOSE. See the GNU Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public License along with
17 * PSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
18 */
19
21#include "PSync/detail/util.hpp"
22
23#include <ndn-cxx/util/exception.hpp>
24#include <ndn-cxx/util/logger.hpp>
25
26namespace psync {
27
28NDN_LOG_INIT(psync.ProducerBase);
29
31 ndn::KeyChain& keyChain,
32 size_t expectedNumEntries,
33 const ndn::Name& syncPrefix,
34 ndn::time::milliseconds syncReplyFreshness,
35 CompressionScheme ibltCompression,
36 CompressionScheme contentCompression)
37 : m_face(face)
38 , m_keyChain(keyChain)
39 , m_scheduler(m_face.getIoContext())
40 , m_rng(ndn::random::getRandomNumberEngine())
41 , m_iblt(expectedNumEntries, ibltCompression)
42 , m_segmentPublisher(m_face, m_keyChain)
43 , m_expectedNumEntries(expectedNumEntries)
44 , m_threshold(expectedNumEntries / 2)
45 , m_syncPrefix(syncPrefix)
46 , m_syncReplyFreshness(syncReplyFreshness)
47 , m_ibltCompression(ibltCompression)
48 , m_contentCompression(contentCompression)
49{
50}
51
52bool
53ProducerBase::addUserNode(const ndn::Name& prefix)
54{
55 if (m_prefixes.find(prefix) == m_prefixes.end()) {
56 m_prefixes[prefix] = 0;
57 return true;
58 }
59 else {
60 return false;
61 }
62}
63
64void
65ProducerBase::removeUserNode(const ndn::Name& prefix)
66{
67 auto it = m_prefixes.find(prefix);
68 if (it != m_prefixes.end()) {
69 uint64_t seqNo = it->second;
70 m_prefixes.erase(it);
71
72 ndn::Name prefixWithSeq = ndn::Name(prefix).appendNumber(seqNo);
73 auto hashIt = m_biMap.right.find(prefixWithSeq);
74 if (hashIt != m_biMap.right.end()) {
75 m_iblt.erase(hashIt->second);
76 m_biMap.right.erase(hashIt);
77 }
78 }
79}
80
81void
82ProducerBase::updateSeqNo(const ndn::Name& prefix, uint64_t seq)
83{
84 NDN_LOG_DEBUG("UpdateSeq: " << prefix << " " << seq);
85
86 uint64_t oldSeq;
87 auto it = m_prefixes.find(prefix);
88 if (it != m_prefixes.end()) {
89 oldSeq = it->second;
90 }
91 else {
92 NDN_LOG_WARN("Prefix not found in m_prefixes");
93 return;
94 }
95
96 if (oldSeq >= seq) {
97 NDN_LOG_WARN("Update has lower/equal seq no for prefix, doing nothing!");
98 return;
99 }
100
101 // Delete the last sequence prefix from the iblt
102 // Because we don't insert zeroth prefix in IBF so no need to delete that
103 if (oldSeq != 0) {
104 ndn::Name prefixWithSeq = ndn::Name(prefix).appendNumber(oldSeq);
105 auto hashIt = m_biMap.right.find(prefixWithSeq);
106 if (hashIt != m_biMap.right.end()) {
107 m_iblt.erase(hashIt->second);
108 m_biMap.right.erase(hashIt);
109 }
110 }
111
112 // Insert the new seq no in m_prefixes, m_biMap, and m_iblt
113 it->second = seq;
114 ndn::Name prefixWithSeq = ndn::Name(prefix).appendNumber(seq);
115 auto newHash = detail::murmurHash3(detail::N_HASHCHECK, prefixWithSeq);
116 m_biMap.insert({newHash, prefixWithSeq});
117 m_iblt.insert(newHash);
118
119 m_numOwnElements += (seq - oldSeq);
120}
121
122void
124{
125 NDN_LOG_DEBUG("Sending application nack");
126
127 ndn::Name dataName(name);
128 m_iblt.appendToName(dataName);
129 dataName.appendSegment(0);
130 ndn::Data data(dataName);
131 data.setContentType(ndn::tlv::ContentType_Nack)
132 .setFreshnessPeriod(m_syncReplyFreshness)
133 .setFinalBlock(dataName[-1]);
134
135 m_keyChain.sign(data);
136 m_face.put(data);
137}
138
139void
140ProducerBase::onRegisterFailed(const ndn::Name& prefix, const std::string& msg)
141{
142 NDN_LOG_ERROR("onRegisterFailed(" << prefix << "): " << msg);
143 NDN_THROW(Error(msg));
144}
145
146} // namespace psync
Base class for PartialProducer and FullProducer.
void sendApplicationNack(const ndn::Name &name)
Sends a data packet with content type nack.
const ndn::time::milliseconds m_syncReplyFreshness
void removeUserNode(const ndn::Name &prefix)
Remove the user node from synchronization.
bool addUserNode(const ndn::Name &prefix)
Adds a user node for synchronization.
std::map< ndn::Name, uint64_t > m_prefixes
static void onRegisterFailed(const ndn::Name &prefix, const std::string &msg)
Logs a message and throws if setting an interest filter fails.
ProducerBase(ndn::Face &face, ndn::KeyChain &keyChain, size_t expectedNumEntries, const ndn::Name &syncPrefix, ndn::time::milliseconds syncReplyFreshness=SYNC_REPLY_FRESHNESS, CompressionScheme ibltCompression=CompressionScheme::NONE, CompressionScheme contentCompression=CompressionScheme::NONE)
Constructor.
void updateSeqNo(const ndn::Name &prefix, uint64_t seq)
Update m_prefixes and IBF with the given prefix and seq.
ndn::KeyChain & m_keyChain
void insert(uint32_t key)
Definition iblt.cpp:126
void appendToName(ndn::Name &name) const
Appends self to name.
Definition iblt.cpp:138
void erase(uint32_t key)
Definition iblt.cpp:132
uint32_t murmurHash3(const void *key, size_t len, uint32_t seed)
Definition util.cpp:58
constexpr size_t N_HASHCHECK
Definition iblt.hpp:87
CompressionScheme
Definition common.hpp:43