Skip to content

Commit

Permalink
KAFKA-18081 Remove isKRaftTest from the kraft-only tests (apache#17934)
Browse files Browse the repository at this point in the history
Reviewers: Chia-Ping Tsai <[email protected]>
  • Loading branch information
Yunyung authored Nov 27, 2024
1 parent 3710add commit 434fe7c
Show file tree
Hide file tree
Showing 15 changed files with 83 additions and 219 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -504,14 +504,10 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
val tsDisabledProps = TestUtils.createBrokerConfigs(1, zkConnectOrNull).head
instanceConfigs = List(KafkaConfig.fromProps(tsDisabledProps))

if (isKRaftTest()) {
recreateBrokers(startup = true)
assertTrue(faultHandler.firstException().getCause.isInstanceOf[ConfigException])
// Normally the exception is thrown as part of the TearDown method of the parent class(es). We would like to not do this.
faultHandler.setIgnore(true)
} else {
assertThrows(classOf[ConfigException], () => recreateBrokers(startup = true))
}
recreateBrokers(startup = true)
assertTrue(faultHandler.firstException().getCause.isInstanceOf[ConfigException])
// Normally the exception is thrown as part of the TearDown method of the parent class(es). We would like to not do this.
faultHandler.setIgnore(true)
}

@ParameterizedTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import kafka.utils.TestInfoUtils
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.server.config.ReplicationConfigs
import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, assertThrows}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
Expand All @@ -32,13 +31,6 @@ import scala.jdk.CollectionConverters._

class ConsumerWithLegacyMessageFormatIntegrationTest extends AbstractConsumerTest {

override protected def brokerPropertyOverrides(properties: Properties): Unit = {
// legacy message formats are only supported with IBP < 3.0
// KRaft mode is not supported for inter.broker.protocol.version = 2.8, The minimum version required is 3.0-IV1"
if (!isKRaftTest())
properties.put(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, "2.8")
}

@nowarn("cat=deprecation")
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
Expand Down Expand Up @@ -91,22 +83,16 @@ class ConsumerWithLegacyMessageFormatIntegrationTest extends AbstractConsumerTes
assertEquals(20, timestampTopic1P1.timestamp)
assertEquals(Optional.of(0), timestampTopic1P1.leaderEpoch)

if (!isKRaftTest()) {
assertNull(timestampOffsets.get(new TopicPartition(topic2, 0)), "null should be returned when message format is 0.9.0")
assertNull(timestampOffsets.get(new TopicPartition(topic2, 1)), "null should be returned when message format is 0.9.0")
}
else {
// legacy message formats are supported for IBP version < 3.0 and KRaft runs on minimum version 3.0-IV1
val timestampTopic2P0 = timestampOffsets.get(new TopicPartition(topic2, 0))
assertEquals(40, timestampTopic2P0.offset)
assertEquals(40, timestampTopic2P0.timestamp)
assertEquals(Optional.of(0), timestampTopic2P0.leaderEpoch)

val timestampTopic2P1 = timestampOffsets.get(new TopicPartition(topic2, 1))
assertEquals(60, timestampTopic2P1.offset)
assertEquals(60, timestampTopic2P1.timestamp)
assertEquals(Optional.of(0), timestampTopic2P1.leaderEpoch)
}
// legacy message formats are supported for IBP version < 3.0 and KRaft runs on minimum version 3.0-IV1
val timestampTopic2P0 = timestampOffsets.get(new TopicPartition(topic2, 0))
assertEquals(40, timestampTopic2P0.offset)
assertEquals(40, timestampTopic2P0.timestamp)
assertEquals(Optional.of(0), timestampTopic2P0.leaderEpoch)

val timestampTopic2P1 = timestampOffsets.get(new TopicPartition(topic2, 1))
assertEquals(60, timestampTopic2P1.offset)
assertEquals(60, timestampTopic2P1.timestamp)
assertEquals(Optional.of(0), timestampTopic2P1.leaderEpoch)

val timestampTopic3P0 = timestampOffsets.get(new TopicPartition(topic3, 0))
assertEquals(80, timestampTopic3P0.offset)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,11 @@ import javax.management.ObjectName
import com.yammer.metrics.core.MetricName
import kafka.admin.ConfigCommand
import kafka.api.{KafkaSasl, SaslSetup}
import kafka.controller.{ControllerBrokerStateInfo, ControllerChannelManager}
import kafka.log.UnifiedLog
import kafka.network.{DataPlaneAcceptor, Processor, RequestChannel}
import kafka.security.JaasTestUtils
import kafka.utils._
import kafka.utils.Implicits._
import kafka.utils.TestUtils.TestControllerRequestCompletionHandler
import kafka.zk.ConfigEntityChangeNotificationZNode
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
Expand All @@ -52,12 +50,10 @@ import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.config.provider.FileConfigProvider
import org.apache.kafka.common.errors.{AuthenticationException, InvalidRequestException}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.MetadataRequestData
import org.apache.kafka.common.metrics.{JmxReporter, KafkaMetric, MetricsContext, MetricsReporter, Quota}
import org.apache.kafka.common.network.{ConnectionMode, ListenerName}
import org.apache.kafka.common.network.CertStores.{KEYSTORE_PROPS, TRUSTSTORE_PROPS}
import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.common.requests.MetadataRequest
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.security.scram.ScramCredential
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
Expand All @@ -69,7 +65,7 @@ import org.apache.kafka.server.metrics.{KafkaYammerMetrics, MetricConfigs}
import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.server.util.ShutdownableThread
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig}
import org.apache.kafka.test.{TestSslUtils, TestUtils => JTestUtils}
import org.apache.kafka.test.TestSslUtils
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
Expand Down Expand Up @@ -333,14 +329,6 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
assertFalse(reporter.kafkaMetrics.isEmpty, "No metrics found")
}

