Azure Data Factory Operators

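Azure Data Factory is Azure's cloud ETL service for scale-out serverless data integration and data transformation. It offers a code-free UI for intuitive authoring and single-pane-of-glass monitoring and management.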

AzureDataFactoryRunPipelineOperator

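Use the AzureDataFactoryRunPipelineOperator to execute a pipeline within a data factory. By default, the operator periodically checks the status of the executed pipeline and terminates once the status is "Succeeded". You can turn this off by setting wait_for_termination to False, so that the wait happens asynchronously. This is typically done together with the AzureDataFactoryPipelineRunStatusSensor.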
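As a rough illustration of the default wait_for_termination=True behaviour, the plain-Python sketch below polls a run until it reaches a terminal state. It is not the provider's implementation. The names wait_for_pipeline_run, get_status, check_interval, timeout and PipelineRunError are illustrative. The status strings are modelled on Azure Data Factory run states (InProgress, Succeeded, Failed, Cancelled), and raising on Failed or Cancelled is an assumption of this model. The status lookup and the sleep function are passed in so the loop can run without Azure.

```python
import time
from typing import Callable

# Status strings modelled on Azure Data Factory pipeline-run states (assumption).
SUCCEEDED = "Succeeded"
TERMINAL_STATES = {"Succeeded", "Failed", "Cancelled"}


class PipelineRunError(RuntimeError):
    """Hypothetical error for a run that ends in a non-success terminal state."""


def wait_for_pipeline_run(
    run_id: str,
    get_status: Callable[[str], str],
    check_interval: float = 60.0,
    timeout: float = 7 * 24 * 3600,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll get_status(run_id) until the run reaches a terminal state.

    Returns "Succeeded" on success. Raises PipelineRunError on Failed/Cancelled
    and TimeoutError once the waiting budget is used up.
    """
    waited = 0.0
    while True:
        status = get_status(run_id)
        if status in TERMINAL_STATES:
            if status != SUCCEEDED:
                raise PipelineRunError(f"Run {run_id} ended with status {status}")
            return status
        if waited >= timeout:
            raise TimeoutError(f"Run {run_id} still {status} after {waited} s")
        # Not finished yet: wait one interval, then check again.
        sleep(check_interval)
        waited += check_interval
```

For example, a fake status source that reports Queued, InProgress and then Succeeded makes the loop return "Succeeded" after two sleeps.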

Below is an example of using this operator to execute an Azure Data Factory pipeline.

tests/system/microsoft/azure/example_adf_run_pipeline.py

    run_pipeline1 = AzureDataFactoryRunPipelineOperator(
        task_id="run_pipeline1",
        pipeline_name="pipeline1",
        parameters={"myParam": "value"},
    )

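Here is an example of using this operator to execute an Azure Data Factory pipeline with the deferrable flag set, so that polling for the pipeline run status happens on the Airflow Triggerer.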

tests/system/microsoft/azure/example_adf_run_pipeline.py

    run_pipeline3 = AzureDataFactoryRunPipelineOperator(
        task_id="run_pipeline3",
        pipeline_name="pipeline1",
        parameters={"myParam": "value"},
        deferrable=True,
    )
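The deferrable flow can be pictured like this. After the run has been started, the task hands polling to a trigger that runs in the triggerer's asyncio event loop and gives up its worker slot. It is then resumed with the event the trigger emits. The sketch below models that hand-off with plain asyncio. poll_run_status stands in for the trigger and resume for the step the task runs when it comes back. These names are hypothetical, and this is not Airflow's trigger API.

```python
import asyncio
from typing import AsyncIterator, Callable

TERMINAL_STATES = {"Succeeded", "Failed", "Cancelled"}


async def poll_run_status(
    run_id: str,
    get_status: Callable[[str], str],
    poll_interval: float,
) -> AsyncIterator[dict]:
    """Hypothetical trigger: poll without blocking, then emit a single event."""
    while True:
        status = get_status(run_id)
        if status in TERMINAL_STATES:
            # The event carries what the task needs when it resumes.
            yield {"run_id": run_id, "status": status}
            return
        # asyncio.sleep yields control, so one event loop can serve many triggers.
        await asyncio.sleep(poll_interval)


def resume(event: dict) -> str:
    """Hypothetical resume step, run on a worker after the trigger fires."""
    if event["status"] != "Succeeded":
        raise RuntimeError(f"Run {event['run_id']} ended with {event['status']}")
    return event["run_id"]


async def run_deferred(run_id: str, get_status: Callable[[str], str], poll_interval: float = 0.01) -> str:
    # In the real deferrable flow this waiting happens on the triggerer, not on a worker.
    event = None
    async for event in poll_run_status(run_id, get_status, poll_interval):
        pass  # the trigger yields exactly one event
    return resume(event)
```

Keeping the waiting inside an async generator is the design point here. The worker-side code (resume) only runs once there is something to act on.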

Here is another example of using this operator to execute a pipeline, this time paired with the AzureDataFactoryPipelineRunStatusSensor to wait for the run asynchronously.

tests/system/microsoft/azure/example_adf_run_pipeline.py

    run_pipeline2 = AzureDataFactoryRunPipelineOperator(
        task_id="run_pipeline2",
        pipeline_name="pipeline2",
        wait_for_termination=False,
    )

    pipeline_run_sensor = AzureDataFactoryPipelineRunStatusSensor(
        task_id="pipeline_run_sensor",
        run_id=cast("str", XComArg(run_pipeline2, key="run_id")),
    )

    # Performs polling on the Airflow Triggerer thus freeing up resources on Airflow Worker
    pipeline_run_sensor_deferred = AzureDataFactoryPipelineRunStatusSensor(
        task_id="pipeline_run_sensor_defered",
        run_id=cast("str", XComArg(run_pipeline2, key="run_id")),
        deferrable=True,
    )

    pipeline_run_async_sensor = AzureDataFactoryPipelineRunStatusSensor(
        task_id="pipeline_run_async_sensor",
        run_id=cast("str", XComArg(run_pipeline2, key="run_id")),
        deferrable=True,
    )
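With wait_for_termination=False, the operator returns right after triggering the run. The sensor then picks up the run ID through XCom (XComArg(run_pipeline2, key="run_id")). The plain-Python model below mimics that hand-off, with a dict standing in for XCom. Its poke function returns True on Succeeded and False while the run is still active, and raises on Failed or Cancelled. FakeFactory, trigger_pipeline and poke are hypothetical names, and the real sensor's failure handling may differ in detail.

```python
import itertools

TERMINAL_FAILURES = {"Failed", "Cancelled"}


class FakeFactory:
    """Hypothetical stand-in for a data factory: each run walks through scripted statuses."""

    def __init__(self, scripted_statuses):
        self._scripted = list(scripted_statuses)
        self._runs = {}
        self._ids = itertools.count(1)

    def create_run(self, pipeline_name: str) -> str:
        run_id = f"{pipeline_name}-run-{next(self._ids)}"
        self._runs[run_id] = iter(self._scripted)
        return run_id

    def get_status(self, run_id: str) -> str:
        return next(self._runs[run_id])


def trigger_pipeline(factory: FakeFactory, pipeline_name: str, xcom: dict, task_id: str) -> str:
    """Like wait_for_termination=False: start the run, publish its ID, return at once."""
    run_id = factory.create_run(pipeline_name)
    xcom[(task_id, "run_id")] = run_id
    return run_id


def poke(factory: FakeFactory, xcom: dict, upstream_task_id: str) -> bool:
    """One sensor check: True when the run succeeded, False to check again later."""
    run_id = xcom[(upstream_task_id, "run_id")]
    status = factory.get_status(run_id)
    if status in TERMINAL_FAILURES:
        raise RuntimeError(f"Pipeline run {run_id} ended with status {status}")
    return status == "Succeeded"
```

Calling poke repeatedly against a run scripted as InProgress, InProgress, Succeeded gives False, False, True.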

You can also use the AzureDataFactoryPipelineRunStatusSensor in deferrable mode if you want to free up the worker slot while the sensor is running.

tests/system/microsoft/azure/example_adf_run_pipeline.py

    # Performs polling on the Airflow Triggerer thus freeing up resources on Airflow Worker
    pipeline_run_sensor_deferred = AzureDataFactoryPipelineRunStatusSensor(
        task_id="pipeline_run_sensor_defered",
        run_id=cast("str", XComArg(run_pipeline2, key="run_id")),
        deferrable=True,
    )
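The benefit of deferrable mode is easiest to see as worker-slot occupancy. In this toy model, which is assumed rather than measured, a poke-mode sensor holds a worker slot from the moment it starts until the pipeline run finishes. A deferrable sensor holds one only briefly, when it starts and when it is resumed. SensorRun, peak_worker_slots and the one-second handoff are illustrative assumptions.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass
class SensorRun:
    start: int   # second at which the sensor task starts
    finish: int  # second at which the pipeline run reaches a terminal state


def peak_worker_slots(runs: List[SensorRun], deferrable: bool, handoff: int = 1) -> int:
    """Toy model: the largest number of worker slots held at the same moment."""
    busy: List[Tuple[int, int]] = []
    for run in runs:
        if deferrable:
            # Slot held only while the task starts and defers, and again when it resumes.
            busy.append((run.start, run.start + handoff))
            busy.append((run.finish, run.finish + handoff))
        else:
            # Poke mode: slot held for the whole wait.
            busy.append((run.start, run.finish))
    events = [(s, 1) for s, _ in busy] + [(e, -1) for _, e in busy]
    # Tuples sort by time, then delta, so releases (-1) come before acquisitions (+1).
    events.sort()
    peak = current = 0
    for _, delta in events:
        current += delta
        peak = max(peak, current)
    return peak
```

With three overlapping runs, for example SensorRun(0, 600), SensorRun(10, 900) and SensorRun(20, 300), poke mode peaks at three busy slots while the deferrable model peaks at one. These numbers come from the toy timeline, not from a benchmark.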

Poll for the status of a Data Factory pipeline run asynchronously

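Use the AzureDataFactoryPipelineRunStatusAsyncSensor (the deferrable version) to periodically retrieve the status of a Data Factory pipeline run asynchronously. This sensor frees up the worker slot because polling for the job status happens on the Airflow triggerer, which makes more efficient use of resources within Airflow. The example achieves this by setting deferrable=True on AzureDataFactoryPipelineRunStatusSensor.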

tests/system/microsoft/azure/example_adf_run_pipeline.py

    pipeline_run_async_sensor = AzureDataFactoryPipelineRunStatusSensor(
        task_id="pipeline_run_async_sensor",
        run_id=cast("str", XComArg(run_pipeline2, key="run_id")),
        deferrable=True,
    )
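Polling on the triggerer is cheap partly because triggers are asyncio coroutines that share one event loop, so a single process can wait on many pipeline runs at once. The sketch below polls several simulated runs concurrently with asyncio.gather and records how many were waiting at the same time. poll_until_terminal, poll_many, the scripted statuses and the 10 ms interval are assumptions for illustration.

```python
import asyncio
from typing import Dict, List, Tuple

TERMINAL_STATES = {"Succeeded", "Failed", "Cancelled"}


async def poll_until_terminal(run_id: str, statuses: List[str], interval: float, tracker: dict) -> Tuple[str, str]:
    """Walk a scripted status sequence, sleeping between checks without blocking."""
    for status in statuses:
        if status in TERMINAL_STATES:
            return run_id, status
        tracker["waiting"] += 1
        tracker["peak"] = max(tracker["peak"], tracker["waiting"])
        await asyncio.sleep(interval)  # yields control to the other polls
        tracker["waiting"] -= 1
    raise RuntimeError(f"{run_id}: script ended before a terminal status")


async def poll_many(scripts: Dict[str, List[str]], interval: float = 0.01) -> Tuple[Dict[str, str], int]:
    """Poll every scripted run concurrently in one event loop."""
    tracker = {"waiting": 0, "peak": 0}
    results = await asyncio.gather(
        *(poll_until_terminal(run_id, s, interval, tracker) for run_id, s in scripts.items())
    )
    return dict(results), tracker["peak"]
```

With three scripted runs, all three are waiting at the same moment on a single thread. A blocking poll loop would need one worker per run to do the same.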

Reference

For more information, please refer to the Microsoft documentation.
