From 0c7a28cb95e8e80c37f9ed999533c66fc83405bd Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Bj=C3=B6rn=20Ahl?=
Date: Fri, 15 Mar 2019 22:38:26 +0100
Subject: [PATCH] feat: tail command

Command tail is a poor man's tail that polls S3 for new access logs, by
default every 1 min. It is possible to change how often it polls S3 for
new access logs with the --polling-interval flag.
---
 cmd/tail.go            | 65 ++++++++++++++++++++++++++++++++
 logworker/logworker.go | 84 ++++++++++++++++++++++++++++++++++++++++--
 2 files changed, 145 insertions(+), 4 deletions(-)
 create mode 100644 cmd/tail.go

diff --git a/cmd/tail.go b/cmd/tail.go
new file mode 100644
index 0000000..dd88c7e
--- /dev/null
+++ b/cmd/tail.go
@@ -0,0 +1,65 @@
+package cmd
+
+import (
+	"bytes"
+	"fmt"
+	"time"
+
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/service/s3"
+	"github.com/dbgeek/elblogcat/logcat"
+	"github.com/dbgeek/elblogcat/logworker"
+	"github.com/spf13/cobra"
+	"github.com/spf13/viper"
+)
+
+// tailCmd represents the tail command
+var tailCmd = &cobra.Command{
+	Use:   "tail",
+	Short: "Poor man's tail that polls S3 for new access logs, by default every 1 min",
+	Long: `
+`,
+	Run: func(cmd *cobra.Command, args []string) {
+		awsConfiguration := logworker.AWSconfiguration{Region: "eu-west-1"}
+		configuration := logworker.NewConfiguration()
+		accessLogFilter := logworker.NewAccessLogFilter()
+		client := logworker.NewLogWorker(
+			&awsConfiguration,
+			&configuration,
+			&accessLogFilter,
+		)
+		logs := make(chan string, 1)
+
+		client.Tail(logs)
+
+		for v := range logs {
+			buff := &aws.WriteAtBuffer{}
+			key := fmt.Sprintf("%s%s", accessLogFilter.AccesslogPath(configuration.Prefix), v)
+			_, err := client.S3Downloader.Download(buff, &s3.GetObjectInput{
+				Bucket: aws.String(viper.GetString("s3-bucket")),
+				Key:    aws.String(key),
+			})
+			if err != nil {
+				logworker.Logger.Fatalf("Failed to download key: %v from S3. Got error: %v",
Got error: %v", + key, + err) + } + + c := logcat.NewRowFilter() + b := bytes.NewBuffer(buff.Bytes()) + a := logcat.Accesslog{ + Content: b, + RowFilter: c, + PrintFields: viper.GetString("fields"), + } + a.Cat() + } + }, +} + +func init() { + rootCmd.AddCommand(tailCmd) + tailCmd.PersistentFlags().Duration("polling-interval", 60*time.Second, "") + viper.BindPFlag("polling-interval", tailCmd.PersistentFlags().Lookup("polling-interval")) + +} diff --git a/logworker/logworker.go b/logworker/logworker.go index e8b592d..547c522 100644 --- a/logworker/logworker.go +++ b/logworker/logworker.go @@ -31,8 +31,9 @@ type ( Profile string } Configuration struct { - Bucket string - Prefix string + Bucket string + Prefix string + PollingInterval time.Duration } AccessLogFilter struct { matchString string @@ -138,6 +139,80 @@ func (l *LogWorker) List() []string { return accessLogs } +func (l *LogWorker) Tail(logch chan<- string) { + go func() { + accessLogFilter := NewAccessLogFilter() + consumedAccessLogs := make(map[string]struct{}) + + lbAccessLogTimestamp := l.AccessLogFilter.StartTime + for t := lbAccessLogTimestamp; t.Before(time.Now().UTC()); t = t.Add(5 * time.Minute) { + lbAccessLogTimestamp = t + lbAccessLog := fmt.Sprintf("%s_elasticloadbalancing_%s_%s_%s", + accessLogFilter.AwsAccountID, + accessLogFilter.Region, + accessLogFilter.LoadBalancerID, + t.Format("20060102T1504Z"), + ) + s3Prefix := filepath.Join(l.AccessLogFilter.AccesslogPath(l.Configuration.Prefix), lbAccessLog) + for _, accessLog := range *l.listAccessLogs(s3Prefix) { + if _, ok := consumedAccessLogs[accessLog]; !ok { + consumedAccessLogs[accessLog] = struct{}{} + logch <- accessLog + } + } + } + + poller := time.Tick(l.Configuration.PollingInterval) + for now := range poller { + + lbAccessLogTimestamp = lbAccessLogTimestamp.Add(15 * time.Second) + lbAccessLog := fmt.Sprintf("%s_elasticloadbalancing_%s_%s_%s", + accessLogFilter.AwsAccountID, + accessLogFilter.Region, + accessLogFilter.LoadBalancerID, + now.UTC().Format("20060102T1504Z"), + ) + s3Prefix := filepath.Join(l.AccessLogFilter.AccesslogPath(l.Configuration.Prefix), lbAccessLog) + for _, accessLog := range *l.listAccessLogs(s3Prefix) { + if _, ok := consumedAccessLogs[accessLog]; !ok { + consumedAccessLogs[accessLog] = struct{}{} + logch <- accessLog + } + } + for k := range consumedAccessLogs { + ts := strings.Split(k, "_") + t, _ := time.Parse("20060102T1504Z", ts[4]) + if t.Before(now.UTC().Add(-2 * time.Minute)) { + delete(consumedAccessLogs, k) + } + + } + } + }() +} + +func (l *LogWorker) listAccessLogs(s3Prefix string) *[]string { + var al []string + input := &s3.ListObjectsV2Input{ + Bucket: aws.String(l.Configuration.Bucket), + Prefix: aws.String(s3Prefix), + Delimiter: aws.String("/"), + MaxKeys: aws.Int64(200), + } + err := l.S3.ListObjectsV2Pages(input, + func(page *s3.ListObjectsV2Output, lastPage bool) bool { + for _, val := range page.Contents { + accessLog := strings.Split(*val.Key, "/")[len(strings.Split(*val.Key, "/"))-1] + al = append(al, accessLog) + } + return true + }) + if err != nil { + fmt.Println(err) + } + return &al +} + func (a *AccessLogFilter) AccesslogPath(prefix string) string { return filepath.Join(prefix, fmt.Sprintf("AWSLogs/%s/elasticloadbalancing/%s/%s/", a.AwsAccountID, a.Region, a.StartTime.Format("2006/01/02"))) + "/" @@ -197,7 +272,8 @@ func NewAccessLogFilter() AccessLogFilter { func NewConfiguration() Configuration { return Configuration{ - Bucket: viper.GetString("s3-bucket"), - Prefix: viper.GetString("s3-prefix"), + 
Bucket: viper.GetString("s3-bucket"), + Prefix: viper.GetString("s3-prefix"), + PollingInterval: viper.GetDuration("polling-interval"), } }