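Google Dataplex Operators

Dataplex is an intelligent data fabric that provides unified analytics and data management across your data lakes, data warehouses, and data marts.

For more information about tasks, see the Dataplex product documentation.

Create a Task

Before you create a Dataplex task, you need to define its body. For more information about the fields you can pass when creating a task, see the Dataplex create task API.

A simple task configuration can look like this: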

tests/system/google/cloud/dataplex/example_dataplex.py

EXAMPLE_TASK_BODY = {
    "trigger_spec": {"type_": TRIGGER_SPEC_TYPE},
    "execution_spec": {"service_account": SERVICE_ACC},
    "spark": {"python_script_file": SPARK_FILE_FULL_PATH},
}
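The names in the body above (`TRIGGER_SPEC_TYPE`, `SERVICE_ACC`, `SPARK_FILE_FULL_PATH`) are defined elsewhere in the example file. As a minimal sketch, the helper below assembles the same three-key structure from plain values. The function name `build_task_body` and every sample value (the `"ON_DEMAND"` trigger type, the service account e-mail, the `gs://` path) are assumptions chosen for illustration, not values taken from the example file.

```python
def build_task_body(trigger_type: str, service_account: str, python_script_file: str) -> dict:
    """Return a dict with the same shape as EXAMPLE_TASK_BODY (hypothetical helper)."""
    return {
        "trigger_spec": {"type_": trigger_type},
        "execution_spec": {"service_account": service_account},
        "spark": {"python_script_file": python_script_file},
    }


# Placeholder values, assumed for illustration only.
task_body = build_task_body(
    trigger_type="ON_DEMAND",
    service_account="example-sa@example-project.iam.gserviceaccount.com",
    python_script_file="gs://example-bucket/spark_job.py",
)
print(task_body)
```

A dict built this way could then be passed as the `body` argument in place of `EXAMPLE_TASK_BODY`.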

With this configuration, you can create the task either synchronously or asynchronously using DataplexCreateTaskOperator:

tests/system/google/cloud/dataplex/example_dataplex.py

create_dataplex_task = DataplexCreateTaskOperator(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    body=EXAMPLE_TASK_BODY,
    dataplex_task_id=DATAPLEX_TASK_ID,
    task_id="create_dataplex_task",
)

tests/system/google/cloud/dataplex/example_dataplex.py

create_dataplex_task_async = DataplexCreateTaskOperator(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    body=EXAMPLE_TASK_BODY,
    dataplex_task_id=f"{DATAPLEX_TASK_ID}-1",
    asynchronous=True,
    task_id="create_dataplex_task_async",
)

Delete a task

To delete a task, you can use DataplexDeleteTaskOperator:

tests/system/google/cloud/dataplex/example_dataplex.py

delete_dataplex_task_async = DataplexDeleteTaskOperator(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    dataplex_task_id=f"{DATAPLEX_TASK_ID}-1",
    task_id="delete_dataplex_task_async",
)

List tasks

To list tasks, you can use DataplexListTasksOperator:

tests/system/google/cloud/dataplex/example_dataplex.py

list_dataplex_task = DataplexListTasksOperator(
    project_id=PROJECT_ID, region=REGION, lake_id=LAKE_ID, task_id="list_dataplex_task"
)

Get a task

To get a task, you can use DataplexGetTaskOperator:

tests/system/google/cloud/dataplex/example_dataplex.py

get_dataplex_task = DataplexGetTaskOperator(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    dataplex_task_id=DATAPLEX_TASK_ID,
    task_id="get_dataplex_task",
)

Wait for a task

To wait for a task that was created asynchronously, you can use DataplexTaskStateSensor:

tests/system/google/cloud/dataplex/example_dataplex.py

dataplex_task_state = DataplexTaskStateSensor(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    dataplex_task_id=DATAPLEX_TASK_ID,
    task_id="dataplex_task_state",
)

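Create a Lake

Before you create a Dataplex lake, you need to define its body.

For more information about the fields you can pass when creating a lake, see the Dataplex create lake API.

A simple lake configuration can look like this: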

tests/system/google/cloud/dataplex/example_dataplex.py

EXAMPLE_LAKE_BODY = {
    "display_name": "test_display_name",
    "labels": [],
    "description": "test_description",
    "metastore": {"service": ""},
}

With this configuration, you can create the lake using DataplexCreateLakeOperator:

tests/system/google/cloud/dataplex/example_dataplex.py

create_lake = DataplexCreateLakeOperator(
    project_id=PROJECT_ID, region=REGION, body=EXAMPLE_LAKE_BODY, lake_id=LAKE_ID, task_id="create_lake"
)

Delete a lake

To delete a lake, you can use DataplexDeleteLakeOperator:

tests/system/google/cloud/dataplex/example_dataplex.py

delete_lake = DataplexDeleteLakeOperator(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    task_id="delete_lake",
    trigger_rule=TriggerRule.ALL_DONE,
)

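Create or update a Data Quality scan

Before you create a Dataplex Data Quality scan, you need to define its body. For more information about the fields you can pass when creating a Data Quality scan, see the Dataplex create data quality API.

A simple Data Quality scan configuration can look like this: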

tests/system/google/cloud/dataplex/example_dataplex_dq.py

EXAMPLE_DATA_SCAN = dataplex_v1.DataScan()
EXAMPLE_DATA_SCAN.data.entity = (
    f"projects/{PROJECT_ID}/locations/{REGION}/lakes/{LAKE_ID}/zones/{ZONE_ID}/entities/{TABLE_1}"
)
EXAMPLE_DATA_SCAN.data.resource = (
    f"//bigquery.googleapis.com/projects/{PROJECT_ID}/datasets/{DATASET}/tables/{TABLE_1}"
)
EXAMPLE_DATA_SCAN.data_quality_spec = DataQualitySpec(
    {
        "rules": [
            {
                "range_expectation": {
                    "min_value": "0",
                    "max_value": "10000",
                },
                "column": "value",
                "dimension": "VALIDITY",
            }
        ],
    }
)
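The rule in the configuration above is a plain dictionary, so a list of similar rules can be generated in code. The following is a minimal sketch under that assumption: `range_rule` is a hypothetical helper, not part of the provider, and it only mirrors the keys shown in the example (`range_expectation`, `column`, `dimension`), with the bounds written as strings as in the example.

```python
def range_rule(column: str, min_value, max_value, dimension: str = "VALIDITY") -> dict:
    """Build one range_expectation rule shaped like the example (hypothetical helper)."""
    if float(min_value) > float(max_value):
        raise ValueError("min_value must not exceed max_value")
    return {
        "range_expectation": {
            # The example passes the bounds as strings, so they are stringified here.
            "min_value": str(min_value),
            "max_value": str(max_value),
        },
        "column": column,
        "dimension": dimension,
    }


# Same rule as in the example configuration, built from plain values.
spec = {"rules": [range_rule("value", 0, 10000)]}
print(spec)
```

The resulting `spec` dictionary has the same structure as the argument passed to `DataQualitySpec` in the example.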

With this configuration, you can create or update the Data Quality scan using DataplexCreateOrUpdateDataQualityScanOperator:

tests/system/google/cloud/dataplex/example_dataplex_dq.py

create_data_scan = DataplexCreateOrUpdateDataQualityScanOperator(
    task_id="create_data_scan",
    project_id=PROJECT_ID,
    region=REGION,
    body=EXAMPLE_DATA_SCAN,
    data_scan_id=DATA_SCAN_ID,
)

Get a Data Quality scan

To get a Data Quality scan, you can use DataplexGetDataQualityScanOperator:

tests/system/google/cloud/dataplex/example_dataplex_dq.py

get_data_scan = DataplexGetDataQualityScanOperator(
    task_id="get_data_scan",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
)

Delete a Data Quality scan

To delete a Data Quality scan, you can use DataplexDeleteDataQualityScanOperator:

tests/system/google/cloud/dataplex/example_dataplex_dq.py

delete_data_scan = DataplexDeleteDataQualityScanOperator(
    task_id="delete_data_scan",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)

Run a Data Quality scan

You can run a Dataplex Data Quality scan in asynchronous mode with DataplexRunDataQualityScanOperator and later check its status using a sensor:

tests/system/google/cloud/dataplex/example_dataplex_dq.py

run_data_scan_async = DataplexRunDataQualityScanOperator(
    task_id="run_data_scan_async",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    asynchronous=True,
)

To check whether the Dataplex Data Quality scan run succeeded, you can use DataplexDataQualityJobStatusSensor:

tests/system/google/cloud/dataplex/example_dataplex_dq.py

get_data_scan_job_status = DataplexDataQualityJobStatusSensor(
    task_id="get_data_scan_job_status",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    job_id="{{ task_instance.xcom_pull('run_data_scan_async') }}",
)

You can also run this operator in deferrable mode:

tests/system/google/cloud/dataplex/example_dataplex_dq.py

run_data_scan_def = DataplexRunDataQualityScanOperator(
    task_id="run_data_scan_def",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    deferrable=True,
)

Get a Data Quality scan job

To get a Data Quality scan job, you can use DataplexGetDataQualityScanResultOperator:

tests/system/google/cloud/dataplex/example_dataplex_dq.py

get_data_scan_job_result_2 = DataplexGetDataQualityScanResultOperator(
    task_id="get_data_scan_job_result_2",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
)

You can also run this operator in deferrable mode:

tests/system/google/cloud/dataplex/example_dataplex_dq.py

get_data_scan_job_result_def = DataplexGetDataQualityScanResultOperator(
    task_id="get_data_scan_job_result_def",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    deferrable=True,
)

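Create a zone

Before you create a Dataplex zone, you need to define its body.

For more information about the fields you can pass when creating a zone, see the Dataplex create zone API.

A simple zone configuration can look like this: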

tests/system/google/cloud/dataplex/example_dataplex_dq.py

EXAMPLE_ZONE = {
    "type_": "RAW",
    "resource_spec": {"location_type": "SINGLE_REGION"},
}

With this configuration, you can create the zone using DataplexCreateZoneOperator:

tests/system/google/cloud/dataplex/example_dataplex_dq.py

create_zone = DataplexCreateZoneOperator(
    task_id="create_zone",
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    body=EXAMPLE_ZONE,
    zone_id=ZONE_ID,
)

Delete a zone

To delete a zone, you can use DataplexDeleteZoneOperator:

tests/system/google/cloud/dataplex/example_dataplex_dq.py

delete_zone = DataplexDeleteZoneOperator(
    task_id="delete_zone",
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    zone_id=ZONE_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)

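Create an asset

Before you create a Dataplex asset, you need to define its body.

For more information about the fields you can pass when creating an asset, see the Dataplex create asset API.

A simple asset configuration can look like this: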

tests/system/google/cloud/dataplex/example_dataplex_dq.py

EXAMPLE_ASSET = {
    "resource_spec": {"name": f"projects/{PROJECT_ID}/datasets/{DATASET}", "type_": "BIGQUERY_DATASET"},
    "discovery_spec": {"enabled": True},
}
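The asset body above points at a BigQuery dataset through a `projects/.../datasets/...` name. As a small sketch, the helper below builds the same dictionary from a project ID and a dataset name. `build_bigquery_asset_body` and the sample identifiers are hypothetical and only reproduce the keys shown in the example.

```python
def build_bigquery_asset_body(project_id: str, dataset: str, discovery_enabled: bool = True) -> dict:
    """Return a dict shaped like EXAMPLE_ASSET for a BigQuery dataset (hypothetical helper)."""
    return {
        "resource_spec": {
            "name": f"projects/{project_id}/datasets/{dataset}",
            "type_": "BIGQUERY_DATASET",
        },
        "discovery_spec": {"enabled": discovery_enabled},
    }


# Placeholder identifiers, assumed for illustration only.
asset_body = build_bigquery_asset_body("example-project", "example_dataset")
print(asset_body["resource_spec"]["name"])
```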

With this configuration, you can create the asset using DataplexCreateAssetOperator:

tests/system/google/cloud/dataplex/example_dataplex_dq.py

create_asset = DataplexCreateAssetOperator(
    task_id="create_asset",
    project_id=PROJECT_ID,
    region=REGION,
    body=EXAMPLE_ASSET,
    lake_id=LAKE_ID,
    zone_id=ZONE_ID,
    asset_id=ASSET_ID,
)

Delete an asset

To delete an asset, you can use DataplexDeleteAssetOperator:

tests/system/google/cloud/dataplex/example_dataplex_dq.py

delete_asset = DataplexDeleteAssetOperator(
    task_id="delete_asset",
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    zone_id=ZONE_ID,
    asset_id=ASSET_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)

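Create or update a Data Profile scan

Before you create a Dataplex Data Profile scan, you need to define its body. For more information about the fields you can pass when creating a Data Profile scan, see the Dataplex create data profile API.

A simple Data Profile scan configuration can look like this: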

tests/system/google/cloud/dataplex/example_dataplex_dp.py

EXAMPLE_DATA_SCAN = dataplex_v1.DataScan()
EXAMPLE_DATA_SCAN.data.entity = (
    f"projects/{PROJECT_ID}/locations/{REGION}/lakes/{LAKE_ID}/zones/{ZONE_ID}/entities/{TABLE_1}"
)
EXAMPLE_DATA_SCAN.data.resource = (
    f"//bigquery.googleapis.com/projects/{PROJECT_ID}/datasets/{DATASET}/tables/{TABLE_1}"
)
EXAMPLE_DATA_SCAN.data_profile_spec = DataProfileSpec({})
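The scan configuration above sets two long resource strings by hand. The sketch below wraps the same two f-string patterns in small functions so the path segments are easier to see. The function names and the sample identifiers are hypothetical; only the path layouts come from the example.

```python
def entity_name(project_id: str, region: str, lake_id: str, zone_id: str, entity_id: str) -> str:
    """Dataplex entity path, following the pattern used for data.entity in the example."""
    return (
        f"projects/{project_id}/locations/{region}"
        f"/lakes/{lake_id}/zones/{zone_id}/entities/{entity_id}"
    )


def bigquery_table_resource(project_id: str, dataset: str, table: str) -> str:
    """BigQuery table resource string, following the pattern used for data.resource."""
    return f"//bigquery.googleapis.com/projects/{project_id}/datasets/{dataset}/tables/{table}"


# Placeholder identifiers, assumed for illustration only.
entity = entity_name("example-project", "us-central1", "example-lake", "example-zone", "table_1")
resource = bigquery_table_resource("example-project", "example_dataset", "table_1")
print(entity)
print(resource)
```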

With this configuration, you can create or update the Data Profile scan using DataplexCreateOrUpdateDataProfileScanOperator:

tests/system/google/cloud/dataplex/example_dataplex_dp.py

create_data_scan = DataplexCreateOrUpdateDataProfileScanOperator(
    task_id="create_data_scan",
    project_id=PROJECT_ID,
    region=REGION,
    body=EXAMPLE_DATA_SCAN,
    data_scan_id=DATA_SCAN_ID,
)

Get a Data Profile scan

To get a Data Profile scan, you can use DataplexGetDataProfileScanOperator:

tests/system/google/cloud/dataplex/example_dataplex_dp.py

get_data_scan = DataplexGetDataProfileScanOperator(
    task_id="get_data_scan",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
)

Delete a Data Profile scan

To delete a Data Profile scan, you can use DataplexDeleteDataProfileScanOperator:

tests/system/google/cloud/dataplex/example_dataplex_dp.py

delete_data_scan = DataplexDeleteDataProfileScanOperator(
    task_id="delete_data_scan",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)

Run a Data Profile scan

You can run a Dataplex Data Profile scan in asynchronous mode with DataplexRunDataProfileScanOperator and later check its status using a sensor:

tests/system/google/cloud/dataplex/example_dataplex_dp.py

run_data_scan_async = DataplexRunDataProfileScanOperator(
    task_id="run_data_scan_async",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    asynchronous=True,
)

To check whether the Dataplex Data Profile scan run succeeded, you can use DataplexDataProfileJobStatusSensor:

tests/system/google/cloud/dataplex/example_dataplex_dp.py

get_data_scan_job_status = DataplexDataProfileJobStatusSensor(
    task_id="get_data_scan_job_status",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    job_id="{{ task_instance.xcom_pull('run_data_scan_async') }}",
)

You can also run this operator in deferrable mode:

tests/system/google/cloud/dataplex/example_dataplex_dp.py

run_data_scan_def = DataplexRunDataProfileScanOperator(
    task_id="run_data_scan_def",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    deferrable=True,
)

Get a Data Profile scan job

To get a Data Profile scan job, you can use DataplexGetDataProfileScanResultOperator:

tests/system/google/cloud/dataplex/example_dataplex_dp.py

get_data_scan_job_result_2 = DataplexGetDataProfileScanResultOperator(
    task_id="get_data_scan_job_result_2",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
)

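Google Dataplex Catalog Operators

Dataplex Catalog provides a unified inventory of Google Cloud resources, such as BigQuery, and of other resources, such as on-premises resources. Dataplex Catalog automatically retrieves metadata for Google Cloud resources, and you can bring metadata for third-party resources into Dataplex Catalog.

For more information about Dataplex Catalog, see the Dataplex Catalog product documentation.

Create an EntryGroup

To create an Entry Group in a specific location in Dataplex Catalog, you can use DataplexCatalogCreateEntryGroupOperator. For more information about the fields you can pass when creating an Entry Group, see the Entry Group resource configuration.

A simple Entry Group configuration can look like this: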

tests/system/google/cloud/dataplex/example_dataplex_catalog.py

ENTRY_GROUP_BODY = {"display_name": "Display Name", "description": "Some description"}

With this configuration, you can create an Entry Group resource using DataplexCatalogCreateEntryGroupOperator:

tests/system/google/cloud/dataplex/example_dataplex_catalog.py

create_entry_group = DataplexCatalogCreateEntryGroupOperator(
    task_id="create_entry_group",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_group_id=ENTRY_GROUP_NAME,
    entry_group_configuration=ENTRY_GROUP_BODY,
    validate_request=False,
)

Delete an EntryGroup

To delete an Entry Group in a specific location in Dataplex Catalog, you can use DataplexCatalogDeleteEntryGroupOperator:

tests/system/google/cloud/dataplex/example_dataplex_catalog.py

delete_entry_group = DataplexCatalogDeleteEntryGroupOperator(
    task_id="delete_entry_group",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_group_id=ENTRY_GROUP_NAME,
    trigger_rule=TriggerRule.ALL_DONE,
)

List EntryGroups

To list all Entry Groups in a specific location in Dataplex Catalog, you can use DataplexCatalogListEntryGroupsOperator. This operator also supports filtering and ordering the results of the operation.

tests/system/google/cloud/dataplex/example_dataplex_catalog.py

list_entry_group = DataplexCatalogListEntryGroupsOperator(
    task_id="list_entry_group",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    order_by="name",
    filter_by='display_name = "Display Name"',
)

Get an EntryGroup

To retrieve an Entry Group in a specific location in Dataplex Catalog, you can use DataplexCatalogGetEntryGroupOperator:

tests/system/google/cloud/dataplex/example_dataplex_catalog.py

get_entry_group = DataplexCatalogGetEntryGroupOperator(
    task_id="get_entry_group",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_group_id=ENTRY_GROUP_NAME,
)

Update an EntryGroup

To update an Entry Group in a specific location in Dataplex Catalog, you can use DataplexCatalogUpdateEntryGroupOperator:

tests/system/google/cloud/dataplex/example_dataplex_catalog.py

update_entry_group = DataplexCatalogUpdateEntryGroupOperator(
    task_id="update_entry_group",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_group_id=ENTRY_GROUP_NAME,
    entry_group_configuration={"display_name": "Updated Display Name"},
    update_mask=["display_name"],
)
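The update operators take both a configuration dictionary and an `update_mask` that names the fields to change. As a rough illustration of that idea only, the pure-Python sketch below applies a top-level field mask to a dictionary. `apply_update_mask` is a hypothetical helper: it is a simplification (fields named in the mask but missing from the changes are left untouched) and it does not reproduce what the Dataplex service or the operator does internally.

```python
def apply_update_mask(current: dict, changes: dict, update_mask: list) -> dict:
    """Copy only the masked top-level fields from `changes` onto a copy of `current`."""
    updated = dict(current)
    for field in update_mask:
        # Simplification: a masked field that is absent from `changes` is skipped.
        if field in changes:
            updated[field] = changes[field]
    return updated


current = {"display_name": "Display Name", "description": "Some description"}
changes = {"display_name": "Updated Display Name", "description": "Not in the mask"}

# Only display_name is listed in the mask, so description keeps its old value.
result = apply_update_mask(current, changes, ["display_name"])
print(result)
```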

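Create an EntryType

To create an Entry Type in a specific location in Dataplex Catalog, you can use DataplexCatalogCreateEntryTypeOperator. For more information about the fields you can pass when creating an Entry Type, see the Entry Type resource configuration.

A simple Entry Type configuration can look like this: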

tests/system/google/cloud/dataplex/example_dataplex_catalog.py

ENTRY_TYPE_BODY = {"display_name": "Display Name", "description": "Some description"}

With this configuration, you can create an Entry Type resource using DataplexCatalogCreateEntryTypeOperator:

tests/system/google/cloud/dataplex/example_dataplex_catalog.py

create_entry_type = DataplexCatalogCreateEntryTypeOperator(
    task_id="create_entry_type",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_type_id=ENTRY_TYPE_NAME,
    entry_type_configuration=ENTRY_TYPE_BODY,
    validate_request=False,
)

Delete an EntryType

To delete an Entry Type in a specific location in Dataplex Catalog, you can use DataplexCatalogDeleteEntryTypeOperator:

tests/system/google/cloud/dataplex/example_dataplex_catalog.py

delete_entry_type = DataplexCatalogDeleteEntryTypeOperator(
    task_id="delete_entry_type",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_type_id=ENTRY_TYPE_NAME,
    trigger_rule=TriggerRule.ALL_DONE,
)

List EntryTypes

To list all Entry Types in a specific location in Dataplex Catalog, you can use DataplexCatalogListEntryTypesOperator. This operator also supports filtering and ordering the results of the operation.

tests/system/google/cloud/dataplex/example_dataplex_catalog.py

list_entry_type = DataplexCatalogListEntryTypesOperator(
    task_id="list_entry_type",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    order_by="name",
    filter_by='display_name = "Display Name"',
)

Get an EntryType

To retrieve an Entry Type in a specific location in Dataplex Catalog, you can use DataplexCatalogGetEntryTypeOperator:

tests/system/google/cloud/dataplex/example_dataplex_catalog.py

get_entry_type = DataplexCatalogGetEntryTypeOperator(
    task_id="get_entry_type",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_type_id=ENTRY_TYPE_NAME,
)

Update an EntryType

To update an Entry Type in a specific location in Dataplex Catalog, you can use DataplexCatalogUpdateEntryTypeOperator:

tests/system/google/cloud/dataplex/example_dataplex_catalog.py

update_entry_type = DataplexCatalogUpdateEntryTypeOperator(
    task_id="update_entry_type",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_type_id=ENTRY_TYPE_NAME,
    entry_type_configuration={"display_name": "Updated Display Name"},
    update_mask=["display_name"],
)

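Create an AspectType

To create an Aspect Type in a specific location in Dataplex Catalog, you can use DataplexCatalogCreateAspectTypeOperator. For more information about the fields you can pass when creating an Aspect Type, see the Aspect Type resource configuration.

A simple Aspect Type configuration can look like this: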

tests/system/google/cloud/dataplex/example_dataplex_catalog.py

ASPECT_TYPE_BODY = {
    "display_name": "Sample AspectType",
    "description": "A simple AspectType for demonstration purposes.",
    "metadata_template": {
        "name": "sample_field",
        "type": "record",
        "annotations": {
            "display_name": "Sample Field",
            "description": "A sample field within the AspectType.",
        },
    },
}

With this configuration, you can create an Aspect Type resource using DataplexCatalogCreateAspectTypeOperator:

tests/system/google/cloud/dataplex/example_dataplex_catalog.py

create_aspect_type = DataplexCatalogCreateAspectTypeOperator(
    task_id="create_aspect_type",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    aspect_type_id=ASPECT_TYPE_NAME,
    aspect_type_configuration=ASPECT_TYPE_BODY,
    validate_request=False,
)

Delete an AspectType

To delete an Aspect Type in a specific location in Dataplex Catalog, you can use DataplexCatalogDeleteAspectTypeOperator:

tests/system/google/cloud/dataplex/example_dataplex_catalog.py

delete_aspect_type = DataplexCatalogDeleteAspectTypeOperator(
    task_id="delete_aspect_type",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    aspect_type_id=ASPECT_TYPE_NAME,
    trigger_rule=TriggerRule.ALL_DONE,
)

List AspectTypes

To list all Aspect Types in a specific location in Dataplex Catalog, you can use DataplexCatalogListAspectTypesOperator. This operator also supports filtering and ordering the results of the operation.

tests/system/google/cloud/dataplex/example_dataplex_catalog.py

list_aspect_type = DataplexCatalogListAspectTypesOperator(
    task_id="list_aspect_type",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    order_by="name",
    filter_by='display_name = "Display Name"',
)

Get an AspectType

To retrieve an Aspect Type in a specific location in Dataplex Catalog, you can use DataplexCatalogGetAspectTypeOperator:

tests/system/google/cloud/dataplex/example_dataplex_catalog.py

get_aspect_type = DataplexCatalogGetAspectTypeOperator(
    task_id="get_aspect_type",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    aspect_type_id=ASPECT_TYPE_NAME,
)

Update an AspectType

To update an Aspect Type in a specific location in Dataplex Catalog, you can use DataplexCatalogUpdateAspectTypeOperator:

tests/system/google/cloud/dataplex/example_dataplex_catalog.py

update_aspect_type = DataplexCatalogUpdateAspectTypeOperator(
    task_id="update_aspect_type",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    aspect_type_id=ASPECT_TYPE_NAME,
    aspect_type_configuration={"display_name": "Updated Display Name"},
    update_mask=["display_name"],
)

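Create an Entry

To create an Entry in a specific location in Dataplex Catalog, you can use DataplexCatalogCreateEntryOperator. For more information about the fields you can pass when creating an Entry, see the Entry resource configuration.

A simple Entry configuration can look like this: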

tests/system/google/cloud/dataplex/example_dataplex_catalog.py

ENTRY_BODY = {
    "name": f"projects/{PROJECT_ID}/locations/{GCP_LOCATION}/entryGroups/{ENTRY_GROUP_NAME}/entries/{ENTRY_NAME}",
    "entry_type": f"projects/{PROJECT_ID}/locations/{GCP_LOCATION}/entryTypes/{ENTRY_TYPE_NAME}",
}
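Both values in the Entry body above are full resource paths. The sketch below builds them with small functions so that each path segment is explicit. The function names and the sample identifiers are hypothetical; only the two path layouts are taken from the example.

```python
def entry_name(project_id: str, location: str, entry_group_id: str, entry_id: str) -> str:
    """Entry path, following the pattern used for "name" in ENTRY_BODY."""
    return (
        f"projects/{project_id}/locations/{location}"
        f"/entryGroups/{entry_group_id}/entries/{entry_id}"
    )


def entry_type_name(project_id: str, location: str, entry_type_id: str) -> str:
    """Entry Type path, following the pattern used for "entry_type" in ENTRY_BODY."""
    return f"projects/{project_id}/locations/{location}/entryTypes/{entry_type_id}"


# Placeholder identifiers, assumed for illustration only.
entry_body = {
    "name": entry_name("example-project", "us-central1", "example-group", "example-entry"),
    "entry_type": entry_type_name("example-project", "us-central1", "example-type"),
}
print(entry_body)
```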

With this configuration, you can create an Entry resource using DataplexCatalogCreateEntryOperator:

tests/system/google/cloud/dataplex/example_dataplex_catalog.py

create_entry = DataplexCatalogCreateEntryOperator(
    task_id="create_entry",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_id=ENTRY_NAME,
    entry_group_id=ENTRY_GROUP_NAME,
    entry_configuration=ENTRY_BODY,
)

Delete an Entry

To delete an Entry in a specific location in Dataplex Catalog, you can use DataplexCatalogDeleteEntryOperator:

tests/system/google/cloud/dataplex/example_dataplex_catalog.py

delete_entry = DataplexCatalogDeleteEntryOperator(
    task_id="delete_entry",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_id=ENTRY_NAME,
    entry_group_id=ENTRY_GROUP_NAME,
    trigger_rule=TriggerRule.ALL_DONE,
)

List Entries

To list all Entries in a specific location in Dataplex Catalog, you can use DataplexCatalogListEntriesOperator. This operator also supports filtering and ordering the results of the operation.

tests/system/google/cloud/dataplex/example_dataplex_catalog.py

list_entry = DataplexCatalogListEntriesOperator(
    task_id="list_entry",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_group_id=ENTRY_GROUP_NAME,
)

Get an Entry

To retrieve an Entry in a specific location in Dataplex Catalog, you can use DataplexCatalogGetEntryOperator:

tests/system/google/cloud/dataplex/example_dataplex_catalog.py

get_entry = DataplexCatalogGetEntryOperator(
    task_id="get_entry",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_id=ENTRY_NAME,
    entry_group_id=ENTRY_GROUP_NAME,
)

Update an Entry

To update an Entry in a specific location in Dataplex Catalog, you can use DataplexCatalogUpdateEntryOperator:

tests/system/google/cloud/dataplex/example_dataplex_catalog.py

update_entry = DataplexCatalogUpdateEntryOperator(
    task_id="update_entry",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_id=ENTRY_NAME,
    entry_group_id=ENTRY_GROUP_NAME,
    entry_configuration={
        "fully_qualified_name": f"dataplex:{PROJECT_ID}.{GCP_LOCATION}.{ENTRY_GROUP_NAME}.some-entry"
    },
    update_mask=["fully_qualified_name"],
)

Look up a single Entry

To look up a single Entry by name in Dataplex Catalog, using the permission on the source system, you can use DataplexCatalogLookupEntryOperator:

tests/system/google/cloud/dataplex/example_dataplex_catalog.py

lookup_entry = DataplexCatalogLookupEntryOperator(
    task_id="lookup_entry",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_id=ENTRY_NAME,
    entry_group_id=ENTRY_GROUP_NAME,
)

Search Entries

To search Dataplex Catalog for Entries that match a given query and scope, you can use DataplexCatalogSearchEntriesOperator:

tests/system/google/cloud/dataplex/example_dataplex_catalog.py

search_entry = DataplexCatalogSearchEntriesOperator(
    task_id="search_entry",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    query=f"name={ENTRY_NAME}",
)
