You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
|
|
from __future__ import absolute_import
from kafka.vendor import six
from .base import Partitioner
class Murmur2Partitioner(Partitioner): """
Implements a partitioner which selects the target partition based on the hash of the key. Attempts to apply the same hashing function as mainline java client. """
def __call__(self, key, partitions=None, available=None): if available: return self.partition(key, available) return self.partition(key, partitions)
def partition(self, key, partitions=None): if not partitions: partitions = self.partitions
# https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java#L69 idx = (murmur2(key) & 0x7fffffff) % len(partitions)
return partitions[idx]
class LegacyPartitioner(object): """DEPRECATED -- See Issue 374
Implements a partitioner which selects the target partition based on the hash of the key """
def __init__(self, partitions): self.partitions = partitions
def partition(self, key, partitions=None): if not partitions: partitions = self.partitions size = len(partitions) idx = hash(key) % size
return partitions[idx]
# Default will change to Murmur2 in 0.10 release HashedPartitioner = LegacyPartitioner
# https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L244 def murmur2(data): """Pure-python Murmur2 implementation.
Based on java client, see org.apache.kafka.common.utils.Utils.murmur2
Args: data (bytes): opaque bytes
Returns: MurmurHash2 of data """
# Python2 bytes is really a str, causing the bitwise operations below to fail # so convert to bytearray. if six.PY2: data = bytearray(bytes(data))
length = len(data) seed = 0x9747b28c # 'm' and 'r' are mixing constants generated offline. # They're not really 'magic', they just happen to work well. m = 0x5bd1e995 r = 24
# Initialize the hash to a random value h = seed ^ length length4 = length // 4
for i in range(length4): i4 = i * 4 k = ((data[i4 + 0] & 0xff) + ((data[i4 + 1] & 0xff) << 8) + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24)) k &= 0xffffffff k *= m k &= 0xffffffff k ^= (k % 0x100000000) >> r # k ^= k >>> r k &= 0xffffffff k *= m k &= 0xffffffff
h *= m h &= 0xffffffff h ^= k h &= 0xffffffff
# Handle the last few bytes of the input array extra_bytes = length % 4 if extra_bytes >= 3: h ^= (data[(length & ~3) + 2] & 0xff) << 16 h &= 0xffffffff if extra_bytes >= 2: h ^= (data[(length & ~3) + 1] & 0xff) << 8 h &= 0xffffffff if extra_bytes >= 1: h ^= (data[length & ~3] & 0xff) h &= 0xffffffff h *= m h &= 0xffffffff
h ^= (h % 0x100000000) >> 13 # h >>> 13; h &= 0xffffffff h *= m h &= 0xffffffff h ^= (h % 0x100000000) >> 15 # h >>> 15; h &= 0xffffffff
return h
|