Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions providers/common/ai/docs/hooks/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ Choosing a hook
- Direct LangChain access for tasks that compose ``Runnable``\\s, use the
LangChain agent surface, or need LangChain-native chat / embedding model
objects. Independent of the pydantic-ai-backed operators.
* - :class:`~airflow.providers.common.ai.hooks.llamaindex.LlamaIndexHook`
- Backs the LlamaIndex ``LlamaIndexEmbeddingOperator`` and
``LlamaIndexRetrievalOperator``.
Returns LlamaIndex-native ``BaseEmbedding`` / ``LLM`` objects (OpenAI
by default). For non-OpenAI vendors, pass a pre-built
``BaseEmbedding`` / ``LLM`` instance straight to the operator and
bypass the hook.

Hook guides
-----------
Expand Down
115 changes: 115 additions & 0 deletions providers/common/ai/docs/hooks/llamaindex.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

.. http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.

.. _howto/hook:llamaindex:

``LlamaIndexHook``
==================

Use :class:`~airflow.providers.common.ai.hooks.llamaindex.LlamaIndexHook` to
bridge an Airflow connection to `LlamaIndex <https://docs.llamaindex.ai/>`__
chat and embedding models. The hook reads credentials (API key, optional
base URL) from a connection of type ``llamaindex`` and returns native
LlamaIndex objects ready to pass to ``VectorStoreIndex(..., embed_model=...)``,
``load_index_from_storage(..., embed_model=...)``, or
``index.as_retriever(..., llm=...)``.

The hook deliberately does **not** mutate LlamaIndex's global ``Settings``
singleton. Operators pass the resolved model directly to LlamaIndex
constructors, so concurrent tasks in the same worker don't race on shared
state.

OpenAI by default, BYO for other vendors
----------------------------------------

LlamaIndex does not ship a universal ``init_chat_model`` /
``init_embedding_model`` equivalent (each vendor is a separate package
under ``llama-index-llms-*`` / ``llama-index-embeddings-*`` with its own
constructor kwargs). The hook therefore covers the OpenAI-compatible
surface that matches LlamaIndex's own ``resolve_embed_model("default")``
behaviour:

- ``hook.get_embedding_model()`` returns an ``OpenAIEmbedding`` configured
from the connection.
- ``hook.get_llm()`` returns an ``OpenAI`` LLM configured from the
connection.

For other vendors (Cohere, Bedrock, Vertex AI, HuggingFace, ...),
instantiate the LlamaIndex class directly in a ``@task`` and pass it to
the operator's ``embed_model=`` / ``llm=`` parameter -- both
:class:`~airflow.providers.common.ai.operators.llamaindex_embedding.LlamaIndexEmbeddingOperator`
and
:class:`~airflow.providers.common.ai.operators.llamaindex_retrieval.LlamaIndexRetrievalOperator`
accept a pre-built ``BaseEmbedding`` / ``LLM`` instance and bypass the
hook:

.. exampleinclude:: /../../ai/src/airflow/providers/common/ai/example_dags/example_llamaindex_hook.py
:language: python
:start-after: [START howto_hook_llamaindex_byo_embed_model]
:end-before: [END howto_hook_llamaindex_byo_embed_model]

Install the per-vendor LlamaIndex integration package separately:
``pip install llama-index-embeddings-cohere``, ``...-bedrock``,
``...-huggingface``, ``llama-index-llms-anthropic``, etc.

Connection Configuration
------------------------

The hook reads credentials from the Airflow connection of type ``llamaindex``:

- **password** -- API key (passed as ``api_key`` to ``OpenAIEmbedding`` /
``OpenAI``).
- **host** -- Optional base URL (passed as ``api_base``; useful for custom
OpenAI-compatible endpoints, Ollama, vLLM).
- **extra** JSON --
``{"embed_model": "text-embedding-3-small", "llm_model": "gpt-4o"}`` --
default model identifiers stored on the connection.

Parameters
----------

.. list-table::
:header-rows: 1
:widths: 25 25 50

* - Parameter
- Default
- Description
* - ``llm_conn_id``
- ``llamaindex_default``
- Airflow connection ID for the LLM/embedding provider.
* - ``embed_conn_id``
- ``None`` (falls back to ``llm_conn_id``)
- Optional separate Airflow connection ID for the embedding provider.
* - ``embed_model``
- ``None`` (falls back to ``extra["embed_model"]``)
- Embedding model name, e.g. ``text-embedding-3-small``.
* - ``llm_model``
- ``None`` (falls back to ``extra["llm_model"]``)
- LLM model name, e.g. ``gpt-4o``. Required when calling ``get_llm()``.

Dependencies
------------

Install the ``llamaindex`` extra::

