airflow.providers.apache.spark.operators.spark_jdbc

SparkJDBCOperator

擴充套件 SparkSubmitOperator 以使用 Apache Spark 執行到/從基於 JDBC 的資料庫的資料傳輸。

模組內容

class airflow.providers.apache.spark.operators.spark_jdbc.SparkJDBCOperator(*, spark_app_name='airflow-spark-jdbc', spark_conn_id='spark-default', spark_conf=None, spark_py_files=None, spark_files=None, spark_jars=None, cmd_type='spark_to_jdbc', jdbc_table=None, jdbc_conn_id='jdbc-default', jdbc_driver=None, metastore_table=None, jdbc_truncate=False, save_mode=None, save_format=None, batch_size=None, fetch_size=None, num_partitions=None, partition_column=None, lower_bound=None, upper_bound=None, create_table_column_types=None, **kwargs)[source]

基類:airflow.providers.apache.spark.operators.spark_submit.SparkSubmitOperator

擴充套件 SparkSubmitOperator 以使用 Apache Spark 執行到/從基於 JDBC 的資料庫的資料傳輸。

與 SparkSubmitOperator 一樣,它假定 “spark-submit” 二進位制檔案在 PATH 環境變數中可用。

另請參閱

有關如何使用此運算子的更多資訊,請參閱指南:SparkJDBCOperator

引數
  • spark_app_name (str) – 作業名稱 (預設 airflow-spark-jdbc)

  • spark_conn_id (str) – 在 Airflow 管理介面中配置的 spark 連線 ID

  • spark_conf (dict[str, Any] | None) – 任何附加的 Spark 配置屬性

  • spark_py_files (str | None) – 使用的額外 Python 檔案 (.zip, .egg, 或 .py)

  • spark_files (str | None) – 要上傳到執行作業的容器的額外檔案

  • spark_jars (str | None) – 要上傳並新增到驅動程式和執行程式類路徑的額外 jar 包

  • cmd_type (str) – 資料流動的方向。2 個可能的值: spark_to_jdbc:Spark 將資料從 metastore 寫入 jdbc jdbc_to_spark:Spark 將資料從 jdbc 寫入 metastore

  • jdbc_table (str | None) – JDBC 表的名稱

  • jdbc_conn_id (str) – 用於連線 JDBC 資料庫的連線 ID

  • jdbc_driver (str | None) – 要用於 JDBC 連線的 JDBC 驅動程式名稱。該驅動程式(通常是 jar 檔案)應透過 'jars' 引數傳遞

  • metastore_table (str | None) – Metastore 表的名稱,

  • jdbc_truncate (bool) – (僅限 spark_to_jdbc) Spark 是否應截斷或刪除並重新建立 JDBC 表。這僅在 'save_mode' 設定為 Overwrite 時生效。此外,如果模式不同,Spark 無法截斷,並將刪除並重新建立

  • save_mode (str | None) – 要使用的 Spark 儲存模式 (例如 overwrite, append 等)

  • save_format (str | None) – (僅限 jdbc_to_spark) 要使用的 Spark 儲存格式 (例如 parquet)

  • batch_size (int | None) – (僅限 spark_to_jdbc) 每次與 JDBC 資料庫往返時插入的批處理大小。預設為 1000

  • fetch_size (int | None) – (僅限 jdbc_to_spark) 每次從 JDBC 資料庫往返時獲取的批處理大小。預設值取決於 JDBC 驅動程式

  • num_partitions (int | None) – Spark 可以同時使用的最大分割槽數,適用於 spark_to_jdbc 和 jdbc_to_spark 操作。這也將限制可以開啟的 JDBC 連線數

  • partition_column (str | None) – (僅限 jdbc_to_spark) 用於按此列對 metastore 表進行分割槽的數字列。如果指定,您還必須指定:num_partitions, lower_bound, upper_bound

  • lower_bound (str | None) – (僅限 jdbc_to_spark) 要獲取的數字分割槽列範圍的下限。如果指定,您還必須指定:num_partitions, partition_column, upper_bound

  • upper_bound (str | None) – (僅限 jdbc_to_spark) 要獲取的數字分割槽列範圍的上限。如果指定,您還必須指定:num_partitions, partition_column, lower_bound

  • create_table_column_types (str | None) – (僅限 spark_to_jdbc) 建立表時,用於代替預設資料型別的資料庫列資料型別。資料型別資訊應按照與 CREATE TABLE columns 語法相同的格式指定 (例如: “name CHAR(64), comments VARCHAR(1024)”)。指定的型別應為有效的 Spark SQL 資料型別。

  • kwargs (Any) – 傳遞給 SparkSubmitOperator 的 kwargs。

execute(context)[source]

呼叫 SparkSubmitHook 來執行提供的 Spark 作業。

on_kill()[source]

覆蓋此方法以在任務例項被終止時清理子程序。

在運算子內使用 threading、subprocess 或 multiprocessing 模組的任何地方都需要進行清理,否則會留下殭屍程序。

此條目是否有幫助?