Airflow Summit 2025 將於 10 月 07 日至 09 日舉行。立即註冊可獲得早鳥票!

連線管理

另請參閱

有關 Hook 和連線的概覽,請參閱連線與 Hook

Airflow 的Connection 物件用於儲存連線到外部服務所需的憑據及其他資訊。

連線可以透過以下方式定義

在環境變數中儲存連線

Airflow 連線可以在環境變數中定義。

命名約定是AIRFLOW_CONN_{CONN_ID},全部大寫(注意 CONN 周圍的單下劃線)。因此,如果您的連線 ID 是 my_prod_db,則變數名應為 AIRFLOW_CONN_MY_PROD_DB

值可以是 JSON 格式或 Airflow 的 URI 格式。

JSON 格式示例

始於 2.3.0 版本。

如果使用 JSON 序列化

export AIRFLOW_CONN_MY_PROD_DATABASE='{
    "conn_type": "my-conn-type",
    "login": "my-login",
    "password": "my-password",
    "host": "my-host",
    "port": 1234,
    "schema": "my-schema",
    "extra": {
        "param1": "val1",
        "param2": "val2"
    }
}'

生成連線的 JSON 表示

始於 2.8.0 版本。

為了更方便地生成連線的 JSON,Connection 類提供了一個便利屬性 as_json()。它可以這樣使用

>>> from airflow.models.connection import Connection
>>> c = Connection(
...     conn_id="some_conn",
...     conn_type="mysql",
...     description="connection description",
...     host="myhost.com",
...     login="myname",
...     password="mypassword",
...     extra={"this_param": "some val", "that_param": "other val*"},
... )
>>> print(f"AIRFLOW_CONN_{c.conn_id.upper()}='{c.as_json()}'")
AIRFLOW_CONN_SOME_CONN='{"conn_type": "mysql", "description": "connection description", "host": "myhost.com", "login": "myname", "password": "mypassword", "extra": {"this_param": "some val", "that_param": "other val*"}}'

此外,可以使用相同的方法將連線從 URI 格式轉換為 JSON 格式

>>> from airflow.models.connection import Connection
>>> c = Connection(
...     conn_id="awesome_conn",
...     description="Example Connection",
...     uri="aws://AKIAIOSFODNN7EXAMPLE:wJalrXUtnFEMI%2FK7MDENG%2FbPxRfiCYEXAMPLEKEY@/?__extra__=%7B%22region_name%22%3A+%22eu-central-1%22%2C+%22config_kwargs%22%3A+%7B%22retries%22%3A+%7B%22mode%22%3A+%22standard%22%2C+%22max_attempts%22%3A+10%7D%7D%7D",
... )
>>> print(f"AIRFLOW_CONN_{c.conn_id.upper()}='{c.as_json()}'")
AIRFLOW_CONN_AWESOME_CONN='{"conn_type": "aws", "description": "Example Connection", "host": "", "login": "AKIAIOSFODNN7EXAMPLE", "password": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", "schema": "", "extra": {"region_name": "eu-central-1", "config_kwargs": {"retries": {"mode": "standard", "max_attempts": 10}}}}'

URI 格式示例

如果使用 Airflow URI 序列化

export AIRFLOW_CONN_MY_PROD_DATABASE='my-conn-type://login:password@host:port/schema?param1=val1&param2=val2'

有關如何生成有效 URI 的更多詳細資訊,請參閱連線 URI 格式

注意

在環境變數中定義的連線不會顯示在 Airflow UI 中,也不會透過 airflow connections list 命令顯示。

在 Secrets Backend 中儲存連線

您可以將 Airflow 連線儲存在外部 secrets backend 中,例如 HashiCorp Vault、AWS SSM Parameter Store 和其他此類服務。更多詳細資訊請參閱Secrets Backend

在資料庫中儲存連線

另請參閱

連線也可以儲存在環境變數外部 secrets backend 中,例如 HashiCorp Vault、AWS SSM Parameter Store 等。

將連線儲存在資料庫中時,您可以使用 Web UI 或 Airflow CLI 進行管理。

使用 UI 建立連線

開啟 UI 的 Admin->Connections 部分。點選 Add Connection 連結以建立新連線。

