diff --git a/ex/lib/logflare_ex/telemetry_reporter.ex b/ex/lib/logflare_ex/telemetry_reporter.ex index cccbf44..f34ddbc 100644 --- a/ex/lib/logflare_ex/telemetry_reporter.ex +++ b/ex/lib/logflare_ex/telemetry_reporter.ex @@ -38,6 +38,10 @@ defmodule LogflareEx.TelemetryReporter do Client options will then be merged together, with each level overriding the previous. + + ### Reporter Options + + - `:include`: a list of dot paths to include in the event payload. """ use GenServer require Logger @@ -45,6 +49,9 @@ defmodule LogflareEx.TelemetryReporter do @doc """ `:telemetry.attach/4` callback for allowing attaching to telemetry events. Telemetry events attached this way are batched to Logflare. + + Options: + - `:include` - dot syntax fields to be included. """ @spec handle_attach(list(), map(), map(), nil | list()) :: :ok def handle_attach(event, measurements, metadata, nil), @@ -55,27 +62,105 @@ defmodule LogflareEx.TelemetryReporter do config_file_opts = (Application.get_env(:logflare_ex, __MODULE__) || []) |> Map.new() opts = Enum.into(config, config_file_opts) + payload = %{metadata: metadata, measurements: measurements} + to_include = Map.get(opts, :include, []) + + filtered_payload = + for path <- to_include, + String.starts_with?(path, "measurements.") or String.starts_with?(path, "metadata."), + reduce: %{} do + acc -> put_path(acc, path, get_path(payload, path)) + end + # split handler paths event_str = Enum.map_join(event, ".", &Atom.to_string(&1)) measurements_str = - Enum.map_join(measurements, " ", fn {k, v} -> - "#{inspect(k)}=#{inspect(v)}" - end) + if Map.get(filtered_payload, :measurements) do + Enum.map_join(filtered_payload.measurements, " ", fn {k, v} -> + "#{stringify(k)}=#{stringify(v)}" + end) + else + "" + end message = "#{event_str} | #{measurements_str}" client = LogflareEx.client(opts) - LogflareEx.send_batched_event(client, %{ - message: message, - event: event_str, - metadata: metadata, - measurements: measurements - }) + payload = + Map.merge( + %{ + message: message, + event: event_str + }, + filtered_payload + ) + + LogflareEx.send_batched_event(client, payload) :ok end + defp stringify(v) do + case v do + v when is_float(v) -> Float.to_string(v) + v when is_integer(v) -> Integer.to_string(v) + v when is_atom(v) -> Atom.to_string(v) + v when is_binary(v) -> v + v when is_map(v) -> inspect(v) + _other -> inspect(v) + end + end + + # puts a value at a given dot path or atom list path + # if the path does not exist, it will fill in the key(s) + defp put_path(nil, path, value), do: put_path(%{}, path, value) + + defp put_path(payload, [part], value) do + Map.put(payload, part, value) + end + + defp put_path(payload, [head | tail] = path, value) when is_list(path) do + head_value = Map.get(payload, head) + Map.put(payload, head, put_path(head_value, tail, value)) + end + + defp put_path(payload, path, value) when is_binary(path) do + list_path = + for part <- String.split(path, ".") do + if atom_map?(payload) do + String.to_existing_atom(part) + else + part + end + end + + put_path(payload, list_path, value) + end + + defp atom_map?(map) do + key = Map.keys(map) |> List.first() + is_atom(key) + end + + # gets a value of a map/struct at a given dot path + defp get_path(payload, path) when is_binary(path) do + for part <- String.split(path, "."), reduce: payload do + %{} = data -> + if atom_map?(data) do + Map.get(data, String.to_existing_atom(part)) + else + Map.get(data, part) + end + + data when is_list(data) -> + Enum.map(data, fn datum -> get_path(datum, part) end) + + other -> + other + end + end + def start_link(opts) do server_opts = Keyword.take(opts, [:name]) GenServer.start_link(__MODULE__, opts, server_opts) diff --git a/ex/test/telemetry_reporter_test.exs b/ex/test/telemetry_reporter_test.exs index fe8c185..9788498 100644 --- a/ex/test/telemetry_reporter_test.exs +++ b/ex/test/telemetry_reporter_test.exs @@ -17,22 +17,107 @@ defmodule LogflareEx.TelemetryReporterTest do end) end - test "handle_attach/4" do + test "handle_attach/4 with :include option" do + pid = self() + ref = make_ref() + Tesla - |> expect(:post, fn _client, _path, _body -> + |> expect(:post, fn _client, _path, body -> + decoded = Bertex.decode(body) + send(pid, {ref, decoded}) {:ok, %Tesla.Env{status: 201, body: Jason.encode!(%{"message" => "server msg"})}} end) :telemetry.attach("my-id", [:some, :event], &TelemetryReporter.handle_attach/4, auto_flush: true, - flush_interval: 50 + flush_interval: 50, + include: [ + "measurements.latency", + "metadata.some" + ] ) - :telemetry.execute([:some, :event], %{latency: 123}, %{some: "metadata"}) + :telemetry.execute([:some, :event], %{latency: %{nested: 123}, value: 1235}, %{ + some: "metadata", + to_exclude: "this field" + }) Process.sleep(300) # should clear cache assert LogflareEx.count_queued_events() == 0 + assert_received {^ref, %{"batch" => [event]}} + + # include option will only add + refute event[:metadata][:to_exclude] + refute event[:latency][:value] + # include nested values + assert event[:measurements][:latency][:nested] == 123 + assert event[:message] =~ "latency=%{nested: 123}" + end + + test "handle_attach/4 with no :include option" do + pid = self() + ref = make_ref() + + Tesla + |> expect(:post, fn _client, _path, body -> + decoded = Bertex.decode(body) + send(pid, {ref, decoded}) + {:ok, %Tesla.Env{status: 201, body: Jason.encode!(%{"message" => "server msg"})}} + end) + + :telemetry.attach("my-id", [:some, :event], &TelemetryReporter.handle_attach/4, + auto_flush: true, + flush_interval: 50 + ) + + :telemetry.execute([:some, :event], %{latency: 123, value: 1235}, %{ + some: "metadata", + to_exclude: "this field" + }) + + Process.sleep(300) + + assert_received {^ref, %{"batch" => [event]}} + + # no other fields will be included + assert event[:event] == "some.event" + refute event[:metadata] + refute event[:measurements] + end + + test "handle_attach/4 with nested list paths" do + pid = self() + ref = make_ref() + + Tesla + |> expect(:post, fn _client, _path, body -> + decoded = Bertex.decode(body) + send(pid, {ref, decoded}) + {:ok, %Tesla.Env{status: 201, body: Jason.encode!(%{"message" => "server msg"})}} + end) + + :telemetry.attach("my-id", [:some, :event], &TelemetryReporter.handle_attach/4, + auto_flush: true, + flush_interval: 50, + include: ["measurements.latency"] + ) + + :telemetry.execute([:some, :event], %{latency: [123, 223], other: "value"}, %{ + some: "metadata", + to_exclude: "this field" + }) + + Process.sleep(300) + + assert_received {^ref, %{"batch" => [event]}} + + # no other fields will be included + assert event[:event] == "some.event" + assert event[:message] =~ "latency=[123, 223]" + refute event[:metadata] + assert event[:measurements][:latency] == [123, 223] + refute event[:measurements][:other] end end