consumer.hpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * Copyright (c) 2014-2024, The University of Memphis
4  *
5  * This file is part of PSync.
6  * See AUTHORS.md for complete list of PSync authors and contributors.
7  *
8  * PSync is free software: you can redistribute it and/or modify it under the terms
9  * of the GNU Lesser General Public License as published by the Free Software Foundation,
10  * either version 3 of the License, or (at your option) any later version.
11  *
12  * PSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
13  * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
14  * PURPOSE. See the GNU Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public License along with
17  * PSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
18  */
19 
20 #ifndef PSYNC_CONSUMER_HPP
21 #define PSYNC_CONSUMER_HPP
22 
23 #include "PSync/common.hpp"
26 
27 #include <ndn-cxx/face.hpp>
28 #include <ndn-cxx/util/random.hpp>
29 #include <ndn-cxx/util/scheduler.hpp>
30 #include <ndn-cxx/util/segment-fetcher.hpp>
31 
32 #include <map>
33 
34 namespace psync {
35 
36 using ReceiveHelloCallback = std::function<void(const std::map<ndn::Name, uint64_t>&)>;
37 
55 class Consumer
56 {
57 public:
61  struct Options
62  {
64  ReceiveHelloCallback onHelloData = [] (const auto&) {};
66  UpdateCallback onUpdate = [] (const auto&) {};
68  uint32_t bfCount = 6;
70  double bfFalsePositive = 0.001;
74  ndn::time::milliseconds syncInterestLifetime = SYNC_INTEREST_LIFETIME;
75  };
76 
84  Consumer(ndn::Face& face, const ndn::Name& syncPrefix, const Options& opts);
85 
86  [[deprecated]]
87  Consumer(const ndn::Name& syncPrefix,
88  ndn::Face& face,
89  const ReceiveHelloCallback& onReceiveHelloData,
90  const UpdateCallback& onUpdate,
91  unsigned int count,
92  double falsePositive = 0.001,
93  ndn::time::milliseconds helloInterestLifetime = HELLO_INTEREST_LIFETIME,
94  ndn::time::milliseconds syncInterestLifetime = SYNC_INTEREST_LIFETIME);
95 
101  void
103 
109  void
111 
124  bool
125  addSubscription(const ndn::Name& prefix, uint64_t seqNo, bool callSyncDataCb = true);
126 
133  bool
134  removeSubscription(const ndn::Name& prefix);
135 
136  std::set<ndn::Name>
138  {
139  return m_subscriptionList;
140  }
141 
142  bool
143  isSubscribed(const ndn::Name& prefix) const
144  {
145  return m_subscriptionList.find(prefix) != m_subscriptionList.end();
146  }
147 
148  std::optional<uint64_t>
149  getSeqNo(const ndn::Name& prefix) const
150  {
151  auto it = m_prefixes.find(prefix);
152  if (it == m_prefixes.end()) {
153  return std::nullopt;
154  }
155  return it->second;
156  }
157 
161  void
162  stop();
163 
164 private:
177  void
178  onHelloData(const ndn::ConstBufferPtr& bufferPtr);
179 
190  void
191  onSyncData(const ndn::ConstBufferPtr& bufferPtr);
192 
194  ndn::Face& m_face;
195  ndn::Scheduler m_scheduler;
196 
197  ndn::Name m_syncPrefix;
198  ndn::Name m_helloInterestPrefix;
199  ndn::Name m_syncInterestPrefix;
200  ndn::Name m_iblt;
201  ndn::Name m_helloDataName;
202  ndn::Name m_syncDataName;
203  uint32_t m_syncDataContentType;
204 
205  ReceiveHelloCallback m_onReceiveHelloData;
206 
207  // Called when new sync update is received from producer.
208  UpdateCallback m_onUpdate;
209 
210  // Bloom filter is used to store application/user's subscription list.
211  detail::BloomFilter m_bloomFilter;
212 
213  ndn::time::milliseconds m_helloInterestLifetime;
214  ndn::time::milliseconds m_syncInterestLifetime;
215 
216  // Store sequence number for the prefix.
217  std::map<ndn::Name, uint64_t> m_prefixes;
218  std::set<ndn::Name> m_subscriptionList;
219 
220  ndn::random::RandomNumberEngine& m_rng;
221  std::uniform_int_distribution<> m_rangeUniformRandom;
222  std::shared_ptr<ndn::SegmentFetcher> m_helloFetcher;
223  std::shared_ptr<ndn::SegmentFetcher> m_syncFetcher;
224 };
225 
226 } // namespace psync
227 
228 #endif // PSYNC_CONSUMER_HPP
#define PSYNC_PUBLIC_WITH_TESTS_ELSE_PRIVATE
Consumer logic to subscribe to producer's data.
Definition: consumer.hpp:56
bool addSubscription(const ndn::Name &prefix, uint64_t seqNo, bool callSyncDataCb=true)
Add prefix to subscription list.
Definition: consumer.cpp:62
bool removeSubscription(const ndn::Name &prefix)
Remove prefix from subscription list.
Definition: consumer.cpp:82
void stop()
Stop segment fetcher to stop the sync and free resources.
Definition: consumer.cpp:102
std::optional< uint64_t > getSeqNo(const ndn::Name &prefix) const
Definition: consumer.hpp:149
void sendHelloInterest()
send hello interest /<sync-prefix>/hello/
Definition: consumer.cpp:119
std::set< ndn::Name > getSubscriptionList() const
Definition: consumer.hpp:137
Consumer(ndn::Face &face, const ndn::Name &syncPrefix, const Options &opts)
Constructor.
Definition: consumer.cpp:30
void sendSyncInterest()
send sync interest /<sync-prefix>/sync/<BF>/<producers-IBF>
Definition: consumer.cpp:193
bool isSubscribed(const ndn::Name &prefix) const
Definition: consumer.hpp:143
Definition: common.hpp:34
constexpr ndn::time::milliseconds SYNC_INTEREST_LIFETIME
Definition: common.hpp:40
constexpr ndn::time::milliseconds HELLO_INTEREST_LIFETIME
Definition: common.hpp:38
std::function< void(const std::vector< MissingDataInfo > &)> UpdateCallback
Definition: common.hpp:71
std::function< void(const std::map< ndn::Name, uint64_t > &)> ReceiveHelloCallback
Definition: consumer.hpp:36
Constructor options.
Definition: consumer.hpp:62
double bfFalsePositive
Bloom filter false positive probability.
Definition: consumer.hpp:70
ReceiveHelloCallback onHelloData
Callback to give hello data back to application.
Definition: consumer.hpp:64
uint32_t bfCount
Number of expected elements (subscriptions) in Bloom filter.
Definition: consumer.hpp:68
ndn::time::milliseconds syncInterestLifetime
Lifetime of sync Interest.
Definition: consumer.hpp:74
ndn::time::milliseconds helloInterestLifetime
Lifetime of hello Interest.
Definition: consumer.hpp:72
UpdateCallback onUpdate
Callback to give sync data back to application.
Definition: consumer.hpp:66