42 changes: 41 additions & 1 deletion http/get_multipart/README.md
@@ -19,4 +19,44 @@

# HTTP GET Arrow Data: Multipart Examples

This directory contains examples of HTTP servers/clients that send/receive a multipart response (`Content-Type: multipart/mixed`) containing JSON data (`Content-Type: application/json`) and Arrow IPC stream data (`Content-Type: application/vnd.apache.arrow.stream`).
This directory contains examples of HTTP servers/clients that send/receive a multipart response (`Content-Type: multipart/mixed`) containing JSON data (`Content-Type: application/json`), Arrow IPC stream data (`Content-Type: application/vnd.apache.arrow.stream`), and (optionally) plain text data (`Content-Type: text/plain`).

## Picking a Boundary

The `multipart/mixed` response format uses a boundary string to separate the
parts. This string **must not appear in the content of any part** according
to RFC 1341.[^1]

We **do not recommend** checking for the boundary string in the content of the
parts, as that would prevent streaming them, which in turn would increase the
server's memory usage and waste CPU time.

### Recommended Algorithm

For every `multipart/mixed` response produced by the server:
1. Using a CSPRNG,[^2] generate a byte string with enough entropy to make the
probability of collision[^3] negligible (at least 160 bits = 20 bytes).[^4]
2. Encode the byte string in a way that is safe to use in HTTP headers. We
recommend using `base64url` encoding described in RFC 4648.[^5]
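
To get a rough feel for what "negligible" means here, the birthday bound from the footnotes can be evaluated directly. The sketch below assumes the usual small-probability approximation p ≈ n² / 2^(b+1) for n random values of b bits each; the function name and the workload of one trillion boundaries are hypothetical, chosen only for illustration.

```python
def collision_probability(num_values: int, entropy_bits: int) -> float:
    """Birthday-bound approximation p ~= n^2 / 2^(b + 1), valid while p is small."""
    return num_values**2 / 2 ** (entropy_bits + 1)


# Hypothetical workload: one trillion random boundaries.
p_160 = collision_probability(10**12, 160)
p_224 = collision_probability(10**12, 224)
print(f"160 bits: {p_160:.1e}")
print(f"224 bits: {p_224:.1e}")
```

Both values are many orders of magnitude below anything observable in practice, which is why checking the part contents for the boundary is unnecessary.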

`base64url` encoding is a variant of `base64` encoding that uses `-` and `_`
instead of `+` and `/` respectively. The padding characters (`=`) can be
omitted, and the lengths quoted below assume that they are.

This algorithm can be implemented in Python using the `secrets.token_urlsafe()`
function.

If you generate a boundary string with a generous 224 bits of entropy
(i.e. 28 bytes), the base64url encoding will produce a 38-character
string, which is well below the limit defined by RFC 1341 (70 characters).

    >>> import secrets
    >>> boundary = secrets.token_urlsafe(28)
    >>> len(boundary)
    38
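
For readers working outside `secrets.token_urlsafe()`, the two steps of the recommended algorithm can also be spelled out explicitly. This is only a sketch: `make_boundary` is a hypothetical helper name, and the quoted `boundary="..."` parameter mirrors the header format used by the Python client example in this change.

```python
import base64
import secrets


def make_boundary(num_bytes: int = 28) -> str:
    """Generate a random multipart boundary (hypothetical helper)."""
    # Step 1: draw random bytes from the operating system's CSPRNG.
    raw = secrets.token_bytes(num_bytes)
    # Step 2: base64url-encode the bytes and strip the "=" padding.
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


boundary = make_boundary()
content_type = f'multipart/mixed; boundary="{boundary}"'
print(content_type)
```

With the default of 28 bytes this yields the same 38-character length as the `secrets.token_urlsafe(28)` example.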


[^1]: [RFC 1341 - Section 7.2 The Multipart Content-Type](https://www.w3.org/Protocols/rfc1341/7_2_Multipart.html)
[^2]: [Cryptographically Secure Pseudo-Random Number Generator](https://en.wikipedia.org/wiki/Cryptographically_secure_pseudorandom_number_generator)
[^3]: [Birthday Problem](https://en.wikipedia.org/wiki/Birthday_problem)
[^4]: [Hash Collision Probabilities](https://preshing.com/20110504/hash-collision-probabilities/)
[^5]: [RFC 4648 - Section 5 Base 64 Encoding with URL and Filename Safe Alphabet](https://tools.ietf.org/html/rfc4648#section-5)
52 changes: 52 additions & 0 deletions http/get_multipart/python/client/README.md
@@ -0,0 +1,52 @@
<!---
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# HTTP GET Arrow Data in multipart/mixed: Python Client Example

This directory contains an example of a Python HTTP client that receives a
`multipart/mixed` response from the server. The client:
1. Sends an HTTP GET request to a server.
2. Receives an HTTP 200 response from the server, with the response body
containing a `multipart/mixed` response.
3. Parses the `multipart/mixed` response using the `email` module.[^1]
4. Extracts the JSON part, parses it and prints a preview of the JSON data.
5. Extracts the Arrow stream part, reads the Arrow stream, and sums the
total number of records in the entire Arrow stream.
6. Extracts the plain text part and prints it as it is.
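
Step 3 can be tried without a running server. The sketch below builds a tiny `multipart/mixed` body in memory and parses it with the `email` module; the boundary string and the two parts are made up for illustration, and the synthetic MIME header is prepended because the parser expects the `Content-Type` header as part of its input.

```python
import json
from email import policy
from email.parser import BytesParser

boundary = "example-boundary"  # hypothetical; a real server should use a random one
body = (
    f"--{boundary}\r\n"
    "Content-Type: application/json\r\n\r\n"
    '[{"id": 1}]\r\n'
    f"--{boundary}\r\n"
    "Content-Type: text/plain\r\n\r\n"
    "hello\r\n"
    f"--{boundary}--\r\n"
).encode("utf-8")

# The HTTP layer consumes the real headers, so re-create the one the parser needs.
header = (
    "MIME-Version: 1.0\r\n"
    f'Content-Type: multipart/mixed; boundary="{boundary}"\r\n\r\n'
).encode("utf-8")

message = BytesParser(policy=policy.default).parsebytes(header + body)
parts = list(message.iter_parts())
content_types = [part.get_content_type() for part in parts]
json_data = json.loads(parts[0].get_payload(decode=True))
print(content_types, json_data)
```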

To run this example, first start one of the server examples in the parent
directory, then:

```sh
pip install pyarrow
python simple_client.py
```

> [!WARNING]
> This `simple_client.py` parses the multipart response using the multipart
> message parser from the Python `email` module. This module loads the entire
> message into memory and seems to spend a lot of time looking for part
> delimiters and encoding/decoding the parts.
>
> On my machine, `multipart/mixed` parsing accounts for 85% of the total
> execution time. Once the ~1GB Arrow stream message is fully in memory,
> parsing it takes only 0.06% of the total execution time.

[^1]: The `multipart/mixed` standard, used by HTTP, is derived from the MIME
standard used in email.
144 changes: 144 additions & 0 deletions http/get_multipart/python/client/simple_client.py
@@ -0,0 +1,144 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from email import policy
import email.parser  # BytesFeedParser lives in this submodule; import it explicitly
import json
import pyarrow as pa
import sys
import time
import urllib.request

JSON_FORMAT = "application/json"
TEXT_FORMAT = "text/plain"
ARROW_STREAM_FORMAT = "application/vnd.apache.arrow.stream"

start_time = time.time()
response_parsing_time = 0 # time to parse the multipart message
arrow_stream_parsing_time = 0 # time to parse the Arrow stream


def parse_multipart_message(response, boundary, buffer_size=8192):
    """
    Parse a multipart/mixed HTTP response into a list of Message objects.

    Returns
    -------
    list of email.message.Message containing the parts of the multipart message.
    """
    global response_parsing_time
    buffer_size = max(buffer_size, 8192)
    buffer = bytearray(buffer_size)

    header = f'MIME-Version: 1.0\r\nContent-Type: multipart/mixed; boundary="{boundary}"\r\n\r\n'
    feedparser = email.parser.BytesFeedParser(policy=policy.default)
    feedparser.feed(header.encode("utf-8"))
    while bytes_read := response.readinto(buffer):
        start_time = time.time()
        feedparser.feed(buffer[0:bytes_read])
        response_parsing_time += time.time() - start_time
    start_time = time.time()
    message = feedparser.close()
    response_parsing_time += time.time() - start_time
    assert message.is_multipart()
    return message.get_payload()


def process_json_part(message):
    assert message.get_content_type() == JSON_FORMAT
    # Use the `message` argument rather than the module-level loop variable.
    payload = message.get_payload()
    print(f"-- {len(payload)} bytes of JSON data:")
    data = None  # stays None if the payload is not valid JSON
    try:
        PREVIEW_SIZE = 5
        data = json.loads(payload)
        print("[")
        for i in range(min(PREVIEW_SIZE, len(data))):
            print(f" {data[i]}")
        if len(data) > PREVIEW_SIZE:
            print(f" ...+{len(data) - PREVIEW_SIZE} entries...")
        print("]")
    except json.JSONDecodeError as e:
        print(f"Error parsing JSON data: {e}\n", file=sys.stderr)
    return data


def process_arrow_stream_message(message):
    global arrow_stream_parsing_time
    assert message.get_content_type() == ARROW_STREAM_FORMAT
    # Use the `message` argument rather than the module-level loop variable.
    payload = message.get_payload(decode=True)
    print(f"-- {len(payload)} bytes of Arrow data:")
    num_batches = 0
    num_records = 0
    start_time = time.time()
    with pa.ipc.open_stream(payload) as reader:
        schema = reader.schema
        print(f"Schema: \n{schema}\n")
        try:
            while True:
                batch = reader.read_next_batch()
                num_batches += 1
                num_records += batch.num_rows
        except StopIteration:
            pass
    arrow_stream_parsing_time = time.time() - start_time
    print(f"Parsed {num_records} records in {num_batches} batch(es)")


def process_text_part(message):
    assert message.get_content_type() == TEXT_FORMAT
    # Use the `message` argument rather than the module-level loop variable.
    payload = message.get_payload()
    print("-- Text Message:")
    print(payload, end="")
    print("-- End of Text Message --")


response = urllib.request.urlopen("http://localhost:8008?include_footnotes")

content_type = response.headers.get_content_type()
if content_type != "multipart/mixed":
    raise ValueError(f"Expected multipart/mixed Content-Type, got {content_type}")
boundary = response.headers.get_boundary()
if boundary is None or len(boundary) == 0:
    raise ValueError("No multipart boundary found in Content-Type header")

parts = parse_multipart_message(response, boundary, buffer_size=64 * 1024)
batches = None
for part in parts:
    content_type = part.get_content_type()
    if content_type == JSON_FORMAT:
        process_json_part(part)
    elif content_type == ARROW_STREAM_FORMAT:
        batches = process_arrow_stream_message(part)
    elif content_type == TEXT_FORMAT:
        process_text_part(part)

end_time = time.time()
execution_time = end_time - start_time

rel_response_parsing_time = response_parsing_time / execution_time
rel_arrow_stream_parsing_time = arrow_stream_parsing_time / execution_time
print(f"{execution_time:.3f} seconds elapsed")
# Report how much of the total run time each parsing stage took.
print(
    f"""{response_parsing_time:.3f} seconds \
({rel_response_parsing_time * 100:.2f}%) \
parsing multipart/mixed response"""
)
print(
    f"""{arrow_stream_parsing_time:.3f} seconds \
({rel_arrow_stream_parsing_time * 100:.2f}%) \
parsing Arrow stream"""
)
49 changes: 49 additions & 0 deletions http/get_multipart/python/server/README.md
@@ -0,0 +1,49 @@
<!---
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# HTTP GET Arrow Data in multipart/mixed: Python Server Example

This directory contains an example of a Python HTTP server that sends a
`multipart/mixed` response to clients. The server:
1. Creates a list of record batches and populates it with synthesized data.
2. Listens for HTTP GET requests from clients.
3. Upon receiving a request, builds and sends an HTTP 200 `multipart/mixed`
   response containing:
   - A JSON part with metadata about the Arrow stream.
   - An Arrow stream part with the Arrow IPC stream of record batches.
   - A plain text part with a message containing timing information. This part
     is optional (included if `?include_footnotes` is present in the URL).
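
The wire layout of such a response body can be sketched as a small generator. This is an illustration of the `multipart/mixed` framing only, not the code in `server.py`; `iter_multipart_body` and the sample boundary are hypothetical names, and a real server would yield the Arrow part batch by batch instead of as one bytes object.

```python
from typing import Iterable, Iterator, Tuple


def iter_multipart_body(
    parts: Iterable[Tuple[str, bytes]], boundary: str
) -> Iterator[bytes]:
    """Yield the pieces of a multipart/mixed body (hypothetical helper)."""
    for content_type, payload in parts:
        # Each part starts with a delimiter line and its own headers.
        yield f"--{boundary}\r\nContent-Type: {content_type}\r\n\r\n".encode("ascii")
        yield payload
        # This CRLF belongs to the next delimiter, not to the payload.
        yield b"\r\n"
    # The closing delimiter carries two extra dashes.
    yield f"--{boundary}--\r\n".encode("ascii")


body = b"".join(
    iter_multipart_body(
        [("application/json", b"[]"), ("text/plain", b"done")],
        boundary="example-boundary",
    )
)
print(body.decode("ascii"))
```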

To run this example:

```sh
pip install pyarrow
python server.py
```

> [!NOTE]
> This example uses Python's built-in
> [`http.server`](https://docs.python.org/3/library/http.server.html) module.
> This allows us to implement [chunked transfer
> encoding](https://en.wikipedia.org/wiki/Chunked_transfer_encoding) manually.
> Other servers may implement chunked transfer encoding automatically at the
> cost of an undesirable new layer of buffering. Arrow IPC streams already offer
> a natural way of chunking large amounts of tabular data. It's not a general
> requirement, but in this example each chunk corresponds to one Arrow record
> batch.
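
As a rough illustration of what implementing chunked transfer encoding manually involves, each chunk on the wire is the payload size in hexadecimal, a CRLF, the payload itself, and another CRLF, and the body ends with a zero-length chunk. The helper names below are hypothetical and are not taken from `server.py`.

```python
def encode_chunk(data: bytes) -> bytes:
    """Frame one payload as an HTTP/1.1 chunk (hypothetical helper)."""
    return f"{len(data):X}\r\n".encode("ascii") + data + b"\r\n"


# A zero-length chunk followed by an empty trailer section ends the body.
LAST_CHUNK = b"0\r\n\r\n"

# In the server example, each payload would be one serialized record batch.
wire = encode_chunk(b"hello") + encode_chunk(b"x" * 26) + LAST_CHUNK
print(wire)
```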