Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .buildkite/scripts/hardware_ci/run-xpu-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ docker run \
pytest -v -s v1/worker --ignore=v1/worker/test_gpu_model_runner.py
pytest -v -s v1/structured_output
pytest -v -s v1/spec_decode --ignore=v1/spec_decode/test_max_len.py --ignore=v1/spec_decode/test_eagle.py --ignore=v1/spec_decode/test_tree_attention.py
pytest -v -s v1/kv_connector/unit --ignore=v1/kv_connector/unit/test_multi_connector.py --ignore=v1/kv_connector/unit/test_nixl_connector.py --ignore=v1/kv_connector/unit/test_shared_storage_connector.py
pytest -v -s kv_connector/unit --ignore=kv_connector/unit/test_multi_connector.py --ignore=kv_connector/unit/test_nixl_connector.py --ignore=kv_connector/unit/test_shared_storage_connector.py
pytest -v -s v1/test_serial_utils.py
pytest -v -s v1/test_utils.py
pytest -v -s v1/test_metrics_reader.py
Expand Down
3 changes: 2 additions & 1 deletion .buildkite/test-pipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -288,16 +288,17 @@ steps:
- pytest -v -s v1/worker
- pytest -v -s v1/structured_output
- pytest -v -s v1/spec_decode
- pytest -v -s v1/kv_connector/unit
- pytest -v -s v1/metrics
- pytest -v -s v1/test_serial_utils.py
- pytest -v -s v1/test_utils.py
- pytest -v -s v1/test_oracle.py
- pytest -v -s v1/test_metrics_reader.py
- pytest -v -s kv_connector/unit
# Integration test for streaming correctness (requires special branch).
- pip install -U git+https://github.com/robertgshaw2-redhat/lm-evaluation-harness.git@streaming-api
- pytest -v -s entrypoints/openai/correctness/test_lmeval.py::test_lm_eval_accuracy_v1_engine


- label: Examples Test # 30min
timeout_in_minutes: 45
mirror_hardwares: [amdexperimental]
Expand Down
3 changes: 1 addition & 2 deletions .github/CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ CMakeLists.txt @tlrmchlsmth @LucasWilkinson
/tests/weight_loading @mgoin @youkaichao @yewentao256
/tests/lora @jeejeelee
/tests/models/language/generation/test_hybrid.py @tdoublep
/tests/v1/kv_connector/nixl_integration @NickLucche
/tests/v1/kv_connector @ApostaC
/tests/kv_connector @NickLucche @ApostaC
/tests/v1/offloading @ApostaC

# Transformers backend
Expand Down
2 changes: 1 addition & 1 deletion .github/mergify.yml
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ pull_request_rules:
- files~=^examples/online_serving/disaggregated[^/]*/.*
- files~=^examples/offline_inference/disaggregated[^/]*/.*
- files~=^examples/others/lmcache/
- files~=^tests/v1/kv_connector/
- files~=^tests/kv_connector/
- files~=^vllm/distributed/kv_transfer/
- title~=(?i)\bP/?D\b
- title~=(?i)NIXL
Expand Down
2 changes: 1 addition & 1 deletion docs/features/disagg_prefill.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ Now supports 5 types of connectors:

