From ffaf2e442c0ec58f928fd66a0a7acd67d87b950c Mon Sep 17 00:00:00 2001 From: slhmy <1484836413@qq.com> Date: Sun, 22 Oct 2023 15:45:12 +0800 Subject: [PATCH] Perf logger --- application/asynq_worker/main.go | 2 +- application/migrate_db/main.go | 2 +- application/server/handler/event.go | 2 +- application/server/main.go | 4 ++-- core/agent/asynq/middleware.go | 22 +++++++++++++++++ core/agent/asynq/scheduler.go | 2 +- core/log.go | 2 +- core/middleware/error.go | 2 +- service/business/judger_task.go | 37 +++++++++++++---------------- service/user.go | 2 +- 10 files changed, 48 insertions(+), 29 deletions(-) create mode 100644 core/agent/asynq/middleware.go diff --git a/application/asynq_worker/main.go b/application/asynq_worker/main.go index e05941d..c2120fb 100644 --- a/application/asynq_worker/main.go +++ b/application/asynq_worker/main.go @@ -7,7 +7,7 @@ import ( ) func main() { - core.GetAppLogger().Info("Starting task server...") + core.AppLogger().Info("Starting task server...") config := asynqAgent.AsynqServerConfig{ Concurrency: 10, UsePriority: true, diff --git a/application/migrate_db/main.go b/application/migrate_db/main.go index 4863153..c6c02ee 100644 --- a/application/migrate_db/main.go +++ b/application/migrate_db/main.go @@ -58,5 +58,5 @@ func main() { Host: "http://localhost:8000", }) - core.GetAppLogger().Info("migrate tables success") + core.AppLogger().Info("migrate tables success") } diff --git a/application/server/handler/event.go b/application/server/handler/event.go index 6379d54..da33618 100644 --- a/application/server/handler/event.go +++ b/application/server/handler/event.go @@ -34,7 +34,7 @@ func Stream(ginCtx *gin.Context) { ginCtx.Stream(func(w io.Writer) bool { // With event type message := fmt.Sprintf("event: %s\ndata: %s\n\n", "eventType", time.Now().String()) - core.GetAppLogger().Infof("Send message:\n%s", message) + core.AppLogger().Infof("Send message:\n%s", message) fmt.Fprint(w, message) time.Sleep(1 * time.Second) counter++ diff --git a/application/server/main.go b/application/server/main.go index 9590812..ed7c625 100644 --- a/application/server/main.go +++ b/application/server/main.go @@ -46,13 +46,13 @@ func main() { baseRouter := r.Group("/") if serveFrontend { - core.GetAppLogger().Info("Serving frontend...") + core.AppLogger().Info("Serving frontend...") r.LoadHTMLFiles("./frontend/dist/index.html") handler.SetupFrontendRoute(baseRouter) } if swaggerOn { - core.GetAppLogger().Info("Serving swagger Doc...") + core.AppLogger().Info("Serving swagger Doc...") handler.SetupSwaggoRouter(baseRouter) } diff --git a/core/agent/asynq/middleware.go b/core/agent/asynq/middleware.go new file mode 100644 index 0000000..f32f8ee --- /dev/null +++ b/core/agent/asynq/middleware.go @@ -0,0 +1,22 @@ +package asynqAgent + +import ( + "context" + "time" + + "github.com/OJ-lab/oj-lab-services/core" + "github.com/hibiken/asynq" +) + +func LoggingMiddleware(h asynq.Handler) asynq.Handler { + return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error { + start := time.Now() + core.AppLogger().Printf("Start processing %q", t.Type()) + err := h.ProcessTask(ctx, t) + if err != nil { + return err + } + core.AppLogger().Printf("Finished processing %q: Elapsed Time = %v", t.Type(), time.Since(start)) + return nil + }) +} diff --git a/core/agent/asynq/scheduler.go b/core/agent/asynq/scheduler.go index a8bb6de..ba2ff2f 100644 --- a/core/agent/asynq/scheduler.go +++ b/core/agent/asynq/scheduler.go @@ -21,7 +21,7 @@ func RunSecheduler(scheduleTasks ...ScheduleTask) { if err != nil { panic(err) } - core.GetAppLogger().Infof( + core.AppLogger().Infof( "setup schedule task: %s, cronspec: %s, entryID: %s", scheduleTask.Task.Type(), scheduleTask.Cronspec, entryID, ) diff --git a/core/log.go b/core/log.go index 39c1db1..fdcd0e4 100644 --- a/core/log.go +++ b/core/log.go @@ -9,7 +9,7 @@ import ( const logLevelProp = "log.level" -func GetAppLogger() *logrus.Entry { +func AppLogger() *logrus.Entry { return logrus.WithFields(logrus.Fields{ "CALLER": func() string { pc := make([]uintptr, 1) diff --git a/core/middleware/error.go b/core/middleware/error.go index 0bb6ff5..a7324cc 100644 --- a/core/middleware/error.go +++ b/core/middleware/error.go @@ -22,7 +22,7 @@ func HandleError(ginCtx *gin.Context) { errCount := len(ginCtx.Errors) if errCount > 0 { - core.GetAppLogger().Errorf("Last error from GIN middleware: %+v", ginCtx.Errors[errCount-1].Err) + core.AppLogger().Errorf("Last error from GIN middleware: %+v", ginCtx.Errors[errCount-1].Err) err := GetServiceError(*ginCtx.Errors[errCount-1]) ginCtx.JSON(err.Code, gin.H{ "code": err.Code, diff --git a/service/business/judger_task.go b/service/business/judger_task.go index b5cc25b..9a72fd7 100644 --- a/service/business/judger_task.go +++ b/service/business/judger_task.go @@ -37,6 +37,7 @@ func NewTaskJudgerGetState(judger model.Judger) *asynq.Task { func GetAsynqMuxJudger() asynqAgent.AsynqMux { serveMux := asynq.NewServeMux() + serveMux.Use(asynqAgent.LoggingMiddleware) serveMux.HandleFunc(TaskNameJudgerTrackAllState, handleTaskJudgerTrackAllState) serveMux.HandleFunc(TaskNameJudgerGetState, handleTaskJudgerGetState) serveMux.HandleFunc(TaskNameJudgerHandleSubmission, handleTaskJudgerHandleSubmission) @@ -48,13 +49,12 @@ func GetAsynqMuxJudger() asynqAgent.AsynqMux { } func handleTaskJudgerTrackAllState(ctx context.Context, task *asynq.Task) error { - core.GetAppLogger().Info("handleTaskJudgerTrackAllState") db := gormAgent.GetDefaultDB() judgerList, err := mapper.GetJudgerList(db) if err != nil { return err } - core.GetAppLogger().Infof("judger list: %v", judgerList) + core.AppLogger().Infof("judger list: %v", judgerList) asynqClient := asynqAgent.GetDefaultTaskClient() for _, judger := range judgerList { @@ -63,7 +63,7 @@ func handleTaskJudgerTrackAllState(ctx context.Context, task *asynq.Task) error asynq.TaskID(fmt.Sprintf("%s:%s", TaskNameJudgerGetState, judger.Host)), ) if err != nil { - core.GetAppLogger().Errorf("failed to enqueue task %s: %v", TaskNameJudgerGetState, err) + core.AppLogger().Errorf("failed to enqueue task %s: %v", TaskNameJudgerGetState, err) } } @@ -90,22 +90,22 @@ func handleTaskJudgerGetState(ctx context.Context, task *asynq.Task) error { return err } judgerState := model.StringToJudgerState(judgerStateString) - core.GetAppLogger().Debugf("Get Judger %v state=%v", judgerClient.Host, judgerState) + core.AppLogger().Debugf("Get Judger %v state=%v", judgerClient.Host, judgerState) if judgerState == model.JudgerStateIdle { - core.GetAppLogger().Debugf("Judger %v is idle, try find submission to handle", judgerClient.Host) + core.AppLogger().Debugf("Judger %v is idle, try find submission to handle", judgerClient.Host) asynqClient := asynqAgent.GetDefaultTaskClient() err := asynqClient.EnqueueTask( NewTaskJudgerHandleSubmission(judger), asynq.TaskID(fmt.Sprintf("%s:%s", TaskNameJudgerHandleSubmission, judger.Host)), ) if err != nil { - core.GetAppLogger().Errorf("failed to enqueue task %s: %v", TaskNameJudgerHandleSubmission, err) + core.AppLogger().Errorf("failed to enqueue task %s: %v", TaskNameJudgerHandleSubmission, err) } } if !judger.State.CanUpdate(model.JudgerStateIdle) { - core.GetAppLogger().Debugf("Judger state is invalid or don't need update, ignoring this state update") + core.AppLogger().Debugf("Judger state is invalid or don't need update, ignoring this state update") return nil } judger.State = judgerState @@ -121,7 +121,6 @@ func handleTaskJudgerGetState(ctx context.Context, task *asynq.Task) error { if err != nil { return err } - core.GetAppLogger().Debugf("Successfully handled task %s", task.Type()) return nil } @@ -139,7 +138,6 @@ func handleTaskJudgerHandleSubmission(ctx context.Context, task *asynq.Task) err if err := json.Unmarshal(task.Payload(), &judger); err != nil { return err } - core.GetAppLogger().Debugf("handleTaskJudgerHandleSubmission judger=%v", judger) err := db.Transaction(func(tx *gorm.DB) error { if err := tx.Clauses(clause.Locking{Strength: "UPDATE"}).Where("host = ?", judger.Host).First(&judger).Error; err != nil { @@ -159,10 +157,10 @@ func handleTaskJudgerHandleSubmission(ctx context.Context, task *asynq.Task) err }(), }, false).First(&submission).Error; err != nil { if err == gorm.ErrRecordNotFound { - core.GetAppLogger().Debugf("No submission to handle, ignoring this task") + core.AppLogger().Debugf("No submission to handle, ignoring this task") return nil } - core.GetAppLogger().Errorf("failed to get submission: %v", err) + core.AppLogger().Errorf("failed to get submission: %v", err) return err } @@ -175,24 +173,24 @@ func handleTaskJudgerHandleSubmission(ctx context.Context, task *asynq.Task) err } judgerState := model.StringToJudgerState(judgerStateString) if judgerState != model.JudgerStateIdle { - core.GetAppLogger().Debugf("Judger %v is not idle, ignoring this task", judgerClient.Host) + core.AppLogger().Debugf("Judger %v is not idle, ignoring this task", judgerClient.Host) return nil } - core.GetAppLogger().Debugf("Get Judger %v state=%v", judgerClient.Host, judgerState) + core.AppLogger().Debugf("Get Judger %v state=%v", judgerClient.Host, judgerState) judger.State = model.JudgerStateBusy err = tx.Model(&judger).Update("state", judger.State).Error if err != nil { - core.GetAppLogger().Errorf("failed to update judger state: %v", err) + core.AppLogger().Errorf("failed to update judger state: %v", err) return err } submission.Status = model.SubmissionStatusRunning err = tx.Model(&submission).Update("status", submission.Status).Error if err != nil { - core.GetAppLogger().Errorf("failed to update submission status: %v", err) + core.AppLogger().Errorf("failed to update submission status: %v", err) return err } - core.GetAppLogger().Debugf("Judger %v is busy, start to handle submission %v", judgerClient.Host, submission.UID) + core.AppLogger().Debugf("Judger %v is busy, start to handle submission %v", judgerClient.Host, submission.UID) judgeVerdict, err := judgerClient.PostJudgeSync( submission.ProblemSlug, judgerAgent.JudgeRequest{ @@ -201,15 +199,15 @@ func handleTaskJudgerHandleSubmission(ctx context.Context, task *asynq.Task) err }, ) if err != nil { - core.GetAppLogger().Errorf("failed to judge submission: %v", err) + core.AppLogger().Errorf("failed to judge submission: %v", err) return err } - core.GetAppLogger().Debugf("Get Judger %v verdict=%v", judgerClient.Host, judgeVerdict) + core.AppLogger().Debugf("Get Judger %v verdict=%v", judgerClient.Host, judgeVerdict) submission.Status = model.SubmissionStatusFinished verdictBytes, err := json.Marshal(judgeVerdict) if err != nil { - core.GetAppLogger().Errorf("failed to marshal verdict: %v", err) + core.AppLogger().Errorf("failed to marshal verdict: %v", err) return err } err = tx.Model(&submission).Updates(map[string]interface{}{ @@ -231,6 +229,5 @@ func handleTaskJudgerHandleSubmission(ctx context.Context, task *asynq.Task) err if err != nil { return err } - core.GetAppLogger().Debugf("Successfully handled task %s", task.Type()) return nil } diff --git a/service/user.go b/service/user.go index 2ffe640..b7b199a 100644 --- a/service/user.go +++ b/service/user.go @@ -31,7 +31,7 @@ func CheckUserExist(ctx context.Context, account string) (bool, error) { } if count > 1 { - core.GetAppLogger().Warnf("user %s has %d records", account, count) + core.AppLogger().Warnf("user %s has %d records", account, count) } return count > 0, nil