# Source code for pyndn.data

# -*- Mode:python; c-file-style:"gnu"; indent-tabs-mode:nil -*-
#
# Copyright (C) 2014-2019 Regents of the University of California.
# Author: Jeff Thompson <[email protected]>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
# A copy of the GNU Lesser General Public License is in the file COPYING.

"""
This module defines the NDN Data class.
"""

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from pyndn.encoding.wire_format import WireFormat
from pyndn.util.blob import Blob
from pyndn.util.signed_blob import SignedBlob
from pyndn.util.change_counter import ChangeCounter
from pyndn.name import Name
from pyndn.meta_info import MetaInfo
from pyndn.sha256_with_rsa_signature import Sha256WithRsaSignature
from pyndn.lp.incoming_face_id import IncomingFaceId
from pyndn.lp.congestion_mark import CongestionMark

class Data(object):
    """
    An NDN Data packet: a name, meta info, a signature and content, plus a
    cached default wire encoding which is invalidated whenever one of those
    fields changes.

    :param value: (optional) If value is a Data, copy its name, meta info,
      signature, content and cached default wire encoding. If value is a
      Name, use a copy of it as the name and leave the other fields at their
      defaults. Otherwise create a Data with an empty name.
    :type value: Data or Name
    """
    def __init__(self, value = None):
        if isinstance(value, Data):
            # Copy the values.
            self._name = ChangeCounter(Name(value.getName()))
            self._metaInfo = ChangeCounter(MetaInfo(value.getMetaInfo()))
            self._signature = ChangeCounter(value.getSignature().clone())
            self._content = value._content
            self._defaultWireEncoding = value.getDefaultWireEncoding()
            self._defaultFullName = Name(value._defaultFullName)
            self._defaultWireEncodingFormat = value._defaultWireEncodingFormat
        else:
            self._name = ChangeCounter(
              Name(value) if isinstance(value, Name) else Name())
            self._metaInfo = ChangeCounter(MetaInfo())
            self._signature = ChangeCounter(Sha256WithRsaSignature())
            self._content = Blob()
            self._defaultWireEncoding = SignedBlob()
            self._defaultFullName = Name()
            self._defaultWireEncodingFormat = None

        # These are reset in both cases; the LpPacket is not copied.
        self._getDefaultWireEncodingChangeCount = 0
        self._changeCount = 0
        self._lpPacket = None

    def wireEncode(self, wireFormat = None):
        """
        Encode this Data for a particular wire format. If wireFormat is the
        default wire format, also set the defaultWireEncoding field to the
        encoded result.

        :param wireFormat: (optional) A WireFormat object used to encode this
           Data object. If omitted, use WireFormat.getDefaultWireFormat().
        :type wireFormat: A subclass of WireFormat
        :return: The encoded buffer in a SignedBlob object.
        :rtype: SignedBlob
        """
        if wireFormat is None:
            # Don't use a default argument since getDefaultWireFormat can change.
            wireFormat = WireFormat.getDefaultWireFormat()

        if (not self.getDefaultWireEncoding().isNull() and
            self.getDefaultWireEncodingFormat() == wireFormat):
            # We already have an encoding in the desired format.
            return self.getDefaultWireEncoding()

        (encoding, signedPortionBeginOffset, signedPortionEndOffset) = \
          wireFormat.encodeData(self)
        wireEncoding = SignedBlob(
          encoding, signedPortionBeginOffset, signedPortionEndOffset)

        if wireFormat == WireFormat.getDefaultWireFormat():
            # This is the default wire encoding.
            self._setDefaultWireEncoding(
              wireEncoding, WireFormat.getDefaultWireFormat())
        return wireEncoding

    def wireDecode(self, input, wireFormat = None):
        """
        Decode the input using a particular wire format and update this Data.
        If wireFormat is the default wire format, also set the
        defaultWireEncoding to another pointer to the input.

        :param input: The array with the bytes to decode. If input is not a
          Blob, then copy the bytes to save the defaultWireEncoding (otherwise
          take another pointer to the same Blob).
        :type input: A Blob or an array type with int elements
        :param wireFormat: (optional) A WireFormat object used to decode this
           Data object. If omitted, use WireFormat.getDefaultWireFormat().
        :type wireFormat: A subclass of WireFormat
        """
        if wireFormat is None:
            # Don't use a default argument since getDefaultWireFormat can change.
            wireFormat = WireFormat.getDefaultWireFormat()

        if isinstance(input, Blob):
            # Input is a blob, so get its buf() and set copy False.
            result = wireFormat.decodeData(self, input.buf(), False)
        else:
            result = wireFormat.decodeData(self, input, True)
        (signedPortionBeginOffset, signedPortionEndOffset) = result

        if wireFormat == WireFormat.getDefaultWireFormat():
            # This is the default wire encoding.  In the Blob constructor, set
            #   copy true, but if input is already a Blob, it won't copy.
            self._setDefaultWireEncoding(SignedBlob(
                Blob(input, True),
                signedPortionBeginOffset, signedPortionEndOffset),
              WireFormat.getDefaultWireFormat())
        else:
            # The decoded fields no longer match any cached default encoding.
            self._setDefaultWireEncoding(SignedBlob(), None)

    def getName(self):
        """
        Get the data packet's name.

        :return: The name.
        :rtype: Name
        """
        return self._name.get()

    def getMetaInfo(self):
        """
        Get the data packet's meta info.

        :return: The meta info.
        :rtype: MetaInfo
        """
        return self._metaInfo.get()

    def getSignature(self):
        """
        Get the data packet's signature object.

        :return: The signature object.
        :rtype: a subclass of Signature such as Sha256WithRsaSignature
        """
        return self._signature.get()

    def getContent(self):
        """
        Get the data packet's content.

        :return: The content as a Blob, which isNull() if unspecified.
        :rtype: Blob
        """
        return self._content

    def getDefaultWireEncoding(self):
        """
        Return the default wire encoding, which was encoded with
        getDefaultWireEncodingFormat().

        :return: The default wire encoding, whose isNull() may be true if there
          is no default wire encoding.
        :rtype: SignedBlob
        """
        if self._getDefaultWireEncodingChangeCount != self.getChangeCount():
            # The values have changed, so the default wire encoding is
            # invalidated.
            self._defaultWireEncoding = SignedBlob()
            self._defaultWireEncodingFormat = None
            self._getDefaultWireEncodingChangeCount = self.getChangeCount()

        return self._defaultWireEncoding

    def getDefaultWireEncodingFormat(self):
        """
        Get the WireFormat which is used by getDefaultWireEncoding().

        :return: The WireFormat, which is only meaningful if the
          getDefaultWireEncoding() is not isNull().
        :rtype: WireFormat
        """
        return self._defaultWireEncodingFormat

    def getIncomingFaceId(self):
        """
        Get the incoming face ID according to the incoming packet header.

        :return: The incoming face ID. If not specified, return None.
        :rtype: int
        """
        field = (None if self._lpPacket is None
                 else IncomingFaceId.getFirstHeader(self._lpPacket))
        return None if field is None else field.getFaceId()

    def getCongestionMark(self):
        """
        Get the congestion mark according to the incoming packet header.

        :return: The congestion mark. If not specified, return 0.
        :rtype: int
        """
        field = (None if self._lpPacket is None
                 else CongestionMark.getFirstHeader(self._lpPacket))
        return 0 if field is None else field.getCongestionMark()

    def getFullName(self, wireFormat = None):
        """
        Get the Data packet's full name, which includes the final
        ImplicitSha256Digest component based on the wire encoding for a
        particular wire format.

        :param wireFormat: (optional) A WireFormat object used to encode
           this object. If omitted, use WireFormat.getDefaultWireFormat().
        :type wireFormat: A subclass of WireFormat
        :return: The full name. You must not change the Name object - if you
          need to change it then make a copy.
        :rtype: Name
        """
        if wireFormat is None:
            # Don't use a default argument since getDefaultWireFormat can change.
            wireFormat = WireFormat.getDefaultWireFormat()

        # The default full name depends on the default wire encoding.
        if (not self.getDefaultWireEncoding().isNull() and
            self._defaultFullName.size() > 0 and
            self.getDefaultWireEncodingFormat() == wireFormat):
            # We already have a full name. A non-null default wire encoding
            # means that the Data packet fields have not changed.
            return self._defaultFullName

        fullName = Name(self.getName())
        sha256 = hashes.Hash(hashes.SHA256(), backend=default_backend())
        sha256.update(self.wireEncode(wireFormat).toBytes())
        fullName.appendImplicitSha256Digest(
          Blob(bytearray(sha256.finalize()), False))

        if wireFormat == WireFormat.getDefaultWireFormat():
            # wireEncode has already set defaultWireEncodingFormat_.
            self._defaultFullName = fullName

        return fullName

    def setName(self, name):
        """
        Set name to a copy of the given Name.

        :param Name name: The Name which is copied.
        :return: This Data so that you can chain calls to update values.
        :rtype: Data
        """
        # Always construct a new Name so that later changes to the caller's
        # object do not silently alter this packet, as the contract promises.
        self._name.set(Name(name))
        self._changeCount += 1
        return self

    def setMetaInfo(self, metaInfo):
        """
        Set metaInfo to a copy of the given MetaInfo.

        :param MetaInfo metaInfo: The MetaInfo which is copied. If None, use
          a default MetaInfo.
        :return: This Data so that you can chain calls to update values.
        :rtype: Data
        """
        self._metaInfo.set(MetaInfo() if metaInfo is None
                           else MetaInfo(metaInfo))
        self._changeCount += 1
        return self

    def setSignature(self, signature):
        """
        Set the signature to a copy of the given signature.

        :param signature: The signature object which is cloned. If None, use
          a default Sha256WithRsaSignature.
        :type signature: a subclass of Signature such as
          Sha256WithRsaSignature
        :return: This Data so that you can chain calls to update values.
        :rtype: Data
        """
        self._signature.set(Sha256WithRsaSignature() if signature is None
                            else signature.clone())
        self._changeCount += 1
        return self

    def setContent(self, content):
        """
        Set the content to the given value.

        :param content: The array with the content bytes. If content is not a
          Blob, then create a new Blob to copy the bytes (otherwise
          take another pointer to the same Blob).
        :type content: A Blob or an array type with int elements
        :return: This Data so that you can chain calls to update values.
        :rtype: Data
        """
        self._content = content if isinstance(content, Blob) else Blob(content)
        self._changeCount += 1
        # Return self like the other setters so that calls can be chained.
        return self

    def setLpPacket(self, lpPacket):
        """
        An internal library method to set the LpPacket for an incoming packet.
        The application should not call this.

        :param LpPacket lpPacket: The LpPacket. This does not make a copy.
        :return: This Data so that you can chain calls to update values.
        :rtype: Data
        :note: This is an experimental feature. This API may change in the
          future.
        """
        self._lpPacket = lpPacket
        # Don't update _changeCount since this doesn't affect the wire encoding.
        return self

    def getChangeCount(self):
        """
        Get the change count, which is incremented each time this object
        (or a child object) is changed.

        :return: The change count.
        :rtype: int
        """
        # Make sure each of the checkChanged is called.
        changed = self._name.checkChanged()
        changed = self._metaInfo.checkChanged() or changed
        changed = self._signature.checkChanged() or changed
        if changed:
            # A child object has changed, so update the change count.
            self._changeCount += 1

        return self._changeCount

    def _setDefaultWireEncoding(
          self, defaultWireEncoding, defaultWireEncodingFormat):
        """
        Store the cached default wire encoding together with the WireFormat
        which produced it.

        :param SignedBlob defaultWireEncoding: The encoding to cache.
        :param defaultWireEncodingFormat: The WireFormat of the encoding, or
          None if there is no default encoding.
        :type defaultWireEncodingFormat: A subclass of WireFormat
        """
        self._defaultWireEncoding = defaultWireEncoding
        self._defaultWireEncodingFormat = defaultWireEncodingFormat
        # Set _getDefaultWireEncodingChangeCount so that the next call to
        # getDefaultWireEncoding() won't clear _defaultWireEncoding.
        self._getDefaultWireEncodingChangeCount = self.getChangeCount()

    # Create managed properties for read/write properties of the class for more pythonic syntax.
    name = property(getName, setName)
    metaInfo = property(getMetaInfo, setMetaInfo)
    signature = property(getSignature, setSignature)
    content = property(getContent, setContent)