Google Dataplex 運算子¶
Dataplex 是一種智慧資料架構,可為您的資料湖、資料倉庫和資料市場提供統一的分析和資料管理。
有關任務的更多資訊,請訪問 Dataplex production documentation <Product documentation
建立任務¶
在建立 Dataplex 任務之前,您需要定義其主體。有關建立任務時可傳遞的可用欄位的更多資訊,請訪問 Dataplex create task API.
一個簡單的任務配置如下所示
tests/system/google/cloud/dataplex/example_dataplex.py
EXAMPLE_TASK_BODY = {
"trigger_spec": {"type_": TRIGGER_SPEC_TYPE},
"execution_spec": {"service_account": SERVICE_ACC},
"spark": {"python_script_file": SPARK_FILE_FULL_PATH},
}
使用此配置,我們可以同步和非同步地建立任務: DataplexCreateTaskOperator
tests/system/google/cloud/dataplex/example_dataplex.py
create_dataplex_task = DataplexCreateTaskOperator(
project_id=PROJECT_ID,
region=REGION,
lake_id=LAKE_ID,
body=EXAMPLE_TASK_BODY,
dataplex_task_id=DATAPLEX_TASK_ID,
task_id="create_dataplex_task",
)
tests/system/google/cloud/dataplex/example_dataplex.py
create_dataplex_task_async = DataplexCreateTaskOperator(
project_id=PROJECT_ID,
region=REGION,
lake_id=LAKE_ID,
body=EXAMPLE_TASK_BODY,
dataplex_task_id=f"{DATAPLEX_TASK_ID}-1",
asynchronous=True,
task_id="create_dataplex_task_async",
)
刪除任務¶
要刪除任務,您可以使用
tests/system/google/cloud/dataplex/example_dataplex.py
delete_dataplex_task_async = DataplexDeleteTaskOperator(
project_id=PROJECT_ID,
region=REGION,
lake_id=LAKE_ID,
dataplex_task_id=f"{DATAPLEX_TASK_ID}-1",
task_id="delete_dataplex_task_async",
)
列出任務¶
要列出任務,您可以使用
tests/system/google/cloud/dataplex/example_dataplex.py
list_dataplex_task = DataplexListTasksOperator(
project_id=PROJECT_ID, region=REGION, lake_id=LAKE_ID, task_id="list_dataplex_task"
)
獲取任務¶
要獲取任務,您可以使用
tests/system/google/cloud/dataplex/example_dataplex.py
get_dataplex_task = DataplexGetTaskOperator(
project_id=PROJECT_ID,
region=REGION,
lake_id=LAKE_ID,
dataplex_task_id=DATAPLEX_TASK_ID,
task_id="get_dataplex_task",
)
等待任務¶
要等待非同步建立的任務,您可以使用
tests/system/google/cloud/dataplex/example_dataplex.py
dataplex_task_state = DataplexTaskStateSensor(
project_id=PROJECT_ID,
region=REGION,
lake_id=LAKE_ID,
dataplex_task_id=DATAPLEX_TASK_ID,
task_id="dataplex_task_state",
)
建立資料湖¶
在建立 Dataplex 資料湖之前,您需要定義其主體。
有關建立資料湖時可傳遞的可用欄位的更多資訊,請訪問 Dataplex create lake API.
一個簡單的任務配置如下所示
tests/system/google/cloud/dataplex/example_dataplex.py
EXAMPLE_LAKE_BODY = {
"display_name": "test_display_name",
"labels": [],
"description": "test_description",
"metastore": {"service": ""},
}
使用此配置,我們可以建立資料湖
tests/system/google/cloud/dataplex/example_dataplex.py
create_lake = DataplexCreateLakeOperator(
project_id=PROJECT_ID, region=REGION, body=EXAMPLE_LAKE_BODY, lake_id=LAKE_ID, task_id="create_lake"
)
刪除資料湖¶
要刪除資料湖,您可以使用
tests/system/google/cloud/dataplex/example_dataplex.py
delete_lake = DataplexDeleteLakeOperator(
project_id=PROJECT_ID,
region=REGION,
lake_id=LAKE_ID,
task_id="delete_lake",
trigger_rule=TriggerRule.ALL_DONE,
)
建立或更新資料質量掃描¶
在建立 Dataplex 資料質量掃描之前,您需要定義其主體。有關建立資料質量掃描時可傳遞的可用欄位的更多資訊,請訪問 Dataplex create data quality API.
一個簡單的資料質量掃描配置如下所示
tests/system/google/cloud/dataplex/example_dataplex_dq.py
EXAMPLE_DATA_SCAN = dataplex_v1.DataScan()
EXAMPLE_DATA_SCAN.data.entity = (
f"projects/{PROJECT_ID}/locations/{REGION}/lakes/{LAKE_ID}/zones/{ZONE_ID}/entities/{TABLE_1}"
)
EXAMPLE_DATA_SCAN.data.resource = (
f"//bigquery.googleapis.com/projects/{PROJECT_ID}/datasets/{DATASET}/tables/{TABLE_1}"
)
EXAMPLE_DATA_SCAN.data_quality_spec = DataQualitySpec(
{
"rules": [
{
"range_expectation": {
"min_value": "0",
"max_value": "10000",
},
"column": "value",
"dimension": "VALIDITY",
}
],
}
)
使用此配置,我們可以建立或更新資料質量掃描
DataplexCreateOrUpdateDataQualityScanOperator
tests/system/google/cloud/dataplex/example_dataplex_dq.py
create_data_scan = DataplexCreateOrUpdateDataQualityScanOperator(
task_id="create_data_scan",
project_id=PROJECT_ID,
region=REGION,
body=EXAMPLE_DATA_SCAN,
data_scan_id=DATA_SCAN_ID,
)
獲取資料質量掃描¶
要獲取資料質量掃描,您可以使用
DataplexGetDataQualityScanOperator
tests/system/google/cloud/dataplex/example_dataplex_dq.py
get_data_scan = DataplexGetDataQualityScanOperator(
task_id="get_data_scan",
project_id=PROJECT_ID,
region=REGION,
data_scan_id=DATA_SCAN_ID,
)
刪除資料質量掃描¶
要刪除資料質量掃描,您可以使用
DataplexDeleteDataQualityScanOperator
tests/system/google/cloud/dataplex/example_dataplex_dq.py
delete_data_scan = DataplexDeleteDataQualityScanOperator(
task_id="delete_data_scan",
project_id=PROJECT_ID,
region=REGION,
data_scan_id=DATA_SCAN_ID,
trigger_rule=TriggerRule.ALL_DONE,
)
執行資料質量掃描¶
您可以在非同步模式下執行 Dataplex 資料質量掃描,然後使用感測器檢查其狀態
DataplexRunDataQualityScanOperator
tests/system/google/cloud/dataplex/example_dataplex_dq.py
run_data_scan_async = DataplexRunDataQualityScanOperator(
task_id="run_data_scan_async",
project_id=PROJECT_ID,
region=REGION,
data_scan_id=DATA_SCAN_ID,
asynchronous=True,
)
要檢查執行 Dataplex 資料質量掃描是否成功,您可以使用
DataplexDataQualityJobStatusSensor.
tests/system/google/cloud/dataplex/example_dataplex_dq.py
get_data_scan_job_status = DataplexDataQualityJobStatusSensor(
task_id="get_data_scan_job_status",
project_id=PROJECT_ID,
region=REGION,
data_scan_id=DATA_SCAN_ID,
job_id="{{ task_instance.xcom_pull('run_data_scan_async') }}",
)
您也可以在此操作中使用可延遲模式下的運算子
tests/system/google/cloud/dataplex/example_dataplex_dq.py
run_data_scan_def = DataplexRunDataQualityScanOperator(
task_id="run_data_scan_def",
project_id=PROJECT_ID,
region=REGION,
data_scan_id=DATA_SCAN_ID,
deferrable=True,
)
獲取資料質量掃描作業¶
要獲取資料質量掃描作業,您可以使用
DataplexGetDataQualityScanResultOperator
tests/system/google/cloud/dataplex/example_dataplex_dq.py
get_data_scan_job_result_2 = DataplexGetDataQualityScanResultOperator(
task_id="get_data_scan_job_result_2",
project_id=PROJECT_ID,
region=REGION,
data_scan_id=DATA_SCAN_ID,
)
您也可以在此操作中使用可延遲模式下的運算子
tests/system/google/cloud/dataplex/example_dataplex_dq.py
get_data_scan_job_result_def = DataplexGetDataQualityScanResultOperator(
task_id="get_data_scan_job_result_def",
project_id=PROJECT_ID,
region=REGION,
data_scan_id=DATA_SCAN_ID,
deferrable=True,
)
建立區域¶
在建立 Dataplex 區域之前,您需要定義其主體。
有關建立區域時可傳遞的可用欄位的更多資訊,請訪問 Dataplex create zone API.
一個簡單的區域配置如下所示
tests/system/google/cloud/dataplex/example_dataplex_dq.py
EXAMPLE_ZONE = {
"type_": "RAW",
"resource_spec": {"location_type": "SINGLE_REGION"},
}
使用此配置,我們可以建立區域
tests/system/google/cloud/dataplex/example_dataplex_dq.py
create_zone = DataplexCreateZoneOperator(
task_id="create_zone",
project_id=PROJECT_ID,
region=REGION,
lake_id=LAKE_ID,
body=EXAMPLE_ZONE,
zone_id=ZONE_ID,
)
刪除區域¶
要刪除區域,您可以使用
tests/system/google/cloud/dataplex/example_dataplex_dq.py
delete_zone = DataplexDeleteZoneOperator(
task_id="delete_zone",
project_id=PROJECT_ID,
region=REGION,
lake_id=LAKE_ID,
zone_id=ZONE_ID,
trigger_rule=TriggerRule.ALL_DONE,
)
建立資產¶
在建立 Dataplex 資產之前,您需要定義其主體。
有關建立資產時可傳遞的可用欄位的更多資訊,請訪問 Dataplex create asset API.
一個簡單的資產配置如下所示
tests/system/google/cloud/dataplex/example_dataplex_dq.py
EXAMPLE_ASSET = {
"resource_spec": {"name": f"projects/{PROJECT_ID}/datasets/{DATASET}", "type_": "BIGQUERY_DATASET"},
"discovery_spec": {"enabled": True},
}
使用此配置,我們可以建立資產
tests/system/google/cloud/dataplex/example_dataplex_dq.py
create_asset = DataplexCreateAssetOperator(
task_id="create_asset",
project_id=PROJECT_ID,
region=REGION,
body=EXAMPLE_ASSET,
lake_id=LAKE_ID,
zone_id=ZONE_ID,
asset_id=ASSET_ID,
)
刪除資產¶
要刪除資產,您可以使用
tests/system/google/cloud/dataplex/example_dataplex_dq.py
delete_asset = DataplexDeleteAssetOperator(
task_id="delete_asset",
project_id=PROJECT_ID,
region=REGION,
lake_id=LAKE_ID,
zone_id=ZONE_ID,
asset_id=ASSET_ID,
trigger_rule=TriggerRule.ALL_DONE,
)
建立或更新資料畫像掃描¶
在建立 Dataplex 資料畫像掃描之前,您需要定義其主體。有關建立資料畫像掃描時可傳遞的可用欄位的更多資訊,請訪問 Dataplex create data profile API.
一個簡單的資料畫像掃描配置如下所示
tests/system/google/cloud/dataplex/example_dataplex_dp.py
EXAMPLE_DATA_SCAN = dataplex_v1.DataScan()
EXAMPLE_DATA_SCAN.data.entity = (
f"projects/{PROJECT_ID}/locations/{REGION}/lakes/{LAKE_ID}/zones/{ZONE_ID}/entities/{TABLE_1}"
)
EXAMPLE_DATA_SCAN.data.resource = (
f"//bigquery.googleapis.com/projects/{PROJECT_ID}/datasets/{DATASET}/tables/{TABLE_1}"
)
EXAMPLE_DATA_SCAN.data_profile_spec = DataProfileSpec({})
使用此配置,我們可以建立或更新資料畫像掃描
DataplexCreateOrUpdateDataProfileScanOperator
tests/system/google/cloud/dataplex/example_dataplex_dp.py
create_data_scan = DataplexCreateOrUpdateDataProfileScanOperator(
task_id="create_data_scan",
project_id=PROJECT_ID,
region=REGION,
body=EXAMPLE_DATA_SCAN,
data_scan_id=DATA_SCAN_ID,
)
獲取資料畫像掃描¶
要獲取資料畫像掃描,您可以使用
DataplexGetDataProfileScanOperator
tests/system/google/cloud/dataplex/example_dataplex_dp.py
get_data_scan = DataplexGetDataProfileScanOperator(
task_id="get_data_scan",
project_id=PROJECT_ID,
region=REGION,
data_scan_id=DATA_SCAN_ID,
)
刪除資料畫像掃描¶
要刪除資料畫像掃描,您可以使用
DataplexDeleteDataProfileScanOperator
tests/system/google/cloud/dataplex/example_dataplex_dp.py
delete_data_scan = DataplexDeleteDataProfileScanOperator(
task_id="delete_data_scan",
project_id=PROJECT_ID,
region=REGION,
data_scan_id=DATA_SCAN_ID,
trigger_rule=TriggerRule.ALL_DONE,
)
執行資料畫像掃描¶
您可以在非同步模式下執行 Dataplex 資料畫像掃描,然後使用感測器檢查其狀態
DataplexRunDataProfileScanOperator
tests/system/google/cloud/dataplex/example_dataplex_dp.py
run_data_scan_async = DataplexRunDataProfileScanOperator(
task_id="run_data_scan_async",
project_id=PROJECT_ID,
region=REGION,
data_scan_id=DATA_SCAN_ID,
asynchronous=True,
)
要檢查執行 Dataplex 資料畫像掃描是否成功,您可以使用
DataplexDataProfileJobStatusSensor.
tests/system/google/cloud/dataplex/example_dataplex_dp.py
get_data_scan_job_status = DataplexDataProfileJobStatusSensor(
task_id="get_data_scan_job_status",
project_id=PROJECT_ID,
region=REGION,
data_scan_id=DATA_SCAN_ID,
job_id="{{ task_instance.xcom_pull('run_data_scan_async') }}",
)
您也可以在此操作中使用可延遲模式下的運算子
tests/system/google/cloud/dataplex/example_dataplex_dp.py
run_data_scan_def = DataplexRunDataProfileScanOperator(
task_id="run_data_scan_def",
project_id=PROJECT_ID,
region=REGION,
data_scan_id=DATA_SCAN_ID,
deferrable=True,
)
獲取資料畫像掃描作業¶
要獲取資料畫像掃描作業,您可以使用
DataplexGetDataProfileScanResultOperator
tests/system/google/cloud/dataplex/example_dataplex_dp.py
get_data_scan_job_result_2 = DataplexGetDataProfileScanResultOperator(
task_id="get_data_scan_job_result_2",
project_id=PROJECT_ID,
region=REGION,
data_scan_id=DATA_SCAN_ID,
)
Google Dataplex Catalog 運算子¶
Dataplex Catalog 提供 Google Cloud 資源(如 BigQuery)以及其他資源(如本地資源)的統一清單。Dataplex Catalog 自動檢索 Google Cloud 資源的元資料,您可以將第三方資源的元資料引入 Dataplex Catalog。
有關 Dataplex Catalog 的更多資訊,請訪問 Dataplex Catalog production documentation <Product documentation
建立 EntryGroup¶
要在 Dataplex Catalog 中的特定位置建立 Entry Group,您可以使用 DataplexCatalogCreateEntryGroupOperator 有關建立 Entry Group 時可傳遞的可用欄位的更多資訊,請訪問 Entry Group resource configuration.
一個簡單的 Entry Group 配置如下所示
tests/system/google/cloud/dataplex/example_dataplex_catalog.py
ENTRY_GROUP_BODY = {"display_name": "Display Name", "description": "Some description"}
使用此配置,您可以建立 Entry Group 資源
DataplexCatalogCreateEntryGroupOperator
tests/system/google/cloud/dataplex/example_dataplex_catalog.py
create_entry_group = DataplexCatalogCreateEntryGroupOperator(
task_id="create_entry_group",
project_id=PROJECT_ID,
location=GCP_LOCATION,
entry_group_id=ENTRY_GROUP_NAME,
entry_group_configuration=ENTRY_GROUP_BODY,
validate_request=False,
)
刪除 EntryGroup¶
要刪除 Dataplex Catalog 中的特定位置的 Entry Group,您可以使用 DataplexCatalogDeleteEntryGroupOperator
tests/system/google/cloud/dataplex/example_dataplex_catalog.py
delete_entry_group = DataplexCatalogDeleteEntryGroupOperator(
task_id="delete_entry_group",
project_id=PROJECT_ID,
location=GCP_LOCATION,
entry_group_id=ENTRY_GROUP_NAME,
trigger_rule=TriggerRule.ALL_DONE,
)
列出 EntryGroups¶
要列出 Dataplex Catalog 中特定位置的所有 Entry Groups,您可以使用 DataplexCatalogListEntryGroupsOperator. 此運算子還支援對操作結果進行過濾和排序。
tests/system/google/cloud/dataplex/example_dataplex_catalog.py
list_entry_group = DataplexCatalogListEntryGroupsOperator(
task_id="list_entry_group",
project_id=PROJECT_ID,
location=GCP_LOCATION,
order_by="name",
filter_by='display_name = "Display Name"',
)
獲取 EntryGroup¶
要在 Dataplex Catalog 中的特定位置檢索 Entry Group,您可以使用 DataplexCatalogGetEntryGroupOperator
tests/system/google/cloud/dataplex/example_dataplex_catalog.py
get_entry_group = DataplexCatalogGetEntryGroupOperator(
task_id="get_entry_group",
project_id=PROJECT_ID,
location=GCP_LOCATION,
entry_group_id=ENTRY_GROUP_NAME,
)
更新 EntryGroup¶
要在 Dataplex Catalog 中的特定位置更新 Entry Group,您可以使用 DataplexCatalogUpdateEntryGroupOperator
tests/system/google/cloud/dataplex/example_dataplex_catalog.py
update_entry_group = DataplexCatalogUpdateEntryGroupOperator(
task_id="update_entry_group",
project_id=PROJECT_ID,
location=GCP_LOCATION,
entry_group_id=ENTRY_GROUP_NAME,
entry_group_configuration={"display_name": "Updated Display Name"},
update_mask=["display_name"],
)
建立 EntryType¶
要在 Dataplex Catalog 中的特定位置建立 Entry Type,您可以使用 DataplexCatalogCreateEntryTypeOperator 有關建立 Entry Type 時可傳遞的可用欄位的更多資訊,請訪問 Entry Type resource configuration.
一個簡單的 Entry Group 配置如下所示
tests/system/google/cloud/dataplex/example_dataplex_catalog.py
ENTRY_TYPE_BODY = {"display_name": "Display Name", "description": "Some description"}
使用此配置,您可以建立 Entry Type 資源
DataplexCatalogCreateEntryTypeOperator
tests/system/google/cloud/dataplex/example_dataplex_catalog.py
create_entry_type = DataplexCatalogCreateEntryTypeOperator(
task_id="create_entry_type",
project_id=PROJECT_ID,
location=GCP_LOCATION,
entry_type_id=ENTRY_TYPE_NAME,
entry_type_configuration=ENTRY_TYPE_BODY,
validate_request=False,
)
刪除 EntryType¶
要刪除 Dataplex Catalog 中的特定位置的 Entry Type,您可以使用 DataplexCatalogDeleteEntryTypeOperator
tests/system/google/cloud/dataplex/example_dataplex_catalog.py
delete_entry_type = DataplexCatalogDeleteEntryTypeOperator(
task_id="delete_entry_type",
project_id=PROJECT_ID,
location=GCP_LOCATION,
entry_type_id=ENTRY_TYPE_NAME,
trigger_rule=TriggerRule.ALL_DONE,
)
列出 EntryTypes¶
要列出 Dataplex Catalog 中特定位置的所有 Entry Types,您可以使用 DataplexCatalogListEntryTypesOperator. 此運算子還支援對操作結果進行過濾和排序。
tests/system/google/cloud/dataplex/example_dataplex_catalog.py
list_entry_type = DataplexCatalogListEntryTypesOperator(
task_id="list_entry_type",
project_id=PROJECT_ID,
location=GCP_LOCATION,
order_by="name",
filter_by='display_name = "Display Name"',
)
獲取 EntryType¶
要在 Dataplex Catalog 中的特定位置檢索 Entry Type,您可以使用 DataplexCatalogGetEntryTypeOperator
tests/system/google/cloud/dataplex/example_dataplex_catalog.py
get_entry_type = DataplexCatalogGetEntryTypeOperator(
task_id="get_entry_type",
project_id=PROJECT_ID,
location=GCP_LOCATION,
entry_type_id=ENTRY_TYPE_NAME,
)
更新 EntryType¶
要在 Dataplex Catalog 中的特定位置更新 Entry Type,您可以使用 DataplexCatalogUpdateEntryTypeOperator
tests/system/google/cloud/dataplex/example_dataplex_catalog.py
update_entry_type = DataplexCatalogUpdateEntryTypeOperator(
task_id="update_entry_type",
project_id=PROJECT_ID,
location=GCP_LOCATION,
entry_type_id=ENTRY_TYPE_NAME,
entry_type_configuration={"display_name": "Updated Display Name"},
update_mask=["display_name"],
)
建立 AspectType¶
要在 Dataplex Catalog 中的特定位置建立 Aspect Type,您可以使用 DataplexCatalogCreateAspectTypeOperator 有關建立 Aspect Type 時可傳遞的可用欄位的更多資訊,請訪問 Aspect Type resource configuration.
一個簡單的 Aspect Type 配置如下所示
tests/system/google/cloud/dataplex/example_dataplex_catalog.py
ASPECT_TYPE_BODY = {
"display_name": "Sample AspectType",
"description": "A simple AspectType for demonstration purposes.",
"metadata_template": {
"name": "sample_field",
"type": "record",
"annotations": {
"display_name": "Sample Field",
"description": "A sample field within the AspectType.",
},
},
}
使用此配置,您可以建立 Aspect Type 資源
DataplexCatalogCreateAspectTypeOperator
tests/system/google/cloud/dataplex/example_dataplex_catalog.py
create_aspect_type = DataplexCatalogCreateAspectTypeOperator(
task_id="create_aspect_type",
project_id=PROJECT_ID,
location=GCP_LOCATION,
aspect_type_id=ASPECT_TYPE_NAME,
aspect_type_configuration=ASPECT_TYPE_BODY,
validate_request=False,
)
刪除 AspectType¶
要刪除 Dataplex Catalog 中的特定位置的 Aspect Type,您可以使用 DataplexCatalogDeleteAspectTypeOperator
tests/system/google/cloud/dataplex/example_dataplex_catalog.py
delete_aspect_type = DataplexCatalogDeleteAspectTypeOperator(
task_id="delete_aspect_type",
project_id=PROJECT_ID,
location=GCP_LOCATION,
aspect_type_id=ASPECT_TYPE_NAME,
trigger_rule=TriggerRule.ALL_DONE,
)
列出 AspectTypes¶
要列出 Dataplex Catalog 中特定位置的所有 Aspect Types,您可以使用 DataplexCatalogListAspectTypesOperator. 此運算子還支援對操作結果進行過濾和排序。
tests/system/google/cloud/dataplex/example_dataplex_catalog.py
list_aspect_type = DataplexCatalogListAspectTypesOperator(
task_id="list_aspect_type",
project_id=PROJECT_ID,
location=GCP_LOCATION,
order_by="name",
filter_by='display_name = "Display Name"',
)
獲取 AspectType¶
要在 Dataplex Catalog 中的特定位置檢索 Aspect Type,您可以使用 DataplexCatalogGetAspectTypeOperator
tests/system/google/cloud/dataplex/example_dataplex_catalog.py
get_aspect_type = DataplexCatalogGetAspectTypeOperator(
task_id="get_aspect_type",
project_id=PROJECT_ID,
location=GCP_LOCATION,
aspect_type_id=ASPECT_TYPE_NAME,
)
更新 AspectType¶
要在 Dataplex Catalog 中的特定位置更新 Aspect Type,您可以使用 DataplexCatalogUpdateAspectTypeOperator
tests/system/google/cloud/dataplex/example_dataplex_catalog.py
update_aspect_type = DataplexCatalogUpdateAspectTypeOperator(
task_id="update_aspect_type",
project_id=PROJECT_ID,
location=GCP_LOCATION,
aspect_type_id=ASPECT_TYPE_NAME,
aspect_type_configuration={"display_name": "Updated Display Name"},
update_mask=["display_name"],
)
建立 Entry¶
要在 Dataplex Catalog 中的特定位置建立 Entry,您可以使用 DataplexCatalogCreateEntryOperator 有關建立 Entry 時可傳遞的可用欄位的更多資訊,請訪問 Entry resource configuration.
一個簡單的 Entry 配置如下所示
tests/system/google/cloud/dataplex/example_dataplex_catalog.py
ENTRY_BODY = {
"name": f"projects/{PROJECT_ID}/locations/{GCP_LOCATION}/entryGroups/{ENTRY_GROUP_NAME}/entries/{ENTRY_NAME}",
"entry_type": f"projects/{PROJECT_ID}/locations/{GCP_LOCATION}/entryTypes/{ENTRY_TYPE_NAME}",
}
使用此配置,您可以建立 Entry 資源
DataplexCatalogCreateEntryOperator
tests/system/google/cloud/dataplex/example_dataplex_catalog.py
create_entry = DataplexCatalogCreateEntryOperator(
task_id="create_entry",
project_id=PROJECT_ID,
location=GCP_LOCATION,
entry_id=ENTRY_NAME,
entry_group_id=ENTRY_GROUP_NAME,
entry_configuration=ENTRY_BODY,
)
刪除 Entry¶
要刪除 Dataplex Catalog 中的特定位置的 Entry,您可以使用 DataplexCatalogDeleteEntryOperator
tests/system/google/cloud/dataplex/example_dataplex_catalog.py
delete_entry = DataplexCatalogDeleteEntryOperator(
task_id="delete_entry",
project_id=PROJECT_ID,
location=GCP_LOCATION,
entry_id=ENTRY_NAME,
entry_group_id=ENTRY_GROUP_NAME,
trigger_rule=TriggerRule.ALL_DONE,
)
列出 Entries¶
要列出 Dataplex Catalog 中特定位置的所有 Entries,您可以使用 DataplexCatalogListEntriesOperator. 此運算子還支援對操作結果進行過濾和排序。
tests/system/google/cloud/dataplex/example_dataplex_catalog.py
list_entry = DataplexCatalogListEntriesOperator(
task_id="list_entry",
project_id=PROJECT_ID,
location=GCP_LOCATION,
entry_group_id=ENTRY_GROUP_NAME,
)
獲取 Entry¶
要在 Dataplex Catalog 中的特定位置檢索 Entry,您可以使用 DataplexCatalogGetEntryOperator
tests/system/google/cloud/dataplex/example_dataplex_catalog.py
get_entry = DataplexCatalogGetEntryOperator(
task_id="get_entry",
project_id=PROJECT_ID,
location=GCP_LOCATION,
entry_id=ENTRY_NAME,
entry_group_id=ENTRY_GROUP_NAME,
)
更新 Entry¶
要在 Dataplex Catalog 中的特定位置更新 Entry,您可以使用 DataplexCatalogUpdateEntryOperator
tests/system/google/cloud/dataplex/example_dataplex_catalog.py
update_entry = DataplexCatalogUpdateEntryOperator(
task_id="update_entry",
project_id=PROJECT_ID,
location=GCP_LOCATION,
entry_id=ENTRY_NAME,
entry_group_id=ENTRY_GROUP_NAME,
entry_configuration={
"fully_qualified_name": f"dataplex:{PROJECT_ID}.{GCP_LOCATION}.{ENTRY_GROUP_NAME}.some-entry"
},
update_mask=["fully_qualified_name"],
)
查詢單個 Entry¶
要在 Dataplex Catalog 中使用源系統上的許可權按名稱查詢單個 Entry,您可以使用 DataplexCatalogLookupEntryOperator
tests/system/google/cloud/dataplex/example_dataplex_catalog.py
lookup_entry = DataplexCatalogLookupEntryOperator(
task_id="lookup_entry",
project_id=PROJECT_ID,
location=GCP_LOCATION,
entry_id=ENTRY_NAME,
entry_group_id=ENTRY_GROUP_NAME,
)
搜尋 Entries¶
要在 Dataplex Catalog 中搜索與給定查詢和範圍匹配的 Entries,您可以使用 DataplexCatalogSearchEntriesOperator
tests/system/google/cloud/dataplex/example_dataplex_catalog.py
search_entry = DataplexCatalogSearchEntriesOperator(
task_id="search_entry",
project_id=PROJECT_ID,
location=GCP_LOCATION,
query=f"name={ENTRY_NAME}",
)