SnowparkOperator

使用 SnowparkOperatorSnowflake 資料庫中執行 Snowpark Python 程式碼。

警告

  • Snowpark 暫不支援 Python 3.12。

  • 當前,此 Operator 不支援 Snowpark pandas API,因為 Airflow 中使用的 pandas 版本存在衝突。請考慮與其他 Snowpark Decorator 或 Operator 一起使用 Snowpark pandas API。

提示

推薦使用 @task.snowpark Decorator 來執行 Snowpark Python 程式碼,而不是使用 SnowparkOperator

先決條件任務

要使用此 Operator,您需要執行一些步驟

  • 透過 pip 安裝 Provider 包。

    pip install 'apache-airflow-providers-snowflake'
    

    有關詳細資訊,請參閱 安裝

  • 設定 Snowflake 連線.

使用 Operator

使用 snowflake_conn_id 引數指定要使用的連線。如果未指定,將使用 snowflake_default

@task.snowpark 的使用示例如下

tests/system/snowflake/example_snowpark_operator.py

    def setup_data(session: Session):
        # The Snowpark session object is injected as an argument
        data = [
            (1, 0, 5, "Product 1", "prod-1", 1, 10),
            (2, 1, 5, "Product 1A", "prod-1-A", 1, 20),
            (3, 1, 5, "Product 1B", "prod-1-B", 1, 30),
            (4, 0, 10, "Product 2", "prod-2", 2, 40),
            (5, 4, 10, "Product 2A", "prod-2-A", 2, 50),
            (6, 4, 10, "Product 2B", "prod-2-B", 2, 60),
            (7, 0, 20, "Product 3", "prod-3", 3, 70),
            (8, 7, 20, "Product 3A", "prod-3-A", 3, 80),
            (9, 7, 20, "Product 3B", "prod-3-B", 3, 90),
            (10, 0, 50, "Product 4", "prod-4", 4, 100),
            (11, 10, 50, "Product 4A", "prod-4-A", 4, 100),
            (12, 10, 50, "Product 4B", "prod-4-B", 4, 100),
        ]
        columns = ["id", "parent_id", "category_id", "name", "serial_number", "key", "3rd"]
        df = session.create_dataframe(data, schema=columns)
        table_name = "sample_product_data"
        df.write.save_as_table(table_name, mode="overwrite")
        return table_name

    setup_data_operator = SnowparkOperator(
        task_id="setup_data",
        python_callable=setup_data,
        dag=dag,
    )

    def check_num_rows(table_name: str):
        # Alternatively, retrieve the Snowpark session object using `get_active_session`
        from snowflake.snowpark.context import get_active_session

        session = get_active_session()
        df = session.table(table_name)
        assert df.count() == 12

    check_num_rows_operator = SnowparkOperator(
        task_id="check_num_rows",
        python_callable=check_num_rows,
        op_kwargs={"table_name": "{{ task_instance.xcom_pull(task_ids='setup_data') }}"},
        dag=dag,
    )

    setup_data_operator >> check_num_rows_operator

如示例所示,有兩種方法在 Python 函式中使用 Snowpark session 物件

  • 將 Snowpark session 物件作為名為 session 的關鍵字引數傳遞給函式。Snowpark session 將自動注入到函式中,使您可以像往常一樣使用它。

  • 使用 Snowpark 中的 get_active_session 函式在函式內部檢索 Snowpark session 物件。

注意

可以傳遞給 Operator 的引數將優先於 Airflow 連線元資料中已有的引數(例如 schemaroledatabase 等)。

此條目有幫助嗎?