
Commit

Merge branch 'feat/support-parent-child-chunk' into deploy/dev
JohnJyong committed Dec 5, 2024
2 parents 726649e + 206684d commit 0475c4e
Showing 6 changed files with 176 additions and 62 deletions.
13 changes: 13 additions & 0 deletions api/controllers/console/datasets/datasets.py
@@ -733,6 +733,18 @@ def get(self, dataset_id):
         }, 200


+class DatasetAutoDisableLogApi(Resource):
+    @setup_required
+    @login_required
+    @account_initialization_required
+    def get(self, dataset_id):
+        dataset_id_str = str(dataset_id)
+        dataset = DatasetService.get_dataset(dataset_id_str)
+        if dataset is None:
+            raise NotFound("Dataset not found.")
+        return DatasetService.get_dataset_auto_disable_logs(dataset_id_str), 200
+
+
 api.add_resource(DatasetListApi, "/datasets")
 api.add_resource(DatasetApi, "/datasets/<uuid:dataset_id>")
 api.add_resource(DatasetUseCheckApi, "/datasets/<uuid:dataset_id>/use-check")
@@ -747,3 +759,4 @@ def get(self, dataset_id):
 api.add_resource(DatasetRetrievalSettingApi, "/datasets/retrieval-setting")
 api.add_resource(DatasetRetrievalSettingMockApi, "/datasets/retrieval-setting/<string:vector_type>")
 api.add_resource(DatasetPermissionUserListApi, "/datasets/<uuid:dataset_id>/permission-part-users")
+api.add_resource(DatasetAutoDisableLogApi, "/datasets/<uuid:dataset_id>/auto-disable-logs")
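For reference, a minimal client-side sketch of the endpoint registered above. Illustrative only: it assumes the console API is served under /console/api and authorized with a console access token (deployment details outside this diff), and the IDs are placeholders. The response shape comes from DatasetService.get_dataset_auto_disable_logs further down in this commit.

# Illustrative sketch, not part of the commit.
import requests

BASE_URL = "http://localhost:5001/console/api"  # assumed local console API prefix
DATASET_ID = "a4c0f95a-0000-0000-0000-000000000000"  # placeholder dataset UUID
HEADERS = {"Authorization": "Bearer <console-access-token>"}  # placeholder auth

resp = requests.get(f"{BASE_URL}/datasets/{DATASET_ID}/auto-disable-logs", headers=HEADERS)
resp.raise_for_status()
# Expected payload: {"document_ids": ["<document-uuid>", ...], "count": <int>}
print(resp.json())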
116 changes: 55 additions & 61 deletions api/controllers/console/datasets/datasets_document.py
@@ -758,9 +758,8 @@ class DocumentStatusApi(DocumentResource):
     @login_required
     @account_initialization_required
     @cloud_edition_billing_resource_check("vector_space")
-    def patch(self, dataset_id, document_id, action):
+    def patch(self, dataset_id, action):
         dataset_id = str(dataset_id)
-        document_id = str(document_id)
         dataset = DatasetService.get_dataset(dataset_id)
         if dataset is None:
             raise NotFound("Dataset not found.")
@@ -775,84 +774,79 @@ def patch(self, dataset_id, document_id, action):
         # check user's permission
         DatasetService.check_dataset_permission(dataset, current_user)

-        document = self.get_document(dataset_id, document_id)
-
-        indexing_cache_key = "document_{}_indexing".format(document.id)
-        cache_result = redis_client.get(indexing_cache_key)
-        if cache_result is not None:
-            raise InvalidActionError("Document is being indexed, please try again later")
-
-        if action == "enable":
-            if document.enabled:
-                raise InvalidActionError("Document already enabled.")
-
-            document.enabled = True
-            document.disabled_at = None
-            document.disabled_by = None
-            document.updated_at = datetime.now(UTC).replace(tzinfo=None)
-            db.session.commit()
-
-            # Set cache to prevent indexing the same document multiple times
-            redis_client.setex(indexing_cache_key, 600, 1)
-
-            add_document_to_index_task.delay(document_id)
-
-            return {"result": "success"}, 200
-
-        elif action == "disable":
-            if not document.completed_at or document.indexing_status != "completed":
-                raise InvalidActionError("Document is not completed.")
-            if not document.enabled:
-                raise InvalidActionError("Document already disabled.")
-
-            document.enabled = False
-            document.disabled_at = datetime.now(UTC).replace(tzinfo=None)
-            document.disabled_by = current_user.id
-            document.updated_at = datetime.now(UTC).replace(tzinfo=None)
-            db.session.commit()
-
-            # Set cache to prevent indexing the same document multiple times
-            redis_client.setex(indexing_cache_key, 600, 1)
-
-            remove_document_from_index_task.delay(document_id)
-
-            return {"result": "success"}, 200
-
-        elif action == "archive":
-            if document.archived:
-                raise InvalidActionError("Document already archived.")
-
-            document.archived = True
-            document.archived_at = datetime.now(UTC).replace(tzinfo=None)
-            document.archived_by = current_user.id
-            document.updated_at = datetime.now(UTC).replace(tzinfo=None)
-            db.session.commit()
-
-            if document.enabled:
-                # Set cache to prevent indexing the same document multiple times
-                redis_client.setex(indexing_cache_key, 600, 1)
-
-                remove_document_from_index_task.delay(document_id)
-
-            return {"result": "success"}, 200
-        elif action == "un_archive":
-            if not document.archived:
-                raise InvalidActionError("Document is not archived.")
-
-            document.archived = False
-            document.archived_at = None
-            document.archived_by = None
-            document.updated_at = datetime.now(UTC).replace(tzinfo=None)
-            db.session.commit()
-
-            # Set cache to prevent indexing the same document multiple times
-            redis_client.setex(indexing_cache_key, 600, 1)
-
-            add_document_to_index_task.delay(document_id)
-
-            return {"result": "success"}, 200
-        else:
-            raise InvalidActionError()
+        document_ids = request.args.getlist("document_id")
+        for document_id in document_ids:
+            document = self.get_document(dataset_id, document_id)
+
+            indexing_cache_key = "document_{}_indexing".format(document.id)
+            cache_result = redis_client.get(indexing_cache_key)
+            if cache_result is not None:
+                raise InvalidActionError(f"Document:{document.name} is being indexed, please try again later")
+
+            if action == "enable":
+                if document.enabled:
+                    continue
+                document.enabled = True
+                document.disabled_at = None
+                document.disabled_by = None
+                document.updated_at = datetime.now(UTC).replace(tzinfo=None)
+                db.session.commit()
+
+                # Set cache to prevent indexing the same document multiple times
+                redis_client.setex(indexing_cache_key, 600, 1)
+
+                add_document_to_index_task.delay(document_id)
+
+            elif action == "disable":
+                if not document.completed_at or document.indexing_status != "completed":
+                    raise InvalidActionError(f"Document: {document.name} is not completed.")
+                if not document.enabled:
+                    continue
+
+                document.enabled = False
+                document.disabled_at = datetime.now(UTC).replace(tzinfo=None)
+                document.disabled_by = current_user.id
+                document.updated_at = datetime.now(UTC).replace(tzinfo=None)
+                db.session.commit()
+
+                # Set cache to prevent indexing the same document multiple times
+                redis_client.setex(indexing_cache_key, 600, 1)
+
+                remove_document_from_index_task.delay(document_id)
+
+            elif action == "archive":
+                if document.archived:
+                    continue
+
+                document.archived = True
+                document.archived_at = datetime.now(UTC).replace(tzinfo=None)
+                document.archived_by = current_user.id
+                document.updated_at = datetime.now(UTC).replace(tzinfo=None)
+                db.session.commit()
+
+                if document.enabled:
+                    # Set cache to prevent indexing the same document multiple times
+                    redis_client.setex(indexing_cache_key, 600, 1)
+
+                    remove_document_from_index_task.delay(document_id)
+
+            elif action == "un_archive":
+                if not document.archived:
+                    continue
+                document.archived = False
+                document.archived_at = None
+                document.archived_by = None
+                document.updated_at = datetime.now(UTC).replace(tzinfo=None)
+                db.session.commit()
+
+                # Set cache to prevent indexing the same document multiple times
+                redis_client.setex(indexing_cache_key, 600, 1)
+
+                add_document_to_index_task.delay(document_id)
+
+            else:
+                raise InvalidActionError()
+        return {"result": "success"}, 200


 class DocumentPauseApi(DocumentResource):
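The rewritten DocumentStatusApi.patch above takes its targets from a repeated document_id query parameter and skips documents already in the requested state (continue) instead of raising, so a batch call can be retried safely. A hedged client sketch follows; the exact route for DocumentStatusApi is not shown in this diff, so the path below is a hypothetical placeholder — the relevant part is the repeated document_id parameter that request.args.getlist("document_id") reads back.

# Illustrative sketch, not part of the commit.
import requests

BASE_URL = "http://localhost:5001/console/api"  # assumed local console API prefix
DATASET_ID = "a4c0f95a-0000-0000-0000-000000000000"  # placeholder dataset UUID
HEADERS = {"Authorization": "Bearer <console-access-token>"}  # placeholder auth

document_ids = [
    "11111111-1111-1111-1111-111111111111",  # placeholder document UUIDs
    "22222222-2222-2222-2222-222222222222",
]
# requests encodes a list value as repeated query parameters
# (?document_id=...&document_id=...), which getlist("document_id") reads back.
resp = requests.patch(
    f"{BASE_URL}/datasets/{DATASET_ID}/documents/status/enable",  # hypothetical path
    params={"document_id": document_ids},
    headers=HEADERS,
)
print(resp.json())  # {"result": "success"} once every document has been processed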
46 changes: 46 additions & 0 deletions api/migrations/versions/…-b608381a3e95_add_auto_disabled_dataset_logs.py
@@ -0,0 +1,46 @@
+"""add_auto_disabled_dataset_logs
+
+Revision ID: b608381a3e95
+Revises: e19037032219
+Create Date: 2024-12-05 15:13:58.514594
+
+"""
+from alembic import op
+import models as models
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision = 'b608381a3e95'
+down_revision = 'e19037032219'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.create_table('dataset_auto_disable_logs',
+    sa.Column('id', models.types.StringUUID(), server_default=sa.text('uuid_generate_v4()'), nullable=False),
+    sa.Column('tenant_id', models.types.StringUUID(), nullable=False),
+    sa.Column('dataset_id', models.types.StringUUID(), nullable=False),
+    sa.Column('document_id', models.types.StringUUID(), nullable=False),
+    sa.Column('created_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP(0)'), nullable=False),
+    sa.PrimaryKeyConstraint('id', name='dataset_auto_disable_log_pkey')
+    )
+    with op.batch_alter_table('dataset_auto_disable_logs', schema=None) as batch_op:
+        batch_op.create_index('dataset_auto_disable_log_created_atx', ['created_at'], unique=False)
+        batch_op.create_index('dataset_auto_disable_log_dataset_idx', ['dataset_id'], unique=False)
+        batch_op.create_index('dataset_auto_disable_log_tenant_idx', ['tenant_id'], unique=False)
+
+    # ### end Alembic commands ###
+
+
+def downgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+    with op.batch_alter_table('dataset_auto_disable_logs', schema=None) as batch_op:
+        batch_op.drop_index('dataset_auto_disable_log_tenant_idx')
+        batch_op.drop_index('dataset_auto_disable_log_dataset_idx')
+        batch_op.drop_index('dataset_auto_disable_log_created_atx')
+
+    op.drop_table('dataset_auto_disable_logs')
+    # ### end Alembic commands ###
16 changes: 16 additions & 0 deletions api/models/dataset.py
@@ -898,3 +898,19 @@ class ExternalKnowledgeBindings(db.Model):
     created_at = db.Column(db.DateTime, nullable=False, server_default=db.text("CURRENT_TIMESTAMP(0)"))
     updated_by = db.Column(StringUUID, nullable=True)
     updated_at = db.Column(db.DateTime, nullable=False, server_default=db.text("CURRENT_TIMESTAMP(0)"))
+
+
+class DatasetAutoDisableLog(db.Model):
+    __tablename__ = "dataset_auto_disable_logs"
+    __table_args__ = (
+        db.PrimaryKeyConstraint("id", name="dataset_auto_disable_log_pkey"),
+        db.Index("dataset_auto_disable_log_tenant_idx", "tenant_id"),
+        db.Index("dataset_auto_disable_log_dataset_idx", "dataset_id"),
+        db.Index("dataset_auto_disable_log_created_atx", "created_at"),
+    )
+
+    id = db.Column(StringUUID, server_default=db.text("uuid_generate_v4()"))
+    tenant_id = db.Column(StringUUID, nullable=False)
+    dataset_id = db.Column(StringUUID, nullable=False)
+    document_id = db.Column(StringUUID, nullable=False)
+    created_at = db.Column(db.DateTime, nullable=False, server_default=db.text("CURRENT_TIMESTAMP(0)"))
28 changes: 27 additions & 1 deletion api/schedule/clean_unused_datasets_task.py
@@ -10,7 +10,7 @@
 from core.rag.index_processor.index_processor_factory import IndexProcessorFactory
 from extensions.ext_database import db
 from extensions.ext_redis import redis_client
-from models.dataset import Dataset, DatasetQuery, Document
+from models.dataset import Dataset, DatasetAutoDisableLog, DatasetQuery, Document
 from services.feature_service import FeatureService


@@ -76,6 +76,19 @@ def clean_unused_datasets_task():
             )
             if not dataset_query or len(dataset_query) == 0:
                 try:
+                    # add auto disable log
+                    documents = db.session.query(Document).filter(
+                        Document.dataset_id == dataset.id,
+                        Document.enabled == True,
+                        Document.archived == False,
+                    ).all()
+                    for document in documents:
+                        dataset_auto_disable_log = DatasetAutoDisableLog(
+                            tenant_id=dataset.tenant_id,
+                            dataset_id=dataset.id,
+                            document_id=document.id,
+                        )
+                        db.session.add(dataset_auto_disable_log)
                     # remove index
                     index_processor = IndexProcessorFactory(dataset.doc_form).init_index_processor()
                     index_processor.clean(dataset, None)
@@ -153,6 +166,19 @@ def clean_unused_datasets_task():
                    else:
                        plan = plan_cache.decode()
                    if plan == "sandbox":
+                        # add auto disable log
+                        documents = db.session.query(Document).filter(
+                            Document.dataset_id == dataset.id,
+                            Document.enabled == True,
+                            Document.archived == False,
+                        ).all()
+                        for document in documents:
+                            dataset_auto_disable_log = DatasetAutoDisableLog(
+                                tenant_id=dataset.tenant_id,
+                                dataset_id=dataset.id,
+                                document_id=document.id,
+                            )
+                            db.session.add(dataset_auto_disable_log)
                        # remove index
                        index_processor = IndexProcessorFactory(dataset.doc_form).init_index_processor()
                        index_processor.clean(dataset, None)
19 changes: 19 additions & 0 deletions api/services/dataset_service.py
@@ -26,6 +26,7 @@
     AppDatasetJoin,
     ChildChunk,
     Dataset,
+    DatasetAutoDisableLog,
     DatasetCollectionBinding,
     DatasetPermission,
     DatasetPermissionEnum,
@@ -404,6 +405,24 @@ def get_related_apps(dataset_id: str):
             .order_by(db.desc(AppDatasetJoin.created_at))
             .all()
         )
+
+    @staticmethod
+    def get_dataset_auto_disable_logs(dataset_id: str) -> dict:
+        # get recent 30 days auto disable logs
+        start_date = datetime.datetime.now() - datetime.timedelta(days=30)
+        dataset_auto_disable_logs = DatasetAutoDisableLog.query.filter(
+            DatasetAutoDisableLog.dataset_id == dataset_id,
+            DatasetAutoDisableLog.created_at >= start_date,
+        ).all()
+        if dataset_auto_disable_logs:
+            return {
+                "document_ids": [log.document_id for log in dataset_auto_disable_logs],
+                "count": len(dataset_auto_disable_logs),
+            }
+        return {
+            "document_ids": [],
+            "count": 0,
+        }


 class DocumentService:
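get_dataset_auto_disable_logs only looks back 30 days and always returns the same two keys, so callers never need to null-check. A small in-process sketch follows, assuming the project's Flask application factory; the exact import path is a guess and not part of this diff.

# Illustrative sketch, not part of the commit.
from app_factory import create_app  # assumed application factory module
from services.dataset_service import DatasetService

app = create_app()
with app.app_context():  # DatasetAutoDisableLog.query needs an application context
    summary = DatasetService.get_dataset_auto_disable_logs(
        "a4c0f95a-0000-0000-0000-000000000000"  # placeholder dataset UUID
    )
    print(summary)  # e.g. {"document_ids": [...], "count": 3}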
