From d2774acda968c4c6e7985250ab9a02f33730ba85 Mon Sep 17 00:00:00 2001 From: Francesco Faraone Date: Thu, 21 May 2026 16:36:25 +0200 Subject: [PATCH 1/9] MPT-21087 limit diworker concurrency per tenant; change 429 severity to warning in events-log. --- diworker/diworker/importers/base.py | 27 ++++++++++++++++++- diworker/diworker/main.py | 22 ++++++++++++--- docker_images/keeper_executor/events.py | 7 +++++ .../keeper_executor/executors/main_events.py | 1 + 4 files changed, 52 insertions(+), 5 deletions(-) diff --git a/diworker/diworker/importers/base.py b/diworker/diworker/importers/base.py index 6bdd4156b..ea231b14c 100644 --- a/diworker/diworker/importers/base.py +++ b/diworker/diworker/importers/base.py @@ -4,7 +4,9 @@ import requests import gzip import shutil +import threading import uuid +from contextlib import nullcontext from functools import cached_property from collections import defaultdict @@ -24,16 +26,30 @@ LOG = logging.getLogger(__name__) CHUNK_SIZE = 200 + +_THROTTLE_SEMAPHORES: dict[str, threading.Semaphore] = {} +_THROTTLE_LOCK = threading.Lock() + CSV_REWRITE_DAYS = 5 GZIP_ENDING = '.gz' REPORTS_PATH_PREFIX = 'reports' +def _get_throttle_semaphore(parent_id: str, + max_concurrent: int) -> threading.Semaphore: + with _THROTTLE_LOCK: + if parent_id not in _THROTTLE_SEMAPHORES: + _THROTTLE_SEMAPHORES[parent_id] = threading.Semaphore(max_concurrent) + return _THROTTLE_SEMAPHORES[parent_id] + + class BaseReportImporter: def __init__(self, cloud_account_id, rest_cl, config_cl, mongo_raw, mongo_resources, clickhouse_cl, import_file=None, - recalculate=False, detect_period_start=True): + recalculate=False, detect_period_start=True, + max_tenant_concurrent=1): self.cloud_acc_id = cloud_account_id + self.max_tenant_concurrent = max_tenant_concurrent self.rest_cl = rest_cl self.config_cl = config_cl self.mongo_raw = mongo_raw @@ -429,6 +445,15 @@ def data_import(self): self.generate_clean_records(regeneration=regeneration) def import_report(self): + parent_id = self.cloud_acc.get('parent_id') + throttle = ( + _get_throttle_semaphore(parent_id, self.max_tenant_concurrent) + if parent_id else nullcontext() + ) + with throttle: + self._run_import() + + def _run_import(self): LOG.info('Started import for %s', self.cloud_acc_id) self.prepare() try: diff --git a/diworker/diworker/main.py b/diworker/diworker/main.py index dcb017e8d..9bf990e53 100755 --- a/diworker/diworker/main.py +++ b/diworker/diworker/main.py @@ -40,6 +40,14 @@ ENVIRONMENT_CLOUD_TYPE = 'environment' HEARTBEAT_INTERVAL = 300 DEFAULT_MAX_WORKERS = 4 +DEFAULT_MAX_TENANT_WORKERS = 1 + + +def _is_rate_limit_exc(exc): + if getattr(exc, 'status_code', None) == 429: + return True + msg = str(exc).lower() + return '429' in msg or 'toomanyrequests' in msg or 'too many requests' in msg class DIWorker(ConsumerMixin): @@ -168,7 +176,9 @@ def report_import(self, task, config_cl, rest_cl, mongo_cl, clickhouse_cl): 'mongo_resources': mongo_cl.restapi['resources'], 'clickhouse_cl': clickhouse_cl, 'import_file': import_dict.get('import_file'), - 'recalculate': is_recalculation} + 'recalculate': is_recalculation, + 'max_tenant_concurrent': int(self.diworker_settings.get( + 'max_tenant_import_workers', DEFAULT_MAX_TENANT_WORKERS))} importer = None ca = None previous_attempt_ts = 0 @@ -220,11 +230,13 @@ def report_import(self, task, config_cl, rest_cl, mongo_cl, clickhouse_cl): if not importer: importer = BaseReportImporter(**importer_params) importer.update_cloud_import_attempt(now, reason) - self.send_report_failed_email(ca, previous_attempt_ts, now) + self.send_report_failed_email( + ca, previous_attempt_ts, now, + is_throttled=_is_rate_limit_exc(exc)) raise def send_report_failed_email(self, cloud_account, previous_attempt_ts, - now): + now, is_throttled=False): last_import_at = cloud_account['last_import_at'] if not last_import_at: last_import_at = cloud_account['created_at'] @@ -236,9 +248,11 @@ def send_report_failed_email(self, cloud_account, previous_attempt_ts, utcfromtimestamp(now)): # email already sent today during previous report import fails return + action = ('report_import_throttled' if is_throttled + else 'report_import_failed') self.publish_activities_task( cloud_account['organization_id'], cloud_account['id'], - 'cloud_account', 'report_import_failed', + 'cloud_account', action, 'organization.report_import.failed') def process_task(self, body, message): diff --git a/docker_images/keeper_executor/events.py b/docker_images/keeper_executor/events.py index 00affdac4..a29cfc525 100644 --- a/docker_images/keeper_executor/events.py +++ b/docker_images/keeper_executor/events.py @@ -391,3 +391,10 @@ class Events(enum.Enum): ['object_name', 'object_id'], "INFO" ] + N0163 = [ + "Billing data import for cloud account {object_name} " + "({cloud_account_id}) was throttled by the cloud provider: " + "{error_reason}", + ["object_name", "cloud_account_id", "error_reason"], + "WARNING" + ] diff --git a/docker_images/keeper_executor/executors/main_events.py b/docker_images/keeper_executor/executors/main_events.py index c3e665840..c45d87995 100644 --- a/docker_images/keeper_executor/executors/main_events.py +++ b/docker_images/keeper_executor/executors/main_events.py @@ -24,6 +24,7 @@ def action_event_map(self): 'cloud_account_deleted': Events.N0068, 'report_import_completed': Events.N0069, 'report_import_failed': Events.N0070, + 'report_import_throttled': Events.N0163, 'assignment_request_accepted': Events.N0071, 'assignment_request_declined': Events.N0072, 'root_assigned_resource': Events.N0076, From a0e31f0a746317642bfc8c640024ab87ecffa2ec Mon Sep 17 00:00:00 2001 From: Aleksandra Ovchinnikova <46540441+bdjilka@users.noreply.github.com> Date: Wed, 27 May 2026 01:57:27 -0400 Subject: [PATCH 2/9] MPT-20813 Move shared expenses logic to tools --- .../rest_api_server/controllers/expense.py | 138 ++--------------- tools/optscale_data/expenses.py | 143 ++++++++++++++++++ 2 files changed, 159 insertions(+), 122 deletions(-) create mode 100644 tools/optscale_data/expenses.py diff --git a/rest_api/rest_api_server/controllers/expense.py b/rest_api/rest_api_server/controllers/expense.py index 7eb516c25..7c5ddc8d9 100644 --- a/rest_api/rest_api_server/controllers/expense.py +++ b/rest_api/rest_api_server/controllers/expense.py @@ -20,7 +20,8 @@ from tools.cloud_adapter.cloud import Cloud as CloudAdapter from tools.optscale_data.clickhouse import ExternalDataConverter -from tools.optscale_time import utcfromtimestamp, utcnow +from tools.optscale_data.expenses import ExpenseQuery +from tools.optscale_time import utcfromtimestamp LOG = logging.getLogger(__name__) NOT_SET_NAME = '(not set)' @@ -32,6 +33,16 @@ class ExpenseController(MongoMixin, ClickHouseMixin): def __init__(self, config=None): super().__init__() self._config = config + self._query = None + + @property + def query(self) -> ExpenseQuery: + if self._query is None: + self._query = ExpenseQuery( + execute_clickhouse=self.execute_clickhouse, + resources_collection=self.resources_collection, + ) + return self._query def get_expenses_for_pools(self, start_date, end_date, pool_ids): return self.get_expenses( @@ -114,132 +125,15 @@ def _get_expenses_clickhouse( def get_cloud_expenses_with_resource_info(self, cloud_acc_list, start_date, end_date): - pipeline = [ - { - '$match': { - '$and': [ - {'cloud_account_id': {'$in': cloud_acc_list}}, - {'_first_seen_date': {'$lt': end_date}}, - {'_last_seen_date': {'$gte': start_date.replace( - hour=0, minute=0, second=0, microsecond=0)}}, - {'first_seen': {'$lt': int(end_date.timestamp())}}, - {'last_seen': {'$gte': int(start_date.timestamp())}}, - {'deleted_at': 0} - ] - } - }, - { - '$group': { - '_id': '$cloud_account_id', - 'count': {'$sum': 1} - } - } - ] - resource_counts = list(self.resources_collection.aggregate(pipeline)) - query = """ - SELECT cloud_account_id, SUM(cost * sign), count - FROM expenses - JOIN cloud_accounts - ON expenses.cloud_account_id = cloud_accounts._id - WHERE date >= %(start_date)s AND date < %(end_date)s - GROUP BY cloud_account_id, count - """ - return self.execute_clickhouse( - query=query, - parameters={ - 'start_date': start_date, - 'end_date': end_date - }, - external_data=ExternalDataConverter()([{ - 'name': 'cloud_accounts', - 'structure': [ - ('_id', 'String'), - ('count', 'Int32') - ], - 'data': resource_counts - }]), + return self.query.get_cloud_expenses_with_resource_info( + cloud_acc_list, start_date, end_date ) def get_monthly_forecast(self, cost, month_cost, first_expense=None): - today = datetime.today() - month_start = today.replace( - day=1, hour=0, minute=0, second=0, microsecond=0) - last_month_start = (month_start - timedelta(days=1)).replace(day=1) - start_date = max(last_month_start, first_expense) if ( - first_expense) else last_month_start - worked_days = (today - month_start).days - forecast_days = (today - start_date).days - daily_forecast = cost / forecast_days if forecast_days > 0 else cost - _, days_in_month = monthrange(today.year, today.month) - forecast = month_cost + daily_forecast * (days_in_month - worked_days) - return round(forecast, 2) - - def _get_first_cloud_account_expense(self, cloud_account_ids, date, - field=None, values=None): - if (field and not values) or not cloud_account_ids: - return [] - if field and re.search(r'[^_A-Za-z0-9]', field): - raise ValueError('Suspected SQL injection ') - query = f""" - SELECT {field if field else 'cloud_account_id'}, min(date) - FROM expenses - WHERE cloud_account_id - IN cloud_account_ids{' AND %s IN values' % - field if field else ''} - AND date >= %(date)s - GROUP BY cloud_account_id{', %s' % field if field else ''} - """ - external_tables = [ - { - 'name': 'cloud_account_ids', - 'structure': [('id', 'String')], - 'data': [{'id': r_id} for r_id in cloud_account_ids] - } - ] - if values: - external_tables.append({ - 'name': 'values', - 'structure': [('id', 'String')], - 'data': [{'id': r_id} for r_id in values] - }) - return self.execute_clickhouse( - query=query, - parameters={ - 'date': date - }, - external_data=ExternalDataConverter()(external_tables) - ) + return self.query.get_monthly_forecast(cost, month_cost, first_expense) def get_first_expenses_for_forecast(self, field, values): - prev_month_start = (utcnow().replace(day=1) - timedelta( - days=1)).replace(day=1, hour=0, minute=0, second=0, microsecond=0) - if field in ['cloud_account_id']: - result = self._get_first_cloud_account_expense( - values, prev_month_start) - else: - # TODO: Not the optimal solution to get expenses dates. - # We can get all the necessary dates at the time of receiving the - # expenses for the previous month - resources = list(self.resources_collection.find( - {field: {'$in': values}, 'cloud_account_id': {'$ne': None}}, - ['cloud_account_id', field])) - r_ids = list(map(lambda x: x['_id'], resources)) - cloud_account_ids = set( - map(lambda x: x.get('cloud_account_id'), resources)) - expenses = self._get_first_cloud_account_expense( - list(cloud_account_ids), prev_month_start, 'resource_id', - r_ids) - expenses_map = {e[0]: e[1] for e in expenses} - result = {} - for resource in resources: - value = resource.get(field) - date = expenses_map.get(resource['_id']) - if not date: - continue - if value not in result or result[value] > date: - result[value] = date - result = [(k, v) for k, v in result.items()] - return {r[0]: r[1] for r in result} + return self.query.get_first_expenses_for_forecast(field, values) def get_raw_expenses(self, start_date, end_date, filters): match_filters = [{'start_date': {'$lt': end_date}}] diff --git a/tools/optscale_data/expenses.py b/tools/optscale_data/expenses.py new file mode 100644 index 000000000..73bbd0e83 --- /dev/null +++ b/tools/optscale_data/expenses.py @@ -0,0 +1,143 @@ +import re +from calendar import monthrange +from datetime import datetime, timedelta, timezone + +from optscale_data.clickhouse import ExternalDataConverter +from tools.optscale_time import utcnow + + +class ExpenseQuery: + def __init__(self, execute_clickhouse, resources_collection): + self._execute_ch = execute_clickhouse + self._resources = resources_collection + + def _execute(self, query, **kwargs): + return self._execute_ch(query=query, **kwargs) + + def get_cloud_expenses_with_resource_info(self, cloud_acc_list, start_date, end_date): + pipeline = [ + { + '$match': { + '$and': [ + {'cloud_account_id': {'$in': cloud_acc_list}}, + {'_first_seen_date': {'$lt': end_date}}, + {'_last_seen_date': {'$gte': start_date.replace( + hour=0, minute=0, second=0, microsecond=0)}}, + {'first_seen': {'$lt': int(end_date.timestamp())}}, + {'last_seen': {'$gte': int(start_date.timestamp())}}, + {'deleted_at': 0} + ] + } + }, + { + '$group': { + '_id': '$cloud_account_id', + 'count': {'$sum': 1} + } + } + ] + resource_counts = list(self._resources.aggregate(pipeline)) + query = """ + SELECT cloud_account_id, SUM(cost * sign), count + FROM expenses + JOIN cloud_accounts + ON expenses.cloud_account_id = cloud_accounts._id + WHERE date >= %(start_date)s AND date < %(end_date)s + GROUP BY cloud_account_id, count + """ + return self._execute( + query=query, + parameters={ + 'start_date': start_date, + 'end_date': end_date + }, + external_data=ExternalDataConverter()([{ + 'name': 'cloud_accounts', + 'structure': [ + ('_id', 'String'), + ('count', 'Int32') + ], + 'data': resource_counts + }]), + ) + + @staticmethod + def get_monthly_forecast(cost, month_cost, first_expense=None): + today = datetime.today() + month_start = today.replace(day=1, hour=0, minute=0, second=0, microsecond=0) + last_month_start = (month_start - timedelta(days=1)).replace(day=1) + start_date = max(last_month_start, first_expense) if ( + first_expense) else last_month_start + worked_days = (today - month_start).days + forecast_days = (today - start_date).days + daily_forecast = cost / forecast_days if forecast_days > 0 else cost + _, days_in_month = monthrange(today.year, today.month) + forecast = month_cost + daily_forecast * (days_in_month - worked_days) + return round(forecast, 2) + + def _get_first_cloud_account_expense( + self, cloud_account_ids, date, field=None, values=None + ): + if (field and not values) or not cloud_account_ids: + return [] + if field and re.search(r'[^_A-Za-z0-9]', field): + raise ValueError('Suspected SQL injection ') + query = f""" + SELECT {field if field else 'cloud_account_id'}, min(date) + FROM expenses + WHERE cloud_account_id + IN cloud_account_ids{' AND %s IN values' % + field if field else ''} + AND date >= %(date)s + GROUP BY cloud_account_id{', %s' % field if field else ''} + """ + external_tables = [ + { + 'name': 'cloud_account_ids', + 'structure': [('id', 'String')], + 'data': [{'id': r_id} for r_id in cloud_account_ids] + } + ] + if values: + external_tables.append({ + 'name': 'values', + 'structure': [('id', 'String')], + 'data': [{'id': r_id} for r_id in values] + }) + return self._execute( + query=query, + parameters={ + 'date': date + }, + external_data=ExternalDataConverter()(external_tables) + ) + + def get_first_expenses_for_forecast(self, field, values): + prev_month_start = (utcnow().replace(day=1) - timedelta( + days=1)).replace(day=1, hour=0, minute=0, second=0, microsecond=0) + if field in ['cloud_account_id']: + result = self._get_first_cloud_account_expense(values, prev_month_start) + else: + # TODO: Not the optimal solution to get expenses dates. + # We can get all the necessary dates at the time of receiving the + # expenses for the previous month + resources = list(self._resources.find( + {field: {'$in': values}, 'cloud_account_id': {'$ne': None}}, + ['cloud_account_id', field])) + r_ids = list(map(lambda x: x['_id'], resources)) + cloud_account_ids = set( + map(lambda x: x.get('cloud_account_id'), resources)) + expenses = self._get_first_cloud_account_expense( + list(cloud_account_ids), prev_month_start, 'resource_id', + r_ids) + expenses_map = {e[0]: e[1] for e in expenses} + result = {} + for resource in resources: + value = resource.get(field) + date = expenses_map.get(resource['_id']) + if not date: + continue + if value not in result or result[value] > date: + result[value] = date + result = [(k, v) for k, v in result.items()] + return {r[0]: r[1] for r in result} From aaa0a8493faf0443dfd9bd9289ccae91a0e031a9 Mon Sep 17 00:00:00 2001 From: sd-hystax <110374605+sd-hystax@users.noreply.github.com> Date: Tue, 5 May 2026 12:53:15 +0300 Subject: [PATCH 3/9] =?UTF-8?q?OSN-1451.=20Region=20expenses,=20traffic=20?= =?UTF-8?q?expenses,=20available=20filters=20perfor=E2=80=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit …mance ## Description Region expenses, traffic expenses, available filters performance ## Related issue number OSN-1451 ## Special notes ## Checklist * [ ] The pull request title is a good summary of the changes * [ ] Unit tests for the changes exist * [ ] New and existing unit tests pass locally --- .../controllers/available_filters.py | 60 +++++++++++++++---- .../rest_api_server/controllers/expense.py | 24 ++++---- .../controllers/traffic_expense.py | 2 +- tools/cloud_adapter/clouds/alibaba.py | 4 +- tools/cloud_adapter/clouds/aws.py | 4 +- tools/cloud_adapter/clouds/azure.py | 4 +- tools/cloud_adapter/clouds/base.py | 2 +- tools/cloud_adapter/clouds/databricks.py | 2 +- tools/cloud_adapter/clouds/environment.py | 2 +- tools/cloud_adapter/clouds/gcp.py | 2 +- tools/cloud_adapter/clouds/kubernetes.py | 2 +- tools/cloud_adapter/clouds/nebius.py | 2 +- 12 files changed, 77 insertions(+), 33 deletions(-) diff --git a/rest_api/rest_api_server/controllers/available_filters.py b/rest_api/rest_api_server/controllers/available_filters.py index 2755f7bad..3538b7512 100644 --- a/rest_api/rest_api_server/controllers/available_filters.py +++ b/rest_api/rest_api_server/controllers/available_filters.py @@ -237,23 +237,57 @@ def _aggregate_resource_data(self, match_query, **kwargs): 'day': {'$trunc': { '$divide': ['$first_seen', DAY_IN_SECONDS]}}, }, - 'tags': {'$addToSet': '$tags.k'}, - 'meta': {'$addToSet': '$meta.k'}, + 'tags': {'$push': '$tagKeys'}, + 'meta': {'$push': '$metaKeys'}, 'cloud_resource_ids': {'$addToSet': '$cloud_resource_id'}, }) return self.resources_collection.aggregate([ {'$match': match_query}, - {'$addFields': {'tags': {'$objectToArray': "$tags"}}}, - {'$unwind': { - 'path': "$tags", - 'preserveNullAndEmptyArrays': True - }}, - {'$addFields': {'meta': {'$objectToArray': "$meta"}}}, - {'$unwind': { - 'path': "$meta", - 'preserveNullAndEmptyArrays': True - }}, - {'$group': group_stage} + { + '$addFields': { + 'tagKeys': { + '$map': { + 'input': { + '$objectToArray': {'$ifNull': ['$tags', {}]} + }, + 'as': "t", + 'in': "$$t.k" + } + }, + 'metaKeys': { + '$map': { + 'input': { + '$objectToArray': {'$ifNull': ["$meta", {}]} + }, + 'as': "m", + 'in': "$$m.k" + } + } + } + }, + {'$group': group_stage}, + { + '$addFields': { + 'tags': { + '$reduce': { + 'input': "$tags", + 'initialValue': [], + 'in': {'$setUnion': ["$$value", "$$this"]} + } + } + } + }, + { + '$addFields': { + 'meta': { + '$reduce': { + 'input': "$meta", + 'initialValue': [], + 'in': {'$setUnion': ["$$value", "$$this"]} + } + } + } + } ], allowDiskUse=True) def get(self, organization_id, **params): diff --git a/rest_api/rest_api_server/controllers/expense.py b/rest_api/rest_api_server/controllers/expense.py index 7c5ddc8d9..918f1f31f 100644 --- a/rest_api/rest_api_server/controllers/expense.py +++ b/rest_api/rest_api_server/controllers/expense.py @@ -79,12 +79,16 @@ def _get_expenses_clickhouse( 'deleted_at': 0 }, resource_fields) - external_resource_table = [{ - '_id': x['_id'], - 'cloud_account_id': x['cloud_account_id'], - 'group_field': x.get(resource_field_mappings.get( - group_by, group_by)) - } for x in resource_results if x.get('cloud_account_id')] + cloud_account_ids, external_resource_table = set(), [] + for x in resource_results: + cloud_account_id = x.get('cloud_account_id') + if cloud_account_id: + cloud_account_ids.add(cloud_account_id) + external_resource_table.append({ + '_id': x['_id'], + 'group_field': x.get(resource_field_mappings.get( + group_by, group_by)) + }) expenses_results = self.execute_clickhouse( query=""" SELECT @@ -92,8 +96,8 @@ def _get_expenses_clickhouse( SUM(cost * sign) AS total_cost FROM expenses JOIN resources ON expenses.resource_id = resources._id - AND expenses.cloud_account_id = resources.cloud_account_id - WHERE date >= %(start_date)s + WHERE cloud_account_id IN %(cloud_account_ids)s + AND date >= %(start_date)s AND date <= %(end_date)s GROUP BY date, group_field, cloud_account_id HAVING SUM(sign) > 0 @@ -102,12 +106,12 @@ def _get_expenses_clickhouse( parameters={ 'start_date': start_date, 'end_date': end_date, + 'cloud_account_ids': list(cloud_account_ids), }, external_data=ExternalDataConverter()([{ 'name': 'resources', 'structure': [ ('_id', 'String'), - ('cloud_account_id', 'String'), ('group_field', 'Nullable(String)'), ], 'data': external_resource_table @@ -1982,7 +1986,7 @@ def get_info_map(self, cloud_accs): if cloud_type in scanned: continue adapter = CloudAdapter.get_adapter(cloud_config) - regions = adapter.get_regions_coordinates() + regions = adapter.get_regions_coordinates(load=False) for region_id, info in regions.items(): res.update(self._generate_info_map_element( region_id, info, cloud_type diff --git a/rest_api/rest_api_server/controllers/traffic_expense.py b/rest_api/rest_api_server/controllers/traffic_expense.py index e844059e9..49afd022d 100644 --- a/rest_api/rest_api_server/controllers/traffic_expense.py +++ b/rest_api/rest_api_server/controllers/traffic_expense.py @@ -46,7 +46,7 @@ def _get_configs(cloud_accs): @staticmethod def _get_coordinates(config): cloud_adapter = CloudAdapter.get_adapter(config) - res = cloud_adapter.get_regions_coordinates() + res = cloud_adapter.get_regions_coordinates(load=False) coordinates_result = {} for k, v in res.items(): coordinates = { diff --git a/tools/cloud_adapter/clouds/alibaba.py b/tools/cloud_adapter/clouds/alibaba.py index 907e72d9f..b19a64a9e 100644 --- a/tools/cloud_adapter/clouds/alibaba.py +++ b/tools/cloud_adapter/clouds/alibaba.py @@ -956,9 +956,11 @@ def _get_outdated_regions(): 'longitude': 150.7915495, 'latitude': -33.8481643}, } - def get_regions_coordinates(self): + def get_regions_coordinates(self, load=True): coordinates_map = self._get_coordinates_map() coordinates_map.update(self._get_outdated_regions()) + if not load: + return coordinates_map try: for region_details in self._list_region_details(): region_id = region_details['RegionId'] diff --git a/tools/cloud_adapter/clouds/aws.py b/tools/cloud_adapter/clouds/aws.py index 78f34b5ab..8e161fad7 100644 --- a/tools/cloud_adapter/clouds/aws.py +++ b/tools/cloud_adapter/clouds/aws.py @@ -1651,12 +1651,14 @@ def _get_coordinates_map(self): 'global': {'longitude': -98.48424, 'latitude': 39.01190} } - def get_regions_coordinates(self): + def get_regions_coordinates(self, load=True): zero_coordinates = { 'longitude': None, 'latitude': None } coordinates_map = self._get_coordinates_map() + if not load: + return coordinates_map try: for available_region in self.list_regions(): if not coordinates_map.get(available_region): diff --git a/tools/cloud_adapter/clouds/azure.py b/tools/cloud_adapter/clouds/azure.py index bc6791817..4f886eba5 100644 --- a/tools/cloud_adapter/clouds/azure.py +++ b/tools/cloud_adapter/clouds/azure.py @@ -1756,7 +1756,7 @@ def _get_coordinates_map(self): coordinates_map.update(self._get_cn_coordinates_map()) return coordinates_map - def get_regions_coordinates(self): + def get_regions_coordinates(self, load=True): def to_coord(coordinate): if isinstance(coordinate, str): try: @@ -1766,6 +1766,8 @@ def to_coord(coordinate): return coordinate coordinates_map = self._get_coordinates_map() + if not load: + return coordinates_map try: for region in self.subscription.subscriptions.list_locations( self._subscription_id): diff --git a/tools/cloud_adapter/clouds/base.py b/tools/cloud_adapter/clouds/base.py index 37a7d4891..3d9c1fe0d 100644 --- a/tools/cloud_adapter/clouds/base.py +++ b/tools/cloud_adapter/clouds/base.py @@ -52,7 +52,7 @@ def configure_last_import_modified_at(self): raise NotImplementedError @abc.abstractmethod - def get_regions_coordinates(self): + def get_regions_coordinates(self, load=True): raise NotImplementedError diff --git a/tools/cloud_adapter/clouds/databricks.py b/tools/cloud_adapter/clouds/databricks.py index 5af1ebaa4..b9c53bdc3 100644 --- a/tools/cloud_adapter/clouds/databricks.py +++ b/tools/cloud_adapter/clouds/databricks.py @@ -217,7 +217,7 @@ def set_currency(self, currency): def configure_last_import_modified_at(self): pass - def get_regions_coordinates(self): + def get_regions_coordinates(self, load=True): return {} def discovery_calls_map(self): diff --git a/tools/cloud_adapter/clouds/environment.py b/tools/cloud_adapter/clouds/environment.py index 358de736f..5dda399b4 100644 --- a/tools/cloud_adapter/clouds/environment.py +++ b/tools/cloud_adapter/clouds/environment.py @@ -43,7 +43,7 @@ def rds_instance_discovery_calls(self): def ip_address_discovery_calls(self): raise NotImplementedError - def get_regions_coordinates(self): + def get_regions_coordinates(self, load=True): return {} def set_currency(self, currency): diff --git a/tools/cloud_adapter/clouds/gcp.py b/tools/cloud_adapter/clouds/gcp.py index 9de0dc39b..7d038661f 100644 --- a/tools/cloud_adapter/clouds/gcp.py +++ b/tools/cloud_adapter/clouds/gcp.py @@ -2129,7 +2129,7 @@ def _add_global_coordinates(regions: dict): regions["australia-southeast1"]["alias"] = "Australia" regions["asia-east2"]["alias"] = "China" - def get_regions_coordinates(self): + def get_regions_coordinates(self, load=True): coordinates = self._get_regions_coordinates() self._add_global_coordinates(coordinates) return coordinates diff --git a/tools/cloud_adapter/clouds/kubernetes.py b/tools/cloud_adapter/clouds/kubernetes.py index 05f95d28d..8143ef08e 100644 --- a/tools/cloud_adapter/clouds/kubernetes.py +++ b/tools/cloud_adapter/clouds/kubernetes.py @@ -230,7 +230,7 @@ def ip_address_discovery_calls(self): # For Kubernetes cloud we don't have ips as a separate resources return [] - def get_regions_coordinates(self): + def get_regions_coordinates(self, load=True): return {} def set_currency(self, currency): diff --git a/tools/cloud_adapter/clouds/nebius.py b/tools/cloud_adapter/clouds/nebius.py index 6c86ac7df..25c7ec464 100644 --- a/tools/cloud_adapter/clouds/nebius.py +++ b/tools/cloud_adapter/clouds/nebius.py @@ -915,7 +915,7 @@ def get_metric(self, metric_name, instance_ids, start_date, end_date, return self._get_metrics(start_date, end_date, query, folder_id, downsampling) - def get_regions_coordinates(self): + def get_regions_coordinates(self, load=True): return self.config.get('regions_coordinates', {}) or REGIONS_COORDINATES def get_prices(self, currency='USD', filter=None, **kwargs): From 585e7e7c0005bfb0239611221f26bdfbac1061ad Mon Sep 17 00:00:00 2001 From: sd-hystax <110374605+sd-hystax@users.noreply.github.com> Date: Wed, 27 May 2026 14:06:09 +0300 Subject: [PATCH 4/9] OSN-1481. Fixed primary key issue for clickhouse expenses query ## Description Fixed primary key issue for clickhouse expenses query ## Related issue number OSN-1481 ## Special notes ## Checklist * [ ] The pull request title is a good summary of the changes * [ ] Unit tests for the changes exist * [ ] New and existing unit tests pass locally --- tools/optscale_data/expenses.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tools/optscale_data/expenses.py b/tools/optscale_data/expenses.py index 73bbd0e83..d97cece1a 100644 --- a/tools/optscale_data/expenses.py +++ b/tools/optscale_data/expenses.py @@ -42,14 +42,16 @@ def get_cloud_expenses_with_resource_info(self, cloud_acc_list, start_date, end_ FROM expenses JOIN cloud_accounts ON expenses.cloud_account_id = cloud_accounts._id - WHERE date >= %(start_date)s AND date < %(end_date)s + WHERE cloud_account_id IN %(cloud_acc_list)s + AND date >= %(start_date)s AND date < %(end_date)s GROUP BY cloud_account_id, count """ return self._execute( query=query, parameters={ 'start_date': start_date, - 'end_date': end_date + 'end_date': end_date, + 'cloud_acc_list': cloud_acc_list }, external_data=ExternalDataConverter()([{ 'name': 'cloud_accounts', From c050add06d460206a93226b3a0d7635c7d306008 Mon Sep 17 00:00:00 2001 From: asm-hystax Date: Wed, 27 May 2026 15:29:01 +0400 Subject: [PATCH 5/9] OSN-1473. Removed telegram link from email templates --- herald/Dockerfile_tests | 1 + herald/modules/__init__.py | 0 .../email_generator/context_generator.py | 2 -- .../email_generator/templates/alert.html | 7 +------ .../templates/anomaly_detection_alert.html | 7 +------ .../bumi_module_execution_failed.html | 7 +------ .../templates/bumi_task_execution_failed.html | 7 +------ .../templates/cloud_account_deleted.html | 7 +------ .../templates/disconnect_survey.html | 7 +------ .../templates/employee_greetings.html | 21 ------------------- .../templates/environment_changes.html | 7 +------ .../templates/first_shareable_resources.html | 7 +------ .../templates/incorrect_alibaba_expenses.html | 7 +------ .../templates/insider_prices_sslerror.html | 7 +------ .../email_generator/templates/invite.html | 7 +------ .../templates/new_cloud_account.html | 7 +------ .../templates/new_employee.html | 7 +------ .../new_security_recommendation.html | 5 +---- .../templates/new_subscriber.html | 7 +------ .../templates/organization_audit_submit.html | 7 +------ .../organization_policy_expiring_budget.html | 5 +---- .../templates/organization_policy_quota.html | 5 +---- .../organization_policy_recurring_budget.html | 5 +---- .../organization_policy_tagging.html | 7 +------ .../templates/pool_exceed_report.html | 7 +------ .../pool_exceed_resources_report.html | 7 +------ .../pool_owner_violation_report.html | 7 +------ .../templates/report_import_failed.html | 7 +------ .../report_imports_passed_for_org.html | 7 +------ .../resource_owner_violation_alert.html | 7 +------ .../resource_owner_violation_report.html | 7 +------ .../templates/restore_password.html | 7 +------ .../templates/saving_spike.html | 5 +---- .../templates/verify_email.html | 7 +------ .../templates/weekly_expense_report.html | 7 +------ herald/run_test.sh | 2 +- 36 files changed, 33 insertions(+), 200 deletions(-) create mode 100644 herald/modules/__init__.py diff --git a/herald/Dockerfile_tests b/herald/Dockerfile_tests index c1de70ce4..ca3a4b668 100644 --- a/herald/Dockerfile_tests +++ b/herald/Dockerfile_tests @@ -12,4 +12,5 @@ COPY optscale_client/herald_client optscale_client/herald_client RUN uv --project herald sync --locked COPY herald/herald_server/tests herald/herald_server/tests +COPY herald/modules/__init__.py herald/modules/__init__.py COPY herald/modules/tests herald/modules/tests diff --git a/herald/modules/__init__.py b/herald/modules/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/herald/modules/email_generator/context_generator.py b/herald/modules/email_generator/context_generator.py index 42f773b2f..f2a47c8b6 100644 --- a/herald/modules/email_generator/context_generator.py +++ b/herald/modules/email_generator/context_generator.py @@ -69,7 +69,6 @@ def get_default_context(): return { "images": { "logo": "https://cdn.hystax.com/OptScale/OptScale-logo-white.png", - "telegram": "https://cdn.hystax.com/OptScale/email-images/telegram.png", "optscale_ml_banner": "https://cdn.hystax.com/OptScale/email-images/optscale-ml-welcome-banner.png", "optscale_finops": "https://cdn.hystax.com/OptScale/email-images/optscale-finops-capabilities.png", }, @@ -87,7 +86,6 @@ def get_default_context(): "linkedin": "https://linkedin.com/company/hystax", "twitter": "https://twitter.com/hystaxcom", "facebook": "https://facebook.com/hystax", - "telegram": "https://t.me/hystax", "terms_of_use": "https://hystax.com/terms-of-use/", "privacy_policy": "https://hystax.com/privacy-policy/", "documentation": "https://hystax.com/documentation/optscale", diff --git a/herald/modules/email_generator/templates/alert.html b/herald/modules/email_generator/templates/alert.html index ad2152dce..655213975 100644 --- a/herald/modules/email_generator/templates/alert.html +++ b/herald/modules/email_generator/templates/alert.html @@ -211,7 +211,7 @@
- + {% if links.linkedin|length %} {% endif %} - {% if links.telegram|length %} - - {% endif %}
diff --git a/herald/modules/email_generator/templates/anomaly_detection_alert.html b/herald/modules/email_generator/templates/anomaly_detection_alert.html index d47f8c416..f88f0a509 100644 --- a/herald/modules/email_generator/templates/anomaly_detection_alert.html +++ b/herald/modules/email_generator/templates/anomaly_detection_alert.html @@ -321,7 +321,7 @@
- {% if links.linkedin|length %} @@ -339,11 +339,6 @@ Facebook {% endif %} - {% if links.telegram|length %} - - {% endif %}
diff --git a/herald/modules/email_generator/templates/bumi_module_execution_failed.html b/herald/modules/email_generator/templates/bumi_module_execution_failed.html index 380b11a88..1e39d7c1d 100644 --- a/herald/modules/email_generator/templates/bumi_module_execution_failed.html +++ b/herald/modules/email_generator/templates/bumi_module_execution_failed.html @@ -383,7 +383,7 @@
- + {% if links.linkedin|length %} {% endif %} - {% if links.telegram|length %} - - {% endif %}
diff --git a/herald/modules/email_generator/templates/bumi_task_execution_failed.html b/herald/modules/email_generator/templates/bumi_task_execution_failed.html index 0938db887..7c4218095 100644 --- a/herald/modules/email_generator/templates/bumi_task_execution_failed.html +++ b/herald/modules/email_generator/templates/bumi_task_execution_failed.html @@ -427,7 +427,7 @@
- + {% if links.linkedin|length %} {% endif %} - {% if links.telegram|length %} - - {% endif %}
diff --git a/herald/modules/email_generator/templates/cloud_account_deleted.html b/herald/modules/email_generator/templates/cloud_account_deleted.html index d6aa77560..4b6230ae0 100644 --- a/herald/modules/email_generator/templates/cloud_account_deleted.html +++ b/herald/modules/email_generator/templates/cloud_account_deleted.html @@ -412,7 +412,7 @@
- + {% if links.linkedin|length %} {% endif %} - {% if links.telegram|length %} - - {% endif %}
diff --git a/herald/modules/email_generator/templates/disconnect_survey.html b/herald/modules/email_generator/templates/disconnect_survey.html index c4b6e3349..52ab0f01d 100644 --- a/herald/modules/email_generator/templates/disconnect_survey.html +++ b/herald/modules/email_generator/templates/disconnect_survey.html @@ -336,7 +336,7 @@
- + {% if links.linkedin|length %} {% endif %} - {% if links.telegram|length %} - - {% endif %}
diff --git a/herald/modules/email_generator/templates/employee_greetings.html b/herald/modules/email_generator/templates/employee_greetings.html index d13685f56..281c3f3f8 100644 --- a/herald/modules/email_generator/templates/employee_greetings.html +++ b/herald/modules/email_generator/templates/employee_greetings.html @@ -740,27 +740,6 @@ - {% if links.telegram|length %} - - - - - - -
- - - - - - -
- - - -
-
- {% endif %} ## Checklist * [ ] The pull request title is a good summary of the changes * [ ] Unit tests for the changes exist * [ ] New and existing unit tests pass locally --- insider/insider_worker/http_client/client.py | 7 +- insider/insider_worker/main.py | 20 +++- ...20260618150755_last_seen_currency_index.py | 53 +++++++++ insider/insider_worker/processors/azure.py | 112 +++++++++--------- insider/insider_worker/processors/base.py | 5 +- 5 files changed, 131 insertions(+), 66 deletions(-) create mode 100644 insider/insider_worker/migrations/20260618150755_last_seen_currency_index.py diff --git a/insider/insider_worker/http_client/client.py b/insider/insider_worker/http_client/client.py index a6d64842b..27576df10 100644 --- a/insider/insider_worker/http_client/client.py +++ b/insider/insider_worker/http_client/client.py @@ -35,12 +35,13 @@ def request(self, url, method): response_body = None # pylint: disable=E1101 if response.status_code != requests.codes.no_content: - if 'application/json' in response.headers['Content-Type']: + content_type = response.headers.get('Content-Type', '') + if 'application/json' in content_type: response_body = json.loads( response.content.decode('utf-8')) - if 'text/plain' in response.headers['Content-Type']: + elif 'text/plain' in content_type: response_body = response.content.decode() - if 'application/octet-stream' in response.headers['Content-Type']: + else: response_body = response.content return response.status_code, response_body diff --git a/insider/insider_worker/main.py b/insider/insider_worker/main.py index 260f8c4ce..e144fe683 100644 --- a/insider/insider_worker/main.py +++ b/insider/insider_worker/main.py @@ -21,6 +21,7 @@ LOG = get_logger(__name__) TASK_EXCHANGE = Exchange(EXCHANGE_NAME, type='direct') TASK_QUEUE = Queue(QUEUE_NAME, TASK_EXCHANGE, routing_key=QUEUE_NAME) +DISCOVERIES_THRESHOLD = 43200 # 12 hours in seconds class InsiderWorker(ConsumerMixin): @@ -42,13 +43,28 @@ def discoveries(self): def get_consumers(self, consumer, channel): return [consumer(queues=[TASK_QUEUE], accept=['json'], - callbacks=[self.process_task], prefetch_count=10)] + callbacks=[self.process_task], prefetch_count=1)] + + def get_last_discovery_ts(self, cloud_type): + discoveries = self.discoveries.find( + {'cloud_type': cloud_type, 'completed_at': {'$ne': 0}} + ).sort( + [('completed_at', -1)]).limit(1) + try: + discovery = next(discoveries) + return discovery.get('started_at', 0) + except StopIteration: + return 0 def _process_task(self, task): start_process_time = int(datetime.now(tz=timezone.utc).timestamp()) cloud_type = task.get('cloud_type') if not cloud_type: raise Exception('Invalid task received: {}'.format(task)) + last_discovery_ts = self.get_last_discovery_ts(cloud_type) + if last_discovery_ts + DISCOVERIES_THRESHOLD >= start_process_time: + LOG.info('Skipping task for %s by threshold', cloud_type) + return discovery_id = self.discoveries.insert_one({ 'cloud_type': cloud_type, 'started_at': start_process_time, @@ -56,7 +72,7 @@ def _process_task(self, task): }).inserted_id get_processor_class(cloud_type)( - self.mongo_client, self.config_cl).process_prices() + self.mongo_client, self.config_cl).process_prices(last_discovery_ts) end_process_time = int(datetime.now(tz=timezone.utc).timestamp()) self.discoveries.update_one( diff --git a/insider/insider_worker/migrations/20260618150755_last_seen_currency_index.py b/insider/insider_worker/migrations/20260618150755_last_seen_currency_index.py new file mode 100644 index 000000000..58786e92d --- /dev/null +++ b/insider/insider_worker/migrations/20260618150755_last_seen_currency_index.py @@ -0,0 +1,53 @@ +import logging +from insider.insider_worker.migrations.base import BaseMigration + +NEW_INDEXES = { + 'CurrencyLastSeen': ['currencyCode', 'last_seen'] +} +OLD_INDEXES = { + 'LastSeen': ['last_seen'] +} +LOG = logging.getLogger(__name__) + + +class Migration(BaseMigration): + def get_indexes(self): + return [x['name'] for x in self.azure_prices.list_indexes()] + + def upgrade(self): + existing_indexes = self.get_indexes() + for index_name, index_fields in NEW_INDEXES.items(): + if index_name in existing_indexes: + LOG.info(f'Index {index_name} already exists') + continue + LOG.info(f'Creating index {index_name}') + self.azure_prices.create_index( + [(f, 1) for f in index_fields], + name=index_name, + background=True + ) + for index_name, index_fields in OLD_INDEXES.items(): + if index_name in existing_indexes: + LOG.info(f'Dropping index {index_name}') + self.azure_prices.drop_index(index_name) + else: + LOG.info(f'Index {index_name} doesn\'t exist') + + def downgrade(self): + existing_indexes = self.get_indexes() + for index_name, index_fields in OLD_INDEXES.items(): + if index_name in existing_indexes: + LOG.info(f'Index {index_name} already exists') + continue + LOG.info(f'Creating index {index_name}') + self.azure_prices.create_index( + [(f, 1) for f in index_fields], + name=index_name, + background=True + ) + for index_name, index_fields in NEW_INDEXES.items(): + if index_name in existing_indexes: + LOG.info(f'Dropping index {index_name}') + self.azure_prices.drop_index(index_name) + else: + LOG.info(f'Index {index_name} doesn\'t exist') diff --git a/insider/insider_worker/processors/azure.py b/insider/insider_worker/processors/azure.py index 5b02dc75f..cf0f5a54d 100644 --- a/insider/insider_worker/processors/azure.py +++ b/insider/insider_worker/processors/azure.py @@ -15,8 +15,8 @@ ACTIVITIES_EXCHANGE_NAME = 'activities-tasks' ACTIVITIES_EXCHANGE = Exchange(ACTIVITIES_EXCHANGE_NAME, type='topic') LOG = get_logger(__name__) -PRICES_PER_REQUEST = 100 PRICES_COUNT_TO_LOG = 1000 +CHINA_CURRENCY_CODE = 'CNY' class AzurePriceProcessor(BasePriceProcessor): @@ -35,16 +35,6 @@ def discoveries(self): def prices(self): return self.mongo_client.insider.azure_prices - def get_last_discovery(self): - discoveries = self.discoveries.find( - {'cloud_type': self.CLOUD_TYPE, 'completed_at': {'$ne': 0}} - ).sort( - [('completed_at', -1)]).limit(1) - try: - return next(discoveries) - except StopIteration: - return {} - @staticmethod def unique_values(price): return tuple(price.get(p) for p in AzurePriceProcessor.UNIQUE_FIELDS) @@ -82,42 +72,39 @@ def _get_currencies_list(self): currencies = set(map(lambda x: x['currency'], orgs['organizations'])) return list(currencies) - def _process_global_prices(self, http_client, old_prices_map): - LOG.info('Start processing Azure Global prices') - for currency in self._get_currencies_list(): - LOG.info('Processing Azure prices for currency: %s', currency) - processed_keys = {} - prices_counter = 0 - - next_page = 'https://prices.azure.com/api/retail/prices' - next_page += '?currencyCode=%s' % currency - while True: - if prices_counter % PRICES_COUNT_TO_LOG == 0: - LOG.info('Total number of prices got from ' - 'cloud: %s', prices_counter) - try: - code, response = http_client.get(next_page) - except SSLError: - LOG.error('Getting Azure prices failed with SSL ' - 'verification error. Will try to get prices' - 'without SSL verification') - self.send_sslerror_service_email() - http_client = Client(verify=False) - code, response = http_client.get(next_page) - items = response.get('Items', []) - new_prices_map = {self.unique_values(p): p for p in items} - self.update_price_records(new_prices_map, old_prices_map, - processed_keys) - new_url = response.get('NextPageLink') - if not new_url or new_url == next_page: - LOG.info('Total number of prices got from ' - 'cloud: %s', prices_counter) - break - next_page = new_url - prices_counter += response.get('Count', 0) - - def _process_china_prices(self, http_client, old_prices_map): - LOG.info('Start processing Azure China prices') + def _process_global_prices(self, http_client, old_prices_map, currency): + LOG.info('Processing Azure prices for currency: %s', currency) + processed_keys = {} + prices_counter = 0 + next_page = 'https://prices.azure.com/api/retail/prices' + next_page += '?currencyCode=%s' % currency + while True: + if prices_counter % PRICES_COUNT_TO_LOG == 0: + LOG.info('Total number of prices got from ' + 'cloud: %s', prices_counter) + try: + code, response = http_client.get(next_page) + except SSLError: + LOG.error('Getting Azure prices failed with SSL ' + 'verification error. Will try to get prices' + 'without SSL verification') + self.send_sslerror_service_email() + http_client = Client(verify=False) + code, response = http_client.get(next_page) + items = response.get('Items', []) + new_prices_map = {self.unique_values(p): p for p in items} + self.update_price_records(new_prices_map, old_prices_map, + processed_keys) + new_url = response.get('NextPageLink') + if not new_url or new_url == next_page: + LOG.info('Total number of prices got from ' + 'cloud: %s', prices_counter) + break + next_page = new_url + prices_counter += response.get('Count', 0) + + def _process_china_prices(self, http_client, old_prices_map, currency): + LOG.info('Start processing Azure China prices (%s)', currency) url = 'https://prices.azure.cn/api/retail/pricesheet/download?' \ 'api-version=2023-06-01-preview' _, response = http_client.get(url) @@ -130,17 +117,28 @@ def _process_china_prices(self, http_client, old_prices_map): LOG.info('Total number of prices got from cloud: %s', len(new_prices_map)) - def process_prices(self): - last_discovery = self.get_last_discovery() - old_prices = self.prices.find( - {'last_seen': {'$gte': last_discovery.get('started_at', 0)}}, - {k: 1 for k in self.UNIQUE_FIELDS + self.CHANGE_FIELDS + ['last_seen']} - ) - old_prices_map = {self.unique_values(p): p for p in old_prices} - + def process_prices(self, last_discovery_ts): http_client = Client() - self._process_global_prices(http_client, old_prices_map) - self._process_china_prices(http_client, old_prices_map) + process_func_map = { + CHINA_CURRENCY_CODE: self._process_china_prices + } + for currency in self._get_currencies_list(): + old_prices = self.prices.find( + { + 'last_seen': { + '$gte': last_discovery_ts + }, + 'currencyCode': currency + }, + { + k: 1 for k in + self.UNIQUE_FIELDS + self.CHANGE_FIELDS + ['last_seen'] + } + ) + old_prices_map = {self.unique_values(p): p for p in old_prices} + process_func = process_func_map.get( + currency, self._process_global_prices) + process_func(http_client, old_prices_map, currency) def update_price_records(self, new_prices_map, old_prices_map, processed_keys): diff --git a/insider/insider_worker/processors/base.py b/insider/insider_worker/processors/base.py index 468258cad..d962faaca 100644 --- a/insider/insider_worker/processors/base.py +++ b/insider/insider_worker/processors/base.py @@ -11,8 +11,5 @@ def discoveries(self): def prices(self): raise NotImplementedError() - def get_last_discovery(self): - raise NotImplementedError() - - def process_prices(self): + def process_prices(self, last_discovery_ts): raise NotImplementedError()