[BUG] Unexplained blocking in the file-insertion module #566

zifengdexiatian opened this issue Nov 1, 2024 · 0 comments

Is there an existing issue / discussion for this?

  • I have searched the existing issues / discussions

Is there an existing answer for this in the FAQ?

  • I have searched the FAQ

Current Behavior

The background service in insert_files_server.py that automatically inserts files into the database slows down and blocks for no apparent reason.
When more than 50 txt files are uploaded at the same time, the first 30-50 files are embedded and inserted into the database very quickly, but after that the insertion time per file suddenly jumps to about 3 s (sometimes even more than ten seconds).
Debugging shows that the function whose runtime suddenly increases is process_data(); the exact spot inside the function where the time goes is not fixed. The function is as follows:

@get_time_async
async def process_data(retriever, milvus_kb, mysql_client, file_info, time_record):
    insert_logger.info(f'开始处理文件: {file_info}')
    parse_timeout_seconds = 300
    insert_timeout_seconds = 300
    content_length = -1
    status = 'green'
    process_start = time.perf_counter()
    insert_logger.info(f'Start insert file: {file_info}')
    _, file_id, user_id, file_name, kb_id, file_location, file_size, file_url, chunk_size = file_info
    # Timestamp in the format '2021-08-01 00:00:00'
    insert_timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
    mysql_client.update_knowlegde_base_latest_insert_time(kb_id, insert_timestamp)
    local_file = LocalFileForInsert(user_id, kb_id, file_id, file_location, file_name, file_url, chunk_size, mysql_client)
    msg = "success"
    chunks_number = 0
    mysql_client.update_file_msg(file_id, f'Processing:{random.randint(1, 5)}%')
    # Embed the file contents and write them to Milvus
    start = time.perf_counter()
    try:
        await asyncio.wait_for(
            asyncio.to_thread(local_file.split_file_to_docs),
            timeout=parse_timeout_seconds
        )
        insert_logger.info(f"函数split_file_to_docs耗时: {time.perf_counter() - start}")
        start = time.perf_counter()
        content_length = sum([len(doc.page_content) for doc in local_file.docs])
        if content_length > MAX_CHARS:
            status = 'red'
            msg = f"{file_name} content_length too large, {content_length} >= MaxLength({MAX_CHARS})"
            return status, content_length, chunks_number, msg
        elif content_length == 0:
            status = 'red'
            msg = f"{file_name} content_length is 0, file content is empty or The URL exists anti-crawling or requires login."
            return status, content_length, chunks_number, msg
    except asyncio.TimeoutError:
        local_file.event.set()
        insert_logger.error(f'Timeout: split_file_to_docs took longer than {parse_timeout_seconds} seconds')
        status = 'red'
        msg = f"split_file_to_docs timeout: {parse_timeout_seconds}s"
        return status, content_length, chunks_number, msg
    except Exception as e:
        error_info = f'split_file_to_docs error: {traceback.format_exc()}'
        msg = error_info
        insert_logger.error(msg)
        status = 'red'
        msg = f"split_file_to_docs error"
        return status, content_length, chunks_number, msg
    end = time.perf_counter()
    time_record['parse_time'] = round(end - start, 2)
    insert_logger.info(f'parse time: {end - start} {len(local_file.docs)}')
    mysql_client.update_file_msg(file_id, f'Processing:{random.randint(5, 75)}%')

    try:
        start = time.perf_counter()
        chunks_number, insert_time_record = await asyncio.wait_for(
            retriever.insert_documents(local_file.docs, chunk_size),
            timeout=insert_timeout_seconds)
        insert_time = time.perf_counter()
        time_record.update(insert_time_record)
        insert_logger.info(f'insert time: {insert_time - start}')
        # mysql_client.update_chunks_number(local_file.file_id, chunks_number)
    except asyncio.TimeoutError:
        insert_logger.error(f'Timeout: milvus insert took longer than {insert_timeout_seconds} seconds')
        expr = f'file_id == \"{local_file.file_id}\"'
        milvus_kb.delete_expr(expr)
        status = 'red'
        time_record['insert_timeout'] = True
        msg = f"milvus insert timeout: {insert_timeout_seconds}s"
        return status, content_length, chunks_number, msg
    except Exception as e:
        error_info = f'milvus insert error: {traceback.format_exc()}'
        insert_logger.error(error_info)
        status = 'red'
        time_record['insert_error'] = True
        msg = f"milvus insert error"
        return status, content_length, chunks_number, msg

    mysql_client.update_file_msg(file_id, f'Processing:{random.randint(75, 100)}%')
    time_record['upload_total_time'] = round(time.perf_counter() - process_start, 2)
    mysql_client.update_file_upload_infos(file_id, time_record)
    insert_logger.info(f'insert_files_to_milvus: {user_id}, {kb_id}, {file_id}, {file_name}, {status}')
    msg = json.dumps(time_record, ensure_ascii=False)
    insert_logger.info(f'处理数据函数耗时: {time.perf_counter() - process_start}')
    return status, content_length, chunks_number, msg
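
One thing worth noting about the code above: asyncio.to_thread() runs its callable on the event loop's default ThreadPoolExecutor, and that pool (typically capped at min(32, os.cpu_count() + 4) workers) is shared by every asyncio.to_thread() and loop.run_in_executor(None, ...) call on the loop. If parsing, embedding, or the Milvus insert keep those worker threads busy (or hold the GIL), later calls have to queue, and the "slow" step would appear to move around. A minimal sketch for ruling this out by installing a larger dedicated default executor; the pool size of 32 is only an example, not a project recommendation:

import asyncio
from concurrent.futures import ThreadPoolExecutor

def install_default_executor(max_workers: int = 32) -> None:
    # Replace the loop's default executor, which asyncio.to_thread() and
    # loop.run_in_executor(None, ...) share, with a larger dedicated pool.
    loop = asyncio.get_running_loop()
    loop.set_default_executor(ThreadPoolExecutor(max_workers=max_workers))

Calling install_default_executor() once at service start-up (from inside the running loop) would at least show whether the 3 s pauses come from thread-pool contention.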

The first debugging pass showed that the elapsed time of

        await asyncio.wait_for(
            asyncio.to_thread(local_file.split_file_to_docs),
            timeout=parse_timeout_seconds
        )

suddenly increased, to roughly 3 s, so I rewrote it as a synchronous call. The total time did not decrease, though; the slow part simply moved to

chunks_number, insert_time_record = await asyncio.wait_for(
    retriever.insert_documents(local_file.docs, chunk_size),
    timeout=insert_timeout_seconds)

which also took about 3 s. Debugging further, I found that the extra time was spent in the part of insert_documents that writes to the Milvus database:

start = time.perf_counter()
res: MutationResult = await asyncio.to_thread(
    self.col.insert, insert_list, timeout=timeout, **kwargs
)
# insert_logger.info(f"insert: {res}, insert keys: {res.primary_keys}")
insert_logger.info(f"insert: {res}")
pks.extend(res.primary_keys)
insert_logger.info(f"插入向量数据库耗时为: {round(time.perf_counter() - start, 2)}")

That call also took about 3 s. After I changed it to a synchronous call as well, this part dropped back to a few tens of milliseconds, but then the Elasticsearch insert became the part that takes about 3 s:

es_start = time.perf_counter()
# Each doc's doc_id is file_id + '_' + i
docs_ids = [doc.metadata['file_id'] + '_' + str(i) for i, doc in enumerate(embed_docs)]
es_res = await es_store.aadd_documents(embed_docs, ids=docs_ids)
time_record['es_insert_time'] = round(time.perf_counter() - es_start, 2)
insert_logger.info(f'es_store insert number: {len(es_res)}, {es_res[0]}')
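
Since each of the steps above ends up either in that shared thread pool or directly on the event loop, one way to tell whether the ~3 s is real Milvus/Elasticsearch work or just waiting is to record when the offloaded function actually starts and finishes inside its worker thread. A rough sketch; timed_to_thread is a hypothetical helper, not something that exists in QAnything:

import asyncio
import time

async def timed_to_thread(label, logger, fn, *args, **kwargs):
    # Splits the total await time into: time queued behind other work,
    # time actually running fn in the worker thread, and how long the
    # event loop took to resume this coroutine after the thread finished.
    submitted = time.perf_counter()

    def wrapper():
        started = time.perf_counter()
        result = fn(*args, **kwargs)
        return result, started, time.perf_counter()

    result, started, ended = await asyncio.to_thread(wrapper)
    resumed = time.perf_counter()
    logger.info(f"{label}: queued {started - submitted:.3f}s, "
                f"ran {ended - started:.3f}s, resume lag {resumed - ended:.3f}s")
    return result

Wrapping the self.col.insert call with something like await timed_to_thread('milvus insert', insert_logger, self.col.insert, insert_list, timeout=timeout, **kwargs) would show a large "queued" or "resume lag" component if the delay is contention rather than the database itself.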

In other words, no single step is consistently the slow one; it looks more like some odd blocking caused by the asynchronous code, because the slowdown only shows up after a few dozen files have already been processed quickly.
At first I suspected the embedding was too slow, so I moved the computation to the GPU and measured the embedding speed; it is in the millisecond range, so embedding cannot be the problem.
Looking at the logs again more carefully, I noticed that after the latency spikes start, normal timings still occur, so normal and slow files alternate, with the worst case taking more than 20 seconds.
My insert_files_server.py service runs with workers set to 1; I also tested multiple workers, and the behaviour is the same.
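
If the suspicion is that something is blocking the event loop itself (which would explain why the apparent slow spot keeps moving), two cheap checks are asyncio's debug mode, which logs any callback or task step that runs longer than loop.slow_callback_duration, and a heartbeat task that measures how late the loop wakes up. A sketch, with an arbitrary 100 ms threshold:

import asyncio
import logging

async def loop_lag_monitor(interval: float = 0.5, warn_at: float = 0.1) -> None:
    # Sleep for `interval` and report how much later than expected the loop
    # woke up; a large lag means some coroutine or callback blocked the loop.
    loop = asyncio.get_running_loop()
    while True:
        before = loop.time()
        await asyncio.sleep(interval)
        lag = loop.time() - before - interval
        if lag > warn_at:
            logging.warning("event loop lagged by %.3f s", lag)

def enable_asyncio_debug() -> None:
    # Makes asyncio log slow callbacks ("Executing ... took x.xxx seconds").
    loop = asyncio.get_running_loop()
    loop.set_debug(True)
    loop.slow_callback_duration = 0.1  # seconds

If the lag warnings line up with the 3 s spikes, the time is being lost on the loop or to GIL contention (for example the synchronous mysql_client calls inside process_data, or CPU-heavy work running in threads) rather than inside Milvus or Elasticsearch.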

Expected Behavior

I hope the cause can be found and fixed. I need to upload millions of txt files, and the current insertion speed is far too slow.

Environment

- OS: CentOS 7
- NVIDIA Driver: 550.90.07
- CUDA: 12.4
- Docker Compose: 20.10.15
- NVIDIA GPU Memory: 24 GB

QAnything logs

No response

Steps To Reproduce

No response

Anything else?

No response
