PySpark 裝飾器

包裹在 @task.pyspark 裝飾器中的 Python 可呼叫物件,如果可用,會被注入一個 SparkSession 和 SparkContext 物件。

引數

以下引數可以傳遞給裝飾器

conn_id: str

用於連線 Spark 叢集的連線 ID。如果未指定,spark master 將設定為 local[*]

config_kwargs: dict

用於初始化 SparkConf 物件的 kwargs。這將覆蓋連線中設定的 Spark 配置選項。

示例

以下示例展示瞭如何使用 @task.pyspark 裝飾器。注意 sparksc 物件會被注入到函式中。

tests/system/apache/spark/example_pyspark.py

@task.pyspark(conn_id="spark-local")
def spark_task(spark: SparkSession, sc: SparkContext) -> pd.DataFrame:
    df = spark.createDataFrame(
        [
            (1, "John Doe", 21),
            (2, "Jane Doe", 22),
            (3, "Joe Bloggs", 23),
        ],
        ["id", "name", "age"],
    )
    df.show()

    return df.toPandas()

Spark Connect

Apache Spark 3.4 中,Spark Connect 引入了一種解耦的客戶端-伺服器架構,允許使用 DataFrame API 遠端連線到 Spark 叢集。在 Airflow 中使用 Spark Connect 是利用 PySpark 裝飾器的首選方式,因為它不需要在與 Airflow 相同的宿主機上執行 Spark 驅動程式。要使用 Spark Connect,請在您的主機 URL 前加上 sc://。例如,sc://spark-cluster:15002

身份驗證

Spark Connect 沒有內建的身份驗證。但是,gRPC HTTP/2 介面允許透過身份驗證代理使用身份驗證來與 Spark Connect 伺服器通訊。要使用身份驗證,請確保建立一個 Spark Connect 連線並設定正確的憑據。

此條目是否有幫助?