Skip to content

Commit

Permalink
Merge branch 'feat/add-langsmith-dotted-order' into deploy/dev
Browse files Browse the repository at this point in the history
* feat/add-langsmith-dotted-order:
  feat: add langsmith dotted order
  Feat/clean message records (#10588)
  Feat/account not found (#10804)
  • Loading branch information
ZhouhaoJiang committed Nov 18, 2024
2 parents 7e92517 + ff110e9 commit 7ce7222
Show file tree
Hide file tree
Showing 9 changed files with 183 additions and 9 deletions.
5 changes: 5 additions & 0 deletions api/configs/feature/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,11 @@ class DataSetConfig(BaseSettings):
default=False,
)

PLAN_SANDBOX_CLEAN_MESSAGE_DAY_SETTING: PositiveInt = Field(
description="Interval in days for message cleanup operations - plan: sandbox",
default=30,
)


class WorkspaceConfig(BaseSettings):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class LangSmithRunModel(LangSmithTokenUsage, LangSmithMultiModel):
reference_example_id: Optional[str] = Field(None, description="Reference example ID associated with the run")
input_attachments: Optional[dict[str, Any]] = Field(None, description="Input attachments of the run")
output_attachments: Optional[dict[str, Any]] = Field(None, description="Output attachments of the run")
dotted_order: Optional[str] = Field(None, description="Dotted order of the run")

@field_validator("inputs", "outputs")
@classmethod
Expand Down
42 changes: 41 additions & 1 deletion api/core/ops/langsmith_trace/langsmith_trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
LangSmithRunType,
LangSmithRunUpdateModel,
)
from core.ops.utils import filter_none_values
from core.ops.utils import filter_none_values, generate_dotted_order
from extensions.ext_database import db
from models.model import EndUser, MessageFile
from models.workflow import WorkflowNodeExecution
Expand Down Expand Up @@ -63,6 +63,7 @@ def trace(self, trace_info: BaseTraceInfo):

def workflow_trace(self, trace_info: WorkflowTraceInfo):
if trace_info.message_id:
dotted_order = generate_dotted_order(trace_info.message_id, trace_info.start_time)
message_run = LangSmithRunModel(
id=trace_info.message_id,
name=TraceTaskName.MESSAGE_TRACE.value,
Expand All @@ -76,9 +77,13 @@ def workflow_trace(self, trace_info: WorkflowTraceInfo):
},
tags=["message", "workflow"],
error=trace_info.error,
dotted_order=dotted_order,
)
self.add_run(message_run)

workflow_dotted_order = generate_dotted_order(
trace_info.workflow_app_log_id or trace_info.workflow_run_id, trace_info.workflow_data.created_at
)
langsmith_run = LangSmithRunModel(
file_list=trace_info.file_list,
total_tokens=trace_info.total_tokens,
Expand All @@ -95,6 +100,7 @@ def workflow_trace(self, trace_info: WorkflowTraceInfo):
error=trace_info.error,
tags=["workflow"],
parent_run_id=trace_info.message_id or None,
dotted_order=workflow_dotted_order,
)

self.add_run(langsmith_run)
Expand Down Expand Up @@ -177,6 +183,7 @@ def workflow_trace(self, trace_info: WorkflowTraceInfo):
else:
run_type = LangSmithRunType.tool

node_dotted_order = generate_dotted_order(node_execution_id, created_at, workflow_dotted_order)
langsmith_run = LangSmithRunModel(
total_tokens=node_total_tokens,
name=node_type,
Expand All @@ -191,6 +198,7 @@ def workflow_trace(self, trace_info: WorkflowTraceInfo):
},
parent_run_id=trace_info.workflow_app_log_id or trace_info.workflow_run_id,
tags=["node_execution"],
dotted_order=node_dotted_order,
)

self.add_run(langsmith_run)
Expand All @@ -216,6 +224,7 @@ def message_trace(self, trace_info: MessageTraceInfo):
end_user_id = end_user_data.session_id
metadata["end_user_id"] = end_user_id

