Upload files from the local file system to Amazon S3

1. Introduction

In this example we will upload files (e.g. data_sample_240101) from the local file system to Amazon S3 using Airflow running in Docker.

2. Prepare the environment

  1. Create the landing S3 bucket
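
    If you prefer to script this step instead of using the console, the bucket can be created with boto3. A minimal sketch, assuming a placeholder bucket name and region:

      # Minimal sketch: create the landing bucket with boto3.
      # "your-landing-bucket" and the region are placeholders for your own values.
      import boto3

      s3 = boto3.client("s3", region_name="us-east-1")
      s3.create_bucket(Bucket="your-landing-bucket")
      # Outside us-east-1, also pass:
      # CreateBucketConfiguration={"LocationConstraint": "<your-region>"}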

  2. IAM user

    • Open the IAM console

    • Create an IAM user with S3 permissions for Airflow (a scripted alternative is sketched below)

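    If you would rather script the IAM setup, a minimal boto3 sketch follows; the user name and the broad managed policy are assumptions for this demo, so scope the permissions down to the landing bucket for anything beyond it:

      # Minimal sketch: create an IAM user for Airflow and grant S3 access.
      # "pmt-airflow" and AmazonS3FullAccess are assumptions for this demo.
      import boto3

      iam = boto3.client("iam")

      iam.create_user(UserName="pmt-airflow")
      iam.attach_user_policy(
          UserName="pmt-airflow",
          PolicyArn="arn:aws:iam::aws:policy/AmazonS3FullAccess",
      )

      # Access keys for the Airflow AWS connection (store them securely)
      keys = iam.create_access_key(UserName="pmt-airflow")
      print(keys["AccessKey"]["AccessKeyId"])
      print(keys["AccessKey"]["SecretAccessKey"])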

  3. Run Airflow in Docker

  4. Add an AWS connection

    • Open Admin > Connections in the Airflow UI

    • Create an AWS connection with conn_id pmt_airflow_aws (the ID referenced by the DAG); a scripted alternative is sketched below

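      Instead of the UI form, the connection can also be created programmatically from inside the Airflow container (e.g. via docker compose exec). A minimal sketch, assuming placeholder credentials and region:

        # Minimal sketch: create the "pmt_airflow_aws" connection via Airflow's
        # ORM (run inside the Airflow container). Keys and region are placeholders.
        import json

        from airflow.models import Connection
        from airflow.settings import Session

        conn = Connection(
            conn_id="pmt_airflow_aws",
            conn_type="aws",
            login="AKIA...EXAMPLE",          # IAM access key ID
            password="example-secret-key",   # IAM secret access key
            extra=json.dumps({"region_name": "us-east-1"}),
        )

        session = Session()
        session.add(conn)
        session.commit()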

3. Create DAG

  • Create a DAG that uploads files to S3

    • The DAG includes the following components:

      • Create the DAG

        import glob
        from datetime import timedelta

        import pendulum
        from airflow.decorators import dag
        from airflow.operators.empty import EmptyOperator
        from airflow.providers.amazon.aws.transfers.local_to_s3 import (
            LocalFilesystemToS3Operator,
        )

        DATE = "240101"  # date suffix of the sample folder, e.g. data_sample_240101
        S3_BUCKET_NAME = "your-landing-bucket"  # placeholder: the bucket created earlier

        default_args = {
            'owner': 'pmt',
            'start_date': pendulum.now(),
            'depends_on_past': False,
            'retries': 3,
            'retry_delay': timedelta(minutes=5),
        }

        @dag(
            dag_id='local_to_s3',
            default_args=default_args,
            description='Load file from local file system to S3',
            schedule='@once',
            catchup=False  # catchup is a DAG-level argument, not a task default
        )
        
      • Create the execute() function with the operators

        def execute():
            end_operator = EmptyOperator(task_id='End_execution')

            # Collect every file in the sample folder mounted into the container
            files = glob.glob(f"/opt/airflow/data/data_sample_{DATE}/*")

            # Create one upload task per file
            for idx, file in enumerate(files, start=1):
                file_name = file.split("/")[-1]

                transfer_operator = LocalFilesystemToS3Operator(
                    task_id="local_to_s3_" + str(idx),
                    filename=file,
                    dest_key=f"date={DATE}/{file_name}",
                    dest_bucket=S3_BUCKET_NAME,
                    replace=True,
                    aws_conn_id="pmt_airflow_aws"
                )
        
      • Workflow operator: inside the loop, chain each upload task into the shared end task, then instantiate the DAG at module level so Airflow registers it

                transfer_operator >> end_operator

        execute()

4. Check result

  1. The DAG shows up in Airflow

  2. Run the DAG (from the UI, or through the REST API as sketched below)

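    Besides the UI trigger button, the run can be started through Airflow's stable REST API. A minimal sketch, assuming the default docker-compose credentials and port:

      # Minimal sketch: trigger the DAG via the stable REST API.
      # Host, port, and basic-auth credentials are docker-compose defaults
      # and may differ in your setup; unpause the DAG first.
      import requests

      resp = requests.post(
          "http://localhost:8080/api/v1/dags/local_to_s3/dagRuns",
          auth=("airflow", "airflow"),
          json={},
      )
      print(resp.status_code, resp.json())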

  3. Check the result in the S3 bucket (a scripted check follows)

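    The uploaded objects can also be verified from a script. A minimal sketch, reusing the placeholder bucket name and date from the DAG:

      # Minimal sketch: list the uploaded objects under the "date=..." prefix.
      # Bucket name and date are the placeholders used in the DAG above.
      import boto3

      s3 = boto3.client("s3")
      resp = s3.list_objects_v2(Bucket="your-landing-bucket", Prefix="date=240101/")
      for obj in resp.get("Contents", []):
          print(obj["Key"], obj["Size"])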

5. Clean up

  1. Delete the S3 bucket (a boto3 sketch for this and the IAM user follows)
  2. Delete the IAM user
  3. Remove the Airflow containers from Docker
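
  The AWS-side cleanup can be scripted as well. A minimal sketch, assuming the placeholder bucket, user, and policy names used earlier (the Airflow containers are removed separately with Docker):

    # Minimal sketch: empty and delete the bucket, then remove the IAM user.
    # Names match the placeholders used earlier in this walkthrough.
    import boto3

    s3 = boto3.resource("s3")
    bucket = s3.Bucket("your-landing-bucket")
    bucket.objects.all().delete()  # a bucket must be empty before deletion
    bucket.delete()

    iam = boto3.client("iam")
    user = "pmt-airflow"
    # Access keys and attached policies must be removed before the user
    for key in iam.list_access_keys(UserName=user)["AccessKeyMetadata"]:
        iam.delete_access_key(UserName=user, AccessKeyId=key["AccessKeyId"])
    iam.detach_user_policy(
        UserName=user,
        PolicyArn="arn:aws:iam::aws:policy/AmazonS3FullAccess",
    )
    iam.delete_user(UserName=user)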