diff --git a/bigframes/__init__.py b/bigframes/__init__.py
index 9b0d6bb00c..ae7bbe8f81 100644
--- a/bigframes/__init__.py
+++ b/bigframes/__init__.py
@@ -28,6 +28,7 @@
from bigframes._config.bigquery_options import BigQueryOptions # noqa: E402
from bigframes.core.global_session import ( # noqa: E402
close_session,
+ execution_history,
get_global_session,
)
import bigframes.enums as enums # noqa: E402
@@ -57,6 +58,7 @@ def load_ipython_extension(ipython):
"BigQueryOptions",
"get_global_session",
"close_session",
+ "execution_history",
"enums",
"exceptions",
"connect",
diff --git a/bigframes/core/global_session.py b/bigframes/core/global_session.py
index b055bdb854..1061cae525 100644
--- a/bigframes/core/global_session.py
+++ b/bigframes/core/global_session.py
@@ -26,6 +26,8 @@
import bigframes.exceptions as bfe
if TYPE_CHECKING:
+ import pandas
+
import bigframes.session
_global_session: Optional[bigframes.session.Session] = None
@@ -124,6 +126,14 @@ def with_default_session(func_: Callable[..., _T], *args, **kwargs) -> _T:
return func_(get_global_session(), *args, **kwargs)
+def execution_history() -> "pandas.DataFrame":
+ import pandas # noqa: F401
+
+ import bigframes.session
+
+ return with_default_session(bigframes.session.Session.execution_history)
+
+
class _GlobalSessionContext:
"""
Context manager for testing that sets global session.
diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py
index 75be3022d7..e7cb68b480 100644
--- a/bigframes/session/__init__.py
+++ b/bigframes/session/__init__.py
@@ -109,6 +109,39 @@
logger = logging.getLogger(__name__)
+class _ExecutionHistory(pandas.DataFrame):
+ @property
+ def _constructor(self):
+ return _ExecutionHistory
+
+ def _repr_html_(self) -> str | None:
+ try:
+ import bigframes.formatting_helpers as formatter
+
+ if self.empty:
+ return "
No executions found.
"
+
+ cols = ["job_id", "status", "total_bytes_processed", "job_url"]
+ df_display = self[cols].copy()
+ df_display["total_bytes_processed"] = df_display[
+ "total_bytes_processed"
+ ].apply(formatter.get_formatted_bytes)
+
+ def format_url(url):
+ return f'Open Job' if url else ""
+
+ df_display["job_url"] = df_display["job_url"].apply(format_url)
+
+ # Rename job_id to query_id to match user expectations
+ df_display = df_display.rename(columns={"job_id": "query_id"})
+
+ compact_html = df_display.to_html(escape=False, index=False)
+
+ return compact_html
+ except Exception:
+ return super()._repr_html_() # type: ignore
+
+
@log_adapter.class_logger
class Session(
third_party_pandas_gbq.GBQIOMixin,
@@ -371,6 +404,10 @@ def slot_millis_sum(self):
"""The sum of all slot time used by bigquery jobs in this session."""
return self._metrics.slot_millis
+ def execution_history(self) -> pandas.DataFrame:
+ """Returns a list of underlying BigQuery executions initiated by BigFrames in the current session."""
+ return _ExecutionHistory([job.__dict__ for job in self._metrics.jobs])
+
@property
def _allows_ambiguity(self) -> bool:
return self._allow_ambiguity
diff --git a/bigframes/session/loader.py b/bigframes/session/loader.py
index 7b5d1bcaf1..3975f84b7c 100644
--- a/bigframes/session/loader.py
+++ b/bigframes/session/loader.py
@@ -49,6 +49,8 @@
from google.cloud import bigquery_storage_v1
import google.cloud.bigquery
import google.cloud.bigquery as bigquery
+from google.cloud.bigquery.job.load import LoadJob
+from google.cloud.bigquery.job.query import QueryJob
import google.cloud.bigquery.table
from google.cloud.bigquery_storage_v1 import types as bq_storage_types
import pandas
@@ -605,6 +607,9 @@ def _start_generic_job(self, job: formatting_helpers.GenericJob):
else:
job.result()
+ if self._metrics is not None and isinstance(job, (QueryJob, LoadJob)):
+ self._metrics.count_job_stats(query_job=job)
+
@overload
def read_gbq_table( # type: ignore[overload-overlap]
self,
diff --git a/bigframes/session/metrics.py b/bigframes/session/metrics.py
index 8d43a83d73..cd499e87e0 100644
--- a/bigframes/session/metrics.py
+++ b/bigframes/session/metrics.py
@@ -15,16 +15,131 @@
from __future__ import annotations
import dataclasses
+import datetime
import os
-from typing import Optional, Tuple
+from typing import Any, Mapping, Optional, Tuple, Union
import google.cloud.bigquery as bigquery
-import google.cloud.bigquery.job as bq_job
+from google.cloud.bigquery.job.load import LoadJob
+from google.cloud.bigquery.job.query import QueryJob
import google.cloud.bigquery.table as bq_table
LOGGING_NAME_ENV_VAR = "BIGFRAMES_PERFORMANCE_LOG_NAME"
+@dataclasses.dataclass
+class JobMetadata:
+ job_id: Optional[str] = None
+ query_id: Optional[str] = None
+ location: Optional[str] = None
+ project: Optional[str] = None
+ creation_time: Optional[datetime.datetime] = None
+ start_time: Optional[datetime.datetime] = None
+ end_time: Optional[datetime.datetime] = None
+ duration_seconds: Optional[float] = None
+ status: Optional[str] = None
+ total_bytes_processed: Optional[int] = None
+ total_slot_ms: Optional[int] = None
+ job_type: Optional[str] = None
+ error_result: Optional[Mapping[str, Any]] = None
+ cached: Optional[bool] = None
+ job_url: Optional[str] = None
+ query: Optional[str] = None
+ destination_table: Optional[str] = None
+ source_uris: Optional[list[str]] = None
+ input_files: Optional[int] = None
+ input_bytes: Optional[int] = None
+ output_rows: Optional[int] = None
+ source_format: Optional[str] = None
+
+ @classmethod
+ def from_job(
+ cls, query_job: Union[QueryJob, LoadJob], exec_seconds: Optional[float] = None
+ ) -> "JobMetadata":
+ query_text = getattr(query_job, "query", None)
+ if query_text and len(query_text) > 1024:
+ query_text = query_text[:1021] + "..."
+
+ job_id = getattr(query_job, "job_id", None)
+ job_url = None
+ if job_id:
+ job_url = f"https://console.cloud.google.com/bigquery?project={query_job.project}&j=bq:{query_job.location}:{job_id}&page=queryresults"
+
+ metadata = cls(
+ job_id=query_job.job_id,
+ location=query_job.location,
+ project=query_job.project,
+ creation_time=query_job.created,
+ start_time=query_job.started,
+ end_time=query_job.ended,
+ duration_seconds=exec_seconds,
+ status=query_job.state,
+ job_type=query_job.job_type,
+ error_result=query_job.error_result,
+ query=query_text,
+ job_url=job_url,
+ )
+ if isinstance(query_job, QueryJob):
+ metadata.cached = getattr(query_job, "cache_hit", None)
+ metadata.destination_table = (
+ str(query_job.destination) if query_job.destination else None
+ )
+ metadata.total_bytes_processed = getattr(
+ query_job, "total_bytes_processed", None
+ )
+ metadata.total_slot_ms = getattr(query_job, "slot_millis", None)
+ elif isinstance(query_job, LoadJob):
+ metadata.output_rows = getattr(query_job, "output_rows", None)
+ metadata.input_files = getattr(query_job, "input_files", None)
+ metadata.input_bytes = getattr(query_job, "input_bytes", None)
+ metadata.destination_table = (
+ str(query_job.destination)
+ if getattr(query_job, "destination", None)
+ else None
+ )
+ if getattr(query_job, "source_uris", None):
+ metadata.source_uris = list(query_job.source_uris)
+ if query_job.configuration and hasattr(
+ query_job.configuration, "source_format"
+ ):
+ metadata.source_format = query_job.configuration.source_format
+
+ return metadata
+
+ @classmethod
+ def from_row_iterator(
+ cls, row_iterator: bq_table.RowIterator, exec_seconds: Optional[float] = None
+ ) -> "JobMetadata":
+ query_text = getattr(row_iterator, "query", None)
+ if query_text and len(query_text) > 1024:
+ query_text = query_text[:1021] + "..."
+
+ job_id = getattr(row_iterator, "job_id", None)
+ job_url = None
+ if job_id:
+ project = getattr(row_iterator, "project", "")
+ location = getattr(row_iterator, "location", "")
+ job_url = f"https://console.cloud.google.com/bigquery?project={project}&j=bq:{location}:{job_id}&page=queryresults"
+
+ return cls(
+ job_id=job_id,
+ query_id=getattr(row_iterator, "query_id", None),
+ location=getattr(row_iterator, "location", None),
+ project=getattr(row_iterator, "project", None),
+ creation_time=getattr(row_iterator, "created", None),
+ start_time=getattr(row_iterator, "started", None),
+ end_time=getattr(row_iterator, "ended", None),
+ duration_seconds=exec_seconds,
+ status="DONE",
+ total_bytes_processed=getattr(row_iterator, "total_bytes_processed", None),
+ total_slot_ms=getattr(row_iterator, "slot_millis", None),
+ job_type="query",
+ cached=getattr(row_iterator, "cache_hit", None),
+ query=query_text,
+ job_url=job_url,
+ )
+
+
@dataclasses.dataclass
class ExecutionMetrics:
execution_count: int = 0
@@ -32,10 +147,11 @@ class ExecutionMetrics:
bytes_processed: int = 0
execution_secs: float = 0
query_char_count: int = 0
+ jobs: list[JobMetadata] = dataclasses.field(default_factory=list)
def count_job_stats(
self,
- query_job: Optional[bq_job.QueryJob] = None,
+ query_job: Optional[Union[QueryJob, LoadJob]] = None,
row_iterator: Optional[bq_table.RowIterator] = None,
):
if query_job is None:
@@ -57,21 +173,64 @@ def count_job_stats(
self.slot_millis += slot_millis
self.execution_secs += exec_seconds
- elif query_job.configuration.dry_run:
- query_char_count = len(query_job.query)
+ self.jobs.append(
+ JobMetadata.from_row_iterator(row_iterator, exec_seconds=exec_seconds)
+ )
+
+ elif isinstance(query_job, QueryJob) and query_job.configuration.dry_run:
+ query_char_count = len(getattr(query_job, "query", ""))
# TODO(tswast): Pass None after making benchmark publishing robust to missing data.
bytes_processed = 0
slot_millis = 0
exec_seconds = 0.0
- elif (stats := get_performance_stats(query_job)) is not None:
- query_char_count, bytes_processed, slot_millis, exec_seconds = stats
+ elif isinstance(query_job, bigquery.QueryJob):
+ if (stats := get_performance_stats(query_job)) is not None:
+ query_char_count, bytes_processed, slot_millis, exec_seconds = stats
+ self.execution_count += 1
+ self.query_char_count += query_char_count or 0
+ self.bytes_processed += bytes_processed or 0
+ self.slot_millis += slot_millis or 0
+ self.execution_secs += exec_seconds or 0
+
+ metadata = JobMetadata.from_job(query_job, exec_seconds=exec_seconds)
+ metadata.total_bytes_processed = bytes_processed
+ metadata.total_slot_ms = slot_millis
+ self.jobs.append(metadata)
+
+ else:
self.execution_count += 1
- self.query_char_count += query_char_count or 0
- self.bytes_processed += bytes_processed or 0
- self.slot_millis += slot_millis or 0
- self.execution_secs += exec_seconds or 0
+ duration = (
+ (query_job.ended - query_job.created).total_seconds()
+ if query_job.ended and query_job.created
+ else None
+ )
+ self.jobs.append(JobMetadata.from_job(query_job, exec_seconds=duration))
+
+ # For pytest runs only, log information about the query job
+ # to a file in order to create a performance report.
+ if (
+ isinstance(query_job, bigquery.QueryJob)
+ and not query_job.configuration.dry_run
+ ):
+ stats = get_performance_stats(query_job)
+ if stats:
+ write_stats_to_disk(
+ query_char_count=stats[0],
+ bytes_processed=stats[1],
+ slot_millis=stats[2],
+ exec_seconds=stats[3],
+ )
+ elif row_iterator is not None:
+ bytes_processed = getattr(row_iterator, "total_bytes_processed", 0) or 0
+ query_char_count = len(getattr(row_iterator, "query", "") or "")
+ slot_millis = getattr(row_iterator, "slot_millis", 0) or 0
+ created = getattr(row_iterator, "created", None)
+ ended = getattr(row_iterator, "ended", None)
+ exec_seconds = (
+ (ended - created).total_seconds() if created and ended else 0.0
+ )
write_stats_to_disk(
query_char_count=query_char_count,
bytes_processed=bytes_processed,
@@ -79,20 +238,6 @@ def count_job_stats(
exec_seconds=exec_seconds,
)
- else:
- # TODO(tswast): Pass None after making benchmark publishing robust to missing data.
- bytes_processed = 0
- query_char_count = 0
- slot_millis = 0
- exec_seconds = 0
-
- write_stats_to_disk(
- query_char_count=query_char_count,
- bytes_processed=bytes_processed,
- slot_millis=slot_millis,
- exec_seconds=exec_seconds,
- )
-
def get_performance_stats(
query_job: bigquery.QueryJob,
diff --git a/tests/unit/test_pandas.py b/tests/unit/test_pandas.py
index e1e713697d..a79d7a059b 100644
--- a/tests/unit/test_pandas.py
+++ b/tests/unit/test_pandas.py
@@ -37,6 +37,8 @@ def all_session_methods():
session_attributes.remove("close")
# streaming isn't in pandas
session_attributes.remove("read_gbq_table_streaming")
+ # execution_history is in base namespace, not pandas
+ session_attributes.remove("execution_history")
for attribute in sorted(session_attributes):
session_method = getattr(bigframes.session.Session, attribute)