Skip to content

Commit

Permalink
KAFKA-17917: Convert Kafka core system tests to use KRaft (apache#17847)
Browse files Browse the repository at this point in the history
- Remove some unused Zookeeper code

- Migrate group mode transactions, security rolling upgrade, and throttling tests to using KRaft

- Add KRaft downgrade tests to kraft_upgrade_test.py

Reviewers: Colin P. McCabe <[email protected]>
  • Loading branch information
kevin-wu24 authored Nov 21, 2024
1 parent 5fba067 commit 38aca3a
Show file tree
Hide file tree
Showing 22 changed files with 158 additions and 241 deletions.
12 changes: 2 additions & 10 deletions tests/kafkatest/tests/core/authorizer_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
from ducktape.tests.test import Test

from kafkatest.services.kafka import KafkaService, quorum
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.security.kafka_acls import ACLs

class AuthorizerTest(Test):
Expand Down Expand Up @@ -47,15 +46,8 @@ def setUp(self):
def test_authorizer(self, metadata_quorum, authorizer_class):
topics = {"test_topic": {"partitions": 1, "replication-factor": 1}}

if (authorizer_class == KafkaService.KRAFT_ACL_AUTHORIZER):
self.zk = None
else:
self.zk = ZookeeperService(self.test_context, num_nodes=1)
self.zk.start()

self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk,
topics=topics, controller_num_nodes_override=1,
allow_zk_with_kraft=True)
self.kafka = KafkaService(self.test_context, num_nodes=1, zk=None,
topics=topics, controller_num_nodes_override=1)

broker_security_protocol = "SSL"
broker_principal = "User:CN=systemtest"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
from kafkatest.services.kafka import KafkaService, quorum
from kafkatest.services.kafka import config_property
from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
from kafkatest.utils import is_int
from kafkatest.version import LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, \
Expand All @@ -33,10 +32,6 @@ def __init__(self, test_context):

def setUp(self):
self.topic = "test_topic"
self.zk = ZookeeperService(self.test_context, num_nodes=1) if quorum.for_test(self.test_context) == quorum.zk else None

if self.zk:
self.zk.start()

