SFTP 感測器¶
使用 SFTP 協議在伺服器上查詢特定檔案或符合特定模式的檔案。要獲取有關此感測器的更多資訊,請訪問 SFTPSensor
tests/system/sftp/example_sftp_sensor.py
sftp_with_operator = SFTPSensor(task_id="sftp_operator", path=FULL_FILE_PATH, poke_interval=10)
我們也可以使用 Taskflow API。它接受與 SFTPSensor 相同的引數,以及:
- op_args(可選)
一個位置引數列表,將在呼叫可呼叫物件時解包(模板化)
- op_kwargs(可選)
一個關鍵字引數字典,將在您的函式中解包(模板化)
Python 可呼叫物件返回的任何內容都將放入 XCom。
tests/system/sftp/example_sftp_sensor.py
@task.sftp_sensor( # type: ignore[attr-defined]
task_id="sftp_sensor", # type: ignore[attr-defined]
path=FULL_FILE_PATH,
poke_interval=10,
)
def sftp_sensor_decorator():
print("Files were successfully found!")
# add your logic
return "done"
在可延遲模式下檢查 SFTP 伺服器上是否存在檔案
tests/system/sftp/example_sftp_sensor.py
sftp_sensor_with_async = SFTPSensor(
task_id="sftp_operator_async", path=FULL_FILE_PATH, poke_interval=10, deferrable=True
)