if (!isKRaftTest()) {
// fetch from ZK, values should be unresolved
val props = fetchBrokerConfigsFromZooKeeper(servers.head)
assertTrue(props.getProperty(TestMetricsReporter.PollingIntervalProp) == PollingIntervalVal, "polling interval is not updated in ZK")
assertTrue(props.getProperty(configPrefix + SSL_TRUSTSTORE_TYPE_CONFIG) == SslTruststoreTypeVal, "store type is not updated in ZK")
assertTrue(props.getProperty(configPrefix + SSL_KEYSTORE_PASSWORD_CONFIG) == SslKeystorePasswordVal, "keystore password is not updated in ZK")
}

// verify the update
// 1. verify update not occurring if the value of property is same.
alterConfigsUsingConfigCommand(updatedProps)
Expand Down Expand Up @@ -459,23 +447,6 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
verifyProduceConsume(producer, consumer, 10, topic)
}

def verifyBrokerToControllerCall(controller: KafkaServer): Unit = {
val nonControllerBroker = servers.find(_.config.brokerId != controller.config.brokerId).get
val brokerToControllerManager = nonControllerBroker.clientToControllerChannelManager
val completionHandler = new TestControllerRequestCompletionHandler()
brokerToControllerManager.sendRequest(new MetadataRequest.Builder(new MetadataRequestData()), completionHandler)
TestUtils.waitUntilTrue(() => {
completionHandler.completed.get() || completionHandler.timedOut.get()
}, "Timed out while waiting for broker to controller API call")
// we do not expect a timeout from broker to controller request
assertFalse(completionHandler.timedOut.get(), "broker to controller request is timeout")
assertTrue(completionHandler.actualResponse.isDefined, "No response recorded even though request is completed")
val response = completionHandler.actualResponse.get
assertNull(response.authenticationException(), s"Request failed due to authentication error ${response.authenticationException}")
assertNull(response.versionMismatch(), s"Request failed due to unsupported version error ${response.versionMismatch}")
assertFalse(response.wasDisconnected(), "Request failed because broker is not available")
}

val group_id = new AtomicInteger(1)
def next_group_name(): String = s"alter-truststore-${group_id.getAndIncrement()}"

Expand Down Expand Up @@ -518,18 +489,6 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
TestUtils.incrementalAlterConfigs(servers, adminClients.head, props2, perBrokerConfig = true).all.get(15, TimeUnit.SECONDS)
verifySslProduceConsume(sslProperties2, next_group_name())
waitForAuthenticationFailure(producerBuilder.keyStoreProps(sslProperties1))

