Skip to content
Prev Previous commit
fix(floware): use streaming write to GCS and also blocking multi-export per user
  • Loading branch information
vizsatiz committed Mar 6, 2026
commit f18373af7344be019d0087432b780f2fd7946f56
Original file line number Diff line number Diff line change
Expand Up @@ -749,6 +749,9 @@ async def execute_dynamic_query(
)


EXPORT_RATE_LIMIT_SECONDS = 120 # 2 minutes between exports per user


@datasource_router.post('/v1/{datasource_id}/dynamic-queries/{query_id}/export')
@inject
async def export_dynamic_query_csv(
Expand All @@ -770,10 +773,23 @@ async def export_dynamic_query_csv(
Provide[PluginsContainer.cloud_manager]
),
config: dict = Depends(Provide[PluginsContainer.config]),
cache_manager: CacheManager = Depends(Provide[PluginsContainer.cache_manager]),
force_fetch: int = Query(0),
):
"""Execute the dynamic query and return results as a downloadable CSV file."""
role_id, user_id, _ = get_current_user(request)

# Block multiple exports per user within a 2-minute window (Redis)
export_rate_key = f'dynamic_query_export_rate:{user_id}'
if not cache_manager.add(
export_rate_key, '1', expiry=EXPORT_RATE_LIMIT_SECONDS, nx=True
):
return JSONResponse(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
content=response_formatter.buildErrorResponse(
f'Export rate limit: one export per user every {EXPORT_RATE_LIMIT_SECONDS // 60} minutes. Please try again later.'
),
)
datasource_type, datasource_config = await get_datasource_config(datasource_id)
if not datasource_config:
return JSONResponse(
Expand Down Expand Up @@ -872,14 +888,38 @@ async def export_dynamic_query_csv(

serialized_res = serialize_values(res[first_key]['result'])

# Convert rows to CSV bytes and store in bucket
csv_bytes = _serialized_rows_to_csv(serialized_res)
cloud_manager.save_small_file(
file_content=csv_bytes,
bucket_name=bucket_name,
key=file_key,
content_type='text/csv',
)
# Stream rows to CSV directly in GCS/S3 to avoid building the full CSV in memory
rows = serialized_res or []
if not isinstance(rows, list):
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content=response_formatter.buildErrorResponse(
f'Unexpected dynamic query result format for query_id {query_id}, invalid rows'
),
)

if rows:
fieldnames = list(rows[0].keys())
for row in rows[1:]:
for k in row:
if k not in fieldnames:
fieldnames.append(k)
else:
fieldnames = []

def _cell_value(v):
if isinstance(v, (dict, list)):
return json.dumps(v)
return v if v is None or isinstance(v, str) else str(v)

with cloud_manager.open_text_writer(
bucket_name=bucket_name, key=file_key, content_type='text/csv'
) as f:
if fieldnames:
writer = csv.DictWriter(f, fieldnames=fieldnames, extrasaction='ignore')
writer.writeheader()
for row in rows:
writer.writerow({k: _cell_value(row.get(k)) for k in fieldnames})

signed_url = cloud_manager.generate_presigned_url(
bucket_name=bucket_name, key=file_key, type='GET'
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from abc import ABC, abstractmethod
from typing import List, Tuple, Optional
from typing import List, Tuple, Optional, IO, ContextManager


class CloudStorageHandler(ABC):
Expand Down Expand Up @@ -103,3 +103,16 @@ def list_files(
Exception: If listing fails
"""
pass

@abstractmethod
def open_text_writer(
    self, bucket_name: str, key: str, content_type: Optional[str] = None
) -> ContextManager[IO[str]]:
    """
    Return a context manager yielding a text-mode, file-like object whose
    written contents are persisted to ``key`` in ``bucket_name`` when the
    context exits.

    Args:
        bucket_name: Target bucket.
        key: Object key/path within the bucket.
        content_type: Optional MIME type to record on the stored object.
    """
    ...
27 changes: 26 additions & 1 deletion wavefront/server/packages/flo_cloud/flo_cloud/aws/s3.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from itertools import islice
import boto3
import io
from typing import Optional, List, Tuple
from typing import Optional, List, Tuple, IO, ContextManager
from contextlib import contextmanager
from botocore.exceptions import ClientError
from .._types import CloudStorageHandler
from ..exceptions import CloudStorageFileNotFoundError
Expand Down Expand Up @@ -201,3 +202,27 @@ def delete_file(self, bucket_name: str, file_path: str) -> None:
self.s3_client.delete_object(Bucket=bucket_name, Key=file_path)
except Exception as e:
raise Exception(f'Error deleting file from S3: {str(e)}')

def open_text_writer(
    self, bucket_name: str, key: str, content_type: Optional[str] = None
) -> ContextManager[IO[str]]:
    """
    Open a text-mode writer whose contents are uploaded to S3 on exit.

    boto3 has no native streaming text-writer API (unlike the GCS
    ``blob.open`` interface), so the written text is buffered in memory and
    uploaded in a single call when the context manager exits cleanly. This
    keeps the interface identical to the GCS handler so incremental CSV
    writing logic can be shared.

    Args:
        bucket_name: Target S3 bucket.
        key: Object key to write.
        content_type: Optional MIME type for the stored object.

    Returns:
        A context manager yielding a text file-like object. On a clean exit
        the buffered text is UTF-8 encoded and handed to
        ``save_large_file``; if the with-body raises, nothing is uploaded.
    """

    # NOTE: a @contextmanager generator's return annotation would be
    # Iterator[IO[str]], not IO[str]; the original annotation was a type
    # error, so it is omitted here (the public annotation above is what
    # callers see).
    @contextmanager
    def _writer():
        buffer = io.StringIO()
        try:
            yield buffer
            # Only reached when the with-body completed without raising:
            # encode the accumulated text and upload it in one shot.
            data = buffer.getvalue().encode('utf-8')
            self.save_large_file(data, bucket_name, key, content_type)
        finally:
            buffer.close()

    return _writer()
13 changes: 12 additions & 1 deletion wavefront/server/packages/flo_cloud/flo_cloud/cloud_storage.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from io import BytesIO
from typing import Union, List, Tuple, Optional
from typing import Union, List, Tuple, Optional, IO, ContextManager
from .aws.s3 import S3Storage
from .gcp.gcs import GCSStorage
from ._types import CloudStorageHandler, CloudProvider
Expand Down Expand Up @@ -187,3 +187,14 @@ def delete_file(self, bucket_name: str, file_path: str) -> None:
file_path: Path to the file in bucket
"""
return self.handler.delete_file(bucket_name, file_path)

def open_text_writer(
    self, bucket_name: str, key: str, content_type: Optional[str] = None
) -> ContextManager[IO[str]]:
    """
    Open a text-mode writer on the configured storage backend.

    The call is forwarded verbatim to the active handler: the GCS handler
    streams writes through the native ``blob.open`` API, while the S3
    handler buffers the text in memory and uploads it when the returned
    context manager closes.
    """
    backend = self.handler
    return backend.open_text_writer(bucket_name, key, content_type)
20 changes: 19 additions & 1 deletion wavefront/server/packages/flo_cloud/flo_cloud/gcp/gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from itertools import islice
from google.cloud import storage
from google.cloud.exceptions import NotFound
from typing import Optional, List, Tuple
from typing import Optional, List, Tuple, IO, ContextManager
from .._types import CloudStorageHandler
from ..exceptions import CloudStorageFileNotFoundError
import re
Expand Down Expand Up @@ -240,3 +240,21 @@ def delete_file(self, bucket_name: str, file_path: str) -> None:
blob.delete()
except Exception as e:
raise Exception(f'Error deleting file from GCS: {str(e)}')

def open_text_writer(
    self, bucket_name: str, key: str, content_type: Optional[str] = None
) -> ContextManager[IO[str]]:
    """
    Stream text directly into a GCS object via the native ``blob.open`` API.

    Returns a context manager yielding a text-mode file-like object; data
    written to it is uploaded to the object and finalized on close.

    Raises:
        ValueError: If ``bucket_name`` or ``key`` is empty or None.
    """
    if not bucket_name:
        raise ValueError('bucket_name cannot be None or empty')
    if not key:
        raise ValueError('key cannot be None or empty')

    target_blob = self.client.bucket(bucket_name).blob(key)
    return target_blob.open('wt', content_type=content_type)
Loading