S3 Batch Processing

S3 batch processing is best used when you need to run a massively parallel process across files in S3 and your processing code is small enough to package as a Lambda function.

  • If the process is not parallel across files, use SageMaker Processing, which will allocate a machine and make S3 files available locally for Python processing (see the SageMaker processing documentation)
  • If the process is fully parallelizable but requires more code or a custom image than a Lambda allows, use SageMaker batch transform to allocate a fleet of containers that will process each object in S3 (see the SageMaker transform documentation)

Usage

There are two steps to writing a process for S3 Batch:

  • Write a Lambda function that performs the parallel processing
  • Write a Python wrapper that configures deployment of the function and any command-line arguments

Lambda

Write a Lambda function.

  • Create a folder containing your Lambda function
  • Include a package.json listing dependencies
  • Include an index.js exporting a function named handler
  • See the required input and output format in the AWS S3 Batch documentation
  • You may use import statements and packages; the function will be automatically webpacked
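A minimal package.json might look like the following sketch; the dependency shown is illustrative, so list whatever your handler actually imports:

{
    "name": "demo-batch-lambda",
    "version": "1.0.0",
    "dependencies": {
        "@aws-sdk/client-s3": "^3.0.0"
    }
}

The index.js below sketches a handler that follows the S3 Batch invocation schema: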
// Command-line flags are passed to the Lambda as environment variables
const myCustomFlag = process.env.MY_CUSTOM_FLAG

// Handler receives list of tasks and returns a result for each one
async function handler(event) {
    let results = await Promise.all(event.tasks.map(
        async ({
            taskId,
            s3Key,
            s3BucketArn
        }) => {
            let bucket = s3BucketArn.split(":").pop()
            let path = `s3://${bucket}/${s3Key}`
            // do something with the file
            return {
                taskId: taskId,
                resultCode: "Succeeded",
                resultString: "Arbitrary result string for report"
            }
        }
    ))
    return {
        invocationSchemaVersion: "1.0",
        treatMissingKeysAs: "PermanentFailure",
        invocationId: event.invocationId,
        results: results
    }
}

// Export the handler
export { handler }
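The "do something with the file" step typically fetches the object from S3. A minimal sketch of that step using the AWS SDK for JavaScript v3, assuming @aws-sdk/client-s3 is listed in package.json and readObject is a hypothetical helper:

import { S3Client, GetObjectCommand } from "@aws-sdk/client-s3"

const s3 = new S3Client({})

// Hypothetical helper: fetch an S3 object and return its body as a string.
// S3 Batch delivers object keys URL-encoded, so decode before use.
async function readObject(bucket, key) {
    let response = await s3.send(new GetObjectCommand({
        Bucket: bucket,
        Key: decodeURIComponent(key)
    }))
    // transformToString is available on response streams in recent SDK v3 versions
    return await response.Body.transformToString()
}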

Python

Write a Python wrapper referencing the folder containing your Lambda to generate a CLI. Running this wrapper will:

  • Create roles and permissions (if necessary)
  • Build and deploy the Lambda (if necessary)
  • Tag a version of the Lambda with environment variables specified on the command line
  • Create a batch processing job in S3 using that Lambda tag
  • Optionally confirm the job; otherwise, it can be confirmed in the AWS S3 console
  • Optionally save the job ID to a JSON file for later reference
from aws_sagemaker_remote.batch.main import BatchCommand, BatchConfig
import os
import argparse


def argparse_callback(parser: argparse.ArgumentParser):
    """
    Add any custom arguments you require
    """
    parser.add_argument(
        '--my-custom-flag', default="Default value", type=str, help='My custom flag'
    )


def env_callback(args):
    """
    Map custom arguments to Lambda environment variables
    """
    return {
        "MY_CUSTOM_FLAG": args.my_custom_flag
    }


def command():
    """
    Define defaults for your command
    """
    return BatchCommand(
        config=BatchConfig(
            stack_name='my-unique-stack-name',
            code_dir=os.path.abspath(os.path.join(
                __file__, '../lambda'
            )),
            description='Demo batch processing',
            argparse_callback=argparse_callback,
            env_callback=env_callback,
            webpack=True
        )
    )


def main():
    command().run_command(
        description="Demo batch processing"
    )


if __name__ == '__main__':
    main()

Command-Line Interface

The above code generates the following command-line interface, which can be used to build, deploy, and run the batch job.

usage: demo-batch [-h] [--profile PROFILE] [--output-json OUTPUT_JSON]
                  [--stack-name STACK_NAME] [--code-dir CODE_DIR]
                  [--deploy [DEPLOY]] [--deploy-only [DEPLOY_ONLY]]
                  [--confirmation-required [CONFIRMATION_REQUIRED]]
                  [--development [DEVELOPMENT]] --manifest MANIFEST
                  [--report REPORT] [--description DESCRIPTION]
                  [--timeout TIMEOUT] [--ignore IGNORE] [--memory MEMORY]
                  [--my-custom-flag MY_CUSTOM_FLAG]

Named Arguments

--profile

AWS profile name

Default: “default”

--output-json

Output job information to a JSON file

--stack-name

AWS CloudFormation stack name to which resources are deployed

Default: “my-unique-stack-name”

--code-dir

Directory of Lambda code

Default: “/home/docs/checkouts/readthedocs.org/user_builds/aws-sagemaker-remote/checkouts/latest/demo/demo_batch/lambda”

--deploy

Force Lambda deployment even if function already exists

Default: False

--deploy-only

Deploy and exit. Use --deploy yes --deploy-only yes to force deployment and exit

Default: False

--confirmation-required

Require confirmation in console to run job

Default: True

--development

Webpack in development mode

Default: False

--manifest

File manifest to process. Must be a CSV whose first column contains an S3 bucket and whose second column contains an S3 key (see the example after this argument list).

--report

S3 path to store report

Default: “aws-sagemaker-remote/batch-reports,sagemaker”

--description

Description of batch job

Default: “Demo batch processing”

--timeout

Hard timeout of Lambda in seconds

Default: 30

--ignore

Number of columns of the input CSV to ignore. The job will fail if the CSV does not have at least 2+ignore columns. For example, if your CSV has a bucket, a key, and 5 more columns, set ignore to 5.

Default: 0

--memory

Memory to allocate to the Lambda, in MB

Default: 128

--my-custom-flag

My custom flag

Default: “Default value”
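
Example

For instance, given a hypothetical manifest.csv listing the objects to process (bucket in the first column, key in the second):

my-bucket,data/file-0001.json
my-bucket,data/file-0002.json

the job could be built, deployed, and started with a command such as (file names illustrative):

demo-batch --manifest manifest.csv --output-json job.json --my-custom-flag "Hello"

To deploy the Lambda without starting a job, add --deploy yes --deploy-only yes.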