BashOperator

使用 BashOperatorBash shell 中執行命令。要執行的 Bash 命令或指令碼由以下內容決定:

  1. 使用 BashOperator 時的 bash_command 引數,或

  2. 如果使用 TaskFlow 裝飾器 @task.bash,由被裝飾的可呼叫物件返回的非空字串值。

提示

推薦使用 @task.bash 裝飾器執行 Bash 命令,而不是經典的 BashOperator

airflow/example_dags/example_bash_decorator.py

@task.bash
def run_after_loop() -> str:
    return "echo https://airflow.apache.tw/"

run_this = run_after_loop()

airflow/example_dags/example_bash_operator.py

run_this = BashOperator(
    task_id="run_after_loop",
    bash_command="echo https://airflow.apache.tw/",
)

模板化

您可以使用 Jinja 模板來引數化 Bash 命令。

airflow/example_dags/example_bash_decorator.py

@task.bash
def also_run_this() -> str:
    return 'echo "ti_key={{ task_instance_key_str }}"'

also_this = also_run_this()

airflow/example_dags/example_bash_operator.py

also_run_this = BashOperator(
    task_id="also_run_this",
    bash_command='echo "ti_key={{ task_instance_key_str }}"',
)

使用 @task.bash TaskFlow 裝飾器允許您返回格式化字串,並利用所有執行上下文變數可直接訪問被裝飾的任務的優勢。

airflow/example_dags/example_bash_decorator.py

@task.bash
def also_run_this_again(task_instance_key_str) -> str:
    return f'echo "ti_key={task_instance_key_str}"'

also_this_again = also_run_this_again()

我們鼓勵您利用這種方法,因為它非常適合整體 TaskFlow 範例。

注意

在使用 Jinja 模板生成 Bash 命令時,對於“使用者”輸入應謹慎處理,因為不會對 Bash 命令進行轉義和淨化。

這主要適用於使用 dag_run.conf,因為使用者可以透過 Web UI 提交它。大多數預設模板變數沒有風險。

例如,請勿執行以下操作:

@task.bash
def bash_task() -> str:
    return 'echo "Here is the message: \'{{ dag_run.conf["message"] if dag_run.conf else "" }}\'"'


# Or directly accessing `dag_run.conf`
@task.bash
def bash_task(dag_run) -> str:
    message = dag_run.conf["message"] if dag_run.conf else ""
    return f'echo "here is the message: {message}"'
bash_task = BashOperator(
    task_id="bash_task",
    bash_command='echo "Here is the message: \'{{ dag_run.conf["message"] if dag_run.conf else "" }}\'"',
)

相反,您應該透過 env kwarg 傳遞此值,並在 Bash 命令內部使用雙引號。

@task.bash(env={"message": '{{ dag_run.conf["message"] if dag_run.conf else "" }}'})
def bash_task() -> str:
    return "echo \"here is the message: '$message'\""
bash_task = BashOperator(
    task_id="bash_task",
    bash_command="echo \"here is the message: '$message'\"",
    env={"message": '{{ dag_run.conf["message"] if dag_run.conf else "" }}'},
)

跳過

通常,非零退出程式碼會引發 AirflowException,從而導致任務失敗。在某些情況下,希望任務結束於 skipped 狀態,您可以使用退出程式碼 99 (或者如果您傳遞了 skip_on_exit_code,則使用另一個退出程式碼)來退出。

airflow/example_dags/example_bash_decorator.py

@task.bash
def this_will_skip() -> str:
    return 'echo "hello world"; exit 99;'

this_skips = this_will_skip()

airflow/example_dags/example_bash_operator.py

this_will_skip = BashOperator(
    task_id="this_will_skip",
    bash_command='echo "hello world"; exit 99;',
    dag=dag,
)

輸出處理器

output_processor 引數允許您指定一個 lambda 函式,該函式在 bash 指令碼的輸出被推送到 XCom 之前對其進行處理。此功能在 BashOperator 中直接操作指令碼輸出而無需額外的 operator 或任務時特別有用。

例如,考慮一個 bash 指令碼輸出為 JSON 字串的場景。使用 output_processor,您可以將此字串轉換為 JSON 物件,然後再將其儲存到 XCom 中。這簡化了工作流,並確保下游任務以所需的格式接收處理後的資料。

以下是如何在 BashOperator 中使用 result_processor 的示例:

@task.bash(output_processor=lambda output: json.loads(output))
def bash_task() -> str:
    return """
        jq -c '.[] | select(.lastModified > "{{ data_interval_start | ts_zulu }}" or .created > "{{ data_interval_start | ts_zulu }}")' \\
        example.json
    """
