Google Cloud Data Catalog Operator¶
Data Catalog 是一項完全託管且可擴充套件的元資料管理服務,使組織能夠在 Google Cloud 中快速發現、管理和理解其所有資料。它提供
一個簡單易用的資料發現搜尋介面,由支援 Gmail 和 Drive 的相同 Google 搜尋技術提供支援
一個靈活且強大的目錄系統,用於捕獲技術和業務元資料
一個用於標記敏感資料的自動標記機制,集成了 DLP API
前提任務¶
要使用這些 operator,您必須執行以下幾項操作
使用Cloud 控制檯選擇或建立 Cloud Platform 專案。
為您的專案啟用結算功能,詳見Google Cloud 文件。
啟用 API,詳見Cloud 控制檯文件。
透過 pip 安裝 API 庫。
pip install 'apache-airflow[google]'有關安裝的詳細資訊。
管理條目(entries)¶
Operator 使用 Entry 表示條目。
獲取條目¶
獲取條目使用 CloudDataCatalogGetEntryOperator 和 CloudDataCatalogLookupEntryOperator operator。
CloudDataCatalogGetEntryOperator 使用專案 ID、條目組 ID、條目 ID 來獲取條目。
tests/system/google/datacatalog/example_datacatalog_entries.py
get_entry = CloudDataCatalogGetEntryOperator(
task_id="get_entry", location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
)
您可以將 Jinja 模板 與 location、entry_group、entry、project_id、retry、timeout、metadata、gcp_conn_id、impersonation_chain 引數一起使用,從而動態確定值。
結果會儲存到 XCom 中,以便其他 operator 可以使用。
tests/system/google/datacatalog/example_datacatalog_entries.py
get_entry_result = BashOperator(task_id="get_entry_result", bash_command=f"echo {get_entry.output}")
CloudDataCatalogLookupEntryOperator 使用資源名稱來獲取條目。
tests/system/google/datacatalog/example_datacatalog_entries.py
current_entry_template = (
"//datacatalog.googleapis.com/projects/{project_id}/locations/{location}/"
"entryGroups/{entry_group}/entries/{entry}"
)
lookup_entry_linked_resource = CloudDataCatalogLookupEntryOperator(
task_id="lookup_entry",
linked_resource=current_entry_template.format(
project_id=PROJECT_ID, location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
),
)
您可以將 Jinja 模板 與 linked_resource、sql_resource、project_id、retry、timeout、metadata、gcp_conn_id、impersonation_chain 引數一起使用,從而動態確定值。
結果會儲存到 XCom 中,以便其他 operator 可以使用。
tests/system/google/datacatalog/example_datacatalog_entries.py
lookup_entry_result = BashOperator(
task_id="lookup_entry_result",
bash_command="echo \"{{ task_instance.xcom_pull('lookup_entry')['display_name'] }}\"",
)
建立條目¶
CloudDataCatalogCreateEntryOperator operator 建立條目。
tests/system/google/datacatalog/example_datacatalog_entries.py
create_entry_gcs = CloudDataCatalogCreateEntryOperator(
task_id="create_entry_gcs",
location=LOCATION,
entry_group=ENTRY_GROUP_ID,
entry_id=ENTRY_ID,
entry={
"display_name": ENTRY_NAME,
"type_": "FILESET",
"gcs_fileset_spec": {"file_patterns": [f"gs://{BUCKET_NAME}/**"]},
},
)
您可以將 Jinja 模板 與 location、entry_group、entry_id、entry、project_id、retry、timeout、metadata、gcp_conn_id、impersonation_chain 引數一起使用,從而動態確定值。
結果會儲存到 XCom 中,以便其他 operator 可以使用。
可以使用 entry_id 鍵讀取新建立的條目 ID。
tests/system/google/datacatalog/example_datacatalog_entries.py
create_entry_gcs_result = BashOperator(
task_id="create_entry_gcs_result",
bash_command=f"echo {XComArg(create_entry_gcs, key='entry_id')}",
)
更新條目¶
CloudDataCatalogUpdateEntryOperator operator 更新條目。
tests/system/google/datacatalog/example_datacatalog_entries.py
update_entry = CloudDataCatalogUpdateEntryOperator(
task_id="update_entry",
entry={"display_name": f"{ENTRY_NAME} UPDATED"},
update_mask={"paths": ["display_name"]},
location=LOCATION,
entry_group=ENTRY_GROUP_ID,
entry_id=ENTRY_ID,
)
您可以將 Jinja 模板 與 entry、update_mask、location、entry_group、entry_id、project_id、retry、timeout、metadata、gcp_conn_id、impersonation_chain 引數一起使用,從而動態確定值。
刪除條目¶
CloudDataCatalogDeleteEntryOperator operator 刪除條目。
tests/system/google/datacatalog/example_datacatalog_entries.py
delete_entry = CloudDataCatalogDeleteEntryOperator(
task_id="delete_entry", location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
)
您可以將 Jinja 模板 與 location、entry_group、entry、project_id、retry、timeout、metadata、gcp_conn_id、impersonation_chain 引數一起使用,從而動態確定值。
管理條目組(entry groups)¶
Operator 使用 Entry 表示條目組。
建立條目組¶
CloudDataCatalogCreateEntryGroupOperator operator 建立條目組。
tests/system/google/datacatalog/example_datacatalog_entries.py
create_entry_group = CloudDataCatalogCreateEntryGroupOperator(
task_id="create_entry_group",
location=LOCATION,
entry_group_id=ENTRY_GROUP_ID,
entry_group={"display_name": ENTRY_GROUP_NAME},
)
您可以將 Jinja 模板 與 location、entry_group_id、entry_group、project_id、retry、timeout、metadata、gcp_conn_id、impersonation_chain 引數一起使用,從而動態確定值。
結果會儲存到 XCom 中,以便其他 operator 可以使用。
可以使用 entry_group_id 鍵讀取新建立的條目組 ID。
tests/system/google/datacatalog/example_datacatalog_entries.py
create_entry_group_result = BashOperator(
task_id="create_entry_group_result",
bash_command=f"echo {XComArg(create_entry_group, key='entry_group_id')}",
)
獲取條目組¶
CloudDataCatalogGetEntryGroupOperator operator 獲取條目組。
tests/system/google/datacatalog/example_datacatalog_entries.py
get_entry_group = CloudDataCatalogGetEntryGroupOperator(
task_id="get_entry_group",
location=LOCATION,
entry_group=ENTRY_GROUP_ID,
read_mask=FieldMask(paths=["name", "display_name"]),
)
您可以將 Jinja 模板 與 location、entry_group、read_mask、project_id、retry、timeout、metadata、gcp_conn_id、impersonation_chain 引數一起使用,從而動態確定值。
結果會儲存到 XCom 中,以便其他 operator 可以使用。
tests/system/google/datacatalog/example_datacatalog_entries.py
get_entry_group_result = BashOperator(
task_id="get_entry_group_result",
bash_command=f"echo {get_entry_group.output}",
)
刪除條目組¶
CloudDataCatalogDeleteEntryGroupOperator operator 刪除條目組。
tests/system/google/datacatalog/example_datacatalog_entries.py
delete_entry_group = CloudDataCatalogDeleteEntryGroupOperator(
task_id="delete_entry_group", location=LOCATION, entry_group=ENTRY_GROUP_ID
)
您可以將 Jinja 模板 與 location、entry_group、project_id、retry、timeout、metadata、gcp_conn_id、impersonation_chain 引數一起使用,從而動態確定值。
管理標籤模板(tag templates)¶
Operator 使用 TagTemplate 表示標籤模板。
建立標籤模板¶
CloudDataCatalogCreateTagTemplateOperator operator 獲取標籤模板。
tests/system/google/datacatalog/example_datacatalog_tag_templates.py
create_tag_template = CloudDataCatalogCreateTagTemplateOperator(
task_id="create_tag_template",
location=LOCATION,
tag_template_id=TEMPLATE_ID,
tag_template={
"display_name": TAG_TEMPLATE_DISPLAY_NAME,
"fields": {
FIELD_NAME_1: TagTemplateField(
display_name="first-field", type_=dict(primitive_type="STRING")
)
},
},
)
您可以將 Jinja 模板 與 location、tag_template_id、tag_template、project_id、retry、timeout、metadata、gcp_conn_id、impersonation_chain 引數一起使用,從而動態確定值。
結果會儲存到 XCom 中,以便其他 operator 可以使用。
可以使用 tag_template_id 鍵讀取新建立的標籤模板 ID。
tests/system/google/datacatalog/example_datacatalog_tag_templates.py
create_tag_template_result = BashOperator(
task_id="create_tag_template_result",
bash_command=f"echo {XComArg(create_tag_template, key='tag_template_id')}",
)
刪除標籤模板¶
CloudDataCatalogDeleteTagTemplateOperator operator 刪除標籤模板。
tests/system/google/datacatalog/example_datacatalog_tag_templates.py
delete_tag_template = CloudDataCatalogDeleteTagTemplateOperator(
task_id="delete_tag_template", location=LOCATION, tag_template=TEMPLATE_ID, force=True
)
您可以將 Jinja 模板 與 location、tag_template、force、project_id、retry、timeout、metadata、gcp_conn_id、impersonation_chain 引數一起使用,從而動態確定值。
獲取標籤模板¶
CloudDataCatalogGetTagTemplateOperator operator 獲取標籤模板。
tests/system/google/datacatalog/example_datacatalog_tag_templates.py
get_tag_template = CloudDataCatalogGetTagTemplateOperator(
task_id="get_tag_template", location=LOCATION, tag_template=TEMPLATE_ID
)
您可以將 Jinja 模板 與 location、tag_template、project_id、retry、timeout、metadata、gcp_conn_id、impersonation_chain 引數一起使用,從而動態確定值。
結果會儲存到 XCom 中,以便其他 operator 可以使用。
tests/system/google/datacatalog/example_datacatalog_tag_templates.py
get_tag_template_result = BashOperator(
task_id="get_tag_template_result",
bash_command=f"echo {get_tag_template.output}",
)
更新標籤模板¶
CloudDataCatalogUpdateTagTemplateOperator operator 更新標籤模板。
tests/system/google/datacatalog/example_datacatalog_tag_templates.py
update_tag_template = CloudDataCatalogUpdateTagTemplateOperator(
task_id="update_tag_template",
tag_template={"display_name": f"{TAG_TEMPLATE_DISPLAY_NAME} UPDATED"},
update_mask={"paths": ["display_name"]},
location=LOCATION,
tag_template_id=TEMPLATE_ID,
)
您可以將 Jinja 模板 與 tag_template、update_mask、location、tag_template_id、project_id、retry、timeout、metadata、gcp_conn_id、impersonation_chain 引數一起使用,從而動態確定值。
管理標籤模板欄位(tag template fields)¶
Operator 使用 TagTemplateField 表示標籤模板欄位。
建立欄位¶
CloudDataCatalogCreateTagTemplateFieldOperator operator 獲取標籤模板欄位。
tests/system/google/datacatalog/example_datacatalog_tag_templates.py
create_tag_template_field = CloudDataCatalogCreateTagTemplateFieldOperator(
task_id="create_tag_template_field",
location=LOCATION,
tag_template=TEMPLATE_ID,
tag_template_field_id=FIELD_NAME_2,
tag_template_field=TagTemplateField(
display_name="second-field", type_=FieldType(primitive_type="STRING")
),
)
您可以將 Jinja 模板 與 location、tag_template、tag_template_field_id、tag_template_field、project_id、retry、timeout、metadata、gcp_conn_id、impersonation_chain 引數一起使用,從而動態確定值。
結果會儲存到 XCom 中,以便其他 operator 可以使用。
可以使用 tag_template_field_id 鍵讀取新建立的欄位 ID。
tests/system/google/datacatalog/example_datacatalog_tag_templates.py
create_tag_template_field_result = BashOperator(
task_id="create_tag_template_field_result",
bash_command=f"echo {XComArg(create_tag_template_field, key='tag_template_field_id')}",
)
重新命名欄位¶
CloudDataCatalogRenameTagTemplateFieldOperator operator 重新命名標籤模板欄位。
tests/system/google/datacatalog/example_datacatalog_tag_templates.py
rename_tag_template_field = CloudDataCatalogRenameTagTemplateFieldOperator(
task_id="rename_tag_template_field",
location=LOCATION,
tag_template=TEMPLATE_ID,
field=FIELD_NAME_1,
new_tag_template_field_id=FIELD_NAME_3,
)
您可以將 Jinja 模板 與 location、tag_template、field、new_tag_template_field_id、project_id、retry、timeout、metadata、gcp_conn_id、impersonation_chain 引數一起使用,從而動態確定值。
更新欄位¶
CloudDataCatalogUpdateTagTemplateFieldOperator operator 獲取標籤模板欄位。
tests/system/google/datacatalog/example_datacatalog_tag_templates.py
update_tag_template_field = CloudDataCatalogUpdateTagTemplateFieldOperator(
task_id="update_tag_template_field",
tag_template_field={"display_name": "Updated template field"},
update_mask={"paths": ["display_name"]},
location=LOCATION,
tag_template=TEMPLATE_ID,
tag_template_field_id=FIELD_NAME_1,
)
您可以將 Jinja 模板 與 tag_template_field、update_mask、tag_template_field_name、location、tag_template、tag_template_field_id、project_id、retry、timeout、metadata、gcp_conn_id、impersonation_chain 引數一起使用,從而動態確定值。
刪除欄位¶
CloudDataCatalogDeleteTagTemplateFieldOperator operator 刪除標籤模板欄位。
tests/system/google/datacatalog/example_datacatalog_tag_templates.py
delete_tag_template_field = CloudDataCatalogDeleteTagTemplateFieldOperator(
task_id="delete_tag_template_field",
location=LOCATION,
tag_template=TEMPLATE_ID,
field=FIELD_NAME_2,
force=True,
)
您可以將 Jinja 模板 與 location、tag_template、field、force、project_id、retry、timeout、metadata、gcp_conn_id、impersonation_chain 引數一起使用,從而動態確定值。
搜尋資源¶
CloudDataCatalogSearchCatalogOperator operator 在 Data Catalog 中搜索與查詢匹配的多個資源,例如條目、標籤。
query 引數應使用搜尋語法定義。
tests/system/google/datacatalog/example_datacatalog_search_catalog.py
search_catalog = CloudDataCatalogSearchCatalogOperator(
task_id="search_catalog",
scope={"include_project_ids": [PROJECT_ID]},
query=f"name:{ENTRY_GROUP_NAME}",
)
您可以使用 Jinja 模板 配合 scope、query、page_size、order_by、retry、timeout、metadata、gcp_conn_id、impersonation_chain 引數,從而動態確定值。
結果會儲存到 XCom 中,以便其他 operator 可以使用。
tests/system/google/datacatalog/example_datacatalog_search_catalog.py
search_catalog_result = BashOperator(
task_id="search_catalog_result",
bash_command=f"echo {search_catalog.output}",
)
參考¶
如需瞭解更多資訊,請參閱