Conversation

@BukeLy commented Nov 13, 2025

Summary

This PR implements workspace isolation for pipeline status to enable concurrent document processing across multiple tenants without blocking each other.

Problem

In multi-tenant scenarios, different workspaces currently share a single global pipeline_status namespace, causing pipelines from different tenants to block each other. This severely impacts concurrent processing performance.

Solution

Core Changes

  1. Extended get_namespace_data() in shared_storage.py

    • Now recognizes workspace-specific pipeline namespaces
    • Pattern: "{workspace}:pipeline" (following GraphDB pattern)
    • Maintains backward compatibility with "pipeline_status"
  2. Added workspace parameter to initialize_pipeline_status()

    • Supports per-tenant isolated pipeline namespaces
    • Empty workspace defaults to global namespace for backward compatibility
  3. Updated 5 call sites

    • 2 in lightrag.py: process_document_queue(), aremove_document()
    • 3 in document_routes.py: background_delete_documents(), clear_documents(), cancel_pipeline()
    • All now use workspace-aware locks via get_storage_keyed_lock() (namespace convention sketched after this list)
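
For illustration, here is a minimal sketch of the namespace convention these changes introduce (the helper name is mine; the real logic lives inside get_namespace_data()):

    def resolve_pipeline_namespace(workspace: str) -> str:
        """Map a workspace to its pipeline namespace.

        An empty workspace falls back to the legacy global
        "pipeline_status" namespace for backward compatibility.
        """
        return f"{workspace}:pipeline" if workspace else "pipeline_status"

    assert resolve_pipeline_namespace("tenant_a") == "tenant_a:pipeline"
    assert resolve_pipeline_namespace("") == "pipeline_status"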

Key Features

  • Concurrent Processing: Different workspaces can process documents in parallel
  • Backward Compatible: Empty workspace uses global "pipeline_status"
  • Fail-Fast: Uninitialized pipeline raises a clear PipelineNotInitializedError (illustrated below)
  • Performance: Expected N× improvement for N concurrent tenants
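
To make the fail-fast behavior concrete, a hedged example (the exception name comes from this PR; its import path and the exact raise site are assumptions):

    from lightrag.kg.shared_storage import get_namespace_data

    async def read_pipeline_status(workspace: str) -> dict:
        # If initialize_pipeline_status() was never called for this
        # workspace, this raises PipelineNotInitializedError instead of
        # silently returning empty or global state.
        return await get_namespace_data(f"{workspace}:pipeline")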

Bug Fixes

Fixed an AttributeError caused by accessing the non-existent self.global_config (shape of the fix sketched below):

  • Used self.workspace in lightrag.py (2 locations)
  • Used rag.workspace in document_routes.py (3 locations)
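
For illustration (context lines assumed):

    # Before: LightRAG instances expose no global_config attribute,
    # so this raised AttributeError at runtime:
    #     workspace = self.global_config.get("workspace", "")
    # After, inside lightrag.py methods:
    workspace = self.workspace
    # ...and inside document_routes.py handlers, the equivalent is
    # workspace = rag.workspace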

Testing

All syntax checks passed and comprehensive testing completed:

  1. ✅ Global pipeline initialization (backward compatibility)
  2. ✅ Workspace-specific pipeline initialization
  3. ✅ Proper isolation between workspaces
  4. ✅ Uninitialized pipeline error detection
  5. ✅ Lock functionality with workspaces

Impact

Performance

Before: 3 tenants × 0.2s each = 0.6s (serial)
After:  3 tenants × 0.2s each = ~0.2s (parallel)
Result: 3× performance improvement

Code Changes

4 files changed, 121 insertions(+), 17 deletions(-)

References

  • Follows existing GraphDB workspace pattern in utils_graph.py
  • Implementation plan: .claude/plan/minimal-workspace-isolation.md
  • Optimization plan: .claude/plan/linus-review-optimization.md

@danielaskdd (Collaborator)

@codex review

@chatgpt-codex-connector (bot) left a comment

💡 Codex Review

try:
    from lightrag.kg.shared_storage import (
        get_namespace_data,
        get_all_update_flags_status,
    )
    pipeline_status = await get_namespace_data("pipeline_status")

P1: Use workspace namespace when reporting pipeline status

The pipeline status endpoint still reads the global "pipeline_status" dictionary even though other parts of the change now isolate pipeline state per workspace (f"{workspace}:pipeline"). When a tenant operates in a non-default workspace, their jobs update a different namespace, so this endpoint will continue to return the idle global state and hide that the workspace is busy. Construct the namespace from rag.workspace and initialize it, similar to the updated delete/cancel handlers, so each tenant sees their own status.
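
A sketch of the suggested fix, mirroring the handler changes elsewhere in this PR (rag and the shared_storage imports come from the surrounding route code):

    # Derive the namespace from the request's workspace instead of
    # always reading the global "pipeline_status" dictionary.
    workspace = rag.workspace
    namespace = f"{workspace}:pipeline" if workspace else "pipeline_status"
    pipeline_status = await get_namespace_data(namespace)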


try:
    from lightrag.kg.shared_storage import get_namespace_data
    pipeline_status = await get_namespace_data("pipeline_status")
    # Check if pipeline is busy
    if pipeline_status.get("busy", False):
        return DeleteDocByIdResponse(

P1: Check workspace-specific busy flag before enqueuing deletions

The deletion entry point still checks await get_namespace_data("pipeline_status") for a global busy flag while the background task now marks busy inside the workspace-specific namespace (f"{workspace}:pipeline"). If two deletion requests are issued for the same workspace, the first sets busy in workspace:pipeline, but the second request reads the unused global dictionary and proceeds, defeating the lock and allowing concurrent deletions in one workspace. Build the namespace from rag.workspace and reuse the keyed lock to ensure the busy flag is consulted per workspace.
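
A hedged sketch of the per-workspace busy check (the get_storage_keyed_lock call is shown schematically; consult shared_storage.py for its exact signature):

    async def is_pipeline_busy(rag) -> bool:
        # Consult the busy flag in the workspace's own namespace, under
        # the same keyed lock the background deletion task uses.
        namespace = (
            f"{rag.workspace}:pipeline" if rag.workspace else "pipeline_status"
        )
        async with get_storage_keyed_lock(namespace):
            pipeline_status = await get_namespace_data(namespace)
            return pipeline_status.get("busy", False)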


@BukeLy (Author) commented Nov 13, 2025

Fixed Issues

  1. Pipeline status endpoint (document_routes.py:2306)

    • ✅ Now uses workspace-specific namespace f"{workspace}:pipeline"
    • ✅ Each tenant sees their own pipeline status instead of global state
  2. Delete documents busy check (document_routes.py:2513)

    • ✅ Now checks workspace-specific busy flag
    • ✅ Uses proper keyed lock for workspace isolation
    • ✅ Prevents concurrent deletions within the same workspace

Both endpoints now follow the same pattern as the other updated handlers:

  • Get workspace from rag.workspace
  • Construct namespace: f"{workspace}:pipeline" or "pipeline_status" (backward compatible)
  • Initialize pipeline status before use
  • Use workspace-aware locks via get_storage_keyed_lock()

CI checks should pass now.

@chatgpt-codex-connector

To use Codex here, create a Codex account and connect to GitHub.

Problem:
In multi-tenant scenarios, different workspaces share a single global
pipeline_status namespace, causing pipelines from different tenants to
block each other, severely impacting concurrent processing performance.

Solution:
- Extended get_namespace_data() to recognize workspace-specific pipeline
  namespaces with pattern "{workspace}:pipeline" (following GraphDB pattern)
- Added workspace parameter to initialize_pipeline_status() for per-tenant
  isolated pipeline namespaces
- Updated all 7 call sites to use workspace-aware locks:
  * lightrag.py: process_document_queue(), aremove_document()
  * document_routes.py: background_delete_documents(), clear_documents(),
    cancel_pipeline(), get_pipeline_status(), delete_documents()

Impact:
- Different workspaces can process documents concurrently without blocking
- Backward compatible: empty workspace defaults to "pipeline_status"
- Maintains fail-fast: uninitialized pipeline raises clear error
- Expected N× performance improvement for N concurrent tenants

Bug fixes:
- Fixed AttributeError by using self.workspace instead of self.global_config
- Fixed pipeline status endpoint to show workspace-specific status
- Fixed delete endpoint to check workspace-specific busy flag

Code changes: 4 files, 141 insertions(+), 28 deletions(-)

Testing: All syntax checks passed, comprehensive workspace isolation tests completed
@BukeLy force-pushed the feature/pipeline-workspace-isolation branch from 23ab491 to 2f3620b on November 13, 2025 14:31
@danielaskdd (Collaborator)

@codex review

@chatgpt-codex-connector

Codex Review: Didn't find any major issues. Bravo.


@danielaskdd (Collaborator)

The current codebase still needs to address compatibility with legacy code. User code built on LightRAG Core has always initialized the pipeline via initialize_pipeline_status() without passing a workspace. Therefore, initialize_pipeline_status() should automatically fall back to the default workspace when no parameter is provided, and rag.initialize_storages() must store the default workspace value in shared_storage so that initialize_pipeline_status() can retrieve and use it.

@danielaskdd (Collaborator)

Another compatibility issue: the health check endpoint /health in lightrag_server.py, implemented by the get_status function, must return the pipeline status to the frontend. The frontend UI relies on this status to update pipeline state and drive various UI decisions. Two issues need to be addressed:

  1. The current frontend is unaware of the workspace concept and does not pass a workspace parameter when calling /health. This can be resolved with the approach from the previous comment: have get_status return the pipeline status of the default workspace.

  2. The frontend must be able to proactively retrieve pipeline statuses for different workspaces. To keep all API endpoints compatible with multi-workspace operation, we recommend introducing a custom HTTP request header, LIGHTRAG-WORKSPACE. The /health endpoint will read this header to determine which workspace's status the client intends to access. Going forward, all API endpoints will rely on the LIGHTRAG-WORKSPACE header to identify the target workspace for each request.

Fixes two compatibility issues in workspace isolation:

1. Problem: lightrag_server.py calls initialize_pipeline_status()
   without workspace parameter, causing pipeline to initialize in
   global namespace instead of rag's workspace.

   Solution: Add set_default_workspace() mechanism in shared_storage.
   LightRAG.initialize_storages() now sets default workspace, which
   initialize_pipeline_status() uses when called without parameters.

2. Problem: /health endpoint hardcoded to use "pipeline_status",
   cannot return workspace-specific status or support frontend
   workspace selection.

   Solution: Add LIGHTRAG-WORKSPACE header support. Endpoint now
   extracts workspace from header or falls back to server default,
   returning correct workspace-specific pipeline status.

Changes:
- lightrag/kg/shared_storage.py: Add set/get_default_workspace()
- lightrag/lightrag.py: Call set_default_workspace() in initialize_storages()
- lightrag/api/lightrag_server.py: Add get_workspace_from_request() helper,
  update /health endpoint to support LIGHTRAG-WORKSPACE header

Testing:
- Backward compatibility: Old code works without modification
- Multi-instance safety: Explicit workspace passing preserved
- /health endpoint: Supports both default and header-specified workspaces

Related: HKUDS#2353
@BukeLy (Author) commented Nov 15, 2025

Compatibility Issues Fixed

Hi @danielaskdd! I have fixed both compatibility issues you mentioned:


🔧 Issue 1: Default Workspace Support for initialize_pipeline_status()

Problem:

  • lightrag_server.py calls initialize_pipeline_status() without a workspace parameter
  • This causes the pipeline to always initialize in the global "pipeline_status" namespace instead of the rag object's workspace

Solution:

  1. Added a default workspace mechanism in shared_storage.py (a consolidated sketch follows this list):

     _default_workspace: Optional[str] = None

     def set_default_workspace(workspace: str) -> None: ...
     def get_default_workspace() -> str: ...
  2. LightRAG.initialize_storages() in lightrag.py now sets the default workspace automatically:

     from lightrag.kg.shared_storage import set_default_workspace
     set_default_workspace(self.workspace)
  3. Modified initialize_pipeline_status() to use the default workspace when called without parameters:

     async def initialize_pipeline_status(workspace: str = ""):
         if not workspace:
             workspace = get_default_workspace()  # backward compatibility
         ...
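
Put together, a minimal self-contained sketch of the mechanism (the bodies paraphrase the commit rather than reproduce it verbatim):

    from typing import Optional

    _default_workspace: Optional[str] = None

    def set_default_workspace(workspace: str) -> None:
        """Remember the workspace of the LightRAG instance whose storages
        were initialized most recently, so that a parameterless
        initialize_pipeline_status() can pick it up."""
        global _default_workspace
        _default_workspace = workspace

    def get_default_workspace() -> str:
        """Return the remembered workspace, or "" (the legacy global
        namespace) if none was set."""
        return _default_workspace or ""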

Result:

  • ✅ Old code works without modification
  • ✅ The pipeline in lightrag_server.py now correctly initializes to rag's workspace
  • ✅ Fully backward compatible

🔧 Issue 2: /health Endpoint Support for the LIGHTRAG-WORKSPACE Header

Problem:

  • The /health endpoint was hardcoded to use "pipeline_status" and could not return workspace-specific status
  • The frontend UI could not specify which workspace's status to query

Solution:

  1. Added a workspace extraction helper function:

     def get_workspace_from_request(request: Request) -> str:
         workspace = request.headers.get("LIGHTRAG-WORKSPACE", "").strip()
         if not workspace:
             workspace = args.workspace  # fallback to the server default
         return workspace
  2. Modified the /health endpoint to support the custom header:

     @app.get("/health")
     async def get_status(request: Request):
         workspace = get_workspace_from_request(request)
         namespace = f"{workspace}:pipeline" if workspace else "pipeline_status"
         pipeline_status = await get_namespace_data(namespace)
         ...
  3. The response now returns complete workspace information:

     return {
         "configuration": {
             "workspace": workspace,              # the workspace actually used
             "default_workspace": args.workspace, # the server default
             ...
         }
     }

Usage:

# No header - uses the default workspace
curl http://localhost:9621/health

# With header - queries a specific workspace
curl -H "LIGHTRAG-WORKSPACE: tenant_a" http://localhost:9621/health
curl -H "LIGHTRAG-WORKSPACE: tenant_b" http://localhost:9621/health

Result:

  • ✅ The frontend can query the pipeline status of any workspace
  • ✅ The default workspace is used automatically when no header is provided
  • ✅ Provides a unified workspace extraction mechanism for all future API endpoints

📦 Commit Details

5f153582 fix: Add default workspace support for backward compatibility

Changes:
- lightrag/kg/shared_storage.py: +33 lines
- lightrag/lightrag.py: +8 lines
- lightrag/api/lightrag_server.py: +41 lines

Total: 3 files changed, 82 insertions(+), 5 deletions(-)

✅ Testing

Passed the following tests (a sketch of the isolation check appears after this list):

  1. ✅ Default workspace mechanism works correctly
  2. ✅ LightRAG integration tests pass
  3. ✅ Backward compatibility verified
  4. ✅ Multi-workspace isolation verified
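
For reference, the isolation check looks roughly like this (a sketch assuming pytest-asyncio and the new workspace parameter, not the exact test file):

    import pytest
    from lightrag.kg.shared_storage import (
        initialize_pipeline_status,
        get_namespace_data,
    )

    @pytest.mark.asyncio
    async def test_workspace_pipeline_isolation():
        # Assumes shared storage has been initialized for this process.
        await initialize_pipeline_status(workspace="tenant_a")
        await initialize_pipeline_status(workspace="tenant_b")

        status_a = await get_namespace_data("tenant_a:pipeline")
        status_b = await get_namespace_data("tenant_b:pipeline")

        # Marking tenant_a busy must not leak into tenant_b's namespace
        status_a["busy"] = True
        assert status_b.get("busy", False) is False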

🎯 Design Notes

Why does the pipeline need special handling?

Other storage locks (Redis, vector DB, etc.) obtain the workspace via storage.global_config.get("workspace") because their functions receive storage objects as parameters:

# Storage lock example
async def _merge_nodes_then_upsert(entity_name: str, entities_vdb, ...):
    workspace = entities_vdb.global_config.get("workspace", "")  # ✓ can get it
    ...

But initialize_pipeline_status() is a standalone initialization function with no storage object to pass, hence the default workspace mechanism:

# Pipeline initialization
await rag.initialize_storages()       # sets the default workspace
await initialize_pipeline_status()    # uses the default workspace

Please review and confirm whether these changes meet your expectations. Feel free to let me know if you have any questions or suggestions!
