In this example we will upload files (e.g. the data_sample_240101 folder) from the local file system to Amazon S3 using Airflow running in Docker.
S3 landing bucket
Access the S3 console
Create an S3 bucket for the landing zone
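If you prefer to script this step instead of clicking through the console, a minimal boto3 sketch could look like the one below; the bucket name and region are placeholders, not values taken from this example.

import boto3

REGION = "ap-southeast-1"              # placeholder region, pick your own
BUCKET = "<your-landing-bucket-name>"  # placeholder name for the landing bucket

s3 = boto3.client("s3", region_name=REGION)

# Outside us-east-1 the region has to be passed as a LocationConstraint
s3.create_bucket(
    Bucket=BUCKET,
    CreateBucketConfiguration={"LocationConstraint": REGION},
)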
IAM user
Access the IAM console
Create an IAM user with S3 permissions for Airflow
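Creating the user and its S3 permissions can also be scripted. The sketch below is only an illustration: the user name, policy name and bucket name are assumptions, and the policy is a minimal guess at what the DAG needs (write access to the landing bucket).

import json
import boto3

iam = boto3.client("iam")
USER = "airflow-s3-uploader"           # hypothetical user name
BUCKET = "<your-landing-bucket-name>"  # placeholder landing bucket

# Minimal policy: allow the user to write objects into the landing bucket only
policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Action": ["s3:PutObject", "s3:GetObject", "s3:ListBucket"],
        "Resource": [f"arn:aws:s3:::{BUCKET}", f"arn:aws:s3:::{BUCKET}/*"],
    }],
}

iam.create_user(UserName=USER)
iam.put_user_policy(
    UserName=USER,
    PolicyName="airflow-s3-landing",   # hypothetical policy name
    PolicyDocument=json.dumps(policy),
)

# The access key pair is what goes into the Airflow connection in the next step
key = iam.create_access_key(UserName=USER)["AccessKey"]
print(key["AccessKeyId"], key["SecretAccessKey"])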
Add an AWS connection
Access Admin > Connections
Create an AWS connection; its connection ID (pmt_airflow_aws here) must match the aws_conn_id used in the DAG, and the IAM user's access key pair goes into the credential fields
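Once the connection is saved, you can sanity-check it from inside the Airflow container (for example with docker exec and a Python shell). This sketch assumes the connection ID pmt_airflow_aws used later in the DAG and a placeholder bucket name.

from airflow.providers.amazon.aws.hooks.s3 import S3Hook

# S3Hook resolves the credentials stored in the Airflow connection
hook = S3Hook(aws_conn_id="pmt_airflow_aws")
print(hook.check_for_bucket("<your-landing-bucket-name>"))  # True if credentials and bucket are reachable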
Create the DAG and upload files to S3
The DAG includes the following components:
Create the DAG
# Imports and constants used throughout the DAG file
import glob
from datetime import timedelta

import pendulum
from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator
from airflow.providers.amazon.aws.transfers.local_to_s3 import LocalFilesystemToS3Operator

DATE = "240101"                        # date suffix of the sample folder (data_sample_240101)
S3_BUCKET_NAME = "<your-landing-bucket-name>"  # placeholder for the landing bucket created above

default_args = {
    'owner': 'pmt',
    'start_date': pendulum.now(),
    'depends_on_past': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

@dag(
    dag_id='local_to_s3',
    default_args=default_args,
    description='Load file from local file system to S3',
    schedule='@once',
    catchup=False                      # catchup is a DAG argument, not a default_arg
)
Create the execute() function with the operators
def execute():
    end_operator = EmptyOperator(task_id='End_execution')

    # Collect every file in the sample data folder mounted into the container
    files = []
    for file in glob.glob(f"/opt/airflow/data/data_sample_{DATE}/*"):
        files.append(file)

    # Create one upload task per file
    for idx, file in enumerate(files, start=1):
        file_name = file.split("/")[-1]
        transfer_operator = LocalFilesystemToS3Operator(
            task_id="local_to_s3_" + str(idx),
            filename=file,
            dest_key=f"date={DATE}/{file_name}",
            dest_bucket=S3_BUCKET_NAME,
            replace=True,
            aws_conn_id="pmt_airflow_aws"
        )
Set up the workflow between the operators
        transfer_operator >> end_operator  # inside the for loop: every upload must finish before End_execution

execute()  # call the decorated function so Airflow registers the DAG
The DAG shows up in the Airflow UI
Run DAG
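The DAG can be triggered from the UI. As an alternative sketch, assuming the webserver from the Docker setup is reachable on localhost:8080 with the basic-auth API backend and the default airflow/airflow credentials, the stable REST API can trigger it too:

import requests

# POST /api/v1/dags/{dag_id}/dagRuns starts a new run of the local_to_s3 DAG
resp = requests.post(
    "http://localhost:8080/api/v1/dags/local_to_s3/dagRuns",
    auth=("airflow", "airflow"),       # assumed default credentials of the Docker setup
    json={"conf": {}},
)
print(resp.status_code, resp.json())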
Check result
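Besides browsing the bucket in the S3 console, a small boto3 sketch can confirm that the files landed under the date= prefix written by the DAG (the bucket name is a placeholder, and the prefix assumes DATE is 240101 as in the sample folder name):

import boto3

s3 = boto3.client("s3")
BUCKET = "<your-landing-bucket-name>"  # placeholder landing bucket

# List the keys uploaded under the date partition used by the DAG
resp = s3.list_objects_v2(Bucket=BUCKET, Prefix="date=240101/")
for obj in resp.get("Contents", []):
    print(obj["Key"], obj["Size"])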