Dynamic DAG Generation at Runtime in Apache Airflow

Building Pipelines That Build Themselves 🏗️

The Story: One DAG Isn’t Enough

Imagine you manage dozens of clients or environments.

  • Each client requires a slightly different DAG
  • Hand-writing dozens of near-identical DAGs is impractical and error-prone
  • Changes happen frequently

Wouldn’t it be amazing if Airflow could generate DAGs automatically based on configs?

This is exactly what Dynamic DAG Generation at Runtime allows.


What Is Dynamic DAG Generation?

Dynamic DAG generation means:

  • DAGs are created programmatically
  • They can be generated from:
      • Configuration files (JSON/YAML)
      • Database entries
      • API responses
  • Multiple DAGs can be created without manual duplication
  • It is ideal for multi-tenant pipelines or environment-based workflows
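This is a minimal sketch of the "database entries" option. It rebuilds, from rows in a table, the same list-of-dicts shape that the JSON config in the example scenario uses. It relies on Python's built-in sqlite3 with an in-memory database purely for illustration. The table client_tables, its columns, and the load_clients_config helper are hypothetical. Whatever the source is, this code runs every time Airflow parses the DAG file, so the query should stay fast.

```python
import sqlite3
from collections import defaultdict


def load_clients_config(conn):
    """Group (client, table_name) rows into [{"client": ..., "tables": [...]}].

    Assumes a hypothetical table: client_tables(client TEXT, table_name TEXT).
    Tables keep their insertion order (rowid) within each client.
    """
    rows = conn.execute(
        "SELECT client, table_name FROM client_tables ORDER BY client, rowid"
    ).fetchall()
    grouped = defaultdict(list)
    for client, table_name in rows:
        grouped[client].append(table_name)
    return [{"client": client, "tables": tables} for client, tables in grouped.items()]


# Demo: an in-memory database holding a subset of the example config
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE client_tables (client TEXT, table_name TEXT)")
conn.executemany(
    "INSERT INTO client_tables VALUES (?, ?)",
    [("Client A", "sales"), ("Client A", "inventory"), ("Client B", "orders")],
)
print(load_clients_config(conn))
```

Because the return value has the same shape as the JSON file, it could stand in for the json.load(...) result in a DAG factory without further changes.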

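Think of it as a DAG factory. The generating code runs each time Airflow parses the DAG file, so the set of DAGs follows the config as it changes.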


Why Dynamic DAGs Matter

  • Scalable pipelines – generate dozens of DAGs dynamically
  • Easy maintenance – one template for many DAGs
  • Flexible execution – DAGs adapt to external configurations
  • Production-ready – fewer copy-paste errors across near-identical DAGs

Example Scenario: Multi-Client Pipelines

Config File (JSON)

```json
[
  {"client": "Client A", "tables": ["sales", "inventory", "customers"]},
  {"client": "Client B", "tables": ["orders", "payments", "products"]},
  {"client": "Client C", "tables": ["users", "activity"]}
]
```
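A typo in this file can break generated DAGs or silently overwrite one of them, so it can help to validate the config before building anything. The sketch below is minimal and assumes the client/tables shape shown above. validate_clients_config is a hypothetical helper, not an Airflow API. It checks field types and flags duplicate table names, which would become duplicate task IDs. It also catches two clients whose names normalize to the same DAG ID (for example "Client A" and "Client_A"), which would otherwise make one globals() entry replace the other.

```python
def validate_clients_config(clients_config):
    """Raise ValueError for malformed entries or colliding DAG IDs.

    Hypothetical helper: assumes entries shaped like
    {"client": str, "tables": [str, ...]} and the same
    space-to-underscore naming rule the DAG factory uses.
    Returns the DAG IDs that would be generated, in config order.
    """
    seen = {}  # dag_id -> original client name
    for i, entry in enumerate(clients_config):
        if not isinstance(entry, dict):
            raise ValueError(f"entry {i}: expected an object, got {type(entry).__name__}")
        client = entry.get("client")
        tables = entry.get("tables")
        if not isinstance(client, str) or not client.strip():
            raise ValueError(f"entry {i}: 'client' must be a non-empty string")
        if not isinstance(tables, list) or not all(isinstance(t, str) for t in tables):
            raise ValueError(f"entry {i}: 'tables' must be a list of strings")
        # Duplicate tables would produce duplicate task IDs in one DAG
        if len(set(tables)) != len(tables):
            raise ValueError(f"entry {i}: duplicate table names for {client!r}")

        dag_id = f"dynamic_dag_{client.replace(' ', '_')}"
        # Two clients mapping to one ID would overwrite each other in globals()
        if dag_id in seen:
            raise ValueError(
                f"entry {i}: {client!r} and {seen[dag_id]!r} both map to {dag_id!r}"
            )
        seen[dag_id] = client
    return list(seen)


config = [
    {"client": "Client A", "tables": ["sales", "inventory", "customers"]},
    {"client": "Client B", "tables": ["orders", "payments", "products"]},
]
print(validate_clients_config(config))
# ['dynamic_dag_Client_A', 'dynamic_dag_Client_B']
```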

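Python DAG Factory

```python
import json
from datetime import datetime
from pathlib import Path

from airflow import DAG
from airflow.operators.python import PythonOperator

# Load the config when the DAG file is parsed (it could also come from a DB or API).
# Resolve the path relative to this file, not the scheduler's working directory.
CONFIG_PATH = Path(__file__).parent / "clients_config.json"
with open(CONFIG_PATH) as f:
    clients_config = json.load(f)


def create_process_table_task(client_name, table_name, dag):
    # client_name and table_name are parameters, so each callable keeps its own values
    def process_table():
        print(f"Processing {table_name} for {client_name}")

    # Task IDs may not contain spaces, so normalize the client name
    safe_client = client_name.replace(" ", "_")
    return PythonOperator(
        task_id=f"{safe_client}_{table_name}",
        python_callable=process_table,
        dag=dag,
    )


# Dynamically generate one DAG per client
for client in clients_config:
    dag_id = f"dynamic_dag_{client['client'].replace(' ', '_')}"
    client_dag = DAG(
        dag_id=dag_id,
        start_date=datetime(2024, 1, 1),
        schedule=None,  # replaces the deprecated schedule_interval (Airflow 2.4+)
        catchup=False,
    )
    # Expose each DAG as a module-level global so Airflow discovers it
    globals()[dag_id] = client_dag

    for table in client["tables"]:
        create_process_table_task(client["client"], table, client_dag)
```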

Input & Output Example

Input (clients_config.json)

```json
[
  {"client": "Client A", "tables": ["sales", "inventory", "customers"]},
  {"client": "Client B", "tables": ["orders", "payments", "products"]}
]
```

Output (Generated DAG IDs & Tasks)

Generated DAG: dynamic_dag_Client_A
- Task: Client_A_sales
- Task: Client_A_inventory
- Task: Client_A_customers

Generated DAG: dynamic_dag_Client_B
- Task: Client_B_orders
- Task: Client_B_payments
- Task: Client_B_products
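DAG and task IDs are derived purely from the config, so the listing above can be reproduced without importing Airflow, which makes naming easy to unit-test. The sketch below is minimal and assumes the same space-to-underscore rule as the factory. plan_dags is a hypothetical helper.

```python
def plan_dags(clients_config):
    """Map each generated DAG ID to its task IDs, mirroring the factory's naming."""
    plan = {}
    for client in clients_config:
        safe_client = client["client"].replace(" ", "_")
        plan[f"dynamic_dag_{safe_client}"] = [
            f"{safe_client}_{table}" for table in client["tables"]
        ]
    return plan


config = [
    {"client": "Client A", "tables": ["sales", "inventory", "customers"]},
    {"client": "Client B", "tables": ["orders", "payments", "products"]},
]
for dag_id, task_ids in plan_dags(config).items():
    print(f"Generated DAG: {dag_id}")
    for task_id in task_ids:
        print(f"- Task: {task_id}")
```

Keeping this naming logic in one function shared by the factory and the tests avoids the two drifting apart.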

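Dynamic DAGs with TaskFlow API

You can combine the TaskFlow API with dynamic DAG generation. Building each DAG inside a factory function binds every DAG to its own client's values. `.expand()` (dynamic task mapping, Airflow 2.3+) creates one task instance per table:

```python
from datetime import datetime

from airflow.decorators import dag, task

clients_config = [
    {"client": "Client A", "tables": ["sales", "inventory"]},
    {"client": "Client B", "tables": ["orders", "payments"]},
]


def build_client_dag(client_name, tables):
    # Parameters are bound per call, so each DAG keeps its own client values
    @dag(
        dag_id=f"taskflow_dynamic_{client_name.replace(' ', '_')}",
        start_date=datetime(2024, 1, 1),
        schedule=None,
        catchup=False,
    )
    def client_dag():
        @task
        def process_table(table):
            print(f"{client_name} - Processing {table}")

        # One mapped task instance per table in this client's list
        process_table.expand(table=tables)

    return client_dag()


for client in clients_config:
    generated_dag = build_client_dag(client["client"], client["tables"])
    globals()[generated_dag.dag_id] = generated_dag
```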

Best Practices for Dynamic DAGs

✅ Use external configs or DB for DAG definitions
✅ Keep DAG templates simple and modular
✅ Name tasks and DAGs clearly to avoid conflicts
✅ Avoid creating hundreds of tasks that may overwhelm the scheduler
✅ Combine with dynamic tasks, branching, and XComs for scalable pipelines
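Replacing spaces with underscores covers the sample names, but real client names may contain characters like "&" or "/". Airflow rejects DAG and task IDs containing anything other than alphanumeric characters, dashes, dots, and underscores. The slightly more defensive sketch below restricts names to an ASCII subset of that character set. to_airflow_id is a hypothetical helper. The 250-character cap is an assumption based on Airflow 2.x's ID length limit.

```python
import re

# Characters outside this ASCII set are collapsed into a single underscore.
_DISALLOWED = re.compile(r"[^A-Za-z0-9_.-]+")
MAX_ID_LENGTH = 250  # assumed Airflow 2.x limit for DAG/task IDs


def to_airflow_id(name):
    """Turn an arbitrary display name into a safe Airflow ID fragment."""
    safe = _DISALLOWED.sub("_", name.strip()).strip("_")
    if not safe:
        raise ValueError(f"cannot build an ID from {name!r}")
    if len(safe) > MAX_ID_LENGTH:
        raise ValueError(f"ID too long ({len(safe)} > {MAX_ID_LENGTH}): {safe!r}")
    return safe


print(to_airflow_id("Client A"))    # Client_A
print(to_airflow_id("Acme & Co."))  # Acme_Co.
```

Raising on empty or overlong results, rather than silently truncating, keeps two different clients from quietly ending up with the same ID.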


Common Mistakes

❌ Reading loop variables inside task callables, so every generated DAG ends up using the last config entry (late binding)
❌ Overloading Airflow with hundreds of tasks per DAG
❌ Ignoring DAG naming conventions
❌ Forgetting dependencies between tasks
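The loop-scope mistake is easy to make because Python closures look up a loop variable when the function runs, not when it is defined. When a task runs, Airflow loads the DAG file again, so the generating loop has already finished by the time a task callable executes. This plain-Python sketch (no Airflow required) shows the symptom and the usual fix of passing values into a factory function. The function names are illustrative only.

```python
def make_callables_buggy(clients):
    callables = []
    for client in clients:
        # Bug: `client` is looked up when the lambda is called, not when it is created
        callables.append(lambda: f"Processing for {client}")
    return callables


def make_callable(client):
    # Fix: `client` is a parameter, so each returned function keeps its own value
    def process():
        return f"Processing for {client}"

    return process


def make_callables_fixed(clients):
    return [make_callable(client) for client in clients]


clients = ["Client A", "Client B", "Client C"]
print([f() for f in make_callables_buggy(clients)])
# ['Processing for Client C', 'Processing for Client C', 'Processing for Client C']
print([f() for f in make_callables_fixed(clients)])
# ['Processing for Client A', 'Processing for Client B', 'Processing for Client C']
```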


When to Use Dynamic DAG Generation

✔ Multi-tenant pipelines (clients, regions, projects)
✔ Config-driven workflows
✔ ETL pipelines with variable inputs
✔ Automating DAG creation to reduce human error


When NOT to Use

✘ For small static pipelines
✘ When every DAG would have the same tasks and differ only in inputs (use dynamic task mapping in a single DAG instead)
✘ When DAGs don’t need to adapt to changing configuration


Summary 🧠

  • Dynamic DAG generation creates DAGs programmatically from external config whenever Airflow parses the DAG file
  • Ideal for scalable, multi-tenant pipelines
  • Works with plain Python DAG files, the TaskFlow API, and external configs
  • Reduces maintenance and human error

Key Takeaways

  • Dynamic DAG = programmatic DAG creation
  • Config-driven design makes pipelines flexible
  • Combine with dynamic tasks, XComs, branching, and trigger rules for full automation
  • Always keep DAGs readable, maintainable, and schedulable

What’s Next?

Local Executor vs Celery Executor vs Kubernetes Executor