Google Cloud Bigtable 運算子¶
先決條件任務¶
要使用這些運算子,您必須執行一些操作
使用 Cloud Console 選擇或建立一個 Cloud Platform 專案。
為您的專案啟用結算功能,具體操作請參閱 Google Cloud 文件。
啟用 API,具體操作請參閱 Cloud Console 文件。
透過 pip 安裝 API 庫。
pip install 'apache-airflow[google]'詳細資訊請參閱 安裝。
BigtableCreateInstanceOperator¶
使用 BigtableCreateInstanceOperator 建立 Google Cloud Bigtable 例項。
如果具有給定 ID 的 Cloud Bigtable 例項已存在,運算子不會比較其配置並立即成功。對現有例項不進行任何更改。
使用運算子¶
您可以建立帶或不帶專案 ID 的運算子。如果缺少專案 ID,將從使用的 Google Cloud 連線中檢索。此處展示了兩種變體
tests/system/google/cloud/bigtable/example_bigtable.py
create_instance_task = BigtableCreateInstanceOperator(
project_id=PROJECT_ID,
instance_id=CBT_INSTANCE_ID_1,
main_cluster_id=CBT_CLUSTER_ID,
main_cluster_zone=CBT_CLUSTER_ZONE,
instance_display_name=CBT_INSTANCE_DISPLAY_NAME,
instance_type=CBT_INSTANCE_TYPE, # type: ignore[arg-type]
instance_labels=CBT_INSTANCE_LABELS,
cluster_nodes=None,
cluster_storage_type=CBT_CLUSTER_STORAGE_TYPE, # type: ignore[arg-type]
task_id="create_instance_task",
)
BigtableUpdateInstanceOperator¶
使用 BigtableUpdateInstanceOperator 更新現有的 Google Cloud Bigtable 例項。
對於現有例項,只能更新以下配置:instance_display_name、instance_type 和 instance_labels。
使用運算子¶
您可以建立帶或不帶專案 ID 的運算子。如果缺少專案 ID,將從使用的 Google Cloud 連線中檢索。此處展示了兩種變體
tests/system/google/cloud/bigtable/example_bigtable.py
update_instance_task = BigtableUpdateInstanceOperator(
instance_id=CBT_INSTANCE_ID_1,
instance_display_name=CBT_INSTANCE_DISPLAY_NAME_UPDATED,
instance_type=CBT_INSTANCE_TYPE_PROD,
instance_labels=CBT_INSTANCE_LABELS_UPDATED,
task_id="update_instance_task",
)
BigtableDeleteInstanceOperator¶
使用 BigtableDeleteInstanceOperator 刪除 Google Cloud Bigtable 例項。
使用運算子¶
您可以建立帶或不帶專案 ID 的運算子。如果缺少專案 ID,將從使用的 Google Cloud 連線中檢索。此處展示了兩種變體
tests/system/google/cloud/bigtable/example_bigtable.py
delete_instance_task = BigtableDeleteInstanceOperator(
project_id=PROJECT_ID,
instance_id=CBT_INSTANCE_ID_1,
task_id="delete_instance_task",
)
BigtableUpdateClusterOperator¶
使用 BigtableUpdateClusterOperator 修改 Cloud Bigtable 叢集中的節點數量。
使用運算子¶
您可以建立帶或不帶專案 ID 的運算子。如果缺少專案 ID,將從使用的 Google Cloud 連線中檢索。此處展示了兩種變體
tests/system/google/cloud/bigtable/example_bigtable.py
cluster_update_task = BigtableUpdateClusterOperator(
project_id=PROJECT_ID,
instance_id=CBT_INSTANCE_ID_1,
cluster_id=CBT_CLUSTER_ID,
nodes=CBT_CLUSTER_NODES_UPDATED,
task_id="update_cluster_task",
)
BigtableCreateTableOperator¶
在 Cloud Bigtable 例項中建立表。
如果具有給定 ID 的表已存在於 Cloud Bigtable 例項中,運算子將比較列族。如果列族相同,運算子將成功。否則,運算子將返回適當的錯誤訊息並失敗。
使用運算子¶
您可以建立帶或不帶專案 ID 的運算子。如果缺少專案 ID,將從使用的 Google Cloud 連線中檢索。此處展示了兩種變體
tests/system/google/cloud/bigtable/example_bigtable.py
create_table_task = BigtableCreateTableOperator(
project_id=PROJECT_ID,
instance_id=CBT_INSTANCE_ID_1,
table_id=CBT_TABLE_ID,
task_id="create_table",
)
高階¶
建立表時,您可以指定可選的 initial_split_keys 和 column_families。請參考 Python Client for Google Cloud Bigtable 文件中的 Table 和 Column Families 部分。
BigtableDeleteTableOperator¶
使用 BigtableDeleteTableOperator 刪除 Google Cloud Bigtable 中的表。
使用運算子¶
您可以建立帶或不帶專案 ID 的運算子。如果缺少專案 ID,將從使用的 Google Cloud 連線中檢索。此處展示了兩種變體
tests/system/google/cloud/bigtable/example_bigtable.py
delete_table_task = BigtableDeleteTableOperator(
project_id=PROJECT_ID,
instance_id=CBT_INSTANCE_ID_1,
table_id=CBT_TABLE_ID,
task_id="delete_table_task",
)
BigtableTableReplicationCompletedSensor¶
您可以建立帶或不帶專案 ID 的運算子。如果缺少專案 ID,將從使用的 Google Cloud 連線中檢索。此處展示了兩種變體
使用 BigtableTableReplicationCompletedSensor 等待表完全複製完成。
適用於 BigtableCreateTableOperator 的相同引數也適用於此感測器。
注意: 如果表或 Cloud Bigtable 例項不存在,此感測器將一直等待直到超時,並且不會引發任何異常。
使用運算子¶
tests/system/google/cloud/bigtable/example_bigtable.py
wait_for_table_replication_task = BigtableTableReplicationCompletedSensor(
instance_id=CBT_INSTANCE_ID_2,
table_id=CBT_TABLE_ID,
poke_interval=CBT_POKE_INTERVAL,
timeout=180,
task_id="wait_for_table_replication_task2",
)
參考¶
更多資訊,請參閱