dotted_order = generate_dotted_order(message_id, trace_info.start_time)
message_run = LangSmithRunModel(
input_tokens=trace_info.message_tokens,
output_tokens=trace_info.answer_tokens,
Expand All @@ -233,10 +242,12 @@ def message_trace(self, trace_info: MessageTraceInfo):
tags=["message", str(trace_info.conversation_mode)],
error=trace_info.error,
file_list=file_list,
dotted_order=dotted_order,
)
self.add_run(message_run)

# create llm run parented to message run
llm_dotted_order = generate_dotted_order(message_id, trace_info.start_time, dotted_order)
llm_run = LangSmithRunModel(
input_tokens=trace_info.message_tokens,
output_tokens=trace_info.answer_tokens,
Expand All @@ -254,10 +265,16 @@ def message_trace(self, trace_info: MessageTraceInfo):
tags=["llm", str(trace_info.conversation_mode)],
error=trace_info.error,
file_list=file_list,
dotted_order=llm_dotted_order,
)
self.add_run(llm_run)

def moderation_trace(self, trace_info: ModerationTraceInfo):
parent_run_id = trace_info.message_id
parent_dotted_order = generate_dotted_order(parent_run_id, trace_info.start_time)
moderation_dotted_order = generate_dotted_order(
trace_info.message_id, trace_info.start_time, parent_dotted_order
)
langsmith_run = LangSmithRunModel(
name=TraceTaskName.MODERATION_TRACE.value,
inputs=trace_info.inputs,
Expand All @@ -275,11 +292,17 @@ def moderation_trace(self, trace_info: ModerationTraceInfo):
parent_run_id=trace_info.message_id,
start_time=trace_info.start_time or trace_info.message_data.created_at,
end_time=trace_info.end_time or trace_info.message_data.updated_at,
dotted_order=moderation_dotted_order,
)

self.add_run(langsmith_run)

def suggested_question_trace(self, trace_info: SuggestedQuestionTraceInfo):
parent_run_id = trace_info.message_id
parent_dotted_order = generate_dotted_order(parent_run_id, trace_info.start_time)
suggested_question_dotted_order = generate_dotted_order(
trace_info.message_id, trace_info.start_time, parent_dotted_order
)
message_data = trace_info.message_data
suggested_question_run = LangSmithRunModel(
name=TraceTaskName.SUGGESTED_QUESTION_TRACE.value,
Expand All @@ -293,11 +316,17 @@ def suggested_question_trace(self, trace_info: SuggestedQuestionTraceInfo):
parent_run_id=trace_info.message_id,
start_time=trace_info.start_time or message_data.created_at,
end_time=trace_info.end_time or message_data.updated_at,
dotted_order=suggested_question_dotted_order,
)

self.add_run(suggested_question_run)

def dataset_retrieval_trace(self, trace_info: DatasetRetrievalTraceInfo):
parent_run_id = trace_info.message_id
parent_dotted_order = generate_dotted_order(parent_run_id, trace_info.start_time)
dataset_retrieval_dotted_order = generate_dotted_order(
trace_info.message_id, trace_info.start_time, parent_dotted_order
)
dataset_retrieval_run = LangSmithRunModel(
name=TraceTaskName.DATASET_RETRIEVAL_TRACE.value,
inputs=trace_info.inputs,
Expand All @@ -310,11 +339,15 @@ def dataset_retrieval_trace(self, trace_info: DatasetRetrievalTraceInfo):
parent_run_id=trace_info.message_id,
start_time=trace_info.start_time or trace_info.message_data.created_at,
end_time=trace_info.end_time or trace_info.message_data.updated_at,
dotted_order=dataset_retrieval_dotted_order,
)

self.add_run(dataset_retrieval_run)

def tool_trace(self, trace_info: ToolTraceInfo):
parent_run_id = trace_info.message_id
parent_dotted_order = generate_dotted_order(parent_run_id, trace_info.start_time)
tool_dotted_order = generate_dotted_order(trace_info.message_id, trace_info.start_time, parent_dotted_order)
tool_run = LangSmithRunModel(
name=trace_info.tool_name,
inputs=trace_info.tool_inputs,
Expand All @@ -328,11 +361,17 @@ def tool_trace(self, trace_info: ToolTraceInfo):
start_time=trace_info.start_time,
end_time=trace_info.end_time,
file_list=[trace_info.file_url],
dotted_order=tool_dotted_order,
)

