From 85d66af1035d8427b1f57fc5c85c7411bc0204e8 Mon Sep 17 00:00:00 2001 From: Arun Pa Date: Tue, 26 Mar 2024 18:24:25 +0530 Subject: [PATCH 1/4] added arrow client using requests --- http/post_simple/python/client/README.md | 32 +++++++++++++++++ http/post_simple/python/client/client.py | 44 ++++++++++++++++++++++++ 2 files changed, 76 insertions(+) create mode 100644 http/post_simple/python/client/README.md create mode 100644 http/post_simple/python/client/client.py diff --git a/http/post_simple/python/client/README.md b/http/post_simple/python/client/README.md new file mode 100644 index 0000000..6c4ca61 --- /dev/null +++ b/http/post_simple/python/client/README.md @@ -0,0 +1,32 @@ + + +# HTTP Post Arrow Data: Simple Python Client Example using Requests + +This directory contains a minimal example of an HTTP client implemented in Python using the `requests` library. The client: +1. Sends an HTTP POST request to a server. +2. Receives an HTTP 200 response from the server, with the response body containing an Arrow IPC stream of record batches. +3. Adds the record batches to a list as they are received. + +To run this example, start one of the server examples in the parent directory, then: + +```sh +pip install pyarrow requests +python client.py +``` diff --git a/http/post_simple/python/client/client.py b/http/post_simple/python/client/client.py new file mode 100644 index 0000000..98ac9d5 --- /dev/null +++ b/http/post_simple/python/client/client.py @@ -0,0 +1,44 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import requests +import pyarrow as pa +import time + +start_time = time.time() + +params = {'n_records': 100000} +url = 'http://localhost:8008' +response = requests.post(url, data=params) +buffer = response.content + +batches = [] + +with pa.ipc.open_stream(buffer) as reader: + schema = reader.schema + try: + while True: + batches.append(reader.read_next_batch()) + except StopIteration: + pass + +end_time = time.time() +execution_time = end_time - start_time + +print(f"{len(buffer)} bytes received") +print(f"{len(batches)} record batches received") +print(f"{execution_time} seconds elapsed") From b31ce35244f696e5197b8bc9f988ebf6625b1335 Mon Sep 17 00:00:00 2001 From: Arun Pa Date: Tue, 26 Mar 2024 18:26:14 +0530 Subject: [PATCH 2/4] added http post server --- http/post_simple/python/server/README.md | 35 ++++++ http/post_simple/python/server/server.py | 133 +++++++++++++++++++++++ 2 files changed, 168 insertions(+) create mode 100644 http/post_simple/python/server/README.md create mode 100644 http/post_simple/python/server/server.py diff --git a/http/post_simple/python/server/README.md b/http/post_simple/python/server/README.md new file mode 100644 index 0000000..aee0b65 --- /dev/null +++ b/http/post_simple/python/server/README.md @@ -0,0 +1,35 @@ + + +# HTTP POST Arrow Data: Simple Python Server Example + +This directory contains a minimal example of an HTTP server implemented in Python. The server: +1. Creates a list of record batches and populates it with synthesized data. +2. Listens for HTTP POST requests from clients. +3.
Upon receiving a request, sends an HTTP 200 response with the body containing an Arrow IPC stream of record batches. + +To run this example: + +```sh +pip install pyarrow +python server.py +``` + +> [!NOTE] +> This example uses Python's built-in [`http.server`](https://docs.python.org/3/library/http.server.html) module. This server does not implement chunked transfer encoding automatically like more sophisticated HTTP servers do, so this example implements it manually, with each chunk consisting of one Arrow record batch. Note that in servers that implement chunked transfer encoding automatically, each chunk will generally not correspond to one Arrow record batch. diff --git a/http/post_simple/python/server/server.py b/http/post_simple/python/server/server.py new file mode 100644 index 0000000..9cf74ca --- /dev/null +++ b/http/post_simple/python/server/server.py @@ -0,0 +1,133 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +import pyarrow as pa +import urllib.parse +from random import randbytes +from http.server import BaseHTTPRequestHandler, HTTPServer +import io + + +def get_post_data_and_schema(total_records: int): + schema = pa.schema([('a', pa.int64()), ('b', pa.int64()), + ('c', pa.int64()), ('d', pa.int64())]) + length = 4096 + ncolumns = 4 + + arrays = [] + + for x in range(0, ncolumns): + buffer = pa.py_buffer(randbytes(length * 8)) + arrays.append( + pa.Int64Array.from_buffers(pa.int64(), + length, [None, buffer], + null_count=0)) + + batch = pa.record_batch(arrays, schema) + batches = [] + + records = 0 + while records < total_records: + if records + length > total_records: + last_length = total_records - records + batches.append(batch.slice(0, last_length)) + records += last_length + else: + batches.append(batch) + records += length + + return batches, schema + + +def make_reader(schema, batches): + return pa.RecordBatchReader.from_batches(schema, batches) + + +def generate_batches(schema, reader): + with io.BytesIO() as sink, pa.ipc.new_stream(sink, schema) as writer: + for batch in reader: + sink.seek(0) + sink.truncate(0) + writer.write_batch(batch) + yield sink.getvalue() + + sink.seek(0) + sink.truncate(0) + writer.close() + yield sink.getvalue() + + +class PostRequestServer(BaseHTTPRequestHandler): + + def do_POST(self): + data = self.rfile.read(int( + self.headers['Content-Length'])).decode('ascii') + params = urllib.parse.parse_qs(data) + + self.close_connection = True + + if 'n_records' not in params.keys(): + # send invalid response + self.send_response(400) + self.send_header('Content-type', 'text/html') + self.end_headers() + message = "Invalid request. Please provide 'n_records' parameter" + self.wfile.write(bytes(message, "utf8")) + + # the parse data returned by urllib.parse.parse_qs is a dictionary + # where each key is query variable name and the values are list of + # values for each name. 
Here, params['n_records'][0] wil be the + # first value in the list of parsed values of query data. + n_records = int(params['n_records'][0]) + + self.send_response(200) + self.send_header('Content-type', 'application/vnd.apache.arrow.stream') + + if self.request_version == 'HTTP/1.0': + self.protocol_version = 'HTTP/1.0' + chunked = False + else: + # If HTTP/1.1, use chunked encoding + chunked = True + self.send_header('Transfer-Encoding', 'chunked') + + self.end_headers() + + batches, schema = get_post_data_and_schema(n_records) + + for buffer in generate_batches(schema, make_reader(schema, batches)): + if chunked: + self.wfile.write('{:X}\r\n'.format( + len(buffer)).encode('utf-8')) + self.wfile.write(buffer) + if chunked: + self.wfile.write('\r\n'.encode('utf-8')) + self.wfile.flush() + + if chunked: + self.wfile.write('0\r\n\r\n'.encode('utf-8')) + self.wfile.flush() + + +server_address = ('localhost', 8008) +try: + httpd = HTTPServer(server_address, PostRequestServer) + print(f'Serving on {server_address[0]}:{server_address[1]}...') + httpd.serve_forever() +except KeyboardInterrupt: + print('Shutting down server') + httpd.socket.close() From d449d38907f869a854586ad9c4289b7f1e520743 Mon Sep 17 00:00:00 2001 From: Arun Pa Date: Tue, 26 Mar 2024 21:49:58 +0530 Subject: [PATCH 3/4] updated server to use fastapi --- http/post_simple/python/server/README.md | 5 +- http/post_simple/python/server/server.py | 74 +++++------------------- 2 files changed, 19 insertions(+), 60 deletions(-) diff --git a/http/post_simple/python/server/README.md b/http/post_simple/python/server/README.md index aee0b65..ad9eff7 100644 --- a/http/post_simple/python/server/README.md +++ b/http/post_simple/python/server/README.md @@ -27,8 +27,11 @@ This directory contains a minimal example of an HTTP server implemented in Pytho To run this example: ```sh +pip install fastapi +pip install "uvicorn[standard]" pip install pyarrow -python server.py +# start the server using uvicorn +uvicorn 
server:app --port 8008 ``` > [!NOTE] diff --git a/http/post_simple/python/server/server.py b/http/post_simple/python/server/server.py index 9cf74ca..fc27201 100644 --- a/http/post_simple/python/server/server.py +++ b/http/post_simple/python/server/server.py @@ -18,9 +18,12 @@ import pyarrow as pa import urllib.parse from random import randbytes -from http.server import BaseHTTPRequestHandler, HTTPServer import io +from fastapi import Request, FastAPI +from fastapi.responses import StreamingResponse, JSONResponse + +app = FastAPI() def get_post_data_and_schema(total_records: int): schema = pa.schema([('a', pa.int64()), ('b', pa.int64()), @@ -70,64 +73,17 @@ def generate_batches(schema, reader): writer.close() yield sink.getvalue() +@app.post("/") +async def main(request: Request): + request_body = await request.body() + params = urllib.parse.parse_qs(request_body.decode('ascii')) + if 'n_records' not in params.keys(): + return JSONResponse(status_code=400, content='request failed. n_records value not found in request data.') -class PostRequestServer(BaseHTTPRequestHandler): - - def do_POST(self): - data = self.rfile.read(int( - self.headers['Content-Length'])).decode('ascii') - params = urllib.parse.parse_qs(data) - - self.close_connection = True - - if 'n_records' not in params.keys(): - # send invalid response - self.send_response(400) - self.send_header('Content-type', 'text/html') - self.end_headers() - message = "Invalid request. Please provide 'n_records' parameter" - self.wfile.write(bytes(message, "utf8")) - - # the parse data returned by urllib.parse.parse_qs is a dictionary - # where each key is query variable name and the values are list of - # values for each name. Here, params['n_records'][0] wil be the - # first value in the list of parsed values of query data. 
- n_records = int(params['n_records'][0]) - - self.send_response(200) - self.send_header('Content-type', 'application/vnd.apache.arrow.stream') - - if self.request_version == 'HTTP/1.0': - self.protocol_version = 'HTTP/1.0' - chunked = False - else: - # If HTTP/1.1, use chunked encoding - chunked = True - self.send_header('Transfer-Encoding', 'chunked') - - self.end_headers() - - batches, schema = get_post_data_and_schema(n_records) + n_records = int(params['n_records'][0]) + batches, schema = get_post_data_and_schema(n_records) + def iterbuffer(): for buffer in generate_batches(schema, make_reader(schema, batches)): - if chunked: - self.wfile.write('{:X}\r\n'.format( - len(buffer)).encode('utf-8')) - self.wfile.write(buffer) - if chunked: - self.wfile.write('\r\n'.encode('utf-8')) - self.wfile.flush() - - if chunked: - self.wfile.write('0\r\n\r\n'.encode('utf-8')) - self.wfile.flush() - - -server_address = ('localhost', 8008) -try: - httpd = HTTPServer(server_address, PostRequestServer) - print(f'Serving on {server_address[0]}:{server_address[1]}...') - httpd.serve_forever() -except KeyboardInterrupt: - print('Shutting down server') - httpd.socket.close() + yield buffer + return StreamingResponse(iterbuffer()) From bd70ca8b693808d81545905690c917e2e0c7ec5c Mon Sep 17 00:00:00 2001 From: Arun Pa Date: Tue, 26 Mar 2024 21:50:22 +0530 Subject: [PATCH 4/4] minor fixes --- http/post_simple/python/client/client.py | 33 +++++++++++++----------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/http/post_simple/python/client/client.py b/http/post_simple/python/client/client.py index 98ac9d5..9332ba2 100644 --- a/http/post_simple/python/client/client.py +++ b/http/post_simple/python/client/client.py @@ -23,22 +23,25 @@ params = {'n_records': 100000} url = 'http://localhost:8008' -response = requests.post(url, data=params) -buffer = response.content +response = requests.post(url, data={}) -batches = [] +if response.status_code >= 400: + print (response.json()) 
+elif response.status_code == 200: + buffer = response.content + batches = [] -with pa.ipc.open_stream(buffer) as reader: - schema = reader.schema - try: - while True: - batches.append(reader.read_next_batch()) - except StopIteration: - pass + with pa.ipc.open_stream(buffer) as reader: + schema = reader.schema + try: + while True: + batches.append(reader.read_next_batch()) + except StopIteration: + pass -end_time = time.time() -execution_time = end_time - start_time + end_time = time.time() + execution_time = end_time - start_time -print(f"{len(buffer)} bytes received") -print(f"{len(batches)} record batches received") -print(f"{execution_time} seconds elapsed") + print(f"{len(buffer)} bytes received") + print(f"{len(batches)} record batches received") + print(f"{execution_time} seconds elapsed")