# -*- Mode:python; c-file-style:"gnu"; indent-tabs-mode:nil -*- */
#
# Copyright (C) 2014-2019 Regents of the University of California.
# Author: Jeff Thompson <[email protected]>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# A copy of the GNU Lesser General Public License is in the file COPYING.
"""
This module defines the NDN Data class.
"""
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from pyndn.encoding.wire_format import WireFormat
from pyndn.util.blob import Blob
from pyndn.util.signed_blob import SignedBlob
from pyndn.util.change_counter import ChangeCounter
from pyndn.name import Name
from pyndn.meta_info import MetaInfo
from pyndn.sha256_with_rsa_signature import Sha256WithRsaSignature
from pyndn.lp.incoming_face_id import IncomingFaceId
from pyndn.lp.congestion_mark import CongestionMark
[docs]class Data(object):
def __init__(self, value = None):
if isinstance(value, Data):
# Copy the values.
self._name = ChangeCounter(Name(value.getName()))
self._metaInfo = ChangeCounter(MetaInfo(value.getMetaInfo()))
self._signature = ChangeCounter(value.getSignature().clone())
self._content = value._content
self._defaultWireEncoding = value.getDefaultWireEncoding()
self._defaultFullName = Name(value._defaultFullName)
self._defaultWireEncodingFormat = value._defaultWireEncodingFormat
else:
self._name = ChangeCounter(Name(value) if isinstance(value, Name)
else Name())
self._metaInfo = ChangeCounter(MetaInfo())
self._signature = ChangeCounter(Sha256WithRsaSignature())
self._content = Blob()
self._defaultWireEncoding = SignedBlob()
self._defaultFullName = Name()
self._defaultWireEncodingFormat = None
self._getDefaultWireEncodingChangeCount = 0
self._changeCount = 0
self._lpPacket = None
[docs] def wireEncode(self, wireFormat = None):
"""
Encode this Data for a particular wire format. If wireFormat is the
default wire format, also set the defaultWireEncoding field to the
encoded result.
:param wireFormat: (optional) A WireFormat object used to encode this
Data object. If omitted, use WireFormat.getDefaultWireFormat().
:type wireFormat: A subclass of WireFormat
:return: The encoded buffer in a SignedBlob object.
:rtype: SignedBlob
"""
if wireFormat == None:
# Don't use a default argument since getDefaultWireFormat can change.
wireFormat = WireFormat.getDefaultWireFormat()
if (not self.getDefaultWireEncoding().isNull() and
self.getDefaultWireEncodingFormat() == wireFormat):
# We already have an encoding in the desired format.
return self.getDefaultWireEncoding()
(encoding, signedPortionBeginOffset, signedPortionEndOffset) = \
wireFormat.encodeData(self)
wireEncoding = SignedBlob(
encoding, signedPortionBeginOffset, signedPortionEndOffset)
if wireFormat == WireFormat.getDefaultWireFormat():
# This is the default wire encoding.
self._setDefaultWireEncoding(
wireEncoding, WireFormat.getDefaultWireFormat())
return wireEncoding
[docs] def wireDecode(self, input, wireFormat = None):
"""
Decode the input using a particular wire format and update this Data.
If wireFormat is the default wire format, also set the
defaultWireEncoding to another pointer to the input.
:param input: The array with the bytes to decode. If input is not a
Blob, then copy the bytes to save the defaultWireEncoding (otherwise
take another pointer to the same Blob).
:type input: A Blob or an array type with int elements
:param wireFormat: (optional) A WireFormat object used to decode this
Data object. If omitted, use WireFormat.getDefaultWireFormat().
:type wireFormat: A subclass of WireFormat
"""
if wireFormat == None:
# Don't use a default argument since getDefaultWireFormat can change.
wireFormat = WireFormat.getDefaultWireFormat()
if isinstance(input, Blob):
# Input is a blob, so get its buf() and set copy False.
result = wireFormat.decodeData(self, input.buf(), False)
else:
result = wireFormat.decodeData(self, input, True)
(signedPortionBeginOffset, signedPortionEndOffset) = result
if wireFormat == WireFormat.getDefaultWireFormat():
# This is the default wire encoding. In the Blob constructor, set
# copy true, but if input is already a Blob, it won't copy.
self._setDefaultWireEncoding(SignedBlob(
Blob(input, True),
signedPortionBeginOffset, signedPortionEndOffset),
WireFormat.getDefaultWireFormat())
else:
self._setDefaultWireEncoding(SignedBlob(), None)
[docs] def getName(self):
"""
Get the data packet's name.
:return: The name.
:rtype: Name
"""
return self._name.get()
[docs] def getSignature(self):
"""
Get the data packet's signature object.
:return: The signature object.
:rtype: a subclass of Signature such as Sha256WithRsaSignature
"""
return self._signature.get()
[docs] def getContent(self):
"""
Get the data packet's content.
:return: The content as a Blob, which isNull() if unspecified.
:rtype: Blob
"""
return self._content
[docs] def getDefaultWireEncoding(self):
"""
Return the default wire encoding, which was encoded with
getDefaultWireEncodingFormat().
:return: The default wire encoding, whose isNull() may be true if there
is no default wire encoding.
:rtype: SignedBlob
"""
if self._getDefaultWireEncodingChangeCount != self.getChangeCount():
# The values have changed, so the default wire encoding is
# invalidated.
self._defaultWireEncoding = SignedBlob()
self._defaultWireEncodingFormat = None
self._getDefaultWireEncodingChangeCount = self.getChangeCount()
return self._defaultWireEncoding
[docs] def getIncomingFaceId(self):
"""
Get the incoming face ID according to the incoming packet header.
:return: The incoming face ID. If not specified, return None.
:rtype: int
"""
field = (None if self._lpPacket == None
else IncomingFaceId.getFirstHeader(self._lpPacket))
return None if field == None else field.getFaceId()
[docs] def getCongestionMark(self):
"""
Get the congestion mark according to the incoming packet header.
:return: The congestion mark. If not specified, return 0.
:rtype: int
"""
field = (None if self._lpPacket == None
else CongestionMark.getFirstHeader(self._lpPacket))
return 0 if field == None else field.getCongestionMark()
[docs] def getFullName(self, wireFormat = None):
"""
Get the Data packet's full name, which includes the final
ImplicitSha256Digest component based on the wire encoding for a
particular wire format.
:param wireFormat: (optional) A WireFormat object used to encode
this object. If omitted, use WireFormat.getDefaultWireFormat().
:type wireFormat: A subclass of WireFormat
:return: The full name. You must not change the Name object - if you
need to change it then make a copy.
:rtype: Name
"""
if wireFormat == None:
# Don't use a default argument since getDefaultWireFormat can change.
wireFormat = WireFormat.getDefaultWireFormat()
# The default full name depends on the default wire encoding.
if (not self.getDefaultWireEncoding().isNull() and
self._defaultFullName.size() > 0 and
self.getDefaultWireEncodingFormat() == wireFormat):
# We already have a full name. A non-null default wire encoding
# means that the Data packet fields have not changed.
return self._defaultFullName
fullName = Name(self.getName())
sha256 = hashes.Hash(hashes.SHA256(), backend=default_backend())
sha256.update(self.wireEncode(wireFormat).toBytes())
fullName.appendImplicitSha256Digest(
Blob(bytearray(sha256.finalize()), False))
if wireFormat == WireFormat.getDefaultWireFormat():
# wireEncode has already set defaultWireEncodingFormat_.
self._defaultFullName = fullName
return fullName
[docs] def setName(self, name):
"""
Set name to a copy of the given Name.
:param Name name: The Name which is copied.
:return: This Data so that you can chain calls to update values.
:rtype: Data
"""
self._name.set(name if isinstance(name, Name) else Name(name))
self._changeCount += 1
return self
[docs] def setSignature(self, signature):
"""
Set the signature to a copy of the given signature.
:param signature: The signature object which is cloned.
:type signature: a subclass of Signature such as Sha256WithRsaSignature
:return: This Data so that you can chain calls to update values.
:rtype: Data
"""
self._signature.set(Sha256WithRsaSignature() if signature == None
else signature.clone())
self._changeCount += 1
return self
[docs] def setContent(self, content):
"""
Set the content to the given value.
:param content: The array with the content bytes. If content is not a
Blob, then create a new Blob to copy the bytes (otherwise
take another pointer to the same Blob).
:type content: A Blob or an array type with int elements
"""
self._content = content if isinstance(content, Blob) else Blob(content)
self._changeCount += 1
[docs] def setLpPacket(self, lpPacket):
"""
An internal library method to set the LpPacket for an incoming packet.
The application should not call this.
:param LpPacket lpPacket: The LpPacket. This does not make a copy.
:return: This Data so that you can chain calls to update values.
:rtype: Data
:note: This is an experimental feature. This API may change in the future.
"""
self._lpPacket = lpPacket
# Don't update _changeCount since this doesn't affect the wire encoding.
return self
[docs] def getChangeCount(self):
"""
Get the change count, which is incremented each time this object
(or a child object) is changed.
:return: The change count.
:rtype: int
"""
# Make sure each of the checkChanged is called.
changed = self._name.checkChanged()
changed = self._metaInfo.checkChanged() or changed
changed = self._signature.checkChanged() or changed
if changed:
# A child object has changed, so update the change count.
self._changeCount += 1
return self._changeCount
def _setDefaultWireEncoding(
self, defaultWireEncoding, defaultWireEncodingFormat):
self._defaultWireEncoding = defaultWireEncoding
self._defaultWireEncodingFormat = defaultWireEncodingFormat
# Set _getDefaultWireEncodingChangeCount so that the next call to
# getDefaultWireEncoding() won't clear _defaultWireEncoding.
self._getDefaultWireEncodingChangeCount = self.getChangeCount()
# Create managed properties for read/write properties of the class for more pythonic syntax.
name = property(getName, setName)
metaInfo = property(getMetaInfo, setMetaInfo)
signature = property(getSignature, setSignature)
content = property(getContent, setContent)