Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Only start Journald input with supported systemd versions #39605

Closed
Closed
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Make HTTP Endpoint input GA. {issue}38979[38979] {pull}39410[39410]
- Update CEL mito extensions to v1.12.2. {pull}39755[39755]
- Add support for base64-encoded HMAC headers to HTTP Endpoint. {pull}39655[39655]
- Journald input validates the minimum compatible version of Systemd and will fail to start if the Systemd version in the host < v255. {issue}34077[34077] {pull}39605[39605]

*Auditbeat*

Expand Down
1 change: 1 addition & 0 deletions Vagrantfile
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ TEST_BOXES = [
{:name => "ubuntu1604", :box => "ubuntu/xenial64", :platform => "ubuntu"},
{:name => "ubuntu1804", :box => "ubuntu/bionic64", :platform => "ubuntu"},
{:name => "ubuntu2004", :box => "ubuntu/focal64", :platform => "ubuntu"},
{:name => "ubuntu2204", :box => "ubuntu/jammy64", :platform => "ubuntu"},

{:name => "debian8", :box => "generic/debian8", :platform => "debian"},
{:name => "debian9", :box => "debian/stretch64", :platform => "debian"},
Expand Down
8 changes: 8 additions & 0 deletions filebeat/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ func Filebeat(inputs beater.PluginFactory, settings instance.Settings) *cmd.Beat
command.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("M"))
command.TestCmd.Flags().AddGoFlag(flag.CommandLine.Lookup("modules"))
command.SetupCmd.Flags().AddGoFlag(flag.CommandLine.Lookup("modules"))

// main_test.go calls this function before the Journald input is initialised
// to avoid panics, we check whether the flag is defined before calling
// AddGoFlag
if ignoreSystemdFlag := flag.CommandLine.Lookup("ignore-journald-version"); ignoreSystemdFlag != nil {
command.Flags().AddGoFlag(ignoreSystemdFlag)
}

command.AddCommand(cmd.GenModulesCmd(Name, "", buildModulesManager))
command.AddCommand(genGenerateCmd())
return command
Expand Down
3 changes: 3 additions & 0 deletions filebeat/docs/inputs/input-journald.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ https://www.freedesktop.org/software/systemd/man/systemd-journald.service.html[`
is a system service that collects and stores logging data. The `journald` input
reads this log data and the metadata associated with it.

WARNING: There is bug in journald < v255 that causes {beatname_uc} to
crash. Only use this input with journald >= v255.

The simplest configuration example is one that reads all logs from the default
journal.

