From af2a26e0e155be5aae77a36bdc3463c4d2d32022 Mon Sep 17 00:00:00 2001 From: -LAN- Date: Tue, 24 Dec 2024 15:15:35 +0800 Subject: [PATCH] fix: handle missing documents in retry indexing task and streamline error handling Signed-off-by: -LAN- --- api/tasks/retry_document_indexing_task.py | 49 ++++++++++++----------- 1 file changed, 25 insertions(+), 24 deletions(-) diff --git a/api/tasks/retry_document_indexing_task.py b/api/tasks/retry_document_indexing_task.py index 034f1bb400264a..485caa5152ea78 100644 --- a/api/tasks/retry_document_indexing_task.py +++ b/api/tasks/retry_document_indexing_task.py @@ -58,36 +58,37 @@ def retry_document_indexing_task(dataset_id: str, document_ids: list[str]): document = ( db.session.query(Document).filter(Document.id == document_id, Document.dataset_id == dataset_id).first() ) + if not document: + logging.info(click.style("Document not found: {}".format(document_id), fg="yellow")) + return try: - if document: - # clean old data - index_processor = IndexProcessorFactory(document.doc_form).init_index_processor() - - segments = db.session.query(DocumentSegment).filter(DocumentSegment.document_id == document_id).all() - if segments: - index_node_ids = [segment.index_node_id for segment in segments] - # delete from vector index - index_processor.clean(dataset, index_node_ids) + # clean old data + index_processor = IndexProcessorFactory(document.doc_form).init_index_processor() - for segment in segments: - db.session.delete(segment) - db.session.commit() + segments = db.session.query(DocumentSegment).filter(DocumentSegment.document_id == document_id).all() + if segments: + index_node_ids = [segment.index_node_id for segment in segments] + # delete from vector index + index_processor.clean(dataset, index_node_ids) - document.indexing_status = "parsing" - document.processing_started_at = datetime.datetime.utcnow() - db.session.add(document) + for segment in segments: + db.session.delete(segment) db.session.commit() - indexing_runner = IndexingRunner() - indexing_runner.run([document]) - redis_client.delete(retry_indexing_cache_key) + document.indexing_status = "parsing" + document.processing_started_at = datetime.datetime.utcnow() + db.session.add(document) + db.session.commit() + + indexing_runner = IndexingRunner() + indexing_runner.run([document]) + redis_client.delete(retry_indexing_cache_key) except Exception as ex: - if document: - document.indexing_status = "error" - document.error = str(ex) - document.stopped_at = datetime.datetime.utcnow() - db.session.add(document) - db.session.commit() + document.indexing_status = "error" + document.error = str(ex) + document.stopped_at = datetime.datetime.utcnow() + db.session.add(document) + db.session.commit() logging.info(click.style(str(ex), fg="yellow")) redis_client.delete(retry_indexing_cache_key) pass