Airflow Summit 2025 即將於 10 月 07-09 日舉行。立即註冊可享早鳥票優惠!

動態任務對映

動態任務對映允許工作流在執行時根據當前資料建立多個任務,而無需 DAG 作者事先知道需要多少任務。

這類似於在 for 迴圈中定義任務,但不是讓 DAG 檔案獲取資料並自行完成,而是排程器可以根據前一個任務的輸出完成此操作。在對映任務執行之前,排程器會建立 n 個任務副本,每個輸入對應一個副本。

還可以讓任務對對映任務的收集輸出進行操作,這通常被稱為 map 和 reduce。

簡單對映

最簡單的形式是,您可以使用 expand() 函式而不是直接呼叫任務,來對映 DAG 檔案中直接定義的列表。

如果您想看到動態任務對映的簡單用法,可以檢視下方內容。

src/airflow/example_dags/example_dynamic_task_mapping.py

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Example DAG demonstrating the usage of dynamic task mapping."""

from __future__ import annotations

from datetime import datetime

from airflow.sdk import DAG, task

with DAG(dag_id="example_dynamic_task_mapping", schedule=None, start_date=datetime(2022, 3, 4)) as dag:

    @task
    def add_one(x: int):
        return x + 1

    @task
    def sum_it(values):
        total = sum(values)
        print(f"Total was {total}")

    added_values = add_one.expand(x=[1, 2, 3])
    sum_it(added_values)

with DAG(
    dag_id="example_task_mapping_second_order", schedule=None, catchup=False, start_date=datetime(2022, 3, 4)
) as dag2:

    @task
    def get_nums():
        return [1, 2, 3]

    @task
    def times_2(num):
        return num * 2

    @task
    def add_10(num):
        return num + 10

    _get_nums = get_nums()
    _times_2 = times_2.expand(num=_get_nums)
    add_10.expand(num=_times_2)

執行時,任務日誌中將顯示 Total was 9

這是生成的 DAG 結構。

../_images/mapping_simple_graph.png

網格檢視還在詳細資訊面板中提供了對映任務的可見性。

../_images/grid_mapped_task.png

注意

expand() 函式僅允許傳入關鍵字引數。

注意

從對映任務傳遞的值是惰性代理

在上面的示例中,sum_it 接收到的 valuesadd_one 的每個對映例項返回的所有值的集合。然而,由於無法事先知道我們將有多少個 add_one 例項,values 不是一個普通的列表,而是一個“惰性序列”,只有在請求時才會檢索每個單獨的值。因此,如果您直接執行 print(values),您會得到類似這樣的結果:

LazySelectSequence([15 items])

您可以在此物件上使用正常的序列語法(例如 values[0]),或使用 for 迴圈正常迭代。list(values) 將為您提供一個“真實”的 list,但由於這將立即載入所有引用上游對映任務的值,如果對映數量很大,您必須注意潛在的效能影響。

請注意,當您將此代理物件推送到 XCom 時,同樣適用。Airflow 會嘗試智慧地強制轉換值,但會為此發出警告,以便您知道這一點。例如:

@task
def forward_values(values):
    return values  # This is a lazy proxy!

將發出如下警告:

Coercing mapped lazy proxy return value from task forward_values to list, which may degrade
performance. Review resource requirements for this operation, and call list() explicitly to suppress this message. See Dynamic Task Mapping documentation for more information about lazy proxy objects.

可以透過修改任務來抑制此訊息,如下所示:

@task
def forward_values(values):
    return list(values)

注意

不需要 reduce 任務。

雖然我們在這裡展示了一個“reduce”任務(sum_it)您不一定需要它,即使沒有下游任務,對映任務仍將執行。

任務生成的對映

上面展示的示例都可以透過在 DAG 檔案中使用 for 迴圈來實現,但動態任務對映的真正強大之處在於能夠讓任務生成要迭代的列表。

@task
def make_list():
    # This can also be from an API call, checking a database, -- almost anything you like, as long as the
    # resulting list/dictionary can be stored in the current XCom backend.
    return [1, 2, {"a": "b"}, "str"]


@task
def consumer(arg):
    print(arg)


with DAG(dag_id="dynamic-map", start_date=datetime(2022, 4, 2)) as dag:
    consumer.expand(arg=make_list())

make_list 任務作為普通任務執行,必須返回一個列表或字典(參見 可以展開哪些資料型別?),然後 consumer 任務將被呼叫四次,每次呼叫都會接收 make_list 返回值中的一個值。

警告

任務生成的對映不能與 TriggerRule.ALWAYS 一起使用。

在任務生成的對映中設定 trigger_rule=TriggerRule.ALWAYS 是不允許的,因為在任務立即執行時,展開的引數是未定義的。這在 DAG 解析時強制執行,對任務和對映任務組都適用,如果您嘗試使用它將引發錯誤。在最近的示例中,在 consumer 任務中設定 trigger_rule=TriggerRule.ALWAYS 將引發錯誤,因為 make_list 是任務生成的對映。

重複對映

一個對映任務的結果也可以用作下一個對映任務的輸入。

with DAG(dag_id="repeated_mapping", start_date=datetime(2022, 3, 4)) as dag:

    @task
    def add_one(x: int):
        return x + 1

    first = add_one.expand(x=[1, 2, 3])
    second = add_one.expand(x=first)

這將導致結果為 [3, 4, 5]

新增不展開的引數

除了傳遞在執行時展開的引數外,還可以傳遞不改變的引數——為了清楚區分這兩種引數,我們使用不同的函式:expand() 用於對映引數,partial() 用於未對映引數。

@task
def add(x: int, y: int):
    return x + y


added_values = add.partial(y=10).expand(x=[1, 2, 3])
# This results in add function being expanded to
# add(x=1, y=10)
# add(x=2, y=10)
# add(x=3, y=10)

這將導致值為 11、12 和 13。

這對於將連線 ID、資料庫表名或 bucket 名稱等內容傳遞給任務也很有用。

對映多個引數

除了單個引數外,還可以傳遞多個引數進行展開。這將產生“笛卡爾積”的效果,用每個引數組合呼叫對映任務。

@task
def add(x: int, y: int):
    return x + y


added_values = add.expand(x=[2, 4, 8], y=[5, 10])
# This results in the add function being called with
# add(x=2, y=5)
# add(x=2, y=10)
# add(x=4, y=5)
# add(x=4, y=10)
# add(x=8, y=5)
# add(x=8, y=10)

這將導致 add 任務被呼叫 6 次。但請注意,不能保證展開的順序。

命名對映

預設情況下,對映任務被分配一個整數索引。可以透過在 Airflow UI 中為每個對映任務基於任務輸入提供一個名稱來覆蓋整數索引。這是透過為任務提供一個 Jinja 模板來實現的,使用 map_index_template。通常,當展開看起來像 .expand(<property>=...) 時,模板看起來像 map_index_template="{{ task.<property> }}"。此模板在每個展開的任務執行後使用任務上下文進行渲染。這意味著您可以像這樣引用任務上的屬性:

from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator


# The two expanded task instances will be named "2024-01-01" and "2024-01-02".
SQLExecuteQueryOperator.partial(
    ...,
    sql="SELECT * FROM data WHERE date = %(date)s",
    map_index_template="""{{ task.parameters['date'] }}""",
).expand(
    parameters=[{"date": "2024-01-01"}, {"date": "2024-01-02"}],
)

在上面的示例中,展開的任務例項將被命名為“2024-01-01”和“2024-01-02”。這些名稱會顯示在 Airflow UI 中,而不是分別顯示“0”和“1”。

由於模板是在主執行塊之後渲染的,因此也可以動態注入到渲染上下文中。這在 Jinja 模板語法難以表達所需的名稱渲染邏輯時非常有用,尤其是在 taskflow 函式中。例如:

from airflow.sdk import get_current_context


@task(map_index_template="{{ my_variable }}")
def my_task(my_value: str):
    context = get_current_context()
    context["my_variable"] = my_value * 3
    ...  # Normal execution...


# The task instances will be named "aaa" and "bbb".
my_task.expand(my_value=["a", "b"])

使用非 TaskFlow Operator 進行對映

也可以將 partialexpand 與經典風格的 Operator 一起使用。某些引數不可對映,必須傳遞給 partial(),例如 task_idqueuepool 以及 BaseOperator 的大多數其他引數。

src/airflow/example_dags/example_dynamic_task_mapping_with_no_taskflow_operators.py

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Example DAG demonstrating the usage of dynamic task mapping with non-TaskFlow operators."""

