In this example, we will upload files (e.g. data_sample_240101) from the local file system to Amazon S3 using Airflow running in Docker.

S3 Bucket Landing
Access the S3 console
Create an S3 bucket for the landing zone
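If you prefer to script this step, here is a minimal sketch using boto3 (the bucket name pmt-landing-zone and the region are placeholder assumptions; creating the bucket through the console works just as well):

import boto3

# Placeholder bucket name and region; replace with your own values.
s3 = boto3.client("s3", region_name="us-east-1")
s3.create_bucket(Bucket="pmt-landing-zone")
# Outside us-east-1, also pass CreateBucketConfiguration={"LocationConstraint": "<region>"}.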

IAM user
Access the IAM console
Create an IAM user with S3 permissions for Airflow
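The same step can be scripted with boto3; a rough sketch, assuming the AWS-managed AmazonS3FullAccess policy is acceptable (the user name airflow-s3-user is a placeholder, and a policy scoped to the landing bucket is preferable in production):

import boto3

iam = boto3.client("iam")

# Placeholder name for the Airflow service user.
iam.create_user(UserName="airflow-s3-user")

# Grant S3 access; narrow this to the landing bucket for real workloads.
iam.attach_user_policy(
    UserName="airflow-s3-user",
    PolicyArn="arn:aws:iam::aws:policy/AmazonS3FullAccess",
)

# Generate the access key pair that the Airflow connection will use.
key = iam.create_access_key(UserName="airflow-s3-user")["AccessKey"]
print(key["AccessKeyId"], key["SecretAccessKey"])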

Add AWS connection
Access Admin/Connections
Create an AWS connection using the IAM user's access key and secret key
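The DAG below assumes the connection id pmt_airflow_aws, of type Amazon Web Services. Once the connection is saved, a quick sanity check from inside an Airflow container might look like this (the bucket name is the placeholder from above):

from airflow.providers.amazon.aws.hooks.s3 import S3Hook

# The connection id must match the one entered under Admin/Connections.
hook = S3Hook(aws_conn_id="pmt_airflow_aws")

# Returns True if the landing bucket is reachable with the stored credentials.
print(hook.check_for_bucket("pmt-landing-zone"))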


Create the DAG and upload files to S3

The DAG includes the following components:
Create the DAG: imports, default_args, and the @dag decorator
import glob
from datetime import timedelta

import pendulum
from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator
from airflow.providers.amazon.aws.transfers.local_to_s3 import LocalFilesystemToS3Operator

# Date suffix of the sample folder and the landing bucket created earlier (placeholder name).
DATE = "240101"
S3_BUCKET_NAME = "pmt-landing-zone"

default_args = {
    'owner': 'pmt',
    'start_date': pendulum.now(),
    'depends_on_past': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

# catchup is a DAG-level argument, so it is passed to @dag rather than via default_args.
@dag(
    dag_id='local_to_s3',
    default_args=default_args,
    description='Load file from local file system to S3',
    schedule='@once',
    catchup=False,
)
Create the execute() function with the operators (the @dag decorator above wraps this function)
def execute():
    end_operator = EmptyOperator(task_id='End_execution')

    # Collect every file in the sample folder mounted into the Airflow container.
    files = []
    for file in glob.glob(f"/opt/airflow/data/data_sample_{DATE}/*"):
        files.append(file)

    # Create one upload task per file, numbered by its position in the list.
    for idx, file in enumerate(files, start=1):
        file_name = file.split("/")[-1]
        transfer_operator = LocalFilesystemToS3Operator(
            task_id="local_to_s3_" + str(idx),
            filename=file,
            dest_key=f"date={DATE}/{file_name}",
            dest_bucket=S3_BUCKET_NAME,
            replace=True,
            aws_conn_id="pmt_airflow_aws"
        )
Workflow: every upload task feeds into the single End_execution marker, and calling the decorated function instantiates the DAG
        transfer_operator >> end_operator

# Calling the decorated execute() registers the DAG with the scheduler.
execute()
The DAG shows up in the Airflow UI

Run DAG
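Besides clicking the trigger button in the UI, the DAG can also be triggered through Airflow's stable REST API. A hedged sketch, assuming the default docker-compose setup (webserver on localhost:8080, basic auth user airflow/airflow) and that the DAG is unpaused:

import requests

# Endpoint, port and credentials are assumptions based on the default docker-compose setup.
response = requests.post(
    "http://localhost:8080/api/v1/dags/local_to_s3/dagRuns",
    auth=("airflow", "airflow"),
    json={"conf": {}},
)
print(response.status_code, response.json())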


Check result
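Besides browsing the bucket in the S3 console, the uploaded objects can be listed programmatically. A small sketch using the same connection (bucket name and date are the placeholders used above):

from airflow.providers.amazon.aws.hooks.s3 import S3Hook

hook = S3Hook(aws_conn_id="pmt_airflow_aws")

# List every object uploaded under the date partition, e.g. date=240101/<file_name>.
for key in hook.list_keys(bucket_name="pmt-landing-zone", prefix=f"date=240101/"):
    print(key)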
