Skip to content

Commit

Permalink
Merge branch 'langgenius:main' into tools
Browse files Browse the repository at this point in the history
  • Loading branch information
forrestlinfeng authored Aug 9, 2024
2 parents de11954 + 425174e commit 356d6c2
Show file tree
Hide file tree
Showing 43 changed files with 1,200 additions and 719 deletions.
3 changes: 2 additions & 1 deletion api/core/app/apps/advanced_chat/app_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ def generate(
)

# get tracing instance
trace_manager = TraceQueueManager(app_id=app_model.id)
user_id = user.id if isinstance(user, Account) else user.session_id
trace_manager = TraceQueueManager(app_model.id, user_id)

if invoke_from == InvokeFrom.DEBUGGER:
# always enable retriever resource in debugger mode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@
)
from core.model_runtime.model_providers.__base.large_language_model import LargeLanguageModel
from core.model_runtime.utils.encoders import jsonable_encoder
from core.ops.ops_trace_manager import TraceQueueManager, TraceTask, TraceTaskName
from core.ops.entities.trace_entity import TraceTaskName
from core.ops.ops_trace_manager import TraceQueueManager, TraceTask
from core.prompt.utils.prompt_message_util import PromptMessageUtil
from core.prompt.utils.prompt_template_parser import PromptTemplateParser
from events.message_event import message_was_created
Expand Down
15 changes: 5 additions & 10 deletions api/core/app/task_pipeline/workflow_cycle_manage.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
from core.app.task_pipeline.workflow_iteration_cycle_manage import WorkflowIterationCycleManage
from core.file.file_obj import FileVar
from core.model_runtime.utils.encoders import jsonable_encoder
from core.ops.ops_trace_manager import TraceQueueManager, TraceTask, TraceTaskName
from core.ops.entities.trace_entity import TraceTaskName
from core.ops.ops_trace_manager import TraceQueueManager, TraceTask
from core.tools.tool_manager import ToolManager
from core.workflow.entities.node_entities import NodeRunMetadataKey, NodeType
from core.workflow.nodes.tool.entities import ToolNodeData
Expand All @@ -40,6 +41,7 @@
WorkflowRunStatus,
WorkflowRunTriggeredFrom,
)
from services.workflow_service import WorkflowService