from __future__ import annotations

from datetime import datetime

from airflow.models.baseoperator import BaseOperator
from airflow.sdk import DAG


class AddOneOperator(BaseOperator):
    """A custom operator that adds one to the input."""

    def __init__(self, value, **kwargs):
        super().__init__(**kwargs)
        self.value = value

    def execute(self, context):
        return self.value + 1


class SumItOperator(BaseOperator):
    """A custom operator that sums the input."""

    template_fields = ("values",)

    def __init__(self, values, **kwargs):
        super().__init__(**kwargs)
        self.values = values

    def execute(self, context):
        total = sum(self.values)
        print(f"Total was {total}")
        return total


with DAG(
    dag_id="example_dynamic_task_mapping_with_no_taskflow_operators",
    schedule=None,
    start_date=datetime(2022, 3, 4),
    catchup=False,
):
    # map the task to a list of values
    add_one_task = AddOneOperator.partial(task_id="add_one").expand(value=[1, 2, 3])

    # aggregate (reduce) the mapped tasks results
    sum_it_task = SumItOperator(task_id="sum_it", values=add_one_task.output)

注意

partial() 函式僅允許傳入關鍵字引數。

對映經典 Operator 的結果

如果您想對映經典 Operator 的結果,您應該明確引用其 *輸出*,而不是 Operator 本身。

# Create a list of data inputs.
extract = ExtractOperator(task_id="extract")

# Expand the operator to transform each input.
transform = TransformOperator.partial(task_id="transform").expand(input=extract.output)

# Collect the transformed inputs, expand the operator to load each one of them to the target.
load = LoadOperator.partial(task_id="load").expand(input=transform.output)

混合使用 TaskFlow 和經典 Operator

在此示例中,您有一個常規資料傳遞到 S3 bucket,並且想要對到達的每個檔案應用相同的處理,無論每次到達多少檔案。

from datetime import datetime

from airflow.sdk import DAG
from airflow.sdk import task
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.operators.s3 import S3ListOperator


with DAG(dag_id="mapped_s3", start_date=datetime(2020, 4, 7)) as dag:
    list_filenames = S3ListOperator(
        task_id="get_input",
        bucket="example-bucket",
        prefix='incoming/provider_a/{{ data_interval_start.strftime("%Y-%m-%d") }}',
    )

    @task
    def count_lines(aws_conn_id, bucket, filename):
        hook = S3Hook(aws_conn_id=aws_conn_id)

        return len(hook.read_key(filename, bucket).splitlines())

    @task
    def total(lines):
        return sum(lines)

    counts = count_lines.partial(aws_conn_id="aws_default", bucket=list_filenames.bucket).expand(
        filename=list_filenames.output
    )

    total(lines=counts)

為非 TaskFlow Operator 分配多個引數

有時上游需要為下游 Operator 指定多個引數。為此,您可以使用 expand_kwargs 函式,該函式接受一個對映序列來進行對映。

BashOperator.partial(task_id="bash").expand_kwargs(
    [
        {"bash_command": "echo $ENV1", "env": {"ENV1": "1"}},
        {"bash_command": "printf $ENV2", "env": {"ENV2": "2"}},
    ],
)

這在執行時產生兩個任務例項,分別列印 12

還可以將 expand_kwargs 與大多數 Operator 引數混合使用,例如 PythonOperator 的 op_kwargs

def print_args(x, y):
    print(x)
    print(y)
    return x + y


PythonOperator.partial(task_id="task-1", python_callable=print_args).expand_kwargs(
    [
        {"op_kwargs": {"x": 1, "y": 2}, "show_return_value_in_logs": True},
        {"op_kwargs": {"x": 3, "y": 4}, "show_return_value_in_logs": False},
    ]
)

expand 類似,您也可以對映返回字典列表的 XCom,或對映返回字典的 XCom 列表。重用上面的 S3 示例,您可以使用對映任務來執行“分支”並將檔案複製到不同的 bucket。

list_filenames = S3ListOperator(...)  # Same as the above example.


@task
def create_copy_kwargs(filename):
    if filename.rsplit(".", 1)[-1] not in ("json", "yml"):
        dest_bucket_name = "my_text_bucket"
    else:
        dest_bucket_name = "my_other_bucket"
    return {
        "source_bucket_key": filename,
        "dest_bucket_key": filename,
        "dest_bucket_name": dest_bucket_name,
    }


copy_kwargs = create_copy_kwargs.expand(filename=list_filenames.output)

# Copy files to another bucket, based on the file's extension.
copy_filenames = S3CopyObjectOperator.partial(
    task_id="copy_files", source_bucket_name=list_filenames.bucket
).expand_kwargs(copy_kwargs)

對映任務組

類似於 TaskFlow 任務,您也可以對 @task_group 裝飾的函式呼叫 expandexpand_kwargs 來建立對映任務組。

注意

為簡潔起見,本節省略了單個任務的實現。

@task_group
def file_transforms(filename):
    return convert_to_yaml(filename)


file_transforms.expand(filename=["data1.json", "data2.json"])

在上面的示例中,任務 convert_to_yaml 在執行時被展開為兩個任務例項。第一個展開的例項將接收 "data1.json" 作為輸入,第二個接收 "data2.json"

任務組函式中的值引用

任務函式(@task)和任務 *組* 函式(@task_group)之間一個重要的區別是,由於任務組沒有關聯的工作程序,任務組函式中的程式碼無法解析傳遞給它的引數;實際值只有當引用傳遞給任務時才會被解析。

例如,這段程式碼將 *無法* 工作:

@task
def my_task(value):
    print(value)


@task_group
def my_task_group(value):
    if not value:  # DOES NOT work as you'd expect!
        task_a = EmptyOperator(...)
    else:
        task_a = PythonOperator(...)
    task_a << my_task(value)


my_task_group.expand(value=[0, 1, 2])

my_task_group 中的程式碼執行時,value 仍然只會是一個引用,而不是實際值,所以 if not value 分支不會按照您可能期望的方式工作。但是,如果您將該引用傳遞給一個任務,它將在任務執行時被解析,因此三個 my_task 例項將分別接收到 1、2 和 3。

因此,重要的是要記住,如果您打算對傳遞給任務組函式的值執行任何邏輯,則必須始終使用任務來執行邏輯,例如用於條件的 @task.branch(或 BranchPythonOperator),以及用於迴圈的任務對映方法。

注意

對映任務組中不允許進行任務對映

目前不允許在對映任務組中巢狀進行任務對映。雖然此功能的技術方面不是特別困難,但我們決定有意省略此功能,因為它會增加相當大的 UI 複雜性,並且可能對一般用例不是必需的。未來可能會根據使用者反饋重新考慮此限制。

深度優先執行

如果對映任務組包含多個任務,則組中的所有任務將針對相同的輸入“一起”展開。例如:

@task_group
def file_transforms(filename):
    converted = convert_to_yaml(filename)
    return replace_defaults(converted)


file_transforms.expand(filename=["data1.json", "data2.json"])

由於組 file_transforms 被展開為兩個,任務 convert_to_yamlreplace_defaults 在執行時將各自成為兩個例項。

透過如下所示單獨展開這兩個任務,也可以實現類似的效果:

converted = convert_to_yaml.expand(filename=["data1.json", "data2.json"])
replace_defaults.expand(filename=converted)

然而,區別在於任務組允許其中的每個任務只依賴於其“相關輸入”。對於上面的示例,第一個 replace_defaults 將能夠在 convert_to_yaml("data2.json") 完成之前執行,並且不需要關心它是否成功。這種策略被稱為*深度優先執行*(與簡單的無組*廣度優先執行*相對),它允許更合乎邏輯的任務分離、更精細的依賴規則以及準確的資源分配——以上面的示例為例,第一個 replace_defaults 將能夠在 convert_to_yaml("data2.json") 完成之前執行,並且不需要關心它是否成功。

依賴對映任務組的輸出

類似於對映任務組,依賴於對映任務組的輸出也將自動聚合組的結果。

@task_group
def add_to(value):
    value = add_one(value)
    return double(value)


results = add_to.expand(value=[1, 2, 3])
consumer(results)  # Will receive [4, 6, 8].

也可以像對待普通對映任務的結果一樣執行任何操作。

根據對映任務組的輸出進行分支

雖然不能對對映任務的結果實現分支邏輯(例如使用 @task.branch),但可以根據任務組的 *輸入* 進行分支。以下示例演示了根據對映任務組的輸入執行三個任務之一。

inputs = ["a", "b", "c"]


@task_group(group_id="my_task_group")
def my_task_group(input):
    @task.branch
    def branch(element):
        if "a" in element:
            return "my_task_group.a"
        elif "b" in element:
            return "my_task_group.b"
        else:
            return "my_task_group.c"

    @task
    def a():
        print("a")

    @task
    def b():
        print("b")

    @task
    def c():
        print("c")

    branch(input) >> [a(), b(), c()]


my_task_group.expand(input=inputs)

過濾對映任務中的項

對映任務可以透過返回 None 來阻止任何元素傳遞給其下游任務。例如,如果我們只想將 S3 bucket 中具有特定副檔名的檔案複製到另一個 bucket,我們可以這樣實現 create_copy_kwargs

@task
def create_copy_kwargs(filename):
    # Skip files not ending with these suffixes.
    if filename.rsplit(".", 1)[-1] not in ("json", "yml"):
        return None
    return {
        "source_bucket_key": filename,
        "dest_bucket_key": filename,
        "dest_bucket_name": "my_other_bucket",
    }


# copy_kwargs and copy_files are implemented the same.

這使得 copy_files 只針對 .json.yml 檔案進行展開,同時忽略其餘檔案。

轉換展開資料

由於通常需要轉換任務對映的輸出資料格式,尤其是從非 TaskFlow Operator 獲取的輸出(其輸出格式是預定的,且無法輕鬆轉換,例如上面示例中的 create_copy_kwargs),可以使用特殊的 map() 函式來輕鬆執行此類轉換。因此,上面的示例可以修改如下:

from airflow.exceptions import AirflowSkipException

list_filenames = S3ListOperator(...)  # Unchanged.


def create_copy_kwargs(filename):
    if filename.rsplit(".", 1)[-1] not in ("json", "yml"):
        raise AirflowSkipException(f"skipping {filename!r}; unexpected suffix")
    return {
        "source_bucket_key": filename,
        "dest_bucket_key": filename,
        "dest_bucket_name": "my_other_bucket",
    }


copy_kwargs = list_filenames.output.map(create_copy_kwargs)

# Unchanged.
copy_filenames = S3CopyObjectOperator.partial(...).expand_kwargs(copy_kwargs)

有幾點需要注意:

  1. map() 函式的可呼叫引數(示例中的 create_copy_kwargs)**不能**是一個任務,而是一個普通的 Python 函式。轉換是作為下游任務(即 copy_files)“預處理”的一部分,而不是 DAG 中的獨立任務。

  2. 可呼叫物件總是隻接受一個位置引數。對於用於任務對映的可迭代物件中的每個項,都會呼叫此函式,類似於 Python 內建的 map() 函式的工作方式。

  3. 由於可呼叫物件作為下游任務的一部分執行,您可以使用任何現有技術來編寫任務函式。例如,要將元件標記為跳過,您應該引發 AirflowSkipException。請注意,返回 None 在這裡**不起作用**。

組合上游資料(又稱“zip”)

將多個輸入源組合成一個任務對映可迭代物件也很常見。這通常稱為“zipping”(類似於 Python 內建的 zip() 函式),也作為下游任務的預處理執行。

這對於任務對映中的條件邏輯特別有用。例如,如果您想從 S3 下載檔案,但重新命名這些檔案,就可以實現類似這樣的功能:

list_filenames_a = S3ListOperator(
    task_id="list_files_in_a",
    bucket="bucket",
    prefix="incoming/provider_a/{{ data_interval_start|ds }}",
)
list_filenames_b = ["rename_1", "rename_2", "rename_3", ...]

filenames_a_b = list_filenames_a.output.zip(list_filenames_b)


@task
def download_filea_from_a_rename(filenames_a_b):
    fn_a, fn_b = filenames_a_b
    S3Hook().download_file(fn_a, local_path=fn_b)


