From 15d89b67a8a1f237269758c4b1462146af895af1 Mon Sep 17 00:00:00 2001 From: dishaprakash <57954147+dishaprakash@users.noreply.github.com> Date: Mon, 15 Dec 2025 20:59:23 +0530 Subject: [PATCH 1/8] chore: Update Langgraph dependency to v1 --- src/langchain_google_cloud_sql_pg/async_checkpoint.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/langchain_google_cloud_sql_pg/async_checkpoint.py b/src/langchain_google_cloud_sql_pg/async_checkpoint.py index fc875991..c410287d 100644 --- a/src/langchain_google_cloud_sql_pg/async_checkpoint.py +++ b/src/langchain_google_cloud_sql_pg/async_checkpoint.py @@ -276,7 +276,7 @@ async def aput( async with self.pool.connect() as conn: type_, serialized_checkpoint = self.serde.dumps_typed(checkpoint) - serialized_metadata = self.jsonplus_serde.dumps(metadata) + _, serialized_metadata = self.jsonplus_serde.dumps_typed(metadata) await conn.execute( text(query), { @@ -409,7 +409,7 @@ async def alist( (value["type"], value["checkpoint"]) ), metadata=( - self.jsonplus_serde.loads(value["metadata"]) # type: ignore + self.jsonplus_serde.loads_typed(("json", value["metadata"])) # type: ignore if value["metadata"] is not None else {} ), @@ -494,7 +494,7 @@ async def aget_tuple(self, config: RunnableConfig) -> Optional[CheckpointTuple]: }, checkpoint=self.serde.loads_typed((value["type"], value["checkpoint"])), metadata=( - self.jsonplus_serde.loads(value["metadata"]) # type: ignore + self.jsonplus_serde.loads_typed(("json", value["metadata"])) # type: ignore if value["metadata"] is not None else {} ), From e6dc37dc9fa2db6d854aafdf5997623f9f659f17 Mon Sep 17 00:00:00 2001 From: dishaprakash <57954147+dishaprakash@users.noreply.github.com> Date: Mon, 15 Dec 2025 21:00:00 +0530 Subject: [PATCH 2/8] Upgrade langgraph to version 1.0.4 Updated langgraph version to 1.0.4. --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index adf7896c..bacbe361 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,5 @@ cloud-sql-python-connector[asyncpg]==1.18.5 numpy==2.3.3; python_version >= "3.11" numpy==2.2.6; python_version == "3.10" -langgraph==0.6.10 +langgraph==1.0.4 langchain-postgres==0.0.16 From b96ada35c0c4d7c8c8e4a1ac219a9be83fb9abc4 Mon Sep 17 00:00:00 2001 From: dishaprakash <57954147+dishaprakash@users.noreply.github.com> Date: Mon, 15 Dec 2025 21:00:16 +0530 Subject: [PATCH 3/8] Update langgraph dependency version to 1.0.4 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 24282b77..6fd918ea 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -48,7 +48,7 @@ test = [ "pytest==8.4.2", "pytest-cov==7.0.0", "langchain-tests==0.3.22", - "langgraph==0.6.10" + "langgraph==1.0.4" ] [build-system] From 0029b2915585a22d80df2c9e0f7f1cfa8484878e Mon Sep 17 00:00:00 2001 From: dishaprakash <57954147+dishaprakash@users.noreply.github.com> Date: Mon, 15 Dec 2025 21:13:07 +0530 Subject: [PATCH 4/8] Switch to json for metadata handling Replaced jsonplus_serde with json for metadata serialization and deserialization. --- src/langchain_google_cloud_sql_pg/async_checkpoint.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/langchain_google_cloud_sql_pg/async_checkpoint.py b/src/langchain_google_cloud_sql_pg/async_checkpoint.py index c410287d..20d995be 100644 --- a/src/langchain_google_cloud_sql_pg/async_checkpoint.py +++ b/src/langchain_google_cloud_sql_pg/async_checkpoint.py @@ -276,7 +276,7 @@ async def aput( async with self.pool.connect() as conn: type_, serialized_checkpoint = self.serde.dumps_typed(checkpoint) - _, serialized_metadata = self.jsonplus_serde.dumps_typed(metadata) + serialized_metadata = json.dumps(metadata) await conn.execute( text(query), { @@ -409,7 +409,7 @@ async def alist( (value["type"], value["checkpoint"]) ), metadata=( - self.jsonplus_serde.loads_typed(("json", value["metadata"])) # type: ignore + json.loads(value["metadata"]) # type: ignore if value["metadata"] is not None else {} ), @@ -494,7 +494,7 @@ async def aget_tuple(self, config: RunnableConfig) -> Optional[CheckpointTuple]: }, checkpoint=self.serde.loads_typed((value["type"], value["checkpoint"])), metadata=( - self.jsonplus_serde.loads_typed(("json", value["metadata"])) # type: ignore + json.loads(value["metadata"]) # type: ignore if value["metadata"] is not None else {} ), From 188e0c005c02995ab96c5ec53afd228eabae0e31 Mon Sep 17 00:00:00 2001 From: dishaprakash <57954147+dishaprakash@users.noreply.github.com> Date: Mon, 15 Dec 2025 21:47:02 +0530 Subject: [PATCH 5/8] Update async_checkpoint.py --- src/langchain_google_cloud_sql_pg/async_checkpoint.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/langchain_google_cloud_sql_pg/async_checkpoint.py b/src/langchain_google_cloud_sql_pg/async_checkpoint.py index 20d995be..fcc83f32 100644 --- a/src/langchain_google_cloud_sql_pg/async_checkpoint.py +++ b/src/langchain_google_cloud_sql_pg/async_checkpoint.py @@ -276,7 +276,7 @@ async def aput( async with self.pool.connect() as conn: type_, serialized_checkpoint = self.serde.dumps_typed(checkpoint) - serialized_metadata = json.dumps(metadata) + _, serialized_metadata = self.jsonplus_serde.dumps_typed(metadata) await conn.execute( text(query), { From 0249ecac1cd56b3f948d3e27de377042f476620f Mon Sep 17 00:00:00 2001 From: dishaprakash <57954147+dishaprakash@users.noreply.github.com> Date: Mon, 15 Dec 2025 22:12:41 +0530 Subject: [PATCH 6/8] Refactor async functions to synchronous in create_embeddings.py --- .../create_embeddings.py | 42 ++++++++++--------- 1 file changed, 23 insertions(+), 19 deletions(-) diff --git a/samples/langchain_on_vertexai/create_embeddings.py b/samples/langchain_on_vertexai/create_embeddings.py index 105a86df..2321624d 100644 --- a/samples/langchain_on_vertexai/create_embeddings.py +++ b/samples/langchain_on_vertexai/create_embeddings.py @@ -32,8 +32,8 @@ from langchain_google_cloud_sql_pg import PostgresEngine, PostgresVectorStore -async def create_databases(): - engine = await PostgresEngine.afrom_instance( +def create_databases(): + engine = PostgresEngine.from_instance( PROJECT_ID, REGION, INSTANCE, @@ -41,15 +41,18 @@ async def create_databases(): user=USER, password=PASSWORD, ) - async with engine._pool.connect() as conn: - await conn.execute(text("COMMIT")) - await conn.execute(text(f'DROP DATABASE IF EXISTS "{DATABASE}"')) - await conn.execute(text(f'CREATE DATABASE "{DATABASE}"')) - await engine.close() + + async def _create_logic(): + async with engine._pool.connect() as conn: + await conn.execute(text("COMMIT")) + await conn.execute(text(f'DROP DATABASE IF EXISTS "{DATABASE}"')) + await conn.execute(text(f'CREATE DATABASE "{DATABASE}"')) + + engine._run_as_sync(_create_logic()) -async def create_vectorstore(): - engine = await PostgresEngine.afrom_instance( +def create_vectorstore(): + engine = PostgresEngine.from_instance( PROJECT_ID, REGION, INSTANCE, @@ -58,11 +61,11 @@ async def create_vectorstore(): password=PASSWORD, ) - await engine.ainit_vectorstore_table( + engine.init_vectorstore_table( table_name=TABLE_NAME, vector_size=768, overwrite_existing=True ) - await engine.ainit_chat_history_table(table_name=CHAT_TABLE_NAME) + engine.init_chat_history_table(table_name=CHAT_TABLE_NAME) rm = resourcemanager_v3.ProjectsClient() res = rm.get_project( @@ -76,7 +79,7 @@ async def grant_select(engine): await conn.execute(text(f'GRANT SELECT ON {TABLE_NAME} TO "{IAM_USER}";')) await conn.commit() - await engine._run_as_async(grant_select(engine)) + engine._run_as_sync(grant_select(engine)) metadata = [ "show_id", @@ -91,21 +94,22 @@ async def grant_select(engine): loader = CSVLoader(file_path="./movies.csv", metadata_columns=metadata) docs = loader.load() - vector_store = await PostgresVectorStore.create( + vector_store = PostgresVectorStore.create_sync( engine, table_name=TABLE_NAME, embedding_service=VertexAIEmbeddings( - model_name="textembedding-gecko@latest", project=PROJECT_ID + model_name="text-embedding-004", project=PROJECT_ID ), ) ids = [str(uuid.uuid4()) for i in range(len(docs))] - await vector_store.aadd_documents(docs, ids=ids) + vector_store.add_documents(docs, ids=ids) -async def main(): - await create_databases() - await create_vectorstore() +def main(): + create_databases() + create_vectorstore() -asyncio.run(main()) +if __name__ == "__main__": + main() From 975cb879f1d2de15d97bae55541806bc19072da2 Mon Sep 17 00:00:00 2001 From: dishaprakash <57954147+dishaprakash@users.noreply.github.com> Date: Mon, 15 Dec 2025 22:13:38 +0530 Subject: [PATCH 7/8] Convert database and vector store functions to async Refactor create_databases and create_vectorstore functions to be asynchronous. Update database initialization and vector store creation to use async/await syntax. --- .../create_embeddings.py | 42 +++++++++---------- 1 file changed, 19 insertions(+), 23 deletions(-) diff --git a/samples/langchain_on_vertexai/create_embeddings.py b/samples/langchain_on_vertexai/create_embeddings.py index 2321624d..105a86df 100644 --- a/samples/langchain_on_vertexai/create_embeddings.py +++ b/samples/langchain_on_vertexai/create_embeddings.py @@ -32,8 +32,8 @@ from langchain_google_cloud_sql_pg import PostgresEngine, PostgresVectorStore -def create_databases(): - engine = PostgresEngine.from_instance( +async def create_databases(): + engine = await PostgresEngine.afrom_instance( PROJECT_ID, REGION, INSTANCE, @@ -41,18 +41,15 @@ def create_databases(): user=USER, password=PASSWORD, ) - - async def _create_logic(): - async with engine._pool.connect() as conn: - await conn.execute(text("COMMIT")) - await conn.execute(text(f'DROP DATABASE IF EXISTS "{DATABASE}"')) - await conn.execute(text(f'CREATE DATABASE "{DATABASE}"')) - - engine._run_as_sync(_create_logic()) + async with engine._pool.connect() as conn: + await conn.execute(text("COMMIT")) + await conn.execute(text(f'DROP DATABASE IF EXISTS "{DATABASE}"')) + await conn.execute(text(f'CREATE DATABASE "{DATABASE}"')) + await engine.close() -def create_vectorstore(): - engine = PostgresEngine.from_instance( +async def create_vectorstore(): + engine = await PostgresEngine.afrom_instance( PROJECT_ID, REGION, INSTANCE, @@ -61,11 +58,11 @@ def create_vectorstore(): password=PASSWORD, ) - engine.init_vectorstore_table( + await engine.ainit_vectorstore_table( table_name=TABLE_NAME, vector_size=768, overwrite_existing=True ) - engine.init_chat_history_table(table_name=CHAT_TABLE_NAME) + await engine.ainit_chat_history_table(table_name=CHAT_TABLE_NAME) rm = resourcemanager_v3.ProjectsClient() res = rm.get_project( @@ -79,7 +76,7 @@ async def grant_select(engine): await conn.execute(text(f'GRANT SELECT ON {TABLE_NAME} TO "{IAM_USER}";')) await conn.commit() - engine._run_as_sync(grant_select(engine)) + await engine._run_as_async(grant_select(engine)) metadata = [ "show_id", @@ -94,22 +91,21 @@ async def grant_select(engine): loader = CSVLoader(file_path="./movies.csv", metadata_columns=metadata) docs = loader.load() - vector_store = PostgresVectorStore.create_sync( + vector_store = await PostgresVectorStore.create( engine, table_name=TABLE_NAME, embedding_service=VertexAIEmbeddings( - model_name="text-embedding-004", project=PROJECT_ID + model_name="textembedding-gecko@latest", project=PROJECT_ID ), ) ids = [str(uuid.uuid4()) for i in range(len(docs))] - vector_store.add_documents(docs, ids=ids) + await vector_store.aadd_documents(docs, ids=ids) -def main(): - create_databases() - create_vectorstore() +async def main(): + await create_databases() + await create_vectorstore() -if __name__ == "__main__": - main() +asyncio.run(main()) From 5757a89069483506ae2a49d5f9135fd0023804a9 Mon Sep 17 00:00:00 2001 From: dishaprakash <57954147+dishaprakash@users.noreply.github.com> Date: Tue, 16 Dec 2025 23:57:54 +0530 Subject: [PATCH 8/8] Update async_checkpoint.py --- src/langchain_google_cloud_sql_pg/async_checkpoint.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/langchain_google_cloud_sql_pg/async_checkpoint.py b/src/langchain_google_cloud_sql_pg/async_checkpoint.py index fcc83f32..32eef521 100644 --- a/src/langchain_google_cloud_sql_pg/async_checkpoint.py +++ b/src/langchain_google_cloud_sql_pg/async_checkpoint.py @@ -276,7 +276,9 @@ async def aput( async with self.pool.connect() as conn: type_, serialized_checkpoint = self.serde.dumps_typed(checkpoint) - _, serialized_metadata = self.jsonplus_serde.dumps_typed(metadata) + serialized_metadata = json.dumps(metadata, ensure_ascii=False).encode( + "utf-8", "ignore" + ) await conn.execute( text(query), {