Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion splitio/engine/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ def eval_with_context(self, key, bucketing, feature_name, attrs, ctx):
'impression': {
'label': label,
'change_number': _change_number
}
},
'track': feature.trackImpressions
}

def _treatment_for_flag(self, flag, key, bucketing, attributes, ctx):
Expand Down
26 changes: 20 additions & 6 deletions splitio/engine/impressions/impressions.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class ImpressionsMode(Enum):
class Manager(object): # pylint:disable=too-few-public-methods
"""Impression manager."""

def __init__(self, strategy, telemetry_runtime_producer):
def __init__(self, strategy, none_strategy, telemetry_runtime_producer):
"""
Construct a manger to track and forward impressions to the queue.

Expand All @@ -23,19 +23,33 @@ def __init__(self, strategy, telemetry_runtime_producer):
"""

self._strategy = strategy
self._none_strategy = none_strategy
self._telemetry_runtime_producer = telemetry_runtime_producer

def process_impressions(self, impressions):
def process_impressions(self, impressions_decorated):
"""
Process impressions.

Impressions are analyzed to see if they've been seen before and counted.

:param impressions: List of impression objects with attributes
:type impressions: list[tuple[splitio.models.impression.Impression, dict]]
:param impressions_decorated: List of impression objects with attributes
:type impressions_decorated: list[tuple[splitio.models.impression.ImpressionDecorated, dict]]

:return: processed and deduped impressions.
:rtype: tuple(list[tuple[splitio.models.impression.Impression, dict]], list(int))
"""
for_log, for_listener, for_counter, for_unique_keys_tracker = self._strategy.process_impressions(impressions)
return for_log, len(impressions) - len(for_log), for_listener, for_counter, for_unique_keys_tracker
for_listener_all = []
for_log_all = []
for_counter_all = []
for_unique_keys_tracker_all = []
for impression_decorated, att in impressions_decorated:
if not impression_decorated.track:
for_log, for_listener, for_counter, for_unique_keys_tracker = self._none_strategy.process_impressions([(impression_decorated.Impression, att)])
else:
for_log, for_listener, for_counter, for_unique_keys_tracker = self._strategy.process_impressions([(impression_decorated.Impression, att)])
for_listener_all.extend(for_listener)
for_log_all.extend(for_log)
for_counter_all.extend(for_counter)
for_unique_keys_tracker_all.extend(for_unique_keys_tracker)

return for_log_all, len(impressions_decorated) - len(for_log_all), for_listener_all, for_counter_all, for_unique_keys_tracker_all
8 changes: 8 additions & 0 deletions splitio/models/impressions.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@
]
)

ImpressionDecorated = namedtuple(
'ImpressionDecorated',
[
'Impression',
'track'
]
)

# pre-python3.7 hack to make previous_time optional
Impression.__new__.__defaults__ = (None,)

Expand Down
22 changes: 17 additions & 5 deletions splitio/models/splits.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

SplitView = namedtuple(
'SplitView',
['name', 'traffic_type', 'killed', 'treatments', 'change_number', 'configs', 'default_treatment', 'sets']
['name', 'traffic_type', 'killed', 'treatments', 'change_number', 'configs', 'default_treatment', 'sets', 'trackImpressions']
)

_DEFAULT_CONDITIONS_TEMPLATE = {
Expand Down Expand Up @@ -73,7 +73,8 @@ def __init__( # pylint: disable=too-many-arguments
traffic_allocation=None,
traffic_allocation_seed=None,
configurations=None,
sets=None
sets=None,
trackImpressions=None
):
"""
Class constructor.
Expand All @@ -96,6 +97,8 @@ def __init__( # pylint: disable=too-many-arguments
:type traffic_allocation_seed: int
:pram sets: list of flag sets
:type sets: list
:pram trackImpressions: track impressions flag
:type trackImpressions: boolean
"""
self._name = name
self._seed = seed
Expand Down Expand Up @@ -125,6 +128,7 @@ def __init__( # pylint: disable=too-many-arguments

self._configurations = configurations
self._sets = set(sets) if sets is not None else set()
self._trackImpressions = trackImpressions if trackImpressions is not None else True

@property
def name(self):
Expand Down Expand Up @@ -186,6 +190,11 @@ def sets(self):
"""Return the flag sets of the split."""
return self._sets

@property
def trackImpressions(self):
"""Return trackImpressions of the split."""
return self._trackImpressions

def get_configurations_for(self, treatment):
"""Return the mapping of treatments to configurations."""
return self._configurations.get(treatment) if self._configurations else None
Expand Down Expand Up @@ -214,7 +223,8 @@ def to_json(self):
'algo': self.algo.value,
'conditions': [c.to_json() for c in self.conditions],
'configurations': self._configurations,
'sets': list(self._sets)
'sets': list(self._sets),
'trackImpressions': self._trackImpressions
}

def to_split_view(self):
Expand All @@ -232,7 +242,8 @@ def to_split_view(self):
self.change_number,
self._configurations if self._configurations is not None else {},
self._default_treatment,
list(self._sets) if self._sets is not None else []
list(self._sets) if self._sets is not None else [],
self._trackImpressions
)