bash_task = BashOperator(
    task_id="filter_today_changes",
    bash_command="""
        jq -c '.[] | select(.lastModified > "{{ data_interval_start | ts_zulu }}" or .created > "{{ data_interval_start | ts_zulu }}")' \\
        example.json
    """,
    output_processor=lambda output: json.loads(output),
)

從檔案執行命令

BashOperator@task.bash TaskFlow 裝飾器都允許您執行儲存在檔案中的 Bash 命令。檔案必須具有 .sh.bash 副檔名。

使用 Jinja 模板

您可以執行包含 Jinja 模板的 bash 指令碼。當您這樣做時,Airflow 會載入檔案內容,渲染模板,並將渲染後的指令碼寫入臨時檔案。預設情況下,檔案放置在臨時目錄(/tmp 下)中。您可以使用 cwd 引數更改此位置。

注意

Airflow 必須對 /tmpcwd 目錄具有寫訪問許可權,才能將臨時檔案寫入磁碟。

要執行 bash 指令碼,請將其放置在相對於包含 DAG 檔案的目錄的位置。因此,如果您的 DAG 檔案位於 /usr/local/airflow/dags/test_dag.py,您可以將 test.sh 檔案移動到 /usr/local/airflow/dags/ 下的任何位置(示例:/usr/local/airflow/dags/scripts/test.sh),並如下所示將相對路徑傳遞給 bash_command

@task.bash
def bash_example():
    # "scripts" folder is under "/usr/local/airflow/dags"
    return "scripts/test.sh"
t2 = BashOperator(
    task_id="bash_example",
    # "scripts" folder is under "/usr/local/airflow/dags"
    bash_command="scripts/test.sh",
)

出於多種原因,為 Bash 指令碼建立單獨的資料夾可能是理想的,例如分離指令碼邏輯和管道程式碼,允許在不同語言編寫的檔案中進行適當的程式碼高亮顯示,以及管道結構的總體靈活性。

也可以在 DAG 建構函式呼叫中將 template_searchpath 指向任何資料夾位置。

@dag(..., template_searchpath="/opt/scripts")
def example_bash_dag():
    @task.bash
    def bash_example():
        return "test.sh "
with DAG("example_bash_dag", ..., template_searchpath="/opt/scripts"):
    t2 = BashOperator(
        task_id="bash_example",
        bash_command="test.sh ",
    )

不使用 Jinja 模板

如果您的指令碼不包含任何 Jinja 模板,請在指令碼名稱後新增一個空格以停用 Airflow 的渲染。

@task.bash
def run_command_from_script() -> str:
    return "$AIRFLOW_HOME/scripts/example.sh "


run_script = run_command_from_script()
run_script = BashOperator(
    task_id="run_command_from_script",
    bash_command="$AIRFLOW_HOME/scripts/example.sh ",
)

未找到 Jinja 模板

如果您在嘗試執行 Bash 指令碼時遇到“Template not found”異常,請在指令碼名稱後新增一個空格。這是因為 Airflow 嘗試對其應用 Jinja 模板,這將導致失敗。

@task.bash
def bash_example():
    # This fails with 'Jinja template not found' error
    # return "/home/batcher/test.sh",
    # This works (has a space after)
    return "/home/batcher/test.sh "
BashOperator(
    task_id="bash_example",
    # This fails with 'Jinja template not found' error
    # bash_command="/home/batcher/test.sh",
    # This works (has a space after)
    bash_command="/home/batcher/test.sh ",
)

但是,如果您想在 Bash 指令碼中使用模板,請不要新增空格,而是檢視使用 Jinja 模板的 bash 指令碼部分。

用 Python 豐富 Bash

@task.bash TaskFlow 裝飾器允許您在任務中將 Bash 和 Python 結合成一個強大的組合。

@task.bash 任務中使用 Python 條件語句、其他函式呼叫等,可以幫助定義、增強甚至構建要執行的 Bash 命令。

例如,使用條件邏輯來確定任務行為:

airflow/example_dags/example_bash_decorator.py

@task.bash
def sleep_in(day: str) -> str:
    if day in (WeekDay.SATURDAY, WeekDay.SUNDAY):
        return f"sleep {60 * 60}"
    raise AirflowSkipException("No sleeping in today!")

sleep_in(day="{{ dag_run.logical_date.strftime('%A').lower() }}")

或者呼叫函式來幫助構建 Bash 命令:

airflow/example_dags/example_bash_decorator.py

def _get_files_in_cwd() -> list[str]:
    from pathlib import Path

    dir_contents = Path.cwd().glob("airflow-core/src/airflow/example_dags/*.py")
    files = [str(elem) for elem in dir_contents if elem.is_file()]

    return files

@task.bash
def get_file_stats() -> str:
    from shlex import join

    files = _get_files_in_cwd()
    cmd = join(["stat", *files])

    return cmd

get_file_stats()

這種預執行增強的可能性有很多。

此條目是否有幫助?