../_images/connection_create.png
  1. Connection Id 欄位中填寫所需的連線 ID。建議使用小寫字母並透過下劃線分隔單詞。

  2. Connection Type 欄位中選擇連線型別。

  3. 填寫其餘欄位。有關不同連線型別欄位的描述,請參閱處理 extra 中的任意字典

  4. 點選 Save 按鈕建立連線。

使用 UI 編輯連線

開啟 UI 的 Admin->Connections 部分。在連線列表中點選您希望編輯的連線旁邊的鉛筆圖示。

../_images/connection_edit.png

修改連線屬性並點選 Save 按鈕儲存更改。

從 CLI 建立連線

您可以從 CLI 向資料庫新增連線。

您可以使用 JSON 格式新增連線(始於 2.3.0 版本)

airflow connections add 'my_prod_db' \
    --conn-json '{
        "conn_type": "my-conn-type",
        "login": "my-login",
        "password": "my-password",
        "host": "my-host",
        "port": 1234,
        "schema": "my-schema",
        "extra": {
            "param1": "val1",
            "param2": "val2"
        }
    }'

或者,您可以使用 Airflow 的連線 URI 格式(請參閱生成連線 URI)。

airflow connections add 'my_prod_db' \
    --conn-uri '<conn-type>://<login>:<password>@<host>:<port>/<schema>?param1=val1&param2=val2&...'

最後,您也可以單獨指定每個引數

airflow connections add 'my_prod_db' \
    --conn-type 'my-conn-type' \
    --conn-login 'login' \
    --conn-password 'password' \
    --conn-host 'host' \
    --conn-port 'port' \
    --conn-schema 'schema' \
    ...

匯出連線到檔案

您可以將儲存在資料庫中的連線匯出到檔案(例如,用於將連線從一個環境遷移到另一個環境)。有關用法,請參閱匯出連線

資料庫中連線的安全性

對於儲存在 Airflow 元資料資料庫中的連線,Airflow 使用 Fernet 加密密碼和其他潛在敏感資料。它保證如果沒有加密密碼,連線密碼就無法在沒有金鑰的情況下被篡改或讀取。有關配置 Fernet 的資訊,請檢視Fernet

測試連線

出於安全原因,Airflow UI、API 和 CLI 中的測試連線功能預設處於停用狀態。

有關使用者許可權的更多資訊,請參閱文件:https://airflow.apache.tw/docs/apache-airflow/stable/security/security_model.html#capabilities-of-authenticated-ui-users。強烈建議在確保只有高度信任的 UI/API 使用者擁有“編輯連線”許可權後,再啟用此功能。

此功能的可用性可以透過 Airflow 配置 (airflow.cfg) 的 core 部分中的 test_connection 標誌來控制。也可以透過環境變數 AIRFLOW__CORE__TEST_CONNECTION 來控制。

此配置引數接受以下值

  • Disabled(停用):停用測試連線功能並停用 UI 中的“測試連線”按鈕。這也是 Airflow 配置中設定的預設值。

  • Enabled(啟用):啟用測試連線功能並激活 UI 中的“測試連線”按鈕。

  • Hidden(隱藏):停用測試連線功能並隱藏 UI 中的“測試連線”按鈕。

啟用測試連線後,可以在 UI 的建立編輯連線頁面使用,或者透過呼叫連線 REST API,或者執行 airflow connections test CLI 命令來使用。

警告

使用 Airflow UI 或 REST API 時,對於駐留在外部 secrets backend 中的連線,此功能將不可用。

要測試連線,Airflow 會呼叫相關 Hook 類中的 test_connection 方法並報告結果。連線型別可能沒有關聯的 Hook,或者 Hook 沒有實現 test_connection 方法,這兩種情況下都會顯示錯誤訊息或功能將被停用(如果您在 UI 中測試)。

注意

在 Airflow UI 中進行測試時,測試是從 Webserver 執行的,因此此功能受限於您為 Webserver 設定的網路出站規則。

注意

如果 Webserver 和 Worker 機器(如果透過 Airflow UI 測試)或機器/Pod(如果透過 Airflow CLI 測試)安裝了不同的庫或 Providers,測試結果可能會有所不同。

