Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion tests/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,12 @@ python tests/integration/test_compat_runtime.py
python tests/integration/test_mpi_basic.py

# DLIO + MPI together
python tests/integration/test_dlio_mpi.py
# This is an MPI program: launch it with an MPI runner, not plain `python`.
# If the host has fewer cores than -np, add --oversubscribe (or --use-hwthread-cpus);
# otherwise mpirun fails with "There are not enough slots available in the system".
mpirun -np 8 python tests/integration/test_dlio_mpi.py
# Single-host machine with <8 cores:
# mpirun --oversubscribe -np 8 python tests/integration/test_dlio_mpi.py

# A/B comparison: MLP vs dpsi implementations
python tests/integration/test_ab_comparison.py
Expand Down
21 changes: 18 additions & 3 deletions tests/integration/test_dlio_mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,17 @@
print(f"Endpoint assignment will be: rank % 4")
print("="*60 + "\n")

if size == 1:
# The script was very likely started with plain `python ...` instead
# of an MPI launcher. It still runs (rank 0 only), but that is not a
# meaningful multi-endpoint test, so make the correct invocation
# explicit rather than letting the user puzzle over single-rank output.
print("NOTE: only 1 MPI process detected. This test is meant to run "
"under an MPI launcher.")
print(" Example: mpirun -np 8 python tests/integration/test_dlio_mpi.py")
print(" If the host has fewer cores than -np, add --oversubscribe "
"(or --use-hwthread-cpus).\n")

# Add DLIO to path
sys.path.insert(0, '/home/eval/Documents/Code/s3dlio/python')

Expand Down Expand Up @@ -52,9 +63,13 @@ def __init__(self):
"http://endpoint4:9000",
]

# Since we have OMPI_COMM_WORLD_RANK set by mpirun, simulate the selection
ompi_rank = int(os.environ['OMPI_COMM_WORLD_RANK'])
endpoint_index = ompi_rank % len(endpoints)
# Endpoint selection is driven by the MPI rank. Use comm.Get_rank()
# (already captured above) rather than the OpenMPI-specific
# OMPI_COMM_WORLD_RANK environment variable, which is unset under plain
# `python`, MPICH, or srun and previously raised a bare KeyError. That env
# var is now only displayed for diagnostics, read safely with a default.
ompi_rank = os.environ.get('OMPI_COMM_WORLD_RANK', 'not set')
endpoint_index = rank % len(endpoints)
selected_endpoint = endpoints[endpoint_index]

print(f"Rank {rank:2d}: OMPI_COMM_WORLD_RANK={ompi_rank} → endpoint[{endpoint_index}] = {selected_endpoint}")
Expand Down
102 changes: 102 additions & 0 deletions tests/integration/test_issue_380_dlio_mpi_rank.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
#!/usr/bin/env python3
"""
Regression tests for issue #380:

Parameter error from integration test (test_dlio_mpi.py)

Root cause: ``tests/integration/test_dlio_mpi.py`` selected its endpoint with

ompi_rank = int(os.environ['OMPI_COMM_WORLD_RANK'])

That subscript is OpenMPI-specific and is **unset** whenever the script is not
launched by OpenMPI's ``mpirun`` — e.g. the documented ``python tests/
integration/test_dlio_mpi.py`` invocation, MPICH/``mpiexec``, or ``srun`` — so
the test died with a bare ``KeyError: 'OMPI_COMM_WORLD_RANK'``. The guide
(``tests/README.md``) also told users to start this MPI program with plain
``python`` and gave no guidance for the "not enough slots" error that appears
when ``-np`` exceeds the host core count.

After the fix:

* Endpoint selection uses ``rank = comm.Get_rank()`` (portable across launchers
and already captured by the script), not the OpenMPI env var.
* ``OMPI_COMM_WORLD_RANK`` is read for display only, via
``os.environ.get(..., 'not set')`` — never a hard subscript.
* The guide launches the test with ``mpirun`` and documents
``--oversubscribe`` for under-provisioned hosts.

These tests pin those invariants so the regression cannot reappear. They are
pure-logic / source checks and intentionally require neither mpi4py nor a live
MPI runtime.
"""

import os
import re
from pathlib import Path

import pytest

TEST_SCRIPT = (
Path(__file__).resolve().parent / "test_dlio_mpi.py"
)

ENDPOINTS = [
"http://endpoint1:9000",
"http://endpoint2:9000",
"http://endpoint3:9000",
"http://endpoint4:9000",
]


# ---------------------------------------------------------------------------
# Direct reproduction of the original failure mode
# ---------------------------------------------------------------------------

