Define an operator extra link
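If you want to add extra links to operators, you can define them through a plugin or a provider package. Extra links are displayed on the task details page in the Grid view.
The following code shows how to add extra links to an operator through a plugin: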
```python
from airflow.sdk import BaseOperator
from airflow.sdk import BaseOperatorLink
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.plugins_manager import AirflowPlugin


class GoogleLink(BaseOperatorLink):
    # Label shown on the extra-link button in the UI
    name = "Google"

    def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey):
        # URL the button points to; ti_key identifies the task instance
        return "https://www.google.com"


class MyFirstOperator(BaseOperator):
    # Links declared directly on the operator
    operator_extra_links = (GoogleLink(),)

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def execute(self, context):
        self.log.info("Hello World!")


# Defining the plugin class
class AirflowExtraLinkPlugin(AirflowPlugin):
    name = "extra_link_plugin"
    # Registering the link through the plugin makes it available to Airflow
    operator_extra_links = [
        GoogleLink(),
    ]
```
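The following dependency-free sketch models what the UI does with an operator's operator_extra_links. Each link's name becomes the button label, and the string returned by get_link becomes the button's URL. FakeTaskInstanceKey, FakeOperator and render_extra_links are hypothetical stand-ins written for illustration. They are not Airflow classes, and Airflow's real rendering path is more involved.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FakeTaskInstanceKey:
    """Hypothetical stand-in for Airflow's TaskInstanceKey."""

    dag_id: str
    task_id: str
    run_id: str


class GoogleLink:
    """Same shape as the GoogleLink above, but without importing Airflow."""

    name = "Google"

    def get_link(self, operator, *, ti_key):
        return "https://www.google.com"


class FakeOperator:
    """Hypothetical operator that declares one extra link."""

    operator_extra_links = (GoogleLink(),)


def render_extra_links(operator, ti_key):
    """Return (label, url) pairs, one per extra link on the operator."""
    # ti_key is keyword-only in get_link, so it must be passed by name.
    return [
        (link.name, link.get_link(operator, ti_key=ti_key))
        for link in operator.operator_extra_links
    ]


key = FakeTaskInstanceKey(dag_id="example_dag", task_id="hello", run_id="manual__1")
print(render_extra_links(FakeOperator(), key))  # [('Google', 'https://www.google.com')]
```

The get_link signature shown above declares ti_key as keyword-only, so passing it positionally raises TypeError.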
Note
Operator extra links must be registered through an Airflow plugin or a custom Airflow provider to take effect.
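You can also add a global operator extra link that is available to all operators, through an Airflow plugin or an Airflow provider. You can learn more about this in the plugin interface and in Providers.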
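As a rough mental model of how global and operator-specific links combine, the sketch below merges them by name. Link, GLOBAL_LINKS, OperatorA, OperatorB, OperatorC and effective_links are all hypothetical. The list stands in for links registered through a plugin's global_operator_extra_links attribute. The rule that an operator's own link wins on a name clash is an assumption of this sketch.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Link:
    """Hypothetical stand-in for a BaseOperatorLink: a label and a URL."""

    name: str
    url: str


# Stands in for links registered via a plugin's global_operator_extra_links.
GLOBAL_LINKS = [Link("Runbook", "https://runbooks.example.com")]


class OperatorA:
    """Hypothetical operator with one link of its own."""

    operator_extra_links = (Link("Google", "https://www.google.com"),)


class OperatorB:
    """Hypothetical operator with no links of its own."""

    operator_extra_links = ()


class OperatorC:
    """Hypothetical operator whose own link reuses a global link's name."""

    operator_extra_links = (Link("Runbook", "https://team.example.com/runbook"),)


def effective_links(operator):
    """Map link name -> URL for every link the operator ends up with."""
    # Every operator starts with the global links...
    links = {link.name: link.url for link in GLOBAL_LINKS}
    # ...and its own links are layered on top (assumed to win on a clash).
    links.update({link.name: link.url for link in operator.operator_extra_links})
    return links


print(sorted(effective_links(OperatorA())))  # ['Google', 'Runbook']
print(sorted(effective_links(OperatorB())))  # ['Runbook']
print(effective_links(OperatorC())["Runbook"])  # https://team.example.com/runbook
```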
You can see all the extra links provided by community-managed providers in Extra Links.
Add or override links to existing operators
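You can also add (or override) extra links on existing operators through an Airflow plugin or a custom provider.
For example, the following Airflow plugin adds an operator link to every task that uses the GCSToS3Operator operator.
Adding an operator link to an existing operator: plugins/extra_link.py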
```python
from airflow.sdk import BaseOperator, BaseOperatorLink
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.plugins_manager import AirflowPlugin
from airflow.providers.amazon.aws.transfers.gcs_to_s3 import GCSToS3Operator


class S3LogLink(BaseOperatorLink):
    name = "S3"

    # Add the list of all operators to which you want to add this operator link
    # Example: operators = [GCSToS3Operator, GCSToBigQueryOperator]
    operators = [GCSToS3Operator]

    def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey):
        # Invalid bucket name because upper case letters and underscores are used
        # This will not be a valid bucket in any region
        bucket_name = "Invalid_Bucket_Name"
        return "https://s3.amazonaws.com/airflow-logs/{bucket_name}/{dag_id}/{task_id}/{run_id}".format(
            bucket_name=bucket_name,
            dag_id=operator.dag_id,
            task_id=operator.task_id,
            run_id=ti_key.run_id,
        )


# Defining the plugin class
class AirflowExtraLinkPlugin(AirflowPlugin):
    name = "extra_link_plugin"
    operator_extra_links = [
        S3LogLink(),
    ]
```
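The sketch below re-creates S3LogLink on top of minimal stand-ins. It shows exactly which URL the S3 link above produces and which tasks receive it. FakeOperatorBase, UnrelatedOperator and applies_to are hypothetical. The GCSToS3Operator here is a local placeholder, not the Amazon provider class. Matching on the exact operator class, so that subclasses are not covered, is an assumption of this sketch.

```python
from types import SimpleNamespace


class FakeOperatorBase:
    """Hypothetical stand-in for BaseOperator: only dag_id and task_id."""

    def __init__(self, dag_id, task_id):
        self.dag_id = dag_id
        self.task_id = task_id


class GCSToS3Operator(FakeOperatorBase):
    """Local placeholder named after the Amazon provider operator."""


class UnrelatedOperator(FakeOperatorBase):
    """Hypothetical operator that S3LogLink does not target."""


class S3LogLink:
    name = "S3"
    operators = [GCSToS3Operator]

    def get_link(self, operator, *, ti_key):
        bucket_name = "Invalid_Bucket_Name"
        return "https://s3.amazonaws.com/airflow-logs/{bucket_name}/{dag_id}/{task_id}/{run_id}".format(
            bucket_name=bucket_name,
            dag_id=operator.dag_id,
            task_id=operator.task_id,
            run_id=ti_key.run_id,
        )


def applies_to(link, operator):
    """True if the link lists this operator's exact class in `operators`."""
    return type(operator) in link.operators


task = GCSToS3Operator(dag_id="gcs_to_s3", task_id="copy_files")
ti_key = SimpleNamespace(run_id="manual__2024-01-01")  # only run_id is read
print(applies_to(S3LogLink(), task))  # True
print(S3LogLink().get_link(task, ti_key=ti_key))
# https://s3.amazonaws.com/airflow-logs/Invalid_Bucket_Name/gcs_to_s3/copy_files/manual__2024-01-01
```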
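Overriding operator links of existing operators:
It is also possible to replace an operator's built-in links through a plugin. For example, BigQueryExecuteQueryOperator includes a link to the Google Cloud Console, but if we want to change that link we can do this: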
```python
from __future__ import annotations

from typing import TYPE_CHECKING

from airflow.plugins_manager import AirflowPlugin
from airflow.providers.google.cloud.links.base import BaseGoogleLink

if TYPE_CHECKING:
    from airflow.sdk import Context

# Same dataset URL, but changed from https to http just to display the override
BIGQUERY_DATASET_LINK = (
    "http://console.cloud.google.com/bigquery"
    "?referrer=search&project={project_id}&d={dataset_id}&p={project_id}&page=dataset"
)


class BigQueryDatasetLink(BaseGoogleLink):
    """
    Helper class for constructing BigQuery Dataset Link.
    """

    name = "BigQuery Dataset"
    key = "bigquery_dataset"
    format_str = BIGQUERY_DATASET_LINK

    @staticmethod
    def persist(
        context: Context,
        dataset_id: str,
        project_id: str,
    ):
        # Store the values format_str needs in XCom under this link's key;
        # BaseGoogleLink reads them back when it builds the URL.
        context["ti"].xcom_push(
            key=BigQueryDatasetLink.key,
            value={"dataset_id": dataset_id, "project_id": project_id},
        )


# Defining the plugin class
class AirflowExtraLinkPlugin(AirflowPlugin):
    name = "extra_link_plugin"
    operator_extra_links = [
        BigQueryDatasetLink(),
    ]
```
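In the example above, persist pushes the dataset and project IDs to XCom under the link's key. The URL is built later by filling format_str with those values. The sketch below models that round trip, using a plain dict in place of XCom. persist_dataset_link and build_dataset_link are hypothetical helpers, not provider functions. Returning an empty string when nothing was stored is an assumption of this sketch. The sketch also shows why format_str may only use fields that persist stores: a template that asks for a field missing from the stored dict, such as {job_id}, raises KeyError.

```python
# Hypothetical model of the persist / get_link round trip; a dict stands in for XCom.
LINK_KEY = "bigquery_dataset"

# Full http URL so the override is visible (mirrors the plugin example).
FORMAT_STR = (
    "http://console.cloud.google.com/bigquery"
    "?referrer=search&project={project_id}&d={dataset_id}&p={project_id}&page=dataset"
)


def persist_dataset_link(xcom, dataset_id, project_id):
    """Store the parameters the URL template needs under the link's key."""
    xcom[LINK_KEY] = {"dataset_id": dataset_id, "project_id": project_id}


def build_dataset_link(xcom, format_str=FORMAT_STR):
    """Fill the template from the stored parameters; empty if none were stored."""
    conf = xcom.get(LINK_KEY)
    if not conf:
        return ""
    return format_str.format(**conf)


xcom = {}
print(repr(build_dataset_link(xcom)))  # ''
persist_dataset_link(xcom, dataset_id="sales", project_id="my-project")
print(build_dataset_link(xcom))
# http://console.cloud.google.com/bigquery?referrer=search&project=my-project&d=sales&p=my-project&page=dataset
```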
Add operator links via providers
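As explained in Providers, when you create your own Airflow provider you can specify the extra link classes it provides. You do this by listing their fully qualified class names in the provider-info data stored in your provider package metadata.
Example metadata required in the provider-info dictionary (this is part of the metadata currently returned by the apache-airflow-providers-google provider):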
```yaml
extra-links:
  - airflow.providers.google.cloud.links.bigquery.BigQueryDatasetLink
  - airflow.providers.google.cloud.links.bigquery.BigQueryTableLink
```
You can include as many extra link classes as you need.
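In a provider package, this metadata is returned by a function that Airflow typically discovers through the package's apache_airflow_provider entry point. The sketch below shows a minimal version of such a function for a hypothetical package. Every package, module and class name in it is made up, and real providers usually include more keys than shown. Each extra-links entry is a dotted import path. The hypothetical helper resolve_class_path turns such a path back into a class. It is demonstrated on a standard-library class because the hypothetical provider is not installed.

```python
import importlib


def get_provider_info():
    """Provider metadata for a hypothetical package; names are illustrative."""
    return {
        "package-name": "my-company-airflow-provider",
        "name": "My Company",
        "description": "Hypothetical provider that ships one extra link.",
        # Fully qualified paths of BaseOperatorLink subclasses.
        "extra-links": [
            "my_company_provider.links.MyDashboardLink",
        ],
    }


def resolve_class_path(path):
    """Import "package.module.ClassName" and return the class object."""
    module_name, _, class_name = path.rpartition(".")
    return getattr(importlib.import_module(module_name), class_name)


print(get_provider_info()["extra-links"])
# The hypothetical module is not installed, so resolve a stdlib path instead.
print(resolve_class_path("collections.OrderedDict"))  # <class 'collections.OrderedDict'>
```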