Skip to content
13 changes: 6 additions & 7 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,10 @@ repos:
- id: trailing-whitespace
- id: end-of-file-fixer
- id: check-yaml
- repo: https://github.com/psf/black
rev: 22.3.0
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.14.14
hooks:
- id: black
- repo: https://github.com/pycqa/flake8
rev: 3.9.2 # version-scanner: ignore
hooks:
- id: flake8
- id: ruff
args: [--select, I, --fix, --line-length=88, --target-version=py310]
- id: ruff-format
args: [--line-length=88, --target-version=py310]
16 changes: 13 additions & 3 deletions packages/bigframes/bigframes/core/global_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import threading
import traceback
import warnings
from typing import TYPE_CHECKING, Callable, Optional, TypeVar
from typing import TYPE_CHECKING, Callable, Iterable, Optional, TypeVar

import google.auth.exceptions

Expand Down Expand Up @@ -124,12 +124,22 @@ def with_default_session(func_: Callable[..., _T], *args, **kwargs) -> _T:
return func_(get_global_session(), *args, **kwargs)


def execution_history(
    *,
    events: Optional[Iterable[bigframes.core.events.Event]] = None,
    job_ids: Optional[Iterable[str]] = None,
    filter_by_cell: bool = True,
) -> "bigframes.session._ExecutionHistory":
    """Return the execution history of the default (global) session.

    Thin wrapper: every keyword argument is forwarded unchanged to
    ``bigframes.session.Session.execution_history`` through
    ``with_default_session``.

    Args:
        events: Optional events used to filter the history.
        job_ids: Optional job IDs used to filter the history.
        filter_by_cell: Passed through to the session method; defaults to True.

    Returns:
        Whatever ``Session.execution_history`` returns for the global session.
    """
    # NOTE(review): pandas is imported only for its side effects (hence the
    # F401 suppression); presumably it must be loaded before bigframes.session
    # -- confirm before removing.
    import pandas  # noqa: F401

    import bigframes.session

    history_filters = {
        "events": events,
        "job_ids": job_ids,
        "filter_by_cell": filter_by_cell,
    }
    return with_default_session(
        bigframes.session.Session.execution_history,
        **history_filters,
    )


