序列化¶
為了支援任務間的資料交換(例如引數),Airflow 需要對要交換的資料進行序列化,並在下游任務需要時再次反序列化。序列化也發生在 Web 伺服器和排程器(而非 DAG 處理器)無需讀取 DAG 檔案時。這樣做是為了安全和效率。
序列化是一項出奇困難的工作。Python 本身僅支援對原始型別(如 str 和 int)進行序列化,並遍歷可迭代物件。當情況變得更復雜時,就需要自定義序列化。
Airflow 本身支援三種自定義序列化方式。原始型別按原樣返回,無需額外編碼,例如 str 仍然是 str。當它不是原始型別(或其可迭代物件)時,Airflow 會在 airflow.serialization.serializers 名稱空間中查詢已註冊的序列化器和反序列化器。如果找不到,它會在類中查詢 serialize() 方法,或者在反序列化時查詢 deserialize(data, version: int) 方法。最後,如果類使用 @dataclass 或 @attr.define 裝飾器裝飾,它將使用這些裝飾器提供的公共方法。
如果您想擴充套件 Airflow 以支援新的序列化器,瞭解何時選擇哪種序列化方式非常有用。由 Airflow 控制的物件,即位於 airflow.* 名稱空間下的物件(如 airflow.model.dag.DAG)或由開發人員控制的物件(如 my.company.Foo),應首先檢查它們是否可以使用 @attr.define 或 @dataclass 進行裝飾。如果不可能,則應實現 serialize 和 deserialize 方法。serialize 方法應返回一個原始型別或一個字典。它無需序列化字典中的值(這會由 Airflow 處理),但鍵必須是原始型別形式。
不受 Airflow 控制的物件(例如 numpy.int16)將需要註冊的序列化器和反序列化器。需要版本控制。原始型別(不包括 bytes)可以作為字典返回。同樣,dict 的值不需要序列化,但其鍵需要是原始型別形式。如果您正在實現已註冊的序列化器,請特別注意避免迴圈匯入。通常,可以透過使用 str 來填充序列化器列表來避免這種情況。例如:serializers = ["my.company.Foo"] 而不是 serializers = [Foo]。
注意
序列化和反序列化依賴於速度。儘可能使用內建函式(如 dict),避免使用類和其他複雜結構。
Airflow 物件¶
from typing import Any, ClassVar
class Foo:
__version__: ClassVar[int] = 1
def __init__(self, a, v) -> None:
self.a = a
self.b = {"x": v}
def serialize(self) -> dict[str, Any]:
return {
"a": self.a,
"b": self.b,
}
@staticmethod
def deserialize(data: dict[str, Any], version: int):
f = Foo(a=data["a"])
f.b = data["b"]
return f
已註冊¶
from __future__ import annotations
from decimal import Decimal
from typing import TYPE_CHECKING
from airflow.utils.module_loading import qualname
if TYPE_CHECKING:
from airflow.serialization.serde import U
serializers = [
Decimal
] # this can be a type or a fully qualified str. Str can be used to prevent circular imports
deserializers = serializers # in some cases you might not have a deserializer (e.g. k8s pod)
__version__ = 1 # required
# the serializer expects output, classname, version, is_serialized?
def serialize(o: object) -> tuple[U, str, int, bool]:
if isinstance(o, Decimal):
name = qualname(o)
_, _, exponent = o.as_tuple()
if exponent >= 0: # No digits after the decimal point.
return int(o), name, __version__, True
# Technically lossy due to floating point errors, but the best we
# can do without implementing a custom encode function.
return float(o), name, __version__, True
return "", "", 0, False
# the deserializer sanitizes the data for you, so you do not need to deserialize values yourself
def deserialize(classname: str, version: int, data: object) -> Decimal:
# always check version compatibility
if version > __version__:
raise TypeError(f"serialized {version} of {classname} > {__version__}")
if classname != qualname(Decimal):
raise TypeError(f"{classname} != {qualname(Decimal)}")
return Decimal(str(data))