Merge pull request #289 from nais/transaction-canary
Transaction canary
sindrerh2 authored Nov 25, 2024
2 parents 044c93f + dda0760 commit 8d3d3ba
Showing 5 changed files with 190 additions and 58 deletions.
4 changes: 4 additions & 0 deletions canary-deployer/canary.yaml
@@ -27,6 +27,10 @@ spec:
           value: 1s
         - name: CANARY_KAFKA_TOPIC
           value: "{{ canary_kafka_topic }}"
+        - name: CANARY_KAFKA_TRANSACTION_TOPIC
+          value: "{{ canary_kafka_transaction_topic }}"
+        - name: CANARY_ENABLE_TRANSACTION
+          value: "{{ canary_enable_transaction }}"
         - name: CANARY_KAFKA_GROUP_ID
           value: "{{ groupid }}"
         - name: CANARY_LOG_FORMAT
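The two new environment variables line up with the kafka-transaction-topic and enable-transaction flags added to cmd/canary/main.go further down. As a minimal sketch of how CANARY_-prefixed variables can feed such flags through viper — the canary's actual env wiring is outside this diff, so the prefix and key replacer below are assumptions:

package main

import (
	"fmt"
	"strings"

	flag "github.com/spf13/pflag"
	"github.com/spf13/viper"
)

func main() {
	// Flags mirror the two new settings; defaults are illustrative only.
	flag.String("kafka-transaction-topic", "kafkarator-transaction-canary", "transaction canary topic")
	flag.Bool("enable-transaction", false, "enable the transaction canary")
	flag.Parse()

	// CANARY_KAFKA_TRANSACTION_TOPIC -> kafka-transaction-topic, and so on.
	viper.SetEnvPrefix("CANARY")
	viper.SetEnvKeyReplacer(strings.NewReplacer("-", "_"))
	viper.AutomaticEnv()
	_ = viper.BindPFlags(flag.CommandLine)

	fmt.Println(viper.GetString("kafka-transaction-topic"), viper.GetBool("enable-transaction"))
}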
37 changes: 20 additions & 17 deletions canary-deployer/deployer.py
@@ -40,7 +40,7 @@ async def pretend_run_process(cmd, logger):

 async def _execute_deploy(cluster, resource_name, vars_file_name, settings, logger):
     cmd = [
-        "/app/deploy",
+        "/app/deploy",  # This is nais/deploy
         "--cluster", cluster,
        "--resource", resource_name,
        "--vars", vars_file_name,
@@ -76,24 +76,27 @@ async def deploy_canary(config: DeployConfig, settings: Settings):
     logger.info("Completed deploying canary to %s", config.canary_cluster)


+# TODO: ADD TX TOPIC etc
 async def deploy_topic(config: DeployConfig, settings: Settings):
-    topic_name = f"kafka-canary-{config.canary_cluster}"
+    topics = [f"kafka-canary-{config.canary_cluster}", f"kafka-canary-tx-{config.canary_cluster}"]

     logger = logging.getLogger(f"deploy-topic-{config.canary_cluster}")
-    logger.info("Deploying topic %s to %s", topic_name, config.topic_cluster)
-    with tempfile.NamedTemporaryFile("w", prefix=f"topic-vars-{config.canary_cluster}", suffix=".yaml") as vars_file:
-        data = {
-            "team": settings.team,
-            "pool": config.pool,
-            "topic_name": topic_name,
-        }
-        json.dump(data, vars_file)
-        vars_file.flush()
-        logger.debug(json.dumps(data, indent=2))
-        try:
-            await _execute_deploy(config.topic_cluster, "/canary/topic.yaml", vars_file.name, settings, logger)
-        except CalledProcessError:
-            raise RuntimeError(f"Error when deploying topic {topic_name} to {config.topic_cluster}") from None
-    logger.info("Completed deploying topic %s to %s", topic_name, config.topic_cluster)
+    logger.info("Deploying topics %s to %s", topics, config.topic_cluster)
+
+    for topic in topics:
+        with tempfile.NamedTemporaryFile("w", prefix=f"topic-vars-{config.canary_cluster}-{topic}", suffix=".yaml") as vars_file:
+            data = {
+                "team": settings.team,
+                "pool": config.pool,
+                "topic_name": topic,
+            }
+            json.dump(data, vars_file)
+            vars_file.flush()
+            logger.debug(json.dumps(data, indent=2))
+            try:
+                await _execute_deploy(config.topic_cluster, "/canary/topic.yaml", vars_file.name, settings, logger)
+            except CalledProcessError:
+                raise RuntimeError(f"Error when deploying topic {topic} to {config.topic_cluster}") from None


 def generate_topic_cluster_lookup_for_nav(data):
106 changes: 95 additions & 11 deletions cmd/canary/main.go
@@ -32,17 +32,27 @@ const (

 // Configuration options
 const (
-	KafkaBrokers         = "kafka-brokers"
-	KafkaCAPath          = "kafka-ca-path"
-	KafkaCertificatePath = "kafka-certificate-path"
-	KafkaGroupID         = "kafka-group-id"
-	KafkaKeyPath         = "kafka-key-path"
-	KafkaTopic           = "kafka-topic"
-	DeployStartTime      = "deploy-start-time"
-	LogFormat            = "log-format"
-	MessageInterval      = "message-interval"
-	MetricsAddress       = "metrics-address"
-	SlowConsumer         = "slow-consumer"
+	KafkaBrokers           = "kafka-brokers"
+	KafkaCAPath            = "kafka-ca-path"
+	KafkaCertificatePath   = "kafka-certificate-path"
+	KafkaGroupID           = "kafka-group-id"
+	KafkaKeyPath           = "kafka-key-path"
+	KafkaTopic             = "kafka-topic"
+	DeployStartTime        = "deploy-start-time"
+	LogFormat              = "log-format"
+	MessageInterval        = "message-interval"
+	MetricsAddress         = "metrics-address"
+	SlowConsumer           = "slow-consumer"
+	KafkaTransactionTopic  = "kafka-transaction-topic"
+	KafkaTransactionEnable = "enable-transaction"
+
+	// TODO: kafka-transaction-canary bits
+	// topic + enable,
+	// consumer isolation level,
+	// producer init-transaction,
+	// producer transaction id,
+	// consumer commit-offsets,
+	// begin-tx, abort-tx, commit-tx flow
 )

 const (
@@ -100,12 +100,32 @@ var (
 		Buckets:   prometheus.LinearBuckets(0.01, 0.01, 100),
 	})

+	ProduceTxLatency = prometheus.NewHistogram(prometheus.HistogramOpts{
+		Name:      "produce_tx_latency",
+		Namespace: Namespace,
+		Help:      "latency in transactional message production",
+		Buckets:   prometheus.LinearBuckets(0.01, 0.01, 100),
+	})
+
 	ConsumeLatency = prometheus.NewHistogram(prometheus.HistogramOpts{
 		Name:      "consume_latency",
 		Namespace: Namespace,
 		Help:      "latency in message consumption",
 		Buckets:   prometheus.LinearBuckets(0.01, 0.01, 100),
 	})
+
+	TransactionTxLatency = prometheus.NewHistogram(prometheus.HistogramOpts{
+		Name:      "transaction_latency",
+		Namespace: Namespace,
+		Help:      "latency in transactional message consumption",
+		Buckets:   prometheus.LinearBuckets(0.01, 0.01, 100),
+	})
+
+	TransactedNumbers = prometheus.NewGauge(prometheus.GaugeOpts{
+		Name:      "transacted_messages_total",
+		Namespace: Namespace,
+		Help:      "transacted messages; transactions happen in units of 100 messages in the canary",
+	})
 )

 func init() {
@@ -127,6 +157,9 @@ func init() {
 	hostname, _ := os.Hostname()
 	flag.StringSlice(KafkaBrokers, []string{"localhost:9092"}, "Broker addresses for Kafka support")
 	flag.String(KafkaTopic, "kafkarator-canary", "Topic where Kafkarator canary messages are produced")
+	// can we use the same topic for this??
+	flag.String(KafkaTransactionTopic, "kafkarator-transaction-canary", "Topic where Kafkarator canary messages are transacted")
+	flag.Bool(KafkaTransactionEnable, false, "Enable transaction canarying")
 	flag.String(KafkaGroupID, hostname, "Kafka group ID for storing consumed message positions")
 	flag.String(KafkaCertificatePath, "kafka.crt", "Path to Kafka client certificate")
 	flag.String(KafkaKeyPath, "kafka.key", "Path to Kafka client key")
@@ -156,6 +189,9 @@ func init() {
 		LeadTime,
 		ProduceLatency,
+		ProduceTxLatency,
 		StartTimestamp,
+		TransactedNumbers,
+		TransactionTxLatency,
 	)
 }

@@ -180,6 +215,7 @@ func main() {

 	signals := make(chan os.Signal, 1)
 	cons := make(chan canarykafka.Message, 32)
+	consTx := make(chan canarykafka.Message, 32)

 	logger := log.New()
 	logfmt, err := formatter(viper.GetString(LogFormat))
@@ -223,6 +259,24 @@ func main() {

 	logger.Infof("Started message producer.")

+	// --> Transaction <--
+	// produce to kafkaTopic, consume from kafkaTopic and put on kafkaTransactionTopic
+	txCallback := canarykafka.NewCallback(false, consTx)
+	err = consumer.New(ctx, cancel, consumer.Config{
+		Brokers:           viper.GetStringSlice(KafkaBrokers),
+		GroupID:           viper.GetString(KafkaGroupID),
+		MaxProcessingTime: time.Second * 1,
+		RetryInterval:     time.Second * 10,
+		Topic:             viper.GetString(KafkaTransactionTopic),
+		Callback:          txCallback.Callback, // may or may not need special handling here
+		Logger:            logger,
+		TlsConfig:         tlsConfig,
+	})
+	if err != nil {
+		logger.Errorf("unable to set up transaction consumer: %s", err)
+		cancel()
+	}
+
 	callback := canarykafka.NewCallback(viper.GetBool(SlowConsumer), cons)
 	err = consumer.New(ctx, cancel, consumer.Config{
 		Brokers: viper.GetStringSlice(KafkaBrokers),
@@ -268,9 +318,38 @@ func main() {
 		}
 	}

+	produceTx := func(ctx context.Context) {
+		if ctx.Err() != nil {
+			return
+		}
+		timer := time.Now()
+		var messages []kafka.Message
+		for i := 0; i < 100; i++ {
+			messages = append(messages, kafka.Message(timer.Format(time.RFC3339Nano)))
+		}
+		partition, offset, err := prod.ProduceTx(messages)
+		ProduceTxLatency.Observe(time.Now().Sub(timer).Seconds())
+		if err == nil {
+			message := canarykafka.Message{
+				Offset:    offset,
+				TimeStamp: timer,
+				Partition: partition,
+			}
+			logger.Infof("Produced transactional batch; last message: %s", message.String())
+			TransactionTxLatency.Observe(time.Now().Sub(timer).Seconds())
+			TransactedNumbers.Set(float64(offset))
+		} else {
+			logger.Errorf("unable to produce canary message on Kafka: %s", err)
+			if kafka.IsErrUnauthorized(err) {
+				cancel()
+			}
+		}
+	}
+
 	logger.Infof("Ready.")

 	produceTicker := time.NewTicker(viper.GetDuration(MessageInterval))
+	produceTxTicker := time.NewTicker(viper.GetDuration(MessageInterval) + time.Second*10)

 	for ctx.Err() == nil {
 		select {
@@ -281,6 +360,11 @@ func main() {
 			LastConsumedTimestamp.SetToCurrentTime()
 			ConsumeLatency.Observe(time.Now().Sub(msg.TimeStamp).Seconds())
 			LastConsumedOffset.Set(float64(msg.Offset))
+		case <-produceTxTicker.C:
+			produceTx(ctx)
+		case msg := <-consTx:
+			TransactionTxLatency.Observe(time.Now().Sub(msg.TimeStamp).Seconds())
+			TransactedNumbers.Set(float64(msg.Offset))
 		case sig := <-signals:
 			logger.Infof("exiting due to signal: %s", strings.ToUpper(sig.String()))
 			cancel()
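The TODO in the constants block above still lists a consumer isolation level and commit-offsets as open items, and the transaction consumer is configured exactly like the plain one. A hedged sketch of read-committed consumption in plain Sarama — these are standard Sarama fields, not this repo's consumer package, whose config surface is not shown in this diff:

package main

import (
	"fmt"

	"github.com/IBM/sarama" // or github.com/Shopify/sarama, depending on the pinned module
)

// newTxConsumerConfig sketches a config for verifying transactional messages:
// with ReadCommitted, messages from aborted transactions are never delivered,
// so the canary only counts batches that actually committed.
func newTxConsumerConfig() *sarama.Config {
	config := sarama.NewConfig()
	config.Version = sarama.V3_1_0_0 // transactions need >= 0.11; match the producer
	config.Consumer.IsolationLevel = sarama.ReadCommitted
	config.Consumer.Offsets.Initial = sarama.OffsetNewest
	return config
}

func main() {
	config := newTxConsumerConfig()
	fmt.Println(config.Consumer.IsolationLevel == sarama.ReadCommitted) // true
}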
68 changes: 38 additions & 30 deletions flake.nix
@@ -3,37 +3,45 @@

   inputs.nixpkgs.url = "nixpkgs/nixos-unstable";

-  outputs = {nixpkgs, ...}: let
-    goOverlay = final: prev: {
-      go = prev.go.overrideAttrs (old: rec {
-        version = "1.23.0";
-        src = prev.fetchurl {
-          url = "https://go.dev/dl/go${version}.src.tar.gz";
-          hash = "sha256-Qreo6A2AXaoDAi7T/eQyHUw78smQoUQWXQHu7Nb2mcY=";
-        };
-      });
-    };
-    # helpers
-    withSystem = nixpkgs.lib.genAttrs ["x86_64-linux" "x86_64-darwin" "aarch64-linux" "aarch64-darwin"];
-    withPkgs = f:
-      withSystem (system:
-        f (import nixpkgs {
-          inherit system;
-          overlays = [goOverlay];
-        }));
-  in {
-    devShells = withPkgs (pkgs: {
-      default = pkgs.mkShell {
-        buildInputs = with pkgs; [
-          gnumake
-          go
-          golangci-lint-langserver
-          # go-dlv # TODO: Add
-          gopls
-          python3
-        ];
-      };
-    });
-    formatter = withPkgs (pkgs: pkgs.nixfmt-rfc-style);
-  };
+  outputs = { nixpkgs, ... }:
+    let
+      goOverlay = final: prev: {
+        go = prev.go.overrideAttrs (old: rec {
+          version = "1.23.0";
+          src = prev.fetchurl {
+            url = "https://go.dev/dl/go${version}.src.tar.gz";
+            hash = "sha256-Qreo6A2AXaoDAi7T/eQyHUw78smQoUQWXQHu7Nb2mcY=";
+          };
+        });
+      };
+      # helpers
+      withSystem = nixpkgs.lib.genAttrs [
+        "x86_64-linux"
+        "x86_64-darwin"
+        "aarch64-linux"
+        "aarch64-darwin"
+      ];
+      withPkgs = f:
+        withSystem (system:
+          f (import nixpkgs {
+            inherit system;
+            overlays = [ goOverlay ];
+          }));
+    in {
+      devShells = withPkgs (pkgs: {
+        default = pkgs.mkShell {
+          buildInputs = with pkgs; [
+            gnumake
+            go
+            golangci-lint-langserver
+            # go-dlv # TODO: Add
+            gopls
+            python3
+            python312Packages.python-lsp-server
+            black
+          ];
+        };
+      });
+      formatter = withPkgs (pkgs: pkgs.nixfmt-rfc-style);
+    };
 }
33 changes: 33 additions & 0 deletions pkg/kafka/producer/producer.go
@@ -17,16 +17,21 @@ type Producer struct {

 type Interface interface {
 	Produce(msg kafka.Message) (partition int32, offset int64, err error)
+	ProduceTx(msgs []kafka.Message) (partition int32, offset int64, err error)
 }

 func New(brokers []string, topic string, tlsConfig *tls.Config, logger *log.Logger) (*Producer, error) {
 	config := sarama.NewConfig()
 	config.Net.TLS.Enable = true
 	config.Net.TLS.Config = tlsConfig
 	config.Version = sarama.V3_1_0_0
+	// transactions require Kafka >= 0.11, so V3_1_0_0 is sufficient
+	config.Producer.Transaction.ID = "canary"
 	config.Producer.RequiredAcks = sarama.WaitForAll
 	config.Producer.Return.Errors = true
 	config.Producer.Return.Successes = true
+	config.Producer.Idempotent = true
+	config.Net.MaxOpenRequests = 1 // required by Sarama when Idempotent is set
 	config.ClientID, _ = os.Hostname()
 	sarama.Logger = logger
@@ -49,3 +53,34 @@ func (p *Producer) Produce(msg kafka.Message) (int32, int64, error) {
 	}
 	return p.producer.SendMessage(producerMessage)
 }
+
+func (p *Producer) Close() error {
+	return p.producer.Close()
+}
+
+func (p *Producer) ProduceTx(msgs []kafka.Message) (int32, int64, error) {
+	err := p.producer.BeginTxn()
+	if err != nil {
+		return 0, 0, err
+	}
+
+	var par int32
+	var off int64
+	for _, m := range msgs {
+		par, off, err = p.Produce(m)
+		if err != nil {
+			// abort the whole transaction on the first failed produce and
+			// surface the original error instead of falling through to commit
+			if abortErr := p.producer.AbortTxn(); abortErr != nil {
+				return 0, 0, abortErr
+			}
+			return 0, 0, err
+		}
+	}
+
+	err = p.producer.CommitTxn()
+	if err != nil {
+		return 0, 0, err
+	}
+	return par, off, nil
+}
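For illustration, a minimal caller of the new ProduceTx, mirroring the produceTx closure in cmd/canary/main.go. The broker address and topic are placeholders and the import paths assume this repository's layout; in the canary these values come from flags:

package main

import (
	"crypto/tls"
	"time"

	"github.com/nais/kafkarator/pkg/kafka"
	"github.com/nais/kafkarator/pkg/kafka/producer"
	log "github.com/sirupsen/logrus"
)

// produceOneBatch sends 100 timestamp messages as a single transaction.
// All of them commit or abort together; ProduceTx returns the partition
// and offset of the last message produced.
func produceOneBatch(tlsConfig *tls.Config, logger *log.Logger) error {
	prod, err := producer.New([]string{"localhost:9092"}, "kafkarator-transaction-canary", tlsConfig, logger)
	if err != nil {
		return err
	}
	defer prod.Close()

	now := time.Now()
	batch := make([]kafka.Message, 0, 100)
	for i := 0; i < 100; i++ {
		batch = append(batch, kafka.Message(now.Format(time.RFC3339Nano)))
	}
	_, _, err = prod.ProduceTx(batch)
	return err
}

func main() {
	logger := log.New()
	if err := produceOneBatch(&tls.Config{}, logger); err != nil {
		logger.Fatalf("transaction canary batch failed: %s", err)
	}
}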