How to create your own provider
Custom providers
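You can develop and release your own providers. Your custom operators, hooks, sensors and transfer operators can be packaged together in a standard Airflow package and installed using the same mechanisms. They can also use the same mechanisms to extend the Airflow core with auth backends, custom connections, logging, secrets backends and extra operator links, as described in the previous chapter.
As described in the providers documentation, custom providers can extend the Airflow core: they can add extra links to operators as well as custom connections. If you want to use this mechanism for your own custom providers, you can build them and install them as packages.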
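How to create a provider
Adding a provider to Airflow is just a matter of building a Python package and adding the right metadata to it. We use the standard Python mechanism of defining entry points. Your package needs to define the apache_airflow_provider entry point, which must point to a function implemented by your package. That function returns a dictionary listing the discoverable capabilities of your package, and the dictionary must follow the json-schema specification.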
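Most of the schema provides extension points for the documentation (which you might also want to use for your own purposes), but the important fields from the extensibility point of view are these:
Displaying package information in CLI/API:
package-name - Name of the package for the provider.
name - Human-friendly name of the provider.
description - Additional description of the provider.
version - List of versions of the package (in reverse-chronological order). The first version in the list is the current package version. It is taken from the version of the installed package, not from the provider_info information.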
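Because the reported version comes from the installed distribution rather than from provider_info, it can be read with `importlib.metadata`. A small sketch; `installed_version` is a hypothetical helper, and "my-package-name" is the placeholder package name used in the examples on this page.

```python
from __future__ import annotations

from importlib.metadata import PackageNotFoundError, version


def installed_version(package_name: str) -> str | None:
    """Return the version of the installed distribution, or None if it is absent."""
    try:
        return version(package_name)
    except PackageNotFoundError:
        return None


# Prints None unless a distribution called "my-package-name" is installed.
print(installed_version("my-package-name"))
```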
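Exposing customized functionality to the Airflow core:
extra-links - this field should contain the list of all operator class names that add extra link capability. See Define an operator extra link for how to add extra link capability to your operators.
connection-types - this field should contain the list of all connection types together with the hook class names implementing those custom connection types (providing custom extra fields and custom field behaviour). This field is available as of Airflow 2.2.0 and replaces the deprecated hook-class-names. See Managing Connections for more details.
secrets-backends - this field should contain the list of all secrets backend class names that the provider provides. See Secrets Backend for how to add one.
task-decorators - this field should contain a list of dictionaries with the name and path under which the decorators are available. See Creating Custom @task Decorators for how to add custom decorators.
logging - this field should contain the list of all logging handler class names that the provider provides. See Logging for Tasks for a description of the logging handlers.
notifications - this field should contain the notification classes. See Creating a notifier for a description of notifications.
executors - this field should contain the executor class names. See Executor for a description of executors.
config - this field should contain a dictionary conforming to airflow/config_templates/config.yml.schema.json, with the configuration contributed by the provider. See Setting Configuration Options for details about setting configuration.
filesystems - this field should contain the list of all filesystem module names. See Object Storage for a description of filesystems.
integrations - the list of integrations available in the provider.
transfers - this field should contain the transfer operators the provider provides, each described by its source-integration-name, target-integration-name and python-module. See Operators for a description of operators (a transfer is a type of operator).
operators - this field should contain the operators the provider provides, grouped by integration-name together with the python-modules that contain them. See Operators for a description of operators.
hooks - this field should contain the hooks the provider provides, grouped by integration-name together with the python-modules that contain them. See Managing Connections for a description of hooks and the connections they provide.
sensors - this field should contain the sensors the provider provides, grouped by integration-name together with the python-modules that contain them. See Sensors for a description of sensors.
bundles - this field should contain the bundles the provider provides, grouped by integration-name together with the python-modules that contain them.
triggers - this field should contain the triggers the provider provides, grouped by integration-name together with the python-modules that contain them. See Deferrable Operators & Triggers for a description of triggers.
auth-backends - this field should contain the list of all API auth backend module names that the provider provides. See Auth manager for a description of authentication.
auth-managers - this field should contain the list of all auth manager class names that the provider provides. See Auth manager for a description of auth managers.
asset-uris - this field should contain the list of URI schemes, each together with the handler, factory and OpenLineage converter used for it. See Asset definitions for a description of asset URIs.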
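Putting several of these fields together, a get_provider_info function might return something like the following. This is a sketch only: every module and class path is a hypothetical placeholder, and the shapes follow the airflow/provider_info.schema.json schema reproduced on this page.

```python
def get_provider_info() -> dict:
    # All module and class paths below are hypothetical placeholders.
    return {
        "package-name": "my-package-name",
        "name": "My Provider",
        "description": "Hooks and links for a hypothetical Foo service.",
        # Airflow 2.2.0+ reads connection types from here.
        "connection-types": [
            {
                "connection-type": "foo",
                "hook-class-name": "myproviderpackage.hooks.foo.FooHook",
            }
        ],
        "extra-links": ["myproviderpackage.links.FooLink"],
        "secrets-backends": ["myproviderpackage.secrets.foo.FooSecretsBackend"],
        # Each decorator is a name/path pair; users would call @task.foo.
        "task-decorators": [
            {"name": "foo", "path": "myproviderpackage.decorators.foo.foo_task"}
        ],
        "hooks": [
            {
                "integration-name": "Foo",
                "python-modules": ["myproviderpackage.hooks.foo"],
            }
        ],
        # One config section with a single option, following definitions/option.
        "config": {
            "foo": {
                "description": "Settings for the Foo provider.",
                "options": {
                    "timeout": {
                        "description": "Request timeout in seconds.",
                        "version_added": "1.0.0",
                        "type": "integer",
                        "example": "30",
                        "default": "30",
                    }
                },
            }
        },
    }
```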
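Note
Deprecated values
hook-class-names (deprecated) - this field should contain the list of all hook class names that provide custom connection types with custom extra fields and field behaviour. The hook-class-names array has been deprecated as of Airflow 2.2.0 (for optimization reasons) and will be removed in Airflow 3. If your provider targets Airflow 2.2.0+, you do not need to include the hook-class-names array; if you also want to support earlier versions of Airflow 2, you should include both the hook-class-names and connection-types arrays. See Managing Connections for more details.
dataset-uris (deprecated) - this field should contain the list of URI schemes together with the class names implementing the normalization functions. Deprecated in Airflow 3.0 in favor of asset-uris.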
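For a provider that must also work on Airflow versions older than 2.2.0, one way to keep both arrays consistent is to derive the deprecated list from connection-types. A minimal sketch; the hook paths and the CONNECTION_TYPES constant are hypothetical.

```python
from __future__ import annotations

# Hypothetical connection types exposed by the provider.
CONNECTION_TYPES: list[dict[str, str]] = [
    {"connection-type": "foo", "hook-class-name": "myproviderpackage.hooks.foo.FooHook"},
    {"connection-type": "bar", "hook-class-name": "myproviderpackage.hooks.bar.BarHook"},
]


def get_provider_info() -> dict:
    return {
        "package-name": "my-package-name",
        "name": "My Provider",
        "description": "Provider that also supports Airflow < 2.2.0.",
        # Read by Airflow 2.2.0+.
        "connection-types": CONNECTION_TYPES,
        # Deprecated field, only needed for Airflow versions before 2.2.0.
        # Derived from connection-types so the two lists cannot drift apart.
        "hook-class-names": sorted({c["hook-class-name"] for c in CONNECTION_TYPES}),
    }
```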
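After you have installed your provider, you can query the installed providers and their capabilities with the airflow providers command. This way you can verify that your providers are properly recognized and that they define their extensions correctly. See Command Line Interface and Environment Variables Reference for details of the available CLI sub-commands.
When you write your own provider, consider following the Naming Conventions for provider packages.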
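Special considerations
Optional provider features
New in version 2.3.0: This feature is available in Airflow 2.3+.
Some providers might provide optional features, which are only available when certain packages or libraries are installed. Such features will typically result in ImportErrors; however, those import errors should be silently ignored rather than pollute the Airflow logs with false warnings. False warnings are a very bad pattern, as they tend to turn into blind spots, so avoiding them is encouraged. However, until Airflow 2.3, Airflow had no mechanism to selectively ignore "known" ImportErrors. As a result, Airflow 2.1 and 2.2 silently ignored all ImportErrors coming from providers, which effectively meant ignoring even important import errors, without giving Airflow users any hint that something was missing from the provider's dependencies.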
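Since Airflow 2.3, a provider can mark a missing import as "only an optional feature is unavailable" by raising AirflowOptionalProviderFeatureException (from airflow.exceptions) instead of letting the raw ImportError propagate. The sketch below wraps this in a hypothetical helper, import_optional; the fallback class exists only so the snippet runs without Airflow installed.

```python
from __future__ import annotations

import importlib
from types import ModuleType

try:
    # Available in Airflow 2.3+.
    from airflow.exceptions import AirflowOptionalProviderFeatureException
except ImportError:
    # Stand-in so this sketch runs without Airflow installed.
    class AirflowOptionalProviderFeatureException(Exception):  # type: ignore[no-redef]
        """Local placeholder for Airflow's exception of the same name."""


def import_optional(module_name: str) -> ModuleType:
    """Import an optional dependency, signalling a missing optional feature if absent."""
    try:
        return importlib.import_module(module_name)
    except ImportError as err:
        raise AirflowOptionalProviderFeatureException(
            f"Optional feature unavailable: {module_name!r} is not installed"
        ) from err
```

A hook or operator module that needs the optional library would call import_optional at import time, so a missing library surfaces as a known optional-feature condition rather than a generic import failure.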
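Using providers with dynamic task mapping
Airflow 2.3 added Dynamic Task Mapping, and with it the possibility of assigning a unique key to each task instance. This means that when such a dynamically mapped task wants to retrieve a value from XCom (for example, when an extra link has to be calculated), it should always check whether the ti_key value passed in is not None, and only then retrieve the XCom value using XCom.get_value. This keeps the code backward compatible with earlier versions of Airflow, which do not pass ti_key.
Typical code for accessing XCom values in providers that want to stay backward compatible should look similar to this (note the if ti_key is not None: condition).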
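from __future__ import annotations

from datetime import datetime
from typing import TYPE_CHECKING

from airflow.models import BaseOperator, XCom

if TYPE_CHECKING:
    from airflow.models.taskinstancekey import TaskInstanceKey


# Method of the provider's operator link class; self.index and
# BIGQUERY_JOB_DETAILS_LINK_FMT are defined by the surrounding provider code.
def get_link(
    self,
    operator: BaseOperator,
    dttm: datetime | None = None,
    ti_key: TaskInstanceKey | None = None,
):
    if ti_key is not None:
        # Airflow 2.3+: the key identifies the exact (possibly mapped) task instance.
        job_ids = XCom.get_value(key="job_id", ti_key=ti_key)
    else:
        # Older Airflow versions only pass the execution date.
        assert dttm is not None
        job_ids = XCom.get_one(
            key="job_id",
            dag_id=operator.dag.dag_id,
            task_id=operator.task_id,
            execution_date=dttm,
        )
    if not job_ids:
        return None
    # Guard against an index past the end of the list.
    if len(job_ids) <= self.index:
        return None
    job_id = job_ids[self.index]
    return BIGQUERY_JOB_DETAILS_LINK_FMT.format(job_id=job_id)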
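Why the key matters: instances of one mapped task share the same dag_id, task_id and run, and differ only by map_index, so a lookup by date alone cannot tell them apart, while a lookup by task instance key can. The sketch below illustrates this without Airflow, using a hypothetical TIKey tuple that mirrors the fields of Airflow's TaskInstanceKey and a hypothetical in-memory dictionary standing in for XCom storage.

```python
from __future__ import annotations

from typing import Any, NamedTuple


class TIKey(NamedTuple):
    """Hypothetical stand-in with the same fields as Airflow's TaskInstanceKey."""

    dag_id: str
    task_id: str
    run_id: str
    try_number: int = 1
    map_index: int = -1  # -1 means "not a mapped task instance"


# Hypothetical in-memory store, keyed roughly like XCom rows:
# (dag_id, task_id, run_id, map_index, key). try_number is deliberately left out.
_STORE: dict[tuple[str, str, str, int, str], Any] = {}


def _row_key(ti_key: TIKey, key: str) -> tuple[str, str, str, int, str]:
    return (ti_key.dag_id, ti_key.task_id, ti_key.run_id, ti_key.map_index, key)


def push(ti_key: TIKey, key: str, value: Any) -> None:
    _STORE[_row_key(ti_key, key)] = value


def get_value(ti_key: TIKey, key: str) -> Any:
    return _STORE.get(_row_key(ti_key, key))


# Two instances of the same mapped task in the same run push different job ids.
first = TIKey("my_dag", "run_query", "manual__1", map_index=0)
second = TIKey("my_dag", "run_query", "manual__1", map_index=1)
push(first, "job_id", ["job-a"])
push(second, "job_id", ["job-b"])
print(get_value(first, "job_id"), get_value(second, "job_id"))  # → ['job-a'] ['job-b']
```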
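Custom provider FAQ
When I write my own provider, do I need to do anything special to make it available to others?
You do not need to do anything special besides creating the apache_airflow_provider entry point that returns properly formatted metadata: a dictionary with the extra-links and connection-types fields (and the deprecated hook-class-names field if you also target Airflow versions before 2.2.0).
Anyone who runs Airflow in an environment that has your Python package installed will be able to use the package as a provider package.
Should I name my provider specifically or should it be created in ``airflow.providers`` package?
We have quite a number (>80) of providers managed by the community, and we maintain them together with Apache Airflow. All those providers have a well-defined structure, follow the naming conventions we defined, and live in the airflow.providers package. If you intend to contribute your provider, you should follow those conventions and open a PR to Apache Airflow to contribute it. Otherwise you are free to use any package name as long as it does not conflict with other names, so it is best to choose a package within your own "domain".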
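What do I need to do to turn a package into a provider?
To turn an existing Python package into a provider, you need to do the following (see the examples below):
Add the apache_airflow_provider entry point in the pyproject.toml file. This tells Airflow where to get the required provider metadata.
Create the function you referred to in the first step as part of your package. This function returns a dictionary that contains all the metadata about your provider package.
If you want Airflow to link to your provider's documentation on the Providers page, make sure to add "project-url/documentation" metadata to your package. This will also add a link to your documentation on PyPI.
Note that the dictionary should comply with the airflow/provider_info.schema.json JSON-schema specification. The community-managed providers have more fields there that are used to build the documentation, but the runtime information only requires the few fields defined in the schema: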
airflow/provider_info.schema.json
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"package-name": {
"description": "Package name available under which the package is available in the PyPI repository.",
"type": "string"
},
"name": {
"description": "Provider name",
"type": "string"
},
"description": {
"description": "Information about the package in RST format",
"type": "string"
},
"hook-class-names": {
"type": "array",
"description": "Hook class names that provide connection types to core (deprecated by connection-types)",
"items": {
"type": "string"
},
"deprecated": {
"description": "The hook-class-names property has been deprecated in favor of connection-types which is more performant version allowing to only import individual Hooks rather than all hooks at once",
"deprecatedVersion": "2.2.0"
}
},
"filesystems": {
"type": "array",
"description": "Filesystem module names",
"items": {
"type": "string"
}
},
"integrations": {
"description": "Array of integrations provided by the provider",
"type": "array",
"items": {
"type": "object",
"properties": {
"integration-name": {
"description": "Name of the integrations to expose by the provider",
"type": "string"
},
"external-doc-url": {
"description": "URL that describes the integration",
"type": "string"
},
"logo": {
"description": "URL or path on the airflow side where you can find the logo",
"type": "string"
},
"tags": {
"description": "Tags describing the integration (free-form)",
"type": "array",
"items": {
"type": "string"
}
}
}
}
},
"operators": {
"description": "List of operators provided by the integration",
"type": "array",
"items": {
"type": "object",
"properties": {
"integration-name": {
"description": "Name of the integration",
"type": "string"
},
"python-modules": {
"description": "List of modules where operators are found",
"type" : "array",
"items": {
"type": "string"
}
}
}
}
},
"sensors": {
"description": "List of sensors provided by the integration",
"type": "array",
"items": {
"type": "object",
"properties": {
"integration-name": {
"description": "Name of the integration",
"type": "string"
},
"python-modules": {
"description": "List of modules where sensors are found",
"type" : "array",
"items": {
"type": "string"
}
}
}
}
},
"hooks": {
"description": "List of hooks provided by the integration",
"type": "array",
"items": {
"type": "object",
"properties": {
"integration-name": {
"description": "Name of the integration",
"type": "string"
},
"python-modules": {
"description": "List of modules where hooks are found",
"type" : "array",
"items": {
"type": "string"
}
}
}
}
},
"asset-uris": {
"description": "List of asset uris provided by the provider",
"type": "array",
"items": {
"type": "object",
"properties": {
"schemes": {
"description": "Array of schemes supported",
"type": "array",
"items" : {
"type": "string"
}
},
"handler": {
"description": "Handler used to handle the asset",
"anyOf": [
{ "type": "string" },
{ "type": "null" }
]
},
"factory": {
"description": "Factory to create the asset",
"type": "string"
},
"to_openlineage_converter": {
"description": "Converter to open-lineage event",
"type": "string"
}
},
"required": ["schemes"]
}
},
"dataset-uris": {
"description": "List of asset uris provided by the provider",
"type": "array",
"items": {
"type": "object",
"properties": {
"schemes": {
"description": "Array of schemes supported",
"type": "array",
"items" : {
"type": "string"
}
},
"handler": {
"description": "Handler used to handle the asset",
"anyOf": [
{ "type": "string" },
{ "type": "null" }
]
},
"factory": {
"description": "Factory to create the asset",
"type": "string"
},
"to_openlineage_converter": {
"description": "Converter to open-lineage event",
"type": "string"
}
},
"required": ["schemes"]
},
"deprecated": {
"description": "The dataset-uris property has been deprecated in favor of asset-uris in airflow 3.0",
"deprecatedVersion": "3.0.0"
}
},
"dialects": {
"description": "List of dialects the provider provides",
"type": "array",
"items": {
"type": "object",
"properties": {
"dialect-type": {
"description": "Type of SQL dialect",
"type": "string"
},
"dialect-class-name": {
"description": "Class name that implements the dialect",
"type": "string"
}
}
}
},
"transfers": {
"description": "List of transfer operators the provider provides",
"type": "array",
"items": {
"type": "object",
"properties": {
"how-to-guide": {
"description": "Path to how-to-guide for the transfer. The path must start with '/docs/'",
"type": "string"
},
"source-integration-name": {
"type": "string",
"description": "Integration name. It must have a matching item in the 'integration' section of any provider"
},
"target-integration-name": {
"type": "string",
"description": "Target integration name. It must have a matching item in the 'integration' section of any provider"
},
"python-module": {
"type": "string",
"description": "List of python modules containing the transfers"
}
},
"additionalProperties": false,
"required": [
"source-integration-name",
"target-integration-name",
"python-module"
]
}
},
"triggers": {
"type": "array",
"items": {
"type": "object",
"properties": {
"integration-name": {
"type": "string",
"description": "Integration name. It must have a matching item in the 'integration' section of any provider"
},
"python-modules": {
"description": "List of Python modules containing the triggers",
"type": "array",
"items": {
"type": "string"
}
}
},
"additionalProperties": false,
"required": [
"integration-name",
"python-modules"
]
}
},
"bundles": {
"type": "array",
"items": {
"type": "object",
"properties": {
"integration-name": {
"type": "string",
"description": "Integration name. It must have a matching item in the 'integration' section of any provider"
},
"python-modules": {
"description": "List of python modules containing the bundles",
"type": "array",
"items": {
"type": "string"
}
}
},
"additionalProperties": false,
"required": [
"integration-name",
"python-modules"
]
}
},
"connection-types": {
"type": "array",
"description": "Map of connection types mapped to hook class names",
"items": {
"type": "object",
"properties": {
"connection-type": {
"description": "Type of connection defined by the provider",
"type": "string"
},
"hook-class-name": {
"description": "Hook class name that implements the connection type",
"type": "string"
}
},
"required": [
"connection-type",
"hook-class-name"
]
}
},
"extra-links": {
"type": "array",
"description": "Operator class names that provide extra link functionality",
"items": {
"type": "string"
}
},
"secrets-backends": {
"type": "array",
"description": "Secrets Backend class names",
"items": {
"type": "string"
}
},
"logging": {
"type": "array",
"description": "Logging Task Handlers class names",
"items": {
"type": "string"
}
},
"auth-backends": {
"type": "array",
"description": "API Auth Backend module names",
"items": {
"type": "string"
}
},
"auth-managers": {
"type": "array",
"description": "Auth managers module names",
"items": {
"type": "string"
}
},
"notifications": {
"type": "array",
"description": "Notification class names",
"items": {
"type": "string"
}
},
"executors": {
"type": "array",
"description": "Executor class names",
"items": {
"type": "string"
}
},
"config": {
"type": "object",
"additionalProperties": {
"type": "object",
"properties": {
"description": {
"type": [
"string",
"null"
]
},
"options": {
"type": "object",
"additionalProperties": {
"$ref": "#/definitions/option"
}
},
"renamed": {
"type": "object",
"properties": {
"previous_name": {
"type": "string"
},
"version": {
"type": "string"
}
}
}
},
"required": [
"description",
"options"
],
"additionalProperties": false
}
},
"task-decorators": {
"type": "array",
"description": "Apply custom decorators to the TaskFlow API. Can be accessed by users via '@task.<name>'",
"items": {
"name": {
"type": "string"
},
"path": {
"type": "string"
}
}
},
"plugins": {
"type": "array",
"description": "Plugins provided by the provider",
"items": {
"name": {
"type": "string",
"description": "Name of the plugin"
},
"plugin-class": {
"type": "string",
"description": "Class to instantiate the plugin"
}
}
}
},
"definitions": {
"option": {
"type": "object",
"properties": {
"description": {
"type": [
"string",
"null"
]
},
"version_added": {
"type": [
"string",
"null"
]
},
"type": {
"type": "string",
"enum": [
"string",
"boolean",
"integer",
"float"
]
},
"example": {
"type": [
"string",
"null",
"number"
]
},
"default": {
"type": [
"string",
"null",
"number"
]
},
"sensitive": {
"type": "boolean",
"description": "When true, this option is sensitive and can be specified using AIRFLOW__{section}___{name}__SECRET or AIRFLOW__{section}___{name}_CMD environment variables. See: airflow.configuration.AirflowConfigParser.sensitive_config_values"
}
},
"required": [
"description",
"version_added",
"type",
"example",
"default"
],
"additional_properties": false
}
},
"required": [
"name",
"description"
]
}
Example pyproject.toml
[project.entry-points."apache_airflow_provider"]
provider_info = "myproviderpackage.get_provider_info:get_provider_info"
Example myproviderpackage/get_provider_info.py
def get_provider_info():
    return {
        "package-name": "my-package-name",
        "name": "name",
        "description": "a description",
        "hook-class-names": [
            "myproviderpackage.hooks.source.SourceHook",
        ],
    }
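Before publishing, it can help to sanity-check the dictionary returned by get_provider_info. A full check could load airflow/provider_info.schema.json with a JSON Schema validator such as the third-party jsonschema package; the stdlib-only sketch below (check_provider_info is a hypothetical helper) covers just two rules from that schema: the top-level required keys and the shape of connection-types items.

```python
from __future__ import annotations

from typing import Any


def check_provider_info(info: dict[str, Any]) -> list[str]:
    """Return a list of problems; an empty list means these basic checks passed."""
    problems: list[str] = []
    # Top-level "required" keys in provider_info.schema.json.
    for key in ("name", "description"):
        if not isinstance(info.get(key), str):
            problems.append(f"missing or non-string {key!r}")
    # Each connection-types item requires both of these keys.
    for i, item in enumerate(info.get("connection-types", [])):
        for key in ("connection-type", "hook-class-name"):
            if not isinstance(item, dict) or key not in item:
                problems.append(f"connection-types[{i}] lacks {key!r}")
    return problems


print(check_provider_info({"name": "name", "description": "a description"}))  # → []
```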
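Is there a convention for connection IDs and types?
Very good question. Glad that you asked. We usually follow the convention <NAME>_default for the connection ID and just <NAME> for the connection type. A few examples:
google_cloud_default ID and google_cloud_platform type
aws_default ID and aws type
You should follow this convention. It is important to use a unique connection type name, so it should be unique to your provider. If two providers try to add a connection with the same type, only one of them will succeed.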
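To spot such clashes early, you could compare the connection-types declared by several provider info dictionaries. A minimal sketch; conflicting_connection_types is a hypothetical helper, and the provider names in the usage are placeholders.

```python
from __future__ import annotations

from collections import defaultdict
from typing import Any, Iterable


def conflicting_connection_types(infos: Iterable[dict[str, Any]]) -> dict[str, list[str]]:
    """Map each connection type declared by more than one provider to those providers."""
    owners: dict[str, list[str]] = defaultdict(list)
    for info in infos:
        package = info.get("package-name", info.get("name", "<unknown>"))
        for item in info.get("connection-types", []):
            owners[item["connection-type"]].append(package)
    # Keep only the types claimed by two or more providers.
    return {ctype: pkgs for ctype, pkgs in owners.items() if len(pkgs) > 1}


a = {"package-name": "provider-a", "connection-types": [{"connection-type": "foo", "hook-class-name": "a.FooHook"}]}
b = {"package-name": "provider-b", "connection-types": [{"connection-type": "foo", "hook-class-name": "b.FooHook"}]}
print(conflicting_connection_types([a, b]))  # → {'foo': ['provider-a', 'provider-b']}
```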
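Can I contribute my own provider to Apache Airflow?
The answer depends on the provider. We have a policy for that in the PROVIDERS.rst developer documentation.
Can I advertise my own provider to Apache Airflow users and share it with others as a package on PyPI?
Absolutely! We have an Ecosystem area on our website where we share non-community-managed extensions and work for Airflow. Feel free to open a PR to that page; we will evaluate it and merge it when we see that the provider can be useful to the Airflow user community.
Can I charge for the use of my provider?
This is outside of our control and domain. As an Apache project, we are commercial-friendly, and many businesses have been built around Apache Airflow and many other Apache projects. As a community, we provide all our software for free, and this will never change. What third-party developers do is not under the control of the Apache Airflow community.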