Google Dataprep Operator¶
Dataprep 是一款智慧雲資料服務,用於視覺化探索、清理和準備資料以進行分析和機器學習。該服務可用於探索和轉換來自不同和/或大型資料集的原始資料,將其轉換為乾淨且結構化的資料,以便進一步分析和處理。Dataprep Job 是一個內部物件,編碼了執行 Cloud Dataprep job group 的一部分所需的資訊。有關該服務的更多資訊,請訪問 Google Dataprep API 文件
開始之前¶
在 Airflow 中使用 Dataprep 之前,您需要使用 TOKEN 對您的帳戶進行身份驗證。要連線 Dataprep 與 Airflow,您需要 Dataprep TOKEN。請按照 Dataprep 說明進行操作。
TOKEN 應以 JSON 格式新增到 Airflow 的 Connection 中。您可以檢視 管理連線
DataprepRunJobGroupOperator 將執行指定的 job。Operator 需要一個 recipe ID。要確定 recipe ID,請使用 runJobGroup 的 API 文件。例如,如果 URL 是 /flows/10?recipe=7,則 recipe ID 為 7。無法透過此 Operator 建立 recipe。只能透過 此處 提供的 UI 建立。某些引數可以透過 DAG 的 body 請求覆蓋。在示例 DAG 中顯示瞭如何執行此操作。
請看以下示例:設定這些欄位的值:.. code-block
Connection Id: "your_conn_id"
Extra: {"token": "TOKEN", "base_url": "https://api.clouddataprep.com"}
前提任務¶
要使用這些 Operator,您必須執行以下幾項操作
使用 Cloud Console 選擇或建立 Cloud Platform 專案。
為您的專案啟用結算功能,如 Google Cloud 文件中所述。
啟用 API,如 Cloud Console 文件中所述。
透過 pip 安裝 API 庫。
pip install 'apache-airflow[google]'安裝有詳細資訊。
執行 Job Group¶
Operator 的任務是建立一個 job group,該 job group 以經過身份驗證的使用者身份啟動指定的 job。這與在應用程式中單擊“執行 Job”按鈕執行的操作相同。
要獲取 Cloud Dataprep job 中的 job 資訊,請使用:DataprepRunJobGroupOperator
示例用法
tests/system/google/cloud/dataprep/example_dataprep.py
run_job_group_task = DataprepRunJobGroupOperator(
task_id="run_job_group",
dataprep_conn_id=CONNECTION_ID,
project_id=GCP_PROJECT_ID,
body_request={
"wrangledDataset": {"id": DATASET_WRANGLED_ID},
"overrides": WRITE_SETTINGS,
},
)
獲取 Job Group 的 Job¶
Operator 的任務是獲取 Cloud Dataprep job 中的批處理 job 的資訊。
要獲取 Cloud Dataprep job 中的 job 資訊,請使用:DataprepGetJobsForJobGroupOperator
示例用法
tests/system/google/cloud/dataprep/example_dataprep.py
get_jobs_for_job_group_task = DataprepGetJobsForJobGroupOperator(
task_id="get_jobs_for_job_group",
dataprep_conn_id=CONNECTION_ID,
job_group_id="{{ task_instance.xcom_pull('run_flow')['data'][0]['id'] }}",
)
獲取 Job Group¶
Operator 的任務是獲取指定的 job group。Job group 是從 flow 中的特定節點執行的 job。
要獲取 Cloud Dataprep job 中的 job 資訊,請使用:DataprepGetJobGroupOperator
示例用法
tests/system/google/cloud/dataprep/example_dataprep.py
get_job_group_task = DataprepGetJobGroupOperator(
task_id="get_job_group",
dataprep_conn_id=CONNECTION_ID,
project_id=GCP_PROJECT_ID,
job_group_id="{{ task_instance.xcom_pull('run_flow')['data'][0]['id'] }}",
embed="",
include_deleted=False,
)
複製 Flow¶
Operator 的任務是複製 flow。
要獲取 Cloud Dataprep job 中的 job 資訊,請使用:DataprepCopyFlowOperator
示例用法
tests/system/google/cloud/dataprep/example_dataprep.py
copy_task = DataprepCopyFlowOperator(
task_id="copy_flow",
dataprep_conn_id=CONNECTION_ID,
project_id=GCP_PROJECT_ID,
flow_id=FLOW_ID,
name=f"copy_{DATASET_NAME}",
)
執行 Flow¶
Operator 的任務是執行 flow。Flow 是用於整理邏輯的容器,包含匯入的資料集、recipe、輸出物件和引用。
要獲取 Cloud Dataprep job 中的 job 資訊,請使用:DataprepRunFlowOperator
示例用法
tests/system/google/cloud/dataprep/example_dataprep.py
run_flow_task = DataprepRunFlowOperator(
task_id="run_flow",
dataprep_conn_id=CONNECTION_ID,
project_id=GCP_PROJECT_ID,
flow_id=FLOW_COPY_ID,
body_request={},
)
刪除 Flow¶
Operator 的任務是刪除 flow。Flow 是用於整理邏輯的容器,包含匯入的資料集、recipe、輸出物件和引用。
要獲取 Cloud Dataprep job 中的 job 資訊,請使用:DataprepDeleteFlowOperator
示例用法
tests/system/google/cloud/dataprep/example_dataprep.py
delete_flow_task = DataprepDeleteFlowOperator(
task_id="delete_flow",
dataprep_conn_id=CONNECTION_ID,
flow_id="{{ task_instance.xcom_pull('copy_flow')['id'] }}",
)
檢查 Job Group 是否完成¶
Sensor 的任務是告知系統何時已啟動的 job group 已完成,無論成功與否。Job group 是從 flow 中的特定節點執行的 job。
要獲取 Cloud Dataprep job 中的 job 資訊,請使用:DataprepJobGroupIsFinishedSensor
示例用法
tests/system/google/cloud/dataprep/example_dataprep.py
check_flow_status_sensor = DataprepJobGroupIsFinishedSensor(
task_id="check_flow_status",
dataprep_conn_id=CONNECTION_ID,
job_group_id="{{ task_instance.xcom_pull('run_flow')['data'][0]['id'] }}",
)