自定義連線型別

Airflow 允許定義自定義連線型別——包括修改連線的新增/編輯表單。自定義連線型別在社群維護的 Providers 中定義,但您也可以新增自定義 Provider 來新增自定義連線型別。有關如何新增自定義 Provider 的描述,請參閱Providers

自定義連線型別透過 Providers 提供的 Hook 來定義。Hook 可以實現協議類 DiscoverableHook 中定義的方法。請注意,您的自定義 Hook 不應繼承自此類,此類是一個示例,用於記錄您的 Hook 可能定義的類欄位和方法的期望。另一個很好的例子是JdbcHook

透過在您的 Hook 中實現這些方法並透過 Provider 元資料中的 connection-types 陣列(以及已棄用的 hook-class-names)暴露它們,您可以自定義 Airflow:

  • 新增自定義連線型別

  • 新增從連線型別自動建立 Hook

  • 新增自定義表單控制元件以顯示和編輯連線 URL 中的自定義“額外”引數

  • 隱藏連線中未使用的欄位

  • 新增顯示欄位應如何格式化的示例佔位符

您可以在Providers中閱讀更多關於如何新增自定義 Provider 的詳細資訊。

自定義連線欄位

可以在 Airflow Webserver 的連線新增/編輯檢視中新增自定義表單欄位。自定義欄位以 JSON 格式儲存在 Connection.extra 欄位中。要新增自定義欄位,請實現 get_connection_form_widgets() 方法。此方法應返回一個字典。鍵應為欄位的字串名稱,該名稱應儲存在 extra 字典中。值應為 wtforms.fields.core.Field 的繼承者。

這是一個例子

@staticmethod
def get_connection_form_widgets() -> dict[str, Any]:
    """Returns connection widgets to add to connection form"""
    from flask_appbuilder.fieldwidgets import BS3TextFieldWidget
    from flask_babel import lazy_gettext
    from wtforms import StringField

    return {
        "workspace": StringField(lazy_gettext("Workspace"), widget=BS3TextFieldWidget()),
        "project": StringField(lazy_gettext("Project"), widget=BS3TextFieldWidget()),
    }

注意

自定義欄位不再需要 extra__<conn type>__ 字首

在 Airflow 2.3 之前,如果您想在 UI 中新增自定義欄位,必須在其前面加上 extra__<conn type>__ 字首,並且其值將以這種方式儲存在 extra 字典中。從 2.3 版本開始,您不再需要這樣做。

get_ui_field_behaviour() 方法允許您自定義兩者的行為。例如,您可以隱藏或重新標記欄位(例如,如果它未使用或重新用途),並且可以新增佔位符文字。

一個例子

@staticmethod
def get_ui_field_behaviour() -> dict[str, Any]:
    """Returns custom field behaviour"""
    return {
        "hidden_fields": ["port", "host", "login", "schema"],
        "relabeling": {},
        "placeholders": {
            "password": "Asana personal access token",
            "workspace": "My workspace gid",
            "project": "My project gid",
        },
    }

注意

如果您想為一個名稱與標準連線屬性(即 login, password, host, scheme, port, extra)衝突的 extra 欄位新增表單佔位符,則必須在其前面加上 extra__<conn type>__ 字首。例如 extra__myservice__password

檢視 Provider 可以獲得一些您可以做到的例子,例如 JdbcHookAsanaHook 都使用了這個功能。

注意

已棄用的 hook-class-names

在 Airflow 2.2.0 之前,Provider 中的連線透過 Provider 元資料中的 hook-class-names 陣列暴露。然而,事實證明,在 Worker 中使用單個 Hook 時效率低下,現在 hook-class-names 陣列已被 connection-types 陣列取代。在 Provider 支援低於 2.2.0 的 Airflow 版本之前,connection-typeshook-class-names 都應該存在。CI 構建期間的自動化檢查將驗證這兩個陣列的一致性。

URI 格式

注意

從 2.3.0 版本開始,您可以使用 JSON 序列化連線。請參閱示例

