Qubole

Qubole is an open, simple, and secure data lake platform for machine learning, streaming, and ad-hoc analytics. It provides a self-service big data analytics platform built on Amazon Web Services, Microsoft and Google Clouds.

Airflow provides operators to execute tasks (commands) on QDS and to perform checks against Qubole commands. In addition, sensors are provided that wait for a file, folder, or partition to appear in cloud storage and that check for its existence via QDS APIs.
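All of the snippets below assume a surrounding DAG and the provider imports. The following is a minimal sketch, not part of the original examples; the dag_id and schedule are assumptions, and the operators default to the qubole_default connection, which must hold a valid Qubole API token:

from datetime import datetime

from airflow import DAG
from airflow.providers.qubole.operators.qubole import QuboleOperator
from airflow.providers.qubole.sensors.qubole import (
    QuboleFileSensor,
    QubolePartitionSensor,
)

with DAG(
    dag_id="example_qubole",  # assumed name
    start_date=datetime(2021, 1, 1),
    schedule=None,  # run on manual trigger only
    catchup=False,
) as dag:
    # The QuboleOperator and sensor definitions shown in the
    # sections below go here.
    ...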

Execute tasks

To run the following commands, use QuboleOperator.

Run Hive command

To run a query that shows all tables, you can use:

tests/system/providers/qubole/example_qubole.py [source]

hive_show_table = QuboleOperator(
    task_id="hive_show_table",
    command_type="hivecmd",
    query="show tables",
    cluster_label="{{ params.cluster_label }}",
    fetch_logs=True,
    # If fetch_logs=True, the operator fetches the Qubole command logs and
    # concatenates them into the corresponding Airflow task logs
    tags="airflow_example_run",
    # Tags to attach to the Qubole command; dag_id, task_id and run_id are
    # auto-attached as three additional tags
    params={
        "cluster_label": "default",
    },
)

You can also run a script located in storage by passing the path to the query file:

tests/system/providers/qubole/example_qubole.py [source]

hive_s3_location = QuboleOperator(
    task_id="hive_s3_location",
    command_type="hivecmd",
    script_location="s3n://public-qubole/qbol-library/scripts/show_table.hql",
    notify=True,
    tags=["tag1", "tag2"],
    # If the script at s3 location has any qubole specific macros to be replaced
    # macros='[{"date": "{{ ds }}"}, {"name" : "abc"}]',
)
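After both Hive tasks have run, their output can be pulled in a downstream task: QuboleOperator exposes a get_results() helper that downloads a finished command's result set to a local file and returns its path. A minimal sketch following the provider's example DAG; the comparison itself is illustrative:

import filecmp

from airflow.decorators import task


@task(trigger_rule="all_done")
def compare_result(ti=None):
    # get_results() fetches the command's output to a local file
    # and returns that file's path.
    result_1 = hive_show_table.get_results(ti)
    result_2 = hive_s3_location.get_results(ti)
    return filecmp.cmp(result_1, result_2)


[hive_show_table, hive_s3_location] >> compare_result()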

Run Hadoop command

To run a jar file in your Hadoop cluster, use:

tests/system/providers/qubole/example_qubole.py [source]

hadoop_jar_cmd = QuboleOperator(
    task_id="hadoop_jar_cmd",
    command_type="hadoopcmd",
    sub_command="jar s3://paid-qubole/HadoopAPIExamples/"
    "jars/hadoop-0.20.1-dev-streaming.jar "
    "-mapper wc "
    "-numReduceTasks 0 -input s3://paid-qubole/HadoopAPITests/"
    "data/3.tsv -output "
    "s3://paid-qubole/HadoopAPITests/data/3_wc",
    cluster_label="{{ params.cluster_label }}",
    fetch_logs=True,
    params={
        "cluster_label": "default",
    },
)

Run Pig command

To run a script in *Pig Latin* in your Hadoop cluster, use:

tests/system/providers/qubole/example_qubole.py [source]

pig_cmd = QuboleOperator(
    task_id="pig_cmd",
    command_type="pigcmd",
    script_location="s3://public-qubole/qbol-library/scripts/script1-hadoop-s3-small.pig",
    parameters="key1=value1 key2=value2",
)

Run Shell command

To run a Shell script, use:

tests/system/providers/qubole/example_qubole.py [source]

shell_cmd = QuboleOperator(
    task_id="shell_cmd",
    command_type="shellcmd",
    script_location="s3://public-qubole/qbol-library/scripts/shellx.sh",
    parameters="param1 param2",
)

Run Presto command

To run a query using Presto, use:

tests/system/providers/qubole/example_qubole.py [source]

presto_cmd = QuboleOperator(task_id="presto_cmd", command_type="prestocmd", query="show tables")

Run DB commands

To run a query as a DbTap, use:

tests/system/providers/qubole/example_qubole.py [source]

db_query = QuboleOperator(
    task_id="db_query", command_type="dbtapquerycmd", query="show tables", db_tap_id=2064
)

To run a DB export command, use:

tests/system/providers/qubole/example_qubole.py [source]

db_export = QuboleOperator(
    task_id="db_export",
    command_type="dbexportcmd",
    mode=1,
    hive_table="default_qubole_airline_origin_destination",
    db_table="exported_airline_origin_destination",
    partition_spec="dt=20110104-02",
    dbtap_id=2064,
)

To run a DB import command, use:

tests/system/providers/qubole/example_qubole.py [source]

db_import = QuboleOperator(
    task_id="db_import",
    command_type="dbimportcmd",
    mode=1,
    hive_table="default_qubole_airline_origin_destination",
    db_table="exported_airline_origin_destination",
    where_clause="id < 10",
    parallelism=2,
    dbtap_id=2064,
)

Run Spark commands

To run a Scala script as a Spark job, use:

tests/system/providers/qubole/example_qubole.py [source]

prog = """
import scala.math.random
import org.apache.spark._

/** Computes an approximation to pi */
object SparkPi {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Spark Pi")
    val spark = new SparkContext(conf)
    val slices = if (args.length > 0) args(0).toInt else 2
    val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
    val count = spark.parallelize(1 until n, slices).map { i =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x*x + y*y < 1) 1 else 0
    }.reduce(_ + _)
    println("Pi is roughly " + 4.0 * count / n)
    spark.stop()
  }
}
"""

spark_cmd = QuboleOperator(
    task_id="spark_cmd",
    command_type="sparkcmd",
    program=prog,
    language="scala",
    arguments="--class SparkPi",
    tags="airflow_example_run",
)

File sensor

Usage examples of QuboleFileSensor.

File or directory existence

To wait for a file or directory to be present in cloud storage, use:

tests/system/providers/qubole/example_qubole_sensors.py [source]

check_s3_file = QuboleFileSensor(
    task_id="check_s3_file",
    poke_interval=60,
    timeout=600,
    data={
        "files": [
            "s3://paid-qubole/HadoopAPIExamples/jars/hadoop-0.20.1-dev-streaming.jar",
            "s3://paid-qubole/HadoopAPITests/data/{{ ds.split('-')[2] }}.tsv",
        ]  # will check for availability of all the files in array
    },
)
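A sensor is typically wired upstream of the command that consumes the data, so the command starts only once the files are known to exist. The dependency below is an illustrative sketch, assuming both tasks are defined in the same DAG:

# Run the Hadoop jar only after its input files are available.
check_s3_file >> hadoop_jar_cmd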

Partition sensor

Usage examples of QubolePartitionSensor.

Partition existence

To wait for a table partition to be present, use:

tests/system/providers/qubole/example_qubole_sensors.py [source]

check_hive_partition = QubolePartitionSensor(
    task_id="check_hive_partition",
    poke_interval=10,
    timeout=60,
    data={
        "schema": "default",
        "table": "my_partitioned_table",
        "columns": [
            {"column": "month", "values": ["{{ ds.split('-')[1] }}"]},
            {"column": "day", "values": ["{{ ds.split('-')[2] }}", "{{ yesterday_ds.split('-')[2] }}"]},
        ],  # will check for partitions like [month=12/day=12,month=12/day=13]
    },
)
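The checks against Qubole commands mentioned in the introduction are provided by QuboleCheckOperator (and QuboleValueCheckOperator) in the same provider package. The operator runs a Qubole command and fails the task if the first row of the result does not evaluate to true. A hedged sketch; the query and task id are assumptions:

from airflow.providers.qubole.operators.qubole_check import QuboleCheckOperator

check_row_count = QuboleCheckOperator(
    task_id="check_row_count",
    command_type="hivecmd",
    # Fails the task when the first result row contains a falsy value,
    # e.g. a zero row count.
    query="select count(*) from default_qubole_airline_origin_destination",
)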

Reference

For further information, see the Qubole documentation.
