Apache Spark 運算子

前提條件

SparkJDBCOperator

在 Apache Spark 伺服器上啟動應用程式,它使用 SparkSubmitOperator 在基於 JDBC 的資料庫之間進行資料傳輸。

有關引數定義,請參閱 SparkJDBCOperator

使用運算子

使用 cmd_type 引數,可以在 Spark 與資料庫之間傳輸資料(spark_to_jdbc),或在資料庫與 Spark 之間傳輸資料(jdbc_to_spark),後者將使用 Spark 命令 saveAsTable 寫入表。

tests/system/apache/spark/example_spark_dag.py

jdbc_to_spark_job = SparkJDBCOperator(
    cmd_type="jdbc_to_spark",
    jdbc_table="foo",
    spark_jars="${SPARK_HOME}/jars/postgresql-42.2.12.jar",
    jdbc_driver="org.postgresql.Driver",
    metastore_table="bar",
    save_mode="overwrite",
    save_format="JSON",
    task_id="jdbc_to_spark_job",
)

spark_to_jdbc_job = SparkJDBCOperator(
    cmd_type="spark_to_jdbc",
    jdbc_table="foo",
    spark_jars="${SPARK_HOME}/jars/postgresql-42.2.12.jar",
    jdbc_driver="org.postgresql.Driver",
    metastore_table="bar",
    save_mode="append",
    task_id="spark_to_jdbc_job",
)

參考

有關更多資訊,請參閱 Apache Spark DataFrameWriter 文件

SparkSqlOperator

在 Apache Spark 伺服器上啟動應用程式,這要求 spark-sql 指令碼位於 PATH 中。該運算子將在 Spark Hive 元儲存服務上執行 SQL 查詢,sql 引數可以模板化,可以是 .sql.hql 檔案。

有關引數定義,請參閱 SparkSqlOperator

使用運算子

tests/system/apache/spark/example_spark_dag.py

spark_sql_job = SparkSqlOperator(
    sql="SELECT COUNT(1) as cnt FROM temp_table", master="local", task_id="spark_sql_job"
)

參考

有關更多資訊,請參閱 執行 Spark SQL CLI

SparkSubmitOperator

在 Apache Spark 伺服器上啟動應用程式,它使用 spark-submit 指令碼來設定 Spark 及其依賴項的類路徑,並支援 Spark 支援的不同叢集管理器和部署模式。

有關引數定義,請參閱 SparkSubmitOperator

使用運算子

tests/system/apache/spark/example_spark_dag.py

submit_job = SparkSubmitOperator(
    application="${SPARK_HOME}/examples/src/main/python/pi.py", task_id="submit_job"
)

參考

有關更多資訊,請參閱 Apache Spark 提交應用程式

這篇文章有幫助嗎?