出於歷史原因,Airflow 有一種特殊的 URI 格式,可用於將 Connection 物件序列化為字串值。

通常,Airflow 的 URI 格式如下所示

my-conn-type://my-login:my-password@my-host:5432/my-schema?param1=val1&param2=val2

上述 URI 將生成一個等同於以下內容的 Connection 物件

Connection(
    conn_id="",
    conn_type="my_conn_type",
    description=None,
    login="my-login",
    password="my-password",
    host="my-host",
    port=5432,
    schema="my-schema",
    extra=json.dumps(dict(param1="val1", param2="val2")),
)

生成連線 URI

為了更方便地生成連線 URI,Connection 類提供了一個便利方法 get_uri()。它可以這樣使用

>>> import json
>>> from airflow.sdk import Connection
>>> c = Connection(
...     conn_id="some_conn",
...     conn_type="mysql",
...     description="connection description",
...     host="myhost.com",
...     login="myname",
...     password="mypassword",
...     extra=json.dumps(dict(this_param="some val", that_param="other val*")),
... )
>>> print(f"AIRFLOW_CONN_{c.conn_id.upper()}='{c.get_uri()}'")
AIRFLOW_CONN_SOME_CONN='mysql://myname:mypassword@myhost.com?this_param=some+val&that_param=other+val%2A'

注意

get_uri() 方法返回的是 Airflow 格式的連線 URI,不是 SQLAlchemy 相容的 URI。如果您需要用於資料庫連線的 SQLAlchemy 相容 URI,請改用 sqlalchemy_url 屬性。

此外,如果您已建立連線,可以使用 airflow connections get 命令。

$ airflow connections get sqlite_default
Id: 40
Connection Id: sqlite_default
Connection Type: sqlite
Host: /tmp/sqlite_default.db
Schema: null
Login: null
Password: null
Port: null
Is Encrypted: false
Is Extra Encrypted: false
Extra: {}
URI: sqlite://%2Ftmp%2Fsqlite_default.db

處理 extra 中的任意字典

某些 JSON 結構無法進行 URL 編碼而不會丟失資料。對於此類 JSON,get_uri 會將整個字串儲存在 URL 查詢引數 __extra__ 下。

例如

>>> extra_dict = {"my_val": ["list", "of", "values"], "extra": {"nested": {"json": "val"}}}
>>> c = Connection(
...     conn_type="scheme",
...     host="host/location",
...     schema="schema",
...     login="user",
...     password="password",
...     port=1234,
...     extra=json.dumps(extra_dict),
... )
>>> uri = c.get_uri()
>>> uri
'scheme://user:password@host%2Flocation:1234/schema?__extra__=%7B%22my_val%22%3A+%5B%22list%22%2C+%22of%22%2C+%22values%22%5D%2C+%22extra%22%3A+%7B%22nested%22%3A+%7B%22json%22%3A+%22val%22%7D%7D%7D'

並且我們可以驗證它返回相同的字典

>>> new_c = Connection(uri=uri)
>>> new_c.extra_dejson == extra_dict
True

但對於僅儲存鍵值對的最常見情況,使用簡單的 URL 編碼。

您可以這樣驗證 URI 是否被正確解析

>>> from airflow.sdk import Connection

>>> c = Connection(uri="my-conn-type://my-login:my-password@my-host:5432/my-schema?param1=val1&param2=val2")
>>> print(c.login)
my-login
>>> print(c.password)
my-password

處理連線引數中的特殊字元

注意

生成連線時,請使用 Connection.get_uri 便利方法,如 生成連線 URI 部分所述。本節僅供參考。

手動構建 URI 時,某些字元需要特殊處理。

例如,如果您的密碼包含 /,則會失敗

>>> c = Connection(uri="my-conn-type://my-login:my-pa/ssword@my-host:5432/my-schema?param1=val1&param2=val2")
ValueError: invalid literal for int() with base 10: 'my-pa'

要解決此問題,您可以使用 quote_plus() 進行編碼

>>> c = Connection(uri="my-conn-type://my-login:my-pa%2Fssword@my-host:5432/my-schema?param1=val1&param2=val2")
>>> print(c.password)
my-pa/ssword

本條目有幫助嗎?