if (!isKRaftTest()) {
val controller = servers.find(_.config.brokerId == TestUtils.waitUntilControllerElected(zkClient)).get.asInstanceOf[KafkaServer]
val controllerChannelManager = controller.kafkaController.controllerChannelManager
val brokerStateInfo: mutable.HashMap[Int, ControllerBrokerStateInfo] =
JTestUtils.fieldValue(controllerChannelManager, classOf[ControllerChannelManager], "brokerStateInfo")
brokerStateInfo(0).networkClient.disconnect("0")
TestUtils.createTopic(zkClient, "testtopic2", numPartitions, replicationFactor = numServers, servers)

// validate that the brokerToController request works fine
verifyBrokerToControllerCall(controller)
}
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
Expand Down
18 changes: 2 additions & 16 deletions core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import org.apache.kafka.clients.admin.{Admin, NewPartitions, NewTopic}
import org.apache.kafka.common.errors.InvalidReplicaAssignmentException
import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata
import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse}
import org.apache.kafka.server.common.AdminOperationException
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{BeforeEach, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
Expand Down Expand Up @@ -116,21 +115,8 @@ class AddPartitionsTest extends BaseRequestTest {
admin.createPartitions(Collections.singletonMap(topic1,
NewPartitions.increaseTo(3, singletonList(asList(0, 1, 2))))).all().get()).getCause
assertEquals(classOf[InvalidReplicaAssignmentException], cause.getClass)
if (isKRaftTest()) {
assertTrue(cause.getMessage.contains("Attempted to add 2 additional partition(s), but only 1 assignment(s) " +
"were specified."), "Unexpected error message: " + cause.getMessage)
} else {
assertTrue(cause.getMessage.contains("Increasing the number of partitions by 2 but 1 assignments provided."),
"Unexpected error message: " + cause.getMessage)
}
if (!isKRaftTest()) {
// In ZK mode, test the raw AdminZkClient method as well.
val e = assertThrows(classOf[AdminOperationException], () => adminZkClient.addPartitions(
topic5, topic5Assignment, adminZkClient.getBrokerMetadatas(), 2,
Some(Map(1 -> Seq(0, 1), 2 -> Seq(0, 1, 2)))))
assertTrue(e.getMessage.contains("Unexpected existing replica assignment for topic 'new-topic5', partition " +
"id 0 is missing"))
}
assertTrue(cause.getMessage.contains("Attempted to add 2 additional partition(s), but only 1 assignment(s) " +
"were specified."), "Unexpected error message: " + cause.getMessage)
}