class WorkflowCycleManage(WorkflowIterationCycleManage):
Expand Down Expand Up @@ -97,7 +99,6 @@ def _init_workflow_run(self, workflow: Workflow,

def _workflow_run_success(
self, workflow_run: WorkflowRun,
start_at: float,
total_tokens: int,
total_steps: int,
outputs: Optional[str] = None,
Expand All @@ -107,7 +108,6 @@ def _workflow_run_success(
"""
Workflow run success
:param workflow_run: workflow run
:param start_at: start time
:param total_tokens: total tokens
:param total_steps: total steps
:param outputs: outputs
Expand All @@ -116,7 +116,7 @@ def _workflow_run_success(
"""
workflow_run.status = WorkflowRunStatus.SUCCEEDED.value
workflow_run.outputs = outputs
workflow_run.elapsed_time = time.perf_counter() - start_at
workflow_run.elapsed_time = WorkflowService.get_elapsed_time(workflow_run_id=workflow_run.id)
workflow_run.total_tokens = total_tokens
workflow_run.total_steps = total_steps
workflow_run.finished_at = datetime.now(timezone.utc).replace(tzinfo=None)
Expand All @@ -139,7 +139,6 @@ def _workflow_run_success(

def _workflow_run_failed(
self, workflow_run: WorkflowRun,
start_at: float,
total_tokens: int,
total_steps: int,
status: WorkflowRunStatus,
Expand All @@ -150,7 +149,6 @@ def _workflow_run_failed(
"""
Workflow run failed
:param workflow_run: workflow run
:param start_at: start time
:param total_tokens: total tokens
:param total_steps: total steps
:param status: status
Expand All @@ -159,7 +157,7 @@ def _workflow_run_failed(
"""
workflow_run.status = status.value
workflow_run.error = error
workflow_run.elapsed_time = time.perf_counter() - start_at
workflow_run.elapsed_time = WorkflowService.get_elapsed_time(workflow_run_id=workflow_run.id)
workflow_run.total_tokens = total_tokens
workflow_run.total_steps = total_steps
workflow_run.finished_at = datetime.now(timezone.utc).replace(tzinfo=None)
Expand Down Expand Up @@ -542,7 +540,6 @@ def _handle_workflow_finished(
if isinstance(event, QueueStopEvent):
workflow_run = self._workflow_run_failed(
workflow_run=workflow_run,
start_at=self._task_state.start_at,
total_tokens=self._task_state.total_tokens,
total_steps=self._task_state.total_steps,
status=WorkflowRunStatus.STOPPED,
Expand All @@ -565,7 +562,6 @@ def _handle_workflow_finished(
elif isinstance(event, QueueWorkflowFailedEvent):
workflow_run = self._workflow_run_failed(
workflow_run=workflow_run,
start_at=self._task_state.start_at,
total_tokens=self._task_state.total_tokens,
total_steps=self._task_state.total_steps,
status=WorkflowRunStatus.FAILED,
Expand All @@ -583,7 +579,6 @@ def _handle_workflow_finished(

workflow_run = self._workflow_run_success(
workflow_run=workflow_run,
start_at=self._task_state.start_at,
total_tokens=self._task_state.total_tokens,
total_steps=self._task_state.total_steps,
outputs=outputs,
Expand Down
3 changes: 2 additions & 1 deletion api/core/callback_handler/agent_tool_callback_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

from pydantic import BaseModel

from core.ops.ops_trace_manager import TraceQueueManager, TraceTask, TraceTaskName
from core.ops.entities.trace_entity import TraceTaskName
from core.ops.ops_trace_manager import TraceQueueManager, TraceTask
from core.tools.entities.tool_entities import ToolInvokeMessage

_TEXT_COLOR_MAPPING = {
Expand Down
3 changes: 2 additions & 1 deletion api/core/llm_generator/llm_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
from core.model_runtime.entities.message_entities import SystemPromptMessage, UserPromptMessage
from core.model_runtime.entities.model_entities import ModelType
from core.model_runtime.errors.invoke import InvokeAuthorizationError, InvokeError
from core.ops.ops_trace_manager import TraceQueueManager, TraceTask, TraceTaskName
from core.ops.entities.trace_entity import TraceTaskName
from core.ops.ops_trace_manager import TraceQueueManager, TraceTask
from core.ops.utils import measure_time
from core.prompt.utils.prompt_template_parser import PromptTemplateParser

Expand Down
3 changes: 2 additions & 1 deletion api/core/moderation/input_moderation.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
from core.app.app_config.entities import AppConfig
from core.moderation.base import ModerationAction, ModerationException
from core.moderation.factory import ModerationFactory
from core.ops.ops_trace_manager import TraceQueueManager, TraceTask, TraceTaskName
from core.ops.entities.trace_entity import TraceTaskName
from core.ops.ops_trace_manager import TraceQueueManager, TraceTask
from core.ops.utils import measure_time

logger = logging.getLogger(__name__)
Expand Down
14 changes: 13 additions & 1 deletion api/core/ops/entities/trace_entity.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from datetime import datetime
from enum import Enum
from typing import Any, Optional, Union

from pydantic import BaseModel, ConfigDict, field_validator
Expand Down Expand Up @@ -105,4 +106,15 @@ class GenerateNameTraceInfo(BaseTraceInfo):
'DatasetRetrievalTraceInfo': DatasetRetrievalTraceInfo,
'ToolTraceInfo': ToolTraceInfo,
'GenerateNameTraceInfo': GenerateNameTraceInfo,
}
}


class TraceTaskName(str, Enum):
CONVERSATION_TRACE = 'conversation'
WORKFLOW_TRACE = 'workflow'
MESSAGE_TRACE = 'message'
MODERATION_TRACE = 'moderation'
SUGGESTED_QUESTION_TRACE = 'suggested_question'
DATASET_RETRIEVAL_TRACE = 'dataset_retrieval'
TOOL_TRACE = 'tool'
GENERATE_NAME_TRACE = 'generate_conversation_name'
42 changes: 20 additions & 22 deletions api/core/ops/langfuse_trace/entities/langfuse_trace_entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,11 @@ class LangfuseTrace(BaseModel):
"""
Langfuse trace model
"""

id: Optional[str] = Field(
default=None,
description="The id of the trace can be set, defaults to a random id. Used to link traces to external systems "
"or when creating a distributed trace. Traces are upserted on id.",
"or when creating a distributed trace. Traces are upserted on id.",
)
name: Optional[str] = Field(
default=None,
Expand All @@ -68,7 +69,7 @@ class LangfuseTrace(BaseModel):
metadata: Optional[dict[str, Any]] = Field(
default=None,
description="Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated "
"via the API.",
"via the API.",
)
user_id: Optional[str] = Field(
default=None,
Expand All @@ -81,22 +82,22 @@ class LangfuseTrace(BaseModel):
version: Optional[str] = Field(
default=None,
description="The version of the trace type. Used to understand how changes to the trace type affect metrics. "
"Useful in debugging.",
"Useful in debugging.",
)
release: Optional[str] = Field(
default=None,
description="The release identifier of the current deployment. Used to understand how changes of different "
"deployments affect metrics. Useful in debugging.",
"deployments affect metrics. Useful in debugging.",
)
tags: Optional[list[str]] = Field(
default=None,
description="Tags are used to categorize or label traces. Traces can be filtered by tags in the UI and GET "
"API. Tags can also be changed in the UI. Tags are merged and never deleted via the API.",
"API. Tags can also be changed in the UI. Tags are merged and never deleted via the API.",
)
public: Optional[bool] = Field(
default=None,
description="You can make a trace public to share it via a public link. This allows others to view the trace "
"without needing to log in or be members of your Langfuse project.",
"without needing to log in or be members of your Langfuse project.",
)

@field_validator("input", "output")
Expand All @@ -109,6 +110,7 @@ class LangfuseSpan(BaseModel):
"""
Langfuse span model
"""

id: Optional[str] = Field(
default=None,
description="The id of the span can be set, otherwise a random id is generated. Spans are upserted on id.",
Expand Down Expand Up @@ -140,17 +142,17 @@ class LangfuseSpan(BaseModel):
metadata: Optional[dict[str, Any]] = Field(
default=None,
description="Additional metadata of the span. Can be any JSON object. Metadata is merged when being updated "
"via the API.",
"via the API.",
)
level: Optional[str] = Field(
default=None,
description="The level of the span. Can be DEBUG, DEFAULT, WARNING or ERROR. Used for sorting/filtering of "
"traces with elevated error levels and for highlighting in the UI.",
"traces with elevated error levels and for highlighting in the UI.",
)
status_message: Optional[str] = Field(
default=None,
description="The status message of the span. Additional field for context of the event. E.g. the error "
"message of an error event.",
"message of an error event.",
)
input: Optional[Union[str, dict[str, Any], list, None]] = Field(
default=None, description="The input of the span. Can be any JSON object."
Expand All @@ -161,7 +163,7 @@ class LangfuseSpan(BaseModel):
version: Optional[str] = Field(
default=None,
description="The version of the span type. Used to understand how changes to the span type affect metrics. "
"Useful in debugging.",
"Useful in debugging.",
)
parent_observation_id: Optional[str] = Field(
default=None,
Expand All @@ -185,10 +187,9 @@ class UnitEnum(str, Enum):
class GenerationUsage(BaseModel):
promptTokens: Optional[int] = None
completionTokens: Optional[int] = None
totalTokens: Optional[int] = None
total: Optional[int] = None
input: Optional[int] = None
output: Optional[int] = None
total: Optional[int] = None
unit: Optional[UnitEnum] = None
inputCost: Optional[float] = None
outputCost: Optional[float] = None
Expand Down Expand Up @@ -224,15 +225,13 @@ class LangfuseGeneration(BaseModel):
completion_start_time: Optional[datetime | str] = Field(
default=None,
description="The time at which the completion started (streaming). Set it to get latency analytics broken "
"down into time until completion started and completion duration.",
"down into time until completion started and completion duration.",
)
end_time: Optional[datetime | str] = Field(
default=None,
description="The time at which the generation ended. Automatically set by generation.end().",
)
model: Optional[str] = Field(
default=None, description="The name of the model used for the generation."
)
model: Optional[str] = Field(default=None, description="The name of the model used for the generation.")
model_parameters: Optional[dict[str, Any]] = Field(
default=None,
description="The parameters of the model used for the generation; can be any key-value pairs.",
Expand All @@ -248,27 +247,27 @@ class LangfuseGeneration(BaseModel):
usage: Optional[GenerationUsage] = Field(
default=None,
description="The usage object supports the OpenAi structure with tokens and a more generic version with "
"detailed costs and units.",
"detailed costs and units.",
)
metadata: Optional[dict[str, Any]] = Field(
default=None,
description="Additional metadata of the generation. Can be any JSON object. Metadata is merged when being "
"updated via the API.",
"updated via the API.",
)
level: Optional[LevelEnum] = Field(
default=None,
description="The level of the generation. Can be DEBUG, DEFAULT, WARNING or ERROR. Used for sorting/filtering "
"of traces with elevated error levels and for highlighting in the UI.",
"of traces with elevated error levels and for highlighting in the UI.",
)
status_message: Optional[str] = Field(
default=None,
description="The status message of the generation. Additional field for context of the event. E.g. the error "
"message of an error event.",
"message of an error event.",
)
version: Optional[str] = Field(
default=None,
description="The version of the generation type. Used to understand how changes to the span type affect "
"metrics. Useful in debugging.",
"metrics. Useful in debugging.",
)

model_config = ConfigDict(protected_namespaces=())
Expand All @@ -277,4 +276,3 @@ class LangfuseGeneration(BaseModel):
def ensure_dict(cls, v, info: ValidationInfo):
field_name = info.field_name
return validate_input_output(v, field_name)

Loading

0 comments on commit 356d6c2

Please sign in to comment.