Google Cloud Dataproc Metastore 運算子

Dataproc Metastore 是執行在 Google Cloud 上的一個完全託管、高可用、自動修復的無伺服器 Apache Hive Metastore (HMS)。它支援 HMS,作為管理關係實體元資料的關鍵元件,併為開源資料生態系統中的資料處理應用程式提供互操作性。

有關該服務的更多資訊,請訪問 Dataproc Metastore 產品文件 <產品文件

建立服務

在建立 Dataproc Metastore 服務之前,您需要定義該服務。有關建立服務時可傳遞的欄位的更多資訊,請訪問 Dataproc Metastore 建立服務 API。

一個簡單的服務配置如下所示

tests/system/google/cloud/dataproc_metastore/example_dataproc_metastore.py

SERVICE = {
    "name": "test-service",
}

透過此配置,我們可以建立服務:DataprocMetastoreCreateServiceOperator

tests/system/google/cloud/dataproc_metastore/example_dataproc_metastore.py

create_service = DataprocMetastoreCreateServiceOperator(
    task_id="create_service",
    region=REGION,
    project_id=PROJECT_ID,
    service=SERVICE,
    service_id=SERVICE_ID,
    timeout=TIMEOUT,
)

獲取服務

要獲取服務,您可以使用

DataprocMetastoreGetServiceOperator

tests/system/google/cloud/dataproc_metastore/example_dataproc_metastore.py

get_service = DataprocMetastoreGetServiceOperator(
    task_id="get_service",
    region=REGION,
    project_id=PROJECT_ID,
    service_id=SERVICE_ID,
)

更新服務

您可以透過提供服務配置和 updateMask 來更新服務。在 updateMask 引數中,您指定了要更新欄位的路徑,該路徑相對於 Service。有關 updateMask 和其他引數的更多資訊,請查閱 Dataproc Metastore 更新服務 API。

一個新的服務配置和 updateMask 示例

tests/system/google/cloud/dataproc_metastore/example_dataproc_metastore.py

SERVICE_TO_UPDATE = {
    "labels": {
        "mylocalmachine": "mylocalmachine",
        "systemtest": "systemtest",
    }
}
UPDATE_MASK = FieldMask(paths=["labels"])

要更新服務,您可以使用:DataprocMetastoreUpdateServiceOperator

tests/system/google/cloud/dataproc_metastore/example_dataproc_metastore.py

update_service = DataprocMetastoreUpdateServiceOperator(
    task_id="update_service",
    project_id=PROJECT_ID,
    service_id=SERVICE_ID,
    region=REGION,
    service=SERVICE_TO_UPDATE,
    update_mask=UPDATE_MASK,
    timeout=TIMEOUT,
)

刪除服務

要刪除服務,您可以使用

DataprocMetastoreDeleteServiceOperator

tests/system/google/cloud/dataproc_metastore/example_dataproc_metastore.py

delete_service = DataprocMetastoreDeleteServiceOperator(
    task_id="delete_service",
    region=REGION,
    project_id=PROJECT_ID,
    service_id=SERVICE_ID,
    timeout=TIMEOUT,
)

匯出服務元資料

要匯出元資料,您可以使用

DataprocMetastoreExportMetadataOperator

tests/system/google/cloud/dataproc_metastore/example_dataproc_metastore.py

export_metadata = DataprocMetastoreExportMetadataOperator(
    task_id="export_metadata",
    destination_gcs_folder=DESTINATION_GCS_FOLDER,
    project_id=PROJECT_ID,
    region=REGION,
    service_id=SERVICE_ID,
    timeout=TIMEOUT,
)

恢復服務

要恢復服務,您可以使用

DataprocMetastoreRestoreServiceOperator

tests/system/google/cloud/dataproc_metastore/example_dataproc_metastore_backup.py

restore_service = DataprocMetastoreRestoreServiceOperator(
    task_id="restore_metastore",
    region=REGION,
    project_id=PROJECT_ID,
    service_id=SERVICE_ID,
    backup_id=BACKUP_ID,
    backup_region=REGION,
    backup_project_id=PROJECT_ID,
    backup_service_id=SERVICE_ID,
    timeout=TIMEOUT,
)

建立元資料匯入

在建立 Dataproc Metastore 元資料匯入之前,您需要定義該元資料匯入。有關建立元資料匯入時可傳遞的欄位的更多資訊,請訪問 Dataproc Metastore 建立元資料匯入 API。

一個簡單的元資料匯入配置如下所示

tests/system/google/cloud/dataproc_metastore/example_dataproc_metastore.py

METADATA_IMPORT = {
    "name": "test-metadata-import",
    "database_dump": {
        "gcs_uri": GCS_URI,
        "database_type": DB_TYPE,
    },
}

要建立元資料匯入,您可以使用:DataprocMetastoreCreateMetadataImportOperator

tests/system/google/cloud/dataproc_metastore/example_dataproc_metastore.py

import_metadata = DataprocMetastoreCreateMetadataImportOperator(
    task_id="import_metadata",
    project_id=PROJECT_ID,
    region=REGION,
    service_id=SERVICE_ID,
    metadata_import=METADATA_IMPORT,
    metadata_import_id=METADATA_IMPORT_ID,
    timeout=TIMEOUT,
)

建立備份

在建立 Dataproc Metastore 服務備份之前,您需要定義該備份。有關建立備份時可傳遞的欄位的更多資訊,請訪問 Dataproc Metastore 建立備份 API。

一個簡單的備份配置如下所示

tests/system/google/cloud/dataproc_metastore/example_dataproc_metastore_backup.py

BACKUP = {
    "name": "test-backup",
}

透過此配置,我們可以建立備份:DataprocMetastoreCreateBackupOperator

tests/system/google/cloud/dataproc_metastore/example_dataproc_metastore_backup.py

backup_service = DataprocMetastoreCreateBackupOperator(
    task_id="create_backup",
    project_id=PROJECT_ID,
    region=REGION,
    service_id=SERVICE_ID,
    backup=BACKUP,
    backup_id=BACKUP_ID,
    timeout=TIMEOUT,
)

刪除備份

要刪除備份,您可以使用

DataprocMetastoreDeleteBackupOperator

tests/system/google/cloud/dataproc_metastore/example_dataproc_metastore_backup.py

delete_backup = DataprocMetastoreDeleteBackupOperator(
    task_id="delete_backup",
    project_id=PROJECT_ID,
    region=REGION,
    service_id=SERVICE_ID,
    backup_id=BACKUP_ID,
    timeout=TIMEOUT,
)

列出備份

要列出備份,您可以使用

DataprocMetastoreListBackupsOperator

tests/system/google/cloud/dataproc_metastore/example_dataproc_metastore_backup.py

list_backups = DataprocMetastoreListBackupsOperator(
    task_id="list_backups",
    project_id=PROJECT_ID,
    region=REGION,
    service_id=SERVICE_ID,
)

檢查 Hive 分割槽是否存在

要檢查 Metastore 中是否已為給定表建立了 Hive 分割槽,您可以使用:MetastoreHivePartitionSensor

tests/system/google/cloud/dataproc_metastore/example_dataproc_metastore_hive_partition_sensor.py

hive_partition_sensor = MetastoreHivePartitionSensor(
    task_id="hive_partition_sensor",
    service_id=METASTORE_SERVICE_ID,
    region=REGION,
    table=TABLE_NAME,
    partitions=[PARTITION_1, PARTITION_2],
)

此條目有幫助嗎?