Google Cloud Datastore Operator¶
Datastore 模式下的 Firestore 是一個 NoSQL 文件資料庫,專為自動擴充套件、高效能和易於應用程式開發而構建。
有關該服務的更多資訊,請訪問 Datastore 產品文件
前置任務¶
要使用這些 Operator,您需要執行以下幾項操作
使用 Cloud Console 選擇或建立一個 Cloud Platform 專案。
為您的專案啟用結算,詳情請參閱 Google Cloud 文件。
啟用 API,詳情請參閱 Cloud Console 文件。
透過 pip 安裝 API 庫。
pip install 'apache-airflow[google]'有關 安裝 的詳細資訊。
匯出實體¶
要將實體從 Google Cloud Datastore 匯出到 Cloud Storage,請使用 CloudDatastoreExportEntitiesOperator
tests/system/google/cloud/datastore/example_datastore_commit.py
export_task = CloudDatastoreExportEntitiesOperator(
    task_id="export_task",
    bucket=BUCKET_NAME,
    project_id=PROJECT_ID,
    overwrite_existing=True,
)
匯入實體¶
要將實體從 Cloud Storage 匯入到 Google Cloud Datastore,請使用 CloudDatastoreImportEntitiesOperator
tests/system/google/cloud/datastore/example_datastore_commit.py
import_task = CloudDatastoreImportEntitiesOperator(
    task_id="import_task",
    bucket="{{ task_instance.xcom_pull('export_task')['response']['outputUrl'].split('/')[2] }}",
    file="{{ '/'.join(task_instance.xcom_pull('export_task')['response']['outputUrl'].split('/')[3:]) }}",
    project_id=PROJECT_ID,
)
分配 ID¶
要為不完整鍵分配 ID,請使用 CloudDatastoreAllocateIdsOperator
tests/system/google/cloud/datastore/example_datastore_commit.py
allocate_ids = CloudDatastoreAllocateIdsOperator(
    task_id="allocate_ids", partial_keys=KEYS, project_id=PROJECT_ID
)
Operator 所需的部分鍵示例
tests/system/google/cloud/datastore/example_datastore_commit.py
KEYS = [
    {
        "partitionId": {"projectId": PROJECT_ID, "namespaceId": ""},
        "path": {"kind": "airflow"},
    }
]
開始事務¶
要開始新事務,請使用 CloudDatastoreBeginTransactionOperator
tests/system/google/cloud/datastore/example_datastore_commit.py
begin_transaction_commit = CloudDatastoreBeginTransactionOperator(
    task_id="begin_transaction_commit",
    transaction_options=TRANSACTION_OPTIONS,
    project_id=PROJECT_ID,
)
Operator 所需的事務選項示例
tests/system/google/cloud/datastore/example_datastore_commit.py
TRANSACTION_OPTIONS: dict[str, Any] = {"readWrite": {}}
提交事務¶
要提交事務,並可選擇建立、刪除或修改某些實體,請使用 CloudDatastoreCommitOperator
tests/system/google/cloud/datastore/example_datastore_commit.py
commit_task = CloudDatastoreCommitOperator(task_id="commit_task", body=COMMIT_BODY, project_id=PROJECT_ID)
Operator 所需的提交資訊示例
tests/system/google/cloud/datastore/example_datastore_commit.py
    COMMIT_BODY = {
        "mode": "TRANSACTIONAL",
        "mutations": [
            {
                "insert": {
                    "key": KEYS[0],
                    "properties": {"string": {"stringValue": "airflow is awesome!"}},
                }
            }
        ],
        "singleUseTransaction": {"readWrite": {}},
    }
執行查詢¶
要對實體執行查詢,請使用 CloudDatastoreRunQueryOperator
tests/system/google/cloud/datastore/example_datastore_query.py
run_query = CloudDatastoreRunQueryOperator(task_id="run_query", body=QUERY, project_id=PROJECT_ID)
Operator 所需的查詢示例
tests/system/google/cloud/datastore/example_datastore_query.py
    QUERY = {
        "partitionId": {"projectId": PROJECT_ID, "namespaceId": "query"},
        "readOptions": {"transaction": begin_transaction_query.output},
        "query": {},
    }
回滾事務¶
要回滾事務,請使用 CloudDatastoreRollbackOperator
tests/system/google/cloud/datastore/example_datastore_rollback.py
rollback_transaction = CloudDatastoreRollbackOperator(
    task_id="rollback_transaction",
    transaction=begin_transaction_to_rollback.output,
)
獲取操作狀態¶
要獲取長時間執行的操作的當前狀態,請使用 CloudDatastoreGetOperationOperator
tests/system/google/cloud/datastore/example_datastore_commit.py
get_operation = CloudDatastoreGetOperationOperator(
    task_id="get_operation", name="{{ task_instance.xcom_pull('export_task')['name'] }}"
)
刪除操作¶
要刪除操作,請使用 CloudDatastoreDeleteOperationOperator
tests/system/google/cloud/datastore/example_datastore_commit.py
delete_export_operation = CloudDatastoreDeleteOperationOperator(
    task_id="delete_export_operation",
    name="{{ task_instance.xcom_pull('export_task')['name'] }}",
    trigger_rule=TriggerRule.ALL_DONE,
)
參考資料¶
如需更多資訊,請參閱