
Dynamic Tasks in Apache Airflow

Using TaskFlow API & @task Decorator ⚡

The Story: Workflows That Adapt

Imagine you are building a data pipeline for multiple clients:

  • Client A → 3 tables
  • Client B → 5 tables
  • Client C → 2 tables

If you hardcode every task:

  • DAG becomes huge
  • Maintenance nightmare
  • Adding a new client requires code changes

What if your DAG could automatically generate tasks based on input data?

That’s where Dynamic Tasks shine.


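What Are Dynamic Tasks?

Dynamic tasks (dynamic task mapping, added in Airflow 2.3) are:

  • Task instances created at runtime from a single task definition
  • Driven by the data a run actually sees: input lists, configs, or database records
  • Available for TaskFlow @task functions and for classic operators via .partial() / .expand()
  • Fully compatible with XComs, branching, and trigger rules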

Think of it as a factory that creates tasks on demand.


Why Dynamic Tasks Matter

  • Scalable pipelines – no hardcoding
  • Flexible DAGs – adapt to data or configs
  • Maintainable workflows – reduce repetition
  • Modern Airflow best practice – TaskFlow API friendly

Example Scenario: Process Multiple Clients

Input Data

clients = [
    {"name": "Client A", "tables": ["sales", "inventory", "customers"]},
    {"name": "Client B", "tables": ["orders", "payments", "products", "shipping"]},
    {"name": "Client C", "tables": ["users", "activity"]},
]
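Task mapping creates one instance per element of the list it receives. To get one instance per table, the nested structure above must first be flattened into one item per (client, table) pair. Below is a minimal plain-Python sketch of that step that runs without Airflow. The names `build_work_items`, `client_name` and `table_name` are hypothetical, chosen to match the parameters of `process_table` in the DAG below.

```python
# Plain-Python sketch of the flattening step; runs without Airflow.
clients = [
    {"name": "Client A", "tables": ["sales", "inventory", "customers"]},
    {"name": "Client B", "tables": ["orders", "payments", "products", "shipping"]},
    {"name": "Client C", "tables": ["users", "activity"]},
]


def build_work_items(clients):
    """Flatten nested client configs into one kwargs dict per (client, table).

    Hypothetical helper: each dict's keys match the parameters of
    process_table(client_name, table_name).
    """
    return [
        {"client_name": client["name"], "table_name": table}
        for client in clients
        for table in client["tables"]
    ]


work_items = build_work_items(clients)
print(len(work_items))  # 9: one future task instance per table
print(work_items[0])    # {'client_name': 'Client A', 'table_name': 'sales'}
```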

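DAG Code Example (Dynamic Tasks)

from airflow.decorators import dag, task
from datetime import datetime

# Requires Airflow 2.4+ (the `schedule` argument and .expand_kwargs()).
@dag(
    dag_id="dynamic_tasks_example",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
)
def dynamic_tasks_dag():

    @task
    def fetch_clients():
        return [
            {"name": "Client A", "tables": ["sales", "inventory", "customers"]},
            {"name": "Client B", "tables": ["orders", "payments", "products", "shipping"]},
            {"name": "Client C", "tables": ["users", "activity"]},
        ]

    @task
    def build_work_items(clients):
        # Runs at execution time, so `clients` is the real list here.
        # Flatten it into one kwargs dict per (client, table) pair.
        return [
            {"client_name": client["name"], "table_name": table}
            for client in clients
            for table in client["tables"]
        ]

    @task
    def process_table(client_name, table_name):
        print(f"Processing {table_name} for {client_name}")
        return f"{client_name}-{table_name}-done"

    clients = fetch_clients()

    # At parse time `clients` is an XComArg placeholder, not a list, so a
    # Python for-loop cannot iterate over it. Map over the flattened list
    # instead: Airflow creates one process_table instance per dict at runtime.
    process_table.expand_kwargs(build_work_items(clients))

dynamic_tasks_dag()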
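Why not map directly with process_table.expand(client_name=[...], table_name=[...])? When .expand() receives several keyword lists, Airflow maps over their cross product, so every client would be paired with every table. The plain-Python model below illustrates the difference between that and mapping one instance per dict, as .expand_kwargs() does. It is not Airflow code: itertools.product stands in for Airflow's expansion, and the two-client input is hypothetical.

```python
from itertools import product

# Hypothetical inputs: two clients and the table names they use.
client_names = ["Client A", "Client C"]
table_names = ["sales", "users"]

# Model of process_table.expand(client_name=..., table_name=...):
# Airflow maps over the cross product of all expanded keyword lists.
cross_product_calls = [
    {"client_name": c, "table_name": t}
    for c, t in product(client_names, table_names)
]

# Model of process_table.expand_kwargs(work_items):
# exactly one mapped instance per dict, so pairings stay as configured.
work_items = [
    {"client_name": "Client A", "table_name": "sales"},
    {"client_name": "Client C", "table_name": "users"},
]

print(len(cross_product_calls))  # 4, including bogus pairs like Client A/users
print(len(work_items))           # 2, only the real pairs
```

Cross-product expansion suits genuinely independent parameters, such as every region times every metric. Per-dict mapping fits data whose pairings are fixed by configuration, as in this client/table scenario.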

Input & Output Example

Input (from fetch_clients)

[
{"name": "Client A", "tables": ["sales", "inventory", "customers"]},
{"name": "Client B", "tables": ["orders", "payments", "products", "shipping"]},
{"name": "Client C", "tables": ["users", "activity"]}
]

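Output (example logs; each line is written to the log of its own mapped process_table instance, and parallel instances may finish in any order)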

Processing sales for Client A
Processing inventory for Client A
Processing customers for Client A
Processing orders for Client B
Processing payments for Client B
Processing products for Client B
Processing shipping for Client B
Processing users for Client C
Processing activity for Client C

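Using XComs with Dynamic Tasks

Each mapped instance pushes its return value to XCom under its own map index. A regular (non-mapped) downstream task can take the mapped task's output as input. Airflow then passes it all of those values as one lazily loaded sequence, which makes aggregation straightforward.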


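Example: Aggregating Results

@task
def aggregate_results(results):
    # `results` is a lazy sequence with one entry per mapped process_table instance
    print("Aggregated results:", list(results))

  • aggregate_results does not need its own .expand(). Call it once with the XComArg returned by process_table's .expand() (or .expand_kwargs()) call, and Airflow passes in every mapped result.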
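In practice an aggregation task usually does more than print. The sketch below is one possible body: it counts finished tables per client. It assumes the result strings keep the "{client_name}-{table_name}-done" shape returned by process_table, and `summarize` is a hypothetical helper that aggregate_results could call.

```python
from collections import Counter


def summarize(results):
    """Count finished tables per client.

    Assumes each result looks like "{client_name}-{table_name}-done";
    table names are assumed not to contain hyphens.
    """
    counts = Counter()
    for result in results:
        # rsplit from the right so hyphens inside a client name survive
        client_name, _table_name, _status = result.rsplit("-", 2)
        counts[client_name] += 1
    return dict(counts)


# Sample input shaped like the mapped outputs. This is a plain list here;
# the sequence Airflow passes downstream can be iterated the same way.
sample = [
    "Client A-sales-done",
    "Client A-inventory-done",
    "Client B-orders-done",
    "Client C-users-done",
]
print(summarize(sample))  # {'Client A': 2, 'Client B': 1, 'Client C': 1}
```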

Best Practices for Dynamic Tasks

✅ Use the @task decorator for clarity
✅ Avoid nested Python loops over task outputs; flatten the input and map over it with .expand() or .expand_kwargs()
✅ Keep task logic small and focused
✅ Leverage XComs for downstream communication, keeping the values small
✅ Combine with trigger rules and branching when needed


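Common Mistakes

❌ Hardcoding tasks that should be generated from data
❌ Forgetting to return the values that downstream tasks need
❌ Fanning out to too many mapped instances (the [core] max_map_length setting caps the length of a list that can be mapped over, 1024 by default)
❌ Ignoring task dependencies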
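One way to limit the number of mapped instances is to batch the work so that each instance handles several items. Below is a plain-Python sketch of the batching step. `batch` and the batch size of 4 are hypothetical choices. The resulting list of batches is what a task could return for a downstream task to .expand() over.

```python
def batch(items, size):
    """Split items into consecutive lists of at most `size` elements."""
    if size < 1:
        raise ValueError("size must be at least 1")
    return [items[i:i + size] for i in range(0, len(items), size)]


tables = [
    "sales", "inventory", "customers",
    "orders", "payments", "products", "shipping",
    "users", "activity",
]
batches = batch(tables, 4)
print(len(batches))  # 3 mapped instances instead of 9
print(batches[-1])   # ['activity']
```

The trade-off is coarser retries and logs: if one table in a batch fails, the whole batch's instance is retried.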


When to Use Dynamic Tasks

✔ Multi-client or multi-table pipelines
✔ ETL pipelines with varying inputs
✔ Scalable task generation
✔ Modern Airflow DAGs using TaskFlow API


When NOT to Use Dynamic Tasks

✘ For simple, linear DAGs
✘ When inputs are static and fixed
✘ For small pipelines: static tasks are simpler


Summary 🧠

  • Dynamic tasks allow runtime task generation
  • TaskFlow API + @task makes this clean and Pythonic
  • Fully compatible with XComs, branching, trigger rules
  • Key to building scalable, maintainable pipelines

Key Takeaways

  • Dynamic tasks = adaptive workflows
  • Use .partial() + .expand() for multiple task instances
  • Keep logic modular and small
  • Combine with modern Airflow features for maximum efficiency
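The .partial() + .expand() split can be illustrated with a plain-Python analogy using functools.partial and a list comprehension. It models only the argument handling: real mapped instances run as separate task instances with their own logs and XComs. The stand-in process_table below is ordinary Python, not the Airflow task.

```python
from functools import partial


def process_table(client_name, table_name):
    """Stand-in for the @task function from the DAG example (no Airflow)."""
    return f"{client_name}-{table_name}-done"


# Analogy for process_table.partial(client_name="Client C")
#                          .expand(table_name=["users", "activity"]):
# partial() pins the argument shared by every instance...
shared = partial(process_table, client_name="Client C")

# ...and expand() supplies one value per instance.
results = [shared(table_name=table) for table in ["users", "activity"]]
print(results)  # ['Client C-users-done', 'Client C-activity-done']
```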

What’s Next?

➡️ Dynamic DAG Generation at Runtime
Learn how to create entire DAGs dynamically based on configs, databases, or external systems.