SQLExecuteQueryOperator 連線到 Apache Druid

使用 SQLExecuteQueryOperator 來針對 Apache Druid 叢集執行 SQL 查詢。

注意

之前可能使用了專用的 Druid operator。在棄用後,請改用 SQLExecuteQueryOperator

注意

請確保已安裝 apache-airflow-providers-apache-druid 軟體包以啟用 Druid 支援。

使用 Operator

使用 conn_id 引數連線到您的 Apache Druid 例項,連線元資料結構如下

Druid Airflow 連線元資料

引數

輸入

主機:字串

Druid broker 主機名或 IP 地址

Schema:字串

不適用(留空)

登入名:字串

不適用(留空)

密碼:字串

不適用(留空)

埠:整數

Druid broker 埠(預設:8082)

額外:JSON

附加連線配置,例如:{"endpoint": "/druid/v2/sql/", "method": "POST"}

使用 SQLExecuteQueryOperator 連線到 Apache Druid 的示例如下

tests/system/apache/druid/example_druid.py


    # Task: List all published datasources in Druid.
    list_datasources_task = SQLExecuteQueryOperator(
        task_id="list_datasources",
        sql="SELECT DISTINCT datasource FROM sys.segments WHERE is_published = 1",
    )

    # Task: Describe the schema for the 'wikipedia' datasource.
    # Note: This query returns column information if the datasource exists.
    describe_wikipedia_task = SQLExecuteQueryOperator(
        task_id="describe_wikipedia",
        sql=dedent("""
            SELECT COLUMN_NAME, DATA_TYPE
            FROM INFORMATION_SCHEMA.COLUMNS
            WHERE TABLE_NAME = 'wikipedia'
        """).strip(),
    )

    # Task: Count rows for the 'wikipedia' datasource.
    # Here we count the segments for 'wikipedia'. If the datasource is not ingested, it returns 0.
    select_count_from_datasource = SQLExecuteQueryOperator(
        task_id="select_count_from_datasource",
        sql="SELECT COUNT(*) FROM sys.segments WHERE datasource = 'wikipedia'",
    )

參考

更多資訊,請參閱

注意

透過 SQLExecuteQueryOperator() 直接提供的引數優先於 Airflow 連線元資料中指定的引數(如 schemaloginpassword 等)。

此條目是否有幫助?