pip install apache-airflow-providers-common-ai[llamaindex]

That extra installs ``llama-index-core``, ``llama-index-embeddings-openai``,
and ``llama-index-llms-openai`` -- enough to back the hook's default
OpenAI return values. For other LlamaIndex vendor packages, install
their integration package separately.
6 changes: 3 additions & 3 deletions providers/common/ai/docs/operators/document_loader.rst
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ No chunking
The operator parses files into documents; it does **not** split them into
fixed-size chunks. The right chunking strategy depends on the embedding
model and is intentionally left to a downstream text-splitter or embedding
operator (LlamaIndex's ``EmbeddingOperator``, LangChain's text splitters,
operator (LlamaIndex's ``LlamaIndexEmbeddingOperator``, LangChain's text splitters,
...).

Format coverage roadmap
Expand All @@ -172,7 +172,7 @@ Composing with downstream embedding operators
---------------------------------------------

The output format (``list[dict(text, metadata)]``) is designed to feed
directly into embedding operators. With LlamaIndex's ``EmbeddingOperator``:
directly into embedding operators. With LlamaIndex's ``LlamaIndexEmbeddingOperator``:

.. code-block:: python

Expand All @@ -181,7 +181,7 @@ directly into embedding operators. With LlamaIndex's ``EmbeddingOperator``:
source_path="/data/docs/*.pdf",
)

