Google Cloud SQL Operators

Prerequisite Tasks

To use these operators, you must do a few things:

CloudSQLCreateInstanceDatabaseOperator

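Creates a new database inside a Cloud SQL instance.

For parameter definition, take a look at CloudSQLCreateInstanceDatabaseOperator.

Using the operator

You can create the operator with or without a project ID. If the project ID is missing, it is retrieved from the Google Cloud connection used. The example below omits it: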
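The project ID fallback described above can be pictured with a small stand-alone sketch. `resolve_project_id` is a hypothetical helper written for illustration and is not Airflow's implementation. Raising `ValueError` when neither value is set is also an assumption of this sketch.

```python
from typing import Optional


def resolve_project_id(explicit_project_id: Optional[str], connection_project_id: Optional[str]) -> str:
    """Return the project an operator call would target (illustrative only)."""
    # A project_id passed to the operator always takes precedence.
    if explicit_project_id:
        return explicit_project_id
    # Otherwise fall back to the project configured on the Google Cloud connection.
    if connection_project_id:
        return connection_project_id
    # Assumption of this sketch: with neither value set there is nothing to target.
    raise ValueError("project_id must be passed explicitly or set on the Google Cloud connection")


print(resolve_project_id(None, "conn-project"))  # conn-project
print(resolve_project_id("my-project", "conn-project"))  # my-project
```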

tests/system/google/cloud/cloud_sql/example_cloud_sql.py

sql_db_create_task = CloudSQLCreateInstanceDatabaseOperator(
    body=db_create_body, instance=INSTANCE_NAME, task_id="sql_db_create_task"
)

Example request body:

tests/system/google/cloud/cloud_sql/example_cloud_sql.py

db_create_body = {"instance": INSTANCE_NAME, "name": DB_NAME, "project": PROJECT_ID}

Templating

template_fields: Sequence[str] = (
    "project_id",
    "instance",
    "body",
    "gcp_conn_id",
    "api_version",
    "impersonation_chain",
)

More information

See the Google Cloud SQL API documentation to create a new database inside the instance.

CloudSQLDeleteInstanceDatabaseOperator

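Deletes a database from a Cloud SQL instance.

For parameter definition, take a look at CloudSQLDeleteInstanceDatabaseOperator.

Using the operator

You can create the operator with or without a project ID. If the project ID is missing, it is retrieved from the Google Cloud connection used. The example below omits it: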

tests/system/google/cloud/cloud_sql/example_cloud_sql.py

sql_db_delete_task = CloudSQLDeleteInstanceDatabaseOperator(
    instance=INSTANCE_NAME,
    database=DB_NAME,
    task_id="sql_db_delete_task",
    trigger_rule=TriggerRule.ALL_DONE,
)

Templating

template_fields: Sequence[str] = (
    "project_id",
    "instance",
    "database",
    "gcp_conn_id",
    "api_version",
    "impersonation_chain",
)

More information

See the Google Cloud SQL API documentation to delete a database.

CloudSQLPatchInstanceDatabaseOperator

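Updates a resource containing information about a database inside a Cloud SQL instance, using patch semantics. See: https://cloud.google.com/sql/docs/mysql/admin-api/how-tos/performance#patch

For parameter definition, take a look at CloudSQLPatchInstanceDatabaseOperator.

Using the operator

You can create the operator with or without a project ID. If the project ID is missing, it is retrieved from the Google Cloud connection used. The example below omits it: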

tests/system/google/cloud/cloud_sql/example_cloud_sql.py

sql_db_patch_task = CloudSQLPatchInstanceDatabaseOperator(
    body=db_patch_body,
    instance=INSTANCE_NAME,
    database=DB_NAME,
    task_id="sql_db_patch_task",
)

Example request body:

tests/system/google/cloud/cloud_sql/example_cloud_sql.py

db_patch_body = {"charset": "utf16", "collation": "utf16_general_ci"}

Templating

template_fields: Sequence[str] = (
    "project_id",
    "instance",
    "body",
    "database",
    "gcp_conn_id",
    "api_version",
    "impersonation_chain",
)

More information

See the Google Cloud SQL API documentation to update a database.

CloudSQLDeleteInstanceOperator

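Deletes a Cloud SQL instance in Google Cloud.

It is also used for deleting read and failover replicas.

For parameter definition, take a look at CloudSQLDeleteInstanceOperator.

Using the operator

You can create the operator with or without a project ID. If the project ID is missing, it is retrieved from the Google Cloud connection used. The example below omits it: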

tests/system/google/cloud/cloud_sql/example_cloud_sql.py

sql_instance_delete_task = CloudSQLDeleteInstanceOperator(
    instance=INSTANCE_NAME, task_id="sql_instance_delete_task", trigger_rule=TriggerRule.ALL_DONE
)

Templating

template_fields: Sequence[str] = (
    "project_id",
    "instance",
    "gcp_conn_id",
    "api_version",
    "impersonation_chain",
)

More information

See the Google Cloud SQL API documentation to delete a SQL instance.

CloudSQLExportInstanceOperator

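Exports data from a Cloud SQL instance to a Cloud Storage bucket as a SQL dump or CSV file.

Note

This operator is idempotent. If it is executed multiple times with the same export file URI, the export file in GCS is simply overwritten.

For parameter definition, take a look at CloudSQLExportInstanceOperator.

Arguments

Example body defining the export operation: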

tests/system/google/cloud/cloud_sql/example_cloud_sql.py

export_body = {
    "exportContext": {
        "fileType": "sql",
        "uri": FILE_URI,
        "sqlExportOptions": {"schemaOnly": False},
        "offload": True,
    }
}
export_body_deferrable = {
    "exportContext": {
        "fileType": "sql",
        "uri": FILE_URI_DEFERRABLE,
        "sqlExportOptions": {"schemaOnly": False},
        "offload": True,
    }
}

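Using the operator

You can create the operator with or without a project ID. If the project ID is missing, it is retrieved from the Google Cloud connection used. The example below omits it: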

tests/system/google/cloud/cloud_sql/example_cloud_sql.py

sql_export_task = CloudSQLExportInstanceOperator(
    body=export_body, instance=INSTANCE_NAME, task_id="sql_export_task"
)

You can also run this operator in deferrable mode:

tests/system/google/cloud/cloud_sql/example_cloud_sql.py

sql_export_def_task = CloudSQLExportInstanceOperator(
    body=export_body_deferrable,
    instance=INSTANCE_NAME,
    task_id="sql_export_def_task",
    deferrable=True,
)

Templating

template_fields: Sequence[str] = (
    "project_id",
    "instance",
    "body",
    "gcp_conn_id",
    "api_version",
    "impersonation_chain",
)

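More information

See the Google Cloud SQL API documentation to export data.

Troubleshooting

If you receive an "Unauthorized" error in Google Cloud, make sure that the service account of the Cloud SQL instance is authorized to write to the selected GCS bucket.

GCS is accessed not by the service account configured in Airflow, but by the service account of the particular Cloud SQL instance.

To grant the service account the appropriate WRITE permissions for the GCS bucket, you can use GCSBucketCreateAclEntryOperator, as shown in the example: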

tests/system/google/cloud/cloud_sql/example_cloud_sql.py

sql_gcp_add_bucket_permission_task = GCSBucketCreateAclEntryOperator(
    entity=f"user-{service_account_email}",
    role="WRITER",
    bucket=file_url_split[1],  # netloc (bucket)
    task_id="sql_gcp_add_bucket_permission_task",
)

CloudSQLImportInstanceOperator

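Imports data into a Cloud SQL instance from a SQL dump or CSV file in Cloud Storage.

CSV import:

This operator is NOT idempotent for a CSV import. If the same file is imported multiple times, the imported data is duplicated in the database. Moreover, if there are any unique constraints, the duplicate import may result in an error.

SQL import:

This operator is idempotent for a SQL import if the SQL file was also exported by Cloud SQL. The exported SQL contains 'DROP TABLE IF EXISTS' statements for all tables to be imported.

If the import file was generated in a different way, idempotence is not guaranteed. It has to be ensured at the SQL file level.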
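For a dump produced by some other tool, one rough pre-import check is to list the tables that the file creates without dropping them first. The sketch below is a regex heuristic written for illustration (`tables_without_drop` is a hypothetical helper). It ignores statement order and schema-qualified names, and it is no substitute for reviewing the file.

```python
import re

# Table names in CREATE TABLE / DROP TABLE IF EXISTS statements,
# optionally wrapped in MySQL backticks or double quotes.
_CREATE = re.compile(r'CREATE\s+TABLE\s+(?:IF\s+NOT\s+EXISTS\s+)?[`"]?(\w+)[`"]?', re.IGNORECASE)
_DROP = re.compile(r'DROP\s+TABLE\s+IF\s+EXISTS\s+[`"]?(\w+)[`"]?', re.IGNORECASE)


def tables_without_drop(sql_dump: str) -> set[str]:
    """Return tables that the dump creates but never drops (heuristic only)."""
    created = {name.lower() for name in _CREATE.findall(sql_dump)}
    dropped = {name.lower() for name in _DROP.findall(sql_dump)}
    return created - dropped


dump = """
DROP TABLE IF EXISTS `users`;
CREATE TABLE `users` (id INT PRIMARY KEY);
CREATE TABLE orders (id INT PRIMARY KEY);
"""
print(tables_without_drop(dump))  # {'orders'}
```

A non-empty result suggests that re-running the import could fail or duplicate data for those tables.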

For parameter definition, take a look at CloudSQLImportInstanceOperator.

Arguments

Example body defining the import operation:

tests/system/google/cloud/cloud_sql/example_cloud_sql.py

import_body = {"importContext": {"fileType": "sql", "uri": FILE_URI}}

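Using the operator

You can create the operator with or without a project ID. If the project ID is missing, it is retrieved from the Google Cloud connection used. The example below omits it: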

tests/system/google/cloud/cloud_sql/example_cloud_sql.py

sql_import_task = CloudSQLImportInstanceOperator(
    body=import_body, instance=INSTANCE_NAME, task_id="sql_import_task"
)

Templating

template_fields: Sequence[str] = (
    "project_id",
    "instance",
    "body",
    "gcp_conn_id",
    "api_version",
    "impersonation_chain",
)

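More information

See the Google Cloud SQL API documentation to import data.

Troubleshooting

If you receive an "Unauthorized" error in Google Cloud, make sure that the service account of the Cloud SQL instance is authorized to read from the selected GCS object.

GCS is accessed not by the service account configured in Airflow, but by the service account of the particular Cloud SQL instance.

To grant the service account the appropriate READ permissions for the GCS object, you can use GCSObjectCreateAclEntryOperator, as shown in the example: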

tests/system/google/cloud/cloud_sql/example_cloud_sql.py

sql_gcp_add_object_permission_task = GCSObjectCreateAclEntryOperator(
    entity=f"user-{service_account_email}",
    role="READER",
    bucket=file_url_split[1],  # netloc (bucket)
    object_name=file_url_split[2][1:],  # path (strip first '/')
    task_id="sql_gcp_add_object_permission_task",
)
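The ACL examples treat `file_url_split[1]` as the bucket and `file_url_split[2][1:]` as the object name. That indexing matches `file_url_split` being the result of `urllib.parse.urlsplit(FILE_URI)`. This is an assumption, because its definition is not shown here. The sketch below uses hypothetical values for the URI and for the instance's service account email.

```python
from urllib.parse import urlsplit

# Hypothetical values standing in for FILE_URI and the Cloud SQL instance's service account.
FILE_URI = "gs://my-export-bucket/dumps/export.sql"
service_account_email = "p123456-abcdef@gcp-sa-cloud-sql.iam.gserviceaccount.com"

file_url_split = urlsplit(FILE_URI)
bucket = file_url_split[1]  # netloc -> "my-export-bucket"
object_name = file_url_split[2][1:]  # path without the leading "/" -> "dumps/export.sql"
entity = f"user-{service_account_email}"  # ACL entity naming a single account

print(bucket, object_name, entity)
```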

CloudSQLCreateInstanceOperator

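Creates a new Cloud SQL instance in Google Cloud.

It is also used for creating read replicas.

For parameter definition, take a look at CloudSQLCreateInstanceOperator.

If an instance with the same name exists, no action is taken and the operator succeeds.

Arguments

Example body defining the instance: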

tests/system/google/cloud/cloud_sql/example_cloud_sql.py

body = {
    "name": INSTANCE_NAME,
    "settings": {
        "tier": "db-n1-standard-1",
        "backupConfiguration": {"binaryLogEnabled": True, "enabled": True, "startTime": "05:00"},
        "activationPolicy": "ALWAYS",
        "dataDiskSizeGb": 30,
        "dataDiskType": "PD_SSD",
        "databaseFlags": [],
        "ipConfiguration": {
            "ipv4Enabled": True,
            "requireSsl": True,
        },
        "locationPreference": {"zone": "europe-west4-a"},
        "maintenanceWindow": {"hour": 5, "day": 7, "updateTrack": "canary"},
        "pricingPlan": "PER_USE",
        "storageAutoResize": True,
        "storageAutoResizeLimit": 0,
        "userLabels": {"my-key": "my-value"},
    },
    "databaseVersion": "MYSQL_5_7",
    "region": "europe-west4",
}

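Using the operator

You can create the operator with or without a project ID. If the project ID is missing, it is retrieved from the Google Cloud connection used. The example below omits it: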

tests/system/google/cloud/cloud_sql/example_cloud_sql.py

sql_instance_create_task = CloudSQLCreateInstanceOperator(
    body=body, instance=INSTANCE_NAME, task_id="sql_instance_create_task"
)

Templating

template_fields: Sequence[str] = (
    "project_id",
    "instance",
    "body",
    "gcp_conn_id",
    "api_version",
    "impersonation_chain",
)

More information

See the Google Cloud SQL API documentation to create an instance.

CloudSQLInstancePatchOperator

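Updates the settings of a Cloud SQL instance in Google Cloud (partial update).

For parameter definition, take a look at CloudSQLInstancePatchOperator.

This is a partial update, so only the settings specified in the body are set or updated. The rest of the existing instance's configuration remains unchanged.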
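The effect of a partial update can be modelled locally. The sketch below is a simplified model of Google API patch semantics. It assumes that nested objects are merged field by field, that scalars and lists are replaced wholesale, and that deleting fields via null is out of scope. `apply_patch` is hypothetical, and the authoritative behavior is what the Cloud SQL API does on the server.

```python
import copy
from typing import Any


def apply_patch(resource: dict[str, Any], patch: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of `resource` with `patch` merged in (simplified model)."""
    merged = copy.deepcopy(resource)
    for key, value in patch.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            # Nested objects: only the fields present in the patch change.
            merged[key] = apply_patch(merged[key], value)
        else:
            # Scalars and lists replace the existing value wholesale.
            merged[key] = copy.deepcopy(value)
    return merged


# A trimmed instance resource and a trimmed version of the example patch body.
current = {
    "name": "my-instance",
    "settings": {
        "tier": "db-n1-standard-1",
        "dataDiskSizeGb": 30,
        "maintenanceWindow": {"hour": 5, "day": 7, "updateTrack": "canary"},
    },
}
patch = {
    "name": "my-instance",
    "settings": {"dataDiskSizeGb": 35, "maintenanceWindow": {"hour": 3, "day": 6, "updateTrack": "canary"}},
}
result = apply_patch(current, patch)
print(result["settings"]["tier"], result["settings"]["dataDiskSizeGb"])  # db-n1-standard-1 35
```

The tier is not in the patch, so it keeps its previous value.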

Arguments

Example body defining the instance:

tests/system/google/cloud/cloud_sql/example_cloud_sql.py

patch_body = {
    "name": INSTANCE_NAME,
    "settings": {
        "dataDiskSizeGb": 35,
        "maintenanceWindow": {"hour": 3, "day": 6, "updateTrack": "canary"},
        "userLabels": {"my-key-patch": "my-value-patch"},
    },
}

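Using the operator

You can create the operator with or without a project ID. If the project ID is missing, it is retrieved from the Google Cloud connection used. The example below omits it: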

tests/system/google/cloud/cloud_sql/example_cloud_sql.py

sql_instance_patch_task = CloudSQLInstancePatchOperator(
    body=patch_body, instance=INSTANCE_NAME, task_id="sql_instance_patch_task"
)

Templating

template_fields: Sequence[str] = (
    "project_id",
    "instance",
    "body",
    "gcp_conn_id",
    "api_version",
    "impersonation_chain",
)

More information

See the Google Cloud SQL API documentation to patch an instance.

CloudSQLCloneInstanceOperator

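Clones a Cloud SQL instance.

For parameter definition, take a look at CloudSQLCloneInstanceOperator.

Arguments

For the clone_context object attributes, refer to CloneContext.

Using the operator

You can create the operator with or without a project ID. If the project ID is missing, it is retrieved from the Google Cloud connection used. The example below omits it: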

tests/system/google/cloud/cloud_sql/example_cloud_sql.py

sql_instance_clone = CloudSQLCloneInstanceOperator(
    instance=INSTANCE_NAME, destination_instance_name=CLONED_INSTANCE_NAME, task_id="sql_instance_clone"
)

Templating

template_fields: Sequence[str] = (
    "project_id",
    "instance",
    "destination_instance_name",
    "gcp_conn_id",
    "api_version",
)

More information

See the Google Cloud SQL API documentation to clone an instance.

CloudSQLExecuteQueryOperator

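Performs DDL or DML SQL queries in a Google Cloud SQL instance. DQL (retrieving data from Google Cloud SQL) is not supported. You can run SELECT queries, but their results are discarded.

You can specify various methods for connecting to a running instance:

- a plain connection via public IP
- a public IP connection with SSL
- a TCP or socket connection via Cloud SQL Proxy

The operator downloads the proxy and starts and stops it dynamically as needed.

There is a gcpcloudsql://* connection type that you should use to define which kind of connectivity the operator uses. This connection is a "meta" connection. It is not used to make an actual connection on its own. Instead, it determines two things:

- whether CloudSQLDatabaseHook should start Cloud SQL Proxy
- which kind of database connection (Postgres or MySQL) should be created dynamically to reach Cloud SQL, via either the public IP address or the proxy

CloudSQLDatabaseHook uses CloudSqlProxyRunner to manage the Cloud SQL Proxy lifecycle (each task has its own Cloud SQL Proxy).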
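The decision that the meta connection encodes can be summarized from the extras keys used in the examples in this guide (`database_type`, `use_proxy`, `sql_proxy_use_tcp`, `use_ssl`). The sketch below is hypothetical and illustrative. It does not reproduce CloudSQLDatabaseHook's code. It only maps those flags to the kind of connectivity described above.

```python
def _flag(extra: dict[str, str], key: str) -> bool:
    """Interpret a "True"/"False" string flag, as used in this guide's examples."""
    return str(extra.get(key, "False")).lower() == "true"


def describe_connectivity(extra: dict[str, str]) -> str:
    """Summarize the connection a gcpcloudsql extras dict asks for (illustrative only)."""
    database_type = extra.get("database_type")
    if database_type not in ("postgres", "mysql"):
        raise ValueError(f"database_type must be 'postgres' or 'mysql', got {database_type!r}")
    if _flag(extra, "use_proxy"):
        # A per-task proxy that listens on either a TCP port or a UNIX socket.
        transport = "TCP" if _flag(extra, "sql_proxy_use_tcp") else "UNIX socket"
        return f"{database_type} via Cloud SQL Proxy over {transport}"
    # Without the proxy, the connection goes straight to the instance's public IP.
    security = "with SSL" if _flag(extra, "use_ssl") else "without SSL"
    return f"{database_type} directly over public IP {security}"


print(describe_connectivity({"database_type": "mysql", "use_proxy": "True", "sql_proxy_use_tcp": "False"}))
# mysql via Cloud SQL Proxy over UNIX socket
```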

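When you build a connection, use the connection parameters described in CloudSQLDatabaseHook. Example connections for all possible types of connectivity are shown below. Such a connection can be reused between different tasks (instances of CloudSQLExecuteQueryOperator). Each task starts its own proxy if needed, with its own TCP or UNIX socket.

For parameter definition, take a look at CloudSQLExecuteQueryOperator.

Because the query operator can run arbitrary queries, it cannot be guaranteed to be idempotent. The SQL query designer should design the queries to be idempotent. For example, both Postgres and MySQL support CREATE TABLE IF NOT EXISTS statements, which can be used to create tables in an idempotent way.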
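Idempotent SQL can be tested cheaply by running the same statements twice, as a retried task would. The sketch below uses the standard-library `sqlite3` module as a stand-in for Postgres or MySQL, which is an assumption made so that the example runs anywhere. `CREATE TABLE IF NOT EXISTS` and `INSERT ... ON CONFLICT (id) DO NOTHING` are valid in both SQLite (3.24 or later) and Postgres. MySQL would typically use `INSERT IGNORE` instead.

```python
import sqlite3

# Statements chosen so that the whole script can be re-run without errors or duplicate rows.
IDEMPOTENT_SQL = [
    "CREATE TABLE IF NOT EXISTS visits (id INTEGER PRIMARY KEY, page TEXT)",
    "INSERT INTO visits (id, page) VALUES (1, '/home') ON CONFLICT (id) DO NOTHING",
]


def run_twice() -> int:
    """Execute the script twice, as a task retry would, and return the row count."""
    conn = sqlite3.connect(":memory:")
    try:
        for _ in range(2):
            for statement in IDEMPOTENT_SQL:
                conn.execute(statement)
        conn.commit()
        return conn.execute("SELECT COUNT(*) FROM visits").fetchone()[0]
    finally:
        conn.close()


print(run_twice())  # 1
```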

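Arguments

If you define the connection via an AIRFLOW_CONN_{CONN_ID} URL stored in an environment variable, make sure that the components of the URL are URL-encoded. See the examples below for details.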
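Credentials often contain characters such as `@`, `:`, `/` or `#` that would break the URI. The minimal sketch below encodes each component with `urllib.parse.quote(..., safe="")` and builds the query string with `urlencode`. All values are hypothetical.

```python
from urllib.parse import quote, urlencode

# Hypothetical credentials containing characters that are special in URLs.
user = "app-user"
password = "p@ss:w/rd#1"
host, port, database = "203.0.113.10", 5432, "my db"

extra = {
    "database_type": "postgres",
    "project_id": "my-project",
    "location": "europe-west4",
    "instance": "my-instance",
    "use_proxy": "False",
    "use_ssl": "False",
}

# safe="" percent-encodes every reserved character, including "/" and "@".
uri = (
    f"gcpcloudsql://{quote(user, safe='')}:{quote(password, safe='')}"
    f"@{host}:{port}/{quote(database, safe='')}?{urlencode(extra)}"
)
print(uri)
```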

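Note that for SSL connections you need a mechanism that makes the certificate and key files available at predefined locations on every worker on which the operator can run. For example, you can mount an NFS-like volume at the same path on all workers.

Example connection definitions for all non-SSL connectivity. Note that all components of the connection URI should be URL-encoded: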

tests/system/google/cloud/cloud_sql/example_cloud_sql_query.py

# Connect via proxy over TCP
CONNECTION_PROXY_TCP_KWARGS = {
    "conn_type": "gcpcloudsql",
    "login": CLOUD_SQL_USER,
    "password": CLOUD_SQL_PASSWORD,
    "host": CLOUD_SQL_IP_ADDRESS,
    "port": CLOUD_SQL_PUBLIC_PORT,
    "schema": CLOUD_SQL_DATABASE_NAME,
    "extra": {
        "database_type": DATABASE_TYPE,
        "project_id": PROJECT_ID,
        "location": REGION,
        "instance": CLOUD_SQL_INSTANCE_NAME,
        "use_proxy": "True",
        "sql_proxy_use_tcp": "True",
    },
}

# Connect via proxy over UNIX socket (specific proxy version)
CONNECTION_PROXY_SOCKET_KWARGS = {
    "conn_type": "gcpcloudsql",
    "login": CLOUD_SQL_USER,
    "password": CLOUD_SQL_PASSWORD,
    "host": CLOUD_SQL_IP_ADDRESS,
    "port": CLOUD_SQL_PUBLIC_PORT,
    "schema": CLOUD_SQL_DATABASE_NAME,
    "extra": {
        "database_type": DATABASE_TYPE,
        "project_id": PROJECT_ID,
        "location": REGION,
        "instance": CLOUD_SQL_INSTANCE_NAME,
        "use_proxy": "True",
        "sql_proxy_version": "v1.33.9",
        "sql_proxy_use_tcp": "False",
    },
}

# Connect directly via TCP (non-SSL)
CONNECTION_PUBLIC_TCP_KWARGS = {
    "conn_type": "gcpcloudsql",
    "login": CLOUD_SQL_USER,
    "password": CLOUD_SQL_PASSWORD,
    "host": CLOUD_SQL_IP_ADDRESS,
    "port": CLOUD_SQL_PUBLIC_PORT,
    "schema": CLOUD_SQL_DATABASE_NAME,
    "extra": {
        "database_type": DATABASE_TYPE,
        "project_id": PROJECT_ID,
        "location": REGION,
        "instance": CLOUD_SQL_INSTANCE_NAME,
        "use_proxy": "False",
        "use_ssl": "False",
    },
}

Similar connection definitions for all SSL-enabled connectivity:

tests/system/google/cloud/cloud_sql/example_cloud_sql_query_ssl.py

# Connect directly via TCP (SSL)
CONNECTION_PUBLIC_TCP_SSL_KWARGS = {
    "conn_type": "gcpcloudsql",
    "login": CLOUD_SQL_USER,
    "password": CLOUD_SQL_PASSWORD,
    "host": CLOUD_SQL_IP_ADDRESS,
    "port": CLOUD_SQL_PUBLIC_PORT,
    "schema": CLOUD_SQL_DATABASE_NAME,
    "extra": {
        "database_type": DATABASE_TYPE,
        "project_id": PROJECT_ID,
        "location": REGION,
        "instance": CLOUD_SQL_INSTANCE_NAME,
        "use_proxy": "False",
        "use_ssl": "True",
    },
}

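You can also configure connections through environment variables. If you use the standard Airflow notation for this, the environment variable name is AIRFLOW_CONN_ followed by the upper-cased connection ID that you pass to the operator: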
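The naming rule, and the location of each field inside the URI, can be checked with the standard library. In the sketch below, `env_var_for` is a hypothetical helper that encodes the `AIRFLOW_CONN_` plus upper-cased ID convention. The `urlsplit`/`parse_qsl` parsing shows which part of the URI carries the login, host, port, schema and extras. It is not Airflow's own parser.

```python
import os
from urllib.parse import parse_qsl, unquote, urlsplit


def env_var_for(conn_id: str) -> str:
    """Name of the environment variable that defines `conn_id`."""
    return f"AIRFLOW_CONN_{conn_id.upper()}"


# Hypothetical connection ID and values, mirroring the non-SSL Postgres example.
conn_id = "public_postgres_tcp"
os.environ[env_var_for(conn_id)] = (
    "gcpcloudsql://user:password@203.0.113.10:5432/database?"
    "database_type=postgres&project_id=my-project&location=europe-west4&"
    "instance=my-instance&use_proxy=False&use_ssl=False"
)

parts = urlsplit(os.environ[env_var_for(conn_id)])
login = unquote(parts.username or "")
host, port, schema = parts.hostname, parts.port, parts.path.lstrip("/")
extras = dict(parse_qsl(parts.query))  # query parameters carry the connection extras

print(env_var_for(conn_id))  # AIRFLOW_CONN_PUBLIC_POSTGRES_TCP
print(login, host, port, schema)  # user 203.0.113.10 5432 database
print(extras["database_type"], extras["use_proxy"])  # postgres False
```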

tests/system/google/cloud/cloud_sql/example_cloud_sql_query.py


# The connections below are created using one of the standard approaches - via environment
# variables named AIRFLOW_CONN_* . The connections can also be created in the database
# of AIRFLOW (using command line or UI).

postgres_kwargs = {
    "user": "user",
    "password": "password",
    "public_ip": "public_ip",
    "public_port": "public_port",
    "database": "database",
    "project_id": "project_id",
    "location": "location",
    "instance": "instance",
    "client_cert_file": "client_cert_file",
    "client_key_file": "client_key_file",
    "server_ca_file": "server_ca_file",
}

# Postgres: connect via proxy over TCP
os.environ["AIRFLOW_CONN_PROXY_POSTGRES_TCP"] = (
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
    "database_type=postgres&"
    "project_id={project_id}&"
    "location={location}&"
    "instance={instance}&"
    "use_proxy=True&"
    "sql_proxy_use_tcp=True".format(**postgres_kwargs)
)

# Postgres: connect via proxy over UNIX socket (specific proxy version)
os.environ["AIRFLOW_CONN_PROXY_POSTGRES_SOCKET"] = (
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
    "database_type=postgres&"
    "project_id={project_id}&"
    "location={location}&"
    "instance={instance}&"
    "use_proxy=True&"
    "sql_proxy_version=v1.13&"
    "sql_proxy_use_tcp=False".format(**postgres_kwargs)
)

# Postgres: connect directly via TCP (non-SSL)
os.environ["AIRFLOW_CONN_PUBLIC_POSTGRES_TCP"] = (
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
    "database_type=postgres&"
    "project_id={project_id}&"
    "location={location}&"
    "instance={instance}&"
    "use_proxy=False&"
    "use_ssl=False".format(**postgres_kwargs)
)

# Postgres: connect directly via TCP (SSL)
os.environ["AIRFLOW_CONN_PUBLIC_POSTGRES_TCP_SSL"] = (
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
    "database_type=postgres&"
    "project_id={project_id}&"
    "location={location}&"
    "instance={instance}&"
    "use_proxy=False&"
    "use_ssl=True&"
    "sslcert={client_cert_file}&"
    "sslkey={client_key_file}&"
    "sslrootcert={server_ca_file}".format(**postgres_kwargs)
)

mysql_kwargs = {
    "user": "user",
    "password": "password",
    "public_ip": "public_ip",
    "public_port": "public_port",
    "database": "database",
    "project_id": "project_id",
    "location": "location",
    "instance": "instance",
    "client_cert_file": "client_cert_file",
    "client_key_file": "client_key_file",
    "server_ca_file": "server_ca_file",
}

# MySQL: connect via proxy over TCP (specific proxy version)
os.environ["AIRFLOW_CONN_PROXY_MYSQL_TCP"] = (
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
    "database_type=mysql&"
    "project_id={project_id}&"
    "location={location}&"
    "instance={instance}&"
    "use_proxy=True&"
    "sql_proxy_version=v1.13&"
    "sql_proxy_use_tcp=True".format(**mysql_kwargs)
)

# MySQL: connect via proxy over UNIX socket
os.environ["AIRFLOW_CONN_PROXY_MYSQL_SOCKET"] = (
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
    "database_type=mysql&"
    "project_id={project_id}&"
    "location={location}&"
    "instance={instance}&"
    "use_proxy=True&"
    "sql_proxy_use_tcp=False".format(**mysql_kwargs)
)

# MySQL: connect directly via TCP (non-SSL)
os.environ["AIRFLOW_CONN_PUBLIC_MYSQL_TCP"] = (
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
    "database_type=mysql&"
    "project_id={project_id}&"
    "location={location}&"
    "instance={instance}&"
    "use_proxy=False&"
    "use_ssl=False".format(**mysql_kwargs)
)

# MySQL: connect directly via TCP (SSL)
os.environ["AIRFLOW_CONN_PUBLIC_MYSQL_TCP_SSL"] = (
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
    "database_type=mysql&"
    "project_id={project_id}&"
    "location={location}&"
    "instance={instance}&"
    "use_proxy=False&"
    "use_ssl=True&"
    "sslcert={client_cert_file}&"
    "sslkey={client_key_file}&"
    "sslrootcert={server_ca_file}".format(**mysql_kwargs)
)

# Special case: MySQL: connect directly via TCP (SSL) without project_id
os.environ["AIRFLOW_CONN_PUBLIC_MYSQL_TCP_SSL_NO_PROJECT_ID"] = (
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
    "database_type=mysql&"
    "location={location}&"
    "instance={instance}&"
    "use_proxy=False&"
    "use_ssl=True&"
    "sslcert={client_cert_file}&"
    "sslkey={client_key_file}&"
    "sslrootcert={server_ca_file}".format(**mysql_kwargs)
)

tests/system/google/cloud/cloud_sql/example_cloud_sql_query_ssl.py


# The connections below are created using one of the standard approaches - via environment
# variables named AIRFLOW_CONN_* . The connections can also be created in the database
# of AIRFLOW (using command line or UI).

postgres_kwargs = {
    "user": "user",
    "password": "password",
    "public_ip": "public_ip",
    "public_port": "public_port",
    "database": "database",
    "project_id": "project_id",
    "location": "location",
    "instance": "instance",
    "client_cert_file": "client_cert_file",
    "client_key_file": "client_key_file",
    "server_ca_file": "server_ca_file",
}

# Postgres: connect directly via TCP (SSL)
os.environ["AIRFLOW_CONN_PUBLIC_POSTGRES_TCP_SSL"] = (
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
    "database_type=postgres&"
    "project_id={project_id}&"
    "location={location}&"
    "instance={instance}&"
    "use_proxy=False&"
    "use_ssl=True&"
    "sslcert={client_cert_file}&"
    "sslkey={client_key_file}&"
    "sslrootcert={server_ca_file}".format(**postgres_kwargs)
)

mysql_kwargs = {
    "user": "user",
    "password": "password",
    "public_ip": "public_ip",
    "public_port": "public_port",
    "database": "database",
    "project_id": "project_id",
    "location": "location",
    "instance": "instance",
    "client_cert_file": "client_cert_file",
    "client_key_file": "client_key_file",
    "server_ca_file": "server_ca_file",
}

# MySQL: connect directly via TCP (SSL)
os.environ["AIRFLOW_CONN_PUBLIC_MYSQL_TCP_SSL"] = (
    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
    "database_type=mysql&"
    "project_id={project_id}&"
    "location={location}&"
    "instance={instance}&"
    "use_proxy=False&"
    "use_ssl=True&"
    "sslcert={client_cert_file}&"
    "sslkey={client_key_file}&"
    "sslrootcert={server_ca_file}".format(**mysql_kwargs)
)

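Using the operator

The example operator below uses a previously prepared connection. This can be a connection_id from the Airflow database, or a connection configured through an environment variable. If you use the standard Airflow notation for environment-variable connections, the variable name is AIRFLOW_CONN_ followed by the upper-cased connection ID that you pass to the operator: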

tests/system/google/cloud/cloud_sql/example_cloud_sql_query.py

query_task = CloudSQLExecuteQueryOperator(
    gcp_cloudsql_conn_id=connection_id,
    task_id=task_id,
    sql=SQL,
)

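SSL settings can also be specified at the operator level, in which case they override the SSL settings configured in the connection. One way to do this is to specify the path to each certificate file, as shown below. For security reasons, these files are copied to a temporary location with the minimum required permissions.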

tests/system/google/cloud/cloud_sql/example_cloud_sql_query_ssl.py

query_task = CloudSQLExecuteQueryOperator(
    gcp_cloudsql_conn_id=conn_id,
    task_id=task_id,
    sql=SQL,
    ssl_client_cert=ssl_cert_path,
    ssl_server_cert=ssl_server_cert_path,
    ssl_client_key=ssl_key_path,
)
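The copy step described above follows a common pattern: create a private temporary file, then copy the certificate or key into it. The sketch below shows that pattern with the standard library only. `copy_with_owner_only_access` is a hypothetical helper and is not the hook's implementation. The permission check is only meaningful on POSIX systems.

```python
import os
import shutil
import stat
import tempfile


def copy_with_owner_only_access(src_path: str) -> str:
    """Copy a certificate or key file to a new private temporary file and return its path."""
    # mkstemp creates the file securely; on POSIX it starts with mode 0600.
    fd, dst_path = tempfile.mkstemp(prefix="cloudsql-ssl-")
    with os.fdopen(fd, "wb") as dst, open(src_path, "rb") as src:
        shutil.copyfileobj(src, dst)
    # State the intended permissions explicitly: read/write for the owner only.
    os.chmod(dst_path, stat.S_IRUSR | stat.S_IWUSR)
    return dst_path


# Demo with a throwaway file standing in for a real client key.
with tempfile.TemporaryDirectory() as tmp:
    source = os.path.join(tmp, "client-key.pem")
    with open(source, "w") as f:
        f.write("dummy key")
    private_copy = copy_with_owner_only_access(source)

with open(private_copy) as f:
    copied_text = f.read()
copied_mode = stat.S_IMODE(os.stat(private_copy).st_mode)
os.remove(private_copy)
print(copied_text, oct(copied_mode))  # on POSIX: dummy key 0o600
```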

You can also store your SSL certificates in Google Cloud Secret Manager and provide a secret ID. The secret must have the following format:

{"sslcert": "", "sslkey": "", "sslrootcert": ""}

tests/system/google/cloud/cloud_sql/example_cloud_sql_query_ssl.py

query_task_secret = CloudSQLExecuteQueryOperator(
    gcp_cloudsql_conn_id=conn_id,
    task_id=task_id,
    sql=SQL,
    ssl_secret_id=secret_id,
)
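One way to produce a secret value in the format described above is to read the three PEM files and serialize them as JSON. The sketch below assumes that each key holds the file contents rather than a path. The mapping follows the environment-variable examples: sslcert is the client certificate, sslkey is the client key, and sslrootcert is the server CA. `build_ssl_secret_payload` is a hypothetical helper.

```python
import json
import tempfile
from pathlib import Path


def build_ssl_secret_payload(cert_path: str, key_path: str, root_cert_path: str) -> str:
    """Return a JSON secret value in the {"sslcert", "sslkey", "sslrootcert"} format."""
    payload = {
        "sslcert": Path(cert_path).read_text(),  # client certificate (PEM text)
        "sslkey": Path(key_path).read_text(),  # client private key (PEM text)
        "sslrootcert": Path(root_cert_path).read_text(),  # server CA certificate (PEM text)
    }
    return json.dumps(payload)


# Demo with throwaway files standing in for real PEM files.
with tempfile.TemporaryDirectory() as tmp:
    paths = []
    for name, text in [("client-cert.pem", "CERT"), ("client-key.pem", "KEY"), ("server-ca.pem", "CA")]:
        path = Path(tmp, name)
        path.write_text(text)
        paths.append(str(path))
    secret_value = build_ssl_secret_payload(*paths)

print(secret_value)  # {"sslcert": "CERT", "sslkey": "KEY", "sslrootcert": "CA"}
```

After writing the string to a file, you could store it as a secret version, for example with `gcloud secrets versions add SECRET_ID --data-file=FILE`, where SECRET_ID and FILE are placeholders.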

Templating

template_fields: Sequence[str] = (
    "sql",
    "gcp_cloudsql_conn_id",
    "gcp_conn_id",
    "ssl_server_cert",
    "ssl_client_cert",
    "ssl_client_key",
    "ssl_secret_id",
)
template_ext: Sequence[str] = (".sql",)
template_fields_renderers = {"sql": "sql"}

More information

See the Google Cloud SQL documentation for the MySQL and PostgreSQL related proxies.

Reference

For further information, look at:
