-
Notifications
You must be signed in to change notification settings - Fork 5.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(inputs): Add Mavlink input plugin #16221
base: master
Are you sure you want to change the base?
feat(inputs): Add Mavlink input plugin #16221
Conversation
…to feature/mavlink-input-plugin
plugins/inputs/mavlink/mavlink.go
Outdated
endpointConfig, err := ParseMavlinkEndpointConfig(s) | ||
if err != nil { | ||
s.Log.Debugf("%s", err.Error()) | ||
return | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Config validation should be done in func (s *Mavlink) Init() error
method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed, move this code into Init.
plugins/inputs/mavlink/mavlink.go
Outdated
} | ||
|
||
func (s *Mavlink) Start(acc telegraf.Accumulator) error { | ||
s.acc = acc |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not really necessary to copy the var I think,
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
plugins/inputs/mavlink/sample.conf
Outdated
# REQUEST_DATA_STREAM or MAV_CMD_SET_MESSAGE_INTERVAL | ||
# (See https://mavlink.io/en/mavgen_python/howto_requestmessages.html#how-to-request--stream-messages) | ||
stream_request_enable = true | ||
stream_request_frequency = 4 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is no EOL at EOF
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
plugins/inputs/mavlink/sample.conf
Outdated
@@ -0,0 +1,37 @@ | |||
# Read metrics from a Mavlink connection to a flight controller. | |||
[[inputs.mavlink]] | |||
# Flight controller URL. Must be a valid Mavlink connection string in one |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comments need to have double escaped: ##
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
plugins/inputs/mavlink/README.md
Outdated
### Note: Mavlink Dialects | ||
|
||
This plugin currently only uses the ArduPilot-specific dialect, which also | ||
includes messages from the common Mavlink dialect. | ||
|
||
See the [Mavlink docs](https://mavlink.io/en/messages/ardupilotmega.html) for | ||
more info on dialects. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this intended to be a sub-section of Configuration?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've just removed this and merged it into the copy in the plugin summary at the top, doesn't need its own section.
plugins/inputs/mavlink/mavlink.go
Outdated
// Container for a parsed Mavlink frame | ||
type metricFrameData struct { | ||
name string | ||
tags map[string]string | ||
fields map[string]any | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At this point, you can just use telegraf.Metric
as that will be just the same?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed this type entirely in favor of passing around telegraf.Metric
for i := 0; i < t.NumField(); i++ { | ||
field := t.Field(i) | ||
value := v.Field(i) | ||
out.fields[ConvertToSnakeCase(field.Name)] = value.Interface() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think it is needed to do this conversion, Telegraf is perfectly fine with CamelCase field and metric names.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd like to keep this around, there's a lot of different conventions in mavlink message naming and not much consistency. The Mavlink docs use capitalized snake case like MESSAGE_GLOBAL_POSITION_INT
, gomavlib converts that to MessageGlobalPositionInt
, and some other client libraries use normal snake case like message_global_position_int
.
My argument for using lowercase snake case is that, with a lack of one definite standard, I suspect by convention most people will want the outputs to be snake case in their database (Influx, Postgres, etc).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, fine for me 👍
// Check if a string is in a slice | ||
func Contains(slice []string, str string) bool { | ||
for _, item := range slice { | ||
if item == str { | ||
return true | ||
} | ||
} | ||
return false | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use internal choice.Contains
instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, updated
plugins/inputs/mavlink/mavlink.go
Outdated
if err != nil { | ||
s.Log.Debugf("Mavlink failed to connect (%s), will try again in 5s...", err.Error()) | ||
time.Sleep(5 * time.Second) | ||
continue | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Simply return a internal.StartupError
and let Telegraf handle the reconnect.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated so the Mavlink connection happens outside the loop and throws StartupError
. One caveat I do see about doing it this way is that this causes Telegraf to shutdown if Mavlink client initialization fails, so we could lose the metrics from other plugins if we're rebooting waiting for the USB device which may not turn on at the same time as the companion computer.
That said, this condition actually only happens in serial port mode, where the plugin is looking for a serial device. In TCP or UDP mode the client internally handles reconnection so this won't cause the client to fail.
if strings.HasPrefix(s.FcuURL, "serial://") { | ||
tmpStr := strings.TrimPrefix(s.FcuURL, "serial://") | ||
tmpStrParts := strings.Split(tmpStr, ":") | ||
deviceName := tmpStrParts[0] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Go builtin net/url
has useful Parse
method to facilitate this easier..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated to remove strings.HasPrefix(s.FcuURL, "serial://")
and use url.Parse
instead. There's still a little string find-and-replace here, because the serial port paths have forward slashes in them, which seems to mess with the URL parsing.
Download PR build artifacts for linux_amd64.tar.gz, darwin_arm64.tar.gz, and windows_amd64.zip. 📦 Click here to get additional PR build artifactsArtifact URLs |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Going in good direction!
// Parse out the Mavlink endpoint. | ||
endpointConfig, err := ParseMavlinkEndpointConfig(s) | ||
if err != nil { | ||
return fmt.Errorf("%s", err.Error()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Simply return the error? Why wrapping the message in a new string to be added as error?
func ParseMavlinkEndpointConfig(s *Mavlink) ([]gomavlib.EndpointConf, error) { | ||
u, err := url.Parse(s.FcuURL) | ||
if err != nil { | ||
return nil, errors.New("invalid fcu_url") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return nil, errors.New("invalid fcu_url") | |
return nil, fmt.Errorf("invalid fcu_url: %w", err) |
if len(tmpStrParts) == 2 { | ||
newBaudRate, err := strconv.Atoi(tmpStrParts[1]) | ||
if err != nil { | ||
return nil, errors.New("serial baud rate not valid") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return nil, errors.New("serial baud rate not valid") | |
return nil, fmt.Errorf("serial baud rate not valid: %w", err) |
And similar for all following error returns..
tmpStr := strings.TrimPrefix(s.FcuURL, "tcp://") | ||
tmpStrParts := strings.Split(tmpStr, ":") | ||
if len(tmpStrParts) != 2 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please use net/url utilities
tmpStr := strings.TrimPrefix(s.FcuURL, "tcp://") | |
tmpStrParts := strings.Split(tmpStr, ":") | |
if len(tmpStrParts) != 2 { | |
if u.Port() == "" { |
hostname := tmpStrParts[0] | ||
port, err := strconv.Atoi(tmpStrParts[1]) | ||
if err != nil { | ||
return nil, errors.New("tcp port is invalid") | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to convert port to int to convert it to string again later..
hostname := tmpStrParts[0] | |
port, err := strconv.Atoi(tmpStrParts[1]) | |
if err != nil { | |
return nil, errors.New("tcp port is invalid") | |
} | |
hostname := u.Hostname() | |
port := u.Port() |
(validity of port number already done in url.Parse
.)
if len(s.MessageFilter) > 0 && !choice.Contains(result.Name(), s.MessageFilter) { | ||
continue | ||
} | ||
result.AddTag("fcu_url", s.FcuURL) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We normally use source
as tag name for indicating from where a metric arrived.
}) | ||
if err != nil { | ||
return &internal.StartupError{ | ||
Err: fmt.Errorf("mavlink client failed (%w)", err), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Err: fmt.Errorf("mavlink client failed (%w)", err), | |
Err: fmt.Errorf("mavlink client failed: %w", err), |
|
||
## Filter to specific messages. Only the messages in this list will be parsed. | ||
## If blank or unset, all messages will be accepted. | ||
# message_filter = ["global_position_int", "attitude"] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Commented out paramaters should contain the default value. This seems not to be the case here?
if len(hostname) > 0 { | ||
return []gomavlib.EndpointConf{ | ||
gomavlib.EndpointTCPClient{ | ||
Address: fmt.Sprintf("%s:%d", hostname, port), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This could simply be
Address: fmt.Sprintf("%s:%d", hostname, port), | |
Address: u.Hostname, |
As you checked before it has port number..
} | ||
|
||
// Parse the FcuURL config to setup a mavlib endpoint config | ||
func ParseMavlinkEndpointConfig(s *Mavlink) ([]gomavlib.EndpointConf, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this is the only value that is used in this function, then maybe just only pass that.
func ParseMavlinkEndpointConfig(s *Mavlink) ([]gomavlib.EndpointConf, error) { | |
func ParseMavlinkEndpointConfig(fcuURL string) ([]gomavlib.EndpointConf, error) { |
Summary
Add a Mavlink input plugin.
The
mavlink
plugin connects to a MavLink-compatible flight controller such as as ArduPilot or PX4. and translates all incoming messages into metrics.The purpose of this plugin is to allow Telegraf to be used to ingest live flight metrics from unmanned systems (drones, planes, boats, etc.)
Telegraf is already often used on flight computers (eg a Raspberry Pi) to collect system metrics for drones and it would be valuable to extend this to also provide a convenient way to record flight telemetry.
TODO
gomavlib
is currently on a fork to support reading serial ports on i386 and darwin. Will need to upstream the fix for this and move back to the main repo after merge, or before merge if the forked gomavlib is not acceptableChecklist
Related issues