OdbcOperator

開放資料庫連線 (ODBC) 是一種用於訪問資料庫管理系統 (DBMS) 的標準 API。

先決條件任務

要使用此運算子,您需要

  • 安裝 python 模組 pyodbc: .. code-block:: bash

    pip install apache-airflow[odbc]

  • 安裝您的資料庫的 ODBC 驅動程式。

  • 如果您的資料庫需要,配置 ODBC 資料來源名稱 (DSN)。

滿足這些先決條件後,您應該能夠執行此 Python 片段(將變數值替換為您驅動程式相關的值)。

如果 pyodbc 模組缺失或驅動程式不可用,其他錯誤訊息將通知您。一個 Connection Refused 錯誤意味著連線字串指向的主機上沒有資料庫正在監聽新連線。

import pyodbc

driver = "{ODBC Driver 17 for SQL Server}"
server = "localhost"
database = "testdb"
username = "user"
password = "password"

conn_str = (
    f"DRIVER={driver};" f"SERVER={server};" f"DATABASE={database};" f"UID={username};" f"PWD={password};"
)

conn = pyodbc.connect(conn_str)

用法

使用 SQLExecuteQueryOperator 對可透過 ODBC 驅動程式訪問的資料庫(或資料儲存)執行命令。

ODBC 連線 (ODBC Connection) 必須作為 conn_id 傳遞。

tests/system/odbc/example_odbc.py


    create_table = SQLExecuteQueryOperator(
        task_id="create_table",
        sql="""
        CREATE TABLE IF NOT EXISTS my_table (
            dt VARCHAR(50),
            value VARCHAR(255)
        );
        """,
        conn_id="my_odbc_conn",
        autocommit=True,
    )

引數 sql 可以接收一個字串或一個字串列表。每個字串可以是 SQL 語句,也可以是模板檔案的引用。模板引用以 '.sql' 結尾來識別。

引數 autocommit 如果設定為 True,將在每個命令後執行一次提交(預設為 False)。

模板化

您可以使用 Jinja 模板 來引數化 sql

tests/system/odbc/example_odbc.py


    insert_data = SQLExecuteQueryOperator(
        task_id="insert_data",
        sql="""
        INSERT INTO my_table (dt, value)
        VALUES ('{{ ds }}', 'test_value');
        """,
        conn_id="my_odbc_conn",
        autocommit=True,
    )

此條目有幫助嗎?