Move get_consumer_offsets out of kafka client
iliakur committed Dec 4, 2024
1 parent bc5ff1e commit c26ad0a
Showing 3 changed files with 82 additions and 87 deletions.
71 changes: 0 additions & 71 deletions kafka_consumer/datadog_checks/kafka_consumer/client.py
@@ -6,16 +6,13 @@
from confluent_kafka import Consumer, ConsumerGroupTopicPartitions, KafkaException, TopicPartition
from confluent_kafka.admin import AdminClient

from datadog_checks.kafka_consumer.constants import OFFSET_INVALID


class KafkaClient:
def __init__(self, config, log) -> None:
self.config = config
self.log = log
self._kafka_client = None
self._consumer = None
self.topic_partition_cache = {}

@property
def kafka_client(self):
@@ -133,46 +130,6 @@ def list_consumer_groups(self):
self.log.error("Failed to collect consumer groups: %s", e)
return groups

def get_consumer_offsets(self):
# {(consumer_group, topic, partition): offset}
self.log.debug('Getting consumer offsets')
consumer_offsets = {}

consumer_groups = self._get_consumer_groups()
self.log.debug('Identified %s consumer groups', len(consumer_groups))

offsets = self._get_offsets_for_groups(consumer_groups)
self.log.debug('%s futures to be waited on', len(offsets))

for consumer_group, topic_partitions in offsets:

self.log.debug('RESULT CONSUMER GROUP: %s', consumer_group)

for topic, partition, offset in topic_partitions:
self.log.debug('RESULTS TOPIC: %s', topic)
self.log.debug('RESULTS PARTITION: %s', partition)
self.log.debug('RESULTS OFFSET: %s', offset)

if offset == OFFSET_INVALID:
continue

if self.config._monitor_unlisted_consumer_groups or not self.config._consumer_groups_compiled_regex:
consumer_offsets[(consumer_group, topic, partition)] = offset
else:
to_match = f"{consumer_group},{topic},{partition}"
if self.config._consumer_groups_compiled_regex.match(to_match):
consumer_offsets[(consumer_group, topic, partition)] = offset

self.log.debug('Got %s consumer offsets', len(consumer_offsets))
return consumer_offsets

def _get_consumer_groups(self):
# Get all consumer groups to monitor
if self.config._monitor_unlisted_consumer_groups or self.config._consumer_groups_compiled_regex:
return [grp for grp in self.list_consumer_groups() if grp]
else:
return self.config._consumer_groups

def list_consumer_group_offsets(self, groups):
"""
For every group and (optionally) its topics and partitions retrieve consumer offsets.
@@ -227,31 +184,3 @@ def describe_consumer_groups(self, consumer_group):

def close_admin_client(self):
self._kafka_client = None

def _get_offsets_for_groups(self, consumer_groups):
groups = []

# If either monitoring all consumer groups or regex, return all consumer group offsets (can filter later)
if self.config._monitor_unlisted_consumer_groups or self.config._consumer_groups_compiled_regex:
for consumer_group in consumer_groups:
groups.append((consumer_group, None))
return self.list_consumer_group_offsets(groups)

for consumer_group in consumer_groups:
# If topics are specified
topics = consumer_groups.get(consumer_group)
if not topics:
groups.append((consumer_group, None))
continue

for topic, partitions in topics.items():
if not partitions:
if topic in self.topic_partition_cache:
partitions = self.topic_partition_cache[topic]
else:
partitions = self.topic_partition_cache[topic] = self.get_partitions_for_topic(topic)
topic_partitions = [(topic, p) for p in partitions]

groups.append((consumer_group, topic_partitions))

return self.list_consumer_group_offsets(groups)
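
The helpers moved out of this file assume a specific shape for what list_consumer_group_offsets accepts and returns. Below is a minimal sketch of those shapes, inferred from the loops in this diff; the group, topic, and offset values are illustrative, not taken from a real cluster:

# Input: one (consumer_group, topic_partitions) pair per group, where
# topic_partitions is None to fetch all offsets for the group, or a list
# of (topic, partition) tuples.
requests = [
    ("consumer_group1", None),
    ("consumer_group2", [("topic1", 0), ("topic1", 1)]),
]

# Output: one (consumer_group, triples) pair per group, where each triple
# is (topic, partition, offset); the caller filters and flattens these.
results = [
    ("consumer_group1", [("topic1", 0, 42)]),
    ("consumer_group2", [("topic1", 0, 42), ("topic1", 1, 7)]),
]

# Flattening into the {(consumer_group, topic, partition): offset} mapping
# that the check reports from.
consumer_offsets = {
    (group, topic, partition): offset
    for group, triples in results
    for topic, partition, offset in triples
}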
75 changes: 72 additions & 3 deletions kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py
@@ -8,7 +8,7 @@
from datadog_checks.base import AgentCheck, is_affirmative
from datadog_checks.kafka_consumer.client import KafkaClient
from datadog_checks.kafka_consumer.config import KafkaConfig
from datadog_checks.kafka_consumer.constants import KAFKA_INTERNAL_TOPICS
from datadog_checks.kafka_consumer.constants import KAFKA_INTERNAL_TOPICS, OFFSET_INVALID

MAX_TIMESTAMPS = 1000

@@ -23,6 +23,7 @@ def __init__(self, name, init_config, instances):
self._data_streams_enabled = is_affirmative(self.instance.get('data_streams_enabled', False))
self._max_timestamps = int(self.instance.get('timestamp_history_size', MAX_TIMESTAMPS))
self.client = KafkaClient(self.config, self.log)
self.topic_partition_cache = {}
self.check_initializations.insert(0, self.config.validate_config)

def check(self, _):
@@ -41,7 +42,7 @@ def check(self, _):
try:
# Fetch consumer offsets
# Expected format: {(consumer_group, topic, partition): offset}
consumer_offsets = self.client.get_consumer_offsets()
consumer_offsets = self.get_consumer_offsets()
except Exception:
self.log.exception("There was a problem collecting consumer offsets from Kafka.")
# don't raise because we might get valid broker offsets
@@ -96,6 +97,74 @@ def check(self, _):
if self.config._close_admin_client:
self.client.close_admin_client()

def get_consumer_offsets(self):
# {(consumer_group, topic, partition): offset}
self.log.debug('Getting consumer offsets')
consumer_offsets = {}

consumer_groups = self._get_consumer_groups()
self.log.debug('Identified %s consumer groups', len(consumer_groups))

offsets = self._get_offsets_for_groups(consumer_groups)
self.log.debug('%s futures to be waited on', len(offsets))

for consumer_group, topic_partitions in offsets:

self.log.debug('RESULT CONSUMER GROUP: %s', consumer_group)

for topic, partition, offset in topic_partitions:
self.log.debug('RESULTS TOPIC: %s', topic)
self.log.debug('RESULTS PARTITION: %s', partition)
self.log.debug('RESULTS OFFSET: %s', offset)

if offset == OFFSET_INVALID:
continue

if self.config._monitor_unlisted_consumer_groups or not self.config._consumer_groups_compiled_regex:
consumer_offsets[(consumer_group, topic, partition)] = offset
else:
to_match = f"{consumer_group},{topic},{partition}"
if self.config._consumer_groups_compiled_regex.match(to_match):
consumer_offsets[(consumer_group, topic, partition)] = offset

self.log.debug('Got %s consumer offsets', len(consumer_offsets))
return consumer_offsets

def _get_consumer_groups(self):
# Get all consumer groups to monitor
if self.config._monitor_unlisted_consumer_groups or self.config._consumer_groups_compiled_regex:
return [grp for grp in self.client.list_consumer_groups() if grp]
else:
return self.config._consumer_groups

def _get_offsets_for_groups(self, consumer_groups):
groups = []

# If either monitoring all consumer groups or regex, return all consumer group offsets (can filter later)
if self.config._monitor_unlisted_consumer_groups or self.config._consumer_groups_compiled_regex:
for consumer_group in consumer_groups:
groups.append((consumer_group, None))
return self.client.list_consumer_group_offsets(groups)

for consumer_group in consumer_groups:
# If topics are specified
topics = consumer_groups.get(consumer_group)
if not topics:
groups.append((consumer_group, None))
continue

for topic, partitions in topics.items():
if not partitions:
if topic in self.topic_partition_cache:
partitions = self.topic_partition_cache[topic]
else:
partitions = self.topic_partition_cache[topic] = self.client.get_partitions_for_topic(topic)
topic_partitions = [(topic, p) for p in partitions]

groups.append((consumer_group, topic_partitions))

return self.client.list_consumer_group_offsets(groups)

def _load_broker_timestamps(self, persistent_cache_key):
"""Loads broker timestamps from persistent cache."""
broker_timestamps = defaultdict(dict)
@@ -208,7 +277,7 @@ def report_consumer_offsets_and_lag(
self.gauge('estimated_consumer_lag', lag, tags=consumer_group_tags)
reported_contexts += 1
else:
if partitions is None:
if not partitions:
msg = (
"Consumer group: %s has offsets for topic: %s, partition: %s, but that topic has no partitions "
"in the cluster, so skipping reporting these offsets."
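
The filtering step in the relocated get_consumer_offsets keeps an offset when monitoring all groups, when no group regex is configured, or when the string "group,topic,partition" matches the compiled pattern. A minimal, self-contained sketch of that match, assuming an illustrative pattern (the real one is compiled by KafkaConfig from the instance configuration):

import re

# Illustrative stand-in for config._consumer_groups_compiled_regex; the
# actual pattern comes from the check's configuration.
pattern = re.compile(r"my_group,topic1,\d+")

offsets = {
    ("my_group", "topic1", 0): 42,
    ("other_group", "topic1", 0): 7,
}

# Keep only the offsets whose "group,topic,partition" string matches,
# mirroring the branch in get_consumer_offsets above.
filtered = {
    (group, topic, partition): offset
    for (group, topic, partition), offset in offsets.items()
    if pattern.match(f"{group},{topic},{partition}")
}
assert filtered == {("my_group", "topic1", 0): 42}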
23 changes: 10 additions & 13 deletions kafka_consumer/tests/test_unit.py
@@ -23,10 +23,9 @@ def fake_consumer_offsets_for_times(partitions):
def seed_mock_client():
"""Set some common defaults for the mock client to kafka."""
client = mock.create_autospec(KafkaClient)
# consumer_offset = {(consumer_group, topic, partition): offset}
consumer_offset = {("consumer_group1", "topic1", "partition1"): 2}
client.get_consumer_offsets.return_value = consumer_offset
client.list_consumer_groups.return_value = ["consumer_group1"]
client.get_partitions_for_topic.return_value = ['partition1']
client.list_consumer_group_offsets.return_value = [("consumer_group1", [("topic1", "partition1", 2)])]
client.describe_consumer_groups.return_value = ('consumer_group', 'STABLE')
client.consumer_get_cluster_id_and_list_topics.return_value = (
"cluster_id",
@@ -184,7 +183,7 @@ def test_when_consumer_lag_less_than_zero_then_emit_event(check, kafka_instance,
# Given
mock_client = seed_mock_client()
# We need the consumer offset to be higher than the highwater offset.
mock_client.get_consumer_offsets.return_value = {("consumer_group1", "topic1", "partition1"): 81}
mock_client.list_consumer_group_offsets.return_value = [("consumer_group1", [("topic1", "partition1", 81)])]
kafka_consumer_check = check(kafka_instance)
kafka_consumer_check.client = mock_client

@@ -238,12 +237,12 @@ def test_when_consumer_lag_less_than_zero_then_emit_event(check, kafka_instance,
)


def test_when_partition_is_none_then_emit_warning_log(check, kafka_instance, dd_run_check, aggregator, caplog):
def test_when_no_partitions_then_emit_warning_log(check, kafka_instance, dd_run_check, aggregator, caplog):
# Given
caplog.set_level(logging.WARNING)

mock_client = seed_mock_client()
mock_client.get_partitions_for_topic.return_value = None
mock_client.get_partitions_for_topic.return_value = []
kafka_consumer_check = check(kafka_instance)
kafka_consumer_check.client = mock_client

@@ -321,9 +320,6 @@ def test_when_highwater_metric_count_hit_context_limit_then_no_more_highwater_me
caplog.set_level(logging.WARNING)

mock_client = seed_mock_client()
# consumer_offset = {(consumer_group, topic, partition): offset}
consumer_offset = {("consumer_group1", "topic1", "partition1"): 2}
mock_client.get_consumer_offsets.return_value = consumer_offset
kafka_consumer_check = check(kafka_instance, init_config={'max_partition_contexts': 2})
kafka_consumer_check.client = mock_client

@@ -347,9 +343,10 @@ def test_when_consumer_metric_count_hit_context_limit_then_no_more_consumer_metr
caplog.set_level(logging.DEBUG)

mock_client = seed_mock_client()
# consumer_offset = {(consumer_group, topic, partition): offset}
consumer_offset = {("consumer_group1", "topic1", "partition1"): 2, ("consumer_group1", "topic2", "partition2"): 2}
mock_client.get_consumer_offsets.return_value = consumer_offset
mock_client.list_consumer_group_offsets.return_value = [
("consumer_group1", [("topic1", "partition1", 2)]),
("consumer_group1", [("topic2", "partition2", 2)]),
]
kafka_consumer_check = check(kafka_instance, init_config={'max_partition_contexts': 3})
kafka_consumer_check.client = mock_client

@@ -375,7 +372,7 @@ def test_when_empty_string_consumer_group_then_skip(kafka_instance):
return_value=["", "my_consumer"],
):
kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [kafka_instance])
assert kafka_consumer_check.client._get_consumer_groups() == ["my_consumer"]
assert kafka_consumer_check._get_consumer_groups() == ["my_consumer"]


def test_get_interpolated_timestamp():
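
With the offset logic now living on the check, these tests stub the client's lower-level methods instead of get_consumer_offsets. A sketch of the seeding pattern used above, following seed_mock_client:

from unittest import mock

from datadog_checks.kafka_consumer.client import KafkaClient

# Stub the client's lower-level calls and let the check's own
# get_consumer_offsets run against them.
client = mock.create_autospec(KafkaClient)
client.list_consumer_groups.return_value = ["consumer_group1"]
client.get_partitions_for_topic.return_value = ["partition1"]
client.list_consumer_group_offsets.return_value = [
    ("consumer_group1", [("topic1", "partition1", 2)]),
]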