- **SharedStorageConnector**: refer to <gh-file:examples/offline_inference/disaggregated-prefill-v1/run.sh> for the example usage of SharedStorageConnector disaggregated prefilling.
- **LMCacheConnectorV1**: refer to <gh-file:examples/others/lmcache/disagg_prefill_lmcache_v1/disagg_example_nixl.sh> for the example usage of LMCacheConnectorV1 disaggregated prefilling which uses NIXL as the underlying KV transmission.
- **NixlConnector**: refer to <gh-file:tests/v1/kv_connector/nixl_integration/run_accuracy_test.sh> for the example usage of NixlConnector disaggregated prefilling which support fully async send/recv.
- **NixlConnector**: refer to <gh-file:tests/kv_connector/nixl_integration/run_accuracy_test.sh> for the example usage of NixlConnector disaggregated prefilling which support fully async send/recv.
- **P2pNcclConnector**: refer to <gh-file:examples/online_serving/disaggregated_serving_p2p_nccl_xpyd/disagg_example_p2p_nccl_xpyd.sh> for the example usage of P2pNcclConnector disaggregated prefilling.
- **MultiConnector**: take advantage of the kv_connector_extra_config: dict[str, Any] already present in KVTransferConfig to stash all the connectors we want in an ordered list of kwargs.such as:

Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ run_tests_for_model() {
done

# Build the command for the proxy server with all the hosts and ports
PROXY_CMD="python ${GIT_ROOT}/tests/v1/kv_connector/nixl_integration/toy_proxy_server.py --port 8192"
PROXY_CMD="python ${GIT_ROOT}/tests/kv_connector/nixl_integration/toy_proxy_server.py --port 8192"

# Add all prefill hosts and ports
PROXY_CMD+=" --prefiller-hosts ${PREFILL_HOSTS[@]}"
Expand All @@ -168,7 +168,7 @@ run_tests_for_model() {

# Run lm eval for this model
echo "Running tests for $model_name"
TEST_MODEL=$model_name python -m pytest -s -x ${GIT_ROOT}/tests/v1/kv_connector/nixl_integration/test_accuracy.py
TEST_MODEL=$model_name python -m pytest -s -x ${GIT_ROOT}/tests/kv_connector/nixl_integration/test_accuracy.py

# Clean up before running next model
cleanup_instances
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ run_tests_for_model() {

# Build the command for the proxy server with all the hosts and ports
PROXY_PORT=8192
PROXY_CMD="python ${GIT_ROOT}/tests/v1/kv_connector/nixl_integration/toy_proxy_server.py --port $PROXY_PORT"
PROXY_CMD="python ${GIT_ROOT}/tests/kv_connector/nixl_integration/toy_proxy_server.py --port $PROXY_PORT"
PROXY_CMD+=" --prefiller-ports ${PREFILL_PORT}"
PROXY_CMD+=" --decoder-ports ${DECODE_PORT}"
# Start the proxy server
Expand All @@ -106,7 +106,7 @@ run_tests_for_model() {

# Run lm eval for this model
echo "Running tests for $model_name"
PREFILL_PORT=$PREFILL_PORT DECODE_PORT=$DECODE_PORT PROXY_PORT=$PROXY_PORT python -m pytest -s -v ${GIT_ROOT}/tests/v1/kv_connector/nixl_integration/test_edge_cases.py
PREFILL_PORT=$PREFILL_PORT DECODE_PORT=$DECODE_PORT PROXY_PORT=$PROXY_PORT python -m pytest -s -v ${GIT_ROOT}/tests/kv_connector/nixl_integration/test_edge_cases.py

# Clean up before running next model
cleanup_instances
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ BLOCK_SIZE=${BLOCK_SIZE:-32}

# execution env
GIT_ROOT=$(git rev-parse --show-toplevel)
EXP_ROOT="${GIT_ROOT}/tests/v1/kv_connector/nixl_integration"
EXP_ROOT="${GIT_ROOT}/tests/kv_connector/nixl_integration"
CONDA_PATH=${CONDA_PATH:-"/home/${USER}/anaconda3"}
CONDA_ENV_NAME=${CONDA_ENV_NAME:-"nixl"}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ BLOCK_SIZE=${BLOCK_SIZE:-32}

# execution env
GIT_ROOT=$(git rev-parse --show-toplevel)
EXP_ROOT="${GIT_ROOT}/tests/v1/kv_connector/nixl_integration"
EXP_ROOT="${GIT_ROOT}/tests/kv_connector/nixl_integration"
CONDA_PATH=${CONDA_PATH:-"/home/${USER}/anaconda3"}
CONDA_ENV_NAME=${CONDA_ENV_NAME:-"nixl"}

Expand Down Expand Up @@ -123,4 +123,4 @@ PREFILL_PORT=${PREFILL_PORT} \
DECODE_HOST=${DECODE_HOST} \
DECODE_PORT=${DECODE_PORT} \
PROXY_HOST=${PROXY_HOST} \
PROXY_PORT=${PROXY_PORT} python -m pytest -s -v ${GIT_ROOT}/tests/v1/kv_connector/nixl_integration/test_edge_cases.py
PROXY_PORT=${PROXY_PORT} python -m pytest -s -v ${GIT_ROOT}/tests/kv_connector/nixl_integration/test_edge_cases.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def test_multi_shared_storage_connector_consistency():
"name": "storage1",
},
"kv_connector_module_path":
"tests.v1.kv_connector.unit.utils",
"tests.kv_connector.unit.utils",
}, {
"kv_connector":
"TestSharedStorageConnector",
Expand All @@ -73,7 +73,7 @@ def test_multi_shared_storage_connector_consistency():
"name": "storage2",
},
"kv_connector_module_path":
"tests.v1.kv_connector.unit.utils",
"tests.kv_connector.unit.utils",
}]
},
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,13 @@