def test_hard_env_subscript_was_the_crash():
"""The original pattern raises KeyError when not launched by OpenMPI."""
env = dict(os.environ)
env.pop("OMPI_COMM_WORLD_RANK", None)
with pytest.raises(KeyError):
# This mirrors the pre-fix line in test_dlio_mpi.py.
int(env["OMPI_COMM_WORLD_RANK"])


def test_safe_env_read_does_not_crash():
"""The fixed pattern degrades to a default instead of raising."""
env = dict(os.environ)
env.pop("OMPI_COMM_WORLD_RANK", None)
assert env.get("OMPI_COMM_WORLD_RANK", "not set") == "not set"


# ---------------------------------------------------------------------------
# Selection invariant: endpoint must be derived from the MPI rank
# ---------------------------------------------------------------------------

@pytest.mark.parametrize(
"rank,expected_index",
[(0, 0), (1, 1), (2, 2), (3, 3), (4, 0), (7, 3), (8, 0)],
)
def test_rank_selects_endpoint_round_robin(rank, expected_index):
"""rank % len(endpoints) round-robins regardless of any env var."""
assert rank % len(ENDPOINTS) == expected_index
assert ENDPOINTS[rank % len(ENDPOINTS)] == ENDPOINTS[expected_index]


# ---------------------------------------------------------------------------
# Source guards: the crashing pattern must not return
# ---------------------------------------------------------------------------

def test_script_has_no_hard_env_subscript():
src = TEST_SCRIPT.read_text()
# No hard subscript of the OpenMPI rank env var in any quoting style.
assert not re.search(r"os\.environ\[\s*['\"]OMPI_COMM_WORLD_RANK['\"]\s*\]", src), (
"test_dlio_mpi.py must not subscript os.environ['OMPI_COMM_WORLD_RANK']; "
"use comm.Get_rank() and os.environ.get(...) instead."
)


def test_script_selects_endpoint_from_rank():
src = TEST_SCRIPT.read_text()
assert "endpoint_index = rank % len(endpoints)" in src, (
"endpoint selection must be driven by comm.Get_rank() (the `rank` var)."
)
152 changes: 152 additions & 0 deletions vdb_benchmark/tests/tests/test_issue_375_chunked_insert_ids.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
"""
Regression tests for issue #375:

vdb benchmark shows a very low recall@10 because the flat_gt
collection size is too small.

Root cause: ``load_vdb.insert_data`` computed primary keys as
``range(batch_start, batch_end)`` based on the *chunk-local* index,
so every chunk re-used IDs ``0..chunk_size-1``. With ``num_vectors=1M``
and ``chunk_size=10k`` the source collection ended up with only 10k
unique PKs (and 99 duplicates per PK), which in turn made the
``flat_gt`` collection only 10k rows — about 1% of the source — and
drove recall@10 down to ~0.009.

The fix adds a ``start_id`` offset to ``insert_data`` and threads a
running ``global_id_offset`` through the chunked path in ``main``.
These tests verify the IDs are globally unique across chunks, and that
the legacy default (``start_id=0``) still works for the single-chunk
path.
"""
import os
import sys
from unittest.mock import MagicMock

import numpy as np
import pytest

# Make the package importable regardless of where pytest is invoked from.
ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
if ROOT not in sys.path:
sys.path.insert(0, ROOT)

# We import the function under test from the real module. We do NOT import
# the module-level argparse / Milvus connect code — those run only inside
# ``main()``. The import itself is cheap.
from vdbbench.load_vdb import insert_data # noqa: E402


def _captured_ids(mock_collection):
"""Concatenate every IDs list passed to ``collection.insert``."""
captured = []
for call in mock_collection.insert.call_args_list:
# ``insert`` is called as ``collection.insert([ids, batch_vectors])``.
args, _kwargs = call
payload = args[0]
ids = payload[0]
captured.extend(list(ids))
return captured


class TestInsertDataIdOffset:
"""Verify primary-key uniqueness across chunked inserts."""

def test_default_start_id_preserves_legacy_behavior(self):
"""When ``start_id`` is omitted, IDs start at 0 — same as before #375."""
collection = MagicMock()
vectors = np.zeros((100, 8), dtype=np.float32)

total, _elapsed = insert_data(collection, vectors, batch_size=25)

assert total == 100
ids = _captured_ids(collection)
assert ids == list(range(0, 100))

def test_start_id_offsets_all_batches(self):
"""A non-zero ``start_id`` shifts every batch's IDs by that offset."""
collection = MagicMock()
vectors = np.zeros((50, 4), dtype=np.float32)

insert_data(collection, vectors, batch_size=10, start_id=1000)

ids = _captured_ids(collection)
assert ids == list(range(1000, 1050))

