Skip to content

Commit

Permalink
Use workflow_run_id as workflow trace ID or message_id if present
Browse files Browse the repository at this point in the history
  • Loading branch information
Lothiraldan committed Dec 17, 2024
1 parent cd24e54 commit c4fb842
Showing 1 changed file with 38 additions and 20 deletions.
58 changes: 38 additions & 20 deletions api/core/ops/opik_trace/opik_trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def wrap_dict(key_name, data):

def wrap_metadata(metadata, **kwargs):
"""Add common metatada to all Traces and Spans"""
metadata["created_from"] = "opik"
metadata["created_from"] = "dify"

metadata.update(kwargs)

Expand Down Expand Up @@ -75,40 +75,58 @@ def trace(self, trace_info: BaseTraceInfo):
self.generate_name_trace(trace_info)

def workflow_trace(self, trace_info: WorkflowTraceInfo):
dify_trace_id = trace_info.message_id or trace_info.workflow_app_log_id or trace_info.workflow_run_id
dify_trace_id = trace_info.workflow_run_id
opik_trace_id = uuid4_to_uuid7(trace_info.start_time, dify_trace_id)
workflow_metadata = wrap_metadata(
trace_info.metadata, message_id=trace_info.message_id, workflow_app_log_id=trace_info.workflow_app_log_id
)
root_span_id = None

if trace_info.message_id:
dify_trace_id = trace_info.message_id
opik_trace_id = uuid4_to_uuid7(trace_info.start_time, dify_trace_id)

trace_data = {
"id": opik_trace_id,
"name": TraceTaskName.MESSAGE_TRACE.value,
"start_time": trace_info.start_time,
"end_time": trace_info.end_time,
"metadata": wrap_metadata(trace_info.metadata, message_id=trace_info.message_id),
"metadata": workflow_metadata,
"input": wrap_dict("input", trace_info.workflow_run_inputs),
"output": wrap_dict("output", trace_info.workflow_run_outputs),
"tags": ["message", "workflow"],
"project_name": self.project,
}
self.add_trace(trace_data)

span_id = trace_info.workflow_app_log_id or trace_info.workflow_run_id
span_data = {
"trace_id": opik_trace_id,
"id": uuid4_to_uuid7(trace_info.start_time, span_id),
"parent_span_id": None,
"name": TraceTaskName.WORKFLOW_TRACE.value,
"type": "tool",
"start_time": trace_info.start_time,
"end_time": trace_info.end_time,
"metadata": wrap_metadata(trace_info.metadata),
"input": wrap_dict("input", trace_info.workflow_run_inputs),
"output": wrap_dict("output", trace_info.workflow_run_outputs),
"tags": ["workflow"],
"project_name": self.project,
}

self.add_span(span_data)
root_span_id = uuid4_to_uuid7(trace_info.start_time, trace_info.workflow_run_id)
span_data = {
"id": root_span_id,
"parent_span_id": None,
"trace_id": opik_trace_id,
"name": TraceTaskName.WORKFLOW_TRACE.value,
"input": wrap_dict("input", trace_info.workflow_run_inputs),
"output": wrap_dict("output", trace_info.workflow_run_outputs),
"start_time": trace_info.start_time,
"end_time": trace_info.end_time,
"metadata": workflow_metadata,
"tags": ["workflow"],
"project_name": self.project,
}
self.add_span(span_data)
else:
trace_data = {
"id": opik_trace_id,
"name": TraceTaskName.MESSAGE_TRACE.value,
"start_time": trace_info.start_time,
"end_time": trace_info.end_time,
"metadata": workflow_metadata,
"input": wrap_dict("input", trace_info.workflow_run_inputs),
"output": wrap_dict("output", trace_info.workflow_run_outputs),
"tags": ["workflow"],
"project_name": self.project,
}
self.add_trace(trace_data)

# through workflow_run_id get all_nodes_execution
workflow_nodes_execution_id_records = (
Expand Down

0 comments on commit c4fb842

Please sign in to comment.