Hooks Explained: Database, S3, GCP, Azure
Imagine this scenario:
You have a DAG that needs to:
- Query a PostgreSQL database
- Upload results to S3
- Transform data in BigQuery
- Archive logs in Azure Blob Storage
Writing raw connection code for each platform is tedious and error-prone.
Airflow provides Hooks: reusable connectors to external systems.
What Are Hooks in Airflow?
Hooks are Python interfaces that handle:
- Authentication
- Connection management
- API or database operations
- Error handling and retries
Hooks underlie Operators. For example:
- PostgresOperator uses PostgresHook
- S3ToGCSOperator uses S3Hook and GCSHook
Think of hooks as the plumbing; operators are the appliances that use it.
Database Hooks
PostgresHook Example
from airflow.providers.postgres.hooks.postgres import PostgresHook
pg_hook = PostgresHook(postgres_conn_id="postgres_default")
records = pg_hook.get_records("SELECT COUNT(*) FROM sales;")
print(records)
Input
| Parameter | Value |
|---|---|
| postgres_conn_id | postgres_default |
| SQL | SELECT COUNT(*) FROM sales |
Output
[(1050,)]
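Because get_records returns a list of row tuples, a single aggregate comes back as [(1050,)] rather than a bare number. The sketch below shows one way to unwrap it. `fetch_row_count` is a hypothetical helper name, not part of Airflow, and it accepts any object with a `get_records` method so it can be tried without a live database.

```python
def fetch_row_count(hook, table):
    """Return COUNT(*) for `table` as a plain int.

    `hook` is anything exposing get_records(sql), for example
    PostgresHook(postgres_conn_id="postgres_default").
    """
    # Table names cannot be bound as query parameters, so reject anything
    # that is not a simple identifier before interpolating it.
    if not table.isidentifier():
        raise ValueError(f"unsafe table name: {table!r}")
    records = hook.get_records(f"SELECT COUNT(*) FROM {table};")
    # records looks like [(1050,)]: take the first row, first column.
    return records[0][0]
```

Inside a task this could be called as `fetch_row_count(pg_hook, "sales")`. The identifier check is deliberately strict, so a schema-qualified name such as `public.sales` would need a looser rule.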
MySqlHook Example
from airflow.providers.mysql.hooks.mysql import MySqlHook
mysql_hook = MySqlHook(mysql_conn_id="mysql_reporting")
rows = mysql_hook.get_records("SELECT * FROM sessions LIMIT 5;")
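When a query depends on runtime values, they can be passed through the `parameters` argument of get_records so the database driver handles quoting. The sketch below assumes a `user_id` column on `sessions` (hypothetical, since the schema is not described here) and the `%s` placeholder style used by common MySQL drivers. `recent_sessions` is an illustrative name.

```python
def recent_sessions(hook, user_id, limit=5):
    """Fetch up to `limit` session rows for one user.

    `hook` is anything exposing get_records(sql, parameters=...),
    for example MySqlHook(mysql_conn_id="mysql_reporting").
    """
    # user_id is a hypothetical column; adjust it to the real schema.
    sql = "SELECT * FROM sessions WHERE user_id = %s LIMIT %s;"
    # Values travel separately from the SQL text, so the driver quotes them.
    return hook.get_records(sql, parameters=(user_id, int(limit)))
```

Keeping values out of the SQL string avoids hand-built quoting and the injection risk that comes with it.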
S3 Hooks
Uploading a File to S3
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
s3 = S3Hook(aws_conn_id="aws_default")
s3.load_file(
    filename="/tmp/sales.csv",
    key="sales/2024/01/sales.csv",
    bucket_name="analytics-bucket",
    replace=True,
)
Input
| Parameter | Value |
|---|---|
| filename | /tmp/sales.csv |
| key | sales/2024/01/sales.csv |
| bucket_name | analytics-bucket |
Output
No return value: load_file returns None, and the file is now stored at s3://analytics-bucket/sales/2024/01/sales.csv
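The key in the upload above encodes a year and a month. One way to avoid typing it by hand is to derive it from a date, as sketched below. `build_sales_key` is a hypothetical helper, and the `sales/YYYY/MM/<filename>` layout is inferred from the single example key rather than from any stated convention.

```python
from datetime import date


def build_sales_key(day, filename="sales.csv"):
    """Build an S3 key such as 'sales/2024/01/sales.csv' from a date."""
    # Zero-padded months keep keys in chronological order when sorted as text.
    return f"sales/{day:%Y}/{day:%m}/{filename}"


print(build_sales_key(date(2024, 1, 15)))  # sales/2024/01/sales.csv
```

The result can be passed as the `key` argument of `load_file`.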
Downloading a File from S3
s3.get_key("sales/2024/01/sales.csv", bucket_name="analytics-bucket").download_file("/tmp/sales_local.csv")
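The one-liner above assumes the object exists. A slightly more defensive sketch checks first with the hook's `check_for_key` method and reports whether anything was downloaded. `download_if_present` is a hypothetical helper name.

```python
def download_if_present(s3, key, bucket_name, local_path):
    """Download `key` to `local_path`; return False if the key is absent.

    `s3` is anything exposing check_for_key and get_key, for example
    S3Hook(aws_conn_id="aws_default").
    """
    # Checking first avoids trying to fetch an object that is not there.
    if not s3.check_for_key(key, bucket_name=bucket_name):
        return False
    s3.get_key(key, bucket_name=bucket_name).download_file(local_path)
    return True
```

A caller can then decide whether a missing file should fail the task or simply be skipped.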
GCP Hooks
BigQuery Hook Example
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
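# Backtick-quoted table names need standard SQL; the hook defaults to legacy SQL.
bq = BigQueryHook(gcp_conn_id="google_cloud_default", use_legacy_sql=False)
results = bq.get_pandas_df("SELECT COUNT(*) AS total FROM `project.dataset.sales`")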
print(results)
Output
   total
0   1050
GCS Hook Example
from airflow.providers.google.cloud.hooks.gcs import GCSHook
gcs = GCSHook(gcp_conn_id="google_cloud_default")
gcs.upload(bucket_name="analytics-gcs-bucket", object_name="sales.csv", filename="/tmp/sales.csv")
Azure Hooks
Azure Blob Storage Example
from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
azure = WasbHook(wasb_conn_id="azure_default")
azure.load_file(
    file_path="/tmp/logs.txt",
    container_name="logs",
    blob_name="logs_2024_01_18.txt",
    overwrite=True,
)
Key Advantages of Hooks
- Centralize connection management via Airflow Connections
- Provide reusable methods for operators or Python code
- Handle authentication, retries, and logging
- Enable clean, testable DAGs
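The testability point can be illustrated with Python's standard `unittest.mock`. A function that receives its hook as an argument can be exercised without opening any connection. `sales_exceed` is a hypothetical function used only for this sketch.

```python
from unittest.mock import MagicMock


def sales_exceed(hook, threshold):
    """Return True when the sales table has more than `threshold` rows."""
    records = hook.get_records("SELECT COUNT(*) FROM sales;")
    return records[0][0] > threshold


# A MagicMock stands in for PostgresHook, so no database is contacted.
fake_hook = MagicMock()
fake_hook.get_records.return_value = [(1050,)]

assert sales_exceed(fake_hook, 1000) is True
assert sales_exceed(fake_hook, 2000) is False
fake_hook.get_records.assert_called_with("SELECT COUNT(*) FROM sales;")
```

Passing the hook in, rather than constructing it inside the function, is the design choice that makes this kind of test possible.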
Best Practices
Recommended
- Use Airflow Connections instead of hardcoding credentials
- Reuse hooks in custom operators
- Combine with sensors for event-driven workflows
- Use hooks for data validation, extraction, and upload
Avoid
- Instantiating hooks inside loops unnecessarily
- Hardcoding credentials or paths
- Using hooks for heavy transformations (use Python/SQL)
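As a sketch of the first point, the hook can be created once and reused for every iteration. The function below takes a hook factory so it can run without AWS access. `upload_all` and `make_hook` are hypothetical names; in real use `make_hook` might be `lambda: S3Hook(aws_conn_id="aws_default")`.

```python
def upload_all(make_hook, files, bucket_name):
    """Upload each (local_path, key) pair using a single hook instance."""
    files = list(files)
    # Build the hook once, outside the loop, rather than once per file.
    s3 = make_hook()
    for local_path, key in files:
        s3.load_file(
            filename=local_path,
            key=key,
            bucket_name=bucket_name,
            replace=True,
        )
    return len(files)
```

Creating the hook once means the connection details are looked up a single time instead of on every pass through the loop.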
Real-World Use Cases
- Querying Postgres for daily sales, then storing in S3
- Reading logs from GCS and sending to BigQuery
- Downloading backups from Azure Blob Storage
- Custom operators built on top of hooks for cloud orchestration
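The first use case, querying a database and storing the result in S3, might look roughly like the sketch below. It relies on `get_records` from the database hook and `load_string` from S3Hook. `export_query_to_s3` is a hypothetical name, and the whole result is held in memory, so this only suits small result sets.

```python
import csv
import io


def export_query_to_s3(db_hook, s3_hook, sql, key, bucket_name):
    """Run `sql`, serialize the rows as CSV, and store them under `key`."""
    rows = db_hook.get_records(sql)
    buffer = io.StringIO()
    csv.writer(buffer).writerows(rows)
    # load_string uploads in-memory text, so no temporary file is needed.
    s3_hook.load_string(
        string_data=buffer.getvalue(),
        key=key,
        bucket_name=bucket_name,
        replace=True,
    )
    return len(rows)
```

Both hooks are passed in as arguments, so the same function works with PostgresHook or MySqlHook and can be tested with stand-in objects.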
Summary
Hooks are the foundation of Airflow connectivity.
Key Takeaways:
- Connect DAGs to external systems reliably
- Simplify authentication and API/database interaction
- Underlie almost all Airflow operators
- Best practices make DAGs maintainable, secure, and scalable
Hooks let Airflow focus on orchestration while they handle the plumbing behind the scenes.