This plugin is forked from https://github.com/mdoi/fluent-plugin-gcloud-pubsub
Google Cloud Pub/Sub Input/Output(BufferedOutput) plugin for Fluentd with google-cloud gem
- Publish messages to Google Cloud Pub/Sub
- Pull messages from Google Cloud Pub/Sub
- Create a project on Google Developer Console
- Add a topic of Cloud Pub/Sub to the project
- Add a pull style subscription to the topic
- Download your credential (json) or set scope on GCE instance
When using output plugin, you need to grant Pub/Sub Publisher and Pub/Sub Viewer role to IAM.
Also, when using input plugin, you need to grant Pub/Sub Subscriber and Pub/Sub Viewer role to IAM.
fluent-plugin-gcloud-pubsub-custom | fluentd | ruby |
---|---|---|
>= 1.0.0 | >= v0.14.0 | >= 2.1 |
< 1.0.0 | >= v0.12.0 | >= 1.9 |
Install by gem:
$ gem install fluent-plugin-gcloud-pubsub-custom
Caution
This plugin doesn't work in td-agent.
Please use in Fluentd installed by gem.
Use gcloud_pubsub
output plugin.
<match example.publish>
@type gcloud_pubsub
project <YOUR PROJECT>
key <YOUR KEY>
topic <YOUR TOPIC>
autocreate_topic false
max_messages 1000
max_total_size 9800000
max_message_size 4000000
compress_batches false
<buffer>
@type memory
flush_interval 1s
</buffer>
<format>
@type json
</format>
</match>
project
(optional)- Set your GCP project.
- Running fluentd on GCP, you don't have to specify.
- You can also use environment variable such as
GCLOUD_PROJECT
.
key
(optional)- Set your credential file path.
- Running fluentd on GCP, you can use scope instead of specifying this.
- You can also use environment variable such as
GCLOUD_KEYFILE
.
topic
(required)- Set topic name to publish.
- You can use placeholder in this param. See: https://docs.fluentd.org/v1.0/articles/buffer-section
autocreate_topic
(optional, default:false
)- If set to
true
, specified topic will be created when it doesn't exist.
- If set to
max_messages
(optional, default:1000
)- Publishing messages count per request to Cloud Pub/Sub.
max_total_size
(optional, default:9800000
=9.8MB
)- Publishing messages bytesize per request to Cloud Pub/Sub. This parameter affects only message size. You should specify a little smaller value than quota.
max_message_size
(optional, default:4000000
=4MB
)- Messages exceeding
max_message_size
are not published because Pub/Sub clients cannot receive it.
- Messages exceeding
attribute_keys
(optional, default:[]
)- Extract these fields from the record and send them as attributes on the Pub/Sub message. Cannot be set if compress_batches is enabled.
metric_prefix
(optional, default:fluentd_output_gcloud_pubsub
)- The prefix for Prometheus metric names
compress_batches
(optional, default:false
)- If set to
true
, messages will be batched and compressed before publication. See message compression for details.
- If set to
Use gcloud_pubsub
input plugin.
<source>
@type gcloud_pubsub
tag example.pull
project <YOUR PROJECT>
key <YOUR KEY>
topic <YOUR TOPIC>
subscription <YOUR SUBSCRIPTION>
max_messages 1000
return_immediately true
pull_interval 0.5
pull_threads 2
parse_error_action exception
enable_rpc true
rpc_bind 0.0.0.0
rpc_port 24680
<parse>
@type json
</parse>
</source>
tag
(required)- Set tag of messages.
- If
tag_key
is specified,tag
is used as tag when record don't have specified key.
tag_key
(optional)- Set key to be used as tag.
project
(optional)- Set your GCP project
- Running fluentd on GCP, you don't have to specify.
- You can also use environment variable such as
GCLOUD_PROJECT
.
key
(optional)- Set your credential file path.
- Running fluentd on GCP, you can use scope instead of specifying this.
- You can also use environment variable such as
GCLOUD_KEYFILE
.
topic
(optional)- Set topic name that the subscription belongs to.
subscription
(required)- Set subscription name to pull.
max_messages
(optional, default:100
)return_immediately
(optional, default:true
)- See returnImmediately on https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions/pull
- If
return_immediately
istrue
or pulling message is stopped by HTTP RPC, this plugin waitpull_interval
each pull.
pull_interval
(optional, default:5.0
)- Pulling messages by intervals of specified seconds.
pull_threads
(optional, default:1
)- Set number of threads to pull messages.
attribute_keys
(optional, default:[]
)- Acquire these fields from attributes on the Pub/Sub message and merge them into the record.
parse_error_action
(optional, default:exception
)- Set error type when parsing messages fails.
exception
: Raise exception. Messages are not acknowledged.warning
: Only logging as warning.
- Set error type when parsing messages fails.
metric_prefix
(optional, default:fluentd_input_gcloud_pubsub
)- The prefix for Prometheus metric names
enable_rpc
(optional, default:false
)- If
true
is specified, HTTP RPC to stop or start pulling message is enabled.
- If
rpc_bind
(optional, default:0.0.0.0
)- Bind IP address for HTTP RPC.
rpc_port
(optional, default:24680
)- Port for HTTP RPC.
The compress_batches
option can be used to enable the compression of messages
before publication to Pub/Sub.
This works by collecting the buffered messages, taking up to max_total_size
or
max_message_size
input records, then compressing them with Zlib (i.e.
gzip/Deflate) before publishing them as a single message to the Pub/Sub topic.
When transporting large volumes of records via Pub/Sub, e.g. multiple Terabytes per month, this can lead to significant cost savings, as typically the CPU time required to compress the messages will be minimal in comparison to the Pub/Sub costs.
The compression ratio achievable will vary largely depending on the homogeneity of the input records, but typically will be 50% at the very minimum and often around 80-90%.
In order to achieve good compression, consider the following:
- Ensure that the buffer is being filled with a reasonable batch of messages: do
not use
flush_mode immediate
, and keep theflush_interval
value sufficiently high. Use the Prometheus metrics to determine how many records are being published per message. - Keep the
max_messages
andmax_message_size
values high (the defaults are optimal). - If there are many different sources of messages being mixed and routed to a
single
gcloud_pubsub
output, use multiple outputs (which will each have their own buffer) through tagging or labelling.
The receiving end must be able to decode these compressed batches of messages,
which it can determine via an attribute set on the Pub/Sub message. The
gcloud_pubsub
input plugin will do this transparently, decompressing any
messages which contain a batch of records and normally processing any messages
which represent just a single record.
Therefore, as long as all of the receivers are updated with support for
compressed batches first, it's then possible to gradually roll out this feature.
The input and output plugins expose several metrics in order to monitor performance:
-
fluentd_output_gcloud_pubsub_compression_enabled
- Gauge: Whether compression/batching is enabled
-
fluentd_output_gcloud_pubsub_messages_published_per_batch
- Histogram: Number of records published to Pub/Sub per buffer flush
-
fluentd_output_gcloud_pubsub_messages_published_bytes
- Histogram: Total size in bytes of the records published to Pub/Sub, before compression.
-
fluentd_output_gcloud_pubsub_messages_compression_duration_seconds
- Histogram: Time taken to compress a batch of messages
-
fluentd_output_gcloud_pubsub_messages_compressed_size_per_original_size_ratio
- Histogram: Compression ratio achieved on a batch of messages, expressed in terms of space saved.
-
fluentd_input_gcloud_pubsub_pull_errors_total
- Counter: Errors encountered while pulling or processing messages (split by a
retryable
label)
- Counter: Errors encountered while pulling or processing messages (split by a
-
fluentd_input_gcloud_pubsub_messages_pulled
- Histogram: Number of Pub/Sub messages pulled by the subscriber on each invocation
-
fluentd_input_gcloud_pubsub_messages_pulled_bytes
- Histogram: Total size in bytes of the Pub/Sub messages pulled by the subscriber on each invocation
- Fork it
- Create your feature branch (
git checkout -b my-new-feature
) - Commit your changes (
git commit -am 'Add some feature'
) - Push to the branch (
git push origin my-new-feature
) - Create a new Pull Request
- Add
tag
attribute in output plugin and usetag
attribute as tag in input plugin. - Send ack after other output plugin committed (if possible).