@task.snowpark¶
使用 @task.snowpark 裝飾器在 Snowflake 資料庫中執行 Snowpark Python 程式碼。
警告
Snowpark 尚不支援 Python 3.12。
目前,此裝飾器不支援 Snowpark pandas API,因為 Airflow 中使用了衝突的 pandas 版本。請考慮使用 Snowpark pandas API 配合其他 Snowpark 裝飾器或運算子。
前置任務¶
要使用此裝飾器,您必須完成以下幾項任務
透過 pip 安裝 provider package 包。
pip install 'apache-airflow-providers-snowflake'有關詳細資訊,請參閱安裝。
使用運算子¶
使用 snowflake_conn_id 引數指定使用的連線。如果未指定,將使用 snowflake_default。
`@task.snowpark` 裝飾器的示例用法如下
tests/system/snowflake/example_snowpark_decorator.py
@task.snowpark
def setup_data(session: Session):
# The Snowpark session object is injected as an argument
data = [
(1, 0, 5, "Product 1", "prod-1", 1, 10),
(2, 1, 5, "Product 1A", "prod-1-A", 1, 20),
(3, 1, 5, "Product 1B", "prod-1-B", 1, 30),
(4, 0, 10, "Product 2", "prod-2", 2, 40),
(5, 4, 10, "Product 2A", "prod-2-A", 2, 50),
(6, 4, 10, "Product 2B", "prod-2-B", 2, 60),
(7, 0, 20, "Product 3", "prod-3", 3, 70),
(8, 7, 20, "Product 3A", "prod-3-A", 3, 80),
(9, 7, 20, "Product 3B", "prod-3-B", 3, 90),
(10, 0, 50, "Product 4", "prod-4", 4, 100),
(11, 10, 50, "Product 4A", "prod-4-A", 4, 100),
(12, 10, 50, "Product 4B", "prod-4-B", 4, 100),
]
columns = ["id", "parent_id", "category_id", "name", "serial_number", "key", "3rd"]
df = session.create_dataframe(data, schema=columns)
table_name = "sample_product_data"
df.write.save_as_table(table_name, mode="overwrite")
return table_name
table_name = setup_data() # type: ignore[call-arg, misc]
@task.snowpark
def check_num_rows(table_name: str):
# Alternatively, retrieve the Snowpark session object using `get_active_session`
from snowflake.snowpark.context import get_active_session
session = get_active_session()
df = session.table(table_name)
assert df.count() == 12
check_num_rows(table_name)
如示例所示,在 Python 函式中有兩種使用 Snowpark session 物件的方法
將 Snowpark session 物件作為名為
session的關鍵字引數傳遞給函式。Snowpark session 將被自動注入到函式中,您可以像往常一樣使用它。使用來自 Snowpark 的 get_active_session 函式在函式內部檢索 Snowpark session 物件。
注意
可以傳遞給裝飾器的引數將優先於 Airflow 連線元資料中已有的引數(例如 schema、role、database 等)。