# Producer and consumer
self.producer_throughput = 10000
Expand Down Expand Up @@ -67,7 +62,7 @@ def setUp(self):
@matrix(producer_version=[str(LATEST_3_9)], consumer_version=[str(LATEST_3_9)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
@matrix(producer_version=[str(LATEST_2_1)], consumer_version=[str(LATEST_2_1)], compression_types=[["zstd"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
def test_compatibility(self, producer_version, consumer_version, compression_types, timestamp_type=None, metadata_quorum=quorum.zk):
self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=DEV_BRANCH, topics={self.topic: {
self.kafka = KafkaService(self.test_context, num_nodes=3, zk=None, version=DEV_BRANCH, topics={self.topic: {
"partitions": 3,
"replication-factor": 3,
'configs': {"min.insync.replicas": 2}}},
Expand Down
20 changes: 7 additions & 13 deletions tests/kafkatest/tests/core/consume_bench_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,13 @@
from kafkatest.services.trogdor.consume_bench_workload import ConsumeBenchWorkloadService, ConsumeBenchWorkloadSpec
from kafkatest.services.trogdor.task_spec import TaskSpec
from kafkatest.services.trogdor.trogdor import TrogdorService
from kafkatest.services.zookeeper import ZookeeperService


class ConsumeBenchTest(Test):
def __init__(self, test_context):
""":type test_context: ducktape.tests.test.TestContext"""
super(ConsumeBenchTest, self).__init__(test_context)
self.zk = ZookeeperService(test_context, num_nodes=3) if quorum.for_test(test_context) == quorum.zk else None
self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk)
self.kafka = KafkaService(test_context, num_nodes=3, zk=None)
self.producer_workload_service = ProduceBenchWorkloadService(test_context, self.kafka)
self.consumer_workload_service = ConsumeBenchWorkloadService(test_context, self.kafka)
self.consumer_workload_service_2 = ConsumeBenchWorkloadService(test_context, self.kafka)
Expand All @@ -42,15 +40,11 @@ def __init__(self, test_context):

def setUp(self):
self.trogdor.start()
if self.zk:
self.zk.start()
self.kafka.start()

def teardown(self):
self.trogdor.stop()
self.kafka.stop()
if self.zk:
self.zk.stop()

def produce_messages(self, topics, max_messages=10000):
produce_spec = ProduceBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS,
Expand Down Expand Up @@ -85,7 +79,7 @@ def produce_messages(self, topics, max_messages=10000):
use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
def test_consume_bench(self, topics, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
def test_consume_bench(self, topics, metadata_quorum, use_new_coordinator=False, group_protocol=None):
"""
Runs a ConsumeBench workload to consume messages
"""
Expand Down Expand Up @@ -115,7 +109,7 @@ def test_consume_bench(self, topics, metadata_quorum=quorum.zk, use_new_coordina
use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
def test_single_partition(self, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
def test_single_partition(self, metadata_quorum, use_new_coordinator=False, group_protocol=None):
"""
Run a ConsumeBench against a single partition
"""
Expand Down Expand Up @@ -146,7 +140,7 @@ def test_single_partition(self, metadata_quorum=quorum.zk, use_new_coordinator=F
use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
def test_multiple_consumers_random_group_topics(self, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
def test_multiple_consumers_random_group_topics(self, metadata_quorum, use_new_coordinator=False, group_protocol=None):
"""
Runs multiple consumers group to read messages from topics.
Since a consumerGroup isn't specified, each consumer should read from all topics independently
Expand Down Expand Up @@ -178,7 +172,7 @@ def test_multiple_consumers_random_group_topics(self, metadata_quorum=quorum.zk,
use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
def test_two_consumers_specified_group_topics(self, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
def test_two_consumers_specified_group_topics(self, metadata_quorum, use_new_coordinator=False, group_protocol=None):
"""
Runs two consumers in the same consumer group to read messages from topics.
        Since a consumerGroup is specified, each consumer should dynamically get assigned a partition from the group
Expand Down Expand Up @@ -211,7 +205,7 @@ def test_two_consumers_specified_group_topics(self, metadata_quorum=quorum.zk, u
use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
def test_multiple_consumers_random_group_partitions(self, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
def test_multiple_consumers_random_group_partitions(self, metadata_quorum, use_new_coordinator=False, group_protocol=None):
"""
        Runs multiple consumers to read messages from specific partitions.
Since a consumerGroup isn't specified, each consumer will get assigned a random group
Expand Down Expand Up @@ -244,7 +238,7 @@ def test_multiple_consumers_random_group_partitions(self, metadata_quorum=quorum
use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
def test_multiple_consumers_specified_group_partitions_should_raise(self, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
def test_multiple_consumers_specified_group_partitions_should_raise(self, metadata_quorum, use_new_coordinator=False, group_protocol=None):
"""
Runs multiple consumers in the same group to read messages from specific partitions.
It is an invalid configuration to provide a consumer group and specific partitions.
Expand Down
15 changes: 4 additions & 11 deletions tests/kafkatest/tests/core/consumer_group_command_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
from ducktape.mark import matrix
from ducktape.mark.resource import cluster

from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService, quorum, consumer_group
from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.services.security.security_config import SecurityConfig
Expand All @@ -40,23 +39,17 @@ class ConsumerGroupCommandTest(Test):

def __init__(self, test_context):
super(ConsumerGroupCommandTest, self).__init__(test_context)
self.num_zk = 1
self.num_brokers = 1
self.topics = {
TOPIC: {'partitions': 1, 'replication-factor': 1}
}
self.zk = ZookeeperService(test_context, self.num_zk) if quorum.for_test(test_context) == quorum.zk else None

def setUp(self):
if self.zk:
self.zk.start()

def start_kafka(self, security_protocol, interbroker_security_protocol):
self.kafka = KafkaService(
self.test_context, self.num_brokers,
self.zk, security_protocol=security_protocol,
None, security_protocol=security_protocol,
interbroker_security_protocol=interbroker_security_protocol, topics=self.topics,
controller_num_nodes_override=self.num_zk)
controller_num_nodes_override=self.num_brokers)
self.kafka.start()

def start_consumer(self, group_protocol=None):
Expand Down Expand Up @@ -102,7 +95,7 @@ def setup_and_verify(self, security_protocol, group=None, group_protocol=None):
use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
def test_list_consumer_groups(self, security_protocol='PLAINTEXT', metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
def test_list_consumer_groups(self, security_protocol='PLAINTEXT', metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None):
"""
Tests if ConsumerGroupCommand is listing correct consumer groups
:return: None
Expand All @@ -121,7 +114,7 @@ def test_list_consumer_groups(self, security_protocol='PLAINTEXT', metadata_quor
use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
def test_describe_consumer_group(self, security_protocol='PLAINTEXT', metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
def test_describe_consumer_group(self, security_protocol='PLAINTEXT', metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None):
"""
Tests if ConsumerGroupCommand is describing a consumer group correctly
:return: None
Expand Down
17 changes: 2 additions & 15 deletions tests/kafkatest/tests/core/controller_mutation_quota_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,7 @@
from ducktape.mark import matrix
from ducktape.tests.test import Test

from kafkatest.services.trogdor.produce_bench_workload import ProduceBenchWorkloadService, ProduceBenchWorkloadSpec
from kafkatest.services.trogdor.consume_bench_workload import ConsumeBenchWorkloadService, ConsumeBenchWorkloadSpec
from kafkatest.services.trogdor.task_spec import TaskSpec
from kafkatest.services.kafka import KafkaService, quorum
from kafkatest.services.trogdor.trogdor import TrogdorService
from kafkatest.services.zookeeper import ZookeeperService

import time


class ControllerMutationQuotaTest(Test):
"""Tests throttled partition changes via the kafka-topics CLI as follows:
Expand Down Expand Up @@ -54,31 +46,26 @@ class ControllerMutationQuotaTest(Test):
def __init__(self, test_context):
super(ControllerMutationQuotaTest, self).__init__(test_context=test_context)
self.test_context = test_context
self.zk = ZookeeperService(test_context, num_nodes=1) if quorum.for_test(test_context) == quorum.zk else None
self.window_num = 10
self.window_size_seconds = 200 # must be long enough such that all CLI commands fit into it

self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk,
self.kafka = KafkaService(self.test_context, num_nodes=1, zk=None,
server_prop_overrides=[
["quota.window.num", "%s" % self.window_num],
["controller.quota.window.size.seconds", "%s" % self.window_size_seconds]
],
controller_num_nodes_override=1)

def setUp(self):
if self.zk:
self.zk.start()
self.kafka.start()

def teardown(self):
# Need to increase the timeout due to partition count
self.kafka.stop()
if self.zk:
self.zk.stop()

@cluster(num_nodes=2)
@matrix(metadata_quorum=quorum.all_kraft)
def test_controller_mutation_quota(self, metadata_quorum=quorum.zk):
def test_controller_mutation_quota(self, metadata_quorum):
self.partition_count = 10
mutation_rate = 3 * self.partition_count / (self.window_num * self.window_size_seconds)

Expand Down
11 changes: 2 additions & 9 deletions tests/kafkatest/tests/core/delegation_token_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
from ducktape.tests.test import Test
from ducktape.utils.util import wait_until
from kafkatest.services.kafka import config_property, KafkaService, quorum
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.services.delegation_tokens import DelegationTokens
from kafkatest.services.verifiable_producer import VerifiableProducer
Expand All @@ -35,8 +34,7 @@ def __init__(self, test_context):

self.test_context = test_context
self.topic = "topic"
self.zk = ZookeeperService(test_context, num_nodes=1) if quorum.for_test(test_context) == quorum.zk else None
self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk, zk_chroot="/kafka",
self.kafka = KafkaService(self.test_context, num_nodes=1, zk=None,
topics={self.topic: {"partitions": 1, "replication-factor": 1}},
server_prop_overrides=[
[config_property.DELEGATION_TOKEN_MAX_LIFETIME_MS, "604800000"],
Expand Down Expand Up @@ -66,11 +64,6 @@ def __init__(self, test_context):
self.kafka.client_sasl_mechanism = 'GSSAPI,SCRAM-SHA-256'
self.kafka.interbroker_sasl_mechanism = 'GSSAPI'


def setUp(self):
if self.zk:
self.zk.start()

def tearDown(self):
self.producer.nodes[0].account.remove(self.jaas_deleg_conf_path)
self.consumer.nodes[0].account.remove(self.jaas_deleg_conf_path)
Expand Down Expand Up @@ -114,7 +107,7 @@ def renew_delegation_token(self):

@cluster(num_nodes=5)
@matrix(metadata_quorum=quorum.all_non_upgrade)
def test_delegation_token_lifecycle(self, metadata_quorum=quorum.zk):
def test_delegation_token_lifecycle(self, metadata_quorum):
self.kafka.start()
self.delegation_tokens = DelegationTokens(self.kafka, self.test_context)

Expand Down
8 changes: 2 additions & 6 deletions tests/kafkatest/tests/core/fetch_from_follower_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
from kafkatest.services.kafka import KafkaService, quorum, consumer_group
from kafkatest.services.monitor.jmx import JmxTool
from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
from kafkatest.utils import is_int

Expand All @@ -37,10 +36,9 @@ def __init__(self, test_context):
super(FetchFromFollowerTest, self).__init__(test_context=test_context)
self.jmx_tool = JmxTool(test_context, jmx_poll_ms=100)
self.topic = "test_topic"
self.zk = ZookeeperService(test_context, num_nodes=1) if quorum.for_test(test_context) == quorum.zk else None
self.kafka = KafkaService(test_context,
num_nodes=3,
zk=self.zk,
zk=None,
topics={
self.topic: {
"partitions": 1,
Expand All @@ -65,8 +63,6 @@ def min_cluster_size(self):
return super(FetchFromFollowerTest, self).min_cluster_size() + self.num_producers * 2 + self.num_consumers * 2

def setUp(self):
if self.zk:
self.zk.start()
self.kafka.start()

@cluster(num_nodes=9)
Expand All @@ -79,7 +75,7 @@ def setUp(self):
use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
def test_consumer_preferred_read_replica(self, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
def test_consumer_preferred_read_replica(self, metadata_quorum, use_new_coordinator=False, group_protocol=None):
"""
This test starts up brokers with "broker.rack" and "replica.selector.class" configurations set. The replica
selector is set to the rack-aware implementation. One of the brokers has a different rack than the other two.
Expand Down
10 changes: 1 addition & 9 deletions tests/kafkatest/tests/core/get_offset_shell_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
from ducktape.mark.resource import cluster

from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService, quorum
from kafkatest.services.console_consumer import ConsoleConsumer

Expand Down Expand Up @@ -51,7 +50,6 @@ class GetOffsetShellTest(Test):
"""
def __init__(self, test_context):
super(GetOffsetShellTest, self).__init__(test_context)
self.num_zk = 1
self.num_brokers = 1
self.messages_received_count = 0
self.topics = {
Expand All @@ -64,16 +62,10 @@ def __init__(self, test_context):
TOPIC_TEST_TOPIC_PARTITIONS2: {'partitions': 2, 'replication-factor': REPLICATION_FACTOR}
}

self.zk = ZookeeperService(test_context, self.num_zk) if quorum.for_test(test_context) == quorum.zk else None

def setUp(self):
if self.zk:
self.zk.start()

def start_kafka(self, security_protocol, interbroker_security_protocol):
self.kafka = KafkaService(
self.test_context, self.num_brokers,
self.zk, security_protocol=security_protocol,
None, security_protocol=security_protocol,
interbroker_security_protocol=interbroker_security_protocol, topics=self.topics)
self.kafka.start()

Expand Down
Loading

0 comments on commit 38aca3a

Please sign in to comment.