full-producer.hpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * Copyright (c) 2014-2024, The University of Memphis
4  *
5  * This file is part of PSync.
6  * See AUTHORS.md for complete list of PSync authors and contributors.
7  *
8  * PSync is free software: you can redistribute it and/or modify it under the terms
9  * of the GNU Lesser General Public License as published by the Free Software Foundation,
10  * either version 3 of the License, or (at your option) any later version.
11  *
12  * PSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
13  * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
14  * PURPOSE. See the GNU Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public License along with
17  * PSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
18  */
19 
20 #ifndef PSYNC_FULL_PRODUCER_HPP
21 #define PSYNC_FULL_PRODUCER_HPP
22 
23 #include "PSync/producer-base.hpp"
24 
25 #include <random>
26 #include <set>
27 
28 #include <ndn-cxx/util/segment-fetcher.hpp>
29 
30 namespace psync {
31 
41 class FullProducer : public ProducerBase
42 {
43 public:
47  struct Options
48  {
50  UpdateCallback onUpdate = [] (const auto&) {};
52  uint32_t ibfCount = 80;
56  ndn::time::milliseconds syncInterestLifetime = SYNC_INTEREST_LIFETIME;
58  ndn::time::milliseconds syncDataFreshness = SYNC_REPLY_FRESHNESS;
61  };
62 
71  FullProducer(ndn::Face& face,
72  ndn::KeyChain& keyChain,
73  const ndn::Name& syncPrefix,
74  const Options& opts);
75 
76  [[deprecated]]
77  FullProducer(ndn::Face& face,
78  ndn::KeyChain& keyChain,
79  size_t expectedNumEntries,
80  const ndn::Name& syncPrefix,
81  const ndn::Name& userPrefix,
82  UpdateCallback onUpdateCallBack,
83  ndn::time::milliseconds syncInterestLifetime = SYNC_INTEREST_LIFETIME,
84  ndn::time::milliseconds syncReplyFreshness = SYNC_REPLY_FRESHNESS,
86  CompressionScheme contentCompression = CompressionScheme::DEFAULT);
87 
88  ~FullProducer();
89 
101  void
102  publishName(const ndn::Name& prefix, std::optional<uint64_t> seq = std::nullopt);
103 
112  void
113  sendSyncInterest();
114 
115  void
116  processWaitingInterests();
117 
118  void
119  scheduleProcessWaitingInterests();
120 
136  void
137  onSyncInterest(const ndn::Name& prefixName, const ndn::Interest& interest,
138  bool isTimedProcessing = false);
139 
151  void
152  sendSyncData(const ndn::Name& name, const ndn::Block& block,
153  ndn::time::milliseconds syncReplyFreshness);
154 
170  void
171  onSyncData(const ndn::Interest& interest, const ndn::ConstBufferPtr& bufferPtr);
172 
173 private:
183  void
184  satisfyPendingInterests(const ndn::Name& updatedPrefixWithSeq);
185 
189  void
190  deletePendingInterests(const ndn::Name& interestName);
191 
198  bool
199  isFutureHash(const ndn::Name& prefix, const std::set<uint32_t>& negative);
200 
201 #ifdef PSYNC_WITH_TESTS
202 public:
203  size_t nIbfDecodeFailuresAboveThreshold = 0;
204  size_t nIbfDecodeFailuresBelowThreshold = 0;
205 #endif // PSYNC_WITH_TESTS
206 
207 private:
208  struct PendingEntryInfo
209  {
210  detail::IBLT iblt;
211  ndn::scheduler::ScopedEventId expirationEvent;
212  };
213 
214  struct WaitingEntryInfo
215  {
216  uint16_t numTries = 0;
217  ndn::Interest::Nonce nonce;
218  };
219 
220  ndn::time::milliseconds m_syncInterestLifetime;
221  UpdateCallback m_onUpdate;
222  ndn::scheduler::ScopedEventId m_scheduledSyncInterestId;
223  static constexpr int MIN_JITTER = 100;
224  static constexpr int MAX_JITTER = 500;
225  std::uniform_int_distribution<> m_jitter{MIN_JITTER, MAX_JITTER};
226  ndn::time::system_clock::time_point m_lastInterestSentTime;
227  ndn::Name m_outstandingInterestName;
228  ndn::ScopedRegisteredPrefixHandle m_registeredPrefix;
229  std::shared_ptr<ndn::SegmentFetcher> m_fetcher;
230  uint64_t m_incomingFace = 0;
231  std::map<ndn::Name, WaitingEntryInfo> m_waitingForProcessing;
232  bool m_inNoNewDataWaitOutPeriod = false;
233  ndn::scheduler::ScopedEventId m_interestDelayTimerId;
234 
236  std::map<ndn::Name, PendingEntryInfo> m_pendingEntries;
237 };
238 
239 } // namespace psync
240 
241 #endif // PSYNC_FULL_PRODUCER_HPP
#define PSYNC_PUBLIC_WITH_TESTS_ELSE_PRIVATE
Full sync logic to synchronize with other nodes where all nodes wants to get all names prefixes synce...
void publishName(const ndn::Name &prefix, std::optional< uint64_t > seq=std::nullopt)
Publish name to let others know.
FullProducer(ndn::Face &face, ndn::KeyChain &keyChain, const ndn::Name &syncPrefix, const Options &opts)
Constructor.
Base class for PartialProducer and FullProducer.
Invertible Bloom Lookup Table (Invertible Bloom Filter)
Definition: iblt.hpp:95
Definition: common.hpp:34
CompressionScheme
Definition: common.hpp:43
constexpr ndn::time::milliseconds SYNC_INTEREST_LIFETIME
Definition: common.hpp:40
constexpr ndn::time::milliseconds SYNC_REPLY_FRESHNESS
Definition: common.hpp:41
std::function< void(const std::vector< MissingDataInfo > &)> UpdateCallback
Definition: common.hpp:71
Constructor options.
uint32_t ibfCount
Expected number of entries in IBF.
UpdateCallback onUpdate
Callback to be invoked when there is new data.
ndn::time::milliseconds syncInterestLifetime
Lifetime of sync Interest.
ndn::time::milliseconds syncDataFreshness
FreshnessPeriod of sync Data.
CompressionScheme ibfCompression
Compression scheme to use for IBF.
CompressionScheme contentCompression
Compression scheme to use for Data content.