構建簡單資料管道¶
歡迎閱讀本系列第三個教程!至此,您已經編寫了第一個 DAG 並使用了基本 Operator。現在是時候構建一個小而有意義的資料管道了——一個從外部源檢索資料、將其載入到資料庫並在此過程中進行清洗的資料管道。
本教程將介紹 SQLExecuteQueryOperator,這是一種在 Airflow 中執行 SQL 的靈活且現代的方式。我們將使用它與本地 Postgres 資料庫進行互動,我們將在 Airflow UI 中配置該資料庫。
透過本教程,您將擁有一個可用的管道,該管道將:
下載 CSV 檔案
將資料載入到暫存表中
清洗資料並將其 upsert 到目標表中
在此過程中,您將獲得 Airflow UI、連線系統、SQL 執行和 DAG 編寫模式的實踐經驗。
想在學習過程中深入瞭解嗎?這裡有兩個有用的參考資料:
Postgres provider 文件
讓我們開始吧!
初始設定¶
注意
您需要安裝 Docker 才能執行本教程。我們將使用 Docker Compose 在本地啟動 Airflow。如果您在設定方面需要幫助,請檢視Docker Compose 快速入門指南。
要執行我們的管道,我們需要一個可用的 Airflow 環境。Docker Compose 使這變得簡單安全——無需進行系統範圍的安裝。只需開啟您的終端並執行以下命令:
# Download the docker-compose.yaml file
curl -LfO 'https://airflow.apache.tw/docs/apache-airflow/stable/docker-compose.yaml'
# Make expected directories and set an expected environment variable
mkdir -p ./dags ./logs ./plugins
echo -e "AIRFLOW_UID=$(id -u)" > .env
# Initialize the database
docker compose up airflow-init
# Start up all services
docker compose up
Airflow 啟動並執行後,訪問 UI 地址 https://:8080。
使用以下憑據登入:
使用者名稱:
airflow密碼:
airflow
您將進入 Airflow 儀表板,在那裡您可以觸發 DAG、檢視日誌以及管理您的環境。
建立 Postgres 連線¶
在我們的管道能夠寫入 Postgres 之前,我們需要告訴 Airflow 如何連線到它。在 UI 中,開啟 Admin > Connections 頁面,然後點選 + 按鈕新增一個新連線。
填寫以下詳細資訊:
連線 ID:
tutorial_pg_conn連線型別:
postgres主機:
postgres資料庫:
airflow(這是我們容器中的預設資料庫)登入使用者:
airflow密碼:
airflow埠:
5432
儲存連線。這會告訴 Airflow 如何訪問在您的 Docker 環境中執行的 Postgres 資料庫。
接下來,我們將開始構建使用此連線的管道。
建立用於暫存和最終資料的表¶
讓我們從建立表開始。我們將建立兩個表:
employees_temp: 用於原始資料的暫存表employees: 清洗並去重後的目標表
我們將使用 SQLExecuteQueryOperator 來執行建立這些表所需的 SQL 語句。
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
create_employees_table = SQLExecuteQueryOperator(
task_id="create_employees_table",
conn_id="tutorial_pg_conn",
sql="""
CREATE TABLE IF NOT EXISTS employees (
"Serial Number" NUMERIC PRIMARY KEY,
"Company Name" TEXT,
"Employee Markme" TEXT,
"Description" TEXT,
"Leave" INTEGER
);""",
)
create_employees_temp_table = SQLExecuteQueryOperator(
task_id="create_employees_temp_table",
conn_id="tutorial_pg_conn",
sql="""
DROP TABLE IF EXISTS employees_temp;
CREATE TABLE employees_temp (
"Serial Number" NUMERIC PRIMARY KEY,
"Company Name" TEXT,
"Employee Markme" TEXT,
"Description" TEXT,
"Leave" INTEGER
);""",
)
您可以選擇將這些 SQL 語句放在 dags/ 資料夾中的 .sql 檔案中,並將檔案路徑傳遞給 sql= 引數。這是一種保持 DAG 程式碼整潔的好方法。
將資料載入到暫存表中¶
接下來,我們將下載一個 CSV 檔案,將其儲存在本地,並使用 PostgresHook 將其載入到 employees_temp 中。
import os
import requests
from airflow.sdk import task
from airflow.providers.postgres.hooks.postgres import PostgresHook
@task
def get_data():
# NOTE: configure this as appropriate for your airflow environment
data_path = "/opt/airflow/dags/files/employees.csv"
os.makedirs(os.path.dirname(data_path), exist_ok=True)
url = "https://raw.githubusercontent.com/apache/airflow/main/airflow-core/docs/tutorial/pipeline_example.csv"
response = requests.request("GET", url)
with open(data_path, "w") as file:
file.write(response.text)
postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
conn = postgres_hook.get_conn()
cur = conn.cursor()
with open(data_path, "r") as file:
cur.copy_expert(
"COPY employees_temp FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'",
file,
)
conn.commit()
此任務讓您體驗 Airflow 與原生 Python 和 SQL Hook 的結合——這在實際管道中是一種常見模式。
合併和清洗資料¶
現在讓我們對資料進行去重,並將其合併到最終表中。我們將編寫一個執行 SQL INSERT … ON CONFLICT DO UPDATE 語句的任務。
from airflow.sdk import task
from airflow.providers.postgres.hooks.postgres import PostgresHook
@task
def merge_data():
query = """
INSERT INTO employees
SELECT *
FROM (
SELECT DISTINCT *
FROM employees_temp
) t
ON CONFLICT ("Serial Number") DO UPDATE
SET
"Employee Markme" = excluded."Employee Markme",
"Description" = excluded."Description",
"Leave" = excluded."Leave";
"""
try:
postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
conn = postgres_hook.get_conn()
cur = conn.cursor()
cur.execute(query)
conn.commit()
return 0
except Exception as e:
return 1
定義 DAG¶
現在我們已經定義了所有任務,是時候將它們組合成一個 DAG 了。
import datetime
import pendulum
import os
import requests
from airflow.sdk import dag, task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
@dag(
dag_id="process_employees",
schedule="0 0 * * *",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
dagrun_timeout=datetime.timedelta(minutes=60),
)
def ProcessEmployees():
create_employees_table = SQLExecuteQueryOperator(
task_id="create_employees_table",
conn_id="tutorial_pg_conn",
sql="""
CREATE TABLE IF NOT EXISTS employees (
"Serial Number" NUMERIC PRIMARY KEY,
"Company Name" TEXT,
"Employee Markme" TEXT,
"Description" TEXT,
"Leave" INTEGER
);""",
)
create_employees_temp_table = SQLExecuteQueryOperator(
task_id="create_employees_temp_table",
conn_id="tutorial_pg_conn",
sql="""
DROP TABLE IF EXISTS employees_temp;
CREATE TABLE employees_temp (
"Serial Number" NUMERIC PRIMARY KEY,
"Company Name" TEXT,
"Employee Markme" TEXT,
"Description" TEXT,
"Leave" INTEGER
);""",
)
@task
def get_data():
# NOTE: configure this as appropriate for your airflow environment
data_path = "/opt/airflow/dags/files/employees.csv"
os.makedirs(os.path.dirname(data_path), exist_ok=True)
url = "https://raw.githubusercontent.com/apache/airflow/main/airflow-core/docs/tutorial/pipeline_example.csv"
response = requests.request("GET", url)
with open(data_path, "w") as file:
file.write(response.text)
postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
conn = postgres_hook.get_conn()
cur = conn.cursor()
with open(data_path, "r") as file:
cur.copy_expert(
"COPY employees_temp FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'",
file,
)
conn.commit()
@task
def merge_data():
query = """
INSERT INTO employees
SELECT *
FROM (
SELECT DISTINCT *
FROM employees_temp
) t
ON CONFLICT ("Serial Number") DO UPDATE
SET
"Employee Markme" = excluded."Employee Markme",
"Description" = excluded."Description",
"Leave" = excluded."Leave";
"""
try:
postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
conn = postgres_hook.get_conn()
cur = conn.cursor()
cur.execute(query)
conn.commit()
return 0
except Exception as e:
return 1
[create_employees_table, create_employees_temp_table] >> get_data() >> merge_data()
dag = ProcessEmployees()
將此 DAG 儲存為 dags/process_employees.py。稍後,它將顯示在 UI 中。
觸發並探索你的 DAG¶
開啟 Airflow UI,在列表中找到 process_employees DAG。使用滑塊將其“開啟”,然後點選播放按鈕觸發執行。
您可以在 Grid 檢視中檢視每個任務的執行情況,並檢視每個步驟的日誌。
成功後,您將擁有一個完全可用的管道,該管道可以整合外部資料、將其載入到 Postgres 中並保持其整潔。
接下來是什麼?¶
幹得好!您現在已經使用 Airflow 的核心模式和工具構建了一個真正的管道。以下是一些您可以進一步探索的方向:
嘗試替換為不同的 SQL provider,例如 MySQL 或 SQLite。
將您的 DAG 拆分為 TaskGroup 或重構為更易用的模式。
新增告警步驟或在資料處理完成後傳送通知。
另請參閱
在Airflow 文件中瀏覽更多操作指南