From 4adc937f29c429b2b7612b488224f64937bd9653 Mon Sep 17 00:00:00 2001 From: Salvatore Mongiardo Date: Fri, 13 Mar 2026 16:15:21 +0100 Subject: [PATCH 1/4] Camel-Milvus: create rag helpers to be used as beans --- .../helpers/MilvusHelperCreateCollection.java | 195 ++++++++++++++++++ .../helpers/MilvusHelperCreateIndex.java | 106 ++++++++++ .../milvus/helpers/MilvusHelperDelete.java | 69 +++++++ .../helpers/MilvusHelperFieldMappingUtil.java | 51 +++++ .../milvus/helpers/MilvusHelperInsert.java | 99 +++++++++ .../helpers/MilvusHelperResultExtractor.java | 85 ++++++++ .../milvus/helpers/MilvusHelperSearch.java | 116 +++++++++++ .../milvus/helpers/MilvusHelperUpsert.java | 99 +++++++++ .../milvus/MilvusCreateCollectionTest.java | 57 ++--- .../milvus/it/MilvusComponentIT.java | 151 ++++++-------- 10 files changed, 900 insertions(+), 128 deletions(-) create mode 100644 components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperCreateCollection.java create mode 100644 components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperCreateIndex.java create mode 100644 components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperDelete.java create mode 100644 components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperFieldMappingUtil.java create mode 100644 components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperInsert.java create mode 100644 components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperResultExtractor.java create mode 100644 components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperSearch.java create mode 100644 components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperUpsert.java diff --git a/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperCreateCollection.java b/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperCreateCollection.java new file mode 100644 index 0000000000000..19f81db370086 --- /dev/null +++ b/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperCreateCollection.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.milvus.helpers; + +import io.milvus.grpc.DataType; +import io.milvus.param.collection.CreateCollectionParam; +import io.milvus.param.collection.FieldType; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.component.milvus.MilvusAction; +import org.apache.camel.component.milvus.MilvusHeaders; + +/** + * A Camel {@link Processor} that builds a Milvus {@link io.milvus.param.collection.CreateCollectionParam} from simple + * string properties and sets it as the exchange body together with the {@link MilvusAction#CREATE_COLLECTION} header. + */ +public class MilvusHelperCreateCollection implements Processor { + + private String collectionName = "default_collection"; + private String collectionDescription = "Default collection"; + private String idFieldName = "id"; + private String dimension = "768"; + private String textFieldName = "content"; + private String textFieldDataType = "VarChar"; + private String vectorFieldName = "embedding"; + private String vectorDataType = "FloatVector"; + private String textFieldMaxLength = "2048"; + private String additionalTextFields; + + @Override + public void process(Exchange exchange) throws Exception { + int vectorDim = Integer.parseInt(dimension); + int maxLength = Integer.parseInt(textFieldMaxLength); + DataType textDataType = DataType.valueOf(textFieldDataType); + + FieldType idField = FieldType.newBuilder() + .withName(idFieldName) + .withDataType(DataType.Int64) + .withPrimaryKey(true) + .withAutoID(true) + .build(); + + FieldType.Builder textFieldBuilder = FieldType.newBuilder() + .withName(textFieldName) + .withDataType(textDataType); + if (textDataType == DataType.VarChar) { + textFieldBuilder.withMaxLength(maxLength); + } + FieldType textField = textFieldBuilder.build(); + + DataType vecDataType = DataType.valueOf(vectorDataType); + + FieldType vectorField = FieldType.newBuilder() + .withName(vectorFieldName) + .withDataType(vecDataType) + .withDimension(vectorDim) + .build(); + + CreateCollectionParam.Builder builder = CreateCollectionParam.newBuilder() + .withCollectionName(collectionName) + .withDescription(collectionDescription) + .addFieldType(idField) + .addFieldType(textField); + + if (additionalTextFields != null && !additionalTextFields.isBlank()) { + for (String fieldName : additionalTextFields.split(",")) { + String trimmed = fieldName.trim(); + if (!trimmed.isEmpty()) { + FieldType extraField = FieldType.newBuilder() + .withName(trimmed) + .withDataType(DataType.VarChar) + .withMaxLength(maxLength) + .build(); + builder.addFieldType(extraField); + } + } + } + + builder.addFieldType(vectorField); + + exchange.getIn().setBody(builder.build()); + exchange.getIn().setHeader(MilvusHeaders.ACTION, MilvusAction.CREATE_COLLECTION); + } + + public String getCollectionName() { + return collectionName; + } + + public void setCollectionName(String collectionName) { + this.collectionName = collectionName; + } + + public String getCollectionDescription() { + return collectionDescription; + } + + public void setCollectionDescription(String collectionDescription) { + this.collectionDescription = collectionDescription; + } + + public String getIdFieldName() { + return idFieldName; + } + + public void setIdFieldName(String idFieldName) { + this.idFieldName = idFieldName; + } + + public String getDimension() { + return dimension; + } + + /** + * @param dimension the vector dimension as a string (e.g., {@code 768}, {@code 1536}) + */ + public void setDimension(String dimension) { + this.dimension = dimension; + } + + public String getTextFieldName() { + return textFieldName; + } + + public void setTextFieldName(String textFieldName) { + this.textFieldName = textFieldName; + } + + public String getTextFieldDataType() { + return textFieldDataType; + } + + /** + * @param textFieldDataType the Milvus {@link io.milvus.grpc.DataType} enum name (e.g., {@code VarChar}, + * {@code Int8}) + */ + public void setTextFieldDataType(String textFieldDataType) { + this.textFieldDataType = textFieldDataType; + } + + public String getVectorDataType() { + return vectorDataType; + } + + /** + * @param vectorDataType the Milvus {@link io.milvus.grpc.DataType} enum name for the vector field (e.g., + * {@code FloatVector}, {@code BinaryVector}, {@code Float16Vector}) + */ + public void setVectorDataType(String vectorDataType) { + this.vectorDataType = vectorDataType; + } + + public String getVectorFieldName() { + return vectorFieldName; + } + + public void setVectorFieldName(String vectorFieldName) { + this.vectorFieldName = vectorFieldName; + } + + public String getTextFieldMaxLength() { + return textFieldMaxLength; + } + + /** + * @param textFieldMaxLength the maximum length for VarChar fields as a string (e.g., {@code 2048}) + */ + public void setTextFieldMaxLength(String textFieldMaxLength) { + this.textFieldMaxLength = textFieldMaxLength; + } + + public String getAdditionalTextFields() { + return additionalTextFields; + } + + /** + * @param additionalTextFields comma-separated list of extra VarChar field names (e.g., {@code title,author}) + */ + public void setAdditionalTextFields(String additionalTextFields) { + this.additionalTextFields = additionalTextFields; + } +} diff --git a/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperCreateIndex.java b/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperCreateIndex.java new file mode 100644 index 0000000000000..3f749e802c198 --- /dev/null +++ b/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperCreateIndex.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.milvus.helpers; + +import io.milvus.param.IndexType; +import io.milvus.param.MetricType; +import io.milvus.param.index.CreateIndexParam; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.component.milvus.MilvusAction; +import org.apache.camel.component.milvus.MilvusHeaders; + +/** + * A Camel {@link Processor} that builds a Milvus {@link io.milvus.param.index.CreateIndexParam} from simple string + * properties and sets it as the exchange body together with the {@link MilvusAction#CREATE_INDEX} header. + */ +public class MilvusHelperCreateIndex implements Processor { + + private String collectionName = "default_collection"; + private String vectorFieldName = "embedding"; + private String indexType = "IVF_FLAT"; + private String metricType = "COSINE"; + private String extraParam = "{\"nlist\": 128}"; + + @Override + public void process(Exchange exchange) throws Exception { + IndexType idxType = IndexType.valueOf(indexType); + MetricType metric = MetricType.valueOf(metricType); + + CreateIndexParam param = CreateIndexParam.newBuilder() + .withCollectionName(collectionName) + .withFieldName(vectorFieldName) + .withIndexType(idxType) + .withMetricType(metric) + .withExtraParam(extraParam) + .withSyncMode(Boolean.TRUE) + .build(); + + exchange.getIn().setBody(param); + exchange.getIn().setHeader(MilvusHeaders.ACTION, MilvusAction.CREATE_INDEX); + } + + public String getCollectionName() { + return collectionName; + } + + public void setCollectionName(String collectionName) { + this.collectionName = collectionName; + } + + public String getVectorFieldName() { + return vectorFieldName; + } + + public void setVectorFieldName(String vectorFieldName) { + this.vectorFieldName = vectorFieldName; + } + + public String getIndexType() { + return indexType; + } + + /** + * @param indexType the Milvus {@link io.milvus.param.IndexType} enum name (e.g., {@code IVF_FLAT}, {@code HNSW}) + */ + public void setIndexType(String indexType) { + this.indexType = indexType; + } + + public String getMetricType() { + return metricType; + } + + /** + * @param metricType the Milvus {@link io.milvus.param.MetricType} enum name (e.g., {@code COSINE}, {@code L2}, + * {@code IP}) + */ + public void setMetricType(String metricType) { + this.metricType = metricType; + } + + public String getExtraParam() { + return extraParam; + } + + /** + * @param extraParam JSON string with index-specific parameters (e.g., {@code {"nlist": 128}}) + */ + public void setExtraParam(String extraParam) { + this.extraParam = extraParam; + } +} diff --git a/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperDelete.java b/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperDelete.java new file mode 100644 index 0000000000000..6a37abaca3bf8 --- /dev/null +++ b/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperDelete.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.milvus.helpers; + +import io.milvus.param.dml.DeleteParam; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.component.milvus.MilvusAction; +import org.apache.camel.component.milvus.MilvusHeaders; + +/** + * A Camel {@link Processor} that builds a Milvus {@link io.milvus.param.dml.DeleteParam} from simple string properties + * and sets it as the exchange body together with the {@link MilvusAction#DELETE} header. + */ +public class MilvusHelperDelete implements Processor { + + private String collectionName = "default_collection"; + private String filter; + + @Override + public void process(Exchange exchange) throws Exception { + if (filter == null || filter.isEmpty()) { + throw new IllegalArgumentException( + "A filter expression is required for delete operations (e.g., \"id in [1, 2, 3]\")"); + } + + DeleteParam param = DeleteParam.newBuilder() + .withCollectionName(collectionName) + .withExpr(filter) + .build(); + + exchange.getIn().setBody(param); + exchange.getIn().setHeader(MilvusHeaders.ACTION, MilvusAction.DELETE); + } + + public String getCollectionName() { + return collectionName; + } + + public void setCollectionName(String collectionName) { + this.collectionName = collectionName; + } + + public String getFilter() { + return filter; + } + + /** + * @param filter a Milvus boolean expression to select entities to delete (e.g., {@code id in [1, 2, 3]}, + * {@code age > 18}) + */ + public void setFilter(String filter) { + this.filter = filter; + } +} diff --git a/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperFieldMappingUtil.java b/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperFieldMappingUtil.java new file mode 100644 index 0000000000000..0cadfc9558447 --- /dev/null +++ b/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperFieldMappingUtil.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.milvus.helpers; + +import java.util.LinkedHashMap; +import java.util.Map; + +/** + * Shared utility for parsing {@code textFieldMappings} strings used by {@link MilvusHelperInsert} and + * {@link MilvusHelperUpsert}. + */ +final class MilvusHelperFieldMappingUtil { + + private MilvusHelperFieldMappingUtil() { + } + + /** + * Parses a comma-separated mapping string into an ordered map of field name to variable name. + * + * @param textFieldMappings the mappings string (e.g., {@code field1=var1,field2=var2}) + * @return an ordered map from field name to variable name + * @throws IllegalArgumentException if a mapping entry does not contain exactly one {@code =} separator + */ + static Map parseMappings(String textFieldMappings) { + Map result = new LinkedHashMap<>(); + for (String mapping : textFieldMappings.split(",")) { + String[] pair = mapping.trim().split("="); + if (pair.length != 2) { + throw new IllegalArgumentException( + "Invalid textFieldMappings entry: '" + mapping.trim() + + "'. Expected format: fieldName=variableName"); + } + result.put(pair[0].trim(), pair[1].trim()); + } + return result; + } +} diff --git a/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperInsert.java b/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperInsert.java new file mode 100644 index 0000000000000..3578425dc214b --- /dev/null +++ b/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperInsert.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.milvus.helpers; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import io.milvus.param.dml.InsertParam; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.component.milvus.MilvusAction; +import org.apache.camel.component.milvus.MilvusHeaders; + +/** + * A Camel {@link Processor} that builds a Milvus {@link io.milvus.param.dml.InsertParam} from the exchange body vector + * and exchange variables mapped via {@link #setTextFieldMappings(String)}, then sets it as the exchange body together + * with the {@link MilvusAction#INSERT} header. + */ +public class MilvusHelperInsert implements Processor { + + private String collectionName = "default_collection"; + private String vectorFieldName = "embedding"; + private String textFieldMappings = "content=text"; + + @SuppressWarnings("unchecked") + @Override + public void process(Exchange exchange) throws Exception { + List vector = exchange.getIn().getBody(List.class); + if (vector == null) { + throw new IllegalArgumentException("Exchange body must contain a List vector, but was null"); + } + + Map mappings = MilvusHelperFieldMappingUtil.parseMappings(textFieldMappings); + + List fields = new ArrayList<>(); + for (Map.Entry entry : mappings.entrySet()) { + String value = exchange.getVariable(entry.getValue(), String.class); + if (value == null) { + throw new IllegalArgumentException( + "Exchange variable '" + entry.getValue() + "' is not set (mapped from field '" + entry.getKey() + "')"); + } + fields.add(new InsertParam.Field(entry.getKey(), Collections.singletonList(value))); + } + + fields.add(new InsertParam.Field(vectorFieldName, Collections.singletonList(vector))); + + InsertParam param = InsertParam.newBuilder() + .withCollectionName(collectionName) + .withFields(fields) + .build(); + + exchange.getIn().setBody(param); + exchange.getIn().setHeader(MilvusHeaders.ACTION, MilvusAction.INSERT); + } + + public String getCollectionName() { + return collectionName; + } + + public void setCollectionName(String collectionName) { + this.collectionName = collectionName; + } + + public String getVectorFieldName() { + return vectorFieldName; + } + + public void setVectorFieldName(String vectorFieldName) { + this.vectorFieldName = vectorFieldName; + } + + public String getTextFieldMappings() { + return textFieldMappings; + } + + /** + * @param textFieldMappings comma-separated mappings of Milvus field names to exchange variable names (e.g., + * {@code field1=var1,field2=var2}) + */ + public void setTextFieldMappings(String textFieldMappings) { + this.textFieldMappings = textFieldMappings; + } +} diff --git a/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperResultExtractor.java b/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperResultExtractor.java new file mode 100644 index 0000000000000..f2831a9f3baea --- /dev/null +++ b/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperResultExtractor.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.milvus.helpers; + +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import io.milvus.param.highlevel.dml.response.SearchResponse; +import io.milvus.response.QueryResultsWrapper; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; + +/** + * A Camel {@link Processor} that extracts field values from a Milvus + * {@link io.milvus.param.highlevel.dml.response.SearchResponse} into a list of ranked maps. Each map contains a + * {@code rank} entry and the requested output field values. The extracted list is set as the exchange body. + */ +public class MilvusHelperResultExtractor implements Processor { + + private String outputFields = "content"; + + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setBody(extract(exchange)); + } + + public List> extract(Exchange exchange) { + SearchResponse response = exchange.getIn().getBody(SearchResponse.class); + List> extracted = new ArrayList<>(); + + String[] fields = outputFields.split(","); + + if (response != null) { + List records = response.getRowRecords(0); + int rank = 1; + for (QueryResultsWrapper.RowRecord record : records) { + Map item = new LinkedHashMap<>(); + item.put("rank", rank++); + Object score = record.get("score"); + if (score != null) { + item.put("score", score); + } + for (String field : fields) { + String trimmed = field.trim(); + if (!trimmed.isEmpty()) { + Object value = record.get(trimmed); + if (value != null) { + item.put(trimmed, value); + } + } + } + extracted.add(item); + } + } + return extracted; + } + + public String getOutputFields() { + return outputFields; + } + + /** + * @param outputFields comma-separated list of field names to extract from search results (e.g., + * {@code content,title}) + */ + public void setOutputFields(String outputFields) { + this.outputFields = outputFields; + } +} diff --git a/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperSearch.java b/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperSearch.java new file mode 100644 index 0000000000000..e13e184f8beca --- /dev/null +++ b/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperSearch.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.milvus.helpers; + +import java.util.ArrayList; +import java.util.List; + +import io.milvus.common.clientenum.ConsistencyLevelEnum; +import io.milvus.param.highlevel.dml.SearchSimpleParam; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.component.milvus.MilvusAction; +import org.apache.camel.component.milvus.MilvusHeaders; + +/** + * A Camel {@link Processor} that builds a Milvus {@link io.milvus.param.highlevel.dml.SearchSimpleParam} from the + * exchange body vector and simple string properties, then sets it as the exchange body together with the + * {@link MilvusAction#SEARCH} header. + */ +public class MilvusHelperSearch implements Processor { + + private String collectionName = "default_collection"; + private String outputFields = "content"; + private String limit = "10"; + private String filter; + + @SuppressWarnings("unchecked") + @Override + public void process(Exchange exchange) throws Exception { + List queryEmbedding = exchange.getIn().getBody(List.class); + if (queryEmbedding == null) { + throw new IllegalArgumentException("Exchange body must contain a List vector, but was null"); + } + long searchLimit = Long.parseLong(limit); + + List fields = new ArrayList<>(); + for (String field : outputFields.split(",")) { + String trimmed = field.trim(); + if (!trimmed.isEmpty()) { + fields.add(trimmed); + } + } + + SearchSimpleParam.Builder builder = SearchSimpleParam.newBuilder() + .withCollectionName(collectionName) + .withVectors(queryEmbedding) + .withLimit(searchLimit) + .withOutputFields(fields) + .withConsistencyLevel(ConsistencyLevelEnum.STRONG); + + if (filter != null && !filter.isEmpty()) { + builder.withFilter(filter); + } + + SearchSimpleParam param = builder.build(); + + exchange.getIn().setBody(param); + exchange.getIn().setHeader(MilvusHeaders.ACTION, MilvusAction.SEARCH); + } + + public String getCollectionName() { + return collectionName; + } + + public void setCollectionName(String collectionName) { + this.collectionName = collectionName; + } + + public String getOutputFields() { + return outputFields; + } + + /** + * @param outputFields comma-separated list of field names to include in search results (e.g., + * {@code content,title}) + */ + public void setOutputFields(String outputFields) { + this.outputFields = outputFields; + } + + public String getLimit() { + return limit; + } + + /** + * @param limit the maximum number of results to return as a string (e.g., {@code 10}, {@code 100}) + */ + public void setLimit(String limit) { + this.limit = limit; + } + + public String getFilter() { + return filter; + } + + /** + * @param filter a Milvus boolean expression to filter results (e.g., {@code age > 18}, {@code status == "active"}) + */ + public void setFilter(String filter) { + this.filter = filter; + } +} diff --git a/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperUpsert.java b/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperUpsert.java new file mode 100644 index 0000000000000..df6c1d7c24c04 --- /dev/null +++ b/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperUpsert.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.milvus.helpers; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import io.milvus.param.dml.UpsertParam; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.component.milvus.MilvusAction; +import org.apache.camel.component.milvus.MilvusHeaders; + +/** + * A Camel {@link Processor} that builds a Milvus {@link io.milvus.param.dml.UpsertParam} from the exchange body vector + * and exchange variables mapped via {@link #setTextFieldMappings(String)}, then sets it as the exchange body together + * with the {@link MilvusAction#UPSERT} header. + */ +public class MilvusHelperUpsert implements Processor { + + private String collectionName = "default_collection"; + private String vectorFieldName = "embedding"; + private String textFieldMappings = "content=text"; + + @SuppressWarnings("unchecked") + @Override + public void process(Exchange exchange) throws Exception { + List vector = exchange.getIn().getBody(List.class); + if (vector == null) { + throw new IllegalArgumentException("Exchange body must contain a List vector, but was null"); + } + + Map mappings = MilvusHelperFieldMappingUtil.parseMappings(textFieldMappings); + + List fields = new ArrayList<>(); + for (Map.Entry entry : mappings.entrySet()) { + String value = exchange.getVariable(entry.getValue(), String.class); + if (value == null) { + throw new IllegalArgumentException( + "Exchange variable '" + entry.getValue() + "' is not set (mapped from field '" + entry.getKey() + "')"); + } + fields.add(new UpsertParam.Field(entry.getKey(), Collections.singletonList(value))); + } + + fields.add(new UpsertParam.Field(vectorFieldName, Collections.singletonList(vector))); + + UpsertParam param = UpsertParam.newBuilder() + .withCollectionName(collectionName) + .withFields(fields) + .build(); + + exchange.getIn().setBody(param); + exchange.getIn().setHeader(MilvusHeaders.ACTION, MilvusAction.UPSERT); + } + + public String getCollectionName() { + return collectionName; + } + + public void setCollectionName(String collectionName) { + this.collectionName = collectionName; + } + + public String getVectorFieldName() { + return vectorFieldName; + } + + public void setVectorFieldName(String vectorFieldName) { + this.vectorFieldName = vectorFieldName; + } + + public String getTextFieldMappings() { + return textFieldMappings; + } + + /** + * @param textFieldMappings comma-separated mappings of Milvus field names to exchange variable names (e.g., + * {@code field1=var1,field2=var2}) + */ + public void setTextFieldMappings(String textFieldMappings) { + this.textFieldMappings = textFieldMappings; + } +} diff --git a/components/camel-ai/camel-milvus/src/test/java/org/apache/camel/component/milvus/MilvusCreateCollectionTest.java b/components/camel-ai/camel-milvus/src/test/java/org/apache/camel/component/milvus/MilvusCreateCollectionTest.java index a75119addc330..1918defe59c97 100644 --- a/components/camel-ai/camel-milvus/src/test/java/org/apache/camel/component/milvus/MilvusCreateCollectionTest.java +++ b/components/camel-ai/camel-milvus/src/test/java/org/apache/camel/component/milvus/MilvusCreateCollectionTest.java @@ -17,12 +17,10 @@ package org.apache.camel.component.milvus; -import io.milvus.grpc.DataType; -import io.milvus.param.collection.CollectionSchemaParam; -import io.milvus.param.collection.CreateCollectionParam; -import io.milvus.param.collection.FieldType; import org.apache.camel.Exchange; import org.apache.camel.NoSuchHeaderException; +import org.apache.camel.component.milvus.helpers.MilvusHelperCreateCollection; +import org.apache.camel.support.DefaultExchange; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; @@ -32,43 +30,22 @@ public class MilvusCreateCollectionTest extends MilvusTestSupport { @DisplayName("Tests that trying to create a collection without passing the action name triggers a failure") @Test - public void createCollectionWithoutRequiredParameters() { - FieldType fieldType1 = FieldType.newBuilder() - .withName("userID") - .withDescription("user identification") - .withDataType(DataType.Int64) - .withPrimaryKey(true) - .withAutoID(true) - .build(); - - FieldType fieldType2 = FieldType.newBuilder() - .withName("userFace") - .withDescription("face embedding") - .withDataType(DataType.FloatVector) - .withDimension(64) - .build(); - - FieldType fieldType3 = FieldType.newBuilder() - .withName("userAge") - .withDescription("user age") - .withDataType(DataType.Int8) - .build(); - - CreateCollectionParam createCollectionReq = CreateCollectionParam.newBuilder() - .withCollectionName("test") - .withDescription("customer info") - .withShardsNum(2) - .withSchema(CollectionSchemaParam.newBuilder() - .withEnableDynamicField(false) - .addFieldType(fieldType1) - .addFieldType(fieldType2) - .addFieldType(fieldType3) - .build()) - .build(); - + public void createCollectionWithoutRequiredParameters() throws Exception { + MilvusHelperCreateCollection ragCreateCollection = new MilvusHelperCreateCollection(); + ragCreateCollection.setCollectionName("test"); + ragCreateCollection.setCollectionDescription("customer info"); + ragCreateCollection.setIdFieldName("userID"); + ragCreateCollection.setVectorFieldName("userFace"); + ragCreateCollection.setTextFieldName("userAge"); + ragCreateCollection.setTextFieldDataType("Int8"); + ragCreateCollection.setDimension("64"); + + Exchange tempExchange = new DefaultExchange(context); + ragCreateCollection.process(tempExchange); + + // Send body without the action header to trigger failure Exchange result = fluentTemplate.to("milvus:createCollection") - .withBody( - createCollectionReq) + .withBody(tempExchange.getIn().getBody()) .request(Exchange.class); assertThat(result).isNotNull(); diff --git a/components/camel-ai/camel-milvus/src/test/java/org/apache/camel/component/milvus/it/MilvusComponentIT.java b/components/camel-ai/camel-milvus/src/test/java/org/apache/camel/component/milvus/it/MilvusComponentIT.java index cab3064b08520..6d12b6f21b46e 100644 --- a/components/camel-ai/camel-milvus/src/test/java/org/apache/camel/component/milvus/it/MilvusComponentIT.java +++ b/components/camel-ai/camel-milvus/src/test/java/org/apache/camel/component/milvus/it/MilvusComponentIT.java @@ -21,24 +21,22 @@ import java.util.Random; import io.milvus.common.clientenum.ConsistencyLevelEnum; -import io.milvus.grpc.DataType; import io.milvus.grpc.QueryResults; import io.milvus.param.IndexType; -import io.milvus.param.MetricType; -import io.milvus.param.collection.CollectionSchemaParam; -import io.milvus.param.collection.CreateCollectionParam; -import io.milvus.param.collection.FieldType; -import io.milvus.param.dml.DeleteParam; import io.milvus.param.dml.InsertParam; import io.milvus.param.dml.QueryParam; import io.milvus.param.dml.UpsertParam; -import io.milvus.param.highlevel.dml.SearchSimpleParam; import io.milvus.param.highlevel.dml.response.SearchResponse; import io.milvus.param.index.CreateIndexParam; import org.apache.camel.Exchange; import org.apache.camel.component.milvus.MilvusAction; import org.apache.camel.component.milvus.MilvusHeaders; import org.apache.camel.component.milvus.MilvusTestSupport; +import org.apache.camel.component.milvus.helpers.MilvusHelperCreateCollection; +import org.apache.camel.component.milvus.helpers.MilvusHelperCreateIndex; +import org.apache.camel.component.milvus.helpers.MilvusHelperDelete; +import org.apache.camel.component.milvus.helpers.MilvusHelperSearch; +import org.apache.camel.support.DefaultExchange; import org.assertj.core.util.Lists; import org.junit.jupiter.api.*; @@ -48,43 +46,22 @@ public class MilvusComponentIT extends MilvusTestSupport { @Test @Order(1) - public void createCollection() { - FieldType fieldType1 = FieldType.newBuilder() - .withName("userID") - .withDescription("user identification") - .withDataType(DataType.Int64) - .withPrimaryKey(true) - .withAutoID(true) - .build(); - - FieldType fieldType2 = FieldType.newBuilder() - .withName("userFace") - .withDescription("face embedding") - .withDataType(DataType.FloatVector) - .withDimension(64) - .build(); - - FieldType fieldType3 = FieldType.newBuilder() - .withName("userAge") - .withDescription("user age") - .withDataType(DataType.Int8) - .build(); - - CreateCollectionParam createCollectionReq = CreateCollectionParam.newBuilder() - .withCollectionName("test") - .withDescription("customer info") - .withShardsNum(2) - .withSchema(CollectionSchemaParam.newBuilder() - .withEnableDynamicField(false) - .addFieldType(fieldType1) - .addFieldType(fieldType2) - .addFieldType(fieldType3).build()) - .build(); + public void createCollection() throws Exception { + MilvusHelperCreateCollection ragCreateCollection = new MilvusHelperCreateCollection(); + ragCreateCollection.setCollectionName("test"); + ragCreateCollection.setCollectionDescription("customer info"); + ragCreateCollection.setIdFieldName("userID"); + ragCreateCollection.setVectorFieldName("userFace"); + ragCreateCollection.setTextFieldName("userAge"); + ragCreateCollection.setTextFieldDataType("Int8"); + ragCreateCollection.setDimension("64"); + + Exchange tempExchange = new DefaultExchange(context); + ragCreateCollection.process(tempExchange); Exchange result = fluentTemplate.to("milvus:test") - .withHeader(MilvusHeaders.ACTION, MilvusAction.CREATE_COLLECTION) - .withBody( - createCollectionReq) + .withHeader(MilvusHeaders.ACTION, tempExchange.getIn().getHeader(MilvusHeaders.ACTION)) + .withBody(tempExchange.getIn().getBody()) .request(Exchange.class); assertThat(result).isNotNull(); @@ -93,7 +70,7 @@ public void createCollection() { @Test @Order(2) - public void createIndex() { + public void createIndex() throws Exception { CreateIndexParam createAgeIndexParam = CreateIndexParam.newBuilder() .withCollectionName("test") .withFieldName("userAge") @@ -110,20 +87,19 @@ public void createIndex() { assertThat(result).isNotNull(); assertThat(result.getException()).isNull(); - CreateIndexParam createVectorIndexParam = CreateIndexParam.newBuilder() - .withCollectionName("test") - .withFieldName("userFace") - .withIndexName("userFaceIndex") - .withIndexType(IndexType.IVF_FLAT) - .withMetricType(MetricType.L2) - .withExtraParam("{\"nlist\":128}") - .withSyncMode(Boolean.TRUE) - .build(); + MilvusHelperCreateIndex ragCreateIndex = new MilvusHelperCreateIndex(); + ragCreateIndex.setCollectionName("test"); + ragCreateIndex.setVectorFieldName("userFace"); + ragCreateIndex.setIndexType("IVF_FLAT"); + ragCreateIndex.setMetricType("L2"); + ragCreateIndex.setExtraParam("{\"nlist\":128}"); + + Exchange tempExchange = new DefaultExchange(context); + ragCreateIndex.process(tempExchange); result = fluentTemplate.to("milvus:test") - .withHeader(MilvusHeaders.ACTION, MilvusAction.CREATE_INDEX) - .withBody( - createVectorIndexParam) + .withHeader(MilvusHeaders.ACTION, tempExchange.getIn().getHeader(MilvusHeaders.ACTION)) + .withBody(tempExchange.getIn().getBody()) .request(Exchange.class); assertThat(result).isNotNull(); @@ -187,21 +163,20 @@ public void upsert() { @Test @Order(5) - public void search() { - SearchSimpleParam searchSimpleParam = SearchSimpleParam.newBuilder() - .withCollectionName("test") - .withVectors(generateFloatVector()) - .withFilter("userAge>0") - .withLimit(100L) - .withOffset(0L) - .withOutputFields(Lists.newArrayList("userAge")) - .withConsistencyLevel(ConsistencyLevelEnum.STRONG) - .build(); + public void search() throws Exception { + MilvusHelperSearch ragSearch = new MilvusHelperSearch(); + ragSearch.setCollectionName("test"); + ragSearch.setOutputFields("userAge"); + ragSearch.setFilter("userAge>0"); + ragSearch.setLimit("100"); + + Exchange tempExchange = new DefaultExchange(context); + tempExchange.getIn().setBody(generateFloatVector()); + ragSearch.process(tempExchange); Exchange result = fluentTemplate.to("milvus:test") - .withHeader(MilvusHeaders.ACTION, MilvusAction.SEARCH) - .withBody( - searchSimpleParam) + .withHeader(MilvusHeaders.ACTION, tempExchange.getIn().getHeader(MilvusHeaders.ACTION)) + .withBody(tempExchange.getIn().getBody()) .request(Exchange.class); assertThat(result).isNotNull(); @@ -232,35 +207,35 @@ public void query() { @Test @Order(7) - public void delete() { - DeleteParam delete = DeleteParam.newBuilder() - .withCollectionName("test") - .withExpr("userAge>0") - .build(); + public void delete() throws Exception { + MilvusHelperDelete ragDelete = new MilvusHelperDelete(); + ragDelete.setCollectionName("test"); + ragDelete.setFilter("userAge>0"); + + Exchange tempExchange = new DefaultExchange(context); + ragDelete.process(tempExchange); Exchange result = fluentTemplate.to("milvus:test") - .withHeader(MilvusHeaders.ACTION, MilvusAction.DELETE) - .withBody( - delete) + .withHeader(MilvusHeaders.ACTION, tempExchange.getIn().getHeader(MilvusHeaders.ACTION)) + .withBody(tempExchange.getIn().getBody()) .request(Exchange.class); assertThat(result).isNotNull(); assertThat(result.getException()).isNull(); - SearchSimpleParam searchSimpleParam = SearchSimpleParam.newBuilder() - .withCollectionName("test") - .withVectors(generateFloatVector()) - .withFilter("userAge>0") - .withLimit(100L) - .withOffset(0L) - .withOutputFields(Lists.newArrayList("userAge")) - .withConsistencyLevel(ConsistencyLevelEnum.STRONG) - .build(); + MilvusHelperSearch ragSearch = new MilvusHelperSearch(); + ragSearch.setCollectionName("test"); + ragSearch.setOutputFields("userAge"); + ragSearch.setFilter("userAge>0"); + ragSearch.setLimit("100"); + + tempExchange = new DefaultExchange(context); + tempExchange.getIn().setBody(generateFloatVector()); + ragSearch.process(tempExchange); result = fluentTemplate.to("milvus:test") - .withHeader(MilvusHeaders.ACTION, MilvusAction.SEARCH) - .withBody( - searchSimpleParam) + .withHeader(MilvusHeaders.ACTION, tempExchange.getIn().getHeader(MilvusHeaders.ACTION)) + .withBody(tempExchange.getIn().getBody()) .request(Exchange.class); assertThat(result).isNotNull(); From ef4a4b03bb241eed14c314eeb5e19889479b9277 Mon Sep 17 00:00:00 2001 From: Guillaume Nodet Date: Mon, 23 Mar 2026 21:11:06 +0100 Subject: [PATCH 2/4] Add missing configurable properties to Milvus helpers - MilvusHelperSearch: add offset and consistencyLevel properties (consistencyLevel was hardcoded to STRONG) - MilvusHelperCreateIndex: add indexName property Co-Authored-By: Claude Opus 4.6 --- .../helpers/MilvusHelperCreateIndex.java | 23 ++++++++++++-- .../milvus/helpers/MilvusHelperSearch.java | 31 ++++++++++++++++++- 2 files changed, 50 insertions(+), 4 deletions(-) diff --git a/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperCreateIndex.java b/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperCreateIndex.java index 3f749e802c198..492aded93a520 100644 --- a/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperCreateIndex.java +++ b/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperCreateIndex.java @@ -32,6 +32,7 @@ public class MilvusHelperCreateIndex implements Processor { private String collectionName = "default_collection"; private String vectorFieldName = "embedding"; + private String indexName; private String indexType = "IVF_FLAT"; private String metricType = "COSINE"; private String extraParam = "{\"nlist\": 128}"; @@ -41,14 +42,19 @@ public void process(Exchange exchange) throws Exception { IndexType idxType = IndexType.valueOf(indexType); MetricType metric = MetricType.valueOf(metricType); - CreateIndexParam param = CreateIndexParam.newBuilder() + CreateIndexParam.Builder builder = CreateIndexParam.newBuilder() .withCollectionName(collectionName) .withFieldName(vectorFieldName) .withIndexType(idxType) .withMetricType(metric) .withExtraParam(extraParam) - .withSyncMode(Boolean.TRUE) - .build(); + .withSyncMode(Boolean.TRUE); + + if (indexName != null && !indexName.isEmpty()) { + builder.withIndexName(indexName); + } + + CreateIndexParam param = builder.build(); exchange.getIn().setBody(param); exchange.getIn().setHeader(MilvusHeaders.ACTION, MilvusAction.CREATE_INDEX); @@ -70,6 +76,17 @@ public void setVectorFieldName(String vectorFieldName) { this.vectorFieldName = vectorFieldName; } + public String getIndexName() { + return indexName; + } + + /** + * @param indexName the name to assign to the index (e.g., {@code myVectorIndex}) + */ + public void setIndexName(String indexName) { + this.indexName = indexName; + } + public String getIndexType() { return indexType; } diff --git a/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperSearch.java b/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperSearch.java index e13e184f8beca..20d66307d3179 100644 --- a/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperSearch.java +++ b/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperSearch.java @@ -36,6 +36,8 @@ public class MilvusHelperSearch implements Processor { private String collectionName = "default_collection"; private String outputFields = "content"; private String limit = "10"; + private String offset = "0"; + private String consistencyLevel = "STRONG"; private String filter; @SuppressWarnings("unchecked") @@ -55,12 +57,16 @@ public void process(Exchange exchange) throws Exception { } } + long searchOffset = Long.parseLong(offset); + ConsistencyLevelEnum consistency = ConsistencyLevelEnum.valueOf(consistencyLevel); + SearchSimpleParam.Builder builder = SearchSimpleParam.newBuilder() .withCollectionName(collectionName) .withVectors(queryEmbedding) .withLimit(searchLimit) + .withOffset(searchOffset) .withOutputFields(fields) - .withConsistencyLevel(ConsistencyLevelEnum.STRONG); + .withConsistencyLevel(consistency); if (filter != null && !filter.isEmpty()) { builder.withFilter(filter); @@ -103,6 +109,29 @@ public void setLimit(String limit) { this.limit = limit; } + public String getOffset() { + return offset; + } + + /** + * @param offset the number of results to skip as a string (e.g., {@code 0}, {@code 10}) + */ + public void setOffset(String offset) { + this.offset = offset; + } + + public String getConsistencyLevel() { + return consistencyLevel; + } + + /** + * @param consistencyLevel the Milvus {@link io.milvus.common.clientenum.ConsistencyLevelEnum} enum name (e.g., + * {@code STRONG}, {@code BOUNDED}, {@code EVENTUALLY}) + */ + public void setConsistencyLevel(String consistencyLevel) { + this.consistencyLevel = consistencyLevel; + } + public String getFilter() { return filter; } From 7d655c33b2b26a9ed087abd113f3d6e2cb19c0bf Mon Sep 17 00:00:00 2001 From: Guillaume Nodet Date: Mon, 23 Mar 2026 21:16:49 +0100 Subject: [PATCH 3/4] Restore original raw SDK tests alongside helper-based tests - Restore the original MilvusCreateCollectionTest using raw SDK objects (CollectionSchemaParam with withShardsNum and withEnableDynamicField) - Restore the original MilvusComponentIT tests using raw SDK objects (CreateCollectionParam, CreateIndexParam with indexName, SearchSimpleParam with offset, DeleteParam) - Keep the helper-based tests as additional test methods to cover both code paths Co-Authored-By: Claude Opus 4.6 --- .../milvus/MilvusCreateCollectionTest.java | 69 +++++- .../milvus/it/MilvusComponentIT.java | 225 ++++++++++++++---- 2 files changed, 232 insertions(+), 62 deletions(-) diff --git a/components/camel-ai/camel-milvus/src/test/java/org/apache/camel/component/milvus/MilvusCreateCollectionTest.java b/components/camel-ai/camel-milvus/src/test/java/org/apache/camel/component/milvus/MilvusCreateCollectionTest.java index 1918defe59c97..a5499a07f6e31 100644 --- a/components/camel-ai/camel-milvus/src/test/java/org/apache/camel/component/milvus/MilvusCreateCollectionTest.java +++ b/components/camel-ai/camel-milvus/src/test/java/org/apache/camel/component/milvus/MilvusCreateCollectionTest.java @@ -17,6 +17,10 @@ package org.apache.camel.component.milvus; +import io.milvus.grpc.DataType; +import io.milvus.param.collection.CollectionSchemaParam; +import io.milvus.param.collection.CreateCollectionParam; +import io.milvus.param.collection.FieldType; import org.apache.camel.Exchange; import org.apache.camel.NoSuchHeaderException; import org.apache.camel.component.milvus.helpers.MilvusHelperCreateCollection; @@ -30,18 +34,63 @@ public class MilvusCreateCollectionTest extends MilvusTestSupport { @DisplayName("Tests that trying to create a collection without passing the action name triggers a failure") @Test - public void createCollectionWithoutRequiredParameters() throws Exception { - MilvusHelperCreateCollection ragCreateCollection = new MilvusHelperCreateCollection(); - ragCreateCollection.setCollectionName("test"); - ragCreateCollection.setCollectionDescription("customer info"); - ragCreateCollection.setIdFieldName("userID"); - ragCreateCollection.setVectorFieldName("userFace"); - ragCreateCollection.setTextFieldName("userAge"); - ragCreateCollection.setTextFieldDataType("Int8"); - ragCreateCollection.setDimension("64"); + public void createCollectionWithoutRequiredParameters() { + FieldType fieldType1 = FieldType.newBuilder() + .withName("userID") + .withDescription("user identification") + .withDataType(DataType.Int64) + .withPrimaryKey(true) + .withAutoID(true) + .build(); + + FieldType fieldType2 = FieldType.newBuilder() + .withName("userFace") + .withDescription("face embedding") + .withDataType(DataType.FloatVector) + .withDimension(64) + .build(); + + FieldType fieldType3 = FieldType.newBuilder() + .withName("userAge") + .withDescription("user age") + .withDataType(DataType.Int8) + .build(); + + CreateCollectionParam createCollectionReq = CreateCollectionParam.newBuilder() + .withCollectionName("test") + .withDescription("customer info") + .withShardsNum(2) + .withSchema(CollectionSchemaParam.newBuilder() + .withEnableDynamicField(false) + .addFieldType(fieldType1) + .addFieldType(fieldType2) + .addFieldType(fieldType3) + .build()) + .build(); + + Exchange result = fluentTemplate.to("milvus:createCollection") + .withBody( + createCollectionReq) + .request(Exchange.class); + + assertThat(result).isNotNull(); + assertThat(result.getException()).isInstanceOf(NoSuchHeaderException.class); + } + + @DisplayName("Tests that trying to create a collection via helper without passing the action name triggers a failure") + @Test + public void createCollectionWithHelperWithoutRequiredParameters() throws Exception { + MilvusHelperCreateCollection helper = new MilvusHelperCreateCollection(); + helper.setCollectionName("test"); + helper.setCollectionDescription("customer info"); + helper.setIdFieldName("userID"); + helper.setVectorFieldName("userFace"); + helper.setTextFieldName("userAge"); + helper.setTextFieldDataType("Int8"); + helper.setDimension("64"); Exchange tempExchange = new DefaultExchange(context); - ragCreateCollection.process(tempExchange); + helper.process(tempExchange); // Send body without the action header to trigger failure Exchange result = fluentTemplate.to("milvus:createCollection") diff --git a/components/camel-ai/camel-milvus/src/test/java/org/apache/camel/component/milvus/it/MilvusComponentIT.java b/components/camel-ai/camel-milvus/src/test/java/org/apache/camel/component/milvus/it/MilvusComponentIT.java index 6d12b6f21b46e..ba6c42e6b0eb9 100644 --- a/components/camel-ai/camel-milvus/src/test/java/org/apache/camel/component/milvus/it/MilvusComponentIT.java +++ b/components/camel-ai/camel-milvus/src/test/java/org/apache/camel/component/milvus/it/MilvusComponentIT.java @@ -21,11 +21,18 @@ import java.util.Random; import io.milvus.common.clientenum.ConsistencyLevelEnum; +import io.milvus.grpc.DataType; import io.milvus.grpc.QueryResults; import io.milvus.param.IndexType; +import io.milvus.param.MetricType; +import io.milvus.param.collection.CollectionSchemaParam; +import io.milvus.param.collection.CreateCollectionParam; +import io.milvus.param.collection.FieldType; +import io.milvus.param.dml.DeleteParam; import io.milvus.param.dml.InsertParam; import io.milvus.param.dml.QueryParam; import io.milvus.param.dml.UpsertParam; +import io.milvus.param.highlevel.dml.SearchSimpleParam; import io.milvus.param.highlevel.dml.response.SearchResponse; import io.milvus.param.index.CreateIndexParam; import org.apache.camel.Exchange; @@ -46,22 +53,43 @@ public class MilvusComponentIT extends MilvusTestSupport { @Test @Order(1) - public void createCollection() throws Exception { - MilvusHelperCreateCollection ragCreateCollection = new MilvusHelperCreateCollection(); - ragCreateCollection.setCollectionName("test"); - ragCreateCollection.setCollectionDescription("customer info"); - ragCreateCollection.setIdFieldName("userID"); - ragCreateCollection.setVectorFieldName("userFace"); - ragCreateCollection.setTextFieldName("userAge"); - ragCreateCollection.setTextFieldDataType("Int8"); - ragCreateCollection.setDimension("64"); + public void createCollection() { + FieldType fieldType1 = FieldType.newBuilder() + .withName("userID") + .withDescription("user identification") + .withDataType(DataType.Int64) + .withPrimaryKey(true) + .withAutoID(true) + .build(); - Exchange tempExchange = new DefaultExchange(context); - ragCreateCollection.process(tempExchange); + FieldType fieldType2 = FieldType.newBuilder() + .withName("userFace") + .withDescription("face embedding") + .withDataType(DataType.FloatVector) + .withDimension(64) + .build(); + + FieldType fieldType3 = FieldType.newBuilder() + .withName("userAge") + .withDescription("user age") + .withDataType(DataType.Int8) + .build(); + + CreateCollectionParam createCollectionReq = CreateCollectionParam.newBuilder() + .withCollectionName("test") + .withDescription("customer info") + .withShardsNum(2) + .withSchema(CollectionSchemaParam.newBuilder() + .withEnableDynamicField(false) + .addFieldType(fieldType1) + .addFieldType(fieldType2) + .addFieldType(fieldType3).build()) + .build(); Exchange result = fluentTemplate.to("milvus:test") - .withHeader(MilvusHeaders.ACTION, tempExchange.getIn().getHeader(MilvusHeaders.ACTION)) - .withBody(tempExchange.getIn().getBody()) + .withHeader(MilvusHeaders.ACTION, MilvusAction.CREATE_COLLECTION) + .withBody( + createCollectionReq) .request(Exchange.class); assertThat(result).isNotNull(); @@ -70,7 +98,7 @@ public void createCollection() throws Exception { @Test @Order(2) - public void createIndex() throws Exception { + public void createIndex() { CreateIndexParam createAgeIndexParam = CreateIndexParam.newBuilder() .withCollectionName("test") .withFieldName("userAge") @@ -87,19 +115,20 @@ public void createIndex() throws Exception { assertThat(result).isNotNull(); assertThat(result.getException()).isNull(); - MilvusHelperCreateIndex ragCreateIndex = new MilvusHelperCreateIndex(); - ragCreateIndex.setCollectionName("test"); - ragCreateIndex.setVectorFieldName("userFace"); - ragCreateIndex.setIndexType("IVF_FLAT"); - ragCreateIndex.setMetricType("L2"); - ragCreateIndex.setExtraParam("{\"nlist\":128}"); - - Exchange tempExchange = new DefaultExchange(context); - ragCreateIndex.process(tempExchange); + CreateIndexParam createVectorIndexParam = CreateIndexParam.newBuilder() + .withCollectionName("test") + .withFieldName("userFace") + .withIndexName("userFaceIndex") + .withIndexType(IndexType.IVF_FLAT) + .withMetricType(MetricType.L2) + .withExtraParam("{\"nlist\":128}") + .withSyncMode(Boolean.TRUE) + .build(); result = fluentTemplate.to("milvus:test") - .withHeader(MilvusHeaders.ACTION, tempExchange.getIn().getHeader(MilvusHeaders.ACTION)) - .withBody(tempExchange.getIn().getBody()) + .withHeader(MilvusHeaders.ACTION, MilvusAction.CREATE_INDEX) + .withBody( + createVectorIndexParam) .request(Exchange.class); assertThat(result).isNotNull(); @@ -163,20 +192,21 @@ public void upsert() { @Test @Order(5) - public void search() throws Exception { - MilvusHelperSearch ragSearch = new MilvusHelperSearch(); - ragSearch.setCollectionName("test"); - ragSearch.setOutputFields("userAge"); - ragSearch.setFilter("userAge>0"); - ragSearch.setLimit("100"); - - Exchange tempExchange = new DefaultExchange(context); - tempExchange.getIn().setBody(generateFloatVector()); - ragSearch.process(tempExchange); + public void search() { + SearchSimpleParam searchSimpleParam = SearchSimpleParam.newBuilder() + .withCollectionName("test") + .withVectors(generateFloatVector()) + .withFilter("userAge>0") + .withLimit(100L) + .withOffset(0L) + .withOutputFields(Lists.newArrayList("userAge")) + .withConsistencyLevel(ConsistencyLevelEnum.STRONG) + .build(); Exchange result = fluentTemplate.to("milvus:test") - .withHeader(MilvusHeaders.ACTION, tempExchange.getIn().getHeader(MilvusHeaders.ACTION)) - .withBody(tempExchange.getIn().getBody()) + .withHeader(MilvusHeaders.ACTION, MilvusAction.SEARCH) + .withBody( + searchSimpleParam) .request(Exchange.class); assertThat(result).isNotNull(); @@ -207,40 +237,131 @@ public void query() { @Test @Order(7) - public void delete() throws Exception { - MilvusHelperDelete ragDelete = new MilvusHelperDelete(); - ragDelete.setCollectionName("test"); - ragDelete.setFilter("userAge>0"); + public void delete() { + DeleteParam delete = DeleteParam.newBuilder() + .withCollectionName("test") + .withExpr("userAge>0") + .build(); + + Exchange result = fluentTemplate.to("milvus:test") + .withHeader(MilvusHeaders.ACTION, MilvusAction.DELETE) + .withBody( + delete) + .request(Exchange.class); + + assertThat(result).isNotNull(); + assertThat(result.getException()).isNull(); + + SearchSimpleParam searchSimpleParam = SearchSimpleParam.newBuilder() + .withCollectionName("test") + .withVectors(generateFloatVector()) + .withFilter("userAge>0") + .withLimit(100L) + .withOffset(0L) + .withOutputFields(Lists.newArrayList("userAge")) + .withConsistencyLevel(ConsistencyLevelEnum.STRONG) + .build(); + + result = fluentTemplate.to("milvus:test") + .withHeader(MilvusHeaders.ACTION, MilvusAction.SEARCH) + .withBody( + searchSimpleParam) + .request(Exchange.class); + + assertThat(result).isNotNull(); + assertThat(result.getException()).isNull(); + assertThat(result.getMessage().getBody(SearchResponse.class).getRowRecords().size() == 0); + } + + // --- Helper-based tests (same operations via MilvusHelper beans) --- + + @Test + @Order(10) + public void createCollectionWithHelper() throws Exception { + MilvusHelperCreateCollection helper = new MilvusHelperCreateCollection(); + helper.setCollectionName("test_helper"); + helper.setCollectionDescription("helper test collection"); + helper.setIdFieldName("userID"); + helper.setVectorFieldName("userFace"); + helper.setTextFieldName("userAge"); + helper.setTextFieldDataType("Int8"); + helper.setDimension("64"); Exchange tempExchange = new DefaultExchange(context); - ragDelete.process(tempExchange); + helper.process(tempExchange); - Exchange result = fluentTemplate.to("milvus:test") + Exchange result = fluentTemplate.to("milvus:test_helper") + .withHeader(MilvusHeaders.ACTION, tempExchange.getIn().getHeader(MilvusHeaders.ACTION)) + .withBody(tempExchange.getIn().getBody()) + .request(Exchange.class); + + assertThat(result).isNotNull(); + assertThat(result.getException()).isNull(); + } + + @Test + @Order(11) + public void createIndexWithHelper() throws Exception { + MilvusHelperCreateIndex helper = new MilvusHelperCreateIndex(); + helper.setCollectionName("test_helper"); + helper.setVectorFieldName("userFace"); + helper.setIndexName("userFaceIndex"); + helper.setIndexType("IVF_FLAT"); + helper.setMetricType("L2"); + helper.setExtraParam("{\"nlist\":128}"); + + Exchange tempExchange = new DefaultExchange(context); + helper.process(tempExchange); + + Exchange result = fluentTemplate.to("milvus:test_helper") .withHeader(MilvusHeaders.ACTION, tempExchange.getIn().getHeader(MilvusHeaders.ACTION)) .withBody(tempExchange.getIn().getBody()) .request(Exchange.class); assertThat(result).isNotNull(); assertThat(result.getException()).isNull(); + } - MilvusHelperSearch ragSearch = new MilvusHelperSearch(); - ragSearch.setCollectionName("test"); - ragSearch.setOutputFields("userAge"); - ragSearch.setFilter("userAge>0"); - ragSearch.setLimit("100"); + @Test + @Order(12) + public void searchWithHelper() throws Exception { + MilvusHelperSearch helper = new MilvusHelperSearch(); + helper.setCollectionName("test"); + helper.setOutputFields("userAge"); + helper.setFilter("userAge>0"); + helper.setLimit("100"); + helper.setOffset("0"); - tempExchange = new DefaultExchange(context); + Exchange tempExchange = new DefaultExchange(context); tempExchange.getIn().setBody(generateFloatVector()); - ragSearch.process(tempExchange); + helper.process(tempExchange); - result = fluentTemplate.to("milvus:test") + Exchange result = fluentTemplate.to("milvus:test") + .withHeader(MilvusHeaders.ACTION, tempExchange.getIn().getHeader(MilvusHeaders.ACTION)) + .withBody(tempExchange.getIn().getBody()) + .request(Exchange.class); + + assertThat(result).isNotNull(); + assertThat(result.getException()).isNull(); + } + + @Test + @Order(13) + public void deleteWithHelper() throws Exception { + MilvusHelperDelete helper = new MilvusHelperDelete(); + helper.setCollectionName("test_helper"); + helper.setFilter("userAge>0"); + + Exchange tempExchange = new DefaultExchange(context); + helper.process(tempExchange); + + Exchange result = fluentTemplate.to("milvus:test_helper") .withHeader(MilvusHeaders.ACTION, tempExchange.getIn().getHeader(MilvusHeaders.ACTION)) .withBody(tempExchange.getIn().getBody()) .request(Exchange.class); assertThat(result).isNotNull(); assertThat(result.getException()).isNull(); - assertThat(result.getMessage().getBody(SearchResponse.class).getRowRecords().size() == 0); } private List> generateFloatVectors(int count) { From bc21cbc3031cbfba0f2d28e38b21cb2eecede794 Mon Sep 17 00:00:00 2001 From: Salvatore Mongiardo Date: Tue, 24 Mar 2026 15:21:44 +0100 Subject: [PATCH 4/4] Camel-Milvus: collection must be loaded into memory before DML operations --- .../component/milvus/MilvusProducer.java | 2 + .../milvus/it/MilvusComponentIT.java | 101 +++++++++++++++++- 2 files changed, 100 insertions(+), 3 deletions(-) diff --git a/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/MilvusProducer.java b/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/MilvusProducer.java index d768372866292..293791ba13791 100644 --- a/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/MilvusProducer.java +++ b/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/MilvusProducer.java @@ -178,6 +178,8 @@ private void delete(Exchange exchange) throws Exception { final Message in = exchange.getMessage(); final DeleteParam body = in.getMandatoryBody(DeleteParam.class); + this.client.loadCollection( + LoadCollectionParam.newBuilder().withCollectionName(getEndpoint().getCollection()).withSyncLoad(true).build()); R result = this.client.delete(body); handleResponseStatus(result); diff --git a/components/camel-ai/camel-milvus/src/test/java/org/apache/camel/component/milvus/it/MilvusComponentIT.java b/components/camel-ai/camel-milvus/src/test/java/org/apache/camel/component/milvus/it/MilvusComponentIT.java index ba6c42e6b0eb9..191fa01e2be2e 100644 --- a/components/camel-ai/camel-milvus/src/test/java/org/apache/camel/component/milvus/it/MilvusComponentIT.java +++ b/components/camel-ai/camel-milvus/src/test/java/org/apache/camel/component/milvus/it/MilvusComponentIT.java @@ -42,7 +42,9 @@ import org.apache.camel.component.milvus.helpers.MilvusHelperCreateCollection; import org.apache.camel.component.milvus.helpers.MilvusHelperCreateIndex; import org.apache.camel.component.milvus.helpers.MilvusHelperDelete; +import org.apache.camel.component.milvus.helpers.MilvusHelperInsert; import org.apache.camel.component.milvus.helpers.MilvusHelperSearch; +import org.apache.camel.component.milvus.helpers.MilvusHelperUpsert; import org.apache.camel.support.DefaultExchange; import org.assertj.core.util.Lists; import org.junit.jupiter.api.*; @@ -324,9 +326,102 @@ public void createIndexWithHelper() throws Exception { @Test @Order(12) + public void createRagCollectionWithHelper() throws Exception { + MilvusHelperCreateCollection helper = new MilvusHelperCreateCollection(); + helper.setCollectionName("test_helper_rag"); + helper.setCollectionDescription("helper RAG test collection"); + helper.setIdFieldName("docID"); + helper.setVectorFieldName("embedding"); + helper.setTextFieldName("content"); + helper.setTextFieldDataType("VarChar"); + helper.setTextFieldMaxLength("2048"); + helper.setDimension("64"); + + Exchange tempExchange = new DefaultExchange(context); + helper.process(tempExchange); + + Exchange result = fluentTemplate.to("milvus:test_helper_rag") + .withHeader(MilvusHeaders.ACTION, tempExchange.getIn().getHeader(MilvusHeaders.ACTION)) + .withBody(tempExchange.getIn().getBody()) + .request(Exchange.class); + + assertThat(result).isNotNull(); + assertThat(result.getException()).isNull(); + } + + @Test + @Order(13) + public void createRagIndexWithHelper() throws Exception { + MilvusHelperCreateIndex helper = new MilvusHelperCreateIndex(); + helper.setCollectionName("test_helper_rag"); + helper.setVectorFieldName("embedding"); + helper.setIndexName("embeddingIndex"); + helper.setIndexType("IVF_FLAT"); + helper.setMetricType("L2"); + helper.setExtraParam("{\"nlist\":128}"); + + Exchange tempExchange = new DefaultExchange(context); + helper.process(tempExchange); + + Exchange result = fluentTemplate.to("milvus:test_helper_rag") + .withHeader(MilvusHeaders.ACTION, tempExchange.getIn().getHeader(MilvusHeaders.ACTION)) + .withBody(tempExchange.getIn().getBody()) + .request(Exchange.class); + + assertThat(result).isNotNull(); + assertThat(result.getException()).isNull(); + } + + @Test + @Order(14) + public void insertWithHelper() throws Exception { + MilvusHelperInsert helper = new MilvusHelperInsert(); + helper.setCollectionName("test_helper_rag"); + helper.setVectorFieldName("embedding"); + helper.setTextFieldMappings("content=contentVar"); + + Exchange tempExchange = new DefaultExchange(context); + tempExchange.setVariable("contentVar", "This is a test document for RAG"); + tempExchange.getIn().setBody(generateFloatVector()); + helper.process(tempExchange); + + Exchange result = fluentTemplate.to("milvus:test_helper_rag") + .withHeader(MilvusHeaders.ACTION, tempExchange.getIn().getHeader(MilvusHeaders.ACTION)) + .withBody(tempExchange.getIn().getBody()) + .request(Exchange.class); + + assertThat(result).isNotNull(); + assertThat(result.getException()).isNull(); + } + + @Test + @Order(15) + public void upsertWithHelper() throws Exception { + MilvusHelperUpsert helper = new MilvusHelperUpsert(); + helper.setCollectionName("test_helper_rag"); + helper.setVectorFieldName("embedding"); + helper.setTextFieldMappings("content=contentVar"); + + Exchange tempExchange = new DefaultExchange(context); + tempExchange.setVariable("contentVar", "Updated document content"); + tempExchange.getIn().setBody(generateFloatVector()); + helper.process(tempExchange); + + Exchange result = fluentTemplate.to("milvus:test_helper_rag") + .withHeader(MilvusHeaders.ACTION, tempExchange.getIn().getHeader(MilvusHeaders.ACTION)) + .withBody(tempExchange.getIn().getBody()) + .request(Exchange.class); + + assertThat(result).isNotNull(); + // upsert without docID field will fail (autoID collection) + Assertions.assertTrue(result.isFailed()); + } + + @Test + @Order(16) public void searchWithHelper() throws Exception { MilvusHelperSearch helper = new MilvusHelperSearch(); - helper.setCollectionName("test"); + helper.setCollectionName("test_helper"); helper.setOutputFields("userAge"); helper.setFilter("userAge>0"); helper.setLimit("100"); @@ -336,7 +431,7 @@ public void searchWithHelper() throws Exception { tempExchange.getIn().setBody(generateFloatVector()); helper.process(tempExchange); - Exchange result = fluentTemplate.to("milvus:test") + Exchange result = fluentTemplate.to("milvus:test_helper") .withHeader(MilvusHeaders.ACTION, tempExchange.getIn().getHeader(MilvusHeaders.ACTION)) .withBody(tempExchange.getIn().getBody()) .request(Exchange.class); @@ -346,7 +441,7 @@ public void searchWithHelper() throws Exception { } @Test - @Order(13) + @Order(17) public void deleteWithHelper() throws Exception { MilvusHelperDelete helper = new MilvusHelperDelete(); helper.setCollectionName("test_helper");