airflow.providers.google.cloud.log.stackdriver_task_handler

整合 Stackdriver 的處理器。

屬性

DEFAULT_LOGGER_NAME

StackdriverTaskHandler

直接進行 Stackdriver 日誌 API 呼叫的處理器。

模組內容

airflow.providers.google.cloud.log.stackdriver_task_handler.DEFAULT_LOGGER_NAME = 'airflow'[source]
class airflow.providers.google.cloud.log.stackdriver_task_handler.StackdriverTaskHandler(gcp_key_path=None, scopes=_DEFAULT_SCOPESS, name=NOTSET, transport=BackgroundThreadTransport, resource=_GLOBAL_RESOURCE, labels=None, gcp_log_name=DEFAULT_LOGGER_NAME)[source]

繼承自: logging.Handler

直接進行 Stackdriver 日誌 API 呼叫的處理器。

這是 Python 標準 logging 處理器,可用於將 Python 標準日誌訊息直接路由到 Stackdriver 日誌 API。

它也可用於儲存執行任務的日誌。為此,應將其設定為名稱為“tasks”的處理器。在這種情況下,它也將用於讀取日誌以便在 Web UI 中顯示。

此處理器支援非同步和同步傳輸。

引數:
  • gcp_key_path (str | None) – Google Cloud 憑據 JSON 檔案的路徑。如果省略,將使用基於 Application Default Credentials 的授權。

  • scopes (collections.abc.Collection[str] | None) – 憑據的 OAuth 範圍,

  • name (str | airflow.utils.types.ArgNotSet) – Stackdriver 日誌中自定義日誌的名稱。預設為 'airflow'。Python 日誌記錄器的名稱將在 python_logger 欄位中表示。

  • transport (type[google.cloud.logging.handlers.transports.Transport]) – 用於建立新傳輸物件的類。它應繼承自基礎 google.cloud.logging.handlers.Transport 型別並實現 :meth`google.cloud.logging.handlers.Transport.send`。預設為 google.cloud.logging.handlers.BackgroundThreadTransport。另一個選項是 google.cloud.logging.handlers.SyncTransport

  • resource (google.cloud.logging.Resource) – (可選)條目的受監控資源,預設為全域性資源型別。

  • labels (dict[str, str] | None) – (可選)條目的標籤對映。

LABEL_TASK_ID = 'task_id'[source]
LABEL_DAG_ID = 'dag_id'[source]
LABEL_LOGICAL_DATE = 'logical_date'[source]
LABEL_TRY_NUMBER = 'try_number'[source]
LOG_VIEWER_BASE_URL = 'https://console.cloud.google.com/logs/viewer'[source]
LOG_NAME = 'Google Stackdriver'[source]
trigger_supported = True[source]
trigger_should_queue = False[source]
trigger_should_wrap = False[source]
trigger_send_end_marker = False[source]
gcp_key_path: str | None = None[source]
scopes: collections.abc.Collection[str] | None[source]
gcp_log_name: str = 'airflow'[source]
transport_type: type[google.cloud.logging.handlers.transports.Transport][source]
resource: google.cloud.logging.Resource[source]
labels: dict[str, str] | None = None[source]
task_instance_labels: dict[str, str] | None[source]
task_instance_hostname = 'default-hostname'[source]
emit(record)[source]

實際記錄指定的日誌記錄。

引數:

record (logging.LogRecord) – 要記錄的記錄。

set_context(task_instance)[source]

配置日誌記錄器以新增有關當前任務的資訊。

引數:

task_instance (airflow.models.TaskInstance) – 當前執行的任務

read(task_instance, try_number=None, metadata=None)[source]

從 Stackdriver 日誌讀取給定任務例項的日誌。

引數:
  • task_instance (airflow.models.TaskInstance) – 任務例項物件

  • try_number (int | None) – 要從中讀取日誌的任務例項嘗試次數 (try_number)。如果為 None,則返回所有日誌

  • metadata (dict | None) – 日誌元資料。用於流式日誌讀取和自動尾隨。

返回:

一個元組,包含(主機名和日誌組成的雙元素元組的單元素元組列表,以及元資料列表)

返回型別:

tuple[list[tuple[tuple[str, str]]], list[dict[str, str | bool]]]

property log_name[source]

返回日誌名稱。

get_external_log_url(task_instance, try_number)[source]

建立外部日誌收集服務的地址。

引數:
  • task_instance (airflow.models.TaskInstance) – 任務例項物件

  • try_number (int) – 要從中讀取日誌的任務例項嘗試次數 (try_number)

返回:

外部日誌收集服務的 URL

返回型別:

str

close()[source]

清理此處理器使用的所有資源。

此版本將處理器從內部處理器對映 _handlers 中移除,該對映用於按名稱查詢處理器。子類應確保在覆蓋的 close() 方法中呼叫此方法。

本條目是否有幫助?