# Source code for pyndn.util.common

# -*- Mode:python; c-file-style:"gnu"; indent-tabs-mode:nil -*- */
#
# Copyright (C) 2014-2019 Regents of the University of California.
# Author: Jeff Thompson <[email protected]>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
# A copy of the GNU Lesser General Public License is in the file COPYING.

"""
This module defines the Common class which has static utility functions.
"""

import sys
import datetime
import base64
from io import BytesIO
# mmh3 is an optional dependency: record whether it could be imported so that
# the MurmurHash3 helpers can report a clear error when it is missing.
try:
    import mmh3
except ImportError:
    have_mmh3 = False
else:
    have_mmh3 = True

# _BytesIOValueIsStr is True if BytesIO.getvalue would return a str.
_BytesIOValueIsStr = isinstance(BytesIO().getvalue(), str)

# _bytesElementIsInt is True if an element of a bytes object is an int.
_bytesElementIsInt = isinstance(bytes([0])[0], int)

def _chr_ord(x):
    """
    Private helper for getBytesIOString: take the ordinal of x and convert it
    back to a one-character string, i.e. return chr(ord(x)).
    """
    ordinal = ord(x)
    return chr(ordinal)

# Choose the converter that getBytesIOString applies to each bytes element.
# If an element of a bytes is an int, chr alone is enough. Otherwise fall back
# to chr(ord(x)), which seems to work on other versions of Python 3.
if _bytesElementIsInt:
    _bytesElementToChr = chr
else:
    _bytesElementToChr = _chr_ord

# _haveTypeUnicode is True if the built-in name "unicode" exists. This should
# only be true in Python 2.
try:
    # Only probe for the name; do not bind it to a throwaway module global.
    unicode
except NameError:
    # Catch just the "name is not defined" case instead of every exception.
    _haveTypeUnicode = False
else:
    _haveTypeUnicode = True

