SFTP 感測器

使用 SFTP 協議在伺服器上查詢特定檔案或符合特定模式的檔案。要獲取有關此感測器的更多資訊,請訪問 SFTPSensor

tests/system/sftp/example_sftp_sensor.py

sftp_with_operator = SFTPSensor(task_id="sftp_operator", path=FULL_FILE_PATH, poke_interval=10)

我們也可以使用 Taskflow API。它接受與 SFTPSensor 相同的引數,以及:

op_args(可選)

一個位置引數列表,將在呼叫可呼叫物件時解包(模板化)

op_kwargs(可選)

一個關鍵字引數字典,將在您的函式中解包(模板化)

Python 可呼叫物件返回的任何內容都將放入 XCom。

tests/system/sftp/example_sftp_sensor.py

@task.sftp_sensor(  # type: ignore[attr-defined]
    task_id="sftp_sensor",  # type: ignore[attr-defined]
    path=FULL_FILE_PATH,
    poke_interval=10,
)
def sftp_sensor_decorator():
    print("Files were successfully found!")
    # add your logic
    return "done"

在可延遲模式下檢查 SFTP 伺服器上是否存在檔案

tests/system/sftp/example_sftp_sensor.py

sftp_sensor_with_async = SFTPSensor(
    task_id="sftp_operator_async", path=FULL_FILE_PATH, poke_interval=10, deferrable=True
)

本條目有幫助嗎?