Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
MikeZLin committed Feb 26, 2024
2 parents 6ff207e + ab6ce81 commit 8e3247e
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 8 deletions.
2 changes: 1 addition & 1 deletion input.c
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ int inbuf_read_to_delimeter (struct inbuf *inbuf, FILE *fp,
if (!inbuf->buf)
return 0; /* Previous EOF encountered, see below. */

while (conf.run && 1) {
while (conf.run) {
ssize_t r;
size_t dof;
int delim_found;
Expand Down
15 changes: 8 additions & 7 deletions kcat.c
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ static void dr_msg_cb (rd_kafka_t *rk, const rd_kafka_message_t *rkmessage,
void *opaque) {
int32_t broker_id = -1;
struct buf *b = rkmessage->_private;
#if RD_KAFKA_VERSION < 0x01000000
static int say_once = 1;
#endif

if (b)
buf_destroy(b);
Expand All @@ -166,7 +169,6 @@ static void dr_msg_cb (rd_kafka_t *rk, const rd_kafka_message_t *rkmessage,

#if RD_KAFKA_VERSION < 0x01000000
if (rkmessage->offset == 0 && say_once) {
static int say_once = 1;
KC_INFO(3, "Enable message offset reporting "
"with '-X topic.produce.offset.report=true'\n");
say_once = 0;
Expand Down Expand Up @@ -414,12 +416,13 @@ static void producer_run (FILE *fp, char **paths, int pathcnt) {
} else {
struct inbuf inbuf;
struct buf *b;
int at_eof = 0;

inbuf_init(&inbuf, conf.msg_size, conf.delim, conf.delim_size);

/* Read messages from input, delimited by conf.delim */
while (conf.run &&
inbuf_read_to_delimeter(&inbuf, fp, &b)) {
!(at_eof = !inbuf_read_to_delimeter(&inbuf, fp, &b))) {
int msgflags = 0;
char *buf = b->buf;
char *key = NULL;
Expand Down Expand Up @@ -484,11 +487,9 @@ static void producer_run (FILE *fp, char **paths, int pathcnt) {
conf.run = 0;
}

if (conf.run) {
if (!feof(fp))
KC_FATAL("Unable to read message: %s",
strerror(errno));
}
if (conf.run && !at_eof)
KC_FATAL("Unable to read message: %s",
strerror(errno));
}

#if ENABLE_TXNS
Expand Down
17 changes: 17 additions & 0 deletions tests/0004-piped_producer.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#!/bin/bash
#

set -e
source helpers.sh


#
# Verify that piping messages to the producer works.
#


topic=$(make_topic_name)



echo "msg1" | $KCAT -P -t $topic

0 comments on commit 8e3247e

Please sign in to comment.