def test_three_chunks_produce_globally_unique_ids(self):
"""
Exact reproduction of issue #375: simulate the chunked path in
``main()`` with three chunks. Before the fix, every chunk re-used
IDs 0..chunk_size-1 and the union had only ``chunk_size`` unique
values; after the fix the union has ``3 * chunk_size`` unique values.
"""
collection = MagicMock()
chunk_size = 1000
batch_size = 100
num_chunks = 3

global_offset = 0
for _ in range(num_chunks):
chunk = np.zeros((chunk_size, 4), dtype=np.float32)
insert_data(collection, chunk, batch_size=batch_size, start_id=global_offset)
global_offset += chunk_size

ids = _captured_ids(collection)
assert len(ids) == num_chunks * chunk_size
# The critical assertion the original code would fail:
assert len(set(ids)) == num_chunks * chunk_size, (
"Duplicate primary keys across chunks — issue #375 regression."
)
assert min(ids) == 0
assert max(ids) == num_chunks * chunk_size - 1

def test_uneven_final_chunk(self):
"""The final chunk is usually smaller than ``chunk_size``."""
collection = MagicMock()
# 2500 vectors total, chunks of 1000 → 1000, 1000, 500
chunks = [1000, 1000, 500]
global_offset = 0
for n in chunks:
chunk = np.zeros((n, 4), dtype=np.float32)
insert_data(collection, chunk, batch_size=300, start_id=global_offset)
global_offset += n

ids = _captured_ids(collection)
assert ids == list(range(0, 2500))
assert len(set(ids)) == 2500

def test_batch_size_larger_than_chunk(self):
"""``batch_size`` >= len(vectors) should still produce one batch with the offset applied."""
collection = MagicMock()
vectors = np.zeros((42, 4), dtype=np.float32)

insert_data(collection, vectors, batch_size=1000, start_id=500)

assert collection.insert.call_count == 1
ids = _captured_ids(collection)
assert ids == list(range(500, 542))


class TestFlatGtCoverageGuard:
"""
Sanity-check the *intent* of the coverage guard added to
``enhanced_bench.create_flat_collection``: a flat_gt collection
that covers far fewer entities than the source should be flagged.

We assert the threshold here rather than invoking Milvus, so this
test runs in CI with no external dependencies.
"""

@pytest.mark.parametrize(
"flat_count,source_count,should_pass",
[
(1_000_000, 1_000_000, True), # exact match
(995_000, 1_000_000, True), # 99.5%, within tolerance
(10_000, 1_000_000, False), # the issue #375 failure mode
(100_000, 1_000_000, False), # only 10%, still wrong
(0, 1_000_000, False), # empty
],
)
def test_coverage_threshold(self, flat_count, source_count, should_pass):
coverage = flat_count / source_count if source_count else 0.0
passes = coverage >= 0.99
assert passes is should_pass
26 changes: 25 additions & 1 deletion vdb_benchmark/vdbbench/enhanced_bench.py
Original file line number Diff line number Diff line change
Expand Up @@ -903,7 +903,13 @@ def create_flat_collection(
pct = min(100.0, 100.0 * copied / total_vectors)
print(f" Copied {copied}/{total_vectors} vectors ({pct:.1f}%)")

print(f" Copied {copied}/{total_vectors} vectors (100.0%)")
# Compute actual completion percentage rather than hardcoding 100%.
# When the source collection has duplicate primary keys, the
# query_iterator deduplicates and `copied` ends up well below
# `total_vectors` — printing "100.0%" here used to hide that
# (see issue #375).
final_pct = 100.0 * copied / total_vectors if total_vectors else 0.0
print(f" Copied {copied}/{total_vectors} vectors ({final_pct:.1f}%)")
flat_coll.flush()

# Wait for entity count to stabilize after flush
Expand All @@ -928,6 +934,24 @@ def create_flat_collection(
flat_coll.load()
print(f"FLAT collection '{flat_collection_name}' ready with "
f"{flat_coll.num_entities} vectors.")

# Guard: the ground-truth FLAT collection must cover the source
# collection's vectors. If it doesn't, recall@k will be artificially
# low because most ANN-returned PKs simply won't exist in the GT
# set. This was the failure mode reported in issue #375 (caused by
# duplicate PKs in the source). Fail loudly rather than silently
# producing meaningless recall numbers.
coverage = (flat_coll.num_entities / total_vectors) if total_vectors else 0.0
if coverage < 0.99:
print(f"ERROR: FLAT ground-truth collection covers only "
f"{flat_coll.num_entities}/{total_vectors} "
f"({coverage * 100:.2f}%) of the source collection. "
f"This will produce artificially low recall@k. "
f"Common cause: duplicate primary keys in the source "
f"collection (see issue #375). "
f"Re-run the load step and verify unique PKs.")
return False

return True

except Exception as e:
Expand Down
Loading
Loading