diff --git a/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/MilvusProducer.java b/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/MilvusProducer.java index d768372866292..293791ba13791 100644 --- a/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/MilvusProducer.java +++ b/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/MilvusProducer.java @@ -178,6 +178,8 @@ private void delete(Exchange exchange) throws Exception { final Message in = exchange.getMessage(); final DeleteParam body = in.getMandatoryBody(DeleteParam.class); + this.client.loadCollection( + LoadCollectionParam.newBuilder().withCollectionName(getEndpoint().getCollection()).withSyncLoad(true).build()); R result = this.client.delete(body); handleResponseStatus(result); diff --git a/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperCreateCollection.java b/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperCreateCollection.java new file mode 100644 index 0000000000000..19f81db370086 --- /dev/null +++ b/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperCreateCollection.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.milvus.helpers; + +import io.milvus.grpc.DataType; +import io.milvus.param.collection.CreateCollectionParam; +import io.milvus.param.collection.FieldType; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.component.milvus.MilvusAction; +import org.apache.camel.component.milvus.MilvusHeaders; + +/** + * A Camel {@link Processor} that builds a Milvus {@link io.milvus.param.collection.CreateCollectionParam} from simple + * string properties and sets it as the exchange body together with the {@link MilvusAction#CREATE_COLLECTION} header. + */ +public class MilvusHelperCreateCollection implements Processor { + + private String collectionName = "default_collection"; + private String collectionDescription = "Default collection"; + private String idFieldName = "id"; + private String dimension = "768"; + private String textFieldName = "content"; + private String textFieldDataType = "VarChar"; + private String vectorFieldName = "embedding"; + private String vectorDataType = "FloatVector"; + private String textFieldMaxLength = "2048"; + private String additionalTextFields; + + @Override + public void process(Exchange exchange) throws Exception { + int vectorDim = Integer.parseInt(dimension); + int maxLength = Integer.parseInt(textFieldMaxLength); + DataType textDataType = DataType.valueOf(textFieldDataType); + + FieldType idField = FieldType.newBuilder() + .withName(idFieldName) + .withDataType(DataType.Int64) + .withPrimaryKey(true) + .withAutoID(true) + .build(); + + FieldType.Builder textFieldBuilder = FieldType.newBuilder() + .withName(textFieldName) + .withDataType(textDataType); + if (textDataType == DataType.VarChar) { + textFieldBuilder.withMaxLength(maxLength); + } + FieldType textField = textFieldBuilder.build(); + + DataType vecDataType = DataType.valueOf(vectorDataType); + + FieldType vectorField = FieldType.newBuilder() + .withName(vectorFieldName) + .withDataType(vecDataType) + .withDimension(vectorDim) + .build(); + + CreateCollectionParam.Builder builder = CreateCollectionParam.newBuilder() + .withCollectionName(collectionName) + .withDescription(collectionDescription) + .addFieldType(idField) + .addFieldType(textField); + + if (additionalTextFields != null && !additionalTextFields.isBlank()) { + for (String fieldName : additionalTextFields.split(",")) { + String trimmed = fieldName.trim(); + if (!trimmed.isEmpty()) { + FieldType extraField = FieldType.newBuilder() + .withName(trimmed) + .withDataType(DataType.VarChar) + .withMaxLength(maxLength) + .build(); + builder.addFieldType(extraField); + } + } + } + + builder.addFieldType(vectorField); + + exchange.getIn().setBody(builder.build()); + exchange.getIn().setHeader(MilvusHeaders.ACTION, MilvusAction.CREATE_COLLECTION); + } + + public String getCollectionName() { + return collectionName; + } + + public void setCollectionName(String collectionName) { + this.collectionName = collectionName; + } + + public String getCollectionDescription() { + return collectionDescription; + } + + public void setCollectionDescription(String collectionDescription) { + this.collectionDescription = collectionDescription; + } + + public String getIdFieldName() { + return idFieldName; + } + + public void setIdFieldName(String idFieldName) { + this.idFieldName = idFieldName; + } + + public String getDimension() { + return dimension; + } + + /** + * @param dimension the vector dimension as a string (e.g., {@code 768}, {@code 1536}) + */ + public void setDimension(String dimension) { + this.dimension = dimension; + } + + public String getTextFieldName() { + return textFieldName; + } + + public void setTextFieldName(String textFieldName) { + this.textFieldName = textFieldName; + } + + public String getTextFieldDataType() { + return textFieldDataType; + } + + /** + * @param textFieldDataType the Milvus {@link io.milvus.grpc.DataType} enum name (e.g., {@code VarChar}, + * {@code Int8}) + */ + public void setTextFieldDataType(String textFieldDataType) { + this.textFieldDataType = textFieldDataType; + } + + public String getVectorDataType() { + return vectorDataType; + } + + /** + * @param vectorDataType the Milvus {@link io.milvus.grpc.DataType} enum name for the vector field (e.g., + * {@code FloatVector}, {@code BinaryVector}, {@code Float16Vector}) + */ + public void setVectorDataType(String vectorDataType) { + this.vectorDataType = vectorDataType; + } + + public String getVectorFieldName() { + return vectorFieldName; + } + + public void setVectorFieldName(String vectorFieldName) { + this.vectorFieldName = vectorFieldName; + } + + public String getTextFieldMaxLength() { + return textFieldMaxLength; + } + + /** + * @param textFieldMaxLength the maximum length for VarChar fields as a string (e.g., {@code 2048}) + */ + public void setTextFieldMaxLength(String textFieldMaxLength) { + this.textFieldMaxLength = textFieldMaxLength; + } + + public String getAdditionalTextFields() { + return additionalTextFields; + } + + /** + * @param additionalTextFields comma-separated list of extra VarChar field names (e.g., {@code title,author}) + */ + public void setAdditionalTextFields(String additionalTextFields) { + this.additionalTextFields = additionalTextFields; + } +} diff --git a/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperCreateIndex.java b/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperCreateIndex.java new file mode 100644 index 0000000000000..492aded93a520 --- /dev/null +++ b/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperCreateIndex.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.milvus.helpers; + +import io.milvus.param.IndexType; +import io.milvus.param.MetricType; +import io.milvus.param.index.CreateIndexParam; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.component.milvus.MilvusAction; +import org.apache.camel.component.milvus.MilvusHeaders; + +/** + * A Camel {@link Processor} that builds a Milvus {@link io.milvus.param.index.CreateIndexParam} from simple string + * properties and sets it as the exchange body together with the {@link MilvusAction#CREATE_INDEX} header. + */ +public class MilvusHelperCreateIndex implements Processor { + + private String collectionName = "default_collection"; + private String vectorFieldName = "embedding"; + private String indexName; + private String indexType = "IVF_FLAT"; + private String metricType = "COSINE"; + private String extraParam = "{\"nlist\": 128}"; + + @Override + public void process(Exchange exchange) throws Exception { + IndexType idxType = IndexType.valueOf(indexType); + MetricType metric = MetricType.valueOf(metricType); + + CreateIndexParam.Builder builder = CreateIndexParam.newBuilder() + .withCollectionName(collectionName) + .withFieldName(vectorFieldName) + .withIndexType(idxType) + .withMetricType(metric) + .withExtraParam(extraParam) + .withSyncMode(Boolean.TRUE); + + if (indexName != null && !indexName.isEmpty()) { + builder.withIndexName(indexName); + } + + CreateIndexParam param = builder.build(); + + exchange.getIn().setBody(param); + exchange.getIn().setHeader(MilvusHeaders.ACTION, MilvusAction.CREATE_INDEX); + } + + public String getCollectionName() { + return collectionName; + } + + public void setCollectionName(String collectionName) { + this.collectionName = collectionName; + } + + public String getVectorFieldName() { + return vectorFieldName; + } + + public void setVectorFieldName(String vectorFieldName) { + this.vectorFieldName = vectorFieldName; + } + + public String getIndexName() { + return indexName; + } + + /** + * @param indexName the name to assign to the index (e.g., {@code myVectorIndex}) + */ + public void setIndexName(String indexName) { + this.indexName = indexName; + } + + public String getIndexType() { + return indexType; + } + + /** + * @param indexType the Milvus {@link io.milvus.param.IndexType} enum name (e.g., {@code IVF_FLAT}, {@code HNSW}) + */ + public void setIndexType(String indexType) { + this.indexType = indexType; + } + + public String getMetricType() { + return metricType; + } + + /** + * @param metricType the Milvus {@link io.milvus.param.MetricType} enum name (e.g., {@code COSINE}, {@code L2}, + * {@code IP}) + */ + public void setMetricType(String metricType) { + this.metricType = metricType; + } + + public String getExtraParam() { + return extraParam; + } + + /** + * @param extraParam JSON string with index-specific parameters (e.g., {@code {"nlist": 128}}) + */ + public void setExtraParam(String extraParam) { + this.extraParam = extraParam; + } +} diff --git a/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperDelete.java b/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperDelete.java new file mode 100644 index 0000000000000..6a37abaca3bf8 --- /dev/null +++ b/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperDelete.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.milvus.helpers; + +import io.milvus.param.dml.DeleteParam; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.component.milvus.MilvusAction; +import org.apache.camel.component.milvus.MilvusHeaders; + +/** + * A Camel {@link Processor} that builds a Milvus {@link io.milvus.param.dml.DeleteParam} from simple string properties + * and sets it as the exchange body together with the {@link MilvusAction#DELETE} header. + */ +public class MilvusHelperDelete implements Processor { + + private String collectionName = "default_collection"; + private String filter; + + @Override + public void process(Exchange exchange) throws Exception { + if (filter == null || filter.isEmpty()) { + throw new IllegalArgumentException( + "A filter expression is required for delete operations (e.g., \"id in [1, 2, 3]\")"); + } + + DeleteParam param = DeleteParam.newBuilder() + .withCollectionName(collectionName) + .withExpr(filter) + .build(); + + exchange.getIn().setBody(param); + exchange.getIn().setHeader(MilvusHeaders.ACTION, MilvusAction.DELETE); + } + + public String getCollectionName() { + return collectionName; + } + + public void setCollectionName(String collectionName) { + this.collectionName = collectionName; + } + + public String getFilter() { + return filter; + } + + /** + * @param filter a Milvus boolean expression to select entities to delete (e.g., {@code id in [1, 2, 3]}, + * {@code age > 18}) + */ + public void setFilter(String filter) { + this.filter = filter; + } +} diff --git a/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperFieldMappingUtil.java b/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperFieldMappingUtil.java new file mode 100644 index 0000000000000..0cadfc9558447 --- /dev/null +++ b/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperFieldMappingUtil.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.milvus.helpers; + +import java.util.LinkedHashMap; +import java.util.Map; + +/** + * Shared utility for parsing {@code textFieldMappings} strings used by {@link MilvusHelperInsert} and + * {@link MilvusHelperUpsert}. + */ +final class MilvusHelperFieldMappingUtil { + + private MilvusHelperFieldMappingUtil() { + } + + /** + * Parses a comma-separated mapping string into an ordered map of field name to variable name. + * + * @param textFieldMappings the mappings string (e.g., {@code field1=var1,field2=var2}) + * @return an ordered map from field name to variable name + * @throws IllegalArgumentException if a mapping entry does not contain exactly one {@code =} separator + */ + static Map parseMappings(String textFieldMappings) { + Map result = new LinkedHashMap<>(); + for (String mapping : textFieldMappings.split(",")) { + String[] pair = mapping.trim().split("="); + if (pair.length != 2) { + throw new IllegalArgumentException( + "Invalid textFieldMappings entry: '" + mapping.trim() + + "'. Expected format: fieldName=variableName"); + } + result.put(pair[0].trim(), pair[1].trim()); + } + return result; + } +} diff --git a/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperInsert.java b/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperInsert.java new file mode 100644 index 0000000000000..3578425dc214b --- /dev/null +++ b/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperInsert.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.milvus.helpers; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import io.milvus.param.dml.InsertParam; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.component.milvus.MilvusAction; +import org.apache.camel.component.milvus.MilvusHeaders; + +/** + * A Camel {@link Processor} that builds a Milvus {@link io.milvus.param.dml.InsertParam} from the exchange body vector + * and exchange variables mapped via {@link #setTextFieldMappings(String)}, then sets it as the exchange body together + * with the {@link MilvusAction#INSERT} header. + */ +public class MilvusHelperInsert implements Processor { + + private String collectionName = "default_collection"; + private String vectorFieldName = "embedding"; + private String textFieldMappings = "content=text"; + + @SuppressWarnings("unchecked") + @Override + public void process(Exchange exchange) throws Exception { + List vector = exchange.getIn().getBody(List.class); + if (vector == null) { + throw new IllegalArgumentException("Exchange body must contain a List vector, but was null"); + } + + Map mappings = MilvusHelperFieldMappingUtil.parseMappings(textFieldMappings); + + List fields = new ArrayList<>(); + for (Map.Entry entry : mappings.entrySet()) { + String value = exchange.getVariable(entry.getValue(), String.class); + if (value == null) { + throw new IllegalArgumentException( + "Exchange variable '" + entry.getValue() + "' is not set (mapped from field '" + entry.getKey() + "')"); + } + fields.add(new InsertParam.Field(entry.getKey(), Collections.singletonList(value))); + } + + fields.add(new InsertParam.Field(vectorFieldName, Collections.singletonList(vector))); + + InsertParam param = InsertParam.newBuilder() + .withCollectionName(collectionName) + .withFields(fields) + .build(); + + exchange.getIn().setBody(param); + exchange.getIn().setHeader(MilvusHeaders.ACTION, MilvusAction.INSERT); + } + + public String getCollectionName() { + return collectionName; + } + + public void setCollectionName(String collectionName) { + this.collectionName = collectionName; + } + + public String getVectorFieldName() { + return vectorFieldName; + } + + public void setVectorFieldName(String vectorFieldName) { + this.vectorFieldName = vectorFieldName; + } + + public String getTextFieldMappings() { + return textFieldMappings; + } + + /** + * @param textFieldMappings comma-separated mappings of Milvus field names to exchange variable names (e.g., + * {@code field1=var1,field2=var2}) + */ + public void setTextFieldMappings(String textFieldMappings) { + this.textFieldMappings = textFieldMappings; + } +} diff --git a/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperResultExtractor.java b/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperResultExtractor.java new file mode 100644 index 0000000000000..f2831a9f3baea --- /dev/null +++ b/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperResultExtractor.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.milvus.helpers; + +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import io.milvus.param.highlevel.dml.response.SearchResponse; +import io.milvus.response.QueryResultsWrapper; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; + +/** + * A Camel {@link Processor} that extracts field values from a Milvus + * {@link io.milvus.param.highlevel.dml.response.SearchResponse} into a list of ranked maps. Each map contains a + * {@code rank} entry and the requested output field values. The extracted list is set as the exchange body. + */ +public class MilvusHelperResultExtractor implements Processor { + + private String outputFields = "content"; + + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setBody(extract(exchange)); + } + + public List> extract(Exchange exchange) { + SearchResponse response = exchange.getIn().getBody(SearchResponse.class); + List> extracted = new ArrayList<>(); + + String[] fields = outputFields.split(","); + + if (response != null) { + List records = response.getRowRecords(0); + int rank = 1; + for (QueryResultsWrapper.RowRecord record : records) { + Map item = new LinkedHashMap<>(); + item.put("rank", rank++); + Object score = record.get("score"); + if (score != null) { + item.put("score", score); + } + for (String field : fields) { + String trimmed = field.trim(); + if (!trimmed.isEmpty()) { + Object value = record.get(trimmed); + if (value != null) { + item.put(trimmed, value); + } + } + } + extracted.add(item); + } + } + return extracted; + } + + public String getOutputFields() { + return outputFields; + } + + /** + * @param outputFields comma-separated list of field names to extract from search results (e.g., + * {@code content,title}) + */ + public void setOutputFields(String outputFields) { + this.outputFields = outputFields; + } +} diff --git a/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperSearch.java b/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperSearch.java new file mode 100644 index 0000000000000..20d66307d3179 --- /dev/null +++ b/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperSearch.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.milvus.helpers; + +import java.util.ArrayList; +import java.util.List; + +import io.milvus.common.clientenum.ConsistencyLevelEnum; +import io.milvus.param.highlevel.dml.SearchSimpleParam; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.component.milvus.MilvusAction; +import org.apache.camel.component.milvus.MilvusHeaders; + +/** + * A Camel {@link Processor} that builds a Milvus {@link io.milvus.param.highlevel.dml.SearchSimpleParam} from the + * exchange body vector and simple string properties, then sets it as the exchange body together with the + * {@link MilvusAction#SEARCH} header. + */ +public class MilvusHelperSearch implements Processor { + + private String collectionName = "default_collection"; + private String outputFields = "content"; + private String limit = "10"; + private String offset = "0"; + private String consistencyLevel = "STRONG"; + private String filter; + + @SuppressWarnings("unchecked") + @Override + public void process(Exchange exchange) throws Exception { + List queryEmbedding = exchange.getIn().getBody(List.class); + if (queryEmbedding == null) { + throw new IllegalArgumentException("Exchange body must contain a List vector, but was null"); + } + long searchLimit = Long.parseLong(limit); + + List fields = new ArrayList<>(); + for (String field : outputFields.split(",")) { + String trimmed = field.trim(); + if (!trimmed.isEmpty()) { + fields.add(trimmed); + } + } + + long searchOffset = Long.parseLong(offset); + ConsistencyLevelEnum consistency = ConsistencyLevelEnum.valueOf(consistencyLevel); + + SearchSimpleParam.Builder builder = SearchSimpleParam.newBuilder() + .withCollectionName(collectionName) + .withVectors(queryEmbedding) + .withLimit(searchLimit) + .withOffset(searchOffset) + .withOutputFields(fields) + .withConsistencyLevel(consistency); + + if (filter != null && !filter.isEmpty()) { + builder.withFilter(filter); + } + + SearchSimpleParam param = builder.build(); + + exchange.getIn().setBody(param); + exchange.getIn().setHeader(MilvusHeaders.ACTION, MilvusAction.SEARCH); + } + + public String getCollectionName() { + return collectionName; + } + + public void setCollectionName(String collectionName) { + this.collectionName = collectionName; + } + + public String getOutputFields() { + return outputFields; + } + + /** + * @param outputFields comma-separated list of field names to include in search results (e.g., + * {@code content,title}) + */ + public void setOutputFields(String outputFields) { + this.outputFields = outputFields; + } + + public String getLimit() { + return limit; + } + + /** + * @param limit the maximum number of results to return as a string (e.g., {@code 10}, {@code 100}) + */ + public void setLimit(String limit) { + this.limit = limit; + } + + public String getOffset() { + return offset; + } + + /** + * @param offset the number of results to skip as a string (e.g., {@code 0}, {@code 10}) + */ + public void setOffset(String offset) { + this.offset = offset; + } + + public String getConsistencyLevel() { + return consistencyLevel; + } + + /** + * @param consistencyLevel the Milvus {@link io.milvus.common.clientenum.ConsistencyLevelEnum} enum name (e.g., + * {@code STRONG}, {@code BOUNDED}, {@code EVENTUALLY}) + */ + public void setConsistencyLevel(String consistencyLevel) { + this.consistencyLevel = consistencyLevel; + } + + public String getFilter() { + return filter; + } + + /** + * @param filter a Milvus boolean expression to filter results (e.g., {@code age > 18}, {@code status == "active"}) + */ + public void setFilter(String filter) { + this.filter = filter; + } +} diff --git a/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperUpsert.java b/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperUpsert.java new file mode 100644 index 0000000000000..df6c1d7c24c04 --- /dev/null +++ b/components/camel-ai/camel-milvus/src/main/java/org/apache/camel/component/milvus/helpers/MilvusHelperUpsert.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.milvus.helpers; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import io.milvus.param.dml.UpsertParam; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.component.milvus.MilvusAction; +import org.apache.camel.component.milvus.MilvusHeaders; + +/** + * A Camel {@link Processor} that builds a Milvus {@link io.milvus.param.dml.UpsertParam} from the exchange body vector + * and exchange variables mapped via {@link #setTextFieldMappings(String)}, then sets it as the exchange body together + * with the {@link MilvusAction#UPSERT} header. + */ +public class MilvusHelperUpsert implements Processor { + + private String collectionName = "default_collection"; + private String vectorFieldName = "embedding"; + private String textFieldMappings = "content=text"; + + @SuppressWarnings("unchecked") + @Override + public void process(Exchange exchange) throws Exception { + List vector = exchange.getIn().getBody(List.class); + if (vector == null) { + throw new IllegalArgumentException("Exchange body must contain a List vector, but was null"); + } + + Map mappings = MilvusHelperFieldMappingUtil.parseMappings(textFieldMappings); + + List fields = new ArrayList<>(); + for (Map.Entry entry : mappings.entrySet()) { + String value = exchange.getVariable(entry.getValue(), String.class); + if (value == null) { + throw new IllegalArgumentException( + "Exchange variable '" + entry.getValue() + "' is not set (mapped from field '" + entry.getKey() + "')"); + } + fields.add(new UpsertParam.Field(entry.getKey(), Collections.singletonList(value))); + } + + fields.add(new UpsertParam.Field(vectorFieldName, Collections.singletonList(vector))); + + UpsertParam param = UpsertParam.newBuilder() + .withCollectionName(collectionName) + .withFields(fields) + .build(); + + exchange.getIn().setBody(param); + exchange.getIn().setHeader(MilvusHeaders.ACTION, MilvusAction.UPSERT); + } + + public String getCollectionName() { + return collectionName; + } + + public void setCollectionName(String collectionName) { + this.collectionName = collectionName; + } + + public String getVectorFieldName() { + return vectorFieldName; + } + + public void setVectorFieldName(String vectorFieldName) { + this.vectorFieldName = vectorFieldName; + } + + public String getTextFieldMappings() { + return textFieldMappings; + } + + /** + * @param textFieldMappings comma-separated mappings of Milvus field names to exchange variable names (e.g., + * {@code field1=var1,field2=var2}) + */ + public void setTextFieldMappings(String textFieldMappings) { + this.textFieldMappings = textFieldMappings; + } +} diff --git a/components/camel-ai/camel-milvus/src/test/java/org/apache/camel/component/milvus/MilvusCreateCollectionTest.java b/components/camel-ai/camel-milvus/src/test/java/org/apache/camel/component/milvus/MilvusCreateCollectionTest.java index a75119addc330..a5499a07f6e31 100644 --- a/components/camel-ai/camel-milvus/src/test/java/org/apache/camel/component/milvus/MilvusCreateCollectionTest.java +++ b/components/camel-ai/camel-milvus/src/test/java/org/apache/camel/component/milvus/MilvusCreateCollectionTest.java @@ -23,6 +23,8 @@ import io.milvus.param.collection.FieldType; import org.apache.camel.Exchange; import org.apache.camel.NoSuchHeaderException; +import org.apache.camel.component.milvus.helpers.MilvusHelperCreateCollection; +import org.apache.camel.support.DefaultExchange; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; @@ -74,4 +76,28 @@ public void createCollectionWithoutRequiredParameters() { assertThat(result).isNotNull(); assertThat(result.getException()).isInstanceOf(NoSuchHeaderException.class); } + + @DisplayName("Tests that trying to create a collection via helper without passing the action name triggers a failure") + @Test + public void createCollectionWithHelperWithoutRequiredParameters() throws Exception { + MilvusHelperCreateCollection helper = new MilvusHelperCreateCollection(); + helper.setCollectionName("test"); + helper.setCollectionDescription("customer info"); + helper.setIdFieldName("userID"); + helper.setVectorFieldName("userFace"); + helper.setTextFieldName("userAge"); + helper.setTextFieldDataType("Int8"); + helper.setDimension("64"); + + Exchange tempExchange = new DefaultExchange(context); + helper.process(tempExchange); + + // Send body without the action header to trigger failure + Exchange result = fluentTemplate.to("milvus:createCollection") + .withBody(tempExchange.getIn().getBody()) + .request(Exchange.class); + + assertThat(result).isNotNull(); + assertThat(result.getException()).isInstanceOf(NoSuchHeaderException.class); + } } diff --git a/components/camel-ai/camel-milvus/src/test/java/org/apache/camel/component/milvus/it/MilvusComponentIT.java b/components/camel-ai/camel-milvus/src/test/java/org/apache/camel/component/milvus/it/MilvusComponentIT.java index cab3064b08520..191fa01e2be2e 100644 --- a/components/camel-ai/camel-milvus/src/test/java/org/apache/camel/component/milvus/it/MilvusComponentIT.java +++ b/components/camel-ai/camel-milvus/src/test/java/org/apache/camel/component/milvus/it/MilvusComponentIT.java @@ -39,6 +39,13 @@ import org.apache.camel.component.milvus.MilvusAction; import org.apache.camel.component.milvus.MilvusHeaders; import org.apache.camel.component.milvus.MilvusTestSupport; +import org.apache.camel.component.milvus.helpers.MilvusHelperCreateCollection; +import org.apache.camel.component.milvus.helpers.MilvusHelperCreateIndex; +import org.apache.camel.component.milvus.helpers.MilvusHelperDelete; +import org.apache.camel.component.milvus.helpers.MilvusHelperInsert; +import org.apache.camel.component.milvus.helpers.MilvusHelperSearch; +import org.apache.camel.component.milvus.helpers.MilvusHelperUpsert; +import org.apache.camel.support.DefaultExchange; import org.assertj.core.util.Lists; import org.junit.jupiter.api.*; @@ -268,6 +275,190 @@ public void delete() { assertThat(result.getMessage().getBody(SearchResponse.class).getRowRecords().size() == 0); } + // --- Helper-based tests (same operations via MilvusHelper beans) --- + + @Test + @Order(10) + public void createCollectionWithHelper() throws Exception { + MilvusHelperCreateCollection helper = new MilvusHelperCreateCollection(); + helper.setCollectionName("test_helper"); + helper.setCollectionDescription("helper test collection"); + helper.setIdFieldName("userID"); + helper.setVectorFieldName("userFace"); + helper.setTextFieldName("userAge"); + helper.setTextFieldDataType("Int8"); + helper.setDimension("64"); + + Exchange tempExchange = new DefaultExchange(context); + helper.process(tempExchange); + + Exchange result = fluentTemplate.to("milvus:test_helper") + .withHeader(MilvusHeaders.ACTION, tempExchange.getIn().getHeader(MilvusHeaders.ACTION)) + .withBody(tempExchange.getIn().getBody()) + .request(Exchange.class); + + assertThat(result).isNotNull(); + assertThat(result.getException()).isNull(); + } + + @Test + @Order(11) + public void createIndexWithHelper() throws Exception { + MilvusHelperCreateIndex helper = new MilvusHelperCreateIndex(); + helper.setCollectionName("test_helper"); + helper.setVectorFieldName("userFace"); + helper.setIndexName("userFaceIndex"); + helper.setIndexType("IVF_FLAT"); + helper.setMetricType("L2"); + helper.setExtraParam("{\"nlist\":128}"); + + Exchange tempExchange = new DefaultExchange(context); + helper.process(tempExchange); + + Exchange result = fluentTemplate.to("milvus:test_helper") + .withHeader(MilvusHeaders.ACTION, tempExchange.getIn().getHeader(MilvusHeaders.ACTION)) + .withBody(tempExchange.getIn().getBody()) + .request(Exchange.class); + + assertThat(result).isNotNull(); + assertThat(result.getException()).isNull(); + } + + @Test + @Order(12) + public void createRagCollectionWithHelper() throws Exception { + MilvusHelperCreateCollection helper = new MilvusHelperCreateCollection(); + helper.setCollectionName("test_helper_rag"); + helper.setCollectionDescription("helper RAG test collection"); + helper.setIdFieldName("docID"); + helper.setVectorFieldName("embedding"); + helper.setTextFieldName("content"); + helper.setTextFieldDataType("VarChar"); + helper.setTextFieldMaxLength("2048"); + helper.setDimension("64"); + + Exchange tempExchange = new DefaultExchange(context); + helper.process(tempExchange); + + Exchange result = fluentTemplate.to("milvus:test_helper_rag") + .withHeader(MilvusHeaders.ACTION, tempExchange.getIn().getHeader(MilvusHeaders.ACTION)) + .withBody(tempExchange.getIn().getBody()) + .request(Exchange.class); + + assertThat(result).isNotNull(); + assertThat(result.getException()).isNull(); + } + + @Test + @Order(13) + public void createRagIndexWithHelper() throws Exception { + MilvusHelperCreateIndex helper = new MilvusHelperCreateIndex(); + helper.setCollectionName("test_helper_rag"); + helper.setVectorFieldName("embedding"); + helper.setIndexName("embeddingIndex"); + helper.setIndexType("IVF_FLAT"); + helper.setMetricType("L2"); + helper.setExtraParam("{\"nlist\":128}"); + + Exchange tempExchange = new DefaultExchange(context); + helper.process(tempExchange); + + Exchange result = fluentTemplate.to("milvus:test_helper_rag") + .withHeader(MilvusHeaders.ACTION, tempExchange.getIn().getHeader(MilvusHeaders.ACTION)) + .withBody(tempExchange.getIn().getBody()) + .request(Exchange.class); + + assertThat(result).isNotNull(); + assertThat(result.getException()).isNull(); + } + + @Test + @Order(14) + public void insertWithHelper() throws Exception { + MilvusHelperInsert helper = new MilvusHelperInsert(); + helper.setCollectionName("test_helper_rag"); + helper.setVectorFieldName("embedding"); + helper.setTextFieldMappings("content=contentVar"); + + Exchange tempExchange = new DefaultExchange(context); + tempExchange.setVariable("contentVar", "This is a test document for RAG"); + tempExchange.getIn().setBody(generateFloatVector()); + helper.process(tempExchange); + + Exchange result = fluentTemplate.to("milvus:test_helper_rag") + .withHeader(MilvusHeaders.ACTION, tempExchange.getIn().getHeader(MilvusHeaders.ACTION)) + .withBody(tempExchange.getIn().getBody()) + .request(Exchange.class); + + assertThat(result).isNotNull(); + assertThat(result.getException()).isNull(); + } + + @Test + @Order(15) + public void upsertWithHelper() throws Exception { + MilvusHelperUpsert helper = new MilvusHelperUpsert(); + helper.setCollectionName("test_helper_rag"); + helper.setVectorFieldName("embedding"); + helper.setTextFieldMappings("content=contentVar"); + + Exchange tempExchange = new DefaultExchange(context); + tempExchange.setVariable("contentVar", "Updated document content"); + tempExchange.getIn().setBody(generateFloatVector()); + helper.process(tempExchange); + + Exchange result = fluentTemplate.to("milvus:test_helper_rag") + .withHeader(MilvusHeaders.ACTION, tempExchange.getIn().getHeader(MilvusHeaders.ACTION)) + .withBody(tempExchange.getIn().getBody()) + .request(Exchange.class); + + assertThat(result).isNotNull(); + // upsert without docID field will fail (autoID collection) + Assertions.assertTrue(result.isFailed()); + } + + @Test + @Order(16) + public void searchWithHelper() throws Exception { + MilvusHelperSearch helper = new MilvusHelperSearch(); + helper.setCollectionName("test_helper"); + helper.setOutputFields("userAge"); + helper.setFilter("userAge>0"); + helper.setLimit("100"); + helper.setOffset("0"); + + Exchange tempExchange = new DefaultExchange(context); + tempExchange.getIn().setBody(generateFloatVector()); + helper.process(tempExchange); + + Exchange result = fluentTemplate.to("milvus:test_helper") + .withHeader(MilvusHeaders.ACTION, tempExchange.getIn().getHeader(MilvusHeaders.ACTION)) + .withBody(tempExchange.getIn().getBody()) + .request(Exchange.class); + + assertThat(result).isNotNull(); + assertThat(result.getException()).isNull(); + } + + @Test + @Order(17) + public void deleteWithHelper() throws Exception { + MilvusHelperDelete helper = new MilvusHelperDelete(); + helper.setCollectionName("test_helper"); + helper.setFilter("userAge>0"); + + Exchange tempExchange = new DefaultExchange(context); + helper.process(tempExchange); + + Exchange result = fluentTemplate.to("milvus:test_helper") + .withHeader(MilvusHeaders.ACTION, tempExchange.getIn().getHeader(MilvusHeaders.ACTION)) + .withBody(tempExchange.getIn().getBody()) + .request(Exchange.class); + + assertThat(result).isNotNull(); + assertThat(result.getException()).isNull(); + } + private List> generateFloatVectors(int count) { Random ran = new Random(); List> vectors = new ArrayList<>();