Skip to content

Commit c14e651

Browse files
authored
Merge pull request #614 from splitio/FME-12227-sdk-events-factory
update factory class for ready and timedout events
2 parents 00227ce + 8171606 commit c14e651

File tree

6 files changed

+211
-13
lines changed

6 files changed

+211
-13
lines changed

splitio/client/factory.py

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@
2020
from splitio.engine.impressions.manager import Counter as ImpressionsCounter
2121
from splitio.engine.impressions.unique_keys_tracker import UniqueKeysTracker, UniqueKeysTrackerAsync
2222
from splitio.models.fallback_config import FallbackTreatmentCalculator
23+
from splitio.events.events_metadata import EventsMetadata, SdkEventType
24+
from splitio.models.notification import SdkInternalEventNotification
25+
from splitio.models.events import SdkInternalEvent
26+
2327
# Storage
2428
from splitio.storage.inmemmory import InMemorySplitStorage, InMemorySegmentStorage, \
2529
InMemoryImpressionStorage, InMemoryEventStorage, InMemoryTelemetryStorage, LocalhostTelemetryStorage, \
@@ -166,6 +170,7 @@ def __init__( # pylint: disable=too-many-arguments
166170
storages,
167171
labels_enabled,
168172
recorder,
173+
internal_events_queue,
169174
sync_manager=None,
170175
sdk_ready_flag=None,
171176
telemetry_producer=None,
@@ -204,6 +209,7 @@ def __init__( # pylint: disable=too-many-arguments
204209
_LOGGER.debug("Running in threading mode")
205210
self._sdk_internal_ready_flag = sdk_ready_flag
206211
self._fallback_treatment_calculator = fallback_treatment_calculator
212+
self._internal_events_queue = internal_events_queue
207213
self._start_status_updater()
208214

209215
def _start_status_updater(self):
@@ -224,12 +230,15 @@ def _start_status_updater(self):
224230
ready_updater.start()
225231
else:
226232
self._status = Status.READY
227-
233+
self._internal_events_queue.put(SdkInternalEventNotification(SdkInternalEvent.SDK_READY, None))
234+
228235
def _update_status_when_ready(self):
229236
"""Wait until the sdk is ready and update the status."""
230237
self._sdk_internal_ready_flag.wait()
231238
self._status = Status.READY
232239
self._sdk_ready_flag.set()
240+
self._internal_events_queue.put(SdkInternalEventNotification(SdkInternalEvent.SDK_READY, None))
241+
233242
self._telemetry_init_producer.record_ready_time(get_current_epoch_time_ms() - self._ready_time)
234243
redundant_factory_count, active_factory_count = _get_active_and_redundant_count()
235244
self._telemetry_init_producer.record_active_and_redundant_factories(active_factory_count, redundant_factory_count)
@@ -270,6 +279,7 @@ def block_until_ready(self, timeout=None):
270279

271280
if not ready:
272281
self._telemetry_init_producer.record_bur_time_out()
282+
self._internal_events_queue.put(SdkInternalEventNotification(SdkInternalEvent.SDK_TIMED_OUT, None))
273283
raise TimeoutException('SDK Initialization: time of %d exceeded' % timeout)
274284

275285
def destroy(self, destroyed_event=None):
@@ -548,11 +558,11 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
548558
'telemetry': TelemetryAPI(http_client, api_key, sdk_metadata, telemetry_runtime_producer),
549559
}
550560

551-
events_queue = queue.Queue()
561+
internal_events_queue = queue.Queue()
552562
storages = {
553-
'splits': InMemorySplitStorage(events_queue, cfg['flagSetsFilter'] if cfg['flagSetsFilter'] is not None else []),
554-
'segments': InMemorySegmentStorage(events_queue),
555-
'rule_based_segments': InMemoryRuleBasedSegmentStorage(events_queue),
563+
'splits': InMemorySplitStorage(internal_events_queue, cfg['flagSetsFilter'] if cfg['flagSetsFilter'] is not None else []),
564+
'segments': InMemorySegmentStorage(internal_events_queue),
565+
'rule_based_segments': InMemoryRuleBasedSegmentStorage(internal_events_queue),
556566
'impressions': InMemoryImpressionStorage(cfg['impressionsQueueSize'], telemetry_runtime_producer),
557567
'events': InMemoryEventStorage(cfg['eventsQueueSize'], telemetry_runtime_producer),
558568
}
@@ -629,14 +639,14 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
629639
synchronizer._split_synchronizers._segment_sync.shutdown()
630640

631641
return SplitFactory(api_key, storages, cfg['labelsEnabled'],
632-
recorder, manager, None, telemetry_producer, telemetry_init_producer, telemetry_submitter, preforked_initialization=preforked_initialization,
642+
recorder, internal_events_queue, manager, None, telemetry_producer, telemetry_init_producer, telemetry_submitter, preforked_initialization=preforked_initialization,
633643
fallback_treatment_calculator=FallbackTreatmentCalculator(cfg['fallbackTreatments']))
634644

635645
initialization_thread = threading.Thread(target=manager.start, name="SDKInitializer", daemon=True)
636646
initialization_thread.start()
637647

638648
return SplitFactory(api_key, storages, cfg['labelsEnabled'],
639-
recorder, manager, sdk_ready_flag,
649+
recorder, internal_events_queue, manager, sdk_ready_flag,
640650
telemetry_producer, telemetry_init_producer,
641651
telemetry_submitter, fallback_treatment_calculator = FallbackTreatmentCalculator(cfg['fallbackTreatments']))
642652

@@ -826,12 +836,14 @@ def _build_redis_factory(api_key, cfg):
826836
initialization_thread.start()
827837

828838
telemetry_init_producer.record_config(cfg, {}, 0, 0)
829-
839+
internal_events_queue = queue.Queue()
840+
830841
split_factory = SplitFactory(
831842
api_key,
832843
storages,
833844
cfg['labelsEnabled'],
834845
recorder,
846+
internal_events_queue,
835847
manager,
836848
sdk_ready_flag=None,
837849
telemetry_producer=telemetry_producer,
@@ -992,12 +1004,14 @@ def _build_pluggable_factory(api_key, cfg):
9921004
initialization_thread.start()
9931005

9941006
telemetry_init_producer.record_config(cfg, {}, 0, 0)
1007+
internal_events_queue = queue.Queue()
9951008

9961009
split_factory = SplitFactory(
9971010
api_key,
9981011
storages,
9991012
cfg['labelsEnabled'],
10001013
recorder,
1014+
internal_events_queue,
10011015
manager,
10021016
sdk_ready_flag=None,
10031017
telemetry_producer=telemetry_producer,
@@ -1152,11 +1166,14 @@ def _build_localhost_factory(cfg):
11521166
telemetry_evaluation_producer,
11531167
telemetry_runtime_producer
11541168
)
1169+
internal_events_queue = queue.Queue()
1170+
11551171
return SplitFactory(
11561172
'localhost',
11571173
storages,
11581174
False,
11591175
recorder,
1176+
internal_events_queue,
11601177
manager,
11611178
ready_event,
11621179
telemetry_producer=telemetry_producer,

tests/client/test_client.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ def synchronize_config(*_):
6666
'events': event_storage},
6767
mocker.Mock(),
6868
recorder,
69+
events_queue,
6970
mocker.Mock(),
7071
mocker.Mock(),
7172
telemetry_producer,
@@ -136,6 +137,7 @@ def test_get_treatment_with_config(self, mocker):
136137
'events': event_storage},
137138
mocker.Mock(),
138139
recorder,
140+
events_queue,
139141
mocker.Mock(),
140142
mocker.Mock(),
141143
telemetry_producer,
@@ -215,6 +217,7 @@ def test_get_treatments(self, mocker):
215217
'events': event_storage},
216218
mocker.Mock(),
217219
recorder,
220+
events_queue,
218221
mocker.Mock(),
219222
mocker.Mock(),
220223
telemetry_producer,
@@ -296,6 +299,7 @@ def test_get_treatments_by_flag_set(self, mocker):
296299
'events': event_storage},
297300
mocker.Mock(),
298301
recorder,
302+
events_queue,
299303
mocker.Mock(),
300304
mocker.Mock(),
301305
telemetry_producer,
@@ -376,6 +380,7 @@ def test_get_treatments_by_flag_sets(self, mocker):
376380
'events': event_storage},
377381
mocker.Mock(),
378382
recorder,
383+
events_queue,
379384
mocker.Mock(),
380385
mocker.Mock(),
381386
telemetry_producer,
@@ -455,6 +460,7 @@ def test_get_treatments_with_config(self, mocker):
455460
'events': event_storage},
456461
mocker.Mock(),
457462
recorder,
463+
events_queue,
458464
mocker.Mock(),
459465
mocker.Mock(),
460466
telemetry_producer,
@@ -531,6 +537,7 @@ def test_get_treatments_with_config_by_flag_set(self, mocker):
531537
destroyed_property = mocker.PropertyMock()
532538
destroyed_property.return_value = False
533539
recorder = StandardRecorder(impmanager, event_storage, impression_storage, telemetry_producer.get_telemetry_evaluation_producer(), telemetry_producer.get_telemetry_runtime_producer())
540+
events_queue = queue.Queue()
534541
factory = SplitFactory(mocker.Mock(),
535542
{'splits': split_storage,
536543
'segments': segment_storage,
@@ -539,6 +546,7 @@ def test_get_treatments_with_config_by_flag_set(self, mocker):
539546
'events': event_storage},
540547
mocker.Mock(),
541548
recorder,
549+
events_queue,
542550
mocker.Mock(),
543551
mocker.Mock(),
544552
telemetry_producer,
@@ -620,6 +628,7 @@ def test_get_treatments_with_config_by_flag_sets(self, mocker):
620628
'events': event_storage},
621629
mocker.Mock(),
622630
recorder,
631+
events_queue,
623632
mocker.Mock(),
624633
mocker.Mock(),
625634
telemetry_producer,
@@ -708,6 +717,7 @@ def synchronize_config(*_):
708717
'events': event_storage},
709718
mocker.Mock(),
710719
recorder,
720+
events_queue,
711721
mocker.Mock(),
712722
mocker.Mock(),
713723
telemetry_producer,
@@ -773,6 +783,7 @@ def synchronize_config(*_):
773783
'events': event_storage},
774784
mocker.Mock(),
775785
recorder,
786+
events_queue,
776787
mocker.Mock(),
777788
mocker.Mock(),
778789
telemetry_producer,
@@ -838,6 +849,7 @@ def synchronize_config(*_):
838849
'events': event_storage},
839850
mocker.Mock(),
840851
recorder,
852+
events_queue,
841853
mocker.Mock(),
842854
mocker.Mock(),
843855
telemetry_producer,
@@ -874,6 +886,7 @@ def test_destroy(self, mocker):
874886
telemetry_storage = InMemoryTelemetryStorage()
875887
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
876888
recorder = StandardRecorder(impmanager, event_storage, impression_storage, telemetry_producer.get_telemetry_evaluation_producer(), telemetry_producer.get_telemetry_runtime_producer())
889+
events_queue = queue.Queue()
877890
factory = SplitFactory(mocker.Mock(),
878891
{'splits': split_storage,
879892
'segments': segment_storage,
@@ -882,6 +895,7 @@ def test_destroy(self, mocker):
882895
'events': event_storage},
883896
mocker.Mock(),
884897
recorder,
898+
events_queue,
885899
mocker.Mock(),
886900
mocker.Mock(),
887901
telemetry_producer,
@@ -911,6 +925,7 @@ def test_track(self, mocker):
911925
telemetry_storage = InMemoryTelemetryStorage()
912926
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
913927
recorder = StandardRecorder(impmanager, event_storage, impression_storage, telemetry_producer.get_telemetry_evaluation_producer(), telemetry_producer.get_telemetry_runtime_producer())
928+
events_queue = queue.Queue()
914929
factory = SplitFactory(mocker.Mock(),
915930
{'splits': split_storage,
916931
'segments': segment_storage,
@@ -919,6 +934,7 @@ def test_track(self, mocker):
919934
'events': event_storage},
920935
mocker.Mock(),
921936
recorder,
937+
events_queue,
922938
mocker.Mock(),
923939
mocker.Mock(),
924940
telemetry_producer,
@@ -961,6 +977,7 @@ def test_evaluations_before_running_post_fork(self, mocker):
961977

962978
impmanager = mocker.Mock(spec=ImpressionManager)
963979
recorder = StandardRecorder(impmanager, mocker.Mock(), impression_storage, telemetry_producer.get_telemetry_evaluation_producer(), telemetry_producer.get_telemetry_runtime_producer())
980+
events_queue = queue.Queue()
964981
factory = SplitFactory(mocker.Mock(),
965982
{'splits': split_storage,
966983
'segments': segment_storage,
@@ -969,6 +986,7 @@ def test_evaluations_before_running_post_fork(self, mocker):
969986
'events': mocker.Mock()},
970987
mocker.Mock(),
971988
recorder,
989+
events_queue,
972990
mocker.Mock(),
973991
mocker.Mock(),
974992
telemetry_producer,
@@ -1047,6 +1065,7 @@ def test_telemetry_not_ready(self, mocker):
10471065
'events': mocker.Mock()},
10481066
mocker.Mock(),
10491067
recorder,
1068+
events_queue,
10501069
mocker.Mock(),
10511070
mocker.Mock(),
10521071
telemetry_producer,
@@ -1092,6 +1111,7 @@ def test_telemetry_record_treatment_exception(self, mocker):
10921111
'events': event_storage},
10931112
mocker.Mock(),
10941113
recorder,
1114+
events_queue,
10951115
impmanager,
10961116
mocker.Mock(),
10971117
telemetry_producer,
@@ -1193,6 +1213,7 @@ def test_telemetry_method_latency(self, mocker):
11931213
'events': event_storage},
11941214
mocker.Mock(),
11951215
recorder,
1216+
events_queue,
11961217
impmanager,
11971218
mocker.Mock(),
11981219
telemetry_producer,
@@ -1255,6 +1276,7 @@ def test_telemetry_track_exception(self, mocker):
12551276
telemetry_storage = InMemoryTelemetryStorage()
12561277
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
12571278
recorder = StandardRecorder(impmanager, event_storage, impression_storage, telemetry_producer.get_telemetry_evaluation_producer(), telemetry_producer.get_telemetry_runtime_producer())
1279+
events_queue = queue.Queue()
12581280
factory = SplitFactory(mocker.Mock(),
12591281
{'splits': split_storage,
12601282
'segments': segment_storage,
@@ -1263,6 +1285,7 @@ def test_telemetry_track_exception(self, mocker):
12631285
'events': event_storage},
12641286
mocker.Mock(),
12651287
recorder,
1288+
events_queue,
12661289
impmanager,
12671290
mocker.Mock(),
12681291
telemetry_producer,
@@ -1316,6 +1339,7 @@ def synchronize_config(*_):
13161339
'events': event_storage},
13171340
mocker.Mock(),
13181341
recorder,
1342+
events_queue,
13191343
mocker.Mock(),
13201344
mocker.Mock(),
13211345
telemetry_producer,
@@ -1412,6 +1436,7 @@ def test_fallback_treatment_eval_exception(self, mocker):
14121436
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
14131437
impmanager = ImpressionManager(StrategyOptimizedMode(), StrategyNoneMode(), telemetry_producer.get_telemetry_runtime_producer())
14141438
recorder = StandardRecorder(impmanager, event_storage, impression_storage, telemetry_producer.get_telemetry_evaluation_producer(), telemetry_producer.get_telemetry_runtime_producer())
1439+
internal_events_queue = queue.Queue()
14151440
factory = SplitFactory(mocker.Mock(),
14161441
{'splits': split_storage,
14171442
'segments': segment_storage,
@@ -1420,6 +1445,7 @@ def test_fallback_treatment_eval_exception(self, mocker):
14201445
'events': event_storage},
14211446
mocker.Mock(),
14221447
recorder,
1448+
internal_events_queue,
14231449
impmanager,
14241450
mocker.Mock(),
14251451
telemetry_producer,
@@ -1550,6 +1576,7 @@ def test_fallback_treatment_exception(self, mocker):
15501576
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
15511577
impmanager = ImpressionManager(StrategyOptimizedMode(), StrategyNoneMode(), telemetry_producer.get_telemetry_runtime_producer())
15521578
recorder = StandardRecorder(impmanager, event_storage, impression_storage, telemetry_producer.get_telemetry_evaluation_producer(), telemetry_producer.get_telemetry_runtime_producer())
1579+
internal_events_queue = queue.Queue()
15531580
factory = SplitFactory(mocker.Mock(),
15541581
{'splits': split_storage,
15551582
'segments': segment_storage,
@@ -1558,6 +1585,7 @@ def test_fallback_treatment_exception(self, mocker):
15581585
'events': event_storage},
15591586
mocker.Mock(),
15601587
recorder,
1588+
internal_events_queue,
15611589
impmanager,
15621590
mocker.Mock(),
15631591
telemetry_producer,
@@ -1618,6 +1646,7 @@ def test_fallback_treatment_not_ready_impressions(self, mocker):
16181646
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
16191647
impmanager = ImpressionManager(StrategyOptimizedMode(), StrategyNoneMode(), telemetry_producer.get_telemetry_runtime_producer())
16201648
recorder = StandardRecorder(impmanager, event_storage, impression_storage, telemetry_producer.get_telemetry_evaluation_producer(), telemetry_producer.get_telemetry_runtime_producer())
1649+
internal_events_queue = queue.Queue()
16211650
factory = SplitFactory(mocker.Mock(),
16221651
{'splits': split_storage,
16231652
'segments': segment_storage,
@@ -1626,6 +1655,7 @@ def test_fallback_treatment_not_ready_impressions(self, mocker):
16261655
'events': event_storage},
16271656
mocker.Mock(),
16281657
recorder,
1658+
internal_events_queue,
16291659
impmanager,
16301660
mocker.Mock(),
16311661
telemetry_producer,

0 commit comments

Comments
 (0)