DatabricksSqlOperator¶
使用 DatabricksSqlOperator 在 Databricks SQL warehouse 或 Databricks 叢集上執行 SQL。
使用運算子¶
該運算子針對配置的 warehouse 執行給定的 SQL 查詢。唯一必需的引數是
sql- 要執行的 SQL 查詢。有 3 種指定 SQL 查詢的方式包含 SQL 語句的簡單字串。
表示 SQL 語句的字串列表。
包含 SQL 查詢的檔名。檔案必須具有
.sql副檔名。每個查詢應以;<new_line>結尾
sql_warehouse_name(要使用的 Databricks SQL warehouse 名稱) 或http_path(Databricks SQL warehouse 或 Databricks cluster 的 HTTP 路徑) 之一。
其他引數是可選的,可以在類文件中找到。
示例¶
選擇資料¶
使用 DatabricksSqlOperator 從表中選擇資料的示例用法如下
tests/system/databricks/example_databricks_sql.py
# Example of using the Databricks SQL Operator to select data.
select = DatabricksSqlOperator(
databricks_conn_id=connection_id,
sql_endpoint_name=sql_endpoint_name,
task_id="select_data",
sql="select * from default.my_airflow_table",
)
將資料選擇到檔案¶
使用 DatabricksSqlOperator 從表中選擇資料並存儲到檔案的示例用法如下
tests/system/databricks/example_databricks_sql.py
# Example of using the Databricks SQL Operator to select data into a file with JSONL format.
select_into_file = DatabricksSqlOperator(
databricks_conn_id=connection_id,
sql_endpoint_name=sql_endpoint_name,
task_id="select_data_into_file",
sql="select * from default.my_airflow_table",
output_path="/tmp/1.jsonl",
output_format="jsonl",
)
執行多個語句¶
使用 DatabricksSqlOperator 執行多個 SQL 語句的示例用法如下
tests/system/databricks/example_databricks_sql.py
# Example of using the Databricks SQL Operator to perform multiple operations.
create = DatabricksSqlOperator(
databricks_conn_id=connection_id,
sql_endpoint_name=sql_endpoint_name,
task_id="create_and_populate_table",
sql=[
"drop table if exists default.my_airflow_table",
"create table default.my_airflow_table(id int, v string)",
"insert into default.my_airflow_table values (1, 'test 1'), (2, 'test 2')",
],
)
從檔案執行多個語句¶
使用 DatabricksSqlOperator 執行檔案中的語句的示例用法如下
tests/system/databricks/example_databricks_sql.py
# Example of using the Databricks SQL Operator to select data.
# SQL statements should be in the file with name test.sql
create_file = DatabricksSqlOperator(
databricks_conn_id=connection_id,
sql_endpoint_name=sql_endpoint_name,
task_id="create_and_populate_from_file",
sql="test.sql",
)
DatabricksSqlSensor¶
使用 DatabricksSqlSensor 執行感測器,以透過 Databricks SQL warehouse 或互動式叢集訪問表。
使用感測器¶
感測器執行使用者提供的 SQL 語句。唯一必需的引數是
sql- 感測器要執行的 SQL 查詢。sql_warehouse_name(要使用的 Databricks SQL warehouse 名稱) 或http_path(Databricks SQL warehouse 或 Databricks cluster 的 HTTP 路徑) 之一。
其他引數是可選的,可以在類文件中找到。
示例¶
配置要與感測器一起使用的 Databricks 連線。
tests/system/databricks/example_databricks_sensors.py
# Connection string setup for Databricks workspace.
connection_id = "databricks_default"
sql_warehouse_name = "Starter Warehouse"
使用 SQL 語句探測指定的表
tests/system/databricks/example_databricks_sensors.py
# Example of using the Databricks SQL Sensor to check existence of data in a table.
sql_sensor = DatabricksSqlSensor(
databricks_conn_id=connection_id,
sql_warehouse_name=sql_warehouse_name,
catalog="hive_metastore",
task_id="sql_sensor_task",
sql="select * from hive_metastore.temp.sample_table_3 limit 1",
timeout=60 * 2,
)
DatabricksPartitionSensor¶
感測器是一種特殊型別的運算子 (Operator),其設計目標是隻做一件事 - 等待某事發生。它可以是基於時間的等待,也可以是等待檔案或外部事件,但它們所做的就是等待直到某事發生,然後成功,以便它們的下游任務可以執行。
對於 Databricks Partition Sensor,我們檢查分割槽及其相關值是否存在,如果不存在,它會等待直到分割槽值到達。等待時間和檢查間隔可以分別在 timeout 和 poke_interval 引數中配置。
使用 DatabricksPartitionSensor 執行感測器,以透過 Databricks SQL warehouse 或互動式叢集訪問表。
使用感測器¶
感測器接受使用者提供的表名和分割槽名稱/值,並生成 SQL 查詢來檢查指定的分割槽名稱/值是否存在於指定的表中。
必需的引數是
table_name(用於檢查分割槽的表的名稱)。partitions(要檢查的分割槽的名稱)。partition_operator(分割槽的比較運算子,用於值的範圍或限制,例如 partition_name >= partition_value)。支援 Databricks 比較運算子。sql_warehouse_name(要使用的 Databricks SQL warehouse 名稱) 或http_path(Databricks SQL warehouse 或 Databricks cluster 的 HTTP 路徑) 之一。
其他引數是可選的,可以在類文件中找到。
示例¶
配置要與感測器一起使用的 Databricks 連線。
tests/system/databricks/example_databricks_sensors.py
# Connection string setup for Databricks workspace.
connection_id = "databricks_default"
sql_warehouse_name = "Starter Warehouse"
探測特定表是否存在資料/分割槽
tests/system/databricks/example_databricks_sensors.py
# Example of using the Databricks Partition Sensor to check the presence
# of the specified partition(s) in a table.
partition_sensor = DatabricksPartitionSensor(
databricks_conn_id=connection_id,
sql_warehouse_name=sql_warehouse_name,
catalog="hive_metastore",
task_id="partition_sensor_task",
table_name="sample_table_2",
schema="temp",
partitions={"date": "2023-01-03", "name": ["abc", "def"]},
partition_operator="=",
timeout=60 * 2,
)