Google Cloud SQL 操作器¶
前提任務¶
要使用這些操作器,您必須執行以下幾項操作
使用 Cloud 控制檯 選擇或建立一個 Cloud Platform 專案。
如 Google Cloud 文件 中所述,為您的專案啟用結算功能。
如 Cloud 控制檯文件 中所述,啟用 API。
透過 pip 安裝 API 庫。
pip install 'apache-airflow[google]'詳細資訊請參閱 安裝。
CloudSQLCreateInstanceDatabaseOperator¶
在 Cloud SQL 例項中建立新的資料庫。
有關引數定義,請參閱 CloudSQLCreateInstanceDatabaseOperator。
使用操作器¶
您可以建立帶或不帶專案 ID 的操作器。如果缺少專案 ID,則會從使用的 Google Cloud 連線中檢索。此處顯示了兩種變體
tests/system/google/cloud/cloud_sql/example_cloud_sql.py
sql_db_create_task = CloudSQLCreateInstanceDatabaseOperator(
body=db_create_body, instance=INSTANCE_NAME, task_id="sql_db_create_task"
)
示例請求體
tests/system/google/cloud/cloud_sql/example_cloud_sql.py
db_create_body = {"instance": INSTANCE_NAME, "name": DB_NAME, "project": PROJECT_ID}
模板化¶
template_fields: Sequence[str] = (
"project_id",
"instance",
"body",
"gcp_conn_id",
"api_version",
"impersonation_chain",
)
更多資訊¶
請參閱 Google Cloud SQL API 文件,瞭解 如何在例項中建立新資料庫。
CloudSQLDeleteInstanceDatabaseOperator¶
從 Cloud SQL 例項中刪除資料庫。
有關引數定義,請參閱 CloudSQLDeleteInstanceDatabaseOperator。
使用操作器¶
您可以建立帶或不帶專案 ID 的操作器。如果缺少專案 ID,則會從使用的 Google Cloud 連線中檢索。此處顯示了兩種變體
tests/system/google/cloud/cloud_sql/example_cloud_sql.py
sql_db_delete_task = CloudSQLDeleteInstanceDatabaseOperator(
instance=INSTANCE_NAME,
database=DB_NAME,
task_id="sql_db_delete_task",
trigger_rule=TriggerRule.ALL_DONE,
)
模板化¶
template_fields: Sequence[str] = (
"project_id",
"instance",
"database",
"gcp_conn_id",
"api_version",
"impersonation_chain",
)
更多資訊¶
請參閱 Google Cloud SQL API 文件,瞭解 如何刪除資料庫。
CloudSQLPatchInstanceDatabaseOperator¶
使用 patch 語義更新包含 Cloud SQL 例項中資料庫資訊的資源。請參閱:https://cloud.google.com/sql/docs/mysql/admin-api/how-tos/performance#patch
有關引數定義,請參閱 CloudSQLPatchInstanceDatabaseOperator。
使用操作器¶
您可以建立帶或不帶專案 ID 的操作器。如果缺少專案 ID,則會從使用的 Google Cloud 連線中檢索。此處顯示了兩種變體
tests/system/google/cloud/cloud_sql/example_cloud_sql.py
sql_db_patch_task = CloudSQLPatchInstanceDatabaseOperator(
body=db_patch_body,
instance=INSTANCE_NAME,
database=DB_NAME,
task_id="sql_db_patch_task",
)
示例請求體
tests/system/google/cloud/cloud_sql/example_cloud_sql.py
db_patch_body = {"charset": "utf16", "collation": "utf16_general_ci"}
模板化¶
template_fields: Sequence[str] = (
"project_id",
"instance",
"body",
"database",
"gcp_conn_id",
"api_version",
"impersonation_chain",
)
更多資訊¶
請參閱 Google Cloud SQL API 文件,瞭解 如何更新資料庫。
CloudSQLDeleteInstanceOperator¶
在 Google Cloud 中刪除 Cloud SQL 例項。
它也用於刪除只讀和故障轉移副本。
有關引數定義,請參閱 CloudSQLDeleteInstanceOperator。
使用操作器¶
您可以建立帶或不帶專案 ID 的操作器。如果缺少專案 ID,則會從使用的 Google Cloud 連線中檢索。此處顯示了兩種變體
tests/system/google/cloud/cloud_sql/example_cloud_sql.py
sql_instance_delete_task = CloudSQLDeleteInstanceOperator(
instance=INSTANCE_NAME, task_id="sql_instance_delete_task", trigger_rule=TriggerRule.ALL_DONE
)
模板化¶
template_fields: Sequence[str] = (
"project_id",
"instance",
"gcp_conn_id",
"api_version",
"impersonation_chain",
)
更多資訊¶
請參閱 Google Cloud SQL API 文件,瞭解 如何刪除 SQL 例項。
CloudSQLExportInstanceOperator¶
將 Cloud SQL 例項中的資料匯出到 Cloud Storage 儲存桶中,格式為 SQL 轉儲或 CSV 檔案。
注意
此操作器是冪等的。如果使用相同的匯出檔案 URI 多次執行,GCS 中的匯出檔案將被簡單覆蓋。
有關引數定義,請參閱 CloudSQLExportInstanceOperator。
引數¶
定義匯出操作的示例請求體
tests/system/google/cloud/cloud_sql/example_cloud_sql.py
export_body = {
"exportContext": {
"fileType": "sql",
"uri": FILE_URI,
"sqlExportOptions": {"schemaOnly": False},
"offload": True,
}
}
export_body_deferrable = {
"exportContext": {
"fileType": "sql",
"uri": FILE_URI_DEFERRABLE,
"sqlExportOptions": {"schemaOnly": False},
"offload": True,
}
}
使用操作器¶
您可以建立帶或不帶專案 ID 的操作器。如果缺少專案 ID,則會從使用的 Google Cloud 連線中檢索。此處顯示了兩種變體
tests/system/google/cloud/cloud_sql/example_cloud_sql.py
sql_export_task = CloudSQLExportInstanceOperator(
body=export_body, instance=INSTANCE_NAME, task_id="sql_export_task"
)
此外,對於所有這些操作,您可以在可延遲模式下使用操作器
tests/system/google/cloud/cloud_sql/example_cloud_sql.py
sql_export_def_task = CloudSQLExportInstanceOperator(
body=export_body_deferrable,
instance=INSTANCE_NAME,
task_id="sql_export_def_task",
deferrable=True,
)
模板化¶
template_fields: Sequence[str] = (
"project_id",
"instance",
"body",
"gcp_conn_id",
"api_version",
"impersonation_chain",
)
更多資訊¶
請參閱 Google Cloud SQL API 文件,瞭解 如何匯出資料。
故障排除¶
如果您在 Google Cloud 中收到“Unauthorized”(未授權)錯誤,請確保 Cloud SQL 例項的服務帳號已獲得向所選 GCS 儲存桶寫入資料的授權。
與 GCS 通訊的不是在 Airflow 中配置的服務帳號,而是特定 Cloud SQL 例項的服務帳號。
要授予服務帳號對 GCS 儲存桶的相應 WRITE 許可權,您可以使用 GCSBucketCreateAclEntryOperator,如示例所示
tests/system/google/cloud/cloud_sql/example_cloud_sql.py
sql_gcp_add_bucket_permission_task = GCSBucketCreateAclEntryOperator(
entity=f"user-{service_account_email}",
role="WRITER",
bucket=file_url_split[1], # netloc (bucket)
task_id="sql_gcp_add_bucket_permission_task",
)
CloudSQLImportInstanceOperator¶
將 Cloud Storage 中的 SQL 轉儲或 CSV 檔案匯入 Cloud SQL 例項。
CSV 匯入:¶
對於 CSV 匯入,此操作器不是冪等的。如果多次匯入同一個檔案,匯入的資料將在資料庫中重複。此外,如果存在任何唯一約束,重複匯入可能會導致錯誤。
SQL 匯入:¶
如果 SQL 檔案也是由 Cloud SQL 匯出的,則此操作器對於 SQL 匯入是冪等的。匯出的 SQL 包含所有要匯入的表的 'DROP TABLE IF EXISTS' 語句。
如果匯入檔案是以不同方式生成的,則不保證冪等性。這必須在 SQL 檔案級別上確保。
有關引數定義,請參閱 CloudSQLImportInstanceOperator。
引數¶
定義匯入操作的示例請求體
tests/system/google/cloud/cloud_sql/example_cloud_sql.py
import_body = {"importContext": {"fileType": "sql", "uri": FILE_URI}}
使用操作器¶
您可以建立帶或不帶專案 ID 的操作器。如果缺少專案 ID,則會從使用的 Google Cloud 連線中檢索。此處顯示了兩種變體
tests/system/google/cloud/cloud_sql/example_cloud_sql.py
sql_import_task = CloudSQLImportInstanceOperator(
body=import_body, instance=INSTANCE_NAME, task_id="sql_import_task"
)
模板化¶
template_fields: Sequence[str] = (
"project_id",
"instance",
"body",
"gcp_conn_id",
"api_version",
"impersonation_chain",
)
更多資訊¶
請參閱 Google Cloud SQL API 文件,瞭解 如何匯入資料。
故障排除¶
如果您在 Google Cloud 中收到“Unauthorized”(未授權)錯誤,請確保 Cloud SQL 例項的服務帳號已獲得從所選 GCS 物件讀取資料的授權。
與 GCS 通訊的不是在 Airflow 中配置的服務帳號,而是特定 Cloud SQL 例項的服務帳號。
要授予服務帳號對 GCS 物件的相應 READ 許可權,您可以使用 GCSBucketCreateAclEntryOperator,如示例所示
tests/system/google/cloud/cloud_sql/example_cloud_sql.py
sql_gcp_add_object_permission_task = GCSObjectCreateAclEntryOperator(
entity=f"user-{service_account_email}",
role="READER",
bucket=file_url_split[1], # netloc (bucket)
object_name=file_url_split[2][1:], # path (strip first '/')
task_id="sql_gcp_add_object_permission_task",
)
CloudSQLCreateInstanceOperator¶
在 Google Cloud 中建立新的 Cloud SQL 例項。
它也用於建立只讀副本。
有關引數定義,請參閱 CloudSQLCreateInstanceOperator。
如果存在同名例項,將不執行任何操作,並且操作器將成功。
引數¶
定義帶故障轉移副本的例項的示例請求體
tests/system/google/cloud/cloud_sql/example_cloud_sql.py
body = {
"name": INSTANCE_NAME,
"settings": {
"tier": "db-n1-standard-1",
"backupConfiguration": {"binaryLogEnabled": True, "enabled": True, "startTime": "05:00"},
"activationPolicy": "ALWAYS",
"dataDiskSizeGb": 30,
"dataDiskType": "PD_SSD",
"databaseFlags": [],
"ipConfiguration": {
"ipv4Enabled": True,
"requireSsl": True,
},
"locationPreference": {"zone": "europe-west4-a"},
"maintenanceWindow": {"hour": 5, "day": 7, "updateTrack": "canary"},
"pricingPlan": "PER_USE",
"storageAutoResize": True,
"storageAutoResizeLimit": 0,
"userLabels": {"my-key": "my-value"},
},
"databaseVersion": "MYSQL_5_7",
"region": "europe-west4",
}
使用操作器¶
您可以建立帶或不帶專案 ID 的操作器。如果缺少專案 ID,則會從使用的 Google Cloud 連線中檢索。此處顯示了兩種變體
tests/system/google/cloud/cloud_sql/example_cloud_sql.py
sql_instance_create_task = CloudSQLCreateInstanceOperator(
body=body, instance=INSTANCE_NAME, task_id="sql_instance_create_task"
)
模板化¶
template_fields: Sequence[str] = (
"project_id",
"instance",
"body",
"gcp_conn_id",
"api_version",
"impersonation_chain",
)
更多資訊¶
請參閱 Google Cloud SQL API 文件,瞭解 如何建立例項。
CloudSQLInstancePatchOperator¶
更新 Google Cloud 中 Cloud SQL 例項的設定(部分更新)。
有關引數定義,請參閱 CloudSQLInstancePatchOperator。
這是一個部分更新,因此只會設定/更新請求體中指定的設定值。現有例項的其餘配置將保持不變。
引數¶
定義例項的示例請求體
tests/system/google/cloud/cloud_sql/example_cloud_sql.py
patch_body = {
"name": INSTANCE_NAME,
"settings": {
"dataDiskSizeGb": 35,
"maintenanceWindow": {"hour": 3, "day": 6, "updateTrack": "canary"},
"userLabels": {"my-key-patch": "my-value-patch"},
},
}
使用操作器¶
您可以建立帶或不帶專案 ID 的操作器。如果缺少專案 ID,則會從使用的 Google Cloud 連線中檢索。此處顯示了兩種變體
tests/system/google/cloud/cloud_sql/example_cloud_sql.py
sql_instance_patch_task = CloudSQLInstancePatchOperator(
body=patch_body, instance=INSTANCE_NAME, task_id="sql_instance_patch_task"
)
模板化¶
template_fields: Sequence[str] = (
"project_id",
"instance",
"body",
"gcp_conn_id",
"api_version",
"impersonation_chain",
)
更多資訊¶
請參閱 Google Cloud SQL API 文件,瞭解 如何打補丁更新例項。
CloudSQLCloneInstanceOperator¶
克隆 Cloud SQL 例項。
有關引數定義,請參閱 CloudSQLCloneInstanceOperator。
引數¶
有關 clone_context 物件屬性,請參閱 CloneContext
使用操作器¶
您可以建立帶或不帶專案 ID 的操作器。如果缺少專案 ID,則會從使用的 Google Cloud 連線中檢索。此處顯示了兩種變體
tests/system/google/cloud/cloud_sql/example_cloud_sql.py
sql_instance_clone = CloudSQLCloneInstanceOperator(
instance=INSTANCE_NAME, destination_instance_name=CLONED_INSTANCE_NAME, task_id="sql_instance_clone"
)
模板化¶
template_fields: Sequence[str] = (
"project_id",
"instance",
"destination_instance_name",
"gcp_conn_id",
"api_version",
)
更多資訊¶
請參閱 Google Cloud SQL API 文件,瞭解 如何克隆例項。
CloudSQLExecuteQueryOperator¶
在 Google Cloud SQL 例項中執行 DDL 或 DML SQL 查詢。不支援 DQL(從 Google Cloud SQL 中檢索資料)。您可以執行 SELECT 查詢,但這些查詢的結果將被丟棄。
您可以指定各種連線方法來連線正在執行的例項,從公共 IP 明文連線,到使用 SSL 的公共 IP,或透過 Cloud SQL Proxy 進行 TCP 和套接字連線。代理會根據操作器的需要動態下載、啟動/停止。
有一種 gcpcloudsql://* 連線型別,您應該使用它來定義操作器要使用的連線型別。此連線是一種“元”連線。它本身不用於建立實際連線,而是確定 CloudSQLDatabaseHook 是否應啟動 Cloud SQL Proxy,以及應動態建立何種資料庫連線(Postgres 或 MySQL)以透過公共 IP 地址或代理連線到 Cloud SQL。CloudSqlDatabaseHook 使用 CloudSqlProxyRunner 來管理 Cloud SQL Proxy 的生命週期(每個任務都有自己的 Cloud SQL Proxy)
構建連線時,您應使用 CloudSQLDatabaseHook 中描述的連線引數。下方顯示了所有可能的連線型別的連線示例。此類連線可在不同任務(CloudSqlQueryOperator 的例項)之間重用。如果需要,每個任務都會啟動自己的代理,並使用自己的 TCP 或 UNIX 套接字。
有關引數定義,請參閱 CloudSQLExecuteQueryOperator。
由於查詢操作器可以執行任意查詢,因此不能保證其冪等性。SQL 查詢設計者應將查詢設計為冪等。例如,Postgres 和 MySQL 都支援 CREATE TABLE IF NOT EXISTS 語句,可用於以冪等方式建立表。
引數¶
如果您透過環境變數中定義的 AIRFLOW_CONN_{CONN_ID} URL 定義連線,請確保 URL 中的 URL 元件經過 URL 編碼。有關詳細資訊,請參閱以下示例。
請注意,在使用 SSL 連線的情況下,您需要有一種機制來使證書/金鑰檔案在預定義的位置可用於操作器可以執行的所有工作程序。例如,可以透過在所有工作程序的同一路徑下掛載類似 NFS 的捲來提供此功能。
所有非 SSL 連線的示例連線定義。請注意,連線 URI 的所有元件都應進行 URL 編碼
tests/system/google/cloud/cloud_sql/example_cloud_sql_query.py
# Connect via proxy over TCP
CONNECTION_PROXY_TCP_KWARGS = {
"conn_type": "gcpcloudsql",
"login": CLOUD_SQL_USER,
"password": CLOUD_SQL_PASSWORD,
"host": CLOUD_SQL_IP_ADDRESS,
"port": CLOUD_SQL_PUBLIC_PORT,
"schema": CLOUD_SQL_DATABASE_NAME,
"extra": {
"database_type": DATABASE_TYPE,
"project_id": PROJECT_ID,
"location": REGION,
"instance": CLOUD_SQL_INSTANCE_NAME,
"use_proxy": "True",
"sql_proxy_use_tcp": "True",
},
}
# Connect via proxy over UNIX socket (specific proxy version)
CONNECTION_PROXY_SOCKET_KWARGS = {
"conn_type": "gcpcloudsql",
"login": CLOUD_SQL_USER,
"password": CLOUD_SQL_PASSWORD,
"host": CLOUD_SQL_IP_ADDRESS,
"port": CLOUD_SQL_PUBLIC_PORT,
"schema": CLOUD_SQL_DATABASE_NAME,
"extra": {
"database_type": DATABASE_TYPE,
"project_id": PROJECT_ID,
"location": REGION,
"instance": CLOUD_SQL_INSTANCE_NAME,
"use_proxy": "True",
"sql_proxy_version": "v1.33.9",
"sql_proxy_use_tcp": "False",
},
}
# Connect directly via TCP (non-SSL)
CONNECTION_PUBLIC_TCP_KWARGS = {
"conn_type": "gcpcloudsql",
"login": CLOUD_SQL_USER,
"password": CLOUD_SQL_PASSWORD,
"host": CLOUD_SQL_IP_ADDRESS,
"port": CLOUD_SQL_PUBLIC_PORT,
"schema": CLOUD_SQL_DATABASE_NAME,
"extra": {
"database_type": DATABASE_TYPE,
"project_id": PROJECT_ID,
"location": REGION,
"instance": CLOUD_SQL_INSTANCE_NAME,
"use_proxy": "False",
"use_ssl": "False",
},
}
所有啟用 SSL 的連線的類似連線定義
tests/system/google/cloud/cloud_sql/example_cloud_sql_query_ssl.py
# Connect directly via TCP (SSL)
CONNECTION_PUBLIC_TCP_SSL_KWARGS = {
"conn_type": "gcpcloudsql",
"login": CLOUD_SQL_USER,
"password": CLOUD_SQL_PASSWORD,
"host": CLOUD_SQL_IP_ADDRESS,
"port": CLOUD_SQL_PUBLIC_PORT,
"schema": CLOUD_SQL_DATABASE_NAME,
"extra": {
"database_type": DATABASE_TYPE,
"project_id": PROJECT_ID,
"location": REGION,
"instance": CLOUD_SQL_INSTANCE_NAME,
"use_proxy": "False",
"use_ssl": "True",
},
}
還可以透過環境變數配置連線(請注意,如果您使用標準的 AIRFLOW 表示法透過環境變數定義連線,則操作器中的連線 ID 與 AIRFLOW_CONN_{CONN_ID} 的字尾大寫字母匹配)
tests/system/google/cloud/cloud_sql/example_cloud_sql_query.py
# The connections below are created using one of the standard approaches - via environment
# variables named AIRFLOW_CONN_* . The connections can also be created in the database
# of AIRFLOW (using command line or UI).
postgres_kwargs = {
"user": "user",
"password": "password",
"public_ip": "public_ip",
"public_port": "public_port",
"database": "database",
"project_id": "project_id",
"location": "location",
"instance": "instance",
"client_cert_file": "client_cert_file",
"client_key_file": "client_key_file",
"server_ca_file": "server_ca_file",
}
# Postgres: connect via proxy over TCP
os.environ["AIRFLOW_CONN_PROXY_POSTGRES_TCP"] = (
"gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
"database_type=postgres&"
"project_id={project_id}&"
"location={location}&"
"instance={instance}&"
"use_proxy=True&"
"sql_proxy_use_tcp=True".format(**postgres_kwargs)
)
# Postgres: connect via proxy over UNIX socket (specific proxy version)
os.environ["AIRFLOW_CONN_PROXY_POSTGRES_SOCKET"] = (
"gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
"database_type=postgres&"
"project_id={project_id}&"
"location={location}&"
"instance={instance}&"
"use_proxy=True&"
"sql_proxy_version=v1.13&"
"sql_proxy_use_tcp=False".format(**postgres_kwargs)
)
# Postgres: connect directly via TCP (non-SSL)
os.environ["AIRFLOW_CONN_PUBLIC_POSTGRES_TCP"] = (
"gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
"database_type=postgres&"
"project_id={project_id}&"
"location={location}&"
"instance={instance}&"
"use_proxy=False&"
"use_ssl=False".format(**postgres_kwargs)
)
# Postgres: connect directly via TCP (SSL)
os.environ["AIRFLOW_CONN_PUBLIC_POSTGRES_TCP_SSL"] = (
"gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
"database_type=postgres&"
"project_id={project_id}&"
"location={location}&"
"instance={instance}&"
"use_proxy=False&"
"use_ssl=True&"
"sslcert={client_cert_file}&"
"sslkey={client_key_file}&"
"sslrootcert={server_ca_file}".format(**postgres_kwargs)
)
mysql_kwargs = {
"user": "user",
"password": "password",
"public_ip": "public_ip",
"public_port": "public_port",
"database": "database",
"project_id": "project_id",
"location": "location",
"instance": "instance",
"client_cert_file": "client_cert_file",
"client_key_file": "client_key_file",
"server_ca_file": "server_ca_file",
}
# MySQL: connect via proxy over TCP (specific proxy version)
os.environ["AIRFLOW_CONN_PROXY_MYSQL_TCP"] = (
"gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
"database_type=mysql&"
"project_id={project_id}&"
"location={location}&"
"instance={instance}&"
"use_proxy=True&"
"sql_proxy_version=v1.13&"
"sql_proxy_use_tcp=True".format(**mysql_kwargs)
)
# MySQL: connect via proxy over UNIX socket using pre-downloaded Cloud Sql Proxy binary
os.environ["AIRFLOW_CONN_PROXY_MYSQL_SOCKET"] = (
"gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
"database_type=mysql&"
"project_id={project_id}&"
"location={location}&"
"instance={instance}&"
"use_proxy=True&"
"sql_proxy_use_tcp=False".format(**mysql_kwargs)
)
# MySQL: connect directly via TCP (non-SSL)
os.environ["AIRFLOW_CONN_PUBLIC_MYSQL_TCP"] = (
"gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
"database_type=mysql&"
"project_id={project_id}&"
"location={location}&"
"instance={instance}&"
"use_proxy=False&"
"use_ssl=False".format(**mysql_kwargs)
)
# MySQL: connect directly via TCP (SSL) and with fixed Cloud Sql Proxy binary path
os.environ["AIRFLOW_CONN_PUBLIC_MYSQL_TCP_SSL"] = (
"gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
"database_type=mysql&"
"project_id={project_id}&"
"location={location}&"
"instance={instance}&"
"use_proxy=False&"
"use_ssl=True&"
"sslcert={client_cert_file}&"
"sslkey={client_key_file}&"
"sslrootcert={server_ca_file}".format(**mysql_kwargs)
)
# Special case: MySQL: connect directly via TCP (SSL) and with fixed Cloud Sql
# Proxy binary path AND with missing project_id
os.environ["AIRFLOW_CONN_PUBLIC_MYSQL_TCP_SSL_NO_PROJECT_ID"] = (
"gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
"database_type=mysql&"
"location={location}&"
"instance={instance}&"
"use_proxy=False&"
"use_ssl=True&"
"sslcert={client_cert_file}&"
"sslkey={client_key_file}&"
"sslrootcert={server_ca_file}".format(**mysql_kwargs)
)
tests/system/google/cloud/cloud_sql/example_cloud_sql_query_ssl.py
# The connections below are created using one of the standard approaches - via environment
# variables named AIRFLOW_CONN_* . The connections can also be created in the database
# of AIRFLOW (using command line or UI).
postgres_kwargs = {
"user": "user",
"password": "password",
"public_ip": "public_ip",
"public_port": "public_port",
"database": "database",
"project_id": "project_id",
"location": "location",
"instance": "instance",
"client_cert_file": "client_cert_file",
"client_key_file": "client_key_file",
"server_ca_file": "server_ca_file",
}
# Postgres: connect directly via TCP (SSL)
os.environ["AIRFLOW_CONN_PUBLIC_POSTGRES_TCP_SSL"] = (
"gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
"database_type=postgres&"
"project_id={project_id}&"
"location={location}&"
"instance={instance}&"
"use_proxy=False&"
"use_ssl=True&"
"sslcert={client_cert_file}&"
"sslkey={client_key_file}&"
"sslrootcert={server_ca_file}".format(**postgres_kwargs)
)
mysql_kwargs = {
"user": "user",
"password": "password",
"public_ip": "public_ip",
"public_port": "public_port",
"database": "database",
"project_id": "project_id",
"location": "location",
"instance": "instance",
"client_cert_file": "client_cert_file",
"client_key_file": "client_key_file",
"server_ca_file": "server_ca_file",
}
# MySQL: connect directly via TCP (SSL)
os.environ["AIRFLOW_CONN_PUBLIC_MYSQL_TCP_SSL"] = (
"gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?"
"database_type=mysql&"
"project_id={project_id}&"
"location={location}&"
"instance={instance}&"
"use_proxy=False&"
"use_ssl=True&"
"sslcert={client_cert_file}&"
"sslkey={client_key_file}&"
"sslrootcert={server_ca_file}".format(**mysql_kwargs)
)
使用操作器¶
下面的示例操作器使用了先前準備好的連線。它可能是來自 Airflow 資料庫的 connection_id,或者透過環境變數配置的連線(請注意,如果您使用標準的 AIRFLOW 表示法透過環境變數定義連線,則操作器中的連線 ID 與 AIRFLOW_CONN_{CONN_ID} 的字尾大寫字母匹配)
tests/system/google/cloud/cloud_sql/example_cloud_sql_query.py
query_task = CloudSQLExecuteQueryOperator(
gcp_cloudsql_conn_id=connection_id,
task_id=task_id,
sql=SQL,
)
SSL 設定也可以在操作器級別指定。在這種情況下,連線中配置的 SSL 設定將被覆蓋。其中一種方法是指定每個證書檔案的路徑,如下所示。請注意,出於安全原因,這些檔案將被複制到具有最低所需許可權的臨時位置。
tests/system/google/cloud/cloud_sql/example_cloud_sql_query_ssl.py
query_task = CloudSQLExecuteQueryOperator(
gcp_cloudsql_conn_id=conn_id,
task_id=task_id,
sql=SQL,
ssl_client_cert=ssl_cert_path,
ssl_server_cert=ssl_server_cert_path,
ssl_client_key=ssl_key_path,
)
您還可以將您的 SSL 證書儲存到 Google Cloud Secret Manager 中,並提供一個 Secret ID。Secret 格式如下:.. code-block:: python
{“sslcert”: “”, “sslkey”: “”, “sslrootcert”: “”}
tests/system/google/cloud/cloud_sql/example_cloud_sql_query_ssl.py
query_task_secret = CloudSQLExecuteQueryOperator(
gcp_cloudsql_conn_id=conn_id,
task_id=task_id,
sql=SQL,
ssl_secret_id=secret_id,
)
模板化¶
template_fields: Sequence[str] = (
"sql",
"gcp_cloudsql_conn_id",
"gcp_conn_id",
"ssl_server_cert",
"ssl_client_cert",
"ssl_client_key",
"ssl_secret_id",
)
template_ext: Sequence[str] = (".sql",)
template_fields_renderers = {"sql": "sql"}
更多資訊¶
請參閱 Google Cloud SQL 文件,瞭解 MySQL 和 PostgreSQL 相關的代理。
參考¶
更多資訊請參閱