Skip to content

Commit

Permalink
feat(kafka): sasl auth
Browse files Browse the repository at this point in the history
  • Loading branch information
kjubybot committed Dec 5, 2024
1 parent c79f13e commit a17e124
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 2 deletions.
47 changes: 46 additions & 1 deletion pkg/output/kafka/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"crypto/tls"
"crypto/x509"
"fmt"
"os"
"strings"

"github.com/IBM/sarama"
Expand Down Expand Up @@ -34,6 +35,16 @@ var (
PartitionStrategyRandom PartitionStrategy = "random"
)

type SASLMechanism string

var (
SASLTypeOAuth SASLMechanism = "OAUTHBEARER"
SASLTypePlaintext SASLMechanism = "PLAIN"
SASLTypeSCRAMSHA256 SASLMechanism = "SCRAM-SHA-256"
SASLTypeSCRAMSHA512 SASLMechanism = "SCRAM-SHA-512"
SASLTypeGSSAPI SASLMechanism = "GSSAPI"
)

func NewSyncProducer(config *Config) (sarama.SyncProducer, error) {
producerConfig, err := Init(config)
if err != nil {
Expand All @@ -50,8 +61,10 @@ func Init(config *Config) (*sarama.Config, error) {
c.Producer.Flush.Frequency = config.FlushFrequency
c.Producer.Retry.Max = config.MaxRetries
c.Producer.Return.Successes = true
c.Net.TLS.Enable = config.TLS
c.Metadata.Full = false

if config.TLSClientConfig.CertificatePath != "" {
if config.TLSClientConfig != nil {
clientCertificate, err := tls.LoadX509KeyPair(config.TLSClientConfig.CertificatePath, config.TLSClientConfig.KeyPath)
if err != nil {
return nil, fmt.Errorf("failed to read client certificate: %w", err)
Expand All @@ -70,6 +83,38 @@ func Init(config *Config) (*sarama.Config, error) {
c.Net.TLS.Config = tlsConfig
}

if config.SASLConfig != nil {
var password string
if config.SASLConfig.Password != "" {
password = config.SASLConfig.Password
} else if config.SASLConfig.PasswordFile != "" {
passwordFile, err := os.ReadFile(config.SASLConfig.PasswordFile)
if err != nil {
return nil, fmt.Errorf("failed to read client password: %w", err)
}

password = strings.TrimSpace(string(passwordFile))
}

c.Net.SASL.Enable = true
c.Net.SASL.Version = config.SASLConfig.Version
c.Net.SASL.User = config.SASLConfig.User
c.Net.SASL.Password = password

switch config.SASLConfig.Mechanism {
case SASLTypeOAuth:
c.Net.SASL.Mechanism = sarama.SASLTypeOAuth
case SASLTypeSCRAMSHA256:
c.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256
case SASLTypeSCRAMSHA512:
c.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
case SASLTypeGSSAPI:
c.Net.SASL.Mechanism = sarama.SASLTypeGSSAPI
default:
c.Net.SASL.Mechanism = sarama.SASLTypePlaintext
}
}

switch config.RequiredAcks {
case RequiredAcksNone:
c.Producer.RequiredAcks = sarama.NoResponse
Expand Down
43 changes: 42 additions & 1 deletion pkg/output/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ type Config struct {
Brokers string `yaml:"brokers"`
Topic string `yaml:"topic"`
TLS bool `yaml:"tls" default:"false"`
TLSClientConfig TLSClientConfig `yaml:"tlsClientConfig"`
TLSClientConfig *TLSClientConfig `yaml:"tlsClientConfig"`
SASLConfig *SASLConfig `yaml:"sasl"`
MaxQueueSize int `yaml:"maxQueueSize" default:"51200"`
FlushFrequency time.Duration `yaml:"flushFrequency" default:"10s"`
FlushMessages int `yaml:"flushMessages" default:"500"`
Expand All @@ -26,6 +27,14 @@ type TLSClientConfig struct {
CACertificate string `yaml:"caCertificate"`
}

type SASLConfig struct {
Mechanism SASLMechanism `yaml:"mechanism" default:"PLAIN"`
Version int16 `yaml:"version" default:"1"`
User string `yaml:"user"`
Password string `yaml:"password"`
PasswordFile string `yaml:"passwordFile"`
}

func (c *Config) Validate() error {
if c.Brokers == "" {
return errors.New("brokers is required")
Expand All @@ -35,17 +44,49 @@ func (c *Config) Validate() error {
return errors.New("topic is required")
}

if c.TLSClientConfig != nil && c.SASLConfig != nil {
return errors.New("only one of 'tlsClientConfig' and 'sasl' can be specified")
}

if err := c.TLSClientConfig.Validate(); err != nil {
return err
}

if err := c.SASLConfig.Validate(); err != nil {
return err
}

return nil
}

func (c *TLSClientConfig) Validate() error {
if c == nil {
return nil
}

if c.CertificatePath != "" && c.KeyPath == "" {
return errors.New("client key is required")
}

return nil
}

func (c *SASLConfig) Validate() error {
if c == nil {
return nil
}

if c.User == "" {
return errors.New("'user' is required")
}

if c.Password != "" && c.PasswordFile != "" {
return errors.New("either 'password' or 'passwordFile' can be specified")
}

if c.Password == "" && c.PasswordFile == "" {
return errors.New("'password' or 'passwordFile' is required")
}

return nil
}

0 comments on commit a17e124

Please sign in to comment.