def local_kill(self, default_treatment, change_number):
Expand Down Expand Up @@ -288,5 +299,6 @@ def from_raw(raw_split):
traffic_allocation=raw_split.get('trafficAllocation'),
traffic_allocation_seed=raw_split.get('trafficAllocationSeed'),
configurations=raw_split.get('configurations'),
sets=set(raw_split.get('sets')) if raw_split.get('sets') is not None else []
sets=set(raw_split.get('sets')) if raw_split.get('sets') is not None else [],
trackImpressions=raw_split.get('trackImpressions') if raw_split.get('trackImpressions') is not None else True
)
16 changes: 8 additions & 8 deletions splitio/recorder/recorder.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ def __init__(self, impressions_manager, event_storage, impression_storage, telem
self._telemetry_evaluation_producer = telemetry_evaluation_producer
self._telemetry_runtime_producer = telemetry_runtime_producer

def record_treatment_stats(self, impressions, latency, operation, method_name):
def record_treatment_stats(self, impressions_decorated, latency, operation, method_name):
"""
Record stats for treatment evaluation.

Expand All @@ -165,7 +165,7 @@ def record_treatment_stats(self, impressions, latency, operation, method_name):
try:
if method_name is not None:
self._telemetry_evaluation_producer.record_latency(operation, latency)
impressions, deduped, for_listener, for_counter, for_unique_keys_tracker = self._impressions_manager.process_impressions(impressions)
impressions, deduped, for_listener, for_counter, for_unique_keys_tracker = self._impressions_manager.process_impressions(impressions_decorated)
if deduped > 0:
self._telemetry_runtime_producer.record_impression_stats(telemetry.CounterConstants.IMPRESSIONS_DEDUPED, deduped)
self._impression_storage.put(impressions)
Expand Down Expand Up @@ -210,7 +210,7 @@ def __init__(self, impressions_manager, event_storage, impression_storage, telem
self._telemetry_evaluation_producer = telemetry_evaluation_producer
self._telemetry_runtime_producer = telemetry_runtime_producer

async def record_treatment_stats(self, impressions, latency, operation, method_name):
async def record_treatment_stats(self, impressions_decorated, latency, operation, method_name):
"""
Record stats for treatment evaluation.

Expand All @@ -224,7 +224,7 @@ async def record_treatment_stats(self, impressions, latency, operation, method_n
try:
if method_name is not None:
await self._telemetry_evaluation_producer.record_latency(operation, latency)
impressions, deduped, for_listener, for_counter, for_unique_keys_tracker = self._impressions_manager.process_impressions(impressions)
impressions, deduped, for_listener, for_counter, for_unique_keys_tracker = self._impressions_manager.process_impressions(impressions_decorated)
if deduped > 0:
await self._telemetry_runtime_producer.record_impression_stats(telemetry.CounterConstants.IMPRESSIONS_DEDUPED, deduped)

Expand Down Expand Up @@ -277,7 +277,7 @@ def __init__(self, pipe, impressions_manager, event_storage,
self._data_sampling = data_sampling
self._telemetry_redis_storage = telemetry_redis_storage

def record_treatment_stats(self, impressions, latency, operation, method_name):
def record_treatment_stats(self, impressions_decorated, latency, operation, method_name):
"""
Record stats for treatment evaluation.

Expand All @@ -294,7 +294,7 @@ def record_treatment_stats(self, impressions, latency, operation, method_name):
if self._data_sampling < rnumber:
return

impressions, deduped, for_listener, for_counter, for_unique_keys_tracker = self._impressions_manager.process_impressions(impressions)
impressions, deduped, for_listener, for_counter, for_unique_keys_tracker = self._impressions_manager.process_impressions(impressions_decorated)
if impressions:
pipe = self._make_pipe()
self._impression_storage.add_impressions_to_pipe(impressions, pipe)
Expand Down Expand Up @@ -367,7 +367,7 @@ def __init__(self, pipe, impressions_manager, event_storage,
self._data_sampling = data_sampling
self._telemetry_redis_storage = telemetry_redis_storage

async def record_treatment_stats(self, impressions, latency, operation, method_name):
async def record_treatment_stats(self, impressions_decorated, latency, operation, method_name):
"""
Record stats for treatment evaluation.

Expand All @@ -384,7 +384,7 @@ async def record_treatment_stats(self, impressions, latency, operation, method_n
if self._data_sampling < rnumber:
return

impressions, deduped, for_listener, for_counter, for_unique_keys_tracker = self._impressions_manager.process_impressions(impressions)
impressions, deduped, for_listener, for_counter, for_unique_keys_tracker = self._impressions_manager.process_impressions(impressions_decorated)
if impressions:
pipe = self._make_pipe()
self._impression_storage.add_impressions_to_pipe(impressions, pipe)
Expand Down
1 change: 1 addition & 0 deletions tests/engine/test_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def test_evaluate_treatment_ok(self, mocker):
assert result['impression']['change_number'] == 123
assert result['impression']['label'] == 'some_label'
assert mocked_split.get_configurations_for.mock_calls == [mocker.call('on')]
assert result['track'] == mocked_split.trackImpressions


def test_evaluate_treatment_ok_no_config(self, mocker):
Expand Down
Loading