TaskFlow¶
在 2.0 版本中新增。
如果你主要使用純 Python 程式碼而不是 Operator 來編寫大部分 DAG,那麼 TaskFlow API 將使你更容易編寫簡潔的 DAG,無需額外的模板程式碼,只需使用 @task 裝飾器。
TaskFlow 負責使用 XCom 在你的任務之間傳遞輸入和輸出,並自動計算依賴關係 - 當你在 DAG 檔案中呼叫 TaskFlow 函式時,它不會立即執行,而是會返回一個代表結果 XCom 的物件(一個 XComArg),然後你可以將此物件用作下游任務或 Operator 的輸入。例如
from airflow.sdk import task
from airflow.providers.smtp.operators.smtp import EmailOperator
@task
def get_ip():
return my_ip_service.get_main_ip()
@task(multiple_outputs=True)
def compose_email(external_ip):
return {
'subject':f'Server connected from {external_ip}',
'body': f'Your server executing Airflow is connected from the external IP {external_ip}<br>'
}
email_info = compose_email(get_ip())
EmailOperator(
task_id='send_email_notification',
to='example@example.com',
subject=email_info['subject'],
html_content=email_info['body']
)
這裡有三個任務 - get_ip、compose_email 和 send_email_notification。
前兩個使用 TaskFlow 宣告,並自動將 get_ip 的返回值傳遞給 compose_email,這不僅連結了 XCom,還自動宣告 compose_email 是 get_ip 的下游任務。
send_email_notification 是一個更傳統的 Operator,但即使它也可以使用 compose_email 的返回值來設定其引數,並且同樣可以自動判斷出它是 compose_email 的下游任務。
你也可以使用一個普通值或變數來呼叫 TaskFlow 函式 - 例如,這將如你預期地工作(當然,在 DAG 執行之前,任務內部的程式碼不會執行 - name 值會一直作為任務引數持久化直到那時)。
@task
def hello_name(name: str):
print(f'Hello {name}!')
hello_name('Airflow users')
如果你想了解更多關於使用 TaskFlow 的資訊,請查閱 TaskFlow 教程。
上下文¶
你可以透過將 Airflow 上下文變數 作為關鍵字引數新增來訪問它們,如下例所示
from airflow.models.taskinstance import TaskInstance from airflow.models.dagrun import DagRun @task def print_ti_info(task_instance: TaskInstance, dag_run: DagRun): print(f"Run ID: {task_instance.run_id}") # Run ID: scheduled__2023-08-09T00:00:00+00:00 print(f"Duration: {task_instance.duration}") # Duration: 0.972019 print(f"DAG Run queued at: {dag_run.queued_at}") # 2023-08-10 00:00:01+02:20
或者,你可以在任務的簽名中新增 **kwargs,所有 Airflow 上下文變數都將在 kwargs 字典中可訪問。
from airflow.models.taskinstance import TaskInstance from airflow.models.dagrun import DagRun @task def print_ti_info(**kwargs): ti: TaskInstance = kwargs["task_instance"] print(f"Run ID: {ti.run_id}") # Run ID: scheduled__2023-08-09T00:00:00+00:00 print(f"Duration: {ti.duration}") # Duration: 0.972019 dr: DagRun = kwargs["dag_run"] print(f"DAG Run queued at: {dr.queued_at}") # 2023-08-10 00:00:01+02:20
有關上下文變數的完整列表,請參閱 上下文變數。
日誌記錄¶
要在任務函式中使用日誌記錄,只需匯入並使用 Python 的日誌記錄系統即可。
logger = logging.getLogger("airflow.task")
透過這種方式建立的每一行日誌都將記錄在任務日誌中。
將任意物件作為引數傳遞¶
在 2.5.0 版本中新增。
如前所述,TaskFlow 使用 XCom 將變數傳遞給每個任務。這要求用作引數的變數必須能夠被序列化。Airflow 開箱即用地支援所有內建型別(如 int 或 str),並且支援用 @dataclass 或 @attr.define 裝飾的物件。以下示例展示瞭如何將用 @attr.define 裝飾的 Asset 與 TaskFlow 一起使用。
注意
使用 Asset 的一個額外好處是,如果它用作輸入引數,它會自動註冊為 inlet。如果任務的返回值是一個 Asset 或一個 list[Asset]],它也會自動註冊為 outlet。
import json
import pendulum
import requests
from airflow import Asset
from airflow.sdk import dag, task
SRC = Asset(
"https://www.ncei.noaa.gov/access/monitoring/climate-at-a-glance/global/time-series/globe/land_ocean/ytd/12/1880-2022.json"
)
now = pendulum.now()
@dag(start_date=now, schedule="@daily", catchup=False)
def etl():
@task()
def retrieve(src: Asset) -> dict:
resp = requests.get(url=src.uri)
data = resp.json()
return data["data"]
@task()
def to_fahrenheit(temps: dict[int, dict[str, float]]) -> dict[int, float]:
ret: dict[int, float] = {}
for year, info in temps.items():
ret[year] = float(info["anomaly"]) * 1.8 + 32
return ret
@task()
def load(fahrenheit: dict[int, float]) -> Asset:
filename = "/tmp/fahrenheit.json"
s = json.dumps(fahrenheit)
f = open(filename, "w")
f.write(s)
f.close()
return Asset(f"file:///{filename}")
data = retrieve(SRC)
fahrenheit = to_fahrenheit(data)
load(fahrenheit)
etl()
自定義物件¶
你可能希望傳遞自定義物件。通常情況下,你會用 @dataclass 或 @attr.define 裝飾你的類,Airflow 會自動處理。有時你可能希望自己控制序列化。為此,請在你的類中新增 serialize() 方法和靜態方法 deserialize(data: dict, version: int)。例如
from typing import ClassVar
class MyCustom:
__version__: ClassVar[int] = 1
def __init__(self, x):
self.x = x
def serialize(self) -> dict:
return dict({"x": self.x})
@staticmethod
def deserialize(data: dict, version: int):
if version > 1:
raise TypeError(f"version > {MyCustom.version}")
return MyCustom(data["x"])
物件版本控制¶
對用於序列化的物件進行版本控制是一種良好的實踐。為此,請在你的類中新增 __version__: ClassVar[int] = <x>。Airflow 假定你的類向後相容,以便版本 2 能夠反序列化版本 1。如果你需要自定義反序列化邏輯,請確保指定了 deserialize(data: dict, version: int)。
注意
__version__ 需要指定型別且必須是 ClassVar[int]。
Sensor 與 TaskFlow API¶
在 2.5.0 版本中新增。
有關使用 TaskFlow API 編寫 Sensor 的示例,請參閱 將 TaskFlow API 與 Sensor Operator 結合使用。
歷史¶
TaskFlow API 是 Airflow 2.0 版本新增的,你可能會遇到為舊版本 Airflow 編寫的 DAG,它們使用 PythonOperator 來實現類似目標,儘管程式碼量大得多。
有關 TaskFlow API 新增和設計的更多背景資訊可以在其 Airflow 改進提案中找到 AIP-31:“TaskFlow API”:更清晰/更簡單的 DAG 定義