HTTP Operator¶
以下程式碼示例使用 http_default 連線,這意味著請求將傳送到 httpbin 站點以執行基本的 HTTP 操作。
HttpSensor¶
使用 HttpSensor 探查(poke)直到 response_check 可呼叫物件評估為 true。
這裡我們探查直到 httpbin 返回的響應文字包含 httpbin。
tests/system/http/example_http.py
task_http_sensor_check = HttpSensor(
task_id="http_sensor_check",
http_conn_id="http_default",
endpoint="",
request_params={},
response_check=lambda response: "httpbin" in response.text,
poke_interval=5,
dag=dag,
)
該 Sensor 也可以在 deferrable 模式下使用
tests/system/http/example_http.py
task_http_sensor_check_async = HttpSensor(
task_id="http_sensor_check_async",
http_conn_id="http_default",
endpoint="",
deferrable=True,
request_params={},
poke_interval=5,
dag=dag,
)
HttpOperator¶
使用 HttpOperator 呼叫 HTTP 請求並獲取響應文字。
警告
透過 HttpOperator 配置 https 不直觀
由於歷史原因,透過 HTTP Operator 配置 HTTPS 連線性,嗯,既困難又不直觀。Operator 預設為 http 協議,你可以透過 scheme 連線屬性更改 Operator 使用的 scheme。然而,此欄位最初是為資料庫型別的 URI 新增到連線中的,其中資料庫 scheme 傳統上設定為 URI path 的第一個元件。因此,如果你想透過 URI 配置 https 連線,你需要將 https scheme 傳遞給 HttpOperator。儘管看起來很蠢,你的連線 URI 將是這樣:http://your_host:443/https。然後,如果你想在 HttpOperator 中使用不同的 URL path,你應該在執行任務時將你的 path 作為 endpoint 引數傳遞。例如,要向 https://your_host:443/my_endpoint 執行查詢,你需要將 endpoint 引數設定為 my_endpoint。或者,如果你願意,你也可以對包含 https:// 字首的主機進行百分比編碼,只要它包含 ://(百分比編碼後為 %3a%2f%2f),path 的第一個元件就不會被用作 scheme。你的 URI 定義可能看起來像這樣:http://https%3a%2f%2fyour_host:443/。然而,在這種情況下,path 將完全不被使用 - 如果你想使用特定的 path 發起請求,你仍然需要在任務中使用 endpoint 引數。儘管這不直觀,但從歷史上看,Operator/Hook 的工作方式就是這樣,並且在不破壞向後相容性的前提下很難更改,因為有其他基於 HttpOperator 構建的 Operator 依賴於此功能,並且已經有很多使用者在使用它了。
在第一個示例中,我們使用 json 資料呼叫 POST 請求,並在收到相同的 json 資料時成功,否則任務將失敗。
tests/system/http/example_http.py
task_post_op = HttpOperator(
task_id="post_op",
endpoint="post",
data=json.dumps({"priority": 5}),
headers={"Content-Type": "application/json"},
response_check=lambda response: response.json()["json"]["priority"] == 5,
dag=dag,
)
這裡我們呼叫一個 GET 請求並向其傳遞引數。無論響應文字是什麼,任務都將成功。
tests/system/http/example_http.py
task_get_op = HttpOperator(
task_id="get_op",
method="GET",
endpoint="get",
data={"param1": "value1", "param2": "value2"},
headers={},
dag=dag,
)
HttpOperator 預設將響應體作為文字返回。如果你想在將其傳遞給下游的下一個任務之前修改響應,請使用 response_filter。這在你遇到以下情況時非常有用:
你正在使用的 API 返回大型 JSON 負載,而你只對其中一部分資料感興趣
API 返回 xml 或 csv 格式的資料,而你想將其轉換為 JSON
你對響應的頭部而不是響應體感興趣
下面是一個從 REST API 檢索資料並僅返回巢狀屬性而不是完整響應體的示例。
tests/system/http/example_http.py
task_get_op_response_filter = HttpOperator(
task_id="get_op_response_filter",
method="GET",
endpoint="get",
response_filter=lambda response: response.json()["nested"]["property"],
dag=dag,
)
在第三個示例中,我們執行 PUT 操作,根據提供給請求的資料放置/設定資料。
tests/system/http/example_http.py
task_put_op = HttpOperator(
task_id="put_op",
method="PUT",
endpoint="put",
data=json.dumps({"priority": 5}),
headers={"Content-Type": "application/json"},
dag=dag,
)
在此示例中,我們對 delete endpoint 呼叫 DELETE 操作。這次我們向請求傳遞表單資料。
tests/system/http/example_http.py
task_del_op = HttpOperator(
task_id="del_op",
method="DELETE",
endpoint="delete",
data="some=data",
headers={"Content-Type": "application/x-www-form-urlencoded"},
dag=dag,
)
這裡我們將表單資料傳遞給 POST 操作,這等同於通常的表單提交。
tests/system/http/example_http.py
task_post_op_formenc = HttpOperator(
task_id="post_op_formenc",
endpoint="post",
data="name=Joe",
headers={"Content-Type": "application/x-www-form-urlencoded"},
dag=dag,
)
HttpOperator 還允許重複呼叫 API endpoint,通常用於迴圈處理其分頁。所有 API 響應都由 Operator 儲存在記憶體中並以一個單一結果返回。因此,與非分頁呼叫相比,它可能消耗更多的記憶體和 CPU。
預設情況下,HttpOperator 的結果將成為一個 Response.text 列表(而不是一個單一的 Response.text 物件)。
示例 - 假設你的 API 返回一個包含 cursor 的 JSON body:你可以編寫一個 pagination_function,它將接收你的請求的原始 request.Response 物件,並基於此 cursor 生成新的請求引數(作為 dict)。HttpOperator 將重複呼叫 API 直到該函式停止返回任何內容。
tests/system/http/example_http.py
def get_next_page_cursor(response) -> dict | None:
"""
Take the raw `request.Response` object, and check for a cursor.
If a cursor exists, this function creates and return parameters to call
the next page of result.
"""
next_cursor = response.json().get("cursor")
if next_cursor:
return dict(data={"cursor": next_cursor})
return None
task_get_paginated = HttpOperator(
task_id="get_paginated",
method="GET",
endpoint="get",
data={"cursor": ""},
pagination_function=get_next_page_cursor,
dag=dag,
)