Expand Down
1 change: 1 addition & 0 deletions filebeat/include/list.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion filebeat/input/journald/environment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (e *inputTestingEnvironment) mustCreateInput(config map[string]interface{})
e.t.Helper()
e.grp = unison.TaskGroup{}
manager := e.getManager()
if err := manager.Init(&e.grp, v2.ModeRun); err != nil {
if err := manager.Init(&e.grp); err != nil {
e.t.Fatalf("failed to initialise manager: %+v", err)
}

Expand Down
124 changes: 117 additions & 7 deletions filebeat/input/journald/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,15 @@
package journald

import (
"context"
"errors"
"flag"
"fmt"
"regexp"
"strconv"
"time"

"github.com/coreos/go-systemd/v22/dbus"
"github.com/coreos/go-systemd/v22/sdjournal"
"github.com/urso/sderr"

Expand All @@ -37,6 +44,15 @@ import (
"github.com/elastic/elastic-agent-libs/logp"
)

var noVersionCheck bool

func init() {
flag.BoolVar(&noVersionCheck,
"ignore-journald-version",
false,
"Does not check Journald version when starting the Journald input. This might cause Filebeat to crash!")
}

type journald struct {
Backoff time.Duration
MaxBackoff time.Duration
Expand All @@ -63,21 +79,54 @@ const localSystemJournalID = "LOCAL_SYSTEM_JOURNAL"

const pluginName = "journald"

// ErrSystemdVersionNotSupported is returned by the plugin manager when the
// Systemd version is not supported.
var ErrSystemdVersionNotSupported = errors.New("systemd version must be >= 255")

// ErrCannotGetSystemdVersion is returned by the plugin manager when it is
// not possible to get the Systemd version via D-Bus.
var ErrCannotGetSystemdVersion = errors.New("cannot get systemd version")

// Plugin creates a new journald input plugin for creating a stateful input.
func Plugin(log *logp.Logger, store cursor.StateStore) input.Plugin {
return input.Plugin{
m := &cursor.InputManager{
Logger: log,
StateStore: store,
Type: pluginName,
Configure: configure,
}
p := input.Plugin{
Name: pluginName,
Stability: feature.Experimental,
Deprecated: false,
Info: "journald input",
Doc: "The journald input collects logs from the local journald service",
Manager: &cursor.InputManager{
Logger: log,
StateStore: store,
Type: pluginName,
Configure: configure,
},
Manager: m,
}

if noVersionCheck {
log.Warn("Journald version check has been DISABLED! Filebeat might crash if Journald version is < 255.")
return p
}

version, err := systemdVersion()
if err != nil {
configErr := fmt.Errorf("%w: %s", ErrCannotGetSystemdVersion, err)
m.Configure = func(_ *conf.C) ([]cursor.Source, cursor.Input, error) {
return nil, nil, configErr
}
return p
}

if version < 255 {
configErr := fmt.Errorf("%w. Systemd version: %d", ErrSystemdVersionNotSupported, version)
m.Configure = func(_ *conf.C) ([]cursor.Source, cursor.Input, error) {
return nil, nil, configErr
}
return p
}

return p
}

type pathSource string
Expand Down Expand Up @@ -303,3 +352,64 @@ func (r *readerAdapter) Next() (reader.Message, error) {

return m, nil
}

// parseSystemdVersion parses the string version from Systemd fetched via D-Bus.
// The function will parse and return the 3 digit major version, minor version
// and patch are ignored.
func parseSystemdVersion(ver string) (int, error) {
re := regexp.MustCompile(`(v)?(?P<version>\d\d\d)(\.)?`)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the 3 digits makes me a little nervous, if systemd goes to 4 this all breaks.

^(?P<version>\d{3,}) works with the examples provided. thoughts?

matches := re.FindStringSubmatch(ver)
if len(matches) == 0 {
return 0, fmt.Errorf("unsupported Systemd version format '%s'", ver)
}

// This should never fail because the regexp ensures we're getting a 3-digt
// integer, however, better safe than sorry.
version, err := strconv.Atoi(matches[2])
if err != nil {
return 0, fmt.Errorf("could not convert '%s' to int: %w", matches[2], err)
}

return version, nil
}

// getSystemdVersionViaDBus gets the Systemd version from sd-bus
//
// The Systemd D-Bus documentation states:
//
// Version encodes the version string of the running systemd
// instance. Note that the version string is purely informational,
// it should not be parsed, one may not assume the version to be
// formatted in any particular way. We take the liberty to change
// the versioning scheme at any time and it is not part of the API.
// Source: https://www.freedesktop.org/wiki/Software/systemd/dbus/
Comment on lines +376 to +385
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Question]
Then why to use dbus to get the version? Is it the most generic way of doing that across all linux distros?

func getSystemdVersionViaDBus() (string, error) {
// Get a context with timeout just to be on the safe side
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
conn, err := dbus.NewSystemConnectionContext(ctx)
if err != nil {
return "", fmt.Errorf("cannot connect to sd-bus: %w", err)
}

version, err := conn.GetManagerProperty("Version")
if err != nil {
return "", fmt.Errorf("cannot get version property: %w", err)
}

return version, nil
}

func systemdVersion() (int, error) {
versionStr, err := getSystemdVersionViaDBus()
if err != nil {
return 0, fmt.Errorf("caanot get Systemd version: %w", err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo

Suggested change
return 0, fmt.Errorf("caanot get Systemd version: %w", err)
return 0, fmt.Errorf("cannot get Systemd version: %w", err)

}

version, err := parseSystemdVersion(versionStr)
if err != nil {
return 0, fmt.Errorf("cannot parse Systemd version: %w", err)
}

return version, nil
}
64 changes: 64 additions & 0 deletions filebeat/input/journald/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,25 @@ package journald

import (
"context"
"flag"
"fmt"
"path"
"testing"

"github.com/elastic/elastic-agent-libs/mapstr"
)

func TestMain(m *testing.M) {
// We use TestMain because all of our current (May 2024) supported Linux
// distributions ship with a version of Systemd that will cause Filebeat
// input to crash when the journal reached the maximum number of files and
// a new rotation happens. To allow the Journald input to be instantiated
// we need to set a CLI flag and that needs to be done before all tests run.
flag.Parse()
noVersionCheck = true
m.Run()
}

func TestInputFieldsTranslation(t *testing.T) {
// A few random keys to verify
keysToCheck := map[string]string{
Expand Down Expand Up @@ -84,3 +96,55 @@ func TestInputFieldsTranslation(t *testing.T) {
})
}
}

func TestParseSystemdVersion(t *testing.T) {
foo := map[string]struct {
data string
expected int
}{
"Archlinux": {
expected: 255,
data: `255.6-1-arch`,
},
"AmazonLinux2": {
expected: 252,
data: `252.16-1.amzn2023.0.2`,
},
"Ubuntu 2204": {
expected: 249,
data: `249.11-0ubuntu3.12`,
},
"Debain 10": {
expected: 241,
data: "241",
},
"Red Hat Enterprise Linux 8": {
expected: 239,
data: "239 (239-78.el8)",
},
}

for name, tc := range foo {
t.Run(name, func(t *testing.T) {
version, err := parseSystemdVersion(tc.data)
if err != nil {
t.Errorf("did not expect an error: %s", err)
}

if version != tc.expected {
t.Errorf("expecting version %d, got %d", tc.expected, version)
}
})
}
}

func TestGetJounraldVersion(t *testing.T) {
version, err := getSystemdVersionViaDBus()
if err != nil {
t.Fatalf("did not expect an error: %s", err)
}

if version == "" {
t.Fatal("version must not be an empty string")
}
}
20 changes: 20 additions & 0 deletions filebeat/magefile.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

devtools "github.com/elastic/beats/v7/dev-tools/mage"
"github.com/elastic/beats/v7/dev-tools/mage/target/build"
"github.com/elastic/beats/v7/dev-tools/mage/target/unittest"
filebeat "github.com/elastic/beats/v7/filebeat/scripts/mage"

//mage:import
Expand All @@ -45,6 +46,7 @@ import (
func init() {
common.RegisterCheckDeps(Update)
test.RegisterDeps(IntegTest)
unittest.RegisterGoTestDeps(TestJournaldInput)

devtools.BeatDescription = "Filebeat sends log files to Logstash or directly to Elasticsearch."
}
Expand Down Expand Up @@ -214,3 +216,21 @@ func PythonIntegTest(ctx context.Context) error {
mg.Deps(Fields, Dashboards, devtools.BuildSystemTestBinary)
return devtools.PythonIntegTestFromHost(devtools.DefaultPythonTestIntegrationFromHostArgs())
}

// TestJournaldInput executes the Journald input unit tests.
//
// It requires Systemd and D-Bus to be installed
// on the host.
//
// Use TEST_COVERAGE=true to enable code coverage profiling.
// Use RACE_DETECTOR=true to enable the race detector.
func TestJournaldInput(ctx context.Context) error {
if devtools.Platform.GOOS == "linux" {
testArgs := devtools.DefaultGoTestUnitArgs()
testArgs.Packages = []string{"./input/journald"}
testArgs.ExtraFlags = append(testArgs.ExtraFlags, "-tags=withjournald")
return devtools.GoTest(ctx, testArgs)
}

return nil
}
7 changes: 7 additions & 0 deletions libbeat/docs/command-reference.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -744,12 +744,19 @@ the end of the file is reached. By default harvesters are closed after
The `--once` option is not currently supported with the
{filebeat-ref}/filebeat-input-filestream.html[`filestream`] input type.

*`--ignore-journald-version`*::
When the `--ignore-journald-version` is used, the Journald input
**will not** validate the minimum Systemd version during the input
initialisation. Running the Journald input with an unsupported version
of Systemd might cause {beatname_uc} to crash.
endif::[]

ifeval::["{beatname_lc}"=="metricbeat"]
*`--system.hostfs MOUNT_POINT`*::

Specifies the mount point of the host's filesystem for use in monitoring a host.
This flag is depricated, and an alternate hostfs should be specified via the `hostfs` module config value.
endif::[]


ifeval::["{beatname_lc}"=="packetbeat"]
Expand Down
15 changes: 15 additions & 0 deletions x-pack/filebeat/magefile.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

devtools "github.com/elastic/beats/v7/dev-tools/mage"
"github.com/elastic/beats/v7/dev-tools/mage/target/build"
"github.com/elastic/beats/v7/dev-tools/mage/target/unittest"
filebeat "github.com/elastic/beats/v7/filebeat/scripts/mage"

//mage:import
Expand All @@ -33,6 +34,7 @@ import (
func init() {
common.RegisterCheckDeps(Update)
test.RegisterDeps(IntegTest)
unittest.RegisterGoTestDeps(TestJournaldInput)

devtools.BeatDescription = "Filebeat sends log files to Logstash or directly to Elasticsearch."
devtools.BeatLicense = "Elastic License"
Expand Down Expand Up @@ -187,3 +189,16 @@ func PythonIntegTest(ctx context.Context) error {
mg.Deps(Fields, Dashboards, devtools.BuildSystemTestBinary)
return devtools.PythonIntegTestFromHost(devtools.DefaultPythonTestIntegrationFromHostArgs())
}

// TestJournald executes the Journald input tests
// Use TEST_COVERAGE=true to enable code coverage profiling.
// Use RACE_DETECTOR=true to enable the race detector.
func TestJournaldInput(ctx context.Context) error {
utArgs := devtools.DefaultGoTestUnitArgs()
utArgs.Packages = []string{"../../filebeat/input/journald"}
if devtools.Platform.GOOS == "linux" {
utArgs.ExtraFlags = append(utArgs.ExtraFlags, "-tags=withjournald")
}

return devtools.GoTest(ctx, utArgs)
}
Comment on lines +193 to +204
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't a way to avoid duplicating that?

Loading