Apache Spark 運算子¶
前提條件¶
要使用
SparkSubmitOperator,您必須配置 Spark 連線。要使用
SparkJDBCOperator,您必須同時配置 Spark 連線 和 JDBC 連線。SparkSqlOperator的所有配置均來自運算子引數。
SparkJDBCOperator¶
在 Apache Spark 伺服器上啟動應用程式,它使用 SparkSubmitOperator 在基於 JDBC 的資料庫之間進行資料傳輸。
有關引數定義,請參閱 SparkJDBCOperator。
使用運算子¶
使用 cmd_type 引數,可以在 Spark 與資料庫之間傳輸資料(spark_to_jdbc),或在資料庫與 Spark 之間傳輸資料(jdbc_to_spark),後者將使用 Spark 命令 saveAsTable 寫入表。
tests/system/apache/spark/example_spark_dag.py
jdbc_to_spark_job = SparkJDBCOperator(
cmd_type="jdbc_to_spark",
jdbc_table="foo",
spark_jars="${SPARK_HOME}/jars/postgresql-42.2.12.jar",
jdbc_driver="org.postgresql.Driver",
metastore_table="bar",
save_mode="overwrite",
save_format="JSON",
task_id="jdbc_to_spark_job",
)
spark_to_jdbc_job = SparkJDBCOperator(
cmd_type="spark_to_jdbc",
jdbc_table="foo",
spark_jars="${SPARK_HOME}/jars/postgresql-42.2.12.jar",
jdbc_driver="org.postgresql.Driver",
metastore_table="bar",
save_mode="append",
task_id="spark_to_jdbc_job",
)
參考¶
有關更多資訊,請參閱 Apache Spark DataFrameWriter 文件。
SparkSqlOperator¶
在 Apache Spark 伺服器上啟動應用程式,這要求 spark-sql 指令碼位於 PATH 中。該運算子將在 Spark Hive 元儲存服務上執行 SQL 查詢,sql 引數可以模板化,可以是 .sql 或 .hql 檔案。
有關引數定義,請參閱 SparkSqlOperator。
使用運算子¶
tests/system/apache/spark/example_spark_dag.py
spark_sql_job = SparkSqlOperator(
sql="SELECT COUNT(1) as cnt FROM temp_table", master="local", task_id="spark_sql_job"
)
參考¶
有關更多資訊,請參閱 執行 Spark SQL CLI。
SparkSubmitOperator¶
在 Apache Spark 伺服器上啟動應用程式,它使用 spark-submit 指令碼來設定 Spark 及其依賴項的類路徑,並支援 Spark 支援的不同叢集管理器和部署模式。
有關引數定義,請參閱 SparkSubmitOperator。
使用運算子¶
tests/system/apache/spark/example_spark_dag.py
submit_job = SparkSubmitOperator(
application="${SPARK_HOME}/examples/src/main/python/pi.py", task_id="submit_job"
)
參考¶
有關更多資訊,請參閱 Apache Spark 提交應用程式。