Dynamic DAG Generation at Runtime in Apache Airflow
Building Pipelines That Build Themselves 🏗️
The Story: One DAG Isn't Enough
Imagine you manage dozens of clients or environments.
- Each client requires a slightly different DAG
- Hardcoding dozens of near-identical DAGs is impractical
- Changes happen frequently
Wouldn't it be amazing if Airflow could generate DAGs automatically based on configs?
This is exactly what dynamic DAG generation allows: the DAGs are built from code each time Airflow parses the DAG file.
What Is Dynamic DAG Generation?
Dynamic DAG generation means that:
- DAGs are created programmatically
- DAG definitions can come from:
  - Configuration files (JSON/YAML)
  - Database entries
  - API responses
- Multiple DAGs can be created without manual duplication
- The approach suits multi-tenant pipelines and environment-based workflows
Think of it as a DAG factory.
Why Dynamic DAGs Matter
- Scalable pipelines: generate dozens of DAGs dynamically
- Easy maintenance: one template for many DAGs
- Flexible execution: DAGs adapt to external configurations
- Production-ready: reduces human error
Example Scenario: Multi-Client Pipelines
Config File (JSON)
[
  {"client": "Client A", "tables": ["sales", "inventory", "customers"]},
  {"client": "Client B", "tables": ["orders", "payments", "products"]},
  {"client": "Client C", "tables": ["users", "activity"]}
]
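Because the factory builds DAGs directly from this file, a malformed entry would otherwise surface only as a DAG import error. Below is a minimal validation sketch, assuming the schema shown above (a list of objects, each with a `client` string and a non-empty `tables` list). The function name `validate_clients_config` is hypothetical and not part of Airflow.

```python
def validate_clients_config(config):
    """Return a list of human-readable problems; an empty list means the config looks usable."""
    if not isinstance(config, list):
        return ["top-level value must be a list of client entries"]

    problems = []
    for index, entry in enumerate(config):
        if not isinstance(entry, dict):
            problems.append(f"entry {index}: expected an object, got {type(entry).__name__}")
            continue

        # Assumed schema: "client" is a non-empty string
        client = entry.get("client")
        if not isinstance(client, str) or not client.strip():
            problems.append(f"entry {index}: 'client' must be a non-empty string")

        # Assumed schema: "tables" is a non-empty list of non-empty strings
        tables = entry.get("tables")
        if not isinstance(tables, list) or not tables:
            problems.append(f"entry {index}: 'tables' must be a non-empty list")
        elif not all(isinstance(t, str) and t for t in tables):
            problems.append(f"entry {index}: every table name must be a non-empty string")
    return problems
```

One way to use it is to call the function right after loading the JSON and raise an error if the returned list is not empty, so the problem shows up with a clear message rather than a `KeyError` deep inside the loop.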
Python DAG Factory
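from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
from pathlib import Path
import json

# Load config dynamically (could be from DB or API).
# Assumes clients_config.json sits next to this DAG file; a bare relative
# path depends on the working directory of the process parsing the file.
CONFIG_PATH = Path(__file__).parent / "clients_config.json"
with open(CONFIG_PATH) as f:
    clients_config = json.load(f)


def create_process_table_task(client_name, table_name, dag):
    def process_table():
        print(f"Processing {table_name} for {client_name}")

    return PythonOperator(
        # Task IDs may only contain alphanumerics, dashes, dots, and underscores,
        # so the space in the client name is replaced just like in the DAG ID
        task_id=f"{client_name.replace(' ', '_')}_{table_name}",
        python_callable=process_table,
        dag=dag,
    )


# Dynamically generate one DAG per client
for client in clients_config:
    dag_id = f"dynamic_dag_{client['client'].replace(' ', '_')}"
    dag = DAG(
        dag_id=dag_id,
        start_date=datetime(2024, 1, 1),
        schedule_interval=None,  # deprecated since Airflow 2.4; use schedule=None there
        catchup=False,
    )
    # Expose the DAG at module level so Airflow can discover it
    globals()[dag_id] = dag

    for table in client["tables"]:
        create_process_table_task(client["client"], table, dag)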
Input & Output Example
Input (clients_config.json)
[
  {"client": "Client A", "tables": ["sales", "inventory", "customers"]},
  {"client": "Client B", "tables": ["orders", "payments", "products"]}
]
Output (Generated DAG IDs & Tasks)
Generated DAG: dynamic_dag_Client_A
- Task: Client_A_sales
- Task: Client_A_inventory
- Task: Client_A_customers
Generated DAG: dynamic_dag_Client_B
- Task: Client_B_orders
- Task: Client_B_payments
- Task: Client_B_products
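The mapping from config to IDs can be previewed without Airflow installed. The sketch below is an illustration under assumptions: `to_safe_id` and `preview_dags` are hypothetical helper names, and the sanitizing rule is a conservative ASCII-only choice that replaces any character outside letters, digits, dashes, dots, and underscores. For the sample input it yields the same IDs as replacing spaces with underscores.

```python
import re


def to_safe_id(name):
    """Replace every character outside [A-Za-z0-9_.-] with an underscore."""
    return re.sub(r"[^A-Za-z0-9_.-]", "_", name)


def preview_dags(clients_config):
    """Map each generated DAG ID to the task IDs it would contain."""
    preview = {}
    for client in clients_config:
        client_id = to_safe_id(client["client"])
        dag_id = f"dynamic_dag_{client_id}"
        preview[dag_id] = [
            f"{client_id}_{to_safe_id(table)}" for table in client["tables"]
        ]
    return preview


sample_config = [
    {"client": "Client A", "tables": ["sales", "inventory", "customers"]},
    {"client": "Client B", "tables": ["orders", "payments", "products"]},
]

for dag_id, task_ids in preview_dags(sample_config).items():
    print(f"Generated DAG: {dag_id}")
    for task_id in task_ids:
        print(f"- Task: {task_id}")
```

A preview like this is cheap to run in a unit test, which makes ID changes visible before they reach the scheduler.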
Dynamic DAGs with TaskFlow API
You can combine the TaskFlow API with dynamic DAG generation:
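from airflow.decorators import dag, task
from datetime import datetime

clients_config = [
    {"client": "Client A", "tables": ["sales", "inventory"]},
    {"client": "Client B", "tables": ["orders", "payments"]},
]


def build_client_dag(client_name, tables):
    # A factory function gives every DAG its own client_name and tables.
    # Reading the loop variable inside the task would make every task see
    # the last client, because the task body runs after the loop has finished.
    dag_id = f"taskflow_dynamic_{client_name.replace(' ', '_')}"

    @dag(
        dag_id=dag_id,
        start_date=datetime(2024, 1, 1),
        schedule_interval=None,  # deprecated since Airflow 2.4; use schedule=None there
        catchup=False,
    )
    def client_dag():
        @task
        def process_table(table):
            print(f"{client_name} - Processing {table}")

        # One mapped task instance per table
        process_table.expand(table=tables)

    return client_dag()


for client in clients_config:
    dag_obj = build_client_dag(client["client"], client["tables"])
    # Expose the DAG at module level so Airflow can discover it
    globals()[dag_obj.dag_id] = dag_obj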
Best Practices for Dynamic DAGs
✅ Use external configs or DB for DAG definitions
✅ Keep DAG templates simple and modular
✅ Name tasks and DAGs clearly to avoid conflicts
✅ Avoid creating hundreds of tasks that may overwhelm the scheduler
✅ Combine with dynamic tasks, branching, and XComs for scalable pipelines
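Naming conflicts are easy to miss when IDs are derived from free-text names: "Client A" and "Client_A" both become `dynamic_dag_Client_A`, and the second DAG silently overwrites the first in `globals()`. The sketch below checks for that before any DAG is built. The helper name `find_dag_id_collisions` is hypothetical, and it assumes the same naming rule as the factory in this article (prefix plus the client name with spaces replaced by underscores).

```python
from collections import defaultdict


def find_dag_id_collisions(client_names, prefix="dynamic_dag_"):
    """Group client names by the DAG ID they produce and return only the clashing groups."""
    names_by_dag_id = defaultdict(list)
    for name in client_names:
        dag_id = prefix + name.strip().replace(" ", "_")
        names_by_dag_id[dag_id].append(name)
    # Keep only IDs claimed by more than one client name
    return {
        dag_id: names
        for dag_id, names in names_by_dag_id.items()
        if len(names) > 1
    }


print(find_dag_id_collisions(["Client A", "Client_A", "Client B"]))
# → {'dynamic_dag_Client_A': ['Client A', 'Client_A']}
```

Raising an error when the returned dictionary is not empty turns a silent overwrite into a visible import failure.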
Common Mistakes
❌ Defining DAGs or task callables inside loops without isolating the loop variable (scope bugs)
❌ Overloading Airflow with hundreds of tasks per DAG
❌ Ignoring DAG naming conventions
❌ Forgetting dependencies between tasks
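The scope mistake at the top of this list usually comes from Python's late-binding closures: a function defined inside a loop looks up the loop variable when it is called, not when it is defined. In Airflow the task callable runs long after the loop has finished, so every task can end up seeing the last client. The sketch below reproduces the problem in plain Python, with no Airflow involved. The function names are made up for illustration.

```python
def make_callables_buggy(clients):
    """Every closure reads `client` when called, so all of them see the last value."""
    callables = []
    for client in clients:
        def process():
            return f"Processing {client}"
        callables.append(process)
    return callables


def make_callables_fixed(clients):
    """A factory function gives each closure its own `client_name` binding."""
    def make_process(client_name):
        def process():
            return f"Processing {client_name}"
        return process

    return [make_process(client) for client in clients]


clients = ["Client A", "Client B"]
print([f() for f in make_callables_buggy(clients)])
# → ['Processing Client B', 'Processing Client B']
print([f() for f in make_callables_fixed(clients)])
# → ['Processing Client A', 'Processing Client B']
```

Passing the values as function arguments, as the fixed version does, is the same pattern a DAG factory function relies on.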
When to Use Dynamic DAG Generation
✔ Multi-tenant pipelines (clients, regions, projects)
✔ Config-driven workflows
✔ ETL pipelines with variable inputs
✔ Automating DAG creation to reduce human error
When NOT to Use
❌ For small static pipelines
❌ When all DAGs have identical tasks (use dynamic tasks instead)
❌ When DAGs don't need to adapt to external configuration
Summary 🧠
- Dynamic DAG generation creates DAGs from code and configuration each time Airflow parses the DAG file
- Ideal for scalable, multi-tenant pipelines
- Works with Python, TaskFlow API, and external configs
- Reduces maintenance and human error
Key Takeaways
- Dynamic DAG = programmatic DAG creation
- Config-driven design makes pipelines flexible
- Combine with dynamic tasks, XComs, branching, and trigger rules for full automation
- Always keep DAGs readable, maintainable, and schedulable
What's Next?
Local Executor vs Celery Executor vs Kubernetes Executor