Airflow Summit 2025 將於 10 月 07 日至 09 日舉行。立即註冊獲取早鳥票!

構建簡單資料管道

歡迎閱讀本系列第三個教程!至此,您已經編寫了第一個 DAG 並使用了基本 Operator。現在是時候構建一個小而有意義的資料管道了——一個從外部源檢索資料、將其載入到資料庫並在此過程中進行清洗的資料管道。

本教程將介紹 SQLExecuteQueryOperator,這是一種在 Airflow 中執行 SQL 的靈活且現代的方式。我們將使用它與本地 Postgres 資料庫進行互動,我們將在 Airflow UI 中配置該資料庫。

透過本教程,您將擁有一個可用的管道,該管道將:

  • 下載 CSV 檔案

  • 將資料載入到暫存表中

  • 清洗資料並將其 upsert 到目標表中

在此過程中,您將獲得 Airflow UI、連線系統、SQL 執行和 DAG 編寫模式的實踐經驗。

想在學習過程中深入瞭解嗎?這裡有兩個有用的參考資料:

讓我們開始吧!

初始設定

注意

您需要安裝 Docker 才能執行本教程。我們將使用 Docker Compose 在本地啟動 Airflow。如果您在設定方面需要幫助,請檢視Docker Compose 快速入門指南

要執行我們的管道,我們需要一個可用的 Airflow 環境。Docker Compose 使這變得簡單安全——無需進行系統範圍的安裝。只需開啟您的終端並執行以下命令:

# Download the docker-compose.yaml file
curl -LfO 'https://airflow.apache.tw/docs/apache-airflow/stable/docker-compose.yaml'

# Make expected directories and set an expected environment variable
mkdir -p ./dags ./logs ./plugins
echo -e "AIRFLOW_UID=$(id -u)" > .env

# Initialize the database
docker compose up airflow-init

# Start up all services
docker compose up

Airflow 啟動並執行後,訪問 UI 地址 https://:8080

使用以下憑據登入:

  • 使用者名稱: airflow

  • 密碼: airflow

您將進入 Airflow 儀表板,在那裡您可以觸發 DAG、檢視日誌以及管理您的環境。

建立 Postgres 連線

在我們的管道能夠寫入 Postgres 之前,我們需要告訴 Airflow 如何連線到它。在 UI 中,開啟 Admin > Connections 頁面,然後點選 + 按鈕新增一個新連線

填寫以下詳細資訊:

  • 連線 ID: tutorial_pg_conn

  • 連線型別: postgres

  • 主機: postgres

  • 資料庫: airflow (這是我們容器中的預設資料庫)

  • 登入使用者: airflow

  • 密碼: airflow

  • 埠: 5432

Add Connection form in Airflow's web UI with Postgres details filled in.

儲存連線。這會告訴 Airflow 如何訪問在您的 Docker 環境中執行的 Postgres 資料庫。

接下來,我們將開始構建使用此連線的管道。

建立用於暫存和最終資料的表

讓我們從建立表開始。我們將建立兩個表:

  • employees_temp: 用於原始資料的暫存表

  • employees: 清洗並去重後的目標表

我們將使用 SQLExecuteQueryOperator 來執行建立這些表所需的 SQL 語句。

from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

create_employees_table = SQLExecuteQueryOperator(
    task_id="create_employees_table",
    conn_id="tutorial_pg_conn",
    sql="""
        CREATE TABLE IF NOT EXISTS employees (
            "Serial Number" NUMERIC PRIMARY KEY,
            "Company Name" TEXT,
            "Employee Markme" TEXT,
            "Description" TEXT,
            "Leave" INTEGER
        );""",
)

create_employees_temp_table = SQLExecuteQueryOperator(
    task_id="create_employees_temp_table",
    conn_id="tutorial_pg_conn",
    sql="""
        DROP TABLE IF EXISTS employees_temp;
        CREATE TABLE employees_temp (
            "Serial Number" NUMERIC PRIMARY KEY,
            "Company Name" TEXT,
            "Employee Markme" TEXT,
            "Description" TEXT,
            "Leave" INTEGER
        );""",
)

您可以選擇將這些 SQL 語句放在 dags/ 資料夾中的 .sql 檔案中,並將檔案路徑傳遞給 sql= 引數。這是一種保持 DAG 程式碼整潔的好方法。

將資料載入到暫存表中

接下來,我們將下載一個 CSV 檔案,將其儲存在本地,並使用 PostgresHook 將其載入到 employees_temp 中。

import os
import requests
from airflow.sdk import task
from airflow.providers.postgres.hooks.postgres import PostgresHook


@task
def get_data():
    # NOTE: configure this as appropriate for your airflow environment
    data_path = "/opt/airflow/dags/files/employees.csv"
    os.makedirs(os.path.dirname(data_path), exist_ok=True)

    url = "https://raw.githubusercontent.com/apache/airflow/main/airflow-core/docs/tutorial/pipeline_example.csv"

    response = requests.request("GET", url)

    with open(data_path, "w") as file:
        file.write(response.text)

    postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
    conn = postgres_hook.get_conn()
    cur = conn.cursor()
    with open(data_path, "r") as file:
        cur.copy_expert(
            "COPY employees_temp FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'",
            file,
        )
    conn.commit()

