From 5b2f0866fef2afb675287c8d9213888083bdb221 Mon Sep 17 00:00:00 2001 From: kalyanr Date: Sat, 10 Feb 2024 18:09:26 +0530 Subject: [PATCH 01/34] basic migration --- airflow/providers/pinecone/hooks/pinecone.py | 24 +++++++++---------- airflow/providers/pinecone/provider.yaml | 2 +- generated/provider_dependencies.json | 2 +- pyproject.toml | 2 +- .../providers/pinecone/hooks/test_pinecone.py | 3 ++- 5 files changed, 16 insertions(+), 17 deletions(-) diff --git a/airflow/providers/pinecone/hooks/pinecone.py b/airflow/providers/pinecone/hooks/pinecone.py index 6a116250f4922..b1f33e43a12a8 100644 --- a/airflow/providers/pinecone/hooks/pinecone.py +++ b/airflow/providers/pinecone/hooks/pinecone.py @@ -22,6 +22,7 @@ from typing import TYPE_CHECKING, Any import pinecone +from pinecone import Pinecone from airflow.hooks.base import BaseHook @@ -63,24 +64,22 @@ def get_ui_field_behaviour(cls) -> dict[str, Any]: """Return custom field behaviour.""" return { "hidden_fields": ["port", "schema"], - "relabeling": {"login": "Pinecone Environment", "password": "Pinecone API key"}, + "relabeling": {"password": "Pinecone API key"}, } def __init__(self, conn_id: str = default_conn_name) -> None: self.conn_id = conn_id - self.get_conn() + self.conn = self.get_conn() - def get_conn(self) -> None: + def get_conn(self) -> Pinecone: pinecone_connection = self.get_connection(self.conn_id) api_key = pinecone_connection.password - pinecone_environment = pinecone_connection.login pinecone_host = pinecone_connection.host extras = pinecone_connection.extra_dejson pinecone_project_id = extras.get("project_id") log_level = extras.get("log_level", None) - pinecone.init( + return Pinecone( api_key=api_key, - environment=pinecone_environment, host=pinecone_host, project_name=pinecone_project_id, log_level=log_level, @@ -88,18 +87,17 @@ def get_conn(self) -> None: def test_connection(self) -> tuple[bool, str]: try: - self.list_indexes() + self.conn.list_indexes() return True, "Connection 
established" except Exception as e: return False, str(e) - @staticmethod - def list_indexes() -> Any: + def list_indexes(self) -> Any: """Retrieve a list of all indexes in your project.""" - return pinecone.list_indexes() + return self.conn.list_indexes() - @staticmethod def upsert( + self, index_name: str, vectors: list[Any], namespace: str = "", @@ -125,7 +123,7 @@ def upsert( :param show_progress: Whether to show a progress bar using tqdm. Applied only if batch_size is provided. """ - index = pinecone.Index(index_name) + index = self.conn.Index(index_name) return index.upsert( vectors=vectors, namespace=namespace, @@ -134,8 +132,8 @@ def upsert( **kwargs, ) - @staticmethod def create_index( + self, index_name: str, dimension: int, index_type: str | None = "approximated", diff --git a/airflow/providers/pinecone/provider.yaml b/airflow/providers/pinecone/provider.yaml index 3caf3c69578cc..abc7a8316f6e2 100644 --- a/airflow/providers/pinecone/provider.yaml +++ b/airflow/providers/pinecone/provider.yaml @@ -44,7 +44,7 @@ dependencies: # Pinecone Python SDK v3.0.0 was released at 2024-01-16 and introduce some breaking changes. # It's crucial to adhere to the v3.0.0 Migration Guide before the upper-bound limitation can be removed. 
# https://canyon-quilt-082.notion.site/Pinecone-Python-SDK-v3-0-0-Migration-Guide-056d3897d7634bf7be399676a4757c7b - - pinecone-client>=2.2.4,<3.0 + - pinecone-client~=3.0.0 hooks: - integration-name: Pinecone diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json index 5991796ee599c..5b3c612a69248 100644 --- a/generated/provider_dependencies.json +++ b/generated/provider_dependencies.json @@ -899,7 +899,7 @@ "pinecone": { "deps": [ "apache-airflow>=2.6.0", - "pinecone-client>=2.2.4,<3.0" + "pinecone-client~=3.0.0" ], "devel-deps": [], "cross-providers-deps": [], diff --git a/pyproject.toml b/pyproject.toml index f93cd2aeb6a05..bacb78bfedccb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -862,7 +862,7 @@ pgvector = [ # source: airflow/providers/pgvector/provider.yaml "pgvector>=0.2.3", ] pinecone = [ # source: airflow/providers/pinecone/provider.yaml - "pinecone-client>=2.2.4,<3.0", + "pinecone-client~=3.0.0", ] postgres = [ # source: airflow/providers/postgres/provider.yaml "apache-airflow[common_sql]", diff --git a/tests/providers/pinecone/hooks/test_pinecone.py b/tests/providers/pinecone/hooks/test_pinecone.py index fb076cc0a38de..2fc1421b0c770 100644 --- a/tests/providers/pinecone/hooks/test_pinecone.py +++ b/tests/providers/pinecone/hooks/test_pinecone.py @@ -32,9 +32,10 @@ def setup_method(self): mock_conn.password = "test_password" mock_get_connection.return_value = mock_conn self.pinecone_hook = PineconeHook() + self.pinecone_hook.conn self.index_name = "test_index" - @patch("airflow.providers.pinecone.hooks.pinecone.pinecone.Index") + @patch("airflow.providers.pinecone.hooks.pinecone.Pinecone.Index") def test_upsert(self, mock_index): """Test the upsert_data_async method of PineconeHook for correct data insertion asynchronously.""" data = [("id1", [1.0, 2.0, 3.0], {"meta": "data"})] From b6a3d48c48568e9f40c20b237613383b3546ab9b Mon Sep 17 00:00:00 2001 From: kalyanr Date: Sat, 17 Feb 2024 07:09:48 +0530 Subject: 
[PATCH 02/34] use self.conn --- airflow/providers/pinecone/hooks/pinecone.py | 105 ++++++++----------- 1 file changed, 42 insertions(+), 63 deletions(-) diff --git a/airflow/providers/pinecone/hooks/pinecone.py b/airflow/providers/pinecone/hooks/pinecone.py index b1f33e43a12a8..c10324004a975 100644 --- a/airflow/providers/pinecone/hooks/pinecone.py +++ b/airflow/providers/pinecone/hooks/pinecone.py @@ -21,8 +21,7 @@ import itertools from typing import TYPE_CHECKING, Any -import pinecone -from pinecone import Pinecone +from pinecone import Pinecone, PodSpec, ServerlessSpec from airflow.hooks.base import BaseHook @@ -132,75 +131,58 @@ def upsert( **kwargs, ) + @staticmethod + def get_pod_spec_obj(environment, replicas, shards, pods, pod_type, metadata_config, source_collection): + return PodSpec( + environment=environment, + replicas=replicas, + shards=shards, + pods=pods, + pod_type=pod_type, + metadata_config=metadata_config, + source_collection=source_collection, + ) + + @staticmethod + def get_serverless_spec_obj(cloud, region): + return ServerlessSpec(cloud=cloud, region=region) + def create_index( self, index_name: str, dimension: int, - index_type: str | None = "approximated", + spec: ServerlessSpec | PodSpec, metric: str | None = "cosine", - replicas: int | None = 1, - shards: int | None = 1, - pods: int | None = 1, - pod_type: str | None = "p1", - index_config: dict[str, str] | None = None, - metadata_config: dict[str, str] | None = None, - source_collection: str | None = "", timeout: int | None = None, ) -> None: - """ - Create a new index. - - .. seealso:: https://docs.pinecone.io/reference/create_index/ - - :param index_name: The name of the index to create. - :param dimension: the dimension of vectors that would be inserted in the index - :param index_type: type of index, one of {"approximated", "exact"}, defaults to "approximated". 
- :param metric: type of metric used in the vector index, one of {"cosine", "dotproduct", "euclidean"} - :param replicas: the number of replicas, defaults to 1. - :param shards: the number of shards per index, defaults to 1. - :param pods: Total number of pods to be used by the index. pods = shard*replicas - :param pod_type: the pod type to be used for the index. can be one of p1 or s1. - :param index_config: Advanced configuration options for the index - :param metadata_config: Configuration related to the metadata index - :param source_collection: Collection name to create the index from - :param timeout: Timeout for wait until index gets ready. - """ - pinecone.create_index( + self.conn.create_index( name=index_name, - timeout=timeout, - index_type=index_type, dimension=dimension, + spec=spec, metric=metric, - pods=pods, - replicas=replicas, - shards=shards, - pod_type=pod_type, - metadata_config=metadata_config, - source_collection=source_collection, - index_config=index_config, + timeout=timeout, ) - @staticmethod - def describe_index(index_name: str) -> Any: + def describe_index(self, index_name: str) -> Any: """ Retrieve information about a specific index. :param index_name: The name of the index to describe. """ - return pinecone.describe_index(name=index_name) + return self.conn.describe_index(name=index_name) - @staticmethod - def delete_index(index_name: str, timeout: int | None = None) -> None: + def delete_index(self, index_name: str, timeout: int | None = None) -> None: """ Delete a specific index. :param index_name: the name of the index. :param timeout: Timeout for wait until index gets ready. 
""" - pinecone.delete_index(name=index_name, timeout=timeout) + self.conn.delete_index(name=index_name, timeout=timeout) - @staticmethod - def configure_index(index_name: str, replicas: int | None = None, pod_type: str | None = "") -> None: + def configure_index( + self, index_name: str, replicas: int | None = None, pod_type: str | None = "" + ) -> None: """ Change the current configuration of the index. @@ -208,43 +190,40 @@ def configure_index(index_name: str, replicas: int | None = None, pod_type: str :param replicas: The new number of replicas. :param pod_type: the new pod_type for the index. """ - pinecone.configure_index(name=index_name, replicas=replicas, pod_type=pod_type) + self.conn.configure_index(name=index_name, replicas=replicas, pod_type=pod_type) @staticmethod - def create_collection(collection_name: str, index_name: str) -> None: + def create_collection(self, collection_name: str, index_name: str) -> None: """ Create a new collection from a specified index. :param collection_name: The name of the collection to create. :param index_name: The name of the source index. """ - pinecone.create_collection(name=collection_name, source=index_name) + self.conn.create_collection(name=collection_name, source=index_name) - @staticmethod - def delete_collection(collection_name: str) -> None: + def delete_collection(self, collection_name: str) -> None: """ Delete a specific collection. :param collection_name: The name of the collection to delete. """ - pinecone.delete_collection(collection_name) + self.conn.delete_collection(collection_name) - @staticmethod - def describe_collection(collection_name: str) -> Any: + def describe_collection(self, collection_name: str) -> Any: """ Retrieve information about a specific collection. :param collection_name: The name of the collection to describe. 
""" - return pinecone.describe_collection(collection_name) + return self.conn.describe_collection(collection_name) - @staticmethod - def list_collections() -> Any: + def list_collections(self) -> Any: """Retrieve a list of all collections in the current project.""" - return pinecone.list_collections() + return self.conn.list_collections() - @staticmethod def query_vector( + self, index_name: str, vector: list[Any], query_id: str | None = None, @@ -272,7 +251,7 @@ def query_vector( :param sparse_vector: sparse values of the query vector. Expected to be either a SparseValues object or a dict of the form: {'indices': List[int], 'values': List[float]}, where the lists each have the same length. """ - index = pinecone.Index(index_name) + index = self.conn.Index(index_name) return index.query( vector=vector, id=query_id, @@ -310,7 +289,7 @@ def upsert_data_async( :param pool_threads: Number of threads for parallel upserting. If async_req is True, this must be provided. """ responses = [] - with pinecone.Index(index_name, pool_threads=pool_threads) as index: + with self.conn.Index(index_name, pool_threads=pool_threads) as index: if async_req and pool_threads: async_results = [index.upsert(vectors=chunk, async_req=True) for chunk in self._chunks(data)] responses = [async_result.get() for async_result in async_results] @@ -320,8 +299,8 @@ def upsert_data_async( responses.append(response) return responses - @staticmethod def describe_index_stats( + self, index_name: str, stats_filter: dict[str, str | float | int | bool | list[Any] | dict[Any, Any]] | None = None, **kwargs: Any, @@ -337,5 +316,5 @@ def describe_index_stats( :param stats_filter: If this parameter is present, the operation only returns statistics for vectors that satisfy the filter. 
See https://www.pinecone.io/docs/metadata-filtering/ """ - index = pinecone.Index(index_name) + index = self.conn.Index(index_name) return index.describe_index_stats(filter=stats_filter, **kwargs) From c3d61deab05c7b4120b4131942c43cdf9c58f05c Mon Sep 17 00:00:00 2001 From: kalyanr Date: Sat, 13 Apr 2024 08:34:15 +0530 Subject: [PATCH 03/34] replace conn with pc property --- airflow/providers/pinecone/hooks/pinecone.py | 111 +++++++++++++------ airflow/providers/pinecone/provider.yaml | 2 +- generated/provider_dependencies.json | 2 +- 3 files changed, 82 insertions(+), 33 deletions(-) diff --git a/airflow/providers/pinecone/hooks/pinecone.py b/airflow/providers/pinecone/hooks/pinecone.py index acdedec913279..b1e6d5e86a9ce 100644 --- a/airflow/providers/pinecone/hooks/pinecone.py +++ b/airflow/providers/pinecone/hooks/pinecone.py @@ -20,6 +20,7 @@ from __future__ import annotations import itertools +from functools import cached_property from typing import TYPE_CHECKING, Any from pinecone import Pinecone, PodSpec, ServerlessSpec @@ -64,37 +65,78 @@ def get_ui_field_behaviour(cls) -> dict[str, Any]: """Return custom field behaviour.""" return { "hidden_fields": ["port", "schema"], - "relabeling": {"password": "Pinecone API key"}, + "relabeling": { + "login": "Pinecone Environment", + "host": "Pinecone Region", + "password": "Pinecone API key", + }, } - def __init__(self, conn_id: str = default_conn_name) -> None: + def __init__( + self, + conn_id: str = default_conn_name, + environment: str | None = None, + region: str | None = None, + api_key: str | None = None, + ) -> None: self.conn_id = conn_id + self._environment = environment + self._region = region + self._api_key = api_key self.conn = self.get_conn() - def get_conn(self) -> Pinecone: - pinecone_connection = self.get_connection(self.conn_id) - api_key = pinecone_connection.password - pinecone_host = pinecone_connection.host - extras = pinecone_connection.extra_dejson + @property + def api_key(self): + if 
self._api_key: + return self._api_key + key = self.conn.password + if not key: + raise LookupError("Pinecone API Key not found in connection") + return key + + @cached_property + def environment(self): + if self._environment: + return self._environment + env = self.conn.login + if not env: + raise LookupError("Pinecone environment not found in connection") + return env + + @cached_property + def region(self): + if self._region: + return self._region + region = self.conn.host + if not region: + raise LookupError("Pinecone region not found in connection") + return region + + @cached_property + def pc(self): + """Pinecone object to interact with Pinecone.""" + pinecone_host = self.conn.host + extras = self.conn.extra_dejson pinecone_project_id = extras.get("project_id") log_level = extras.get("log_level", None) + return Pinecone( - api_key=api_key, - host=pinecone_host, - project_name=pinecone_project_id, - log_level=log_level, + api_key=self.api_key, host=pinecone_host, project_id=pinecone_project_id, log_level=log_level ) + def get_conn(self) -> Pinecone: + return self.get_connection(self.conn_id) + def test_connection(self) -> tuple[bool, str]: try: - self.conn.list_indexes() + self.pc.list_indexes() return True, "Connection established" except Exception as e: return False, str(e) def list_indexes(self) -> Any: """Retrieve a list of all indexes in your project.""" - return self.conn.list_indexes() + return self.pc.list_indexes() def upsert( self, @@ -123,7 +165,7 @@ def upsert( :param show_progress: Whether to show a progress bar using tqdm. Applied only if batch_size is provided. 
""" - index = self.conn.Index(index_name) + index = self.pc.Index(index_name) return index.upsert( vectors=vectors, namespace=namespace, @@ -132,10 +174,18 @@ def upsert( **kwargs, ) - @staticmethod - def get_pod_spec_obj(environment, replicas, shards, pods, pod_type, metadata_config, source_collection): + def get_pod_spec_obj( + self, + replicas, + shards, + pods, + pod_type, + metadata_config, + source_collection, + environment: str | None = None, + ): return PodSpec( - environment=environment, + environment=environment or self.environment, replicas=replicas, shards=shards, pods=pods, @@ -144,9 +194,8 @@ def get_pod_spec_obj(environment, replicas, shards, pods, pod_type, metadata_con source_collection=source_collection, ) - @staticmethod - def get_serverless_spec_obj(cloud, region): - return ServerlessSpec(cloud=cloud, region=region) + def get_serverless_spec_obj(self, cloud, region: str | None = None): + return ServerlessSpec(cloud=cloud, region=region or self.region) def create_index( self, @@ -156,7 +205,7 @@ def create_index( metric: str | None = "cosine", timeout: int | None = None, ) -> None: - self.conn.create_index( + self.pc.create_index( name=index_name, dimension=dimension, spec=spec, @@ -170,7 +219,7 @@ def describe_index(self, index_name: str) -> Any: :param index_name: The name of the index to describe. """ - return self.conn.describe_index(name=index_name) + return self.pc.describe_index(name=index_name) def delete_index(self, index_name: str, timeout: int | None = None) -> None: """ @@ -179,7 +228,7 @@ def delete_index(self, index_name: str, timeout: int | None = None) -> None: :param index_name: the name of the index. :param timeout: Timeout for wait until index gets ready. 
""" - self.conn.delete_index(name=index_name, timeout=timeout) + self.pc.delete_index(name=index_name, timeout=timeout) def configure_index( self, index_name: str, replicas: int | None = None, pod_type: str | None = "" @@ -191,7 +240,7 @@ def configure_index( :param replicas: The new number of replicas. :param pod_type: the new pod_type for the index. """ - self.conn.configure_index(name=index_name, replicas=replicas, pod_type=pod_type) + self.pc.configure_index(name=index_name, replicas=replicas, pod_type=pod_type) @staticmethod def create_collection(self, collection_name: str, index_name: str) -> None: @@ -201,7 +250,7 @@ def create_collection(self, collection_name: str, index_name: str) -> None: :param collection_name: The name of the collection to create. :param index_name: The name of the source index. """ - self.conn.create_collection(name=collection_name, source=index_name) + self.pc.create_collection(name=collection_name, source=index_name) def delete_collection(self, collection_name: str) -> None: """ @@ -209,7 +258,7 @@ def delete_collection(self, collection_name: str) -> None: :param collection_name: The name of the collection to delete. """ - self.conn.delete_collection(collection_name) + self.pc.delete_collection(collection_name) def describe_collection(self, collection_name: str) -> Any: """ @@ -217,11 +266,11 @@ def describe_collection(self, collection_name: str) -> Any: :param collection_name: The name of the collection to describe. """ - return self.conn.describe_collection(collection_name) + return self.pc.describe_collection(collection_name) def list_collections(self) -> Any: """Retrieve a list of all collections in the current project.""" - return self.conn.list_collections() + return self.pc.list_collections() def query_vector( self, @@ -252,7 +301,7 @@ def query_vector( :param sparse_vector: sparse values of the query vector. 
Expected to be either a SparseValues object or a dict of the form: {'indices': List[int], 'values': List[float]}, where the lists each have the same length. """ - index = self.conn.Index(index_name) + index = self.pc.Index(index_name) return index.query( vector=vector, id=query_id, @@ -290,7 +339,7 @@ def upsert_data_async( :param pool_threads: Number of threads for parallel upserting. If async_req is True, this must be provided. """ responses = [] - with self.conn.Index(index_name, pool_threads=pool_threads) as index: + with self.pc.Index(index_name, pool_threads=pool_threads) as index: if async_req and pool_threads: async_results = [index.upsert(vectors=chunk, async_req=True) for chunk in self._chunks(data)] responses = [async_result.get() for async_result in async_results] @@ -317,5 +366,5 @@ def describe_index_stats( :param stats_filter: If this parameter is present, the operation only returns statistics for vectors that satisfy the filter. See https://www.pinecone.io/docs/metadata-filtering/ """ - index = self.conn.Index(index_name) + index = self.pc.Index(index_name) return index.describe_index_stats(filter=stats_filter, **kwargs) diff --git a/airflow/providers/pinecone/provider.yaml b/airflow/providers/pinecone/provider.yaml index ba84949a1d8c8..9ef2562533d74 100644 --- a/airflow/providers/pinecone/provider.yaml +++ b/airflow/providers/pinecone/provider.yaml @@ -45,7 +45,7 @@ dependencies: # Pinecone Python SDK v3.0.0 was released at 2024-01-16 and introduce some breaking changes. # It's crucial to adhere to the v3.0.0 Migration Guide before the upper-bound limitation can be removed. 
# https://canyon-quilt-082.notion.site/Pinecone-Python-SDK-v3-0-0-Migration-Guide-056d3897d7634bf7be399676a4757c7b - - pinecone-client~=3.0.0 + - pinecone-client>=3.0.0 hooks: - integration-name: Pinecone diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json index f6cafd2cbaea9..9d27aea47f6d5 100644 --- a/generated/provider_dependencies.json +++ b/generated/provider_dependencies.json @@ -908,7 +908,7 @@ "pinecone": { "deps": [ "apache-airflow>=2.6.0", - "pinecone-client~=3.0.0" + "pinecone-client>=3.0.0" ], "devel-deps": [], "cross-providers-deps": [], From cb83e6304ca809a19a5b2059d4d2ddd2b2ddc8fc Mon Sep 17 00:00:00 2001 From: kalyanr Date: Sat, 13 Apr 2024 11:48:33 +0530 Subject: [PATCH 04/34] Add operators for creating index --- airflow/providers/pinecone/hooks/pinecone.py | 5 +- .../providers/pinecone/operators/pinecone.py | 101 ++++++++++++++++++ 2 files changed, 104 insertions(+), 2 deletions(-) diff --git a/airflow/providers/pinecone/hooks/pinecone.py b/airflow/providers/pinecone/hooks/pinecone.py index b1e6d5e86a9ce..6cdb63e01c6b3 100644 --- a/airflow/providers/pinecone/hooks/pinecone.py +++ b/airflow/providers/pinecone/hooks/pinecone.py @@ -53,6 +53,7 @@ def get_connection_form_widgets(cls) -> dict[str, Any]: from wtforms import StringField return { + "region": StringField(lazy_gettext("Pinecone Region"), widget=BS3TextFieldWidget(), default=None), "log_level": StringField(lazy_gettext("Log Level"), widget=BS3TextFieldWidget(), default=None), "project_id": StringField( lazy_gettext("Project ID"), @@ -67,7 +68,7 @@ def get_ui_field_behaviour(cls) -> dict[str, Any]: "hidden_fields": ["port", "schema"], "relabeling": { "login": "Pinecone Environment", - "host": "Pinecone Region", + "host": "Pinecone Host", "password": "Pinecone API key", }, } @@ -107,7 +108,7 @@ def environment(self): def region(self): if self._region: return self._region - region = self.conn.host + region = self.conn.extra_dejson.get("region") if not 
region: raise LookupError("Pinecone region not found in connection") return region diff --git a/airflow/providers/pinecone/operators/pinecone.py b/airflow/providers/pinecone/operators/pinecone.py index 1c757d8fa541c..73f7c674f9c98 100644 --- a/airflow/providers/pinecone/operators/pinecone.py +++ b/airflow/providers/pinecone/operators/pinecone.py @@ -22,6 +22,7 @@ from airflow.models import BaseOperator from airflow.providers.pinecone.hooks.pinecone import PineconeHook +from airflow.utils.context import Context if TYPE_CHECKING: from airflow.utils.context import Context @@ -81,3 +82,103 @@ def execute(self, context: Context) -> None: ) self.log.info("Successfully ingested data into Pinecone index %s.", self.index_name) + + +class CreatePodIndexOperator(BaseOperator): + """Create a pod based index in Pinecone.""" + + def __init__( + self, + *, + conn_id: str = PineconeHook.default_conn_name, + index_name: str, + dimension: int, + api_key: str | None = None, + environment: str | None = None, + replicas: int | None = None, + shards: int | None = None, + pods: int | None = None, + pod_type: str | None = None, + metadata_config: dict | None = None, + source_collection: str | None = None, + metric: str | None = None, + timeout: int | None = None, + **kwargs: Any, + ): + super().__init__(**kwargs) + self.conn_id = conn_id + self.index_name = index_name + self.api_key = api_key + self.dimension = dimension + self.environment = environment + self.replicas = replicas + self.shards = shards + self.pods = pods + self.pod_type = pod_type + self.metadata_config = metadata_config + self.source_collection = source_collection + self.metric = metric + self.timeout = timeout + + @cached_property + def hook(self) -> PineconeHook: + return PineconeHook(conn_id=self.conn_id, environment=self.environment, api_key=self.api_key) + + def execute(self, context: Context) -> None: + pod_spec_obj = self.hook.get_pod_spec_obj( + replicas=self.replicas, + shards=self.shards, + pods=self.pods, + 
pod_type=self.pod_type, + metadata_config=self.metadata_config, + source_collection=self.source_collection, + environment=self.environment, + ) + self.hook.create_index( + index_name=self.index_name, + dimension=self.dimension, + spec=pod_spec_obj, + metric=self.metric, + timeout=self.timeout, + ) + + +class CreateServerlessIndexOperator(BaseOperator): + """Create a serverless index in Pinecone.""" + + def __init__( + self, + *, + conn_id: str = PineconeHook.default_conn_name, + index_name: str, + dimension: int, + cloud: str, + api_key: str | None = None, + region: str | None = None, + metric: str | None = None, + timeout: int | None = None, + **kwargs: Any, + ) -> None: + super().__init__(**kwargs) + self.conn_id = conn_id + self.index_name = index_name + self.dimension = dimension + self.api_key = api_key + self.cloud = cloud + self.region = region + self.metric = metric + self.timeout = timeout + + @cached_property + def hook(self) -> PineconeHook: + return PineconeHook(conn_id=self.conn_id, region=self.region, api_key=self.api_key) + + def execute(self, context: Context) -> None: + serverless_spec_obj = self.hook.get_serverless_spec_obj(cloud=self.cloud, region=self.region) + self.hook.create_index( + index_name=self.index_name, + dimension=self.dimension, + spec=serverless_spec_obj, + metric=self.metric, + timeout=self.timeout, + ) From df28c842e2b0a55a8f1c499fb4fe2deeadc70c50 Mon Sep 17 00:00:00 2001 From: kalyanr Date: Sat, 13 Apr 2024 12:11:30 +0530 Subject: [PATCH 05/34] support PINECONE_DEBUG_CURL --- airflow/providers/pinecone/hooks/pinecone.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/airflow/providers/pinecone/hooks/pinecone.py b/airflow/providers/pinecone/hooks/pinecone.py index 6cdb63e01c6b3..5d4668da7c463 100644 --- a/airflow/providers/pinecone/hooks/pinecone.py +++ b/airflow/providers/pinecone/hooks/pinecone.py @@ -20,6 +20,7 @@ from __future__ import annotations import itertools +import os from functools 
import cached_property from typing import TYPE_CHECKING, Any @@ -50,11 +51,11 @@ def get_connection_form_widgets(cls) -> dict[str, Any]: """Return connection widgets to add to connection form.""" from flask_appbuilder.fieldwidgets import BS3TextFieldWidget from flask_babel import lazy_gettext - from wtforms import StringField + from wtforms import BooleanField, StringField return { "region": StringField(lazy_gettext("Pinecone Region"), widget=BS3TextFieldWidget(), default=None), - "log_level": StringField(lazy_gettext("Log Level"), widget=BS3TextFieldWidget(), default=None), + "debug_curl": BooleanField(lazy_gettext("PINECONE_DEBUG_CURL"), default=False), "project_id": StringField( lazy_gettext("Project ID"), widget=BS3TextFieldWidget(), @@ -119,11 +120,10 @@ def pc(self): pinecone_host = self.conn.host extras = self.conn.extra_dejson pinecone_project_id = extras.get("project_id") - log_level = extras.get("log_level", None) - - return Pinecone( - api_key=self.api_key, host=pinecone_host, project_id=pinecone_project_id, log_level=log_level - ) + enable_curl_debug = extras.get("debug_curl") + if enable_curl_debug: + os.environ["PINECONE_DEBUG_CURL"] = "true" + return Pinecone(api_key=self.api_key, host=pinecone_host, project_id=pinecone_project_id) def get_conn(self) -> Pinecone: return self.get_connection(self.conn_id) From d2b6c184a67856a00ccb45bf73b0f8a2420e25f8 Mon Sep 17 00:00:00 2001 From: kalyanr Date: Sat, 13 Apr 2024 14:12:06 +0530 Subject: [PATCH 06/34] update tests --- airflow/providers/pinecone/hooks/pinecone.py | 12 +++--- .../providers/pinecone/hooks/test_pinecone.py | 41 ++++++++++++++++--- 2 files changed, 41 insertions(+), 12 deletions(-) diff --git a/airflow/providers/pinecone/hooks/pinecone.py b/airflow/providers/pinecone/hooks/pinecone.py index 5d4668da7c463..d6426428e822a 100644 --- a/airflow/providers/pinecone/hooks/pinecone.py +++ b/airflow/providers/pinecone/hooks/pinecone.py @@ -177,12 +177,12 @@ def upsert( def get_pod_spec_obj( self, - 
replicas, - shards, - pods, - pod_type, - metadata_config, - source_collection, + replicas: int | None = None, + shards: int | None = None, + pods: int | None = None, + pod_type: str | None = "p1.x1", + metadata_config: dict | None = None, + source_collection: str | None = None, environment: str | None = None, ): return PodSpec( diff --git a/tests/providers/pinecone/hooks/test_pinecone.py b/tests/providers/pinecone/hooks/test_pinecone.py index 2fc1421b0c770..82a01e53198d9 100644 --- a/tests/providers/pinecone/hooks/test_pinecone.py +++ b/tests/providers/pinecone/hooks/test_pinecone.py @@ -16,6 +16,7 @@ # under the License. from __future__ import annotations +import os from unittest.mock import Mock, patch from airflow.providers.pinecone.hooks.pinecone import PineconeHook @@ -28,8 +29,9 @@ def setup_method(self): with patch("airflow.models.Connection.get_connection_from_secrets") as mock_get_connection: mock_conn = Mock() mock_conn.host = "pinecone.io" - mock_conn.login = "test_user" - mock_conn.password = "test_password" + mock_conn.login = "us-west1-gcp" # Pinecone Environment + mock_conn.password = "test_password" # Pinecone API Key + mock_conn.extra_dejson = {"region": "us-east-1", "debug_curl": True} mock_get_connection.return_value = mock_conn self.pinecone_hook = PineconeHook() self.pinecone_hook.conn @@ -50,11 +52,38 @@ def test_list_indexes(self, mock_list_indexes): self.pinecone_hook.list_indexes() mock_list_indexes.assert_called_once() + @patch("airflow.providers.pinecone.hooks.pinecone.PineconeHook.list_indexes") + def test_debug_curl_setting(self, mock_list_indexes): + """Test that the PINECONE_DEBUG_CURL environment variable is set when initializing Pinecone Object.""" + self.pinecone_hook.list_indexes() + mock_list_indexes.assert_called_once() + assert os.environ.get("PINECONE_DEBUG_CURL") == "true" + + @patch("airflow.providers.pinecone.hooks.pinecone.PineconeHook.create_index") + def test_create_index_for_pod_based(self, mock_create_index): + 
"""Test that the create_index method of PineconeHook is called with correct arguments for pod based index.""" + pod_spec = self.pinecone_hook.get_pod_spec_obj() + self.pinecone_hook.create_index(index_name=self.index_name, dimension=128, spec=pod_spec) + mock_create_index.assert_called_once_with(index_name="test_index", dimension=128, spec=pod_spec) + @patch("airflow.providers.pinecone.hooks.pinecone.PineconeHook.create_index") - def test_create_index(self, mock_create_index): - """Test that the create_index method of PineconeHook is called with correct arguments.""" - self.pinecone_hook.create_index(index_name=self.index_name, dimension=128) - mock_create_index.assert_called_once_with(index_name="test_index", dimension=128) + def test_create_index_for_serverless_based(self, mock_create_index): + """Test that the create_index method of PineconeHook is called with correct arguments for serverless index.""" + serverless_spec = self.pinecone_hook.get_serverless_spec_obj(cloud="aws") + self.pinecone_hook.create_index(index_name=self.index_name, dimension=128, spec=serverless_spec) + mock_create_index.assert_called_once_with( + index_name="test_index", dimension=128, spec=serverless_spec + ) + + def test_get_pod_spec_obj(self): + """Test that the get_pod_spec_obj method of PineconeHook returns the correct pod spec object.""" + pod_spec = self.pinecone_hook.get_pod_spec_obj() + assert pod_spec.environment == "us-west1-gcp" + + def test_get_serverless_spec_obj(self): + """Test that the get_serverless_spec_obj method of PineconeHook returns the correct serverless spec object.""" + serverless_spec = self.pinecone_hook.get_serverless_spec_obj(cloud="gcp") + assert serverless_spec.region == "us-east-1" @patch("airflow.providers.pinecone.hooks.pinecone.PineconeHook.describe_index") def test_describe_index(self, mock_describe_index): From 23271dc546232e424ee00a24a9ecf0d855a16525 Mon Sep 17 00:00:00 2001 From: kalyanr Date: Sat, 13 Apr 2024 15:09:48 +0530 Subject: [PATCH 07/34] 
update changelog --- airflow/providers/pinecone/CHANGELOG.rst | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/airflow/providers/pinecone/CHANGELOG.rst b/airflow/providers/pinecone/CHANGELOG.rst index 7b2a20deb023c..36ae26dc73d8f 100644 --- a/airflow/providers/pinecone/CHANGELOG.rst +++ b/airflow/providers/pinecone/CHANGELOG.rst @@ -20,6 +20,29 @@ Changelog --------- +2.0.0 +..... + +Breaking changes +~~~~~~~~~~~~~~~~ + +.. note:: + This release of provider has breaking changes from previous versions. Changes are based on + the migration guide from pinecone - + +* PineconeHook's ``create_index`` method now requies to pass a Specification object for creation of Pod or Serverless based index. + +Features +~~~~~~~~ + +* supports pod based index creation using ``CreatePodIndexOperator`` Operator +* Supports Serverless index creation using ``CreateServerlessIndexOperator`` Operator +* Supports PINECONE_DEBUG_CURL environment variable via Connection to enable curl debugging + +Misc +~~~~ +* Support ``pinecone-client`` version >=3.0.0 + 1.1.2 ..... From ed4781f0e1d96dcaadce8fa57ab5e480204922e1 Mon Sep 17 00:00:00 2001 From: Kalyan Date: Sat, 13 Apr 2024 15:52:33 +0530 Subject: [PATCH 08/34] Update airflow/providers/pinecone/CHANGELOG.rst Co-authored-by: Andrey Anshin --- airflow/providers/pinecone/CHANGELOG.rst | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/airflow/providers/pinecone/CHANGELOG.rst b/airflow/providers/pinecone/CHANGELOG.rst index 36ae26dc73d8f..872d6d3852fb9 100644 --- a/airflow/providers/pinecone/CHANGELOG.rst +++ b/airflow/providers/pinecone/CHANGELOG.rst @@ -30,19 +30,6 @@ Breaking changes This release of provider has breaking changes from previous versions. Changes are based on the migration guide from pinecone - -* PineconeHook's ``create_index`` method now requies to pass a Specification object for creation of Pod or Serverless based index. 
- -Features -~~~~~~~~ - -* supports pod based index creation using ``CreatePodIndexOperator`` Operator -* Supports Serverless index creation using ``CreateServerlessIndexOperator`` Operator -* Supports PINECONE_DEBUG_CURL environment variable via Connection to enable curl debugging - -Misc -~~~~ -* Support ``pinecone-client`` version >=3.0.0 - 1.1.2 ..... From 2d355a0033a6003251745b7846f43fdb9bd29038 Mon Sep 17 00:00:00 2001 From: Kalyan Date: Sat, 13 Apr 2024 15:53:44 +0530 Subject: [PATCH 09/34] Update airflow/providers/pinecone/CHANGELOG.rst Co-authored-by: Andrey Anshin --- airflow/providers/pinecone/CHANGELOG.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/pinecone/CHANGELOG.rst b/airflow/providers/pinecone/CHANGELOG.rst index 872d6d3852fb9..e6a195abaa06e 100644 --- a/airflow/providers/pinecone/CHANGELOG.rst +++ b/airflow/providers/pinecone/CHANGELOG.rst @@ -26,7 +26,7 @@ Changelog Breaking changes ~~~~~~~~~~~~~~~~ -.. note:: +.. warning:: This release of provider has breaking changes from previous versions. Changes are based on the migration guide from pinecone - From 358f682be880686fb7fddd80a073da938233bcb0 Mon Sep 17 00:00:00 2001 From: kalyanr Date: Sat, 13 Apr 2024 18:40:47 +0530 Subject: [PATCH 10/34] update Changelog --- airflow/providers/pinecone/CHANGELOG.rst | 17 +++++++++++++++++ airflow/providers/pinecone/hooks/pinecone.py | 16 +++++++++------- 2 files changed, 26 insertions(+), 7 deletions(-) diff --git a/airflow/providers/pinecone/CHANGELOG.rst b/airflow/providers/pinecone/CHANGELOG.rst index e6a195abaa06e..b5a74ea788d9b 100644 --- a/airflow/providers/pinecone/CHANGELOG.rst +++ b/airflow/providers/pinecone/CHANGELOG.rst @@ -30,6 +30,23 @@ Breaking changes This release of provider has breaking changes from previous versions. Changes are based on the migration guide from pinecone - +* ``log_level`` field is removed from the Connections as it is not used by the provider anymore. 
+* ``PineconeHook.get_conn`` now returns ``Connection`` object instead of ``PineconeConnection`` object. Use ``pc`` property to access the Pinecone client. +* Following ``PineconeHook`` methods are no longer staticmethods: + * ``PineconeHook.list_indexes`` + * ``PineconeHook.upsert`` + * ``PineconeHook.create_index`` + * ``PineconeHook.describe_index`` + * ``PineconeHook.delete_index`` + * ``PineconeHook.configure_index`` + * ``PineconeHook.create_collection`` + * ``PineconeHook.delete_collection`` + * ``PineconeHook.describe_collection`` + * ``PineconeHook.list_collections`` + * ``PineconeHook.query_vector`` + * ``PineconeHook.describe_index_stats`` +* ``PineconeHook.create_index`` is updated to accept a ``ServerlessSpec`` or ``PodSpec`` instead of directly accpeting index related configurations + 1.1.2 ..... diff --git a/airflow/providers/pinecone/hooks/pinecone.py b/airflow/providers/pinecone/hooks/pinecone.py index d6426428e822a..c6a8064fbe4a7 100644 --- a/airflow/providers/pinecone/hooks/pinecone.py +++ b/airflow/providers/pinecone/hooks/pinecone.py @@ -32,6 +32,8 @@ from pinecone.core.client.model.sparse_values import SparseValues from pinecone.core.client.models import DescribeIndexStatsResponse, QueryResponse, UpsertResponse + from airflow.models.connection import Connection + class PineconeHook(BaseHook): """ @@ -88,7 +90,7 @@ def __init__( self.conn = self.get_conn() @property - def api_key(self): + def api_key(self) -> str: if self._api_key: return self._api_key key = self.conn.password @@ -97,7 +99,7 @@ def api_key(self): return key @cached_property - def environment(self): + def environment(self) -> str: if self._environment: return self._environment env = self.conn.login @@ -106,7 +108,7 @@ def environment(self): return env @cached_property - def region(self): + def region(self) -> str: if self._region: return self._region region = self.conn.extra_dejson.get("region") @@ -115,7 +117,7 @@ def region(self): return region @cached_property - def pc(self): 
+ def pc(self) -> Pinecone: """Pinecone object to interact with Pinecone.""" pinecone_host = self.conn.host extras = self.conn.extra_dejson @@ -125,7 +127,7 @@ def pc(self): os.environ["PINECONE_DEBUG_CURL"] = "true" return Pinecone(api_key=self.api_key, host=pinecone_host, project_id=pinecone_project_id) - def get_conn(self) -> Pinecone: + def get_conn(self) -> Connection: return self.get_connection(self.conn_id) def test_connection(self) -> tuple[bool, str]: @@ -184,7 +186,7 @@ def get_pod_spec_obj( metadata_config: dict | None = None, source_collection: str | None = None, environment: str | None = None, - ): + ) -> PodSpec: return PodSpec( environment=environment or self.environment, replicas=replicas, @@ -195,7 +197,7 @@ def get_pod_spec_obj( source_collection=source_collection, ) - def get_serverless_spec_obj(self, cloud, region: str | None = None): + def get_serverless_spec_obj(self, cloud, region: str | None = None) -> ServerlessSpec: return ServerlessSpec(cloud=cloud, region=region or self.region) def create_index( From 7bd914f45eea82ee45683e4ef06f0da10499f992 Mon Sep 17 00:00:00 2001 From: kalyanr Date: Sat, 13 Apr 2024 18:41:30 +0530 Subject: [PATCH 11/34] fix spelling --- airflow/providers/pinecone/CHANGELOG.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/pinecone/CHANGELOG.rst b/airflow/providers/pinecone/CHANGELOG.rst index b5a74ea788d9b..fde12af995730 100644 --- a/airflow/providers/pinecone/CHANGELOG.rst +++ b/airflow/providers/pinecone/CHANGELOG.rst @@ -32,7 +32,7 @@ Breaking changes * ``log_level`` field is removed from the Connections as it is not used by the provider anymore. * ``PineconeHook.get_conn`` now returns ``Connection`` object instead of ``PineconeConnection`` object. Use ``pc`` property to access the Pinecone client. 
-* Following ``PineconeHook`` methods are no longer staticmethods: +* Following ``PineconeHook`` methods are no longer static methods: * ``PineconeHook.list_indexes`` * ``PineconeHook.upsert`` * ``PineconeHook.create_index`` From 3c308a614d816d07380d24050d9b35c4b3f0f114 Mon Sep 17 00:00:00 2001 From: kalyanr Date: Sat, 13 Apr 2024 18:49:44 +0530 Subject: [PATCH 12/34] update Changelog --- airflow/providers/pinecone/CHANGELOG.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/airflow/providers/pinecone/CHANGELOG.rst b/airflow/providers/pinecone/CHANGELOG.rst index fde12af995730..09ef35d641939 100644 --- a/airflow/providers/pinecone/CHANGELOG.rst +++ b/airflow/providers/pinecone/CHANGELOG.rst @@ -46,6 +46,7 @@ Breaking changes * ``PineconeHook.query_vector`` * ``PineconeHook.describe_index_stats`` * ``PineconeHook.create_index`` is updated to accept a ``ServerlessSpec`` or ``PodSpec`` instead of directly accpeting index related configurations +* To initialize ``PineconeHook`` object,ß API Key needs to be passed as an argument to the constructor or be updated in the connection. 1.1.2 ..... From f2ae94c6b93bf8dc6346dbe7a3ccae3d800c8017 Mon Sep 17 00:00:00 2001 From: kalyanr Date: Sat, 13 Apr 2024 18:50:03 +0530 Subject: [PATCH 13/34] update Changelog --- airflow/providers/pinecone/CHANGELOG.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/pinecone/CHANGELOG.rst b/airflow/providers/pinecone/CHANGELOG.rst index 09ef35d641939..2ea551d5a51dc 100644 --- a/airflow/providers/pinecone/CHANGELOG.rst +++ b/airflow/providers/pinecone/CHANGELOG.rst @@ -46,7 +46,7 @@ Breaking changes * ``PineconeHook.query_vector`` * ``PineconeHook.describe_index_stats`` * ``PineconeHook.create_index`` is updated to accept a ``ServerlessSpec`` or ``PodSpec`` instead of directly accpeting index related configurations -* To initialize ``PineconeHook`` object,ß API Key needs to be passed as an argument to the constructor or be updated in the connection. 
+* To initialize ``PineconeHook`` object,API Key needs to be passed as an argument to the constructor or be updated in the connection. 1.1.2 ..... From ab501bfad9cd5e1a4f40661b83233a7af6cc7aa8 Mon Sep 17 00:00:00 2001 From: kalyanr Date: Sat, 13 Apr 2024 18:58:45 +0530 Subject: [PATCH 14/34] fix spelling --- airflow/providers/pinecone/CHANGELOG.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/pinecone/CHANGELOG.rst b/airflow/providers/pinecone/CHANGELOG.rst index 2ea551d5a51dc..33f1eca61b975 100644 --- a/airflow/providers/pinecone/CHANGELOG.rst +++ b/airflow/providers/pinecone/CHANGELOG.rst @@ -45,7 +45,7 @@ Breaking changes * ``PineconeHook.list_collections`` * ``PineconeHook.query_vector`` * ``PineconeHook.describe_index_stats`` -* ``PineconeHook.create_index`` is updated to accept a ``ServerlessSpec`` or ``PodSpec`` instead of directly accpeting index related configurations +* ``PineconeHook.create_index`` is updated to accept a ``ServerlessSpec`` or ``PodSpec`` instead of directly accepting index related configurations * To initialize ``PineconeHook`` object,API Key needs to be passed as an argument to the constructor or be updated in the connection. 1.1.2 From 8d733c1a27b2175ff8748035b2f99cc61facb050 Mon Sep 17 00:00:00 2001 From: kalyanr Date: Sat, 13 Apr 2024 19:00:45 +0530 Subject: [PATCH 15/34] update docs --- docs/apache-airflow-providers-pinecone/connections.rst | 8 +++++++- .../operators/pinecone.rst | 10 ++++++++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/docs/apache-airflow-providers-pinecone/connections.rst b/docs/apache-airflow-providers-pinecone/connections.rst index 07054a9388b79..d468bf7ef51b6 100644 --- a/docs/apache-airflow-providers-pinecone/connections.rst +++ b/docs/apache-airflow-providers-pinecone/connections.rst @@ -34,10 +34,16 @@ Host (optional) Host URL to connect to a specific Pinecone index. Pinecone Environment (required) - Specify your Pinecone environment to connect to. 
+ Specify your Pinecone environment for pod based indexes. Pinecone API key (required) Specify your Pinecone API Key to connect. Project ID (required) Project ID corresponding to your API Key. + +Pinecone Region (optional) + Specify the region for Serverless Indexes in Pinecone. + +PINECONE_DEBUG_CURL (optional) + Set to ``true`` to enable curl debug output. diff --git a/docs/apache-airflow-providers-pinecone/operators/pinecone.rst b/docs/apache-airflow-providers-pinecone/operators/pinecone.rst index 71f847919fa80..92ab822070c56 100644 --- a/docs/apache-airflow-providers-pinecone/operators/pinecone.rst +++ b/docs/apache-airflow-providers-pinecone/operators/pinecone.rst @@ -38,3 +38,13 @@ An example using the operator in this way: :dedent: 4 :start-after: [START howto_operator_pinecone_ingest] :end-before: [END howto_operator_pinecone_ingest] + + +CreatePodIndexOperator +====================== + +Use the :class:`~airflow.providers.pinecone.operators.pinecone.CreatePodIndexOperator` to +interact with Pinecone APIs to create Pod based Index. 
+ +Using the Operator +^^^^^^^^^^^^^^^^^^ From 7fa7b7b6a49cf345ecc9944ec0fbd6130acb776d Mon Sep 17 00:00:00 2001 From: kalyanr Date: Sat, 13 Apr 2024 21:20:19 +0530 Subject: [PATCH 16/34] add example dags for new operators --- airflow/providers/pinecone/CHANGELOG.rst | 2 +- .../pinecone/example_create_pod_index.py | 52 +++++++++++++++++++ .../example_create_serverless_index.py | 51 ++++++++++++++++++ 3 files changed, 104 insertions(+), 1 deletion(-) create mode 100644 tests/system/providers/pinecone/example_create_pod_index.py create mode 100644 tests/system/providers/pinecone/example_create_serverless_index.py diff --git a/airflow/providers/pinecone/CHANGELOG.rst b/airflow/providers/pinecone/CHANGELOG.rst index 33f1eca61b975..791f2593439fe 100644 --- a/airflow/providers/pinecone/CHANGELOG.rst +++ b/airflow/providers/pinecone/CHANGELOG.rst @@ -32,7 +32,7 @@ Breaking changes * ``log_level`` field is removed from the Connections as it is not used by the provider anymore. * ``PineconeHook.get_conn`` now returns ``Connection`` object instead of ``PineconeConnection`` object. Use ``pc`` property to access the Pinecone client. -* Following ``PineconeHook`` methods are no longer static methods: +* Following ``PineconeHook`` methods are converted from static methods to instance methods. Hence, Initialization is required to use these now: * ``PineconeHook.list_indexes`` * ``PineconeHook.upsert`` * ``PineconeHook.create_index`` diff --git a/tests/system/providers/pinecone/example_create_pod_index.py b/tests/system/providers/pinecone/example_create_pod_index.py new file mode 100644 index 0000000000000..19d15f709009f --- /dev/null +++ b/tests/system/providers/pinecone/example_create_pod_index.py @@ -0,0 +1,52 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import os +from datetime import datetime + +from airflow import DAG +from airflow.providers.pinecone.operators.pinecone import CreatePodIndexOperator + +index_name = os.getenv("INDEX_NAME", "test") + + +with DAG( + "example_pinecone_create_pod_index", + schedule="@once", + start_date=datetime(2024, 1, 1), + catchup=False, +) as dag: + # [START howto_operator_create_pod_index] + # reference: https://docs.pinecone.io/reference/api/control-plane/create_index + CreatePodIndexOperator( + task_id="pinecone_create_pod_index", + index_name=index_name, + dimension=3, + api_key="test", + replicas=1, + shards=1, + pods=1, + pod_type="p1.x1", + ) + # [END howto_operator_create_pod_index] + + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) diff --git a/tests/system/providers/pinecone/example_create_serverless_index.py b/tests/system/providers/pinecone/example_create_serverless_index.py new file mode 100644 index 0000000000000..45ce887373d6c --- /dev/null +++ b/tests/system/providers/pinecone/example_create_serverless_index.py @@ -0,0 +1,51 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import os +from datetime import datetime + +from airflow import DAG +from airflow.providers.pinecone.operators.pinecone import CreateServerlessIndexOperator + +index_name = os.getenv("INDEX_NAME", "test") + + +with DAG( + "example_pinecone_create_serverless_index", + schedule="@once", + start_date=datetime(2024, 1, 1), + catchup=False, +) as dag: + # [START howto_operator_create_serverless_index] + # reference: https://docs.pinecone.io/reference/api/control-plane/create_index + CreateServerlessIndexOperator( + task_id="pinecone_create_serverless_index", + index_name=index_name, + dimension=128, + cloud="aws", + api_key="test", + region="us-west-2", + metric="cosine", + ) + # [END howto_operator_create_serverless_index] + + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) From a212adab5ec3756c86d99f1d939183f96ef6202f Mon Sep 17 00:00:00 2001 From: kalyanr Date: Sat, 13 Apr 2024 21:59:05 +0530 Subject: [PATCH 17/34] update docs --- airflow/providers/pinecone/CHANGELOG.rst | 26 ++++++++++--------- .../index.rst | 4 +-- 2 files changed, 16 insertions(+), 14 deletions(-) diff --git a/airflow/providers/pinecone/CHANGELOG.rst 
b/airflow/providers/pinecone/CHANGELOG.rst index 791f2593439fe..1ec7496ddf7d5 100644 --- a/airflow/providers/pinecone/CHANGELOG.rst +++ b/airflow/providers/pinecone/CHANGELOG.rst @@ -33,18 +33,20 @@ Breaking changes * ``log_level`` field is removed from the Connections as it is not used by the provider anymore. * ``PineconeHook.get_conn`` now returns ``Connection`` object instead of ``PineconeConnection`` object. Use ``pc`` property to access the Pinecone client. * Following ``PineconeHook`` methods are converted from static methods to instance methods. Hence, Initialization is required to use these now: - * ``PineconeHook.list_indexes`` - * ``PineconeHook.upsert`` - * ``PineconeHook.create_index`` - * ``PineconeHook.describe_index`` - * ``PineconeHook.delete_index`` - * ``PineconeHook.configure_index`` - * ``PineconeHook.create_collection`` - * ``PineconeHook.delete_collection`` - * ``PineconeHook.describe_collection`` - * ``PineconeHook.list_collections`` - * ``PineconeHook.query_vector`` - * ``PineconeHook.describe_index_stats`` + + + ``PineconeHook.list_indexes`` + + ``PineconeHook.upsert`` + + ``PineconeHook.create_index`` + + ``PineconeHook.describe_index`` + + ``PineconeHook.delete_index`` + + ``PineconeHook.configure_index`` + + ``PineconeHook.create_collection`` + + ``PineconeHook.delete_collection`` + + ``PineconeHook.describe_collection`` + + ``PineconeHook.list_collections`` + + ``PineconeHook.query_vector`` + + ``PineconeHook.describe_index_stats`` + * ``PineconeHook.create_index`` is updated to accept a ``ServerlessSpec`` or ``PodSpec`` instead of directly accepting index related configurations * To initialize ``PineconeHook`` object,API Key needs to be passed as an argument to the constructor or be updated in the connection. 
diff --git a/docs/apache-airflow-providers-pinecone/index.rst b/docs/apache-airflow-providers-pinecone/index.rst index d82935e3181c8..91913b916844c 100644 --- a/docs/apache-airflow-providers-pinecone/index.rst +++ b/docs/apache-airflow-providers-pinecone/index.rst @@ -69,7 +69,7 @@ Package apache-airflow-providers-pinecone `Pinecone `__ -Release: 1.1.2 +Release: 2.0.0 Provider package ---------------- @@ -93,5 +93,5 @@ The minimum Apache Airflow version supported by this provider package is ``2.6.0 PIP package Version required =================== ================== ``apache-airflow`` ``>=2.6.0`` -``pinecone-client`` ``>=2.2.4,<3.0`` +``pinecone-client`` ``>=3.0.0`` =================== ================== From a486b8692da72c58cbbf8f40af76ae43c77ca3c0 Mon Sep 17 00:00:00 2001 From: kalyanr Date: Sat, 13 Apr 2024 23:54:58 +0530 Subject: [PATCH 18/34] update docs --- airflow/providers/pinecone/CHANGELOG.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/pinecone/CHANGELOG.rst b/airflow/providers/pinecone/CHANGELOG.rst index 1ec7496ddf7d5..4565d065639a3 100644 --- a/airflow/providers/pinecone/CHANGELOG.rst +++ b/airflow/providers/pinecone/CHANGELOG.rst @@ -48,7 +48,7 @@ Breaking changes + ``PineconeHook.describe_index_stats`` * ``PineconeHook.create_index`` is updated to accept a ``ServerlessSpec`` or ``PodSpec`` instead of directly accepting index related configurations -* To initialize ``PineconeHook`` object,API Key needs to be passed as an argument to the constructor or be updated in the connection. +* To initialize ``PineconeHook`` object, API key needs to be passed via argument or the connection. 1.1.2 ..... 
From b2724d190c2d2ecee3e7391f8dc8e9e37b93994d Mon Sep 17 00:00:00 2001 From: kalyanr Date: Sun, 14 Apr 2024 00:23:50 +0530 Subject: [PATCH 19/34] update docs --- docs/apache-airflow-providers-pinecone/connections.rst | 4 ++-- .../operators/pinecone.rst | 10 ---------- 2 files changed, 2 insertions(+), 12 deletions(-) diff --git a/docs/apache-airflow-providers-pinecone/connections.rst b/docs/apache-airflow-providers-pinecone/connections.rst index d468bf7ef51b6..50a72b133a950 100644 --- a/docs/apache-airflow-providers-pinecone/connections.rst +++ b/docs/apache-airflow-providers-pinecone/connections.rst @@ -33,13 +33,13 @@ Configuring the Connection Host (optional) Host URL to connect to a specific Pinecone index. -Pinecone Environment (required) +Pinecone Environment (optional) Specify your Pinecone environment for pod based indexes. Pinecone API key (required) Specify your Pinecone API Key to connect. -Project ID (required) +Project ID (optional) Project ID corresponding to your API Key. Pinecone Region (optional) diff --git a/docs/apache-airflow-providers-pinecone/operators/pinecone.rst b/docs/apache-airflow-providers-pinecone/operators/pinecone.rst index 92ab822070c56..71f847919fa80 100644 --- a/docs/apache-airflow-providers-pinecone/operators/pinecone.rst +++ b/docs/apache-airflow-providers-pinecone/operators/pinecone.rst @@ -38,13 +38,3 @@ An example using the operator in this way: :dedent: 4 :start-after: [START howto_operator_pinecone_ingest] :end-before: [END howto_operator_pinecone_ingest] - - -CreatePodIndexOperator -====================== - -Use the :class:`~airflow.providers.pinecone.operators.pinecone.CreatePodIndexOperator` to -interact with Pinecone APIs to create Pod based Index. 
- -Using the Operator -^^^^^^^^^^^^^^^^^^ From d00ebe2c485408d905e53eca8cf2113a7d81b362 Mon Sep 17 00:00:00 2001 From: kalyanr Date: Sun, 14 Apr 2024 10:50:23 +0530 Subject: [PATCH 20/34] update docs --- .../operators/pinecone.rst | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/docs/apache-airflow-providers-pinecone/operators/pinecone.rst b/docs/apache-airflow-providers-pinecone/operators/pinecone.rst index 71f847919fa80..b69bd1db51fea 100644 --- a/docs/apache-airflow-providers-pinecone/operators/pinecone.rst +++ b/docs/apache-airflow-providers-pinecone/operators/pinecone.rst @@ -38,3 +38,24 @@ An example using the operator in this way: :dedent: 4 :start-after: [START howto_operator_pinecone_ingest] :end-before: [END howto_operator_pinecone_ingest] + + +CreatePodIndexOperator +====================== + +Use the :class:`~airflow.providers.pinecone.operators.pinecone.CreatePodIndexOperator` to +interact with Pinecone APIs to create a Pod based Index. + +Using the Operator +^^^^^^^^^^^^^^^^^^ + +The CreatePodIndexOperator requires the index details as well as the pod configuration details. ``api_key``, ``environment`` can be +passed via arguments to the operator or via the connection. + +An example using the operator in this way: + +.. 
exampleinclude:: /../../tests/system/providers/pinecone/example_dag_pod_index.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_create_pod_index] + :end-before: [END howto_operator_create_pod_index] From f920fb832aba504faaf5bdd6136db8942d43906f Mon Sep 17 00:00:00 2001 From: kalyanr Date: Sun, 14 Apr 2024 11:38:53 +0530 Subject: [PATCH 21/34] update docstrings --- .../providers/pinecone/operators/pinecone.py | 39 ++++++++++++++++++- 1 file changed, 37 insertions(+), 2 deletions(-) diff --git a/airflow/providers/pinecone/operators/pinecone.py b/airflow/providers/pinecone/operators/pinecone.py index 73f7c674f9c98..c7b945b42e805 100644 --- a/airflow/providers/pinecone/operators/pinecone.py +++ b/airflow/providers/pinecone/operators/pinecone.py @@ -85,7 +85,27 @@ def execute(self, context: Context) -> None: class CreatePodIndexOperator(BaseOperator): - """Create a pod based index in Pinecone.""" + """ + Create a pod based index in Pinecone. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:CreatePodIndexOperator` + + :param conn_id: The connection id to use when connecting to Pinecone. + :param index_name: Name of the Pinecone index. + :param dimension: The dimension of the vectors to be indexed. + :param api_key: The API key to use when connecting to Pinecone. + :param environment: The environment to use when creating the index. + :param replicas: The number of replicas to use. + :param shards: The number of shards to use. + :param pods: The number of pods to use. + :param pod_type: The type of pod to use. + :param metadata_config: The metadata configuration to use. + :param source_collection: The source collection to use. + :param metric: The metric to use. + :param timeout: The timeout to use. 
+ """ def __init__( self, @@ -144,7 +164,22 @@ def execute(self, context: Context) -> None: class CreateServerlessIndexOperator(BaseOperator): - """Create a serverless index in Pinecone.""" + """ + Create a serverless index in Pinecone. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:CreateServerlessIndexOperator` + + :param conn_id: The connection id to use when connecting to Pinecone. + :param index_name: Name of the Pinecone index. + :param dimension: The dimension of the vectors to be indexed. + :param cloud: The cloud to use when creating the index. + :param api_key: The API key to use when connecting to Pinecone. + :param region: The region to use when creating the index. + :param metric: The metric to use. + :param timeout: The timeout to use. + """ def __init__( self, From dbd8e304bcadf6c914374bcf7aabc03b37433961 Mon Sep 17 00:00:00 2001 From: kalyanr Date: Sun, 14 Apr 2024 12:28:55 +0530 Subject: [PATCH 22/34] update docs --- .../operators/pinecone.rst | 39 ++++++++++++++++--- 1 file changed, 33 insertions(+), 6 deletions(-) diff --git a/docs/apache-airflow-providers-pinecone/operators/pinecone.rst b/docs/apache-airflow-providers-pinecone/operators/pinecone.rst index b69bd1db51fea..b50e5300f09a9 100644 --- a/docs/apache-airflow-providers-pinecone/operators/pinecone.rst +++ b/docs/apache-airflow-providers-pinecone/operators/pinecone.rst @@ -15,10 +15,13 @@ specific language governing permissions and limitations under the License. +Operators +--------- + .. _howto/operator:PineconeIngestOperator: -PineconeIngestOperator -====================== +Ingest data into a pinecone index +================================= Use the :class:`~airflow.providers.pinecone.operators.pinecone.PineconeIngestOperator` to interact with Pinecone APIs to ingest vectors. 
@@ -39,9 +42,10 @@ An example using the operator in this way: :start-after: [START howto_operator_pinecone_ingest] :end-before: [END howto_operator_pinecone_ingest] +.. _howto/operator:CreatePodIndexOperator: -CreatePodIndexOperator -====================== +Create a Pod based Index +======================== Use the :class:`~airflow.providers.pinecone.operators.pinecone.CreatePodIndexOperator` to interact with Pinecone APIs to create a Pod based Index. @@ -49,13 +53,36 @@ interact with Pinecone APIs to create a Pod based Index. Using the Operator ^^^^^^^^^^^^^^^^^^ -The CreatePodIndexOperator requires the index details as well as the pod configuration details. ``api_key``, ``environment`` can be +The ``CreatePodIndexOperator`` requires the index details as well as the pod configuration details. ``api_key``, ``environment`` can be passed via arguments to the operator or via the connection. An example using the operator in this way: -.. exampleinclude:: /../../tests/system/providers/pinecone/example_dag_pod_index.py +.. exampleinclude:: /../../tests/system/providers/pinecone/example_create_pod_index.py :language: python :dedent: 4 :start-after: [START howto_operator_create_pod_index] :end-before: [END howto_operator_create_pod_index] + + +.. _howto/operator:CreateServerlessIndexOperator: + +Create a Serverless Index +========================= + +Use the :class:`~airflow.providers.pinecone.operators.pinecone.CreateServerlessIndexOperator` to +interact with Pinecone APIs to create a Serverless Index. + +Using the Operator +^^^^^^^^^^^^^^^^^^ + +The ``CreateServerlessIndexOperator`` requires the index details as well as the Serverless configuration details. ``api_key``, ``region`` can be +passed via arguments to the operator or via the connection. + +An example using the operator in this way: + +.. 
exampleinclude:: /../../tests/system/providers/pinecone/example_create_serverless_index.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_create_serverless_index] + :end-before: [END howto_operator_create_serverless_index] From 6367618dca5cb72cf30e6dc3b3c81bd4484b6816 Mon Sep 17 00:00:00 2001 From: kalyanr Date: Tue, 16 Apr 2024 05:11:56 +0530 Subject: [PATCH 23/34] accept api_key only from connections --- airflow/providers/pinecone/hooks/pinecone.py | 9 +-------- airflow/providers/pinecone/operators/pinecone.py | 10 ++-------- .../providers/pinecone/example_create_pod_index.py | 1 - .../pinecone/example_create_serverless_index.py | 1 - 4 files changed, 3 insertions(+), 18 deletions(-) diff --git a/airflow/providers/pinecone/hooks/pinecone.py b/airflow/providers/pinecone/hooks/pinecone.py index c6a8064fbe4a7..7964ee2378ff9 100644 --- a/airflow/providers/pinecone/hooks/pinecone.py +++ b/airflow/providers/pinecone/hooks/pinecone.py @@ -77,22 +77,15 @@ def get_ui_field_behaviour(cls) -> dict[str, Any]: } def __init__( - self, - conn_id: str = default_conn_name, - environment: str | None = None, - region: str | None = None, - api_key: str | None = None, + self, conn_id: str = default_conn_name, environment: str | None = None, region: str | None = None ) -> None: self.conn_id = conn_id self._environment = environment self._region = region - self._api_key = api_key self.conn = self.get_conn() @property def api_key(self) -> str: - if self._api_key: - return self._api_key key = self.conn.password if not key: raise LookupError("Pinecone API Key not found in connection") diff --git a/airflow/providers/pinecone/operators/pinecone.py b/airflow/providers/pinecone/operators/pinecone.py index c7b945b42e805..8431276206e07 100644 --- a/airflow/providers/pinecone/operators/pinecone.py +++ b/airflow/providers/pinecone/operators/pinecone.py @@ -95,7 +95,6 @@ class CreatePodIndexOperator(BaseOperator): :param conn_id: The connection id to use when connecting to 
Pinecone. :param index_name: Name of the Pinecone index. :param dimension: The dimension of the vectors to be indexed. - :param api_key: The API key to use when connecting to Pinecone. :param environment: The environment to use when creating the index. :param replicas: The number of replicas to use. :param shards: The number of shards to use. @@ -113,7 +112,6 @@ def __init__( conn_id: str = PineconeHook.default_conn_name, index_name: str, dimension: int, - api_key: str | None = None, environment: str | None = None, replicas: int | None = None, shards: int | None = None, @@ -128,7 +126,6 @@ def __init__( super().__init__(**kwargs) self.conn_id = conn_id self.index_name = index_name - self.api_key = api_key self.dimension = dimension self.environment = environment self.replicas = replicas @@ -142,7 +139,7 @@ def __init__( @cached_property def hook(self) -> PineconeHook: - return PineconeHook(conn_id=self.conn_id, environment=self.environment, api_key=self.api_key) + return PineconeHook(conn_id=self.conn_id, environment=self.environment) def execute(self, context: Context) -> None: pod_spec_obj = self.hook.get_pod_spec_obj( @@ -175,7 +172,6 @@ class CreateServerlessIndexOperator(BaseOperator): :param index_name: Name of the Pinecone index. :param dimension: The dimension of the vectors to be indexed. :param cloud: The cloud to use when creating the index. - :param api_key: The API key to use when connecting to Pinecone. :param region: The region to use when creating the index. :param metric: The metric to use. :param timeout: The timeout to use. 
@@ -188,7 +184,6 @@ def __init__( index_name: str, dimension: int, cloud: str, - api_key: str | None = None, region: str | None = None, metric: str | None = None, timeout: int | None = None, @@ -198,7 +193,6 @@ def __init__( self.conn_id = conn_id self.index_name = index_name self.dimension = dimension - self.api_key = api_key self.cloud = cloud self.region = region self.metric = metric @@ -206,7 +200,7 @@ def __init__( @cached_property def hook(self) -> PineconeHook: - return PineconeHook(conn_id=self.conn_id, region=self.region, api_key=self.api_key) + return PineconeHook(conn_id=self.conn_id, region=self.region) def execute(self, context: Context) -> None: serverless_spec_obj = self.hook.get_serverless_spec_obj(cloud=self.cloud, region=self.region) diff --git a/tests/system/providers/pinecone/example_create_pod_index.py b/tests/system/providers/pinecone/example_create_pod_index.py index 19d15f709009f..9b6f7d7d8882d 100644 --- a/tests/system/providers/pinecone/example_create_pod_index.py +++ b/tests/system/providers/pinecone/example_create_pod_index.py @@ -37,7 +37,6 @@ task_id="pinecone_create_pod_index", index_name=index_name, dimension=3, - api_key="test", replicas=1, shards=1, pods=1, diff --git a/tests/system/providers/pinecone/example_create_serverless_index.py b/tests/system/providers/pinecone/example_create_serverless_index.py index 45ce887373d6c..a7924e63ef338 100644 --- a/tests/system/providers/pinecone/example_create_serverless_index.py +++ b/tests/system/providers/pinecone/example_create_serverless_index.py @@ -38,7 +38,6 @@ index_name=index_name, dimension=128, cloud="aws", - api_key="test", region="us-west-2", metric="cosine", ) From 8c9f421785ae70f623a72c545038cb285e8fc191 Mon Sep 17 00:00:00 2001 From: kalyanr Date: Tue, 16 Apr 2024 21:05:10 +0530 Subject: [PATCH 24/34] Add missing docstrings --- airflow/providers/pinecone/hooks/pinecone.py | 26 ++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git 
a/airflow/providers/pinecone/hooks/pinecone.py b/airflow/providers/pinecone/hooks/pinecone.py index 7964ee2378ff9..2cd9e61be3f02 100644 --- a/airflow/providers/pinecone/hooks/pinecone.py +++ b/airflow/providers/pinecone/hooks/pinecone.py @@ -180,6 +180,17 @@ def get_pod_spec_obj( source_collection: str | None = None, environment: str | None = None, ) -> PodSpec: + """ + Get a PodSpec object. + + :param replicas: The number of replicas. + :param shards: The number of shards. + :param pods: The number of pods. + :param pod_type: The type of pod. + :param metadata_config: The metadata configuration. + :param source_collection: The source collection. + :param environment: The environment to use when creating the index. + """ return PodSpec( environment=environment or self.environment, replicas=replicas, @@ -191,6 +202,12 @@ def get_pod_spec_obj( ) def get_serverless_spec_obj(self, cloud, region: str | None = None) -> ServerlessSpec: + """ + Get a ServerlessSpec object. + + :param cloud: The cloud provider. + :param region: The region to use when creating the index. + """ return ServerlessSpec(cloud=cloud, region=region or self.region) def create_index( @@ -201,6 +218,15 @@ def create_index( metric: str | None = "cosine", timeout: int | None = None, ) -> None: + """ + Create a new index. + + :param index_name: The name of the index. + :param dimension: The dimension of the vectors to be indexed. + :param spec: Pass a `ServerlessSpec` object to create a serverless index or a `PodSpec` object to create a pod index. + :param metric: The metric to use. + :param timeout: The timeout to use. 
+ """ self.pc.create_index( name=index_name, dimension=dimension, From 11ef4b56e72e4bc2f095f2ee9189c8d8444ff0db Mon Sep 17 00:00:00 2001 From: Kalyan Date: Thu, 18 Apr 2024 15:37:53 +0530 Subject: [PATCH 25/34] Update airflow/providers/pinecone/hooks/pinecone.py Co-authored-by: Wei Lee --- airflow/providers/pinecone/hooks/pinecone.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/pinecone/hooks/pinecone.py b/airflow/providers/pinecone/hooks/pinecone.py index 2cd9e61be3f02..6f747027df881 100644 --- a/airflow/providers/pinecone/hooks/pinecone.py +++ b/airflow/providers/pinecone/hooks/pinecone.py @@ -201,7 +201,7 @@ def get_pod_spec_obj( source_collection=source_collection, ) - def get_serverless_spec_obj(self, cloud, region: str | None = None) -> ServerlessSpec: + def get_serverless_spec_obj(self, cloud: str, region: str | None = None) -> ServerlessSpec: """ Get a ServerlessSpec object. From 4b7246e8f4d5574f65f8b33d2a623739ea5a8668 Mon Sep 17 00:00:00 2001 From: Kalyan Date: Thu, 18 Apr 2024 15:39:08 +0530 Subject: [PATCH 26/34] Update airflow/providers/pinecone/hooks/pinecone.py Co-authored-by: Wei Lee --- airflow/providers/pinecone/hooks/pinecone.py | 1 - 1 file changed, 1 deletion(-) diff --git a/airflow/providers/pinecone/hooks/pinecone.py b/airflow/providers/pinecone/hooks/pinecone.py index 6f747027df881..8b95a814008aa 100644 --- a/airflow/providers/pinecone/hooks/pinecone.py +++ b/airflow/providers/pinecone/hooks/pinecone.py @@ -264,7 +264,6 @@ def configure_index( """ self.pc.configure_index(name=index_name, replicas=replicas, pod_type=pod_type) - @staticmethod def create_collection(self, collection_name: str, index_name: str) -> None: """ Create a new collection from a specified index. 
From a1b8e58856592fecb48e0def7c29d47c97fe20f9 Mon Sep 17 00:00:00 2001 From: kalyanr Date: Thu, 18 Apr 2024 15:48:20 +0530 Subject: [PATCH 27/34] update spec methods to be keyword only --- airflow/providers/pinecone/hooks/pinecone.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/airflow/providers/pinecone/hooks/pinecone.py b/airflow/providers/pinecone/hooks/pinecone.py index 8b95a814008aa..6aa6d1d00dbb4 100644 --- a/airflow/providers/pinecone/hooks/pinecone.py +++ b/airflow/providers/pinecone/hooks/pinecone.py @@ -172,6 +172,7 @@ def upsert( def get_pod_spec_obj( self, + *, replicas: int | None = None, shards: int | None = None, pods: int | None = None, @@ -201,7 +202,7 @@ def get_pod_spec_obj( source_collection=source_collection, ) - def get_serverless_spec_obj(self, cloud: str, region: str | None = None) -> ServerlessSpec: + def get_serverless_spec_obj(self, *, cloud: str, region: str | None = None) -> ServerlessSpec: """ Get a ServerlessSpec object. From 0ba015bc314f59200c915862461efffeb39fdc68 Mon Sep 17 00:00:00 2001 From: kalyanr Date: Fri, 26 Apr 2024 16:29:14 +0530 Subject: [PATCH 28/34] rename pinecone connection object --- airflow/providers/pinecone/hooks/pinecone.py | 30 ++++++++++---------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/airflow/providers/pinecone/hooks/pinecone.py b/airflow/providers/pinecone/hooks/pinecone.py index 6aa6d1d00dbb4..a1fa7101886fe 100644 --- a/airflow/providers/pinecone/hooks/pinecone.py +++ b/airflow/providers/pinecone/hooks/pinecone.py @@ -110,7 +110,7 @@ def region(self) -> str: return region @cached_property - def pc(self) -> Pinecone: + def pinecone_client(self) -> Pinecone: """Pinecone object to interact with Pinecone.""" pinecone_host = self.conn.host extras = self.conn.extra_dejson @@ -125,14 +125,14 @@ def get_conn(self) -> Connection: def test_connection(self) -> tuple[bool, str]: try: - self.pc.list_indexes() + self.pinecone_client.list_indexes() return True, "Connection 
established" except Exception as e: return False, str(e) def list_indexes(self) -> Any: """Retrieve a list of all indexes in your project.""" - return self.pc.list_indexes() + return self.pinecone_client.list_indexes() def upsert( self, @@ -161,7 +161,7 @@ def upsert( :param show_progress: Whether to show a progress bar using tqdm. Applied only if batch_size is provided. """ - index = self.pc.Index(index_name) + index = self.pinecone_client.Index(index_name) return index.upsert( vectors=vectors, namespace=namespace, @@ -228,7 +228,7 @@ def create_index( :param metric: The metric to use. :param timeout: The timeout to use. """ - self.pc.create_index( + self.pinecone_client.create_index( name=index_name, dimension=dimension, spec=spec, @@ -242,7 +242,7 @@ def describe_index(self, index_name: str) -> Any: :param index_name: The name of the index to describe. """ - return self.pc.describe_index(name=index_name) + return self.pinecone_client.describe_index(name=index_name) def delete_index(self, index_name: str, timeout: int | None = None) -> None: """ @@ -251,7 +251,7 @@ def delete_index(self, index_name: str, timeout: int | None = None) -> None: :param index_name: the name of the index. :param timeout: Timeout for wait until index gets ready. """ - self.pc.delete_index(name=index_name, timeout=timeout) + self.pinecone_client.delete_index(name=index_name, timeout=timeout) def configure_index( self, index_name: str, replicas: int | None = None, pod_type: str | None = "" @@ -263,7 +263,7 @@ def configure_index( :param replicas: The new number of replicas. :param pod_type: the new pod_type for the index. 
""" - self.pc.configure_index(name=index_name, replicas=replicas, pod_type=pod_type) + self.pinecone_client.configure_index(name=index_name, replicas=replicas, pod_type=pod_type) def create_collection(self, collection_name: str, index_name: str) -> None: """ @@ -272,7 +272,7 @@ def create_collection(self, collection_name: str, index_name: str) -> None: :param collection_name: The name of the collection to create. :param index_name: The name of the source index. """ - self.pc.create_collection(name=collection_name, source=index_name) + self.pinecone_client.create_collection(name=collection_name, source=index_name) def delete_collection(self, collection_name: str) -> None: """ @@ -280,7 +280,7 @@ def delete_collection(self, collection_name: str) -> None: :param collection_name: The name of the collection to delete. """ - self.pc.delete_collection(collection_name) + self.pinecone_client.delete_collection(collection_name) def describe_collection(self, collection_name: str) -> Any: """ @@ -288,11 +288,11 @@ def describe_collection(self, collection_name: str) -> Any: :param collection_name: The name of the collection to describe. """ - return self.pc.describe_collection(collection_name) + return self.pinecone_client.describe_collection(collection_name) def list_collections(self) -> Any: """Retrieve a list of all collections in the current project.""" - return self.pc.list_collections() + return self.pinecone_client.list_collections() def query_vector( self, @@ -323,7 +323,7 @@ def query_vector( :param sparse_vector: sparse values of the query vector. Expected to be either a SparseValues object or a dict of the form: {'indices': List[int], 'values': List[float]}, where the lists each have the same length. """ - index = self.pc.Index(index_name) + index = self.pinecone_client.Index(index_name) return index.query( vector=vector, id=query_id, @@ -361,7 +361,7 @@ def upsert_data_async( :param pool_threads: Number of threads for parallel upserting. 
If async_req is True, this must be provided. """ responses = [] - with self.pc.Index(index_name, pool_threads=pool_threads) as index: + with self.pinecone_client.Index(index_name, pool_threads=pool_threads) as index: if async_req and pool_threads: async_results = [index.upsert(vectors=chunk, async_req=True) for chunk in self._chunks(data)] responses = [async_result.get() for async_result in async_results] @@ -388,5 +388,5 @@ def describe_index_stats( :param stats_filter: If this parameter is present, the operation only returns statistics for vectors that satisfy the filter. See https://www.pinecone.io/docs/metadata-filtering/ """ - index = self.pc.Index(index_name) + index = self.pinecone_client.Index(index_name) return index.describe_index_stats(filter=stats_filter, **kwargs) From 7347fae40b11841a2f3a1d6954c531935005ea7d Mon Sep 17 00:00:00 2001 From: kalyanr Date: Fri, 26 Apr 2024 16:32:34 +0530 Subject: [PATCH 29/34] make conn a property --- airflow/providers/pinecone/hooks/pinecone.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/providers/pinecone/hooks/pinecone.py b/airflow/providers/pinecone/hooks/pinecone.py index a1fa7101886fe..dcf33f4a44e69 100644 --- a/airflow/providers/pinecone/hooks/pinecone.py +++ b/airflow/providers/pinecone/hooks/pinecone.py @@ -82,7 +82,6 @@ def __init__( self.conn_id = conn_id self._environment = environment self._region = region - self.conn = self.get_conn() @property def api_key(self) -> str: @@ -120,7 +119,8 @@ def pinecone_client(self) -> Pinecone: os.environ["PINECONE_DEBUG_CURL"] = "true" return Pinecone(api_key=self.api_key, host=pinecone_host, project_id=pinecone_project_id) - def get_conn(self) -> Connection: + @cached_property + def conn(self) -> Connection: return self.get_connection(self.conn_id) def test_connection(self) -> tuple[bool, str]: From a4340b7aa33108f94efc973221bf05f56d275309 Mon Sep 17 00:00:00 2001 From: kalyanr Date: Fri, 26 Apr 2024 16:35:02 +0530 Subject: [PATCH 30/34] 
remove comments from provider.yaml --- airflow/providers/pinecone/provider.yaml | 3 --- 1 file changed, 3 deletions(-) diff --git a/airflow/providers/pinecone/provider.yaml b/airflow/providers/pinecone/provider.yaml index 1fc3cee31613d..0c6e3d9b4c586 100644 --- a/airflow/providers/pinecone/provider.yaml +++ b/airflow/providers/pinecone/provider.yaml @@ -42,9 +42,6 @@ integrations: dependencies: - apache-airflow>=2.7.0 - # Pinecone Python SDK v3.0.0 was released at 2024-01-16 and introduce some breaking changes. - # It's crucial to adhere to the v3.0.0 Migration Guide before the upper-bound limitation can be removed. - # https://canyon-quilt-082.notion.site/Pinecone-Python-SDK-v3-0-0-Migration-Guide-056d3897d7634bf7be399676a4757c7b - pinecone-client>=3.0.0 hooks: From 3d59d966fbaa6477c005db6f7ffcd7ee3e4569c5 Mon Sep 17 00:00:00 2001 From: kalyanr Date: Fri, 26 Apr 2024 16:46:25 +0530 Subject: [PATCH 31/34] update docs --- airflow/providers/pinecone/CHANGELOG.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/pinecone/CHANGELOG.rst b/airflow/providers/pinecone/CHANGELOG.rst index 4565d065639a3..f7333d7fd1362 100644 --- a/airflow/providers/pinecone/CHANGELOG.rst +++ b/airflow/providers/pinecone/CHANGELOG.rst @@ -31,7 +31,7 @@ Breaking changes the migration guide from pinecone - * ``log_level`` field is removed from the Connections as it is not used by the provider anymore. -* ``PineconeHook.get_conn`` now returns ``Connection`` object instead of ``PineconeConnection`` object. Use ``pc`` property to access the Pinecone client. +* ``PineconeHook.get_conn`` is removed in favour of ``conn`` property which returns the Connection object. Use ``pinecone_client`` property to access the Pinecone client. * Following ``PineconeHook`` methods are converted from static methods to instance methods. 
Hence, Initialization is required to use these now: + ``PineconeHook.list_indexes`` From fe42c1cb03a81e3b4557486de5ee96b6e32a2296 Mon Sep 17 00:00:00 2001 From: kalyanr Date: Sat, 27 Apr 2024 06:07:15 +0530 Subject: [PATCH 32/34] update docstring of create_index to talk about spec object creation --- airflow/providers/pinecone/hooks/pinecone.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airflow/providers/pinecone/hooks/pinecone.py b/airflow/providers/pinecone/hooks/pinecone.py index dcf33f4a44e69..2e211f301ac7e 100644 --- a/airflow/providers/pinecone/hooks/pinecone.py +++ b/airflow/providers/pinecone/hooks/pinecone.py @@ -225,6 +225,7 @@ def create_index( :param index_name: The name of the index. :param dimension: The dimension of the vectors to be indexed. :param spec: Pass a `ServerlessSpec` object to create a serverless index or a `PodSpec` object to create a pod index. + ``get_serverless_spec_obj`` and ``get_pod_spec_obj`` can be used to create the Spec objects. :param metric: The metric to use. :param timeout: The timeout to use. """ From fb18c8bd1f431938a3ba5c13897c4c45eb2061be Mon Sep 17 00:00:00 2001 From: kalyanr Date: Sat, 27 Apr 2024 06:21:26 +0530 Subject: [PATCH 33/34] fix spelling --- airflow/providers/pinecone/CHANGELOG.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/pinecone/CHANGELOG.rst b/airflow/providers/pinecone/CHANGELOG.rst index f7333d7fd1362..a1482f953467a 100644 --- a/airflow/providers/pinecone/CHANGELOG.rst +++ b/airflow/providers/pinecone/CHANGELOG.rst @@ -31,7 +31,7 @@ Breaking changes the migration guide from pinecone - * ``log_level`` field is removed from the Connections as it is not used by the provider anymore. -* ``PineconeHook.get_conn`` is removed in favour of ``conn`` property which returns the Connection object. Use ``pinecone_client`` property to access the Pinecone client. +* ``PineconeHook.get_conn`` is removed in favor of ``conn`` property which returns the Connection object. 
Use ``pinecone_client`` property to access the Pinecone client. * Following ``PineconeHook`` methods are converted from static methods to instance methods. Hence, Initialization is required to use these now: + ``PineconeHook.list_indexes`` From a6ff7deade0eee689756b46c90c3bd149ee4cfd6 Mon Sep 17 00:00:00 2001 From: kalyanr Date: Sat, 27 Apr 2024 06:23:38 +0530 Subject: [PATCH 34/34] fix docstring --- airflow/providers/pinecone/hooks/pinecone.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/pinecone/hooks/pinecone.py b/airflow/providers/pinecone/hooks/pinecone.py index 2e211f301ac7e..a04ae60ce8391 100644 --- a/airflow/providers/pinecone/hooks/pinecone.py +++ b/airflow/providers/pinecone/hooks/pinecone.py @@ -225,7 +225,7 @@ def create_index( :param index_name: The name of the index. :param dimension: The dimension of the vectors to be indexed. :param spec: Pass a `ServerlessSpec` object to create a serverless index or a `PodSpec` object to create a pod index. - ``get_serverless_spec_obj`` and ``get_pod_spec_obj`` can be used to create the Spec objects. + ``get_serverless_spec_obj`` and ``get_pod_spec_obj`` can be used to create the Spec objects. :param metric: The metric to use. :param timeout: The timeout to use. """