class Common(object):
    """
    A holder for static utility functions and constants which are used
    throughout the library. All methods are static.
    """

    @staticmethod
    def getNowMilliseconds():
        """
        Get the current time in milliseconds.

        :return: The current time in milliseconds since 1/1/1970 UTC,
          including fractions of a millisecond.
        :rtype: float
        """
        # NOTE: datetime.utcnow() is deprecated since Python 3.12. It is kept
        # here because this module also has code paths for Python 2, where
        # timezone-aware alternatives are not available.
        return (datetime.datetime.utcnow() -
                Common.epoch_).total_seconds() * 1000.0

    @staticmethod
    def datetimeFromTimestamp(msSince1970):
        """
        Get the Python datetime.datetime object for the UTC timestamp. We use
        this common implementation for consistency throughout the library
        which uses milliseconds.

        :param msSince1970: time in milliseconds since 1/1/1970 UTC, including
          fractions of a millisecond.
        :type msSince1970: float or int
        :return: The (naive) Python datetime.datetime object.
        :rtype: datetime.datetime
        """
        # Use timedelta because it works on platforms where time_t is 32 bits.
        return Common.epoch_ + datetime.timedelta(
          seconds = msSince1970 / 1000.0)

    @staticmethod
    def getBytesIOString(bytesIO):
        """
        Return bytesIO.getvalue(), making sure the result is a str. This is
        necessary because getvalue() returns a bytes object in Python 3.

        :param bytesIO: The object whose getvalue() is converted.
        :return: The value as a str.
        :rtype: str
        """
        value = bytesIO.getvalue()
        if _BytesIOValueIsStr:
            # We don't need to convert.
            return value

        # Assume value is a Python 3 bytes object. Convert to str.
        return "".join(map(_bytesElementToChr, value))

    @staticmethod
    def typeIsString(obj):
        """
        Check if obj has type str or (in Python 2) unicode. This is necessary
        because Python 2 has two string types, str and unicode, but Python 3
        doesn't have type unicode so we have to be careful to check for
        type(obj) is unicode.

        :param any obj: The object to check if it is str or unicode.
        :return: True if obj is str or unicode, otherwise False.
        :rtype: bool
        """
        # The _haveTypeUnicode guard keeps the name "unicode" from being
        # evaluated where it does not exist.
        return type(obj) is str or (
          _haveTypeUnicode and type(obj) is unicode)

    @staticmethod
    def stringToUtf8Array(input):
        """
        If the input has type str (in Python 3) or unicode (in Python 2), then
        encode it as UTF8 and return an array of integers. If the input is str
        (in Python 2) then treat it as a "raw" string and just convert each
        element to int. Otherwise, if the input is not str or unicode, just
        return the input. This is necessary because Python 3 doesn't have the
        unicode type and the elements in a string are Unicode characters. But
        in Python 2 only the unicode type has Unicode characters, and str
        elements are bytes with value 0 to 255 (and often used to carry binary
        data).

        :param input: The value to convert.
        :return: The UTF8 encoding, or the unchanged input if it is not a
          string type.
        """
        if _haveTypeUnicode:
            # Assume this is Python 2.
            if type(input) is str:
                # Convert the raw string to an int array.
                return map(ord, input)
            if type(input) is unicode:
                # In Python 2, the result of encode is a str, so convert to
                # an int array.
                return map(ord, input.encode('utf-8'))
            return input

        if type(input) is str:
            return input.encode('utf-8')
        return input

    @staticmethod
    def nonNegativeFloatOrNone(value):
        """
        If value is None or negative, return None, otherwise return
        float(value). This is used in "setter" methods to ensure the correct
        type.

        :param float value: The value to check.
        :return: value or None.
        :rtype: float
        """
        # Use an identity test so that a custom __eq__ cannot affect the
        # None check.
        if value is None or value < 0:
            return None
        return float(value)

    @staticmethod
    def nonNegativeIntOrNone(value):
        """
        If value is None or negative, return None, otherwise return
        int(value). This is used in "setter" methods to ensure the correct
        type.

        :param int value: The value to check.
        :return: value or None.
        :rtype: int
        """
        # Use an identity test so that a custom __eq__ cannot affect the
        # None check.
        if value is None or value < 0:
            return None
        return int(value)

    @staticmethod
    def base64Encode(input, addNewlines = False):
        """
        Encode the input as base64.

        :param input: The bytes to encode.
        :type input: A byte array suitable for base64.b64encode (str in
          Python 2, bytes in Python 3).
        :param bool addNewlines: (optional) If True, add a newline after every
          64 characters of the encoding, including after the final (possibly
          shorter) line (good for writing to a file). If omitted or False, do
          not add newlines.
        :return: The encoding.
        :rtype: str
        """
        base64Str = base64.b64encode(input)
        if type(base64Str) is not str:
            # b64encode returned bytes. The base64 alphabet is plain ASCII.
            base64Str = base64Str.decode("ascii")

        if not addNewlines:
            return base64Str

        # Build the result with one join instead of repeated string
        # concatenation in a loop.
        return "".join(
          base64Str[i:i + 64] + "\n" for i in range(0, len(base64Str), 64))

    @staticmethod
    def murmurHash3Uint32(nHashSeed, value):
        """
        Interpret the unsigned integer value as a 4-byte little endian array
        for MurmurHash3.

        :param int nHashSeed: The hash seed.
        :param int value: The integer value, interpreted as a 4-byte array.
        :return: The hash value.
        :rtype: int
        :raises RuntimeError: If the mmh3 module could not be imported.
        """
        if not have_mmh3:
            raise RuntimeError(
              "murmurHash3Uint32: Need to 'sudo python -m pip install mmh3'")

        # The four bytes of value, least significant byte first.
        buffer = [value & 0xff,
                  (value >> 8) & 0xff,
                  (value >> 16) & 0xff,
                  (value >> 24) & 0xff]
        # Imitate Blob.toBytes.
        if sys.version_info[0] > 2:
            valueBytes = bytes(buffer)
        else:
            valueBytes = "".join(map(chr, buffer))

        return mmh3.hash(valueBytes, nHashSeed, signed = False)

    @staticmethod
    def murmurHash3Blob(nHashSeed, value):
        """
        Interpret the value as a Blob and use it to compute the MurmurHash3.

        :param int nHashSeed: The hash seed.
        :param Blob value: If value is not already a Blob, it is wrapped with
          Blob(value, False).
        :return: The hash value.
        :rtype: int
        :raises RuntimeError: If the mmh3 module could not be imported.
        """
        if not have_mmh3:
            raise RuntimeError(
              "murmurHash3Blob: Need to 'sudo python -m pip install mmh3'")

        if not isinstance(value, Blob):
            value = Blob(value, False)

        return mmh3.hash(value.toBytes(), nHashSeed, signed = False)

    # The practical limit of the size of a network-layer packet. If a packet
    # is larger than this, the library or application MAY drop it. This
    # constant is defined in this low-level file so that internal code can use
    # it, but applications should use the static API method
    # Face.getMaxNdnPacketSize() which is equivalent.
    MAX_NDN_PACKET_SIZE = 8800

    # The naive datetime for 1/1/1970 00:00 UTC, used as the origin for the
    # millisecond timestamps above. Written as a literal date because
    # datetime.utcfromtimestamp is deprecated since Python 3.12.
    epoch_ = datetime.datetime(1970, 1, 1)
# Import this at the end of the file to avoid circular references.
from pyndn.util.blob import Blob