from vllm import LLM
from vllm.config import KVTransferConfig
from vllm.distributed.kv_transfer.kv_connector.utils import KVOutputAggregator
from vllm.distributed.kv_transfer.kv_connector.v1.metrics import (
KVConnectorStats)
from vllm.distributed.kv_transfer.kv_connector.v1.multi_connector import (
from vllm.distributed.kv_transfer.kv_connector.metrics import KVConnectorStats
from vllm.distributed.kv_transfer.kv_connector.multi_connector import (
MultiKVConnectorStats)
from vllm.distributed.kv_transfer.kv_connector.v1.nixl_connector import (
from vllm.distributed.kv_transfer.kv_connector.nixl_connector import (
KVConnectorRole, NixlAgentMetadata, NixlConnector, NixlConnectorMetadata,
NixlConnectorWorker, NixlKVConnectorStats)
from vllm.distributed.kv_transfer.kv_connector.utils import KVOutputAggregator
from vllm.forward_context import ForwardContext
from vllm.sampling_params import SamplingParams
from vllm.v1.attention.backends.flash_attn import FlashAttentionBackend
Expand Down Expand Up @@ -249,7 +248,7 @@ def _nixl_handshake(self, host: str, port: int, remote_tp_size: int,
class TestNixlHandshake:

@patch(
"vllm.distributed.kv_transfer.kv_connector.v1.nixl_connector.NixlWrapper",
"vllm.distributed.kv_transfer.kv_connector.nixl_connector.NixlWrapper",
FakeNixlWrapper)
def test_multi_xfer_one_engine(
self,
Expand Down Expand Up @@ -321,7 +320,7 @@ def test_multi_xfer_one_engine(
connector.clear_connector_metadata()

@patch(
"vllm.distributed.kv_transfer.kv_connector.v1.nixl_connector.NixlWrapper",
"vllm.distributed.kv_transfer.kv_connector.nixl_connector.NixlWrapper",
FakeNixlWrapper)
@pytest.mark.parametrize("decode_tp_size, prefill_tp_size", [
(1, 1),
Expand Down Expand Up @@ -379,7 +378,7 @@ def test_async_load_kv(
raise TimeoutError("Took too long to complete async handshake.")

@patch(
"vllm.distributed.kv_transfer.kv_connector.v1.nixl_connector.NixlWrapper",
"vllm.distributed.kv_transfer.kv_connector.nixl_connector.NixlWrapper",
FakeNixlWrapper)
def test_concurrent_load_kv(
self,
Expand Down Expand Up @@ -432,7 +431,7 @@ def test_concurrent_load_kv(
raise TimeoutError("Took too long to complete async handshake.")

@patch(
"vllm.distributed.kv_transfer.kv_connector.v1.nixl_connector.NixlWrapper",
"vllm.distributed.kv_transfer.kv_connector.nixl_connector.NixlWrapper",
FakeNixlWrapper)
def test_handshake_fails_on_kv_cache_layout_mismatch(self, dist_init):
"""
Expand All @@ -444,7 +443,7 @@ def test_handshake_fails_on_kv_cache_layout_mismatch(self, dist_init):
# Mock TP world size to 2 to force heterogeneous TP when
# remote_tp_size=1
with patch(
"vllm.distributed.kv_transfer.kv_connector.v1.nixl_connector.get_tensor_model_parallel_world_size", # noqa: E501
"vllm.distributed.kv_transfer.kv_connector.nixl_connector.get_tensor_model_parallel_world_size", # noqa: E501
return_value=2):
# Initialize connector and worker (with fake NIXL wrapper)
connector = NixlConnector(vllm_config, KVConnectorRole.WORKER)
Expand Down Expand Up @@ -481,9 +480,8 @@ def test_handshake_fails_on_kv_cache_layout_mismatch(self, dist_init):
# NOTE: resource cleanup in mp backend is a bit finicky, so the order in which
# we put here is important. First run ray, it will clean up the resources, then
# the rest of the tests.
@patch(
"vllm.distributed.kv_transfer.kv_connector.v1.nixl_connector.NixlWrapper",
FakeNixlWrapper)
@patch("vllm.distributed.kv_transfer.kv_connector.nixl_connector.NixlWrapper",
FakeNixlWrapper)
def test_kv_connector_stats(dist_init):
"""Test that KV transfer stats are properly recorded and retrieved."""
vllm_config = create_vllm_config()
Expand Down Expand Up @@ -685,9 +683,8 @@ def make_multi_stats(nixl_count: int,


@pytest.mark.parametrize("distributed_executor_backend", ["ray", None])
@patch(
"vllm.distributed.kv_transfer.kv_connector.v1.nixl_connector.NixlWrapper",
FakeNixlWrapper)
@patch("vllm.distributed.kv_transfer.kv_connector.nixl_connector.NixlWrapper",
FakeNixlWrapper)
def test_abort_timeout_on_prefiller(monkeypatch, distributed_executor_backend):
"""
Test lifecycle of an aborted Remote Prefill request hitting the timeout.
Expand Down Expand Up @@ -809,9 +806,9 @@ def test_register_kv_caches(dist_init):
unique_tensor[0].data_ptr(), unique_tensor[1].data_ptr()
]

with patch("vllm.distributed.kv_transfer.kv_connector.v1.nixl_connector.NixlWrapper") as mock_nixl_wrapper, \
patch("vllm.distributed.kv_transfer.kv_connector.v1.nixl_connector.threading.Event"), \
patch("vllm.distributed.kv_transfer.kv_connector.v1.nixl_connector.threading.Thread"): # noqa: E501
with patch("vllm.distributed.kv_transfer.kv_connector.nixl_connector.NixlWrapper") as mock_nixl_wrapper, \
patch("vllm.distributed.kv_transfer.kv_connector.nixl_connector.threading.Event"), \
patch("vllm.distributed.kv_transfer.kv_connector.nixl_connector.threading.Thread"): # noqa: E501

# Create connector
connector = NixlConnector(vllm_config, KVConnectorRole.WORKER)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
from vllm import SamplingParams
from vllm.config import KVTransferConfig, VllmConfig
from vllm.distributed.kv_events import BlockRemoved, BlockStored
from vllm.distributed.kv_transfer.kv_connector.v1 import KVConnectorRole
from vllm.distributed.kv_transfer.kv_connector.v1.offloading_connector import (
from vllm.distributed.kv_transfer.kv_connector import KVConnectorRole
from vllm.distributed.kv_transfer.kv_connector.offloading_connector import (
OffloadingConnector, OffloadingConnectorMetadata)
from vllm.forward_context import ForwardContext
from vllm.utils import sha256
Expand Down Expand Up @@ -115,7 +115,7 @@ def __init__(self, offloaded_block_size: int, gpu_block_size: int,
kv_connector_extra_config={
"spec_name": "MockOffloadingSpec",
"spec_module_path":
"tests.v1.kv_connector.unit.test_offloading_connector",
"tests.kv_connector.unit.test_offloading_connector",
"block_size": offloaded_block_size,
})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
ModelConfig, SchedulerConfig, VllmConfig)
from vllm.distributed.kv_transfer.kv_connector.factory import (
KVConnectorFactory)
from vllm.distributed.kv_transfer.kv_connector.v1.shared_storage_connector import ( # noqa
from vllm.distributed.kv_transfer.kv_connector.shared_storage_connector import ( # noqa
SharedStorageConnector)
from vllm.utils import sha256
from vllm.v1.core.kv_cache_manager import KVCacheBlocks
Expand Down
7 changes: 3 additions & 4 deletions vllm/attention/layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@
from vllm.attention.utils.kv_sharing_utils import validate_kv_sharing_target
from vllm.config import CacheConfig, get_current_vllm_config
from vllm.distributed.kv_transfer import (get_kv_transfer_group,
has_kv_transfer_group,
is_v1_kv_transfer_group)
has_kv_transfer_group)
from vllm.forward_context import ForwardContext, get_forward_context
from vllm.logger import init_logger
from vllm.model_executor.layers.attention_layer_base import AttentionLayerBase
Expand Down Expand Up @@ -514,7 +513,7 @@ def forward(


def wait_for_kv_layer_from_connector(layer_name: str):
if not has_kv_transfer_group() or not is_v1_kv_transfer_group():
if not has_kv_transfer_group():
return

connector = get_kv_transfer_group()
Expand All @@ -531,7 +530,7 @@ def maybe_save_kv_layer_to_connector(
layer_name: str,
kv_cache_layer: List[torch.Tensor],
):
if not has_kv_transfer_group() or not is_v1_kv_transfer_group():
if not has_kv_transfer_group():
return

connector = get_kv_transfer_group()
Expand Down
9 changes: 4 additions & 5 deletions vllm/distributed/kv_transfer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project

from vllm.distributed.kv_transfer.kv_transfer_state import (
KVConnectorBaseType, ensure_kv_transfer_initialized,
ensure_kv_transfer_shutdown, get_kv_transfer_group, has_kv_transfer_group,
is_v1_kv_transfer_group)
KVConnectorBase, ensure_kv_transfer_initialized,
ensure_kv_transfer_shutdown, get_kv_transfer_group, has_kv_transfer_group)

__all__ = [
"get_kv_transfer_group", "has_kv_transfer_group",
"is_v1_kv_transfer_group", "ensure_kv_transfer_initialized",
"ensure_kv_transfer_shutdown", "KVConnectorBaseType"
"ensure_kv_transfer_initialized", "ensure_kv_transfer_shutdown",
"KVConnectorBase"
]
6 changes: 6 additions & 0 deletions vllm/distributed/kv_transfer/kv_connector/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
from vllm.distributed.kv_transfer.kv_connector.base import (KVConnectorBase,
KVConnectorRole)

__all__ = ["KVConnectorRole", "KVConnectorBase"]
Loading