Papermill

Apache Airflow 支援與 Papermill 整合。Papermill 是一個用於引數化和執行 Jupyter Notebook 的工具。也許你有一個財務報告,你想在每月的月初或月末,或每年的年初或年末使用不同的數值來執行它。在你的 notebook 中使用 引數 並使用 PapermillOperator 讓這一切變得輕而易舉。

用法

建立 notebook

要引數化你的 notebook,請指定一個帶有 parameters 標籤的單元格。Papermill 會尋找 parameters 單元格,並將該單元格視為執行時傳入引數的預設值。Papermill 會新增一個帶有 injected-parameters 標籤的新單元格,其中包含輸入引數,以覆蓋 parameters 中的值。如果沒有單元格被標記為 parameters,注入的單元格將被插入到 notebook 的頂部。

請確保你將 notebook 儲存到 Airflow 可以訪問的地方。Papermill 支援 S3、GCS、Azure 和本地檔案系統。不支援 HDFS。

示例 DAG

使用 PapermillOperator 執行 jupyter notebook

tests/system/papermill/example_papermill.py

run_this = PapermillOperator(
    task_id="run_example_notebook",
    input_nb="/tmp/hello_world.ipynb",
    output_nb="/tmp/out-{{ execution_date }}.ipynb",
    parameters={"msgs": "Ran from Airflow at {{ execution_date }}!"},
)

驗證 notebook 中訊息的示例 DAG

tests/system/papermill/example_papermill_verify.py

@task
def check_notebook(output, logical_date):
    """
    Verify the message in the notebook
    """
    notebook = sb.read_notebook(output.url)
    message = notebook.scraps["message"]
    print(f"Message in notebook {message} for {logical_date}")

    if message.data != f"Ran from Airflow at {logical_date}!":
        return False

    return True


with DAG(
    dag_id="example_papermill_operator_verify",
    schedule=SCHEDULE_INTERVAL,
    start_date=START_DATE,
    dagrun_timeout=DAGRUN_TIMEOUT,
    catchup=False,
) as dag:
    run_this = PapermillOperator(
        task_id="run_example_notebook",
        input_nb=os.path.join(os.path.dirname(os.path.realpath(__file__)), "input_notebook.ipynb"),
        output_nb="/tmp/out-{{ logical_date }}.ipynb",
        parameters={"msgs": "Ran from Airflow at {{ logical_date }}!"},
    )

    check_notebook(output=run_this.output, logical_date="{{ logical_date }}")

使用遠端 jupyter kernel 驗證 notebook 中訊息的示例 DAG

tests/system/papermill/example_papermill_remote_verify.py

@task
def check_notebook(output_notebook, execution_date):
    """
    Verify the message in the notebook
    """
    notebook = sb.read_notebook(output_notebook)
    message = notebook.scraps["message"]
    print(f"Message in notebook {message} for {execution_date}")

    if message.data != f"Ran from Airflow at {execution_date}!":
        return False

    return True


with DAG(
    dag_id="example_papermill_operator_remote_verify",
    schedule="@once",
    start_date=START_DATE,
    dagrun_timeout=DAGRUN_TIMEOUT,
    catchup=False,
) as dag:
    run_this = PapermillOperator(
        task_id="run_example_notebook",
        input_nb=os.path.join(os.path.dirname(os.path.realpath(__file__)), "input_notebook.ipynb"),
        output_nb="/tmp/out-{{ execution_date }}.ipynb",
        parameters={"msgs": "Ran from Airflow at {{ execution_date }}!"},
        kernel_conn_id="jupyter_kernel_default",
    )

    run_this >> check_notebook(
        output_notebook="/tmp/out-{{ execution_date }}.ipynb", execution_date="{{ execution_date }}"
    )

此條目有幫助嗎?