Commit f5d6e35

feature to export dynamic queries (#227)

* feature to export dynamic queries
* fix(floware): dynamic query based export
* Export using CSV signed url
* fix(gold): rls filter to download api
* fix(floware): support for more tables
* fix(floware): use streaming write to GCS and also block multi-export per user
1 parent ed859ff commit f5d6e35
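
For orientation, a minimal sketch of how a client might call the new export endpoint introduced in this commit. The route, query parameters, and the `export_url` field come from the diff below; the host, token, IDs, filter syntax, and exact response envelope are illustrative assumptions.

    import requests

    BASE = 'https://wavefront.example.com'  # hypothetical host
    headers = {'Authorization': 'Bearer <token>'}  # auth scheme assumed

    resp = requests.post(
        f'{BASE}/v1/ds_123/dynamic-queries/q_456/export',
        params={'$filter': "status eq 'active'", 'limit': 1000, 'force_fetch': 0},
        json={'params': {'region': 'us-east'}},  # DynamicQueryExecuteRequest body
        headers=headers,
    )
    resp.raise_for_status()
    export_url = resp.json()['data']['export_url']  # envelope shape assumed
    csv_bytes = requests.get(export_url).content  # signed URL needs no auth header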

File tree

10 files changed: +309 −9 lines changed

wavefront/server/modules/plugins_module/plugins_module/controllers/datasource_controller.py

Lines changed: 214 additions & 1 deletion

@@ -1,3 +1,4 @@
+from typing import Dict, Any
 from datasource.bigquery.config import BigQueryConfig
 from datasource.redshift.config import RedshiftConfig
 from dependency_injector.wiring import inject
@@ -33,11 +34,18 @@
 from plugins_module.plugins_container import PluginsContainer
 from user_management_module.user_container import UserContainer
 from user_management_module.services.user_service import UserService
+from flo_cloud.cloud_storage import CloudStorageManager
 from fastapi import HTTPException
 from user_management_module.utils.user_utils import get_current_user
 from plugins_module.services.dynamic_query_service import DynamicQueryService
 from db_repo_module.cache.cache_manager import CacheManager
-from ..utils.helper import generate_cache_key, validate_yaml_query
+from ..utils.helper import (
+    generate_cache_key,
+    generate_export_filename_hash,
+    validate_yaml_query,
+)
+import csv
+import io
 import yaml
 from ..utils.helper import DynamicQueryRequest
 from ..utils.helper import DynamicQueryExecuteRequest
@@ -47,6 +55,29 @@
 datasource_router = APIRouter()


+def _serialized_rows_to_csv(rows: list) -> bytes:
+    """Convert a list of serialized dicts (e.g. from execute_dynamic_query) to CSV bytes."""
+    if not rows:
+        return b''
+    out = io.StringIO()
+    fieldnames = list(rows[0].keys())
+    for row in rows[1:]:
+        for k in row:
+            if k not in fieldnames:
+                fieldnames.append(k)
+    writer = csv.DictWriter(out, fieldnames=fieldnames, extrasaction='ignore')
+
+    def _cell_value(v):
+        if isinstance(v, (dict, list)):
+            return json.dumps(v)
+        return v if v is None or isinstance(v, str) else str(v)
+
+    writer.writeheader()
+    for row in rows:
+        writer.writerow({k: _cell_value(row.get(k)) for k in fieldnames})
+    return out.getvalue().encode('utf-8-sig')
+
+
 @datasource_router.post('/v1/datasources')
 @inject
 async def add_datasource(
@@ -718,6 +749,188 @@ async def execute_dynamic_query(
     )


+EXPORT_RATE_LIMIT_SECONDS = 120  # 2 minutes between exports per user
+
+
+@datasource_router.post('/v1/{datasource_id}/dynamic-queries/{query_id}/export')
+@inject
+async def export_dynamic_query_csv(
+    request: Request,
+    datasource_id: str,
+    query_id: str,
+    filter: str | None = Query(None, alias='$filter'),
+    offset: int | None = 0,
+    limit: int | None = 100,
+    dynamic_query_params: DynamicQueryExecuteRequest = None,
+    response_formatter: ResponseFormatter = Depends(
+        Provide[CommonContainer.response_formatter]
+    ),
+    dynamic_query_yaml_service: DynamicQueryService = Depends(
+        Provide[PluginsContainer.dynamic_query_service]
+    ),
+    user_service: UserService = Depends(Provide[UserContainer.user_service]),
+    cloud_manager: CloudStorageManager = Depends(
+        Provide[PluginsContainer.cloud_manager]
+    ),
+    config: dict = Depends(Provide[PluginsContainer.config]),
+    cache_manager: CacheManager = Depends(Provide[PluginsContainer.cache_manager]),
+    force_fetch: int = Query(0),
+):
+    """Execute the dynamic query and return results as a downloadable CSV file."""
+    role_id, user_id, _ = get_current_user(request)
+
+    # Block multiple exports per user within a 2-minute window (Redis)
+    export_rate_key = f'dynamic_query_export_rate:{user_id}'
+    if not cache_manager.add(
+        export_rate_key, '1', expiry=EXPORT_RATE_LIMIT_SECONDS, nx=True
+    ):
+        return JSONResponse(
+            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
+            content=response_formatter.buildErrorResponse(
+                f'Export rate limit: one export per user every {EXPORT_RATE_LIMIT_SECONDS // 60} minutes. Please try again later.'
+            ),
+        )
+    datasource_type, datasource_config = await get_datasource_config(datasource_id)
+    if not datasource_config:
+        return JSONResponse(
+            status_code=status.HTTP_404_NOT_FOUND,
+            content=response_formatter.buildErrorResponse(
+                f'Datasource not found: {datasource_id}'
+            ),
+        )
+    yaml_query, _ = await dynamic_query_yaml_service.get_dynamic_yaml_query(query_id)
+    if not yaml_query:
+        return JSONResponse(
+            status_code=status.HTTP_404_NOT_FOUND,
+            content=response_formatter.buildErrorResponse(
+                f'Dynamic query not found: {query_id}'
+            ),
+        )
+
+    rls_filter_str = None
+    is_admin = await check_admin(role_id)
+    if not is_admin:
+        rls_filters = await user_service.get_user_resources(
+            user_id=user_id, scope=ResourceScope.DATA
+        )
+        if len(rls_filters) == 0:
+            return JSONResponse(
+                status_code=status.HTTP_403_FORBIDDEN,
+                content=response_formatter.buildErrorResponse(
+                    'Data access not set for non-admin user'
+                ),
+            )
+        rls_filters = fetch_data_filters(rls_filters)
+        rls_filter_str = f"{ ' $and '.join(rls_filters)}"
+
+    # Bucket and filename: hash of $filter, limit, offset, dynamic_query_params
+    provider = config['cloud_config']['cloud_provider']
+    bucket_name = (
+        config['aws']['aws_asset_storage_bucket']
+        if provider == 'aws'
+        else config['gcp']['gcp_asset_storage_bucket']
+    )
+    export_hash = generate_export_filename_hash(
+        filter=filter,
+        limit=limit,
+        offset=offset,
+        params=dynamic_query_params.params if dynamic_query_params else None,
+        rls_filter_str=rls_filter_str,
+    )
+    filename = f'export_{query_id}_{export_hash}.csv'
+    file_key = f'dynamic_query_exports/{filename}'
+
+    # If not force_fetch, return existing file from bucket if present
+    if not force_fetch:
+        existing_keys, _ = cloud_manager.list_files(
+            bucket_name=bucket_name,
+            prefix=file_key,
+            page_size=1,
+            page_number=1,
+        )
+        if existing_keys and existing_keys[0] == file_key:
+            signed_url = cloud_manager.generate_presigned_url(
+                bucket_name=bucket_name, key=file_key, type='GET'
+            )
+            return JSONResponse(
+                status_code=status.HTTP_200_OK,
+                content=response_formatter.buildSuccessResponse(
+                    {'export_url': signed_url}
+                ),
+            )
+
+    datasource_plugin = DatasourcePlugin(datasource_type, datasource_config)
+    res: Dict[str, Any] = await datasource_plugin.execute_dynamic_query(
+        yaml_query,
+        rls_filter_str,
+        filter,
+        offset,
+        limit,
+        dynamic_query_params.params if dynamic_query_params else None,
+    )
+
+    if not res:
+        return JSONResponse(
+            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+            content=response_formatter.buildErrorResponse(
+                f'Unexpected dynamic query result format for query_id {query_id}, no results'
+            ),
+        )
+
+    first_key = next(iter(res))
+    if res[first_key].get('status') != 'success':
+        return JSONResponse(
+            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+            content=response_formatter.buildErrorResponse(
+                f'Unexpected dynamic query result format for query_id {query_id}, no results'
+            ),
+        )
+
+    serialized_res = serialize_values(res[first_key]['result'])
+
+    # Stream rows to CSV directly in GCS/S3 to avoid building the full CSV in memory
+    rows = serialized_res or []
+    if not isinstance(rows, list):
+        return JSONResponse(
+            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+            content=response_formatter.buildErrorResponse(
+                f'Unexpected dynamic query result format for query_id {query_id}, invalid rows'
+            ),
+        )
+
+    if rows:
+        fieldnames = list(rows[0].keys())
+        for row in rows[1:]:
+            for k in row:
+                if k not in fieldnames:
+                    fieldnames.append(k)
+    else:
+        fieldnames = []
+
+    def _cell_value(v):
+        if isinstance(v, (dict, list)):
+            return json.dumps(v)
+        return v if v is None or isinstance(v, str) else str(v)
+
+    with cloud_manager.open_text_writer(
+        bucket_name=bucket_name, key=file_key, content_type='text/csv'
+    ) as f:
+        if fieldnames:
+            writer = csv.DictWriter(f, fieldnames=fieldnames, extrasaction='ignore')
+            writer.writeheader()
+            for row in rows:
+                writer.writerow({k: _cell_value(row.get(k)) for k in fieldnames})
+
+    signed_url = cloud_manager.generate_presigned_url(
+        bucket_name=bucket_name, key=file_key, type='GET'
+    )
+
+    return JSONResponse(
+        status_code=status.HTTP_200_OK,
+        content=response_formatter.buildSuccessResponse({'export_url': signed_url}),
+    )
+
+
 @datasource_router.delete('/v1/{datasource_id}/dynamic-queries/{query_id}')
 @inject
 async def delete_dynamic_query(
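
The export lockout above hinges on an atomic set-if-absent with a TTL. A standalone sketch of the same pattern with redis-py, assuming `cache_manager.add(..., nx=True)` wraps something equivalent (key name taken from the diff):

    import redis

    r = redis.Redis()

    def try_acquire_export_slot(user_id: str, window_seconds: int = 120) -> bool:
        # SET key value NX EX <ttl>: claims the slot only if the key is absent,
        # and the key expires on its own, so no cleanup path is needed.
        return bool(
            r.set(f'dynamic_query_export_rate:{user_id}', '1', nx=True, ex=window_seconds)
        )

    if not try_acquire_export_slot('user_42'):
        print('429: one export per user every 2 minutes')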

wavefront/server/modules/plugins_module/plugins_module/services/datasource_services.py

Lines changed: 1 addition & 0 deletions

@@ -53,6 +53,7 @@ def check_is_valid_resource(resource_id: str) -> bool:
         'rf_parsed_data_object',
         'rf_gold_data_object',
         'rf_gold_item_details',
+        'rf_gold_auditor_data',
     ]:
         return True
     return False

wavefront/server/modules/plugins_module/plugins_module/utils/helper.py

Lines changed: 19 additions & 0 deletions

@@ -53,6 +53,25 @@ def generate_cache_key(
     return f'dynamic_query:{hash_digest}'


+def generate_export_filename_hash(
+    filter: str = None,
+    limit: int = None,
+    offset: int = None,
+    params: dict[str, str] = None,
+    rls_filter_str: str = None,
+) -> str:
+    """Generate a short hash for export filename from $filter, limit, offset, dynamic_query params, and RLS scope."""
+    key_dict = {
+        'filter': filter,
+        'limit': limit,
+        'offset': offset,
+        'params': params or {},
+        'rls': rls_filter_str,
+    }
+    key_json = json.dumps(key_dict, sort_keys=True, separators=(',', ':'))
+    return hashlib.md5(key_json.encode()).hexdigest()
+
+
 def validate_yaml_query(yaml_query: dict) -> bool:
     """
     Validate the structure of a dynamic query YAML file.
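
Because the hash input is canonical JSON (sorted keys, compact separators), equal inputs always produce the same filename, which is what lets the controller treat an existing bucket object as a cache hit. A self-contained illustration of that determinism:

    import hashlib
    import json

    def export_hash(filter=None, limit=None, offset=None, params=None, rls=None):
        key_json = json.dumps(
            {'filter': filter, 'limit': limit, 'offset': offset,
             'params': params or {}, 'rls': rls},
            sort_keys=True, separators=(',', ':'),
        )
        return hashlib.md5(key_json.encode()).hexdigest()

    a = export_hash(filter="status eq 'active'", limit=100, params={'r': 'us'})
    b = export_hash(params={'r': 'us'}, limit=100, filter="status eq 'active'")
    assert a == b  # argument order does not matter; the canonical JSON does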

wavefront/server/packages/flo_cloud/flo_cloud/_types/cloud_storage.py

Lines changed: 14 additions & 1 deletion

@@ -1,5 +1,5 @@
 from abc import ABC, abstractmethod
-from typing import List, Tuple, Optional
+from typing import List, Tuple, Optional, IO, ContextManager


 class CloudStorageHandler(ABC):
@@ -103,3 +103,16 @@ def list_files(
             Exception: If listing fails
         """
         pass
+
+    @abstractmethod
+    def open_text_writer(
+        self, bucket_name: str, key: str, content_type: Optional[str] = None
+    ) -> ContextManager[IO[str]]:
+        """
+        Open a text writer stream to cloud storage for incremental writes.
+
+        Implementations should return a context manager that yields a file-like
+        object opened in text mode. Data written to this object must be
+        uploaded to the underlying storage when the context manager exits.
+        """
+        pass
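
A hedged sketch of what a minimal implementation of this contract can look like, e.g. as an in-memory test double (the class and its dict store are invented for illustration, not part of flo_cloud):

    import io
    from contextlib import contextmanager
    from typing import IO, ContextManager, Optional

    class InMemoryStorage:  # hypothetical test double
        def __init__(self):
            self.objects: dict[tuple[str, str], bytes] = {}

        def open_text_writer(
            self, bucket_name: str, key: str, content_type: Optional[str] = None
        ) -> ContextManager[IO[str]]:
            @contextmanager
            def _writer():
                buf = io.StringIO()
                try:
                    yield buf
                    # persist only on clean exit, mirroring the contract above
                    self.objects[(bucket_name, key)] = buf.getvalue().encode('utf-8')
                finally:
                    buf.close()
            return _writer()

    store = InMemoryStorage()
    with store.open_text_writer('b', 'k.csv') as f:
        f.write('id,name\n1,alpha\n')
    assert store.objects[('b', 'k.csv')].startswith(b'id,name')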

wavefront/server/packages/flo_cloud/flo_cloud/aws/s3.py

Lines changed: 26 additions & 1 deletion

@@ -1,7 +1,8 @@
 from itertools import islice
 import boto3
 import io
-from typing import Optional, List, Tuple
+from typing import Optional, List, Tuple, IO, ContextManager
+from contextlib import contextmanager
 from botocore.exceptions import ClientError
 from .._types import CloudStorageHandler
 from ..exceptions import CloudStorageFileNotFoundError
@@ -201,3 +202,27 @@ def delete_file(self, bucket_name: str, file_path: str) -> None:
             self.s3_client.delete_object(Bucket=bucket_name, Key=file_path)
         except Exception as e:
             raise Exception(f'Error deleting file from S3: {str(e)}')
+
+    def open_text_writer(
+        self, bucket_name: str, key: str, content_type: Optional[str] = None
+    ) -> ContextManager[IO[str]]:
+        """
+        Open a text-mode writer for S3 uploads.
+
+        Since boto3 does not provide a native streaming text writer API like
+        GCS, this implementation buffers content in memory and uploads it on
+        context exit. This keeps a consistent interface with GCS, while still
+        allowing incremental CSV writing logic to be shared.
+        """

+        @contextmanager
+        def _writer() -> IO[str]:
+            buffer = io.StringIO()
+            try:
+                yield buffer
+                data = buffer.getvalue().encode('utf-8')
+                self.save_large_file(data, bucket_name, key, content_type)
+            finally:
+                buffer.close()
+
+        return _writer()
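
If buffering whole exports in memory ever becomes a concern, true streaming to S3 is possible via multipart upload. A hedged, illustrative sketch (the boto3 multipart calls are real; the wrapper itself is not part of this commit, and non-final parts must be at least 5 MiB):

    import boto3
    from contextlib import contextmanager

    @contextmanager
    def s3_multipart_text_writer(bucket: str, key: str, part_size: int = 8 * 1024 * 1024):
        s3 = boto3.client('s3')
        mpu = s3.create_multipart_upload(Bucket=bucket, Key=key, ContentType='text/csv')
        state = {'parts': [], 'chunks': [], 'size': 0, 'n': 1}

        def _flush():
            body = ''.join(state['chunks']).encode('utf-8')
            resp = s3.upload_part(
                Bucket=bucket, Key=key, UploadId=mpu['UploadId'],
                PartNumber=state['n'], Body=body,
            )
            state['parts'].append({'ETag': resp['ETag'], 'PartNumber': state['n']})
            state['chunks'], state['size'] = [], 0
            state['n'] += 1

        class _Writer:
            def write(self, text: str) -> None:
                state['chunks'].append(text)
                state['size'] += len(text)  # chars approximate bytes for ASCII CSV
                if state['size'] >= part_size:
                    _flush()

        try:
            yield _Writer()
            if state['chunks'] or not state['parts']:
                _flush()  # final part may be smaller than 5 MiB
            s3.complete_multipart_upload(
                Bucket=bucket, Key=key, UploadId=mpu['UploadId'],
                MultipartUpload={'Parts': state['parts']},
            )
        except Exception:
            s3.abort_multipart_upload(Bucket=bucket, Key=key, UploadId=mpu['UploadId'])
            raise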

wavefront/server/packages/flo_cloud/flo_cloud/cloud_storage.py

Lines changed: 12 additions & 1 deletion

@@ -1,5 +1,5 @@
 from io import BytesIO
-from typing import Union, List, Tuple, Optional
+from typing import Union, List, Tuple, Optional, IO, ContextManager
 from .aws.s3 import S3Storage
 from .gcp.gcs import GCSStorage
 from ._types import CloudStorageHandler, CloudProvider
@@ -187,3 +187,14 @@ def delete_file(self, bucket_name: str, file_path: str) -> None:
             file_path: Path to the file in bucket
         """
         return self.handler.delete_file(bucket_name, file_path)
+
+    def open_text_writer(
+        self, bucket_name: str, key: str, content_type: Optional[str] = None
+    ) -> ContextManager[IO[str]]:
+        """
+        Open a text-mode writer to cloud storage for incremental writes.
+
+        For GCS, this uses the native streaming blob.open API.
+        For S3, this buffers content in memory and uploads on close.
+        """
+        return self.handler.open_text_writer(bucket_name, key, content_type)
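
In isolation, the shape of usage the controller relies on: csv.DictWriter writes straight into the cloud-backed text stream, regardless of provider. Constructor arguments, bucket, and rows below are assumptions for illustration.

    import csv
    from flo_cloud.cloud_storage import CloudStorageManager

    manager = CloudStorageManager(provider='gcp')  # constructor signature assumed
    rows = [{'id': 1, 'name': 'alpha'}, {'id': 2, 'name': 'beta'}]

    with manager.open_text_writer(
        bucket_name='asset-bucket', key='dynamic_query_exports/demo.csv',
        content_type='text/csv',
    ) as f:
        writer = csv.DictWriter(f, fieldnames=['id', 'name'], extrasaction='ignore')
        writer.writeheader()
        writer.writerows(rows)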

wavefront/server/packages/flo_cloud/flo_cloud/gcp/gcs.py

Lines changed: 19 additions & 1 deletion

@@ -3,7 +3,7 @@
 from itertools import islice
 from google.cloud import storage
 from google.cloud.exceptions import NotFound
-from typing import Optional, List, Tuple
+from typing import Optional, List, Tuple, IO, ContextManager
 from .._types import CloudStorageHandler
 from ..exceptions import CloudStorageFileNotFoundError
 import re
@@ -240,3 +240,21 @@ def delete_file(self, bucket_name: str, file_path: str) -> None:
             blob.delete()
         except Exception as e:
             raise Exception(f'Error deleting file from GCS: {str(e)}')
+
+    def open_text_writer(
+        self, bucket_name: str, key: str, content_type: Optional[str] = None
+    ) -> ContextManager[IO[str]]:
+        """
+        Open a text-mode writer to a GCS object using the native blob.open API.
+
+        Returns a context manager yielding a file-like object. Data written
+        to this object is streamed directly to GCS.
+        """
+        if not bucket_name:
+            raise ValueError('bucket_name cannot be None or empty')
+        if not key:
+            raise ValueError('key cannot be None or empty')
+
+        bucket = self.client.bucket(bucket_name)
+        blob = bucket.blob(key)
+        return blob.open('wt', content_type=content_type)
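
For reference, the native API this wraps: `Blob.open('wt')` in google-cloud-storage performs a resumable upload and flushes in chunks, so a large CSV never has to be materialized in memory. A minimal direct-usage sketch (bucket and object names are illustrative; assumes application-default credentials):

    from google.cloud import storage

    client = storage.Client()
    blob = client.bucket('asset-bucket').blob('dynamic_query_exports/demo.csv')

    # Data written inside the block is streamed to GCS; the object is
    # finalized when the context manager exits.
    with blob.open('wt', content_type='text/csv') as f:
        f.write('id,name\n')
        for i in range(3):
            f.write(f'{i},row_{i}\n')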
