From 245a922d6e78a7f5c0fd9944ae128bb6c14edf78 Mon Sep 17 00:00:00 2001 From: quettabit <27509167+quettabit@users.noreply.github.com> Date: Wed, 8 Apr 2026 14:14:40 -0600 Subject: [PATCH] initial commit --- .github/workflows/ci.yml | 4 +- examples/docs/account_and_basins.py | 109 ++++++++++++++++++ examples/docs/configuration.py | 57 ++++++++++ examples/docs/metrics.py | 63 +++++++++++ examples/docs/overview.py | 20 ++++ examples/docs/streams.py | 168 ++++++++++++++++++++++++++++ pyproject.toml | 14 +-- src/s2_sdk/_compression.py | 2 - 8 files changed, 426 insertions(+), 11 deletions(-) create mode 100644 examples/docs/account_and_basins.py create mode 100644 examples/docs/configuration.py create mode 100644 examples/docs/metrics.py create mode 100644 examples/docs/overview.py create mode 100644 examples/docs/streams.py diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a8db075..ef33789 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -27,8 +27,8 @@ jobs: - name: Sync dependencies run: | uv sync --all-groups - - name: Static code check - run: uv run poe ci_checker + - name: Code quality check + run: uv run poe cq-check - name: Unit tests run: uv run pytest tests/ -v -m 'not (account or basin or stream or metrics)' - name: Check docs build diff --git a/examples/docs/account_and_basins.py b/examples/docs/account_and_basins.py new file mode 100644 index 0000000..bf86ec7 --- /dev/null +++ b/examples/docs/account_and_basins.py @@ -0,0 +1,109 @@ +"""Documentation examples for Account and Basins page. + +These snippets are extracted by the docs build script. + +Run with: python examples/docs/account_and_basins.py +Requires: S2_ACCESS_TOKEN, S2_BASIN environment variables + +Note: These examples create resources with hardcoded names. +They may fail on repeated runs if resources already exist. +""" + +import asyncio +import os +from datetime import datetime, timezone + +from s2_sdk import ( + S2, + AccessTokenScope, + OperationGroupPermissions, + Permission, + PrefixMatch, +) + +access_token = os.environ["S2_ACCESS_TOKEN"] +basin_name = os.environ["S2_BASIN"] + + +async def main(): + async with S2(access_token) as client: + # ANCHOR: basin-operations + # List basins + basins = await client.list_basins() + + # Create a basin + await client.create_basin("my-events") + + # Get configuration + config = await client.get_basin_config("my-events") + + # Delete + await client.delete_basin("my-events") + # ANCHOR_END: basin-operations + print(f"Basins: {len(basins.items)} found, config: {config}") + + basin = client.basin(basin_name) + + # ANCHOR: stream-operations + # List streams + streams = await basin.list_streams(prefix="user-") + + # Create a stream + await basin.create_stream( + "user-actions", + # config=StreamConfig(...) # optional + ) + + # Get configuration + config = await basin.get_stream_config("user-actions") + + # Delete + await basin.delete_stream("user-actions") + # ANCHOR_END: stream-operations + print(f"Streams: {len(streams.items)} found, config: {config}") + + # ANCHOR: access-token-basic + # List tokens (returns metadata, not the secret) + tokens = await client.list_access_tokens() + + # Issue a token scoped to streams under "users/1234/" + issued_token = await client.issue_access_token( + "user-1234-rw-token", + scope=AccessTokenScope( + basins=PrefixMatch(""), # all basins + streams=PrefixMatch("users/1234/"), + op_groups=OperationGroupPermissions( + stream=Permission.READ_WRITE, + ), + ), + expires_at=datetime(2027, 1, 1, tzinfo=timezone.utc), + ) + + # Revoke a token + await client.revoke_access_token("user-1234-rw-token") + # ANCHOR_END: access-token-basic + print(f"Tokens: {len(tokens.items)} found, issued: {bool(issued_token)}") + + # ANCHOR: pagination + # Iterate through all streams with automatic pagination + async for stream in basin.list_all_streams(): + print(stream.name) + # ANCHOR_END: pagination + + # ANCHOR: pagination-filtering + # List streams with a prefix filter + async for stream in basin.list_all_streams(prefix="events/"): + print(stream.name) + # ANCHOR_END: pagination-filtering + + # ANCHOR: pagination-deleted + # Include streams that are being deleted + async for stream in basin.list_all_streams(include_deleted=True): + print(stream.name, stream.deleted_at) + # ANCHOR_END: pagination-deleted + + print("Account and basins examples completed") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/docs/configuration.py b/examples/docs/configuration.py new file mode 100644 index 0000000..dfd4828 --- /dev/null +++ b/examples/docs/configuration.py @@ -0,0 +1,57 @@ +"""Documentation examples for Configuration page. + +These snippets are extracted by the docs build script. + +Run with: python examples/docs/configuration.py +Requires: S2_ACCESS_TOKEN environment variable +""" + +import asyncio +import os +from datetime import timedelta + +from s2_sdk import S2, Endpoints, Retry, Timeout + + +async def main(): + # ANCHOR: custom-endpoints + client = S2( + "local-token", + endpoints=Endpoints( + account="http://localhost:8080", + basin="http://localhost:8080", + ), + ) + # ANCHOR_END: custom-endpoints + await client.close() + + access_token = os.environ["S2_ACCESS_TOKEN"] + + # ANCHOR: retry-config + client = S2( + access_token, + retry=Retry( + max_attempts=5, + min_base_delay=timedelta(milliseconds=100), + max_base_delay=timedelta(seconds=2), + ), + ) + # ANCHOR_END: retry-config + await client.close() + + # ANCHOR: timeout-config + client = S2( + access_token, + timeout=Timeout( + connection=timedelta(seconds=5), + request=timedelta(seconds=10), + ), + ) + # ANCHOR_END: timeout-config + await client.close() + + print("Configuration examples loaded") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/docs/metrics.py b/examples/docs/metrics.py new file mode 100644 index 0000000..019e0a4 --- /dev/null +++ b/examples/docs/metrics.py @@ -0,0 +1,63 @@ +"""Documentation examples for Metrics page. + +These snippets are extracted by the docs build script. + +Run with: python examples/docs/metrics.py +Requires: S2_ACCESS_TOKEN environment variable +""" + +import asyncio +import os +import time + +from s2_sdk import ( + S2, + AccountMetricSet, + BasinMetricSet, + StreamMetricSet, + TimeseriesInterval, +) + +access_token = os.environ["S2_ACCESS_TOKEN"] + + +async def main(): + async with S2(access_token) as client: + # ANCHOR: metrics + now = int(time.time()) + thirty_days_ago = now - 30 * 24 * 3600 + six_hours_ago = now - 6 * 3600 + hour_ago = now - 3600 + + # Account-level: active basins over the last 30 days + account_metrics = await client.account_metrics( + set=AccountMetricSet.ACTIVE_BASINS, + start=thirty_days_ago, + end=now, + ) + + # Basin-level: storage usage with hourly resolution + basin_metrics = await client.basin_metrics( + "events", + set=BasinMetricSet.STORAGE, + start=six_hours_ago, + end=now, + interval=TimeseriesInterval.HOUR, + ) + + # Stream-level: storage for a specific stream + stream_metrics = await client.stream_metrics( + "events", + "user-actions", + set=StreamMetricSet.STORAGE, + start=hour_ago, + end=now, + interval=TimeseriesInterval.MINUTE, + ) + # ANCHOR_END: metrics + + print(account_metrics, basin_metrics, stream_metrics) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/docs/overview.py b/examples/docs/overview.py new file mode 100644 index 0000000..4820ca0 --- /dev/null +++ b/examples/docs/overview.py @@ -0,0 +1,20 @@ +"""Documentation examples for SDK Overview page. + +These snippets are extracted by the docs build script. + +Run with: python examples/docs/overview.py +Requires: S2_ACCESS_TOKEN environment variable +""" + +# ANCHOR: create-client +import os + +from s2_sdk import S2 + +client = S2(os.environ["S2_ACCESS_TOKEN"]) + +basin = client.basin("my-basin") +stream = basin.stream("my-stream") +# ANCHOR_END: create-client + +print("Client created successfully") diff --git a/examples/docs/streams.py b/examples/docs/streams.py new file mode 100644 index 0000000..b3e7e9d --- /dev/null +++ b/examples/docs/streams.py @@ -0,0 +1,168 @@ +"""Documentation examples for Streams page. + +These snippets are extracted by the docs build script. + +Run with: python examples/docs/streams.py +Requires: S2_ACCESS_TOKEN, S2_BASIN environment variables +""" + +import asyncio +import os +import time +from datetime import timedelta + +from s2_sdk import ( + S2, + AppendInput, + Batching, + ReadLimit, + Record, + SeqNum, + TailOffset, + Timestamp, +) + +access_token = os.environ["S2_ACCESS_TOKEN"] +basin_name = os.environ["S2_BASIN"] + + +async def main(): + async with S2(access_token) as client: + basin = client.basin(basin_name) + stream_name = f"docs-streams-{int(time.time())}" + + # Ensure stream exists + try: + await basin.create_stream(stream_name) + except Exception: + pass + + # ANCHOR: simple-append + stream = basin.stream(stream_name) + ack = await stream.append( + AppendInput( + records=[ + Record(body=b"first event"), + Record(body=b"second event"), + ] + ) + ) + + # ack tells us where the records landed + print(f"Wrote records {ack.start.seq_num} through {ack.end.seq_num - 1}") + # ANCHOR_END: simple-append + + # ANCHOR: simple-read + batch = await stream.read( + start=SeqNum(0), + limit=ReadLimit(count=100), + ) + + for record in batch.records: + print(f"[{record.seq_num}] {record.body}") + # ANCHOR_END: simple-read + + await append_session_example(stream) + await producer_example(stream) + await check_tail_example(stream) + + # Cleanup + await basin.delete_stream(stream_name) + + print("Streams examples completed") + + +async def append_session_example(stream): + # ANCHOR: append-session + async with stream.append_session() as session: + # Submit a batch — this enqueues it and returns a ticket + ticket = await session.submit( + AppendInput( + records=[ + Record(body=b"event-1"), + Record(body=b"event-2"), + ] + ) + ) + + # The ticket resolves when the batch is durable + ack = await ticket + print(f"Durable at seq_num {ack.start.seq_num}") + # ANCHOR_END: append-session + + +async def producer_example(stream): + # ANCHOR: producer + async with stream.producer( + batching=Batching(linger=timedelta(milliseconds=5)), + ) as producer: + # Submit individual records + ticket = await producer.submit(Record(body=b"my event")) + + # Get the exact sequence number for this record + ack = await ticket + print(f"Record durable at seq_num {ack.seq_num}") + # ANCHOR_END: producer + + +async def check_tail_example(stream): + # ANCHOR: check-tail + tail = await stream.check_tail() + print(f"Stream has {tail.seq_num} records") + # ANCHOR_END: check-tail + + +async def read_session_example(stream): + # ANCHOR: read-session + async for batch in stream.read_session(start=SeqNum(0)): + for record in batch.records: + print(f"[{record.seq_num}] {record.body}") + # ANCHOR_END: read-session + + +async def read_session_tail_offset(stream): + # ANCHOR: read-session-tail-offset + # Start reading from 10 records before the current tail + async for batch in stream.read_session(start=TailOffset(10)): + for record in batch.records: + print(f"[{record.seq_num}] {record.body}") + # ANCHOR_END: read-session-tail-offset + + +async def read_session_timestamp(stream): + # ANCHOR: read-session-timestamp + # Start reading from a specific timestamp + one_hour_ago_ms = int((time.time() - 3600) * 1000) + async for batch in stream.read_session(start=Timestamp(one_hour_ago_ms)): + for record in batch.records: + print(f"[{record.seq_num}] {record.body}") + # ANCHOR_END: read-session-timestamp + + +async def read_session_until(stream): + # ANCHOR: read-session-until + # Read records until a specific timestamp + one_hour_ago_ms = int((time.time() - 3600) * 1000) + async for batch in stream.read_session( + start=SeqNum(0), + until_timestamp=one_hour_ago_ms, + ): + for record in batch.records: + print(f"[{record.seq_num}] {record.body}") + # ANCHOR_END: read-session-until + + +async def read_session_wait(stream): + # ANCHOR: read-session-wait + # Read all available records, then wait up to 30 seconds for new ones + async for batch in stream.read_session( + start=SeqNum(0), + wait=30, + ): + for record in batch.records: + print(f"[{record.seq_num}] {record.body}") + # ANCHOR_END: read-session-wait + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/pyproject.toml b/pyproject.toml index 65af882..d043796 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -61,13 +61,13 @@ exclude = [ extend-select = ["I"] [tool.poe.tasks] -linter = "uv run ruff check --fix" -formatter = "uv run ruff format" -type_checker = "uv run mypy" -ci_linter = "uv run ruff check" -ci_formatter = "uv run ruff format --check" -checker = ["linter", "formatter", "type_checker"] -ci_checker = ["ci_linter", "ci_formatter", "type_checker"] +lint = "uv run ruff check" +lint-fix = "uv run ruff check --fix" +format = "uv run ruff format" +format-check = "uv run ruff format --check" +typecheck = "uv run mypy" +cq-check = ["lint", "format-check", "typecheck"] +cq-fix = ["lint-fix", "format", "typecheck"] docs_check = "uv run --group docs sphinx-build -W -b html docs/source docs/build/html" docs_clean = "rm -rf docs/build" docs_live.sequence = ["docs_clean", "docs_check", {cmd = "uv run --group docs sphinx-autobuild docs/source docs/build/html --watch src/s2_sdk"}] diff --git a/src/s2_sdk/_compression.py b/src/s2_sdk/_compression.py index c083e2a..a53be1e 100644 --- a/src/s2_sdk/_compression.py +++ b/src/s2_sdk/_compression.py @@ -1,5 +1,3 @@ -"""S2S message-level compression (zstd and gzip).""" - import gzip import zstandard