此任務讓您體驗 Airflow 與原生 Python 和 SQL Hook 的結合——這在實際管道中是一種常見模式。

合併和清洗資料

現在讓我們對資料進行去重,並將其合併到最終表中。我們將編寫一個執行 SQL INSERT … ON CONFLICT DO UPDATE 語句的任務。

from airflow.sdk import task
from airflow.providers.postgres.hooks.postgres import PostgresHook


@task
def merge_data():
    query = """
        INSERT INTO employees
        SELECT *
        FROM (
            SELECT DISTINCT *
            FROM employees_temp
        ) t
        ON CONFLICT ("Serial Number") DO UPDATE
        SET
              "Employee Markme" = excluded."Employee Markme",
              "Description" = excluded."Description",
              "Leave" = excluded."Leave";
    """
    try:
        postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
        conn = postgres_hook.get_conn()
        cur = conn.cursor()
        cur.execute(query)
        conn.commit()
        return 0
    except Exception as e:
        return 1

定義 DAG

現在我們已經定義了所有任務,是時候將它們組合成一個 DAG 了。

import datetime
import pendulum
import os

import requests
from airflow.sdk import dag, task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator


@dag(
    dag_id="process_employees",
    schedule="0 0 * * *",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    dagrun_timeout=datetime.timedelta(minutes=60),
)
def ProcessEmployees():
    create_employees_table = SQLExecuteQueryOperator(
        task_id="create_employees_table",
        conn_id="tutorial_pg_conn",
        sql="""
            CREATE TABLE IF NOT EXISTS employees (
                "Serial Number" NUMERIC PRIMARY KEY,
                "Company Name" TEXT,
                "Employee Markme" TEXT,
                "Description" TEXT,
                "Leave" INTEGER
            );""",
    )

    create_employees_temp_table = SQLExecuteQueryOperator(
        task_id="create_employees_temp_table",
        conn_id="tutorial_pg_conn",
        sql="""
            DROP TABLE IF EXISTS employees_temp;
            CREATE TABLE employees_temp (
                "Serial Number" NUMERIC PRIMARY KEY,
                "Company Name" TEXT,
                "Employee Markme" TEXT,
                "Description" TEXT,
                "Leave" INTEGER
            );""",
    )

    @task
    def get_data():
        # NOTE: configure this as appropriate for your airflow environment
        data_path = "/opt/airflow/dags/files/employees.csv"
        os.makedirs(os.path.dirname(data_path), exist_ok=True)

        url = "https://raw.githubusercontent.com/apache/airflow/main/airflow-core/docs/tutorial/pipeline_example.csv"

        response = requests.request("GET", url)

        with open(data_path, "w") as file:
            file.write(response.text)

        postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
        conn = postgres_hook.get_conn()
        cur = conn.cursor()
        with open(data_path, "r") as file:
            cur.copy_expert(
                "COPY employees_temp FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'",
                file,
            )
        conn.commit()

    @task
    def merge_data():
        query = """
            INSERT INTO employees
            SELECT *
            FROM (
                SELECT DISTINCT *
                FROM employees_temp
            ) t
            ON CONFLICT ("Serial Number") DO UPDATE
            SET
              "Employee Markme" = excluded."Employee Markme",
              "Description" = excluded."Description",
              "Leave" = excluded."Leave";
        """
        try:
            postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
            conn = postgres_hook.get_conn()
            cur = conn.cursor()
            cur.execute(query)
            conn.commit()
            return 0
        except Exception as e:
            return 1

    [create_employees_table, create_employees_temp_table] >> get_data() >> merge_data()


dag = ProcessEmployees()

將此 DAG 儲存為 dags/process_employees.py。稍後,它將顯示在 UI 中。

觸發並探索你的 DAG

開啟 Airflow UI,在列表中找到 process_employees DAG。使用滑塊將其“開啟”,然後點選播放按鈕觸發執行。

您可以在 Grid 檢視中檢視每個任務的執行情況,並檢視每個步驟的日誌。

DAG List view showing the ``process_employees`` DAG

DAG Overview page for ``process_employees`` DAG showing the DAG run

成功後,您將擁有一個完全可用的管道,該管道可以整合外部資料、將其載入到 Postgres 中並保持其整潔。

接下來是什麼?

幹得好!您現在已經使用 Airflow 的核心模式和工具構建了一個真正的管道。以下是一些您可以進一步探索的方向:

  • 嘗試替換為不同的 SQL provider,例如 MySQL 或 SQLite。

  • 將您的 DAG 拆分為 TaskGroup 或重構為更易用的模式。

  • 新增告警步驟或在資料處理完成後傳送通知。

另請參閱

這篇文章有幫助嗎?