self.add_run(tool_run)

def generate_name_trace(self, trace_info: GenerateNameTraceInfo):
parent_run_id = trace_info.message_id
parent_dotted_order = generate_dotted_order(parent_run_id, trace_info.start_time)
generate_name_dotted_order = generate_dotted_order(
trace_info.message_id, trace_info.start_time, parent_dotted_order
)
name_run = LangSmithRunModel(
name=TraceTaskName.GENERATE_NAME_TRACE.value,
inputs=trace_info.inputs,
Expand All @@ -344,6 +383,7 @@ def generate_name_trace(self, trace_info: GenerateNameTraceInfo):
tags=["generate_name"],
start_time=trace_info.start_time or datetime.now(),
end_time=trace_info.end_time or datetime.now(),
dotted_order=generate_name_dotted_order,
)

self.add_run(name_run)
Expand Down
14 changes: 14 additions & 0 deletions api/core/ops/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from contextlib import contextmanager
from datetime import datetime
from typing import Optional

from extensions.ext_database import db
from models.model import Message
Expand Down Expand Up @@ -43,3 +44,16 @@ def replace_text_with_content(data):
return [replace_text_with_content(item) for item in data]
else:
return data


def generate_dotted_order(run_id: str, start_time: datetime, parent_dotted_order: Optional[str] = None) -> str:
    """
    Generate a LangSmith ``dotted_order`` segment for a run.

    A segment is a compact timestamp (``YYYYMMDDTHHMMSSmmm`` + ``Z``,
    millisecond precision) immediately followed by the run id. Child runs
    chain their segment onto the parent's dotted order with a ``.`` separator.

    :param run_id: unique id of the run (message/workflow/node id)
    :param start_time: run start time; an ISO-8601 string is also accepted,
        since trace info timestamps may arrive serialized (e.g. queued tasks)
    :param parent_dotted_order: dotted order of the parent run, if any
    :return: the dotted order string for this run
    """
    # Normalize serialized timestamps so .strftime below cannot fail on a str.
    if isinstance(start_time, str):
        start_time = datetime.fromisoformat(start_time)

    # %f yields microseconds (6 digits); trim to milliseconds and append "Z".
    timestamp = start_time.strftime("%Y%m%dT%H%M%S%f")[:-3] + "Z"
    current_segment = f"{timestamp}{run_id}"

    if parent_dotted_order is None:
        return current_segment

    return f"{parent_dotted_order}.{current_segment}"
5 changes: 5 additions & 0 deletions api/extensions/ext_celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ def __call__(self, *args: object, **kwargs: object) -> object:
"schedule.clean_unused_datasets_task",
"schedule.create_tidb_serverless_task",
"schedule.update_tidb_serverless_status_task",
"schedule.clean_messages",
]
day = dify_config.CELERY_BEAT_SCHEDULER_TIME
beat_schedule = {
Expand All @@ -87,6 +88,10 @@ def __call__(self, *args: object, **kwargs: object) -> object:
"task": "schedule.update_tidb_serverless_status_task.update_tidb_serverless_status_task",
"schedule": crontab(minute="30", hour="*"),
},
"clean_messages": {
"task": "schedule.clean_messages.clean_messages",
"schedule": timedelta(days=day),
},
}
celery_app.conf.update(beat_schedule=beat_schedule, imports=imports)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
"""add_created_at_index_for_messages
Revision ID: 01d6889832f7
Revises: 09a8d1878d9b
Create Date: 2024-11-12 09:25:05.527827
"""
from alembic import op
import models as models
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = '01d6889832f7'
down_revision = '09a8d1878d9b'
branch_labels = None
depends_on = None


def upgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    # Add an index on messages.created_at; presumably to speed up
    # "created_at < cutoff" scans by the scheduled message cleanup job
    # added in this change set — confirm against schedule.clean_messages.
    with op.batch_alter_table('messages', schema=None) as batch_op:
        batch_op.create_index('message_created_at_idx', ['created_at'], unique=False)
    # ### end Alembic commands ###


def downgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    # Reverse of upgrade(): drop the created_at index from messages.
    with op.batch_alter_table('messages', schema=None) as batch_op:
        batch_op.drop_index('message_created_at_idx')
    # ### end Alembic commands ###
1 change: 1 addition & 0 deletions api/models/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,7 @@ class Message(db.Model):
db.Index("message_end_user_idx", "app_id", "from_source", "from_end_user_id"),
db.Index("message_account_idx", "app_id", "from_source", "from_account_id"),
db.Index("message_workflow_run_id_idx", "conversation_id", "workflow_run_id"),
db.Index("message_created_at_idx", "created_at"),
)

id = db.Column(StringUUID, server_default=db.text("uuid_generate_v4()"))
Expand Down
79 changes: 79 additions & 0 deletions api/schedule/clean_messages.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import datetime
import time

import click
from werkzeug.exceptions import NotFound

import app
from configs import dify_config
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from models.model import (
App,
Message,
MessageAgentThought,
MessageAnnotation,
MessageChain,
MessageFeedback,
MessageFile,
)
from models.web import SavedMessage
from services.feature_service import FeatureService


@app.celery.task(queue="dataset")
def clean_messages():
    """
    Scheduled task: delete messages older than
    PLAN_SANDBOX_CLEAN_MESSAGE_DAY_SETTING days — together with their
    dependent records (feedback, annotations, chains, agent thoughts,
    files, saved messages) — for tenants on the "sandbox" billing plan.

    Tenant plans are cached in Redis for 10 minutes to avoid hitting the
    billing service once per message.
    """
    click.echo(click.style("Start clean messages.", fg="green"))
    start_at = time.perf_counter()
    # Messages created before this cutoff are candidates for cleanup.
    plan_sandbox_clean_message_day = datetime.datetime.now() - datetime.timedelta(
        days=dify_config.PLAN_SANDBOX_CLEAN_MESSAGE_DAY_SETTING
    )
    page = 1
    while True:
        try:
            # Oldest-eligible messages, newest first, in batches of 100.
            messages = (
                db.session.query(Message)
                .filter(Message.created_at < plan_sandbox_clean_message_day)
                .order_by(Message.created_at.desc())
                .paginate(page=page, per_page=100)
            )
        except NotFound:
            # paginate() raises NotFound for an out-of-range page.
            break
        if not messages.items:
            break

        deleted_on_page = 0
        for message in messages.items:
            # Do NOT name this `app`: that would shadow the module-level
            # `import app` used by the task decorator and confuse readers.
            app_record = App.query.filter_by(id=message.app_id).first()
            if app_record is None:
                # Orphaned message (its app was removed); no tenant/plan to
                # decide from, so leave it alone.
                continue
            features_cache_key = f"features:{app_record.tenant_id}"
            plan_cache = redis_client.get(features_cache_key)
            if plan_cache is None:
                features = FeatureService.get_features(app_record.tenant_id)
                redis_client.setex(features_cache_key, 600, features.billing.subscription.plan)
                plan = features.billing.subscription.plan
            else:
                plan = plan_cache.decode()
            if plan == "sandbox":
                # Delete dependent records first, then the message itself.
                # synchronize_session=False: bulk delete, no session sync needed.
                for dependent_model in (
                    MessageFeedback,
                    MessageAnnotation,
                    MessageChain,
                    MessageAgentThought,
                    MessageFile,
                    SavedMessage,
                ):
                    db.session.query(dependent_model).filter(dependent_model.message_id == message.id).delete(
                        synchronize_session=False
                    )
                db.session.query(Message).filter(Message.id == message.id).delete()
                deleted_on_page += 1
            db.session.commit()

        # Deleting rows shifts later rows into the current page, so re-read the
        # same page after any deletion. Only advance when a page yielded no
        # deletions — otherwise a page full of non-sandbox messages would be
        # re-read forever (the original never advanced past page 1).
        if deleted_on_page == 0:
            page += 1
    end_at = time.perf_counter()
    click.echo(click.style("Cleaned messages from db success latency: {}".format(end_at - start_at), fg="green"))
Loading

0 comments on commit 7ce7222

Please sign in to comment.