图片解析应用
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

134 lines
5.0 KiB

from __future__ import absolute_import
import threading
import time
from kafka.errors import QuotaViolationError
from kafka.metrics import KafkaMetric
class Sensor(object):
"""
A sensor applies a continuous sequence of numerical values
to a set of associated metrics. For example a sensor on
message size would record a sequence of message sizes using
the `record(double)` api and would maintain a set
of metrics about request sizes such as the average or max.
"""
def __init__(self, registry, name, parents, config,
inactive_sensor_expiration_time_seconds):
if not name:
raise ValueError('name must be non-empty')
self._lock = threading.RLock()
self._registry = registry
self._name = name
self._parents = parents or []
self._metrics = []
self._stats = []
self._config = config
self._inactive_sensor_expiration_time_ms = (
inactive_sensor_expiration_time_seconds * 1000)
self._last_record_time = time.time() * 1000
self._check_forest(set())
def _check_forest(self, sensors):
"""Validate that this sensor doesn't end up referencing itself."""
if self in sensors:
raise ValueError('Circular dependency in sensors: %s is its own'
'parent.' % (self.name,))
sensors.add(self)
for parent in self._parents:
parent._check_forest(sensors)
@property
def name(self):
"""
The name this sensor is registered with.
This name will be unique among all registered sensors.
"""
return self._name
@property
def metrics(self):
return tuple(self._metrics)
def record(self, value=1.0, time_ms=None):
"""
Record a value at a known time.
Arguments:
value (double): The value we are recording
time_ms (int): A POSIX timestamp in milliseconds.
Default: The time when record() is evaluated (now)
Raises:
QuotaViolationException: if recording this value moves a
metric beyond its configured maximum or minimum bound
"""
if time_ms is None:
time_ms = time.time() * 1000
self._last_record_time = time_ms
with self._lock: # XXX high volume, might be performance issue
# increment all the stats
for stat in self._stats:
stat.record(self._config, value, time_ms)
self._check_quotas(time_ms)
for parent in self._parents:
parent.record(value, time_ms)
def _check_quotas(self, time_ms):
"""
Check if we have violated our quota for any metric that
has a configured quota
"""
for metric in self._metrics:
if metric.config and metric.config.quota:
value = metric.value(time_ms)
if not metric.config.quota.is_acceptable(value):
raise QuotaViolationError("'%s' violated quota. Actual: "
"%d, Threshold: %d" %
(metric.metric_name,
value,
metric.config.quota.bound))
def add_compound(self, compound_stat, config=None):
"""
Register a compound statistic with this sensor which
yields multiple measurable quantities (like a histogram)
Arguments:
stat (AbstractCompoundStat): The stat to register
config (MetricConfig): The configuration for this stat.
If None then the stat will use the default configuration
for this sensor.
"""
if not compound_stat:
raise ValueError('compound stat must be non-empty')
self._stats.append(compound_stat)
for named_measurable in compound_stat.stats():
metric = KafkaMetric(named_measurable.name, named_measurable.stat,
config or self._config)
self._registry.register_metric(metric)
self._metrics.append(metric)
def add(self, metric_name, stat, config=None):
"""
Register a metric with this sensor
Arguments:
metric_name (MetricName): The name of the metric
stat (AbstractMeasurableStat): The statistic to keep
config (MetricConfig): A special configuration for this metric.
If None use the sensor default configuration.
"""
with self._lock:
metric = KafkaMetric(metric_name, stat, config or self._config)
self._registry.register_metric(metric)
self._metrics.append(metric)
self._stats.append(stat)
def has_expired(self):
"""
Return True if the Sensor is eligible for removal due to inactivity.
"""
return ((time.time() * 1000 - self._last_record_time) >
self._inactive_sensor_expiration_time_ms)