Add data clean schedule #1859

Merged
merged 11 commits on Jan 2, 2024
2 changes: 2 additions & 0 deletions api/docker/entrypoint.sh
@@ -10,6 +10,8 @@ fi
if [[ "${MODE}" == "worker" ]]; then
celery -A app.celery worker -P ${CELERY_WORKER_CLASS:-gevent} -c ${CELERY_WORKER_AMOUNT:-1} --loglevel INFO \
-Q ${CELERY_QUEUES:-dataset,generation,mail}
elif [[ "${MODE}" == "beat" ]]; then
celery -A app.celery beat --loglevel INFO
else
if [[ "${DEBUG}" == "true" ]]; then
flask run --host=${DIFY_BIND_ADDRESS:-0.0.0.0} --port=${DIFY_PORT:-5001} --debug
23 changes: 23 additions & 0 deletions api/extensions/ext_celery.py
@@ -1,3 +1,5 @@
from datetime import timedelta

from celery import Task, Celery
from flask import Flask

@@ -35,4 +37,25 @@ def __call__(self, *args: object, **kwargs: object) -> object:

    celery_app.set_default()
    app.extensions["celery"] = celery_app

    imports = [
        "schedule.clean_embedding_cache_task",
        "schedule.clean_unused_datasets_task",
    ]

    beat_schedule = {
        'clean_embedding_cache_task': {
            'task': 'schedule.clean_embedding_cache_task.clean_embedding_cache_task',
            'schedule': timedelta(minutes=1),
        },
        'clean_unused_datasets_task': {
            'task': 'schedule.clean_unused_datasets_task.clean_unused_datasets_task',
            'schedule': timedelta(minutes=10),
        }
    }
    celery_app.conf.update(
        beat_schedule=beat_schedule,
        imports=imports
    )

    return celery_app
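Once these entries are registered, the new beat process (started via MODE=beat in entrypoint.sh) reads celery_app.conf.beat_schedule and enqueues each task at its interval; the tasks are routed to the dataset queue by their own decorators, so the existing worker picks them up. A minimal sketch of how to confirm the schedule was registered, assuming an already-initialised Flask app named flask_app (only the app.extensions["celery"] lookup comes from the diff above):

# Hypothetical check from a Flask shell; `flask_app` is the initialised app.
celery_app = flask_app.extensions["celery"]
for name, entry in celery_app.conf.beat_schedule.items():
    print(name, entry["task"], entry["schedule"])  # task path and interval
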
2 changes: 1 addition & 1 deletion api/requirements.txt
@@ -57,4 +57,4 @@ cohere~=4.32
unstructured~=0.10.27
unstructured[docx,pptx,msg,md,ppt]~=0.10.27
bs4~=0.0.1
markdown~=3.5.1
markdown~=3.5.1
29 changes: 29 additions & 0 deletions api/schedule/clean_embedding_cache_task.py
@@ -0,0 +1,29 @@
import app
import datetime
import time
import click
from flask import current_app
from werkzeug.exceptions import NotFound
from extensions.ext_database import db
from models.dataset import Embedding


@app.celery.task(queue='dataset')
def clean_embedding_cache_task():
    click.echo(click.style('Start clean embedding cache.', fg='green'))
    clean_days = int(current_app.config.get('CLEAN_DAY_SETTING'))
    start_at = time.perf_counter()
    thirty_days_ago = datetime.datetime.now() - datetime.timedelta(days=clean_days)
    page = 1
    while True:
        try:
            # fetch embeddings older than the configured retention window, 100 per page
            embeddings = db.session.query(Embedding).filter(Embedding.created_at < thirty_days_ago) \
                .order_by(Embedding.created_at.desc()).paginate(page=page, per_page=100)
        except NotFound:
            break
        for embedding in embeddings:
            db.session.delete(embedding)
        db.session.commit()
        page += 1
    end_at = time.perf_counter()
    click.echo(click.style('Cleaned embedding cache from db success latency: {}'.format(end_at - start_at), fg='green'))
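Because the FlaskTask wrapper (the __call__ shown in the ext_celery hunk) pushes an app context around task execution, the task can also be exercised without beat or a broker. A minimal sketch of a manual run for testing; .apply() is standard Celery and runs the task synchronously in-process, and the assumption is that CLEAN_DAY_SETTING is set in the Flask config:

# Hypothetical manual run: .apply() executes the task eagerly, without a
# broker or worker, to verify the deletion logic before relying on beat.
from schedule.clean_embedding_cache_task import clean_embedding_cache_task

result = clean_embedding_cache_task.apply()
print(result.status)  # 'SUCCESS' unless the task raised
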
69 changes: 69 additions & 0 deletions api/schedule/clean_unused_datasets_task.py
@@ -0,0 +1,69 @@
import logging
import app
import datetime
import time
import click
from flask import current_app
from werkzeug.exceptions import NotFound
from core.index.index import IndexBuilder
from extensions.ext_database import db
from models.dataset import Dataset, DatasetQuery, Document, DatasetCollectionBinding


@app.celery.task(queue='dataset')
def clean_unused_datasets_task():
    click.echo(click.style('Start clean unused datasets indexes.', fg='green'))
    clean_days = int(current_app.config.get('CLEAN_DAY_SETTING'))
    start_at = time.perf_counter()
    thirty_days_ago = datetime.datetime.now() - datetime.timedelta(days=clean_days)
    page = 1
    while True:
        try:
            datasets = db.session.query(Dataset).filter(Dataset.created_at < thirty_days_ago) \
                .order_by(Dataset.created_at.desc()).paginate(page=page, per_page=50)
        except NotFound:
            break
        page += 1
        for dataset in datasets:
            dataset_query = db.session.query(DatasetQuery).filter(
                DatasetQuery.created_at > thirty_days_ago,
                DatasetQuery.dataset_id == dataset.id
            ).all()
            if not dataset_query or len(dataset_query) == 0:
                documents = db.session.query(Document).filter(
                    Document.dataset_id == dataset.id,
                    Document.indexing_status == 'completed',
                    Document.enabled == True,
                    Document.archived == False,
                    Document.updated_at > thirty_days_ago
                ).all()
                if not documents or len(documents) == 0:
                    try:
                        # remove index
                        vector_index = IndexBuilder.get_index(dataset, 'high_quality')
                        kw_index = IndexBuilder.get_index(dataset, 'economy')
                        # delete from vector index
                        if vector_index:
                            if dataset.collection_binding_id:
                                vector_index.delete_by_group_id(dataset.id)
                            else:
                                vector_index.delete()
                        kw_index.delete()
                        # update document
                        update_params = {
                            Document.enabled: False
                        }

                        Document.query.filter_by(dataset_id=dataset.id).update(update_params)
                        db.session.commit()
                        click.echo(click.style('Cleaned unused dataset {} from db success!'.format(dataset.id),
                                               fg='green'))
                    except Exception as e:
                        click.echo(
                            click.style('clean dataset index error: {} {}'.format(e.__class__.__name__, str(e)),
                                        fg='red'))
    end_at = time.perf_counter()
    click.echo(click.style('Cleaned unused dataset from db success latency: {}'.format(end_at - start_at), fg='green'))
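Both tasks read their retention window from CLEAN_DAY_SETTING in the Flask config, which is not shown in this diff. A minimal sketch of what that setting might look like, assuming an environment-driven config class; the attribute name comes from the tasks above, while the 30-day default is an assumption for illustration, not a value from the PR:

import os


class Config:
    # Retention window (in days) used by the scheduled cleanup tasks.
    # The default value here is illustrative only.
    CLEAN_DAY_SETTING = int(os.environ.get('CLEAN_DAY_SETTING', 30))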