This commit is contained in:
Mason Daugherty
2025-11-23 01:28:15 -05:00
parent 75a4e2557c
commit 640b33e43e
3 changed files with 71 additions and 60 deletions
@@ -21,26 +21,35 @@ class MilvusCollectionHybridSearchRetriever(BaseRetriever):
collection: Collection
"""Milvus Collection object."""
rerank: BaseRanker
"""Milvus ranker object. Such as WeightedRanker or RRFRanker."""
anns_fields: List[str]
"""The names of vector fields that are used for ANNS search."""
field_embeddings: List[Union[Embeddings, BaseSparseEmbedding]]
"""The embedding functions of each vector fields,
which can be either Embeddings or BaseSparseEmbedding."""
field_search_params: Optional[List[Dict]] = None
"""The search parameters of each vector fields.
If not specified, the default search parameters will be used."""
field_limits: Optional[List[int]] = None
"""Limit number of results for each ANNS field.
If not specified, the default top_k will be used."""
field_exprs: Optional[List[Optional[str]]] = None
"""The boolean expression for filtering the search results."""
top_k: int = 4
"""Final top-K number of documents to retrieve."""
text_field: str = "text"
"""The text field name,
which will be used as the `page_content` of a `Document` object."""
output_fields: Optional[List[str]] = None
"""Final output fields of the documents.
If not specified, all fields except the vector fields will be used as output fields,
@@ -82,9 +91,9 @@ class MilvusCollectionHybridSearchRetriever(BaseRetriever):
self.collection.load()
def _validate_fields_num(self) -> None:
assert (
len(self.anns_fields) >= 2
), "At least two fields are required for hybrid search."
assert len(self.anns_fields) >= 2, (
"At least two fields are required for hybrid search."
)
lengths = [len(self.anns_fields)]
if self.field_limits is not None:
lengths.append(len(self.field_limits))
@@ -102,17 +111,17 @@ class MilvusCollectionHybridSearchRetriever(BaseRetriever):
def _validate_fields_name(self) -> None:
collection_fields = [x.name for x in self.collection.schema.fields]
for field in self.anns_fields:
assert (
field in collection_fields
), f"{field} is not a valid field in the collection."
assert (
self.text_field in collection_fields
), f"{self.text_field} is not a valid field in the collection."
assert field in collection_fields, (
f"{field} is not a valid field in the collection."
)
assert self.text_field in collection_fields, (
f"{self.text_field} is not a valid field in the collection."
)
for field in self.output_fields: # type: ignore[union-attr]
if not self.collection.schema.enable_dynamic_field:
assert (
field in collection_fields
), f"{field} is not a valid field in the collection."
assert field in collection_fields, (
f"{field} is not a valid field in the collection."
)
def _get_output_fields(self) -> List[str]:
if self.output_fields:
@@ -9,7 +9,7 @@ from langchain_core.retrievers import BaseRetriever
class ZillizCloudPipelineRetriever(BaseRetriever):
"""`Zilliz Cloud Pipeline` retriever.
Parameters:
Attributes:
pipeline_ids: A dictionary of pipeline ids.
Valid keys: "ingestion", "search", "deletion".
token: Zilliz Cloud's token. Defaults to "".
@@ -18,7 +18,9 @@ class ZillizCloudPipelineRetriever(BaseRetriever):
"""
pipeline_ids: Dict
token: str = ""
cloud_region: str = "gcp-us-west1"
def _get_relevant_documents(
@@ -326,9 +326,9 @@ class Milvus(VectorStore):
"Either `embedding_function` or `builtin_function` should be provided."
)
self.embedding_func: Optional[
Union[EmbeddingType, List[EmbeddingType]]
] = self._from_list(embedding_function)
self.embedding_func: Optional[Union[EmbeddingType, List[EmbeddingType]]] = (
self._from_list(embedding_function)
)
self.builtin_func: Optional[
Union[BaseMilvusBuiltInFunction, List[BaseMilvusBuiltInFunction]]
] = self._from_list(builtin_function)
@@ -1233,8 +1233,8 @@ class Milvus(VectorStore):
metadatas: Optional metadata for each text
ids: Optional IDs for each text
force_ids: If force_ids, when auto_id is True and ids is not None,
it will return a list containing the customized ids, otherwise,
it will not contain the customized ids.
it will return a list containing the customized ids, otherwise,
it will not contain the customized ids.
Returns:
List of dictionaries ready for insertion
@@ -1242,14 +1242,14 @@ class Milvus(VectorStore):
insert_list: list[dict] = []
for vector_field_embeddings in embeddings:
assert len(texts) == len(
vector_field_embeddings
), "Mismatched lengths of texts and embeddings."
assert len(texts) == len(vector_field_embeddings), (
"Mismatched lengths of texts and embeddings."
)
if metadatas is not None:
assert len(texts) == len(
metadatas
), "Mismatched lengths of texts and metadatas."
assert len(texts) == len(metadatas), (
"Mismatched lengths of texts and metadatas."
)
for i, text in zip(range(len(texts)), texts):
entity_dict = {}
@@ -1302,12 +1302,14 @@ class Milvus(VectorStore):
that they all fit in memory.
metadatas (Optional[List[dict]]): Metadata dicts attached to each of
the texts. Defaults to None.
should be less than 65535 bytes. Required and work when auto_id is False.
timeout (Optional[float]): Timeout for each batch insert. Defaults
to None.
batch_size (int, optional): Batch size to use for insertion.
Defaults to 1000.
ids (Optional[List[str]]): List of text ids. The length of each item
should be less than 65535 bytes.
Required and work when `auto_id` is `False`.
Raises:
MilvusException: Failure to add texts
@@ -1324,13 +1326,13 @@ class Milvus(VectorStore):
)
self.auto_id = True
elif not self.auto_id and ids: # Check ids
assert len(set(ids)) == len(
texts
), "Different lengths of texts and unique ids are provided."
assert len(set(ids)) == len(texts), (
"Different lengths of texts and unique ids are provided."
)
assert all(isinstance(x, str) for x in ids), "All ids should be strings."
assert all(
len(x.encode()) <= 65_535 for x in ids
), "Each id should be a string less than 65535 bytes."
assert all(len(x.encode()) <= 65_535 for x in ids), (
"Each id should be a string less than 65535 bytes."
)
elif self.auto_id and ids:
logger.warning(
@@ -1417,12 +1419,12 @@ class Milvus(VectorStore):
or list of vectors for each text (in case of multi-vector)
metadatas (Optional[List[dict]]): Metadata dicts attached to each of
the texts. Defaults to None.
should be less than 65535 bytes. Required and work when auto_id is False.
timeout (Optional[float]): Timeout for each batch insert. Defaults
to None.
batch_size (int, optional): Batch size to use for insertion.
Defaults to 1000.
ids (Optional[List[str]]): List of text ids. The length of each item
should be less than 65535 bytes. Required and work when auto_id is False.
Raises:
MilvusException: Failure to add texts and embeddings
@@ -1523,7 +1525,7 @@ class Milvus(VectorStore):
else:
log_entity[k] = v
logger.error(
"Failed to %s batch starting at entity: %s/%s. " "First entity data: %s",
"Failed to %s batch starting at entity: %s/%s. First entity data: %s",
operation_name,
batch_index + 1,
total_count,
@@ -1770,7 +1772,7 @@ class Milvus(VectorStore):
query (str): The text being searched.
k (int, optional): The amount of results to return. Defaults to 4.
param (dict | list[dict], optional): The search params for the specified
index. Defaults to None.
index. Defaults to None.
expr (str, optional): Filtering expression. Defaults to None.
timeout (float, optional): How long to wait before timeout error.
Defaults to None.
@@ -2068,8 +2070,7 @@ class Milvus(VectorStore):
return _map_ip_to_similarity
else:
raise ValueError(
"No supported normalization function"
f" for metric type: {metric_type}."
f"No supported normalization function for metric type: {metric_type}."
)
def delete(
@@ -2091,13 +2092,13 @@ class Milvus(VectorStore):
if isinstance(ids, list) and len(ids) > 0:
if expr is not None:
logger.warning(
"Both ids and expr are provided. " "Ignore expr and delete by ids."
"Both ids and expr are provided. Ignore expr and delete by ids."
)
expr = f"{self._primary_field} in {ids}"
else:
assert isinstance(
expr, str
), "Either ids list or expr string must be provided."
assert isinstance(expr, str), (
"Either ids list or expr string must be provided."
)
try:
self.client.delete(self.collection_name, filter=expr, **kwargs)
return True
@@ -2128,8 +2129,7 @@ class Milvus(VectorStore):
conn.schema_cache.pop(self.collection_name, None)
except Exception as e:
logger.warning(
f"Failed to clear sync schema cache for "
f"{self.collection_name}: {e}"
f"Failed to clear sync schema cache for {self.collection_name}: {e}"
)
# Clear schema cache from async client
@@ -2201,7 +2201,7 @@ class Milvus(VectorStore):
if isinstance(ids, list) and len(ids) > 0:
if auto_id:
logger.warning(
"Both ids and auto_id are provided. " "Ignore auto_id and use ids."
"Both ids and auto_id are provided. Ignore auto_id and use ids."
)
auto_id = False
else:
@@ -2307,9 +2307,9 @@ class Milvus(VectorStore):
if not ids:
self.add_documents(documents=documents, **kwargs)
else:
assert len(set(ids)) == len(
documents
), "Different lengths of documents and unique ids are provided."
assert len(set(ids)) == len(documents), (
"Different lengths of documents and unique ids are provided."
)
embeddings_functions: List[EmbeddingType] = self._as_list(self.embedding_func)
embeddings: List = []
@@ -2474,12 +2474,12 @@ class Milvus(VectorStore):
that they all fit in memory.
metadatas (Optional[List[dict]]): Metadata dicts attached to each of
the texts. Defaults to None.
should be less than 65535 bytes. Required and work when auto_id is False.
timeout (Optional[float]): Timeout for each batch insert. Defaults
to None.
batch_size (int, optional): Batch size to use for insertion.
Defaults to 1000.
ids (Optional[List[str]]): List of text ids. The length of each item
should be less than 65535 bytes. Required and work when auto_id is False.
Raises:
MilvusException: Failure to add texts
@@ -2496,9 +2496,9 @@ class Milvus(VectorStore):
)
self.auto_id = True
elif not self.auto_id and ids: # Check ids
assert len(set(ids)) == len(
texts
), "Different lengths of texts and unique ids are provided."
assert len(set(ids)) == len(texts), (
"Different lengths of texts and unique ids are provided."
)
elif self.auto_id and ids:
logger.warning(
@@ -2565,12 +2565,12 @@ class Milvus(VectorStore):
or list of vectors for each text (in case of multi-vector)
metadatas (Optional[List[dict]]): Metadata dicts attached to each of
the texts. Defaults to None.
should be less than 65535 bytes. Required and work when auto_id is False.
timeout (Optional[float]): Timeout for each batch insert. Defaults
to None.
batch_size (int, optional): Batch size to use for insertion.
Defaults to 1000.
ids (Optional[List[str]]): List of text ids. The length of each item
should be less than 65535 bytes. Required and work when auto_id is False.
Raises:
MilvusException: Failure to add texts and embeddings
@@ -2872,7 +2872,7 @@ class Milvus(VectorStore):
query (str): The text being searched.
k (int, optional): The amount of results to return. Defaults to 4.
param (dict | list[dict], optional): The search params for the specified
index. Defaults to None.
index. Defaults to None.
expr (str, optional): Filtering expression. Defaults to None.
timeout (float, optional): How long to wait before timeout error.
Defaults to None.
@@ -3126,13 +3126,13 @@ class Milvus(VectorStore):
if isinstance(ids, list) and len(ids) > 0:
if expr is not None:
logger.warning(
"Both ids and expr are provided. " "Ignore expr and delete by ids."
"Both ids and expr are provided. Ignore expr and delete by ids."
)
expr = f"{self._primary_field} in {ids}"
else:
assert isinstance(
expr, str
), "Either ids list or expr string must be provided."
assert isinstance(expr, str), (
"Either ids list or expr string must be provided."
)
try:
await self.aclient.delete(self.collection_name, filter=expr, **kwargs)
return True
@@ -3197,7 +3197,7 @@ class Milvus(VectorStore):
if isinstance(ids, list) and len(ids) > 0:
if auto_id:
logger.warning(
"Both ids and auto_id are provided. " "Ignore auto_id and use ids."
"Both ids and auto_id are provided. Ignore auto_id and use ids."
)
auto_id = False
else:
@@ -3288,9 +3288,9 @@ class Milvus(VectorStore):
await self.aadd_documents(documents=documents, **kwargs)
return
assert len(set(ids)) == len(
documents
), "Different lengths of documents and unique ids are provided."
assert len(set(ids)) == len(documents), (
"Different lengths of documents and unique ids are provided."
)
embeddings_functions: List[EmbeddingType] = self._as_list(self.embedding_func)
embeddings: List = []