Skip to content

Commit

Permalink
ex/feat: :include option for TelemetryReporter (#27)
Browse files Browse the repository at this point in the history
* feat: add in :include option for selecting specific fields.

* feat: add better stringifying

* chore: fix compilation error
  • Loading branch information
Ziinc authored Jun 11, 2024
1 parent fca7edd commit ea5f751
Show file tree
Hide file tree
Showing 2 changed files with 183 additions and 13 deletions.
103 changes: 94 additions & 9 deletions ex/lib/logflare_ex/telemetry_reporter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,20 @@ defmodule LogflareEx.TelemetryReporter do
Client options will then be merged together, with each level overriding the previous.
### Reporter Options
- `:include`: a list of dot paths to include in the event payload.
"""
use GenServer
require Logger

@doc """
`:telemetry.attach/4` callback for allowing attaching to telemetry events.
Telemetry events attached this way are batched to Logflare.
Options:
- `:include` - dot syntax fields to be included.
"""
@spec handle_attach(list(), map(), map(), nil | list()) :: :ok
def handle_attach(event, measurements, metadata, nil),
Expand All @@ -55,27 +62,105 @@ defmodule LogflareEx.TelemetryReporter do
config_file_opts = (Application.get_env(:logflare_ex, __MODULE__) || []) |> Map.new()
opts = Enum.into(config, config_file_opts)

payload = %{metadata: metadata, measurements: measurements}
to_include = Map.get(opts, :include, [])

filtered_payload =
for path <- to_include,
String.starts_with?(path, "measurements.") or String.starts_with?(path, "metadata."),
reduce: %{} do
acc -> put_path(acc, path, get_path(payload, path))
end

# split handler paths
event_str = Enum.map_join(event, ".", &Atom.to_string(&1))

measurements_str =
Enum.map_join(measurements, " ", fn {k, v} ->
"#{inspect(k)}=#{inspect(v)}"
end)
if Map.get(filtered_payload, :measurements) do
Enum.map_join(filtered_payload.measurements, " ", fn {k, v} ->
"#{stringify(k)}=#{stringify(v)}"
end)
else
""
end

message = "#{event_str} | #{measurements_str}"
client = LogflareEx.client(opts)

LogflareEx.send_batched_event(client, %{
message: message,
event: event_str,
metadata: metadata,
measurements: measurements
})
payload =
Map.merge(
%{
message: message,
event: event_str
},
filtered_payload
)

LogflareEx.send_batched_event(client, payload)

:ok
end

defp stringify(v) do
case v do
v when is_float(v) -> Float.to_string(v)
v when is_integer(v) -> Integer.to_string(v)
v when is_atom(v) -> Atom.to_string(v)
v when is_binary(v) -> v
v when is_map(v) -> inspect(v)
_other -> inspect(v)
end
end

# puts a value at a given dot path or atom list path
# if the path does not exist, it will fill in the key(s)
defp put_path(nil, path, value), do: put_path(%{}, path, value)

defp put_path(payload, [part], value) do
Map.put(payload, part, value)
end

defp put_path(payload, [head | tail] = path, value) when is_list(path) do
head_value = Map.get(payload, head)
Map.put(payload, head, put_path(head_value, tail, value))
end

defp put_path(payload, path, value) when is_binary(path) do
list_path =
for part <- String.split(path, ".") do
if atom_map?(payload) do
String.to_existing_atom(part)
else
part
end
end

put_path(payload, list_path, value)
end

defp atom_map?(map) do
key = Map.keys(map) |> List.first()
is_atom(key)
end

# gets a value of a map/struct at a given dot path
defp get_path(payload, path) when is_binary(path) do
for part <- String.split(path, "."), reduce: payload do
%{} = data ->
if atom_map?(data) do
Map.get(data, String.to_existing_atom(part))
else
Map.get(data, part)
end

data when is_list(data) ->
Enum.map(data, fn datum -> get_path(datum, part) end)

other ->
other
end
end

def start_link(opts) do
server_opts = Keyword.take(opts, [:name])
GenServer.start_link(__MODULE__, opts, server_opts)
Expand Down
93 changes: 89 additions & 4 deletions ex/test/telemetry_reporter_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,107 @@ defmodule LogflareEx.TelemetryReporterTest do
end)
end

test "handle_attach/4" do
test "handle_attach/4 with :include option" do
pid = self()
ref = make_ref()

Tesla
|> expect(:post, fn _client, _path, _body ->
|> expect(:post, fn _client, _path, body ->
decoded = Bertex.decode(body)
send(pid, {ref, decoded})
{:ok, %Tesla.Env{status: 201, body: Jason.encode!(%{"message" => "server msg"})}}
end)

:telemetry.attach("my-id", [:some, :event], &TelemetryReporter.handle_attach/4,
auto_flush: true,
flush_interval: 50
flush_interval: 50,
include: [
"measurements.latency",
"metadata.some"
]
)

:telemetry.execute([:some, :event], %{latency: 123}, %{some: "metadata"})
:telemetry.execute([:some, :event], %{latency: %{nested: 123}, value: 1235}, %{
some: "metadata",
to_exclude: "this field"
})

Process.sleep(300)
# should clear cache
assert LogflareEx.count_queued_events() == 0
assert_received {^ref, %{"batch" => [event]}}

# include option will only add
refute event[:metadata][:to_exclude]
refute event[:latency][:value]
# include nested values
assert event[:measurements][:latency][:nested] == 123
assert event[:message] =~ "latency=%{nested: 123}"
end

test "handle_attach/4 with no :include option" do
pid = self()
ref = make_ref()

Tesla
|> expect(:post, fn _client, _path, body ->
decoded = Bertex.decode(body)
send(pid, {ref, decoded})
{:ok, %Tesla.Env{status: 201, body: Jason.encode!(%{"message" => "server msg"})}}
end)

:telemetry.attach("my-id", [:some, :event], &TelemetryReporter.handle_attach/4,
auto_flush: true,
flush_interval: 50
)

:telemetry.execute([:some, :event], %{latency: 123, value: 1235}, %{
some: "metadata",
to_exclude: "this field"
})

Process.sleep(300)

assert_received {^ref, %{"batch" => [event]}}

# no other fields will be included
assert event[:event] == "some.event"
refute event[:metadata]
refute event[:measurements]
end

test "handle_attach/4 with nested list paths" do
pid = self()
ref = make_ref()

Tesla
|> expect(:post, fn _client, _path, body ->
decoded = Bertex.decode(body)
send(pid, {ref, decoded})
{:ok, %Tesla.Env{status: 201, body: Jason.encode!(%{"message" => "server msg"})}}
end)

:telemetry.attach("my-id", [:some, :event], &TelemetryReporter.handle_attach/4,
auto_flush: true,
flush_interval: 50,
include: ["measurements.latency"]
)

:telemetry.execute([:some, :event], %{latency: [123, 223], other: "value"}, %{
some: "metadata",
to_exclude: "this field"
})

Process.sleep(300)

assert_received {^ref, %{"batch" => [event]}}

# no other fields will be included
assert event[:event] == "some.event"
assert event[:message] =~ "latency=[123, 223]"
refute event[:metadata]
assert event[:measurements][:latency] == [123, 223]
refute event[:measurements][:other]
end
end

Expand Down

0 comments on commit ea5f751

Please sign in to comment.