producer-base.cpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * Copyright (c) 2014-2019, The University of Memphis
4  *
5  * This file is part of PSync.
6  * See AUTHORS.md for complete list of PSync authors and contributors.
7  *
8  * PSync is free software: you can redistribute it and/or modify it under the terms
9  * of the GNU Lesser General Public License as published by the Free Software Foundation,
10  * either version 3 of the License, or (at your option) any later version.
11  *
12  * PSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
13  * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
14  * PURPOSE. See the GNU Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public License along with
17  * PSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
18  **/
19 
20 #include "PSync/producer-base.hpp"
21 
22 #include <ndn-cxx/util/logger.hpp>
23 #include <boost/algorithm/string.hpp>
24 
25 #include <cstring>
26 #include <limits>
27 #include <functional>
28 
29 namespace psync {
30 
31 NDN_LOG_INIT(psync.ProducerBase);
32 
33 ProducerBase::ProducerBase(size_t expectedNumEntries,
34  ndn::Face& face,
35  const ndn::Name& syncPrefix,
36  const ndn::Name& userPrefix,
37  ndn::time::milliseconds syncReplyFreshness,
38  ndn::time::milliseconds helloReplyFreshness)
39  : m_iblt(expectedNumEntries)
40  , m_expectedNumEntries(expectedNumEntries)
41  , m_threshold(expectedNumEntries/2)
42  , m_face(face)
43  , m_scheduler(m_face.getIoService())
44  , m_syncPrefix(syncPrefix)
45  , m_userPrefix(userPrefix)
46  , m_syncReplyFreshness(syncReplyFreshness)
47  , m_helloReplyFreshness(helloReplyFreshness)
48  , m_segmentPublisher(m_face, m_keyChain)
49  , m_rng(ndn::random::getRandomNumberEngine())
50 {
51  addUserNode(userPrefix);
52 }
53 
54 bool
55 ProducerBase::addUserNode(const ndn::Name& prefix)
56 {
57  if (m_prefixes.find(prefix) == m_prefixes.end()) {
58  m_prefixes[prefix] = 0;
59  return true;
60  }
61  else {
62  return false;
63  }
64 }
65 
66 void
67 ProducerBase::removeUserNode(const ndn::Name& prefix)
68 {
69  auto it = m_prefixes.find(prefix);
70  if (it != m_prefixes.end()) {
71  uint64_t seqNo = it->second;
72  m_prefixes.erase(it);
73 
74  ndn::Name prefixWithSeq = ndn::Name(prefix).appendNumber(seqNo);
75  auto hashIt = m_prefix2hash.find(prefixWithSeq);
76  if (hashIt != m_prefix2hash.end()) {
77  uint32_t hash = hashIt->second;
78  m_prefix2hash.erase(hashIt);
79  m_hash2prefix.erase(hash);
80  m_iblt.erase(hash);
81  }
82  }
83 }
84 
85 void
86 ProducerBase::updateSeqNo(const ndn::Name& prefix, uint64_t seq)
87 {
88  NDN_LOG_DEBUG("UpdateSeq: " << prefix << " " << seq);
89 
90  uint64_t oldSeq;
91  auto it = m_prefixes.find(prefix);
92  if (it != m_prefixes.end()) {
93  oldSeq = it->second;
94  }
95  else {
96  NDN_LOG_WARN("Prefix not found in m_prefixes");
97  return;
98  }
99 
100  if (oldSeq >= seq) {
101  NDN_LOG_WARN("Update has lower/equal seq no for prefix, doing nothing!");
102  return;
103  }
104 
105  // Delete the last sequence prefix from the iblt
106  // Because we don't insert zeroth prefix in IBF so no need to delete that
107  if (oldSeq != 0) {
108  ndn::Name prefixWithSeq = ndn::Name(prefix).appendNumber(oldSeq);
109  auto hashIt = m_prefix2hash.find(prefixWithSeq);
110  if (hashIt != m_prefix2hash.end()) {
111  uint32_t hash = hashIt->second;
112  m_prefix2hash.erase(hashIt);
113  m_hash2prefix.erase(hash);
114  m_iblt.erase(hash);
115  }
116  }
117 
118  // Insert the new seq no
119  it->second = seq;
120  ndn::Name prefixWithSeq = ndn::Name(prefix).appendNumber(seq);
121  uint32_t newHash = murmurHash3(N_HASHCHECK, prefixWithSeq.toUri());
122  m_prefix2hash[prefixWithSeq] = newHash;
123  m_hash2prefix[newHash] = prefix;
124  m_iblt.insert(newHash);
125 }
126 
127 void
128 ProducerBase::sendApplicationNack(const ndn::Name& name)
129 {
130  NDN_LOG_DEBUG("Sending application nack");
131  ndn::Name dataName(name);
132  m_iblt.appendToName(dataName);
133 
134  dataName.appendSegment(0);
135  ndn::Data data(dataName);
136  data.setFreshnessPeriod(m_syncReplyFreshness);
137  data.setContentType(ndn::tlv::ContentType_Nack);
138  data.setFinalBlock(dataName[-1]);
139  m_keyChain.sign(data);
140  m_face.put(data);
141 }
142 
143 void
144 ProducerBase::onRegisterFailed(const ndn::Name& prefix, const std::string& msg) const
145 {
146  NDN_LOG_ERROR("ProduerBase::onRegisterFailed " << prefix << " " << msg);
147  BOOST_THROW_EXCEPTION(Error(msg));
148 }
149 
150 } // namespace psync
void onRegisterFailed(const ndn::Name &prefix, const std::string &msg) const
Logs an error message and throws an Error if setting an interest filter fails.
void sendApplicationNack(const ndn::Name &name)
Sends a data packet with content type nack.
void updateSeqNo(const ndn::Name &prefix, uint64_t seq)
Update m_prefixes and IBF with the given prefix and seq.
uint32_t murmurHash3(uint32_t nHashSeed, const std::vector< unsigned char > &vDataToHash)
Definition: util.cpp:37
const size_t N_HASHCHECK
bool addUserNode(const ndn::Name &prefix)
Adds a user node for synchronization.
void removeUserNode(const ndn::Name &prefix)
Remove the user node from synchronization.
void erase(uint32_t key)
Definition: iblt.cpp:132
void insert(uint32_t key)
Definition: iblt.cpp:126
ndn::KeyChain m_keyChain
ProducerBase(size_t expectedNumEntries, ndn::Face &face, const ndn::Name &syncPrefix, const ndn::Name &userPrefix, ndn::time::milliseconds syncReplyFreshness=SYNC_REPLY_FRESHNESS, ndn::time::milliseconds helloReplyFreshness=HELLO_REPLY_FRESHNESS)
constructor
std::map< uint32_t, ndn::Name > m_hash2prefix
std::map< ndn::Name, uint32_t > m_prefix2hash
ndn::time::milliseconds m_syncReplyFreshness
void appendToName(ndn::Name &name) const
Appends self to name.
Definition: iblt.cpp:229
Base class for PartialProducer and FullProducer.
std::map< ndn::Name, uint64_t > m_prefixes