Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ cover
.pytest_cache
.env
tests/snapshots/temp/
tests/playground/snapshots/temp

# PyBuilder
target/
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,10 @@ It allows you to run the Client SDK in local mode (zero latency) and still have

```python
Client.schedule_snapshot_auto_update(
interval=60,
callback=lambda error, updated: print('Snapshot updated' if not error else error)
interval=60,
callback=lambda error, updated: (
print(f"Snapshot updated to version: {Client.snapshot_version()}") if updated else None
)
)
```

Expand Down
12 changes: 10 additions & 2 deletions switcher_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from switcher_client.lib.globals.global_context import Context, ContextOptions
from switcher_client.lib.globals.global_context import DEFAULT_ENVIRONMENT
from switcher_client.lib.snapshot_auto_updater import SnapshotAutoUpdater
from switcher_client.lib.snapshot_loader import load_domain, validate_snapshot
from switcher_client.lib.snapshot_loader import load_domain, validate_snapshot, save_snapshot
from switcher_client.lib.utils import get
from switcher_client.switcher import Switcher

Expand Down Expand Up @@ -109,13 +109,21 @@ def check_snapshot():
)

if snapshot is not None:
if Client.context.options.snapshot_location is not None:
save_snapshot(
snapshot=snapshot,
snapshot_location=get(Client.context.options.snapshot_location, ''),
environment=get(Client.context.environment, DEFAULT_ENVIRONMENT)
)

GlobalSnapshot.init(snapshot)
return True

return False

@staticmethod
def schedule_snapshot_auto_update(interval: Optional[int] = None, callback: Optional[Callable] = None):
def schedule_snapshot_auto_update(interval: Optional[int] = None,
callback: Optional[Callable[[Optional[Exception], bool], None]] = None):
""" Schedule Snapshot auto update """
callback = get(callback, lambda *_: None)

Expand Down
10 changes: 9 additions & 1 deletion switcher_client/lib/snapshot_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,12 @@ def validate_snapshot(
graphql_response = json.loads(snapshot_str or '{}')
return Snapshot(graphql_response.get('data', '{}'))

return None
return None

def save_snapshot(snapshot: Snapshot, snapshot_location: str, environment: str):
""" Save snapshot to file """

os.makedirs(snapshot_location, exist_ok=True)
snapshot_file = f"{snapshot_location}/{environment}.json"
with open(snapshot_file, 'w') as file:
json.dump(snapshot.to_dict(), file, indent=4)
8 changes: 7 additions & 1 deletion switcher_client/lib/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def __init__(self):
class Snapshot:
def __init__(self, json_data: dict):
data = json_data
self._original_data = json_data
self.data = SnapshotData()
self.data.domain = self._parse_domain(data.get('domain', {}))

Expand Down Expand Up @@ -112,4 +113,9 @@ def _parse_relay(self, relay_data: dict) -> Relay:
relay.type = relay_data.get('type')
relay.activated = relay_data.get('activated')

return relay
return relay

def to_dict(self) -> dict:
""" Convert Snapshot back to dictionary format for JSON serialization """

return {'data': self._original_data}
44 changes: 41 additions & 3 deletions tests/playground/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,20 @@
import time

from util import monitor_run
from switcher_client.lib.globals.global_context import DEFAULT_ENVIRONMENT
from switcher_client.lib.globals.global_snapshot import LoadSnapshotOptions
from switcher_client import Client, ContextOptions

SWITCHER_KEY = 'CLIENT_PYTHON_FEATURE'
LOOP = True

def setup_context(options: ContextOptions = ContextOptions()):
def setup_context(options: ContextOptions = ContextOptions(), environment = DEFAULT_ENVIRONMENT):
Client.build_context(
domain='Switcher API',
url='https://api.switcherapi.com',
api_key='[API_KEY]',
component='switcher-client-python',
environment='default',
environment=environment,
options=options
)

Expand All @@ -27,10 +30,45 @@ def simple_api_call():
monitor_thread = threading.Thread(target=monitor_run, args=(switcher,), daemon=True)
monitor_thread.start()

def load_snapshot_from_remote():
""" Use case: Load snapshot from remote API """
global LOOP
LOOP = False

setup_context(ContextOptions(
local=True,
snapshot_location='snapshots/temp'
))

Client.load_snapshot(LoadSnapshotOptions(
fetch_remote=True
))

print(f"Snapshot version: {Client.snapshot_version()}")

def auto_update_snapshot():
""" Use case: Auto update snapshot """
setup_context(ContextOptions(
local=True,
snapshot_location='snapshots/temp',
snapshot_auto_update_interval=10
))

Client.load_snapshot(LoadSnapshotOptions(
fetch_remote=True
))

print(f"Initial snapshot version: {Client.snapshot_version()}")
Client.schedule_snapshot_auto_update(
callback=lambda _, updated: (
print(f"Snapshot updated to version: {Client.snapshot_version()}") if updated else None
)
)

try:
# Replace with use case
simple_api_call()
while True:
while LOOP:
time.sleep(1)
except KeyboardInterrupt:
print("\nStopping...")