class _GlobalSessionContext:
Expand Down
6 changes: 6 additions & 0 deletions packages/bigframes/bigframes/pandas/io/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@ def _try_read_gbq_colab_sessionless_dry_run(
def _read_gbq_colab( # type: ignore[overload-overlap]
query_or_table: str,
*,
callback: Optional[Callable[[bigframes.core.events.Event], None]] = ...,
pyformat_args: Optional[Dict[str, Any]] = ...,
dry_run: Literal[False] = ...,
) -> bigframes.dataframe.DataFrame: ...
Expand All @@ -309,6 +310,7 @@ def _read_gbq_colab( # type: ignore[overload-overlap]
def _read_gbq_colab(
query_or_table: str,
*,
callback: Optional[Callable[[bigframes.core.events.Event], None]] = ...,
pyformat_args: Optional[Dict[str, Any]] = ...,
dry_run: Literal[True] = ...,
) -> pandas.Series: ...
Expand All @@ -317,6 +319,7 @@ def _read_gbq_colab(
def _read_gbq_colab(
query_or_table: str,
*,
callback: Optional[Callable[[bigframes.core.events.Event], None]] = None,
pyformat_args: Optional[Dict[str, Any]] = None,
dry_run: bool = False,
) -> bigframes.dataframe.DataFrame | pandas.Series:
Expand All @@ -328,6 +331,8 @@ def _read_gbq_colab(
Args:
query_or_table (str):
SQL query or table ID (table ID not yet supported).
callback (Optional[Callable[[bigframes.core.events.Event], None]]):
Callback to receive query execution events.
pyformat_args (Optional[Dict[str, Any]]):
Parameters to format into the query string.
dry_run (bool):
Expand Down Expand Up @@ -379,6 +384,7 @@ def _read_gbq_colab(
return global_session.with_default_session(
bigframes.session.Session._read_gbq_colab,
query_or_table,
callback=callback,
pyformat_args=pyformat_args,
dry_run=dry_run,
)
Expand Down
119 changes: 106 additions & 13 deletions packages/bigframes/bigframes/session/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,18 @@
class _ExecutionHistory:
def __init__(self, jobs: list[dict]):
self._df = pandas.DataFrame(jobs)
if self._df.empty:
self._df = pandas.DataFrame(
columns=[
"job_id",
"query_id",
"job_type",
"status",
"query",
"total_bytes_processed",
"job_url",
]
)

def to_dataframe(self) -> pandas.DataFrame:
"""Returns the execution history as a pandas DataFrame."""
Expand Down Expand Up @@ -199,9 +211,10 @@ def __init__(
self._clients_provider = clients_provider
self._location = context.location or "US"
else:
credentials, project = (
bigframes._config.auth.resolve_credentials_and_project(context)
)
(
credentials,
project,
) = bigframes._config.auth.resolve_credentials_and_project(context)
if context.location is None:
with bigquery.Client(
project=project,
Expand Down Expand Up @@ -430,12 +443,82 @@ def slot_millis_sum(self):
"""The sum of all slot time used by bigquery jobs in this session."""
return self._metrics.slot_millis

def execution_history(
    self,
    *,
    events: Optional[Iterable[bigframes.core.events.Event]] = None,
    job_ids: Optional[Iterable[str]] = None,
    filter_by_cell: bool = True,
) -> _ExecutionHistory:
    """Returns the history of executions initiated by BigFrames in the current session.

    Use `.to_dataframe()` on the result to get a pandas DataFrame.

    At most one filter is applied, chosen in this order of precedence:
    ``events``, then ``job_ids``, then ``filter_by_cell``. For example, if
    ``events`` is given, ``job_ids`` and ``filter_by_cell`` are ignored.

    Args:
        events (Iterable[Event], optional):
            Filter execution history to only include jobs associated with the given events.
            A job matches when its ``job_id`` or ``query_id`` equals the
            corresponding attribute of any event. The iterable is consumed
            exactly once, so one-shot iterables (e.g. generators) are safe.
        job_ids (Iterable[str], optional):
            Filter execution history to only include jobs matching the given job IDs.
            Each ID is compared against both the job's ``job_id`` and its
            ``query_id``.
        filter_by_cell (bool, optional):
            If True and running under IPython (e.g. Colab/Jupyter), automatically
            filter history to only include jobs executed within the current cell.
            Has no effect when IPython is unavailable. Defaults to True.

    Returns:
        _ExecutionHistory: The (possibly filtered) job records of this session.
    """

    def _matches(job: dict, wanted_job_ids: set, wanted_query_ids: set) -> bool:
        # A record with a missing (None) id never matches on that id, even
        # if the caller's id collection happens to contain None.
        job_id = job.get("job_id")
        if job_id is not None and job_id in wanted_job_ids:
            return True
        query_id = job.get("query_id")
        return query_id is not None and query_id in wanted_query_ids

    jobs = [job.__dict__ for job in self._metrics.jobs]

    if events is not None:
        # Collect both id sets in a single pass: ``events`` is only promised
        # to be an Iterable, and iterating a generator twice would leave the
        # second collection empty.
        event_job_ids = set()
        event_query_ids = set()
        for event in events:
            event_job_id = getattr(event, "job_id", None)
            if event_job_id is not None:
                event_job_ids.add(event_job_id)
            event_query_id = getattr(event, "query_id", None)
            if event_query_id is not None:
                event_query_ids.add(event_query_id)
        jobs = [
            job for job in jobs if _matches(job, event_job_ids, event_query_ids)
        ]

    elif job_ids is not None:
        target_job_ids = set(job_ids)
        jobs = [
            job for job in jobs if _matches(job, target_job_ids, target_job_ids)
        ]

    elif filter_by_cell:
        # Best effort: outside IPython there is no notion of a current cell,
        # so the history is returned unfiltered.
        try:
            import IPython

            ipy = IPython.get_ipython()
        except (ImportError, NameError):
            ipy = None

        if ipy is not None and hasattr(ipy, "execution_count"):
            current_count = ipy.execution_count
            jobs = [
                job
                for job in jobs
                if job.get("cell_execution_count") == current_count
            ]

    return _ExecutionHistory(jobs)

@property
def _allows_ambiguity(self) -> bool:
Expand Down Expand Up @@ -584,6 +667,7 @@ def _read_gbq_colab(
self,
query: str,
*,
callback: Optional[Callable[[bigframes.core.events.Event], None]] = ...,
pyformat_args: Optional[Dict[str, Any]] = None,
dry_run: Literal[False] = ...,
) -> dataframe.DataFrame: ...
Expand All @@ -593,6 +677,7 @@ def _read_gbq_colab(
self,
query: str,
*,
callback: Optional[Callable[[bigframes.core.events.Event], None]] = ...,
pyformat_args: Optional[Dict[str, Any]] = None,
dry_run: Literal[True] = ...,
) -> pandas.Series: ...
Expand All @@ -601,8 +686,8 @@ def _read_gbq_colab(
def _read_gbq_colab(
self,
query: str,
# TODO: Add a callback parameter that takes some kind of Event object.
*,
callback: Optional[Callable[[bigframes.core.events.Event], None]] = None,
pyformat_args: Optional[Dict[str, Any]] = None,
dry_run: bool = False,
) -> Union[dataframe.DataFrame, pandas.Series]:
Expand All @@ -615,6 +700,8 @@ def _read_gbq_colab(
query (str):
A SQL query string to execute. Results (if any) are turned into
a DataFrame.
callback (Optional[Callable[[bigframes.core.events.Event], None]]):
Callback to receive query execution events.
pyformat_args (dict):
A dictionary of potential variables to replace in ``query``.
Note: strings are _not_ escaped. Use query parameters for these,
Expand All @@ -634,13 +721,19 @@ def _read_gbq_colab(
dry_run=dry_run,
)

return self._loader.read_gbq_query(
query=query,
index_col=bigframes.enums.DefaultIndexKind.NULL,
force_total_order=False,
dry_run=typing.cast(Union[Literal[False], Literal[True]], dry_run),
allow_large_results=allow_large_results,
)
def _run_query():
return self._loader.read_gbq_query(
query=query,
index_col=bigframes.enums.DefaultIndexKind.NULL,
force_total_order=False,
dry_run=typing.cast(Union[Literal[False], Literal[True]], dry_run),
allow_large_results=allow_large_results,
)

if callback is not None:
with self._publisher.subscribe(callback):
return _run_query()
return _run_query()

@overload
def read_gbq_query( # type: ignore[overload-overlap]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ def create_job_configs_labels(
) -> Dict[str, str]:
if job_configs_labels is None:
job_configs_labels = {}
else:
job_configs_labels = dict(job_configs_labels)

if api_methods and "bigframes-api" not in job_configs_labels:
job_configs_labels["bigframes-api"] = api_methods[0]
Expand Down
2 changes: 1 addition & 1 deletion packages/bigframes/bigframes/session/bigquery_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ def close(self):
# Assume this is being called in the user thread, so we can access
# this thread-local config.
job_config=bigquery.QueryJobConfig(
labels=bigframes.options.compute.extra_query_labels
labels=dict(bigframes.options.compute.extra_query_labels)
),
location=self.location,
project=None,
Expand Down
34 changes: 34 additions & 0 deletions packages/bigframes/bigframes/session/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class JobMetadata:
input_bytes: Optional[int] = None
output_rows: Optional[int] = None
source_format: Optional[str] = None
cell_execution_count: Optional[int] = None

@classmethod
def from_job(
Expand All @@ -71,6 +72,16 @@ def from_job(
f"{job_id}&page=queryresults"
)

cell_execution_count = None
try:
import IPython

ipy = IPython.get_ipython()
if ipy is not None and hasattr(ipy, "execution_count"):
cell_execution_count = ipy.execution_count
except (ImportError, NameError):
pass

metadata = cls(
job_id=query_job.job_id,
location=query_job.location,
Expand All @@ -84,6 +95,7 @@ def from_job(
error_result=query_job.error_result,
query=query_text,
job_url=job_url,
cell_execution_count=cell_execution_count,
)
if isinstance(query_job, QueryJob):
metadata.cached = getattr(query_job, "cache_hit", None)
Expand Down Expand Up @@ -132,6 +144,16 @@ def from_row_iterator(
f"project={project}&j=bq:{location}:{job_id}&page=queryresults"
)

cell_execution_count = None
try:
import IPython

ipy = IPython.get_ipython()
if ipy is not None and hasattr(ipy, "execution_count"):
cell_execution_count = ipy.execution_count
except (ImportError, NameError):
pass

# fmt: off
return cls(
job_id=job_id,
Expand All @@ -151,6 +173,7 @@ def from_row_iterator(
cached=getattr(row_iterator, "cache_hit", None),
query=query_text,
job_url=job_url,
cell_execution_count=cell_execution_count,
)
# fmt: on

Expand Down Expand Up @@ -291,10 +314,21 @@ def on_event(self, envelope: Any):
bytes_processed = event.result.total_bytes_processed or 0
self.bytes_processed += bytes_processed

cell_execution_count = None
try:
import IPython

ipy = IPython.get_ipython()
if ipy is not None and hasattr(ipy, "execution_count"):
cell_execution_count = ipy.execution_count
except (ImportError, NameError):
pass

metadata = JobMetadata(
job_type="polars",
status="DONE",
total_bytes_processed=bytes_processed,
cell_execution_count=cell_execution_count,
)
self.jobs.append(metadata)

Expand Down
Loading
Loading