Commit 16e22f0

feat: add prometheus metrics

刘江波 committed Dec 20, 2024
1 parent dacd457 commit 16e22f0
Showing 17 changed files with 506 additions and 28 deletions.
4 changes: 3 additions & 1 deletion api/.env.example
@@ -427,4 +427,6 @@ CREATE_TIDB_SERVICE_JOB_ENABLED=false
# Maximum number of submitted thread count in a ThreadPool for parallel node execution
MAX_SUBMIT_COUNT=100
# Lockout duration in seconds
LOGIN_LOCKOUT_DURATION=86400

PROMETHEUS_MULTIPROC_DIR=/tmp/prometheus_multiproc_dir
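
Note: PROMETHEUS_MULTIPROC_DIR enables prometheus_client's multiprocess mode, which is needed when the API runs under a multi-worker server such as gunicorn; each worker writes shard files into this directory, and a scrape must aggregate them. A minimal sketch of that aggregation (render_metrics is a hypothetical helper, not part of this commit):

# Set the directory before prometheus_client creates any metrics.
import os
os.environ.setdefault("PROMETHEUS_MULTIPROC_DIR", "/tmp/prometheus_multiproc_dir")

from prometheus_client import CollectorRegistry, generate_latest, multiprocess

def render_metrics() -> bytes:
    # Aggregate every worker's shard files instead of the default registry.
    registry = CollectorRegistry()
    multiprocess.MultiProcessCollector(registry)
    return generate_latest(registry)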
28 changes: 21 additions & 7 deletions api/configs/feature/__init__.py
@@ -22,8 +22,9 @@ class SecurityConfig(BaseSettings):

SECRET_KEY: str = Field(
description="Secret key for secure session cookie signing."
"Make sure you are changing this key for your deployment with a strong key."
"Generate a strong key using `openssl rand -base64 42` or set via the `SECRET_KEY` environment variable.",
"Make sure you are changing this key for your deployment with a strong key."
"Generate a strong key using `openssl rand -base64 42` "
"or set via the `SECRET_KEY` environment variable.",
default="",
)

@@ -141,7 +142,7 @@ class EndpointConfig(BaseSettings):

CONSOLE_API_URL: str = Field(
description="Base URL for the console API,"
"used for login authentication callback or notion integration callbacks",
"used for login authentication callback or notion integration callbacks",
default="",
)

@@ -168,8 +169,8 @@ class FileAccessConfig(BaseSettings):

FILES_URL: str = Field(
description="Base URL for file preview or download,"
" used for frontend display and multi-model inputs"
"Url is signed and has expiration time.",
" used for frontend display and multi-model inputs"
"Url is signed and has expiration time.",
validation_alias=AliasChoices("FILES_URL", "CONSOLE_API_URL"),
alias_priority=1,
default="",
@@ -318,7 +319,7 @@ def WEB_API_CORS_ALLOW_ORIGINS(self) -> list[str]:

RESPECT_XFORWARD_HEADERS_ENABLED: bool = Field(
description="Enable or disable the X-Forwarded-For Proxy Fix middleware from Werkzeug"
" to respect X-* headers to redirect clients",
" to respect X-* headers to redirect clients",
default=False,
)

@@ -592,7 +593,7 @@ class RagEtlConfig(BaseSettings):

KEYWORD_DATA_SOURCE_TYPE: str = Field(
description="Data source type for keyword extraction"
" ('database' or other supported types), default to 'database'",
" ('database' or other supported types), default to 'database'",
default="database",
)

@@ -767,6 +768,18 @@ class LoginConfig(BaseSettings):
)


class PrometheusConfig(BaseSettings):
HISTOGRAM_BUCKETS_1MIN: list[float] = Field(
description="The buckets of Prometheus histogram under 1 minute",
default=[0.1, 0.2, 0.5, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 14, 16, 18, 20, 25, 30, 40, 50, 60],
)

HISTOGRAM_BUCKETS_5MIN: list[float] = Field(
        description="The buckets of Prometheus histogram under 5 minutes",
default=[0.1, 0.2, 0.5, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 14, 16, 18, 20, 25, 30, 40, 50, 60, 120, 180, 300],
)


class FeatureConfig(
# place the configs in alphabet order
AppExecutionConfig,
@@ -794,6 +807,7 @@ class FeatureConfig(
WorkflowNodeExecutionConfig,
WorkspaceConfig,
LoginConfig,
PrometheusConfig,
# hosted services config
HostedServiceConfig,
CeleryBeatConfig,
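Because PrometheusConfig is a pydantic-settings class, the bucket lists above can be overridden per deployment through environment variables; pydantic-settings parses list-valued fields as JSON. A standalone sketch (Demo is a hypothetical class, not Dify code):

import os
from pydantic_settings import BaseSettings

class Demo(BaseSettings):
    HISTOGRAM_BUCKETS_1MIN: list[float] = [0.1, 1, 10, 60]

# Override the default buckets from the environment as a JSON array.
os.environ["HISTOGRAM_BUCKETS_1MIN"] = "[0.5, 1, 5, 15, 30, 60]"
print(Demo().HISTOGRAM_BUCKETS_1MIN)  # [0.5, 1.0, 5.0, 15.0, 30.0, 60.0]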
11 changes: 9 additions & 2 deletions api/core/app/apps/advanced_chat/generate_task_pipeline.py
@@ -398,7 +398,9 @@ def _process_stream_response(
conversation_id=self._conversation.id,
trace_manager=trace_manager,
)

self._workflow_time_it(
is_success=True, graph_runtime_state=graph_runtime_state, workflow_run=workflow_run
)
yield self._workflow_finish_to_stream_response(
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run
)
@@ -421,6 +423,9 @@ def _process_stream_response(
conversation_id=None,
trace_manager=trace_manager,
)
self._workflow_time_it(
is_success=False, graph_runtime_state=graph_runtime_state, workflow_run=workflow_run
)

yield self._workflow_finish_to_stream_response(
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run
@@ -445,7 +450,9 @@ def _process_stream_response(
trace_manager=trace_manager,
exceptions_count=event.exceptions_count,
)

self._workflow_time_it(
is_success=False, graph_runtime_state=graph_runtime_state, workflow_run=workflow_run
)
yield self._workflow_finish_to_stream_response(
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run
)
6 changes: 6 additions & 0 deletions api/core/app/apps/workflow/generate_task_pipeline.py
@@ -357,6 +357,9 @@ def _process_stream_response(
conversation_id=None,
trace_manager=trace_manager,
)
self._workflow_time_it(
is_success=True, graph_runtime_state=graph_runtime_state, workflow_run=workflow_run
)

# save workflow app log
self._save_workflow_app_log(workflow_run)
@@ -381,6 +384,9 @@ def _process_stream_response(
conversation_id=None,
trace_manager=trace_manager,
)
self._workflow_time_it(
is_success=False, graph_runtime_state=graph_runtime_state, workflow_run=workflow_run
)

# save workflow app log
self._save_workflow_app_log(workflow_run)
36 changes: 36 additions & 0 deletions api/core/app/task_pipeline/__init__.py
@@ -0,0 +1,36 @@
from prometheus_client import Counter, Histogram

from configs import dify_config

app_request = Counter(
name="app_request",
documentation="The total count of APP requests",
labelnames=["app_id", "tenant_id", "username"],
)
app_request_failed = Counter(
name="app_request_failed",
documentation="The failed count of APP requests",
labelnames=["app_id", "tenant_id", "username"],
)
app_request_latency = Histogram(
name="app_request_latency",
documentation="The latency of APP requests",
unit="seconds",
labelnames=["app_id", "tenant_id", "username"],
buckets=dify_config.HISTOGRAM_BUCKETS_5MIN,
)
app_input_tokens = Counter(
name="app_input_tokens",
documentation="The input tokens cost by APP requests",
labelnames=["app_id", "tenant_id", "username"],
)
app_output_tokens = Counter(
name="app_output_tokens",
documentation="The output tokens cost by APP requests",
labelnames=["app_id", "tenant_id", "username"],
)
app_total_tokens = Counter(
name="app_total_tokens",
documentation="The total tokens cost by APP requests",
labelnames=["app_id", "tenant_id", "username"],
)
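
On naming: prometheus_client appends _total to counter samples, and unit="seconds" makes the histogram's full name app_request_latency_seconds, so queries target app_request_total, app_request_latency_seconds_bucket, and so on. A minimal sketch of driving these metrics from inside the api codebase (label values are hypothetical):

from prometheus_client import generate_latest

from core.app.task_pipeline import app_request, app_request_latency

labels = {"app_id": "app-123", "tenant_id": "t-1", "username": "alice"}
app_request.labels(**labels).inc()                 # one request observed
app_request_latency.labels(**labels).observe(2.4)  # counted in the le="3.0" bucket

# Rendered in exposition format this yields, e.g.:
#   app_request_total{app_id="app-123",tenant_id="t-1",username="alice"} 1.0
#   app_request_latency_seconds_bucket{app_id="app-123",...,le="3.0"} 1.0
print(generate_latest().decode())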
api/core/app/task_pipeline/easy_ui_based_generate_task_pipeline.py
@@ -39,6 +39,14 @@
MessageEndStreamResponse,
StreamResponse,
)
from core.app.task_pipeline import (
app_input_tokens,
app_output_tokens,
app_request,
app_request_failed,
app_request_latency,
app_total_tokens,
)
from core.app.task_pipeline.based_generate_task_pipeline import BasedGenerateTaskPipeline
from core.app.task_pipeline.message_cycle_manage import MessageCycleManage
from core.model_manager import ModelInstance
@@ -251,6 +259,47 @@ def _wrapper_process_stream_response(
if publisher:
yield MessageAudioEndStreamResponse(audio="", task_id=task_id)

def _chat_time_it(self, is_success: bool) -> None:
"""
Record chat / completion / agent run metrics.
"""
app_id = self._app_config.app_id
tenant_id = self._app_config.tenant_id
username = self._conversation.from_account_name
app_request.labels(
app_id=app_id,
tenant_id=tenant_id,
username=username,
).inc()

if not is_success:
app_request_failed.labels(
app_id=app_id,
tenant_id=tenant_id,
username=username,
).inc()
return
app_request_latency.labels(
app_id=app_id,
tenant_id=tenant_id,
username=username,
).observe(self._message.provider_response_latency)
app_input_tokens.labels(
app_id=app_id,
tenant_id=tenant_id,
username=username,
).inc(self._message.message_tokens)
app_output_tokens.labels(
app_id=app_id,
tenant_id=tenant_id,
username=username,
).inc(self._message.answer_tokens)
app_total_tokens.labels(
app_id=app_id,
tenant_id=tenant_id,
username=username,
).inc(self._message.message_tokens + self._message.answer_tokens)

def _process_stream_response(
self, publisher: AppGeneratorTTSPublisher, trace_manager: Optional[TraceQueueManager] = None
) -> Generator[StreamResponse, None, None]:
@@ -265,6 +314,7 @@ def _process_stream_response(

if isinstance(event, QueueErrorEvent):
err = self._handle_error(event, self._message)
self._chat_time_it(is_success=False)
yield self._error_to_stream_response(err)
break
elif isinstance(event, QueueStopEvent | QueueMessageEndEvent):
@@ -283,6 +333,7 @@ def _process_stream_response(

# Save message
self._save_message(trace_manager)
self._chat_time_it(is_success=True)

yield self._message_end_to_stream_response()
elif isinstance(event, QueueRetrieverResourcesEvent):
@@ -374,7 +425,7 @@ def _save_message(self, trace_manager: Optional[TraceQueueManager] = None) -> None:
application_generate_entity=self._application_generate_entity,
conversation=self._conversation,
is_first_message=self._application_generate_entity.app_config.app_mode in {AppMode.AGENT_CHAT, AppMode.CHAT}
and self._application_generate_entity.conversation_id is None,
extras=self._application_generate_entity.extras,
)

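Note that _chat_time_it returns early on failure, so the latency histogram and token counters only ever reflect successful runs; a failure ratio is instead derived at query time, e.g. rate(app_request_failed_total[5m]) / rate(app_request_total[5m]) in PromQL.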
36 changes: 36 additions & 0 deletions api/core/app/task_pipeline/workflow_cycle_manage.py
@@ -35,13 +35,22 @@
WorkflowStartStreamResponse,
WorkflowTaskState,
)
from core.app.task_pipeline import (
app_input_tokens,
app_output_tokens,
app_request,
app_request_failed,
app_request_latency,
app_total_tokens,
)
from core.file import FILE_MODEL_IDENTITY, File
from core.model_runtime.utils.encoders import jsonable_encoder
from core.ops.entities.trace_entity import TraceTaskName
from core.ops.ops_trace_manager import TraceQueueManager, TraceTask
from core.tools.tool_manager import ToolManager
from core.workflow.entities.node_entities import NodeRunMetadataKey
from core.workflow.enums import SystemVariableKey
from core.workflow.graph_engine import GraphRuntimeState
from core.workflow.nodes import NodeType
from core.workflow.nodes.tool.entities import ToolNodeData
from core.workflow.workflow_entry import WorkflowEntry
@@ -119,6 +128,33 @@ def _handle_workflow_run_start(self) -> WorkflowRun:

return workflow_run

def _workflow_time_it(
self, is_success: bool, graph_runtime_state: GraphRuntimeState, workflow_run: WorkflowRun
) -> None:
"""
Record advanced-chat / workflow run metrics.
"""
app_id = workflow_run.app_id
tenant_id = workflow_run.tenant_id
username = self._user.name
app_request.labels(app_id=app_id, tenant_id=tenant_id, username=username).inc()

if not is_success:
app_request_failed.labels(app_id=app_id, tenant_id=tenant_id, username=username).inc()
return
app_request_latency.labels(app_id=app_id, tenant_id=tenant_id, username=username).observe(
workflow_run.elapsed_time
)
app_input_tokens.labels(app_id=app_id, tenant_id=tenant_id, username=username).inc(
graph_runtime_state.llm_usage.prompt_tokens
)
app_output_tokens.labels(app_id=app_id, tenant_id=tenant_id, username=username).inc(
graph_runtime_state.llm_usage.completion_tokens
)
app_total_tokens.labels(app_id=app_id, tenant_id=tenant_id, username=username).inc(
graph_runtime_state.llm_usage.total_tokens
)

def _handle_workflow_run_success(
self,
workflow_run: WorkflowRun,
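Here the token counters are fed from graph_runtime_state.llm_usage, the run's accumulated LLM usage, so per-app token spend can be charted with a PromQL query such as sum by (app_id) (increase(app_total_tokens_total[1h])).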