Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions providers/google/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1140,6 +1140,7 @@ extra-links:
- airflow.providers.google.cloud.links.bigquery.BigQueryTableLink
- airflow.providers.google.cloud.links.bigquery.BigQueryJobDetailLink
- airflow.providers.google.cloud.links.bigquery_dts.BigQueryDataTransferConfigLink
- airflow.providers.google.cloud.links.cloud_run.CloudRunJobExecutionLink
- airflow.providers.google.cloud.links.compute.ComputeInstanceDetailsLink
- airflow.providers.google.cloud.links.compute.ComputeInstanceTemplateDetailsLink
- airflow.providers.google.cloud.links.compute.ComputeInstanceGroupManagerDetailsLink
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
Expand All @@ -14,6 +15,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""This module contains Google Cloud Run links."""
from __future__ import annotations

from typing import TYPE_CHECKING
Expand All @@ -31,6 +33,38 @@
else:
from airflow.models.xcom import XCom # type: ignore[no-redef]

CLOUD_RUN_BASE_LINK = "/run"
CLOUD_RUN_JOB_EXECUTION_LINK = (
CLOUD_RUN_BASE_LINK + "/jobs/execution/{region}/{execution_name}"
"?project={project_id}"
)


class CloudRunJobExecutionLink(BaseGoogleLink):
"""Helper class for constructing Cloud Run Job Execution Link."""

name = "Cloud Run Job Execution"
key = "cloud_run_job_execution"
format_str = CLOUD_RUN_JOB_EXECUTION_LINK

@staticmethod
def persist(
context: Context,
task_instance: BaseOperator,
project_id: str,
region: str,
execution_name: str,
):
task_instance.xcom_push(
context,
key=CloudRunJobExecutionLink.key,
value={
"project_id": project_id,
"region": region,
"execution_name": execution_name,
},
)


class CloudRunJobLoggingLink(BaseGoogleLink):
"""Helper class for constructing Cloud Run Job Logging link."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.providers.google.cloud.hooks.cloud_run import CloudRunHook, CloudRunServiceHook
from airflow.providers.google.cloud.links.cloud_run import CloudRunJobLoggingLink
from airflow.providers.google.cloud.links.cloud_run import CloudRunJobExecutionLink, CloudRunJobLoggingLink
from airflow.providers.google.cloud.operators.cloud_base import GoogleCloudBaseOperator
from airflow.providers.google.cloud.triggers.cloud_run import CloudRunJobFinishedTrigger, RunJobStatus

Expand Down Expand Up @@ -266,7 +266,7 @@ class CloudRunExecuteJobOperator(GoogleCloudBaseOperator):
:param deferrable: Run the operator in deferrable mode.
"""

operator_extra_links = (CloudRunJobLoggingLink(),)
operator_extra_links = (CloudRunJobExecutionLink(), CloudRunJobLoggingLink())
template_fields = (
"project_id",
"region",
Expand Down Expand Up @@ -307,6 +307,7 @@ def execute(self, context: Context):
hook: CloudRunHook = CloudRunHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)

self.operation = hook.execute_job(
region=self.region, project_id=self.project_id, job_name=self.job_name, overrides=self.overrides
)
Expand All @@ -321,6 +322,15 @@ def execute(self, context: Context):
log_uri=self.operation.metadata.log_uri,
)

if self.operation.metadata.name:
CloudRunJobExecutionLink.persist(
context=context,
task_instance=self,
project_id=self.project_id,
region=self.region,
execution_name=self.operation.metadata.name.split("/")[-1],
)

if not self.deferrable:
result: Execution = self._wait_for_operation(self.operation)
self._fail_if_execution_failed(result)
Expand Down
Loading