embed = EmbeddingOperator(
embed = LlamaIndexEmbeddingOperator(
task_id="embed",
documents="{{ ti.xcom_pull(task_ids='load') }}",
llm_conn_id="openai_default",
Expand Down
6 changes: 6 additions & 0 deletions providers/common/ai/docs/operators/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ to pick the one that fits your use case:
* - Parse files (PDF, DOCX, CSV, etc.) into document dicts for embedding
- :class:`~airflow.providers.common.ai.operators.document_loader.DocumentLoaderOperator`
- *(no decorator)*
* - Chunk documents and produce embedding vectors
- :class:`~airflow.providers.common.ai.operators.llamaindex_embedding.LlamaIndexEmbeddingOperator`
- *(no decorator)*
* - Retrieve relevant chunks from a vector index
- :class:`~airflow.providers.common.ai.operators.llamaindex_retrieval.LlamaIndexRetrievalOperator`
- *(no decorator)*

**LLMOperator / @task.llm** — stateless, single-turn calls. Use this for classification,
summarization, extraction, or any prompt that produces one response. Supports structured output
Expand Down
119 changes: 119 additions & 0 deletions providers/common/ai/docs/operators/llamaindex_embedding.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

.. http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.

.. _howto/operator:llamaindex_embedding:

LlamaIndex ``LlamaIndexEmbeddingOperator``
==========================================

Chunk a ``list[dict]`` of documents and produce embedding vectors using
LlamaIndex. Designed to feed the output of
:class:`~airflow.providers.common.ai.operators.document_loader.DocumentLoaderOperator`
into vector storage (pgvector, Pinecone, Weaviate, ...).

The operator passes the embedding model **directly** to
``VectorStoreIndex(..., embed_model=...)`` -- it does not mutate
LlamaIndex's global ``Settings`` singleton, so concurrent tasks in the same
worker process don't race on shared model state.

Basic usage
-----------

.. exampleinclude:: /../../ai/src/airflow/providers/common/ai/example_dags/example_llamaindex_hook.py
:language: python
:start-after: [START howto_hook_llamaindex_embed]
:end-before: [END howto_hook_llamaindex_embed]

``documents`` is templated, so ``loader.output`` (XCom direct) is resolved
to a native ``list[dict]`` before ``execute`` runs.

Bring-your-own embedding model
------------------------------

LlamaIndex doesn't ship a universal embedding-model initializer, so the
operator's ``embed_model`` parameter accepts either:

* a string model name (e.g. ``"text-embedding-3-small"``) -- the operator
constructs an ``OpenAIEmbedding`` via
:class:`~airflow.providers.common.ai.hooks.llamaindex.LlamaIndexHook`
using ``llm_conn_id`` / ``embed_conn_id``, or
* a pre-built ``BaseEmbedding`` instance -- bypass the hook entirely. Use
this for Cohere, Bedrock, Vertex, HuggingFace, etc.:

.. exampleinclude:: /../../ai/src/airflow/providers/common/ai/example_dags/example_llamaindex_hook.py
:language: python
:start-after: [START howto_hook_llamaindex_byo_embed_model]
:end-before: [END howto_hook_llamaindex_byo_embed_model]

Persisting to cloud storage
---------------------------

``persist_dir`` accepts local paths and storage URIs (``s3://``, ``gs://``,
``azure://``, ``file://``) resolved via
:class:`~airflow.sdk.ObjectStoragePath`. Pass ``persist_conn_id`` to
point at the Airflow connection that holds the cloud credentials:

.. exampleinclude:: /../../ai/src/airflow/providers/common/ai/example_dags/example_llamaindex_hook.py
:language: python
:start-after: [START howto_hook_llamaindex_cloud_persist]
:end-before: [END howto_hook_llamaindex_cloud_persist]

Parameters
----------

.. list-table::
:header-rows: 1
:widths: 25 75

* - Parameter
- Description
* - ``documents``
- ``list[dict]`` with ``text`` / ``metadata`` keys. Templated, so
binding ``loader.output`` resolves to the native list before
execute.
* - ``embed_model``
- String model name OR pre-built ``BaseEmbedding`` instance.
* - ``llm_conn_id``
- Airflow connection ID used when ``embed_model`` is a string. Falls
back to ``LlamaIndexHook.default_conn_name`` (``llamaindex_default``)
when ``None``.
* - ``embed_conn_id``
- Optional separate connection ID for the embedding provider. Falls
back to ``llm_conn_id`` when ``None``.
* - ``chunk_size``
- Sentence-splitter chunk size (default 512).
* - ``chunk_overlap``
- Overlap between chunks (default 50).
* - ``persist_dir``
- Local path or storage URI to persist the LlamaIndex index.
* - ``persist_conn_id``
- Cloud credentials connection ID for ``persist_dir`` URIs.

Output
------

Returns a dict with::

{
"document_count": int,
"chunk_count": int,
"persist_dir": str | None,
"chunks": [
{"text": str, "metadata": dict, "vector": list[float]},
...
],
}
109 changes: 109 additions & 0 deletions providers/common/ai/docs/operators/llamaindex_retrieval.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

.. http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.

.. _howto/operator:llamaindex_retrieval:

LlamaIndex ``LlamaIndexRetrievalOperator``
==========================================

Load a persisted LlamaIndex index and run similarity search. Designed to
sit between
:class:`~airflow.providers.common.ai.operators.llamaindex_embedding.LlamaIndexEmbeddingOperator`
(which builds the index) and
:class:`~airflow.providers.common.ai.operators.llm.LLMOperator` (which
synthesises an answer from the retrieved chunks).

Passes the embedding model **directly** to
``load_index_from_storage(..., embed_model=...)`` -- no LlamaIndex
``Settings`` mutation. The embedding model must match the one used when
the index was originally built.

Basic usage
-----------

.. exampleinclude:: /../../ai/src/airflow/providers/common/ai/example_dags/example_llamaindex_hook.py
:language: python
:start-after: [START howto_hook_llamaindex_retrieve]
:end-before: [END howto_hook_llamaindex_retrieve]

``query`` is templated, so DAG-run params, XCom, and Variables all flow
through cleanly.

Cloud-persisted indexes
-----------------------

``index_persist_dir`` accepts the same local-path-or-URI shape as
``LlamaIndexEmbeddingOperator.persist_dir``. Pass ``persist_conn_id`` to point at
the Airflow connection that holds cloud credentials. The operator raises
``FileNotFoundError`` with a clear "did you run LlamaIndexEmbeddingOperator first?"
message when the path is missing.

Bring-your-own embedding model
------------------------------

Same shape as ``LlamaIndexEmbeddingOperator``: ``embed_model`` accepts either a
string model name (OpenAI via the hook) or a pre-built ``BaseEmbedding``
instance for non-OpenAI vendors. See the BYO example in
:doc:`llamaindex_embedding`.

Parameters
----------

.. list-table::
:header-rows: 1
:widths: 25 75

* - Parameter
- Description
* - ``query``
- The query string. Templated.
* - ``index_persist_dir``
- Local path or storage URI pointing at the persisted index.
Templated.
* - ``persist_conn_id``
- Cloud credentials connection ID for ``index_persist_dir`` URIs.
Templated.
* - ``embed_model``
- String model name OR pre-built ``BaseEmbedding`` instance. Must
match the model used when the index was built. Templated.
* - ``llm_conn_id``
- Airflow connection ID used when ``embed_model`` is a string. Falls
back to ``LlamaIndexHook.default_conn_name`` (``llamaindex_default``)
when ``None``.
* - ``embed_conn_id``
- Optional separate connection ID for the embedding provider. Falls
back to ``llm_conn_id`` when ``None``.
* - ``top_k``
- Number of top similarity results to return (default 5).

Output
------

Returns a dict with::

{
"query": str,
"chunks": [
{
"text": str,
"score": float,
"metadata": dict,
"node_id": str,
},
...
],
}
Loading
Loading