From 0d638a7724c4e97d566561b250175bd8cc7ee57a Mon Sep 17 00:00:00 2001 From: Ian Cook Date: Fri, 13 Sep 2024 18:18:11 -0400 Subject: [PATCH 1/2] Add FastAPI + Uvicorn server Co-authored-by: Arun Pa --- .../python/server/fastapi_uvicorn/README.md | 34 ++++++++ .../python/server/fastapi_uvicorn/server.py | 86 +++++++++++++++++++ 2 files changed, 120 insertions(+) create mode 100644 http/get_simple/python/server/fastapi_uvicorn/README.md create mode 100644 http/get_simple/python/server/fastapi_uvicorn/server.py diff --git a/http/get_simple/python/server/fastapi_uvicorn/README.md b/http/get_simple/python/server/fastapi_uvicorn/README.md new file mode 100644 index 0000000..9a8072a --- /dev/null +++ b/http/get_simple/python/server/fastapi_uvicorn/README.md @@ -0,0 +1,34 @@ + + +# HTTP GET Arrow Data: Simple Python Server Example with FastAPI and Uvicorn + +This directory contains a minimal example of an HTTP server implemented in Python using the [FastAPI](https://fastapi.tiangolo.com) framework and the [Uvicorn](https://www.uvicorn.org) web server. This example: +1. Creates a list of record batches and populates it with synthesized data. +2. Listens for HTTP GET requests from clients. +3. Upon receiving a request, sends an HTTP 200 response with the body containing an Arrow IPC stream of record batches. + +To run this example: + +```sh +pip install fastapi +pip install "uvicorn[standard]" +pip install pyarrow +uvicorn server:app --port 8008 +``` diff --git a/http/get_simple/python/server/fastapi_uvicorn/server.py b/http/get_simple/python/server/fastapi_uvicorn/server.py new file mode 100644 index 0000000..2e4ba1b --- /dev/null +++ b/http/get_simple/python/server/fastapi_uvicorn/server.py @@ -0,0 +1,86 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import pyarrow as pa +from random import randbytes +import io +from fastapi import FastAPI +from fastapi.responses import StreamingResponse + +schema = pa.schema([ + ("a", pa.int64()), + ("b", pa.int64()), + ("c", pa.int64()), + ("d", pa.int64()) +]) + + +def GetPutData(): + total_records = 100000000 + length = 4096 + ncolumns = 4 + + arrays = [] + + for x in range(0, ncolumns): + buffer = pa.py_buffer(randbytes(length * 8)) + arrays.append(pa.Int64Array.from_buffers( + pa.int64(), length, [None, buffer], null_count=0)) + + batch = pa.record_batch(arrays, schema) + batches = [] + + records = 0 + while records < total_records: + if records + length > total_records: + last_length = total_records - records + batches.append(batch.slice(0, last_length)) + records += last_length + else: + batches.append(batch) + records += length + + return batches + + +def generate_bytes(schema, batches): + with pa.RecordBatchReader.from_batches(schema, batches) as source, \ + io.BytesIO() as sink, \ + pa.ipc.new_stream(sink, schema) as writer: + for batch in source: + sink.seek(0) + writer.write_batch(batch) + sink.truncate() + yield sink.getvalue() + + sink.seek(0) + writer.close() + sink.truncate() + yield sink.getvalue() + + +batches = GetPutData() + +app = FastAPI() + + +@app.get("/") +def main(): + return StreamingResponse( + generate_bytes(schema, batches), + media_type="application/vnd.apache.arrow.stream" + ) From dfedc55da9ae24be4599e5d04d274c5df12a6c7f Mon Sep 17 00:00:00 2001 From: Ian Cook Date: Tue, 17 Sep 2024 16:53:15 -0400 Subject: [PATCH 2/2] Use getbuffer() not getvalue() --- .../python/server/fastapi_uvicorn/README.md | 11 +++++++++++ .../python/server/fastapi_uvicorn/server.py | 6 ++++-- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/http/get_simple/python/server/fastapi_uvicorn/README.md b/http/get_simple/python/server/fastapi_uvicorn/README.md index 9a8072a..5b5a446 100644 --- a/http/get_simple/python/server/fastapi_uvicorn/README.md +++ b/http/get_simple/python/server/fastapi_uvicorn/README.md @@ -32,3 +32,14 @@ pip install "uvicorn[standard]" pip install pyarrow uvicorn server:app --port 8008 ``` + +> [!NOTE] +> This example requires Starlette 0.38.0 or newer, which added support for `memoryview` in `StreamingResponse`. If using an older version of Starlette, change both instances of: +> ```py +> with sink.getbuffer() as buffer: +> yield buffer +> ``` +> to: +> ```py +> yield sink.getvalue() +> ``` diff --git a/http/get_simple/python/server/fastapi_uvicorn/server.py b/http/get_simple/python/server/fastapi_uvicorn/server.py index 2e4ba1b..f97b6af 100644 --- a/http/get_simple/python/server/fastapi_uvicorn/server.py +++ b/http/get_simple/python/server/fastapi_uvicorn/server.py @@ -65,12 +65,14 @@ def generate_bytes(schema, batches): sink.seek(0) writer.write_batch(batch) sink.truncate() - yield sink.getvalue() + with sink.getbuffer() as buffer: + yield buffer sink.seek(0) writer.close() sink.truncate() - yield sink.getvalue() + with sink.getbuffer() as buffer: + yield buffer batches = GetPutData()