download_filea_from_a_rename.expand(filenames_a_b=filenames_a_b)

類似於內建的 zip(),您可以將任意數量的可迭代物件一起打包,以獲得包含位置引數數量的元組的可迭代物件。預設情況下,打包後的可迭代物件的長度與最短的打包可迭代物件相同,多餘的項將被丟棄。可以傳遞可選的關鍵字引數 default 來切換行為,使其與 Python 的 itertools.zip_longest() 匹配——打包後的可迭代物件將具有與*最長*的打包可迭代物件相同的長度,缺失的項將用 default 提供的值填充。

連線多個上游

組合輸入源的另一種常見模式是針對多個可迭代物件執行相同的任務。當然,針對每個可迭代物件單獨執行相同的程式碼是完全有效的,例如:

list_filenames_a = S3ListOperator(
    task_id="list_files_in_a",
    bucket="bucket",
    prefix="incoming/provider_a/{{ data_interval_start|ds }}",
)
list_filenames_b = S3ListOperator(
    task_id="list_files_in_b",
    bucket="bucket",
    prefix="incoming/provider_b/{{ data_interval_start|ds }}",
)


@task
def download_file(filename):
    S3Hook().download_file(filename)
    # process file...


download_file.override(task_id="download_file_a").expand(filename=list_filenames_a.output)
download_file.override(task_id="download_file_b").expand(filename=list_filenames_b.output)

然而,如果任務可以合併成一個,DAG 將更具可擴充套件性且更易於檢查。可以使用 concat 來完成此操作:

# Tasks list_filenames_a and list_filenames_b, and download_file stay unchanged.

list_filenames_concat = list_filenames_a.concat(list_filenames_b)
download_file.expand(filename=list_filenames_concat)

這將建立一個任務,針對兩個列表進行展開。您可以將任意數量的可迭代物件進行 concat(例如 foo.concat(bar, rex));或者,由於返回值也是一個 XCom 引用,因此可以將 concat 呼叫連結起來(例如 foo.concat(bar).concat(rex))以達到相同的效果:一個單個可迭代物件,按順序連線所有可迭代物件,類似於 Python 的 itertools.chain()

可以展開哪些資料型別?

目前只能對映字典、列表,或者儲存在 XCom 中作為任務結果的這些型別之一。

如果上游任務返回不可對映的型別,對映任務將在執行時以 UnmappableXComTypePushed 異常失敗。例如,您不能讓上游任務返回一個純字串——它必須是列表或字典。

模板欄位和對映引數如何互動?

Operator 的所有引數都可以對映,即使是那些不接受模板引數的引數。

如果欄位被標記為模板化並且被對映,它將**不會被模板化**。

例如,這將列印 {{ ds }} 而不是日期戳。

@task
def make_list():
    return ["{{ ds }}"]


@task
def printer(val):
    print(val)


printer.expand(val=make_list())

如果您想插入值,可以自己呼叫 task.render_template,或使用插值。

@task
def make_list(ds=None):
    return [ds]


@task
def make_list(**context):
    return [context["task"].render_template("{{ ds }}", context)]

對對映任務設定限制

您可以對任務設定兩個限制:

  1. 展開可以建立的對映任務例項數量。

  2. 對映任務可以同時執行的數量。

  • 限制對映任務的數量

    [core] max_map_length 配置選項是 expand 可以建立的最大任務數量——預設值為 1024。

    如果源任務(我們前面示例中的 make_list)返回的列表長度超過此值,將導致*該*任務失敗。

  • 限制對映任務的並行副本數量

    如果您不想讓大型對映任務佔用所有可用的執行器插槽,您可以使用任務上的 max_active_tis_per_dag 設定來限制同時執行的任務數量。

    但請注意,這適用於該任務在所有活動 DagRun 中的所有副本,而不僅僅是此特定 DagRun 中的副本。

    @task(max_active_tis_per_dag=16)
    def add_one(x: int):
        return x + 1
    
    
    BashOperator.partial(task_id="my_task", max_active_tis_per_dag=16).expand(bash_command=commands)
    

自動跳過零長度對映

如果輸入為空(零長度),將不會建立新任務,並且對映任務將被標記為 SKIPPED

此條目是否有幫助?