@ParameterizedTest
Expand Down
2 changes: 1 addition & 1 deletion core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
@ValueSource(strings = Array("kraft"))
def testSessionExpireListenerMetrics(quorum: String): Unit = {
val metrics = KafkaYammerMetrics.defaultRegistry.allMetrics
val expectedNumMetrics = if (isKRaftTest()) 0 else 1
val expectedNumMetrics = 0
assertEquals(expectedNumMetrics, metrics.keySet.asScala.
count(_.getMBeanName == "kafka.server:type=SessionExpireListener,name=SessionState"))
assertEquals(expectedNumMetrics, metrics.keySet.asScala.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,6 @@ abstract class AbstractCreateTopicsRequestTest extends BaseRequestTest {
}
}

if (!isKRaftTest()) {
// Verify controller broker has the correct metadata
verifyMetadata(controllerSocketServer)
}
if (!request.data.validateOnly) {
// Wait until metadata is propagated and validate non-controller broker has the correct metadata
TestUtils.waitForPartitionMetadata(brokers, topic.name(), 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,9 @@ abstract class AbstractMetadataRequestTest extends BaseRequestTest {
}

protected def checkAutoCreatedTopic(autoCreatedTopic: String, response: MetadataResponse): Unit = {
if (isKRaftTest()) {
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, response.errors.get(autoCreatedTopic))
for (i <- 0 until brokers.head.config.numPartitions) {
TestUtils.waitForPartitionMetadata(brokers, autoCreatedTopic, i)
}
} else {
assertEquals(Errors.LEADER_NOT_AVAILABLE, response.errors.get(autoCreatedTopic))
assertEquals(Some(brokers.head.config.numPartitions), zkClient.getTopicPartitionCount(autoCreatedTopic))
for (i <- 0 until brokers.head.config.numPartitions) {
TestUtils.waitForPartitionMetadata(brokers, autoCreatedTopic, i)
}
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, response.errors.get(autoCreatedTopic))
for (i <- 0 until brokers.head.config.numPartitions) {
TestUtils.waitForPartitionMetadata(brokers, autoCreatedTopic, i)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -387,24 +387,17 @@ class ControllerMutationQuotaTest extends BaseRequestTest {

private def waitUserQuota(user: String, expectedQuota: Double): Unit = {
val quotaManager = brokers.head.quotaManagers.controllerMutation
val controllerQuotaManager =
if (isKRaftTest()) Option(controllerServers.head.quotaManagers.controllerMutation)
else Option.empty
val controllerQuotaManager = controllerServers.head.quotaManagers.controllerMutation
var actualQuota = Double.MinValue

TestUtils.waitUntilTrue(() => {
actualQuota = quotaManager.quota(user, "").bound()
if (controllerQuotaManager.isDefined)
expectedQuota == actualQuota && expectedQuota == controllerQuotaManager.get.quota(user, "").bound()
else
expectedQuota == actualQuota
expectedQuota == actualQuota && expectedQuota == controllerQuotaManager.quota(user, "").bound()
}, s"Quota of $user is not $expectedQuota but $actualQuota")
}

private def quotaMetric(user: String): Option[KafkaMetric] = {
val metrics =
if (isKRaftTest()) controllerServers.head.metrics
else brokers.head.metrics
val metrics = controllerServers.head.metrics
val metricName = metrics.metricName(
"tokens",
QuotaType.CONTROLLER_MUTATION.toString,
Expand Down Expand Up @@ -449,7 +442,7 @@ class ControllerMutationQuotaTest extends BaseRequestTest {
connectAndReceive[AlterClientQuotasResponse](
request,
destination = controllerSocketServer,
if (isKRaftTest()) ListenerName.normalised("CONTROLLER") else listenerName
ListenerName.normalised("CONTROLLER")
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,31 +104,19 @@ class CreateTopicsRequestWithPolicyTest extends AbstractCreateTopicsRequestTest
Map(existingTopic -> error(Errors.TOPIC_ALREADY_EXISTS,
Some("Topic 'existing-topic' already exists."))))

var errorMsg = if (isKRaftTest()) {
"Unable to replicate the partition 4 time(s): The target replication factor of 4 cannot be reached because only 3 broker(s) are registered."
} else {
"Replication factor: 4 larger than available brokers: 3."
}
var errorMsg = "Unable to replicate the partition 4 time(s): The target replication factor of 4 cannot be reached because only 3 broker(s) are registered."
validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("error-replication",
numPartitions = 10, replicationFactor = brokerCount + 1)), validateOnly = true),
Map("error-replication" -> error(Errors.INVALID_REPLICATION_FACTOR,
Some(errorMsg))))

errorMsg = if (isKRaftTest()) {
"Replication factor must be larger than 0, or -1 to use the default value."
} else {
"Replication factor must be larger than 0."
}
errorMsg = "Replication factor must be larger than 0, or -1 to use the default value."
validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("error-replication2",
numPartitions = 10, replicationFactor = -2)), validateOnly = true),
Map("error-replication2" -> error(Errors.INVALID_REPLICATION_FACTOR,
Some(errorMsg))))

errorMsg = if (isKRaftTest()) {
"Number of partitions was set to an invalid non-positive value."
} else {
"Number of partitions must be larger than 0."
}
errorMsg = "Number of partitions was set to an invalid non-positive value."
validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("error-partitions",
numPartitions = -2, replicationFactor = 1)), validateOnly = true),
Map("error-partitions" -> error(Errors.INVALID_PARTITIONS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class DeleteTopicsRequestWithDeletionDisabledTest extends BaseRequestTest {
connectAndReceive[DeleteTopicsResponse](
request,
controllerSocketServer,
if (isKRaftTest()) ListenerName.normalised("CONTROLLER") else listenerName
ListenerName.normalised("CONTROLLER")
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,6 @@ class DescribeClusterRequestTest extends BaseRequestTest {
.setRack(server.config.rack.orNull)
}.toSet

var expectedControllerId = 0
if (!isKRaftTest()) {
// in KRaft mode DescribeClusterRequest will return a random broker id as the controllerId (KIP-590)
expectedControllerId = servers.filter(_.kafkaController.isActive).last.config.brokerId
}
val expectedClusterId = brokers.last.clusterId

val expectedClusterAuthorizedOperations = if (includeClusterAuthorizedOperations) {
Expand All @@ -92,11 +87,7 @@ class DescribeClusterRequestTest extends BaseRequestTest {
.build(version.toShort)
val describeClusterResponse = sentDescribeClusterRequest(describeClusterRequest)

if (isKRaftTest()) {
assertTrue(0 to brokerCount contains describeClusterResponse.data.controllerId)
} else {
assertEquals(expectedControllerId, describeClusterResponse.data.controllerId)
}
assertTrue(0 to brokerCount contains describeClusterResponse.data.controllerId)
assertEquals(expectedClusterId, describeClusterResponse.data.clusterId)
assertEquals(expectedClusterAuthorizedOperations, describeClusterResponse.data.clusterAuthorizedOperations)
assertEquals(expectedBrokers, describeClusterResponse.data.brokers.asScala.toSet)
Expand Down
Loading

0 comments on commit 434fe7c

Please sign in to comment.