Spring Cloud AWS simplifies using AWS managed services in a Spring and Spring Boot applications. It offers a convenient way to interact with AWS provided services using well-known Spring idioms and APIs.

3.0.0-M2

Spring Cloud is released under the non-restrictive Apache 2.0 license. If you would like to contribute to this section of the documentation or if you find an error, please find the source code and issue trackers in the project at github.

1. Using Amazon Web Services

AWS provides a Java SDK to issue requests for the all services provided by the Amazon Web Service platform. While the SDK offers all functionality available on AWS, there is a considerable amount of low level code needed to use it in Spring idiomatic way. Spring Cloud AWS provides application developers already integrated Spring-based modules to consume the most popular AWS services and avoid low level code as much as possible.

Thanks to Spring Cloud AWS modularity you can include only dependencies relevant to the particular AWS service you want to integrate with.

Spring Cloud AWS lets you leverage the power and simplicity of the Spring Framework to:

  • Write and read from Spring Resources backed up by S3

  • Send emails using SES

  • Import environment configuration using ConfigDataLoader from Parameter Store and Secrets Manager

  • Send and receive HTTP notifications from SNS

Other integrations are under development.

It also simplifies creating any non-integrated AWS SDK client by auto-configuring region and credentials providers.

That being said, it is perfectly valid option to use AWS SDK without using Spring Cloud AWS.

Note, that Spring provides support for other AWS services in following projects:

2. Getting Started

This section describes how to get up to speed with Spring Cloud AWS libraries.

2.1. Bill of Materials

The Spring Cloud AWS Bill of Materials (BOM) contains the versions of all the dependencies it uses.

If you’re a Maven user, adding the following to your pom.xml file will allow you omit any Spring Cloud AWS dependency version numbers from your configuration. Instead, the version of the BOM you’re using determines the versions of the used dependencies.

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>io.awspring.cloud</groupId>
            <artifactId>spring-cloud-aws-dependencies</artifactId>
            <version>{project-version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

Gradle users can achieve the same kind of BOM experience using Spring’s dependency-management-plugin Gradle plugin. For simplicity, the Gradle dependency snippets in the remainder of this document will also omit their versions.

2.2. Starter Dependencies

Spring Cloud AWS offers starter dependencies through Maven to easily depend on different modules of the library. Each starter contains all the dependencies and transitive dependencies needed to begin using their corresponding Spring Cloud AWS module.

For example, if you wish to write a Spring application with S3, you would include the spring-cloud-aws-starter-s3 dependency in your project. You do not need to include the underlying spring-cloud-aws-s3 dependency, because the starter dependency includes it.

A summary of these artifacts are provided below.

Spring Cloud AWS Starter Description Maven Artifact Name

Core

Automatically configure authentication and region selection

io.awspring.cloud:spring-cloud-aws-starter

S3

Provides integrations with S3

io.awspring.cloud:spring-cloud-aws-starter-s3

SES

Provides integrations with SES

io.awspring.cloud:spring-cloud-aws-starter-ses

SNS

Provides integrations with SNS

io.awspring.cloud:spring-cloud-aws-starter-sns

SQS

Provides integrations with SQS

io.awspring.cloud:spring-cloud-aws-starter-sqs

Parameter Store

Provides integrations with AWS Parameter Store

io.awspring.cloud:spring-cloud-aws-starter-parameter-store

Secrets Manager

Provides integrations with AWS Secrets manager

io.awspring.cloud:spring-cloud-aws-starter-secrets-manager

2.3. Choosing AWS SDK version

The AWS SDK is released more frequently than Spring Cloud AWS. If you need to use a newer version of the SDK than the one configured by Spring Cloud AWS, add the SDK BOM to the dependency management section making sure it is declared before any other BOM dependency that configures AWS SDK dependencies.

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>bom</artifactId>
            <version>${aws-java-sdk.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

3. Spring Cloud AWS Core

Each Spring Cloud AWS module uses AwsCredentialsProvider and AwsRegionProvider to get the AWS region and access credentials.

Spring Cloud AWS provides a Spring Boot starter to auto-configure the core components.

Maven coordinates, using Spring Cloud AWS BOM:

<dependency>
    <groupId>io.awspring.cloud</groupId>
    <artifactId>spring-cloud-aws-starter</artifactId>
</dependency>

3.1. Credentials

software.amazon.awssdk.auth.credentials.AwsCredentialsProvider is a functional interface that returns the credentials to authenticate and authorize calls to AWS services.

public interface AwsCredentialsProvider {
    AwsCredentials resolveCredentials();
}

By default, Spring Cloud AWS starter auto-configures a DefaultCredentialsProvider, which looks for AWS credentials in this order:

  1. Java System Properties - aws.accessKeyId and aws.secretAccessKey

  2. Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY

  3. Web Identity Token credentials from system properties or environment variables

  4. Credential profiles file at the default location (~/.aws/credentials) shared by all AWS SDKs and the AWS CLI

  5. Credentials delivered through the Amazon EC2 container service if `AWS_CONTAINER_CREDENTIALS_RELATIVE_URI`" environment variable is set and security manager has permission to access the variable,

  6. Instance profile credentials delivered through the Amazon EC2 metadata service

If it does not serve your project needs, this behavior can be changed by setting additional properties:

property example description

spring.cloud.aws.credentials.access-key

AKIAIOSFODNN7EXAMPLE

The access key to be used with a static provider

spring.cloud.aws.credentials.secret-key

wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY

The secret key to be used with a static provider

spring.cloud.aws.credentials.instance-profile

true

Configures an InstanceProfileCredentialsProvider with no further configuration

spring.cloud.aws.credentials.profile.name

default

The name of a configuration profile in the specified configuration file

spring.cloud.aws.credentials.profile.path

~/.aws/credentials

The file path where the profile configuration file is located. Defaults to ~/.aws/credentials if value is not provided

It is also possible to configure custom AwsCredentialsProvider bean which will prevent Spring Cloud AWS from auto-configuring credentials provider:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;

@Configuration
class CustomCredentialsProviderConfiguration {

    @Bean
    public AwsCredentialsProvider customAwsCredentialsProvider() {
        return new CustomAWSCredentialsProvider();
    }
}

3.2. Region

software.amazon.awssdk.regions.providers.AwsRegionProvider is a functional interface that returns the region AWS clients issue requests to.

public interface AwsRegionProvider {
    Region getRegion();
}

By default, Spring Cloud AWS starter auto-configures a DefaultAwsRegionProviderChain, which looks resolves AWS region in this order:

  1. Check the aws.region system property for the region.

  2. Check the AWS_REGION environment variable for the region.

  3. Check the {user.home}/.aws/credentials and {user.home}/.aws/config files for the region.

  4. If running in EC2, check the EC2 metadata service for the region.

If it does not serve your project needs, this behavior can be changed by setting additional properties:

property example description

spring.cloud.aws.region.static

eu-west-1

A static value for region used by auto-configured AWS clients

spring.cloud.aws.region.instance-profile

true

Configures an InstanceProfileRegionProvider with no further configuration

spring.cloud.aws.region.profile.name

default

The name of a configuration profile in the specified configuration file

spring.cloud.aws.region.profile.path

~/.aws/credentials

The file path where the profile configuration file is located. Defaults to ~/.aws/credentials if value is not provided

It is also possible to configure custom AwsRegionProvider bean which will prevent Spring Cloud AWS from auto-configuring region provider:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import software.amazon.awssdk.regions.providers.AwsRegionProvider;

@Configuration
class CustomRegionProviderConfiguration {

    @Bean
    public AwsRegionProvider customRegionProvider() {
        return new CustomRegionProvider();
    }
}

3.3. Endpoint

To simplify using services with AWS compatible APIs, or running applications against Localstack, it is possible to configure an endpoint set on all auto-configured AWS clients:

property example description

spring.cloud.aws.endpoint

localhost:4566

endpoint url applied to auto-configured AWS clients

3.4. Customizing AWS Clients

To configure an AWS client with custom HTTP client or ClientOverrideConfiguration, define a bean of type AwsClientConfigurer with a type parameter indicating configured client builder.

import io.awspring.cloud.autoconfigure.core.AwsClientCustomizer;
import org.springframework.context.annotation.Bean;

import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.services.sns.SnsClientBuilder;

import java.time.Duration;

@Configuration
class S3AwsClientConfigurerConfiguration {

    @Bean
    AwsClientCustomizer<S3ClientBuilder> s3ClientBuilderAwsClientConfigurer() {
        return new S3AwsClientClientConfigurer();
    }

    static class S3AwsClientClientConfigurer implements AwsClientCustomizer<S3ClientBuilder> {
        @Override
        public ClientOverrideConfiguration overrideConfiguration() {
            return ClientOverrideConfiguration.builder().apiCallTimeout(Duration.ofMillis(500)).build();
        }

        @Override
        public SdkHttpClient httpClient() {
            return ApacheHttpClient.builder().connectionTimeout(Duration.ofMillis(1000)).build();
        }
    }
}

4. DynamoDb Integration

DynamoDb is a fully managed serverless key/value Nosql database designed for high performance. A Spring Boot starter is provided to auto-configure DynamoDb integration beans.

Maven coordinates, using Spring Cloud AWS BOM:

<dependency>
    <groupId>io.awspring.cloud</groupId>
    <artifactId>spring-cloud-aws-starter-dynamodb</artifactId>
</dependency>

DynamoDb integration will only provide autoconfiguration and simple DynamoDbOperations which can be used to communicate with DynamoDb, build repositories and so on…​

4.1. DynamoDbOperations

The starter automatically configures and registers a DynamoDbOperations bean providing higher level abstractions for working with DynamoDb. DynamoDbTemplate - a default DynamoDbOperations implementation - being built on top of DynamoDbEnhancedClient uses annotations provided by AWS. The list of supported annotations can be found here.

DynamoDbEnchancedClient supports a programming model similar to JPA - where a class is turned into an entity through applying certain annotations:

import java.util.UUID;
import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbBean;
import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbPartitionKey;

@DynamoDbBean
public class Person {
    private UUID id;
    private String name;
    private String lastName;

    @DynamoDbPartitionKey
    public UUID getId() {
        return id;
    }

    ...
}

DynamoDbTemplate provides methods to perform typical CRUD operations on such entities, plus it adds convenience methods for querying DynamoDb:

import io.awspring.cloud.dynamodb.DynamoDbTemplate;

...
@Autowired
DynamoDbTemplate dynamoDbTemplate;
...
Person person = new Person(...)
dynamoDbTemplate.save(person);

4.1.1. Resolving Table Name

To resolve a table name for an entity, DynamoDbTemplate uses a bean of type DynamoDbTableNameResolver. The default implementation turns an entity class name into its snake case representation. To use custom implementation, declare a bean of type DynamoDbTableNameResolver and it will get injected into DynamoDbTemplate automatically during auto-configuration.

4.1.2. Resolving Table Schema

To resolve a table schema for an entity, DynamoDbTemplate uses a bean of type DynamoDbTableSchemaResolver. The default implementation caches TableSchema objects in internal map. To use custom implementation, declare a bean of type DynamoDbTableSchemaResolver and it will get injected into DynamoDbTemplate automatically during auto-configuration.

4.2. Using DynamoDb Client

Autoconfiguration automatically configures DynamoDbClient which can be used for low level api calls and DynamoDbEnhancedClient if DynamoDbOperations are not enough.

If autoconfigured DynamoDbClient or DynamoDbEnhancedClient bean configuration does not meet your needs, it can be replaced by creating your custom bean.

import java.util.Collections;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;

public class DynamoDbService {
    private final DynamoDbClient dynamoDbClient;

    public DynamoDbService(DynamoDbClient dynamoDbClient) {
        this.dynamoDbClient = dynamoDbClient;
    }

    void deletePerson(String uuid) {
        dynamoDbClient.deleteItem(DeleteItemRequest.builder().key(Collections.singletonMap("uuid", AttributeValue.builder().s(uuid).build())).build());
    }
}

4.3. Using DynamoDB Accelerator

The starter automatically configures and registers an software.amazon.dax.ClusterDaxClient bean if it finds the following is added to the project:

<dependency>
    <groupId>software.amazon.dax</groupId>
    <artifactId>amazon-dax-client</artifactId>
</dependency>

4.4. Configuration

The Spring Boot Starter for DynamoDb provides the following configuration options:

Name

Description

Required

Default value

spring.cloud.aws.dynamodb.enabled

Enables the DynamoDb integration.

No

true

spring.cloud.aws.dynamodb.endpoint

Configures endpoint used by DynamoDbClient.

No

spring.cloud.aws.dynamodb.region

Configures region used by DynamoDbClient.

No

spring.cloud.aws.dynamodb.dax.idle-timeout-millis

Timeout for idle connections with the DAX cluster.

No

30000

spring.cloud.aws.dynamodb.dax.url

DAX cluster endpoint.

Yes

spring.cloud.aws.dynamodb.dax.connection-ttl-millis

Connection time to live.

No

0

spring.cloud.aws.dynamodb.dax.connect-timeout-millis

Connection timeout

No

1000

spring.cloud.aws.dynamodb.dax.request-timeout-millis

Request timeout for connections with the DAX cluster.

No

1000

spring.cloud.aws.dynamodb.dax.write-retries

Number of times to retry writes, initial try is not counted.

No

2

spring.cloud.aws.dynamodb.dax.read-retries

Number of times to retry reads, initial try is not counted.

No

2

spring.cloud.aws.dynamodb.dax.cluster-update-interval-millis

Interval between polling of cluster members for membership changes.

No

4000

spring.cloud.aws.dynamodb.dax.endpoint-refresh-timeout-millis

Timeout for endpoint refresh.

No

6000

spring.cloud.aws.dynamodb.dax.max-concurrency

Maximum number of concurrent requests.

No

1000

spring.cloud.aws.dynamodb.dax.max-pending-connection-acquires

Maximum number of pending Connections to acquire.

No

10000

spring.cloud.aws.dynamodb.dax.skip-host-name-verification

Skips hostname verification in url.

No

4.5. IAM Permissions

Since it depends on how you will use DynamoDb integration providing a list of IAM policies would be pointless since least privilege model should be used. To check what IAM policies DynamoDb uses and see which ones you should use please check IAM policies

5. S3 Integration

S3 allows storing files in a cloud. A Spring Boot starter is provided to auto-configure the various S3 integration related components.

Maven coordinates, using Spring Cloud AWS BOM:

<dependency>
    <groupId>io.awspring.cloud</groupId>
    <artifactId>spring-cloud-starter-aws-s3</artifactId>
</dependency>

5.1. Using S3 client

The starter automatically configures and registers a S3Client bean in the Spring application context. The S3Client bean can be used to perform operations on S3 buckets and objects.

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;

import org.springframework.stereotype.Component;
import org.springframework.util.StreamUtils;

@Component
class S3ClientSample {
    private final S3Client s3Client;

    S3ClientSample(S3Client s3Client) {
        this.s3Client = s3Client;
    }

    void readFile() throws IOException {
        ResponseInputStream<GetObjectResponse> response = s3Client.getObject(
            request -> request.bucket("bucket-name").key("file-name.txt"));

        String fileContent = StreamUtils.copyToString(response, StandardCharsets.UTF_8);

        System.out.println(fileContent);
    }
}

5.2. Using S3TransferManager

S3 Transfer Manager provided by AWS is in developer preview phase and should not be used in production.

AWS launched a high level file transfer utility, called Transfer Manager. The starter automatically configures and registers an software.amazon.awssdk.transfer.s3.S3TransferManager bean if it finds the following is added to the project:

<dependency>
  <groupId>software.amazon.awssdk</groupId>
  <artifactId>s3-transfer-manager</artifactId>
</dependency>

5.3. Using Cross-Region S3 client

S3Client implementation provided in AWS SDK is region specific - meaning it can be used only to operate on buckets and objects stored in region for which the client has been configured to use.

For example, assuming that DefaultAwsRegionProviderChain resolves a region us-east-1, running any S3 operation that uses a bucket created in another region would result in an exception:

software.amazon.awssdk.services.s3.model.S3Exception: The bucket you are attempting to access must be addressed using the specified endpoint. Please send all future requests to this endpoint. (Service: S3, Status Code: 301, Request ID: ..., Extended Request ID: ...)

Spring Cloud AWS provides a CrossRegionS3Client that solves this problem by maintaining an internal dictionary of S3Client objects per region. If you need to customize these clients, you can create a custom S3ClientBuilder bean that acts as a template to create region-specific S3 clients in CrossRegionS3Client. There is no extra work needed to use cross-region S3 client - it is the S3Client auto-configured by Spring Cloud AWS.

To disable CrossRegionS3Client creation and use instead a regular region-specific S3Client, you can either create a custom S3Client bean, or exclude spring-cloud-aws-s3-cross-region-client dependency:

<dependency>
    <groupId>io.awspring.cloud</groupId>
    <artifactId>spring-cloud-starter-aws-s3</artifactId>
    <exclusions>
        <exclusion>
            <groupId>io.awspring.cloud</groupId>
            <artifactId>spring-cloud-aws-s3-cross-region-client</artifactId>
        </exclusion>
    </exclusions>
</dependency>

5.4. S3 Objects as Spring Resources

Spring Resources are an abstraction for a number of low-level resources, such as file system files, classpath files, servlet context-relative files, etc. Spring Cloud AWS adds a new resource type: a S3Resource object.

The Spring Resource Abstraction for S3 allows S3 objects to be accessed by their S3 URL using the @Value annotation:

@Value("s3://[S3_BUCKET_NAME]/[FILE_NAME]")
private Resource s3Resource;

…​or the Spring application context

SpringApplication.run(...).getResource("s3://[S3_BUCKET_NAME]/[FILE_NAME]");

This creates a Resource object that can be used to read the file, among other possible operations.

It is also possible to write to a Resource, although a WriteableResource is required.

@Value("s3://[S3_BUCKET_NAME]/[FILE_NAME]")
private Resource s3Resource;
...
try (OutputStream os = ((WritableResource) s3Resource).getOutputStream()) {
  os.write("content".getBytes());
}

To work with the Resource as a S3 resource, cast it to io.awspring.cloud.s3.S3Resource. Using S3Resource directly lets you set the S3 object metadata.

@Value("s3://[S3_BUCKET_NAME]/[FILE_NAME]")
private Resource s3Resource;
...
ObjectMetadata objectMetadata = ObjectMetadata.builder()
    .contentType("application/json")
    .serverSideEncryption(ServerSideEncryption.AES256)
    .build();
s3Resource.setObjectMetadata(objectMetadata);
try (OutputStream outputStream = s3Resource.getOutputStream()) {
    outputStream.write("content".getBytes(StandardCharsets.UTF_8));
}

Under the hood by default S3Resource uses a io.awspring.cloud.s3.InMemoryBufferingS3OutputStream. When data is written to the resource, is gets sent to S3 using multipart upload. If a network error occurs during upload, S3Client has a built-in retry mechanism that will retry each failed part. If the upload fails after retries, multipart upload gets aborted and S3Resource throws io.awspring.cloud.s3.S3Exception.

If InMemoryBufferingS3OutputStream behavior does not fit your needs, you can use io.awspring.cloud.s3.DiskBufferingS3OutputStream by defining a bean of type DiskBufferingS3OutputStreamProvider which will override the default output stream provider. With DiskBufferingS3OutputStream when data is written to the resource, first it is stored on the disk in a tmp directory in the OS. Once the stream gets closed, the file gets uploaded with S3Client#putObject method. If a network error occurs during upload, S3Client has a built-in retry mechanism. If the upload fails after retries, S3Resource throws io.awspring.cloud.s3.UploadFailed exception containing a file location in a temporary directory in a file system.

try (OutputStream outputStream = s3Resource.getOutputStream()) {
    outputStream.write("content".getBytes(StandardCharsets.UTF_8));
} catch (UploadFailedException e) {
    // e.getPath contains a file location in temporary folder
}

If you are using the S3TransferManager, the default implementation will switch to io.awspring.cloud.s3.TransferManagerS3OutputStream. This OutputStream also uses a temporary file to write it on disk before uploading it to S3, but it may be faster as it uses a multi-part upload under the hood.

5.5. Using S3Template

Spring Cloud AWS provides a higher abstraction on the top of S3Client providing methods for the most common use cases when working with S3.

On the top of self-explanatory methods for creating and deleting buckets, S3Template provides a simple methods for uploading and downloading files:

@Autowired
private S3Template s3Template;

InputStream is = ...
// uploading file without metadata
s3Template.upload(BUCKET, "file.txt", is);

// uploading file with metadata
s3Template.upload(BUCKET, "file.txt", is, ObjectMetadata.builder().contentType("text/plain").build());

S3Template also allows storing & retrieving Java objects.

Person p = new Person("John", "Doe");
s3Template.store(BUCKET, "person.json", p);

Person loadedPerson = s3Template.read(BUCKET, "person.json", Person.class);

By default, if Jackson is on the classpath, S3Template uses ObjectMapper based Jackson2JsonS3ObjectConverter to convert from S3 object to Java object and vice versa. This behavior can be overwritten by providing custom bean of type S3ObjectConverter.

5.6. Determining S3 Objects Content Type

All S3 objects stored in S3 through S3Template, S3Resource or S3OutputStream automatically get set a contentType property on the S3 object metadata, based on the S3 object key (file name).

By default, PropertiesS3ObjectContentTypeResolver - a component supporting over 800 file extensions is responsible for content type resolution. If this content type resolution does not meet your needs, you can provide a custom bean of type S3ObjectContentTypeResolver which will be automatically used in all components responsible for uploading files.

5.7. Configuration

The Spring Boot Starter for S3 provides the following configuration options:

Name

Description

Required

Default value

spring.cloud.aws.s3.enabled

Enables the S3 integration.

No

true

spring.cloud.aws.s3.endpoint

Configures endpoint used by S3Client.

No

localhost:4566

spring.cloud.aws.s3.region

Configures region used by S3Client.

No

eu-west-1

spring.cloud.aws.s3.accelerate-mode-enabled

Option to enable using the accelerate endpoint when accessing S3. Accelerate endpoints allow faster transfer of objects by using Amazon CloudFront’s globally distributed edge locations.

No

null (falls back to SDK default)

spring.cloud.aws.s3.checksum-validation-enabled

Option to disable doing a validation of the checksum of an object stored in S3.

No

null (falls back to SDK default)

spring.cloud.aws.s3.chunked-encoding-enabled

Option to enable using chunked encoding when signing the request payload for PutObjectRequest and UploadPartRequest.

No

null (falls back to SDK default)

spring.cloud.aws.s3.path-style-access-enabled

Option to enable using path style access for accessing S3 objects instead of DNS style access. DNS style access is preferred as it will result in better load balancing when accessing S3.

No

null (falls back to SDK default)

spring.cloud.aws.s3.use-arn-region-enabled

If an S3 resource ARN is passed in as the target of an S3 operation that has a different region to the one the client was configured with, this flag must be set to 'true' to permit the client to make a cross-region call to the region specified in the ARN otherwise an exception will be thrown.

No

null (falls back to SDK default)

5.8. IAM Permissions

Following IAM permissions are required by Spring Cloud AWS:

Downloading files

s3:GetObject

Searching files

s3:ListObjects

Uploading files

s3:PutObject

Sample IAM policy granting access to spring-cloud-aws-demo bucket:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "s3:ListBucket",
            "Resource": "arn:aws:s3:::spring-cloud-aws-demo"
        },
        {
            "Effect": "Allow",
            "Action": "s3:GetObject",
            "Resource": "arn:aws:s3:::spring-cloud-aws-demo/*"
        },
        {
            "Effect": "Allow",
            "Action": "s3:PutObject",
            "Resource": "arn:aws:s3:::spring-cloud-aws-demo/*"
        }
    ]
}

6. SES Integration

Spring has a built-in support to send e-mails based on the Java Mail API to avoid any static method calls while using the Java Mail API and thus supporting the testability of an application. Spring Cloud AWS supports the Amazon SES as an implementation of the Spring Mail abstraction.

As a result Spring Cloud AWS users can decide to use the Spring Cloud AWS implementation of the Amazon SES service or use the standard Java Mail API based implementation that sends e-mails via SMTP to Amazon SES.

It is preferred to use the Spring Cloud AWS implementation instead of SMTP mainly for performance reasons. Spring Cloud AWS uses one API call to send a mail message, while the SMTP protocol makes multiple requests (EHLO, MAIL FROM, RCPT TO, DATA, QUIT) until it sends an e-mail.

A Spring Boot starter is provided to auto-configure SES integration beans.

Maven coordinates, using Spring Cloud AWS BOM:

<dependency>
    <groupId>io.awspring.cloud</groupId>
    <artifactId>spring-cloud-aws-starter-ses</artifactId>
</dependency>

6.1. Sending simple mails

Application developers can inject the MailSender into their application code and directly send simple text based e-mail messages. The sample below demonstrates the creation of a simple mail message.

import org.springframework.mail.MailSender;
import org.springframework.mail.SimpleMailMessage;

class MailSendingService {

    private final MailSender mailSender;

    public MailSendingService(MailSender mailSender) {
        this.mailSender = mailSender;
    }

    public void sendMailMessage() {
        SimpleMailMessage simpleMailMessage = new SimpleMailMessage();
        simpleMailMessage.setFrom("foo@bar.com");
        simpleMailMessage.setTo("bar@baz.com");
        simpleMailMessage.setSubject("test subject");
        simpleMailMessage.setText("test content");
        this.mailSender.send(simpleMailMessage);
    }
}

6.2. Sending attachments

Sending attachments with e-mail requires MIME messages to be created and sent. In order to create MIME messages, the Java Mail dependency is required and has to be included in the classpath. Spring Cloud AWS will detect the dependency and create a org.springframework.mail.javamail.JavaMailSender implementation that allows to create and build MIME messages and send them. A dependency configuration for the Java Mail API is the only change in the configuration which is shown below.

<dependency>
    <groupId>javax.mail</groupId>
    <artifactId>mailapi</artifactId>
</dependency>

Even though there is a dependency to the Java Mail API there is still the Amazon SES API used underneath to send mail messages. There is no SMTP setup required on the Amazon AWS side.

Sending the mail requires the application developer to use the JavaMailSender to send an e-mail as shown in the example below.

import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.mail.javamail.MimeMessageHelper;
import org.springframework.mail.javamail.MimeMessagePreparator;

class MailSendingService {

    private final JavaMailSender mailSender;

    public MailSendingService(JavaMailSender mailSender) {
        this.mailSender = mailSender;
    }

    public void sendMailMessage() {
        this.mailSender.send(new MimeMessagePreparator() {

            @Override
            public void prepare(MimeMessage mimeMessage) throws Exception {
                MimeMessageHelper helper =
                    new MimeMessageHelper(mimeMessage, true, "UTF-8");
                helper.addTo("foo@bar.com");
                helper.setFrom("bar@baz.com");
                helper.addAttachment("test.txt", ...);
                helper.setSubject("test subject with attachment");
                helper.setText("mime body", false);
            }
        });
    }
}

6.3. Authenticating e-mails

To avoid any spam attacks on the Amazon SES mail service, applications without production access must verify each e-mail receiver otherwise the mail sender will throw a software.amazon.awssdk.services.ses.model.MessageRejectedException.

Production access can be requested and will disable the need for mail address verification.

6.4. Configuration

The Spring Boot Starter for SES provides the following configuration options:

Name

Description

Required

Default value

spring.cloud.aws.ses.enabled

Enables the SES integration.

No

true

spring.cloud.aws.ses.endpoint

Configures endpoint used by SesClient.

No

spring.cloud.aws.ses.region

Configures region used by SesClient.

No

Amazon SES is not available in all regions of the Amazon Web Services cloud. Therefore, an application hosted and operated in a region that does not support the mail service will produce an error while using the mail service. Therefore, the region must be overridden for the mail sender configuration. The example below shows a typical combination of a region (EU-CENTRAL-1) that does not provide an SES service where the client is overridden to use a valid region (EU-WEST-1).

spring.cloud.aws.ses.region=eu-west-1

6.5. IAM Permissions

Following IAM permissions are required by Spring Cloud AWS:

Send e-mail without attachment

ses:SendEmail

Send e-mail with attachment

ses:SendRawEmail

Sample IAM policy granting access to SES:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "ses:SendEmail",
                "ses:SendRawEmail"
            ],
            "Resource": "arn:aws:ses:your:arn"
        }
    ]
}

7. SNS Integration

SNS is a pub/sub messaging service that allows clients to publish notifications to a particuluar topic. A Spring Boot starter is provided to auto-configure SNS integration beans.

Maven coordinates, using Spring Cloud AWS BOM:

<dependency>
    <groupId>io.awspring.cloud</groupId>
    <artifactId>spring-cloud-aws-starter-sns</artifactId>
</dependency>

7.1. Sending Notifications

7.1.1. SNS Template

The starter automatically configures and registers a SnsTemplate bean providing higher level abstractions for sending SNS notifications. SnsTemplate implements Spring Messaging abstractions making it easy to combine with other messaging technologies compatible with Spring Messaging.

It supports sending notifications with payload of type:

  • String - using org.springframework.messaging.converter.StringMessageConverter

  • Object - which gets serialized to JSON using org.springframework.messaging.converter.MappingJackson2MessageConverter and Jackson’s com.fasterxml.jackson.databind.ObjectMapper autoconfigured by Spring Boot.

Additionally, it exposes handful of methods supporting org.springframework.messaging.Message.

import io.awspring.cloud.sns.core.SnsTemplate;

import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

class NotificationService {
    private final SnsTemplate snsTemplate;

    NotificationService(SnsTemplate snsTemplate) {
        this.snsTemplate = snsTemplate;
    }

    void sendNotification() {
        // sends String payload
        snsTemplate.sendNotification("topic-arn", "payload", "subject");
        // sends object serialized to JSON
        snsTemplate.sendNotification("topic-arn", new Person("John", "Doe"), "subject");
        // sends a Spring Messaging Message
        Message<String> message = MessageBuilder.withPayload("payload")
            .setHeader("header-name", "header-value")
            .build();
        snsTemplate.send("topic-arn", message);
    }
}

If autoconfigured converters do not meet your needs, you can provide a custom SnsTemplate bean with a message converter of your choice.

When sending SNS notification, it is required to provide a topic ARN. Spring Cloud AWS simplifies it and allows providing a topic name instead, under a condition that topic with that name has already been created. Otherwise, Spring Cloud AWS will make an attempt to create topic with this name with a first call.

The behavior of resolving topic ARN by a topic name can be altered by providing a custom bean of type io.awspring.cloud.sns.core.TopicArnResolver.

7.1.2. SNS Operations

Because of Spring Messaging compatibility, SnsTemplate exposes many methods that you may not need if you don’t need Spring Messaging abstractions. In such case, we recommend using SnsOperations - an interface implemented by SnsTemplate, that exposes a convenient method for sending SNS notification, including support for FIFO topics.

import io.awspring.cloud.sns.core.SnsNotification;
import io.awspring.cloud.sns.core.SnsOperations;
import io.awspring.cloud.sns.core.SnsTemplate;

class NotificationService {
    private final SnsOperations snsOperations;

    NotificationService(SnsOperations snsOperations) {
        this.snsOperations = snsOperations;
    }

    void sendNotification() {
        SnsNotification<Person> notification = SnsNotification.builder(new Person("John", "Doe"))
            .deduplicationId("..")
            .groupId("..")
            .build();
        snsOperations.sendNotification("topic-arn", notification);
    }
}

7.2. Using SNS Client

To have access to all lower level SNS operations, we recommend using SnsClient from AWS SDK. SnsClient bean is autoconfigured by SnsAutoConfiguration.

If autoconfigured SnsClient bean configuration does not meet your needs, it can be replaced by creating a custom bean of type SnsClient.

import software.amazon.awssdk.services.sns.SnsClient;

class NotificationService {
    private final SnsClient snsClient;

    public NotificationService(SnsClient snsClient) {
        this.snsClient = snsClient;
    }

    void sendNotification() {
        snsClient.publish(request -> request.topicArn("sns-topic-arn").message("payload"));
    }
}

7.3. Annotation-driven HTTP notification endpoint

SNS supports multiple endpoint types (SQS, Email, HTTP, HTTPS), Spring Cloud AWS provides support for HTTP(S) endpoints. SNS sends three type of requests to an HTTP topic listener endpoint, for each of them annotations are provided:

  • Subscription request → @NotificationSubscriptionMapping

  • Notification request → @NotificationMessageMapping

  • Unsubscription request → @NotificationUnsubscribeMapping

HTTP endpoints are based on Spring MVC controllers. Spring Cloud AWS added some custom argument resolvers to extract the message and subject out of the notification requests.

Example of integration:

import io.awspring.cloud.sns.annotation.endpoint.NotificationMessageMapping;
import io.awspring.cloud.sns.annotation.endpoint.NotificationSubscriptionMapping;
import io.awspring.cloud.sns.annotation.endpoint.NotificationUnsubscribeConfirmationMapping;
import io.awspring.cloud.sns.annotation.handlers.NotificationMessage;
import io.awspring.cloud.sns.annotation.handlers.NotificationSubject;
import io.awspring.cloud.sns.handlers.NotificationStatus;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;

@Controller
@RequestMapping("/topicName")
public class NotificationTestController {

    @NotificationSubscriptionMapping
    public void handleSubscriptionMessage(NotificationStatus status) {
        //We subscribe to start receive the message
        status.confirmSubscription();
    }

    @NotificationMessageMapping
    public void handleNotificationMessage(@NotificationSubject String subject, @NotificationMessage String message) {
        // ...
    }

    @NotificationUnsubscribeConfirmationMapping
    public void handleUnsubscribeMessage(NotificationStatus status) {
        //e.g. the client has been unsubscribed and we want to "re-subscribe"
        status.confirmSubscription();
    }
}

7.4. Configuration

The Spring Boot Starter for SNS provides the following configuration options:

Name

Description

Required

Default value

spring.cloud.aws.sns.enabled

Enables the SNS integration.

No

true

spring.cloud.aws.sns.endpoint

Configures endpoint used by SnsClient.

No

localhost:4566

spring.cloud.aws.sns.region

Configures region used by SnsClient.

No

eu-west-1

7.5. IAM Permissions

Following IAM permissions are required by Spring Cloud AWS:

To publish notification to topic

sns:Publish

To publish notification you will also need

sns:ListTopics

To use Annotation-driven HTTP notification endpoint

sns:ConfirmSubscription

For resolving topic name to ARN

sns:CreateTopic

Sample IAM policy granting access to SNS:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "sns:Publish",
                "sns:ConfirmSubscription"
            ],
            "Resource": "yourArn"
        },
        {
            "Effect": "Allow",
            "Action": "sns:ListTopics",
            "Resource": "*"
        },
        {
        "Effect": "Allow",
        "Action": "sns:CreateTopic",
        "Resource": "*"
        }
    ]
}

8. SQS Support

Amazon Simple Queue Service is a messaging service that provides point-to-point communication with queues. Spring Cloud AWS SQS integration offers support to receive and send messages using common Spring abstractions such as @SqsListener, MessageListenerContainer and MessageListenerContainerFactory.

Compared to JMS or other message services Amazon SQS has limitations that should be taken into consideration.

  • Amazon SQS allows only String payloads, so any Object must be transformed into a String representation. Spring Cloud AWS has dedicated support to transfer Java objects with Amazon SQS messages by converting them to JSON.

  • Amazon SQS has a maximum message size of 256kb per message, so bigger messages will fail to be sent.

A Spring Boot starter is provided to auto-configure SQS integration beans. Maven coordinates, using Spring Cloud AWS BOM:

<dependency>
    <groupId>io.awspring.cloud</groupId>
    <artifactId>spring-cloud-aws-starter-sqs</artifactId>
</dependency>

8.1. Sample Listener Application

Below is a minimal sample application leveraging auto-configuration from Spring Boot.

@SpringBootApplication
public class SqsApplication {

    public static void main(String[] args) {
        SpringApplication.run(SqsApplication.class, args);
    }

    @SqsListener("myQueue")
    public void listen(String message) {
        System.out.println(message);
    }
}

Without Spring Boot, it’s necessary to import the SqsBootstrapConfiguration class in a @Configuration, as well as declare a SqsMessageListenerContainerFactory bean.

public class Listener {

    @SqsListener("myQueue")
    public void listen(String message) {
        System.out.println(message);
    }

}

@Import(SqsBootstrapConfiguration.class)
@Configuration
public class SQSConfiguration {

    @Bean
    public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory() {
        return SqsMessageListenerContainerFactory
            .builder()
            .sqsAsyncClient(sqsAsyncClient())
            .build();
    }

    @Bean
    public SqsAsyncClient sqsAsyncClient() {
        return SqsAsyncClient.builder().build();
    }

    @Bean
    public Listener listener() {
        return new Listener();
    }

}

8.2. Sending Messages

Spring Cloud AWS SQS autoconfigures a SqsAsyncClient bean that can be used for sending messages. Note that the payload has to be converted to a JSON String to be sent. A SqsTemplate should be included in a future milestone simplifying this process.

Currently, a lightweight producer abstraction can be created to facilitate sending messages, such as:

@Component
public class SqsSampleProducer {

    private final SqsAsyncClient sqsAsyncClient;

    private final ObjectMapper objectMapper;

    public SqsSampleProducer(SqsAsyncClient sqsAsyncClient, ObjectMapper objectMapper) {
        this.sqsAsyncClient = sqsAsyncClient;
        this.objectMapper = objectMapper;
    }

    public CompletableFuture<Void> sendToUrl(String queueUrl, Object payload) {
        return this.sqsAsyncClient.sendMessage(request -> request.messageBody(getMessageBodyAsJson(payload)).queueUrl(queueUrl))
                .thenRun(() -> {});
    }

    public CompletableFuture<Void> send(String queueName, Object payload) {
        return this.sqsAsyncClient.getQueueUrl(request -> request.queueName(queueName))
                .thenApply(GetQueueUrlResponse::queueUrl)
                .thenCompose(queueUrl -> sendToUrl(queueUrl, payload));
    }

    private String getMessageBodyAsJson(Object payload) {
        try {
            return objectMapper.writeValueAsString(payload);
        }
        catch (JsonProcessingException e) {
            throw new IllegalArgumentException("Error converting payload: " + payload, e);
        }
    }

}

Modifications can be made to enable adding other attributes such as MessageGroupId, MessageAttributes and MessageDeduplicationId.

It can then be autowired and used such as:

sampleProducer.send(queueName, new MyPojo("My value", "My Other Value")).join();

The join() method blocks the thread and throws any error from the operation. If a CompletableFuture chain is being used, simply return the value instead.

8.3. Receiving Messages

The framework offers the following options to receive messages from a queue.

8.3.1. Message Listeners

To receive messages in a manually created container, a MessageListener or AsyncMessageListener must be provided. Both interfaces come with single message and a batch methods. These are functional interfaces and a lambda or method reference can be provided for the single message methods.

Single message / batch modes and message payload conversion can be configured via ContainerOptions. See Message Conversion and Payload Deserialization for more information.

@FunctionalInterface
public interface MessageListener<T> {

    void onMessage(Message<T> message);

    default void onMessage(Collection<Message<T>> messages) {
        throw new UnsupportedOperationException("Batch not implemented by this MessageListener");
    }

}
@FunctionalInterface
public interface AsyncMessageListener<T> {

    CompletableFuture<Void> onMessage(Message<T> message);

    default CompletableFuture<Void> onMessage(Collection<Message<T>> messages) {
        return CompletableFutures
                .failedFuture(new UnsupportedOperationException("Batch not implemented by this AsyncMessageListener"));
    }

}

8.3.2. SqsMessageListenerContainer

The MessageListenerContainer manages the entire messages` lifecycle, from polling, to processing, to acknowledging.

It can be instantiated directly, using a SqsMessageListenerContainerFactory, or using @SqsListener annotations. If declared as a @Bean, the Spring context will manage its lifecycle, starting the container on application startup and stopping it on application shutdown. See Container Lifecycle for more information.

It implements the MessageListenerContainer interface:

public interface MessageListenerContainer<T> extends SmartLifecycle {

    String getId();

    void setId(String id);

    void setMessageListener(MessageListener<T> messageListener);

    void setAsyncMessageListener(AsyncMessageListener<T> asyncMessageListener);

}
The generic parameter <T> stands for the payload type of messages to be consumed by this container. This allows ensuring at compile-time that all components used with the container are for the same type. If more than one payload type is to be used by the same container or factory, simply type it as Object. This type is not considered for payload conversion.

A container can be instantiated in a familiar Spring way in a @Configuration annotated class. For example:

@Bean
MessageListenerContainer<Object> listenerContainer(SqsAsyncClient sqsAsyncClient) {
    SqsMessageListenerContainer<Object> container = new SqsMessageListenerContainer<>(sqsAsyncClient);
    container.setMessageListener(System.out::println);
    container.setQueueNames("myTestQueue");
    return container;
}

This framework also provides a convenient Builder that allows a different approach, such as:

@Bean
MessageListenerContainer<Object> listenerContainer(SqsAsyncClient sqsAsyncClient) {
    return SqsMessageListenerContainer
            .builder()
            .sqsAsyncClient(sqsAsyncClient)
            .messageListener(System.out::println)
            .queueNames("myTestQueue")
            .build();
}

The container’s lifecycle can also be managed manually:

void myMethod(SqsAsyncClient sqsAsyncClient) {
    SqsMessageListenerContainer<Object> container = SqsMessageListenerContainer
            .builder()
            .sqsAsyncClient(sqsAsyncClient)
            .messageListener(System.out::println)
            .queueNames("myTestQueue")
            .build();
    container.start();
    container.stop();
}

8.3.3. SqsMessageListenerContainerFactory

A MessageListenerContainerFactory can be used to create MessageListenerContainer instances, both directly or through @SqsListener annotations.

It can be created in a familiar Spring way, such as:

@Bean
SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory(SqsAsyncClient sqsAsyncClient) {
    SqsMessageListenerContainerFactory<Object> factory = new SqsMessageListenerContainerFactory<>();
    factory.setSqsAsyncClient(sqsAsyncClient);
    return factory;
}

Or through the Builder:

@Bean
SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory(SqsAsyncClient sqsAsyncClient) {
    return SqsMessageListenerContainerFactory
            .builder()
            .sqsAsyncClient(sqsAsyncClient)
            .build();
}
Using this method for setting the SqsAsyncClient instance in the factory, all containers created by this factory will share the same SqsAsyncClient instance. For high-throughput applications, a Supplier<SqsAsyncClient> can be provided instead through the factory’s setSqsAsyncClientSupplier or the builder’s sqsAsyncSupplier methods. In this case each container will receive a SqsAsyncClient instance. Alternatively, a single SqsAsyncClient instance can be configured for higher throughput. See the AWS documentation for more information on tradeoffs of each approach.

The factory can also be used to create a container directly, such as:

@Bean
MessageListenerContainer<Object> myListenerContainer(SqsAsyncClient sqsAsyncClient) {
    return SqsMessageListenerContainerFactory
            .builder()
            .sqsAsyncClient(sqsAsyncClient)
            .messageListener(System.out::println)
            .build()
            .createContainer("myQueue");
}

8.3.4. @SqsListener Annotation

The simplest way to consume SQS messages is by annotating a method in a @Component class with the @SqsListener annotation. The framework will then create the MessageListenerContainer and set a MessagingMessageListenerAdapter to invoke the method when a message is received.

When using Spring Boot with auto-configuration, no configuration is necessary.

Most attributes on the annotation can be resolved from SpEL (#{…​}) or property placeholders (${…​}).

Queue Names

One or more queues can be specified in the annotation through the queueNames or value properties - there’s no distinction between the two properties.

Instead of queue names, queue urls can also be provided. Using urls instead of queue names can result in slightly faster startup times since it prevents the framework from looking up the queue url when the containers start.

@SqsListener({"${my.queue.url}", "myOtherQueue"})
public void listenTwoQueues(String message) {
    System.out.println(message);
}

Any number of @SqsListener annotations can be used in a bean class, and each annotated method will be handled by a separate MessageListenerContainer.

Queues declared in the same annotation will share the container, though each will have separate throughput and acknowledgement controls.
Specifying a MessageListenerContainerFactory

A MessageListenerContainerFactory can be specified through the factory property. Such factory will then be used to create the container for the annotated method.

If not specified, a factory with the defaultSqsListenerContainerFactory name will be looked up. For changing this default name, see Global Configuration for @SqsListeners.

@SqsListener(queueNames = "myQueue", factory = "myFactory")
public void listen(String message) {
    System.out.println(message);
}

When using a Spring Boot application with auto-configuration, a default factory is provided if there are no other factory beans declared in the context.

Other Annotation Properties

The following properties can be specified in the @SqsListener annotation. Such properties override the equivalent ContainerOptions for the resulting MessageListenerContainer.

  • id - Specify the resulting container’s id. This can be used for fetching the container from the MessageListenerContainerRegistry, and is used by the container and its components for general logging and thread naming.

  • maxInflightMessagesPerQueue - Set the maximum number of messages that can be inflight at any given moment. See Message Processing Throughput for more information.

  • pollTimeoutSeconds - Set the maximum time to wait before a poll returns from SQS. Note that if there are messages available the call may return earlier than this setting.

  • messageVisibilitySeconds - Set the minimum visibility for the messages retrieved in a poll. Note that for FIFO single message listener methods, this visibility is applied to the whole batch before each message is sent to the listener. See FIFO Support for more information.

Listener Method Arguments

A number of possible argument types are allowed in the listener method’s signature.

  • MyPojo - POJO types are automatically deserialized from JSON.

  • Message<MyPojo> - Provides a Message<MyPojo> instance with the deserialized payload and MessageHeaders.

  • List<MyPojo> - Enables batch mode and receives the batch that was polled from SQS.

  • List<Message<MyPojo>> - Enables batch mode and receives the batch that was polled from SQS along with headers.

  • @Header(String headerName) - provides the specified header.

  • @Headers - provides the MessageHeaders or a Map<String, Object>

  • Acknowledgement - provides methods for manually acknowledging messages for single message listeners. AcknowledgementMode must be set to MANUAL (see Acknowledging Messages)

  • BatchAcknowledgement - provides methods for manually acknowledging partial or whole message batches for batch listeners. AcknowledgementMode must be set to MANUAL (see Acknowledging Messages)

  • Visibility - provides the changeTo() method that enables changing the message’s visibility to the provided value.

  • QueueAttributes - provides queue attributes for the queue that received the message. See Retrieving Attributes from SQS for how to specify the queue attributes that will be fetched from SQS

  • software.amazon.awssdk.services.sqs.model.Message - provides the original Message from SQS

Here’s a sample with many arguments:

@SqsListener("${my-queue-name}")
public void listen(Message<MyPojo> message, MyPojo pojo, MessageHeaders headers, Acknowledgement ack, Visibility visibility, QueueAttributes queueAttributes, software.amazon.awssdk.services.sqs.model.Message originalMessage) {
    Assert.notNull(message);
    Assert.notNull(pojo);
    Assert.notNull(headers);
    Assert.notNull(ack);
    Assert.notNull(visibility);
    Assert.notNull(queueAttributes);
    Assert.notNull(originalMessage);
}
Batch listeners support a single List<MyPojo> and List<Message<MyPojo>> method arguments, and an optional BatchAcknowledgement or AsyncBatchAcknowledgement arguments. MessageHeaders should be extracted from the Message instances through the getHeaders() method.

8.3.5. Batch Processing

All message processing interfaces have both single message and batch methods. This means the same set of components can be used to process both single and batch methods, and share logic where applicable.

When batch mode is enabled, the framework will serve the entire result of a poll to the listener. If a value greater than 10 is provided for maxMessagesPerPoll, the result of multiple polls will be combined and up to the respective amount of messages will be served to the listener.

To enable batch processing using @SqsListener, a single List<MyPojo> or List<Message<MyPojo>> method argument should be provided in the listener method. The listener method can also have an optional BatchAcknowledgement argument for AcknowledgementMode.MANUAL.

Alternatively, ContainerOptions can be set to ListenerMode.BATCH in the ContainerOptions in the factory or container.

The same factory can be used to create both single message and batch containers for @SqsListener methods.
In case the same factory is shared by both delivery methods, any supplied ErrorHandler, MessageInterceptor or MessageListener should implement the proper methods.

8.3.6. Container Options

Each MessageListenerContainer can have a different set of options. MessageListenerContainerFactory instances have a ContainerOptions.Builder instance property that is used as a template for the containers it creates.

Both factory and container offer a configure method that can be used to change the options:

@Bean
SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory(SqsAsyncClient sqsAsyncClient) {
    return SqsMessageListenerContainerFactory
            .builder()
            .configure(options -> options
                    .messagesPerPoll(5)
                    .pollTimeout(Duration.ofSeconds(10)))
            .sqsAsyncClient(sqsAsyncClient)
            .build();
}
@Bean
MessageListenerContainer<Object> listenerContainer(SqsAsyncClient sqsAsyncClient) {
    return SqsMessageListenerContainer
            .builder()
            .configure(options -> options
                    .messagesPerPoll(5)
                    .pollTimeout(Duration.ofSeconds(10)))
            .sqsAsyncClient(sqsAsyncClient)
            .messageListener(System.out::println)
            .queueNames("myTestQueue")
            .build();
}

The ContainerOptions instance is immutable and can be retrieved via the container.getContainerOptions() method. If more complex configurations are necessary, the toBuilder and fromBuilder methods provide ways to create a new copy of the options, and then set it back to the factory or container:

void myMethod(MessageListenerContainer<Object> container) {
    ContainerOptions.Builder modifiedOptions = container.getContainerOptions()
            .toBuilder()
            .pollTimeout(Duration.ofSeconds(5))
            .shutdownTimeout(Duration.ofSeconds(20));
    container.configure(options -> options.fromBuilder(modifiedOptions));
}

A copy of the options can also be created with containerOptions.createCopy() or containerOptionsBuilder.createCopy().

Using Auto-Configuration

The Spring Boot Starter for SQS provides the following auto-configuration properties:

Name

Description

Required

Default value

spring.cloud.aws.sqs.enabled

Enables the SQS integration.

No

true

spring.cloud.aws.sqs.endpoint

Configures endpoint used by SqsAsyncClient.

No

localhost:4566

spring.cloud.aws.sqs.region

Configures region used by SqsAsyncClient.

No

eu-west-1

spring.cloud.aws.sqs.listener.max-inflight-messages-per-queue

Maximum number of inflight messages per queue.

No

10

spring.cloud.aws.sqs.listener.max-messages-per-poll

Maximum number of messages to be received per poll.

No

10

spring.cloud.aws.sqs.listener.poll-timeout

Maximum amount of time to wait for messages in a poll.

No

10 seconds

ContainerOptions Descriptions
Property Range Default Description

maxInflightMessagesPerQueue

1 - Integer.MAX_VALUE

10

The maximum number of messages from each queue that can be processed simultaneously in this container. This number will be used for defining the thread pool size for the container following (maxInflightMessagesPerQueue * number of queues). For batching acknowledgements a message is considered as no longer inflight when it’s handed to the acknowledgement queue. See Acknowledging Messages.

maxMessagesPerPoll

1 - Integer.MAX_VALUE

10

The maximum number of messages that will be received by a poll to a SQS queue in this container. If a value greater than 10 is provided, the result of multiple polls will be combined, which can be useful for batch listeners.

See AWS documentation for more information.

pollTimeout

1 - 10 seconds

10 seconds

The maximum duration for a poll to a SQS queue before returning empty. Longer polls decrease the chance of empty polls when messages are available. See AWS documentation for more information.

permitAcquireTimeout

1 - 10 seconds

10 seconds

The maximum time the framework will wait for permits to be available for a queue before attempting the next poll. After that period, the framework will try to perform a partial acquire with the available permits, resulting in a poll for less than maxMessagesPerPoll messages, unless otherwise configured. See Message Processing Throughput.

shutdownTimeout

0 - undefined

10 seconds

The amount of time the container will wait for a queue to complete its operations before attempting to forcefully shutdown. See Container Lifecycle.

backPressureMode

AUTO, ALWAYS_POLL_MAX_MESSAGES, FIXED_HIGH_THROUGHPUT

AUTO

Configures the backpressure strategy to be used by the container. See Configuring BackPressureMode.

listenerMode

SINGLE_MESSAGE, BATCH

SINGLE_MESSAGE

Configures whether this container will use single message or batch listeners. This value is overriden by @SqsListener depending on whether the listener method contains a List argument. See Batch Processing.

queueAttributeNames

Collection<QueueAttributeName>

Empty list

Configures the QueueAttributes that will be retrieved from SQS when a container starts. See Retrieving Attributes from SQS.

messageAttributeNames

Collection<String>

ALL

Configures the MessageAttributes that will be retrieved from SQS for each message. See Retrieving Attributes from SQS.

messageSystemAttributeNames

Collection<String>

ALL

Configures the MessageSystemAttribute that will be retrieved from SQS for each message. See Retrieving Attributes from SQS.

messageConverter

MessagingMessageConverter

SqsMessagingMessageConverter

Configures the MessagingMessageConverter that will be used to convert SQS messages into Spring Messaging Messages. See Message Conversion and Payload Deserialization.

acknowledgementMode

ON_SUCCESS, ALWAYS, MANUAL

ON_SUCCESS

Configures the processing outcomes that will trigger automatic acknowledging of messages. See Acknowledging Messages.

acknowledgementInterval

0 - undefined

1 second for Standard SQS, Duration.ZERO for FIFO SQS

Configures the interval between acknowledges for batching. Set to Duration.ZERO along with acknowledgementThreshold to zero to enable immediate acknowledgement See Acknowledging Messages.

acknowledgementThreshold

0 - undefined

10 for Standard SQS, 0 for FIFO SQS

Configures the minimal amount of messages in the acknowledgement queue to trigger acknowledgement of a batch. Set to zero along with acknowledgementInterval to Duration.ZERO to enable immediate acknowledgement. See Acknowledging Messages.

acknowledgementOrdering

PARALLEL, ORDERED

PARALLEL for Standard SQS and FIFO queues with immediate acknowledgement, ORDERED for FIFO queues with acknowledgement batching enabled.

Configures the order acknowledgements should be made. Fifo queues can be acknowledged in parallel for immediate acknowledgement since the next message for a message group will only start being processed after the previous one has been acknowledged. See Acknowledging Messages.

containerComponentsTaskExecutor

TaskExecutor

null

Provides a TaskExecutor instance to be used by the MessageListenerContainer internal components. See Providing a TaskExecutor.

messageVisibility

Duration

null

Specify the message visibility duration for messages polled in this container. For FIFO queues, visibility is extended for all messages in a message group before each message is processed. See FIFO Support. Otherwise, visibility is specified once when polling SQS.

queueNotFoundStrategy

FAIL, CREATE

CREATE

Configures the behavior when a queue is not found at container startup. See Container Lifecycle.

8.3.7. Retrieving Attributes from SQS

QueueAttributes, MessageAttributes and MessageSystemAttributes can be retrieved from SQS. These can be configured using the ContainerOptions queueAttributeNames, messageAttributeNames and messageSystemAttributeNames methods.

QueueAttributes for a queue are retrieved when containers start, and can be looked up by adding the QueueAttributes method parameter in a @SqsListener method, or by getting the SqsHeaders.SQS_QUEUE_ATTRIBUTES_HEADER header.

MessageAttributes and MessageSystemAttributes are retrieved with each message, and are mapped to message headers. Those can be retrieved with @Header parameters, or directly in the Message. The message headers are prefixed with SqsHeaders.SQS_MA_HEADER_PREFIX ("Sqs_MA_") for message attributes and SqsHeaders.SQS_MSA_HEADER_PREFIX ("Sqs_MSA_") for message system attributes.

By default, no QueueAttributes and ALL MessageAttributes and MessageSystemAttributes are retrieved.

8.3.8. Container Lifecycle

The MessageListenerContainer interface extends SmartLifecycle, which provides methods to control the container’s lifecycle.

Containers created from @SqsListener annotations are registered in a MessageListenerContainerRegistry bean that is registered by the framework. The containers themselves are not Spring-managed beans, and the registry is responsible for managing these containers` lifecycle in application startup and shutdown.

At startup, the containers will make requests to SQS to retrieve the queues` urls for the provided queue names, and for retrieving QueueAttributes if so configured. Providing queue urls instead of names and not requesting queue attributes can result in slightly better startup times since there’s no need for such requests.

If retrieving the queue url fails due to the queue not existing, the framework can be configured to either create the queue or fail. If a URL is provided instead of a queue name the framework will not make this request at startup, and thus if the queue does not exist it will fail at runtime. This configuration is available in ContainerOptions queueNotFoundStrategy.

At shutdown, by default containers will wait for all polling, processing and acknowledging operations to finish, up to ContainerOptions.getShutdownTimeout(). After this period, operations will be canceled and the container will attempt to forcefully shutdown.

Containers as Spring Beans

Manually created containers can be registered as beans, e.g. by declaring a @Bean in a @Configuration annotated class. In these cases the containers lifecycle will be managed by the Spring context at application startup and shutdown.

@Bean
MessageListenerContainer<Object> listenerContainer(SqsAsyncClient sqsAsyncClient) {
    return SqsMessageListenerContainer
            .builder()
            .sqsAsyncClient(sqsAsyncClient)
            .messageListener(System.out::println)
            .queueNames("myTestQueue")
            .build();
}
Retrieving Containers from the Registry

Containers can be retrieved by fetching the MessageListenerContainer bean from the container and using the getListenerContainers and getContainerById methods. Then lifecycle methods can be used to start and stop instances.

@Autowired
MessageListenerContainerRegistry registry;

public void myLifecycleMethod() {
    MessageListenerContainer container = registry.getContainerById("myId");
    container.stop();
    container.start();
}
Lifecycle Execution

By default, all lifecycle actions performed by the MessageListenerContainerRegistry and internally by the MessageListenerContainer instances are executed in parallel.

This behavior can be disabled by setting LifecycleHandler.get().setParallelLifecycle(false).

Spring-managed MessageListenerContainer beans' lifecycle actions are always performed sequentially.

8.3.9. FIFO Support

FIFO SQS queues are fully supported for receiving messages - queues with names that ends in .fifo will automatically be setup as such.

  • Messages are polled with a receiveRequestAttemptId, and the received batch of messages is split according to the message`s MessageGroupId.

  • Each message from a given group will then be processed in order, while each group is processed in parallel.

  • If processing fails for a message, the following messages from the same message group are discarded so they will be served again after their message visibility expires.

  • Messages which were already successfully processed and acknowledged will not be served again.

  • If a batch listener is used, each message group from a poll will be served as a batch to the listener method.

  • FIFO queues also have different defaults for acknowledging messages, see Acknowledgement Defaults for more information.

  • If a message visibility is set through @SqsListener or ContainerOptions, visibility will be extended for all messages in the message group before each message is processed.

A MessageListenerContainer can either have only Standard queues or FIFO queues - not both. This is valid both for manually created containers and @SqsListener annotated methods.

8.4. Message Interceptor

The framework offers the MessageInterceptor and the AsyncMessageInterceptor interfaces:

public interface MessageInterceptor<T> {

    default Message<T> intercept(Message<T> message) {
        return message;
    }

    default Collection<Message<T>> intercept(Collection<Message<T>> messages) {
        return messages;
    }

    default void afterProcessing(Message<T> message, Throwable t) {
    }

    default void afterProcessing(Collection<Message<T>> messages, Throwable t) {
    }

}
public interface AsyncMessageInterceptor<T> {

    default CompletableFuture<Message<T>> intercept(Message<T> message) {
        return CompletableFuture.completedFuture(message);
    }

    default CompletableFuture<Collection<Message<T>>> intercept(Collection<Message<T>> messages) {
        return CompletableFuture.completedFuture(messages);
    }

    default CompletableFuture<Void> afterProcessing(Message<T> message, Throwable t) {
        return CompletableFuture.completedFuture(null);
    }

    default CompletableFuture<Void> afterProcessing(Collection<Message<T>> messages, Throwable t) {
        return CompletableFuture.completedFuture(null);
    }

}

When using the auto-configured factory, simply declare a @Bean and the interceptor will be set:

@Bean
public MessageInterceptor<Object> messageInterceptor() {
    return new MessageInterceptor<Object>() {
            @Override
            public Message<Object> intercept(Message<Object> message) {
                return MessageBuilder
                    .fromMessage(message)
                    .setHeader("newHeader", "newValue")
                    .build();
            }
        };
}

Alternatively, implementations can be set in the MessageListenerContainerFactory or directly in the MessageListenerContainer:

@Bean
public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory() {
    return SqsMessageListenerContainerFactory
        .builder()
        .sqsAsyncClientSupplier(BaseSqsIntegrationTest::createAsyncClient)
        .messageInterceptor(new MessageInterceptor<Object>() {
            @Override
            public Message<Object> intercept(Message<Object> message) {
                return MessageBuilder
                    .fromMessage(message)
                    .setHeader("newHeader", "newValue")
                    .build();
            }
        })
        .build();
}
Multiple interceptors can be added to the same factory or container.

The intercept methods are executed before a message is processed, and a different message can be returned.

In case a different message is returned, it’s important to add the SqsHeaders.SQS_RECEIPT_HANDLE_HEADER with the value of the original handler so the original message is acknowledged after processing. Also, a SqsHeaders.SQS_MESSAGE_ID_HEADER must always be present.
The intercept methods must not return null.

The afterProcessing methods are executed after message is processed and the ErrorHandler is invoked, but before the message is acknowledged.

8.5. Error Handling

By default, messages that have an error thrown by the listener will not be acknowledged, and the message can be polled again after visibility timeout expires.

Alternatively, the framework offers the ErrorHandler and AsyncErrorHandler interfaces, which are invoked after a listener execution fails.

public interface ErrorHandler<T> {

    default void handle(Message<T> message, Throwable t) {
    }

    default void handle(Collection<Message<T>> messages, Throwable t) {
    }

}
public interface AsyncErrorHandler<T> {

    default CompletableFuture<Void> handle(Message<T> message, Throwable t) {
        return CompletableFutures.failedFuture(t);
    }

    default CompletableFuture<Void> handle(Collection<Message<T>> messages, Throwable t) {
        return CompletableFutures.failedFuture(t);
    }

}

When using the auto-configured factory, simply declare a @Bean and the error handler will be set:

@Bean
public ErrorHandler<Object> errorHandler() {
    return new ErrorHandler<Object>() {
        @Override
        public void handle(Message<Object> message, Throwable t) {
            // error handling logic
            // throw if the message should not be acknowledged
        }
    }}

Alternatively, implementations can be set in the MessageListenerContainerFactory or directly in the MessageListenerContainer:

@Bean
public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory() {
    return SqsMessageListenerContainerFactory
        .builder()
        .sqsAsyncClientSupplier(BaseSqsIntegrationTest::createAsyncClient)
        .errorHandler(new ErrorHandler<Object>() {
            @Override
            public void handle(Message<Object> message, Throwable t) {
                // error handling logic
            }
        })
        .build();
}

If the error handler execution succeeds, i.e. no error is thrown from the error handler, the message is considered to be recovered and is acknowledged according to the acknowledgement configuration.

If the message should not be acknowledged and the ON_SUCCESS acknowledgement mode is set, it’s important to propagate the error. For simply executing an action in case of errors, an interceptor should be used instead, checking the presence of the throwable argument for detecting a failed execution.

8.6. Message Conversion and Payload Deserialization

Payloads are automatically deserialized from JSON for @SqsListener annotated methods using a MappingJackson2MessageConverter.

When using Spring Boot’s auto-configuration, if there’s a single ObjectMapper in Spring Context, such object mapper will be used for converting messages. This includes the one provided by Spring Boot’s auto-configuration itself. For configuring a different ObjectMapper, see Global Configuration for @SqsListeners.

For manually created MessageListeners, MessageInterceptor and ErrorHandler components, or more fine-grained conversion such as using interfaces or inheritance in listener methods, type mapping is required for payload deserialization.

By default, the framework looks for a MessageHeader named Sqs_MA_JavaType containing the fully qualified class name (FQCN) for which the payload should be deserialized to. If such header is found, the message is automatically deserialized to the provided class.

Further configuration can be achieved by providing a configured MessagingMessageConverter instance in the ContainerOptions.

If type mapping is setup or type information is added to the headers, payloads are deserialized right after the message is polled. Otherwise, for @SqsListener annotated methods, payloads are deserialized right before the message is sent to the listener. For providing custom MessageConverter instances to be used by @SqsListener methods, see Global Configuration for @SqsListeners

8.6.1. Configuring a MessagingMessageConverter

The framework provides the SqsMessagingMessageConverter, which implements the MessagingMessageConverter interface.

public interface MessagingMessageConverter<S> {

    Message<?> toMessagingMessage(S source);

    S fromMessagingMessage(Message<?> message);

}

The default header-based type mapping can be configured to use a different header name by using the setPayloadTypeHeader method.

More complex mapping can be achieved by using the setPayloadTypeMapper method, which overrides the default header-based mapping. This method receives a Function<Message<?>, Class<?>> payloadTypeMapper that will be applied to incoming messages.

The default MappingJackson2MessageConverter can be replaced by using the setPayloadMessageConverter method.

The framework also provides the SqsHeaderMapper, which implements the HeaderMapper interface and is invoked by the SqsMessagingMessageConverter. To provide a different HeaderMapper implementation, use the setHeaderMapper method.

An example of such configuration follows:

// Create converter instance
SqsMessagingMessageConverter messageConverter = new SqsMessagingMessageConverter();

// Configure Type Header
messageConverter.setPayloadTypeHeader("myTypeHeader");

// Configure Header Mapper
SqsHeaderMapper headerMapper = new SqsHeaderMapper();
headerMapper.setAdditionalHeadersFunction(((sqsMessage, accessor) -> {
    accessor.setHeader("myCustomHeader", "myValue");
    return accessor.toMessageHeaders();
}));
messageConverter.setHeaderMapper(headerMapper);

// Configure Payload Converter
MappingJackson2MessageConverter payloadConverter = new MappingJackson2MessageConverter();
payloadConverter.setPrettyPrint(true);
messageConverter.setPayloadMessageConverter(payloadConverter);

// Set MessageConverter to the factory or container
factory.configure(options -> options.messageConverter(messageConverter));

8.6.2. Interfaces and Subclasses in Listener Methods

Interfaces and subclasses can be used in @SqsListener annotated methods by configuring a type mapper:

messageConverter.setPayloadTypeMapper(message -> {
    String eventTypeHeader = message.getHeaders().get("myEventTypeHeader", String.class);
    return "eventTypeA".equals(eventTypeHeader)
        ? MyTypeA.class
        : MyTypeB.class;
});

And then, in the listener method:

@SpringBootApplication
public class SqsApplication {

    public static void main(String[] args) {
        SpringApplication.run(SqsApplication.class, args);
    }

    // Retrieve the converted payload
    @SqsListener("myQueue")
    public void listen(MyInterface message) {
        System.out.println(message);
    }

    // Or retrieve a Message with the converted payload
    @SqsListener("myOtherQueue")
    public void listen(Message<MyInterface> message) {
        System.out.println(message);
    }
}

8.7. Acknowledging Messages

In SQS acknowledging a message is the same as deleting the message from the queue. A number of Acknowledgement strategies are available and can be configured via ContainerOptions. Optionally, a callback action can be added to be executed after either a successful or failed acknowledgement.

Here’s an example of a possible configuration:

@Bean
SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory(SqsAsyncClient sqsAsyncClient) {
    return SqsMessageListenerContainerFactory
            .builder()
            .configure(options -> options
                    .acknowledgementMode(AcknowledgementMode.ALWAYS)
                    .acknowledgementInterval(Duration.ofSeconds(3))
                    .acknowledgementThreshold(5)
                    .acknowledgementOrdering(AcknowledgementOrdering.ORDERED)
            )
            .sqsAsyncClient(sqsAsyncClient)
            .build();
}

Each option is explained in the following sections.

All options are available for both single message and batch message listeners.

8.7.1. Acknowledgement Mode

  • ON_SUCCESS - Acknowledges a message or batch of messages after successful processing.

  • ALWAYS - Acknowledges a message or batch of messages after processing returns success or error.

  • MANUAL - The framework won’t acknowledge messages automatically and Acknowledgement objects can be received in the listener method.

8.7.2. Acknowledgement Batching

The acknowledgementInterval and acknowledgementThreshold options enable acknowledgement batching. Acknowledgements will be executed after either the amount of time specified in the interval or the number of messages to acknowledge reaches the threshold.

Setting acknowledgementInterval to Duration.ZERO will disable the periodic acknowledgement, which will be executed only when the number of messages to acknowledge reaches the specified acknowledgementThreshold.

Setting acknowledgementThreshold to 0 will disable acknowledging per number of messages, and messages will be acknowledged only on the specified acknowldgementInterval

When using acknowledgement batching messages stay inflight for SQS purposes until their respective batch is acknowledged. MessageVisibility should be taken into consideration when configuring this strategy.
Immediate Acknowledging

Setting both acknowledgementInterval and acknowledgementThreshold to Duration.ZERO and 0 respectively enables Immediate Acknowledging.

With this configuration, messages are acknowledged sequentially after being processed, and the message is only considered processed after the message is successfully acknowledged.

If an immediate acknowledging triggers an error, message processing is considered failed and will be retried after the specified visibilityTimeout.

8.7.3. Manual Acknowledgement

Acknowledgements can be handled manually by setting AcknowledgementMode.MANUAL in the ContainerOptions.

Manual acknowledgement can be used in conjunction with acknowledgement batching - the message will be queued for acknowledgement but won’t be executed until one of the above criteria is met.

It can also be used in conjunction with immediate acknowledgement.

The following arguments can be used in listener methods to manually acknowledge:

Acknowledgement

The Acknowledgement interface can be used to acknowledge messages in ListenerMode.SINGLE_MESSAGE.

public interface Acknowledgement {

    /**
     * Acknowledge the message.
     */
    void acknowledge();

    /**
     * Asynchronously acknowledge the message.
     */
    CompletableFuture<Void> acknowledgeAsync();

}
BatchAcknowledgement

The BatchAcknowledgement interface can be used to acknowledge messages in ListenerMode.BATCH.

The acknowledge(Collection<Message<T>) method enables acknowledging partial batches.

public interface BatchAcknowledgement<T> {

    /**
     * Acknowledge all messages from the batch.
     */
    void acknowledge();

    /**
     * Asynchronously acknowledge all messages from the batch.
     */
    CompletableFuture<Void> acknowledgeAsync();

    /**
     * Acknowledge the provided messages.
     */
    void acknowledge(Collection<Message<T>> messagesToAcknowledge);

    /**
     * Asynchronously acknowledge the provided messages.
     */
    CompletableFuture<Void> acknowledgeAsync(Collection<Message<T>> messagesToAcknowledge);

}

8.7.4. Acknowledgement Ordering

  • PARALLEL - Acknowledges the messages as soon as one of the above criteria is met - many acknowledgement calls can be made in parallel.

  • ORDERED - One batch of acknowledgements will be executed after the previous one is completed, ensuring FIFO ordering for batching acknowledgements.

  • ORDERED_BY_GROUP - One batch of acknowledgements will be executed after the previous one for the same group is completed, ensuring FIFO ordering of acknowledgements with parallelism between message groups. Only available for FIFO queues.

8.7.5. Acknowledgement Defaults

The defaults for acknowledging differ for Standard and FIFO SQS queues.

Standard SQS
  • Acknowledgement Interval: One second

  • Acknowledgement Threshold: Ten messages

  • Acknowledgement Ordering: PARALLEL

FIFO SQS
  • Acknowledgement Interval: Zero (Immediate)

  • Acknowledgement Threshold: Zero (Immediate)

  • Acknowledgement Ordering: PARALLEL if immediate acknowledgement, ORDERED if batching is enabled (one or both above defaults are overridden).

PARALLEL is the default for FIFO because ordering is guaranteed for processing. This assures no messages from a given MessageGroup will be polled until the previous batch is acknowledged. Implementations of this interface will be executed after an acknowledgement execution completes with either success or failure.

8.7.6. Acknowledgement Result Callback

The framework offers the AcknowledgementResultCallback and AsyncAcknowledgementCallback interfaces that can be added to a SqsMessageListenerContainer or SqsMessageListenerContainerFactory.

public interface AcknowledgementResultCallback<T> {

    default void onSuccess(Collection<Message<T>> messages) {
    }

    default void onFailure(Collection<Message<T>> messages, Throwable t) {
    }

}
public interface AsyncAcknowledgementResultCallback<T> {

    default CompletableFuture<Void> onSuccess(Collection<Message<T>> messages) {
        return CompletableFuture.completedFuture(null);
    }

    default CompletableFuture<Void> onFailure(Collection<Message<T>> messages, Throwable t) {
        return CompletableFuture.completedFuture(null);
    }

}
@Bean
public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory(SqsAsyncClient sqsAsyncClient) {
    return SqsMessageListenerContainerFactory
        .builder()
        .sqsAsyncClient(sqsAsyncClient)
        .acknowledgementResultCallback(getAcknowledgementResultCallback())
        .build();
}
When immediate acknowledgement is set, as is the default for FIFO queues, the callback will be executed before the next message in the batch is processed, and next message processing will wait for the callback completion. This can be useful for taking action such as retrying to delete the messages, or stopping the container to prevent duplicate processing in case an acknowledgement fails in a FIFO queue. For batch parallel processing, as is the default for Standard queues the callback execution happens asynchronously.

8.8. Global Configuration for @SqsListeners

A set of configurations can be set for all containers from @SqsListener by providing SqsListenerConfigurer beans.

@FunctionalInterface
public interface SqsListenerConfigurer {

    void configure(EndpointRegistrar registrar);

}

The following attributes can be configured in the registrar:

  • setMessageHandlerMethodFactory - provide a different factory to be used to create the invocableHandlerMethod instances that wrap the listener methods.

  • setListenerContainerRegistry - provide a different MessageListenerContainerRegistry implementation to be used to register the MessageListenerContainers

  • setMessageListenerContainerRegistryBeanName - provide a different bean name to be used to retrieve the MessageListenerContainerRegistry

  • setObjectMapper - set the ObjectMapper instance that will be used to deserialize payloads in listener methods. See Message Conversion and Payload Deserialization for more information on where this is used.

  • manageMessageConverters - gives access to the list of message converters that will be used to convert messages. By default, StringMessageConverter, SimpleMessageConverter and MappingJackson2MessageConverter are used.

  • manageArgumentResolvers - gives access to the list of argument resolvers that will be used to resolve the listener method arguments. The order of resolvers is important - PayloadMethodArgumentResolver should generally be last since it’s used as default.

A simple example would be:

@Bean
SqsListenerConfigurer configurer(ObjectMapper objectMapper) {
    return registrar -> registrar.setObjectMapper(objectMapper);
}
Any number of SqsListenerConfigurer beans can be registered in the context. All instances will be looked up at application startup and iterated through.

8.9. Message Processing Throughput

The following options are available for tuning the application’s throughput. When a configuration is available both in the ContainerOptions and @SqsListener annotation, the annotation value takes precedence, if any.

8.9.1. ContainerOptions and @SqsListener properties

maxInflightMessagesPerQueue

Can be set in either the ContainerOptions or the @SqsListener annotation. Represents the maximum number of messages being processed by the container at a given time. Defaults to 10.

This value is enforced per queue, meaning the number of inflight messages in a container can be up to (number of queues in container * maxInflightMessagesPerQueue).

When using acknowledgement batching, a message is considered as no longer inflight when it’s delivered to the acknowledgement queue. In this case, the actual number of inflight messages on AWS SQS console can be higher than the configured value. When using immediate acknowledgement, a message is considered as no longer inflight after it’s been acknowledged or throws an error.
maxMessagesPerPoll

Set in ContainerOptions or the @SqsListener annotation. Represents the maximum number of messages returned by a single poll to a SQS queue, to a maximum of 10. This value has to be less than or equal to maxInflightMessagesPerQueue. Defaults to 10.

Note that even if the queue has more messages, a poll can return less messages than specified. See the AWS documentation for more information.

pollTimeout

Can be set in either the ContainerOptions or the @SqsListener annotation. Represents the maximum duration of a poll. Higher values represent long polls and increase the probability of receiving full batches of messages. Defaults to 10 seconds.

permitAcquireTimeout

Set in ContainerOptions. Represents the maximum amount of time the container will wait for maxMessagesPerPoll permits to be available before trying to acquire a partial batch. This wait is applied per queue and one queue has no interference in another in this regard. Defaults to 10 seconds.

8.9.2. Default Polling Behavior

By default, the framework starts all queues in low throughput mode, where it will perform one poll for messages at a time. When a poll returns at least one message, the queue enters a high throughput mode where it will try to fulfill maxInflightMessagesPerQueue messages by making (maxInflightMessagesPerQueue / maxMessagesPerPoll) parallel polls to the queue. Any poll that returns no messages will trigger a low throughput mode again, until at least one message is returned, triggering high throughput mode again, and so forth.

After permitAcquireTimeout, if maxMessagesPerPoll permits are not available, it’ll poll for the difference, i.e. as many messages as have been processed so far, if any.

E.g. Let’s consider a scenario where the container is configured for: maxInflightMessagesPerQueue = 20, maxMessagesPerPoll = 10, permitAcquireTimeout = 5 seconds, and a pollTimeout = 10 seconds.

The container starts in low throughput mode, meaning it’ll attempt a single poll for 10 messages. If any messages are returned, it’ll switch to high throughput mode, and will make up to 2 simultaneous polls for 10 messages each. If all 20 messages are retrieved, it’ll not attempt any more polls until messages are processed. If after the 5 seconds for permitAcquireTimeout 6 messages have been processed, the framework will poll for the 6 messages. If the queue is depleted and a poll returns no messages, it’ll enter low throughput mode again and perform only one poll at a time.

8.9.3. Configuring BackPressureMode

The following BackPressureMode values can be set in ContainerOptions to configure polling behavior:

  • AUTO - The default mode, as described in the previous section.

  • ALWAYS_POLL_MAX_MESSAGES - Disables partial batch polling, i.e. if the container is configured for 10 messages per poll, it’ll wait for 10 messages to be processed before attempting to poll for the next 10 messages. Useful for optimizing for fewer polls at the expense of throughput.

  • FIXED_HIGH_THROUGHPUT - Disables low throughput mode, while still attempting partial batch polling as described in the previous section. Useful for really high throughput scenarios where the risk of making parallel polls to an idle queue is preferable to an eventual switch to low throughput mode .

The AUTO setting should be balanced for most use cases, including high throughput ones.

8.10. Blocking and Non-Blocking (Async) Components

The SQS integration leverages the CompletableFuture-based async capabilities of AWS SDK 2.0 to deliver a fully non-blocking infrastructure. All processing involved in polling for messages, changing message visibilities and acknowledging messages is done in an async, non-blocking fashion. This allows a higher overall throughput for the application.

When a MessageListener, MessageInterceptor, and ErrorHandler implementation is set to a MesssageListenerContainer or MesssageListenerContainerFactory these are adapted by the framework. This way, blocking and non-blocking components can be used in conjunction with each other.

Listener methods annotated with @SqsListener can either return a simple value, e.g. void, or a CompletableFuture<Void>. The listener method will then be wrapped in either a MessagingMessageListenerAdapter or a AsyncMessagingMessageListenerAdapter respectively.

In order to achieve higher throughput, it’s encouraged that, at least for simpler logic in message listeners, interceptors and error handlers, the async variants are used.

8.10.1. Threading and Blocking Components

Message processing always starts in a framework thread from the default or provided TaskExecutor.

If an async component is invoked and the execution returns to the framework on a different thread, such thread will be used until a blocking component is found, when the execution switches back to a TaskExecutor thread to avoid blocking i.e. SqsAsyncClient or HttpClient threads.

If by the time the execution reaches a blocking component it’s already on a framework thread, it remains in the same thread to avoid excessive thread allocation and hopping.

When using async methods it’s critical not to block the incoming thread, which might be very detrimental to overall performance. If thread-blocking logic has to be used, the blocking logic should be executed on another thread, e.g. using CompletableFuture.supplyAsync(() → myLogic(), myExecutor). Otherwise, a sync interface should be used.

8.10.2. Providing a TaskExecutor

The default TaskExecutor is a ThreadPoolTaskExecutor, and a different componentTaskExecutor supplier can be set in the ContainerOptions.

When providing a custom executor, it’s important that it’s configured to support all threads that will be created, which should be (maxInflightMessagesPerQueue * total number of queues). Also, to avoid unnecessary thread hopping between blocking components, a MessageExecutionThreadFactory should be set to the executor.

If setting the ThreadFactory is not possible, it’s advisable to allow for extra threads in the thread pool to account for the time between a new thread is requested and the previous thread is released.

8.11. IAM Permissions

Following IAM permissions are required by Spring Cloud AWS SQS:

Send message to Queue

sqs:SendMessage

Receive message from queue

sqs:ReceiveMessage

Delete message from queue

sqs:DeleteMessage

To use sqsListener with SimpleMessageListenerContainerFactory you will need to add as well

sqs:GetQueueAttributes

To use SqsListener with Sqs name instead of ARN you will need

sqs:GetQueueUrl

Sample IAM policy granting access to SQS:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "sqs:DeleteMessage",
                "sqs:ReceiveMessage",
                "sqs:SendMessage",
                "sqs:GetQueueAttributes",
                "sqs:GetQueueUrl"
            ],
            "Resource": "yourARN"
        }

9. Secrets Manager Integration

Secrets Manager helps to protect secrets needed to access your applications, services, and IT resources. The service enables you to easily rotate, manage, and retrieve database credentials, API keys, and other secrets throughout their lifecycle.

Spring Cloud AWS adds support for loading configuration properties from Secrets Manager through Spring Boot config import feature.

Maven coordinates, using Spring Cloud AWS BOM:

<dependency>
    <groupId>io.awspring.cloud</groupId>
    <artifactId>spring-cloud-aws-starter-secrets-manager</artifactId>
</dependency>

9.1. Loading External Configuration

To fetch secrets from Secrets Manager and add them to Spring’s environment properties, add spring.config.import property to application.properties:

For example, assuming that the secret name in Secrets Manager is /secrets/database-secrets:

spring.config.import=aws-secretsmanager:/secrets/database-secrets

If a secret with given name does not exist in Secrets Manager, application will fail to start. If secret value is not required for the application, and it should continue to startup even when secret is missing, add optional before prefix:

spring.config.import=optional:aws-secretsmanager:/secrets/database-secrets

To load multiple secrets, separate their names with ;:

spring.config.import=aws-secretsmanager:/secrets/database-secrets;/secrets/webclient-secrets

If some secrets are required, and other ones are optional, list them as separate entries in spring.config.import property:

spring.config.import[0]=optional:aws-secretsmanager=/secrets/required-secret
spring.config.import[1]=aws-secretsmanager=/secrets/optional-secret

Fetched secrets can be referenced with @Value, bound to @ConfigurationProperties classes, or referenced in application.properties file.

9.1.1. Using Key-Value (JSON) Secrets

Secrets resolved with spring.config.import can be also referenced in application.properties. When a content of SecretString in a JSON, all top level JSON keys are added as properties to Spring Environment.

For example, with a file mycreds.json containing following JSON structure:

{
      "username": "saanvi",
      "password": "EXAMPLE-PASSWORD"
}

Secret is created with a command:

$ aws secretsmanager create-secret --name /secrets/database-secrets --secret-string file://mycreds.json

spring.config.import entry is added to application.properties:

spring.config.import=aws-secretsmanager:/secrets/database-secrets

Secret values can be referenced by JSON key names:

@Value("${username}"
private String username;

@Value("${password}"
private String password;

9.1.2. Using plain text secrets

If a SecretString is a plain text, use secret name to retrieve its value. For example, we will JDBC saved as plain text secret type with name /secrets/my-certificate:

$ aws secretsmanager create-secret --name /secrets/prod/jdbc-url --secret-string jdbc:url

spring.config.import entry is added to application.properties:

spring.config.import=aws-secretsmanager:/secrets/prod/jdbc-url

Secret value can be retrieved by referencing secret name:

spring.datasource.url=${jdbc-url}

9.2. Using SecretsManagerClient

The starter automatically configures and registers a SecretsManagerClient bean in the Spring application context. The SecretsManagerClient bean can be used to create or retrieve secrets imperatively.

import org.springframework.stereotype.Component;
import software.amazon.awssdk.services.secretsmanager.SecretsManagerClient;
import software.amazon.awssdk.services.secretsmanager.model.CreateSecretRequest;
...
@Autowired
private SecretsManagerClient secretsManagerClient;
...
secretsManagerClient.createSecret(CreateSecretRequest.builder().name(name).secretString(secret).build());

9.3. Customizing SecretsManagerClient

To use custom SecretsManagerClient in spring.config.import, provide an implementation of BootstrapRegistryInitializer. For example:

package com.app;

import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.secretsmanager.SecretsManagerClient;

import org.springframework.boot.BootstrapRegistry;
import org.springframework.boot.BootstrapRegistryInitializer;

public class SecretsManagerBootstrapConfiguration implements BootstrapRegistryInitializer {

    @Override
    public void initialize(BootstrapRegistry registry) {
        registry.register(SecretsManagerClient.class, context -> {
            AwsCredentialsProvider awsCredentialsProvider = StaticCredentialsProvider.create(AwsBasicCredentials.create("yourAccessKey", "yourSecretKey"));
            return SecretsManagerClient.builder().credentialsProvider(awsCredentialsProvider).region(Region.EU_WEST_2).build();
        });
    }
}

Note that this class must be listed under org.springframework.boot.BootstrapRegistryInitializer key in META-INF/spring.factories:

org.springframework.boot.BootstrapRegistryInitializer=com.app.SecretsManagerBootstrapConfiguration

If you want to use autoconfigured SecretsManagerClient but change underlying SDKClient or ClientOverrideConfiguration you will need to register bean of type AwsClientConfigurerSecretsManager: Autoconfiguration will configure SecretsManagerClient Bean with provided values after that, for example:

package com.app;

import io.awspring.cloud.autoconfigure.config.secretsmanager.AwsSecretsManagerClientCustomizer;
import java.time.Duration;
import org.springframework.boot.BootstrapRegistry;
import org.springframework.boot.BootstrapRegistryInitializer;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.services.secretsmanager.SecretsManagerClientBuilder;

class SecretsManagerBootstrapConfiguration implements BootstrapRegistryInitializer {

    @Override
    public void initialize(BootstrapRegistry registry) {
        registry.register(AwsSecretsManagerClientCustomizer.class,
            context -> new AwsSecretsManagerClientCustomizer() {

                @Override
                public ClientOverrideConfiguration overrideConfiguration() {
                    return ClientOverrideConfiguration.builder().apiCallTimeout(Duration.ofMillis(500))
                            .build();
                }

                @Override
                public SdkHttpClient httpClient() {
                    return ApacheHttpClient.builder().connectionTimeout(Duration.ofMillis(1000)).build();
                }
            });
    }
}

9.4. Configuration

The Spring Boot Starter for Secrets Manager provides the following configuration options:

Name

Description

Required

Default value

spring.cloud.aws.secretsmanager.enabled

Enables the Secrets Manager integration.

No

true

spring.cloud.aws.secretsmanager.endpoint

Configures endpoint used by SecretsManagerClient.

No

null

spring.cloud.aws.secretsmanager.region

Configures region used by SecretsManagerClient.

No

null

9.5. IAM Permissions

Following IAM permissions are required by Spring Cloud AWS:

Get secret value:

secretsmanager:GetSecretValue

Sample IAM policy granting access to Secrets Manager:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "secretsmanager:GetSecretValue",
            "Resource": "yourArn"
        }
    ]
}

10. Parameter Store Integration

Spring Cloud AWS adds support for loading configuration properties from Parameter Store through Spring Boot config import feature.

Maven coordinates, using Spring Cloud AWS BOM:

<dependency>
    <groupId>io.awspring.cloud</groupId>
    <artifactId>spring-cloud-starter-aws-parameter-store</artifactId>
</dependency>

10.1. Loading External Configuration

To fetch parameters from Parameter Store and add them to Spring’s environment properties, add spring.config.import property to application.properties:

For example, assuming that the parameters in Parameter Store are stored under path /config/spring:

Parameter Name Parameter Value

/config/spring/message

Welcome

/config/spring/httpUrl

external-service:3030/

Once spring.config.import statement is added:

spring.config.import=aws-parameterstore:/config/spring

Two parameters are added to environment: message and httpUrl.

If a given path in Parameter Store does not exist, application will fail to start. If parameters retrieved from Parameter Store are not required for the application, and it should continue to startup even when the path is missing, add optional before prefix:

spring.config.import=optional:aws-parameterstore:/config/spring

To load parameters from multiple paths, separate their names with ;:

spring.config.import=aws-parameterstore:/config/spring;/config/app

If some parameters are required, and other ones are optional, list them as separate entries in spring.config.import property:

spring.config.import[0]=optional:aws-parameterstore=/config/spring
spring.config.import[1]=aws-parameterstore=/config/optional-params/

10.2. Using SsmClient

The starter automatically configures and registers a SsmClient bean in the Spring application context. The SsmClient bean can be used to create or retrieve parameters from Parameter Store.

import org.springframework.stereotype.Component;
import software.amazon.awssdk.services.ssm.SsmClient;
...
@Autowired
private SsmClient ssmClient;
...
ssmClient.getParametersByPath(request -> request.path("/config/spring/")).parameters();

10.3. Customizing SsmClient

To use custom SsmClient in spring.config.import, provide an implementation of BootstrapRegistryInitializer. For example:

package com.app;

import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.ssm.SsmClient;

import org.springframework.boot.BootstrapRegistry;
import org.springframework.boot.BootstrapRegistryInitializer;

class ParameterStoreBootstrapConfiguration implements BootstrapRegistryInitializer {

    @Override
    public void initialize(BootstrapRegistry registry) {
        registry.register(SsmClient.class, context -> {
            AwsCredentialsProvider awsCredentialsProvider = StaticCredentialsProvider.create(AwsBasicCredentials.create("yourAccessKey", "yourSecretKey"));
            return SsmClient.builder().credentialsProvider(awsCredentialsProvider).region(Region.EU_WEST_2).build();
        });
    }
}

Note that this class must be listed under org.springframework.boot.BootstrapRegistryInitializer key in META-INF/spring.factories:

org.springframework.boot.BootstrapRegistryInitializer=com.app.ParameterStoreBootstrapConfiguration

If you want to use autoconfigured SsmClient but change underlying SDKClient or ClientOverrideConfiguration you will need to register bean of type AwsClientConfigurerParameterStore: Autoconfiguration will configure SsmClient Bean with provided values after that, for example:

package com.app;

import io.awspring.cloud.autoconfigure.config.parameterstore.AwsParameterStoreClientCustomizer;
import java.time.Duration;
import org.springframework.boot.BootstrapRegistry;
import org.springframework.boot.BootstrapRegistryInitializer;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.services.ssm.SsmClientBuilder;

class ParameterStoreBootstrapConfiguration implements BootstrapRegistryInitializer {

    @Override
    public void initialize(BootstrapRegistry registry) {
        registry.register(AwsParameterStoreClientCustomizer.class,
            context -> new AwsParameterStoreClientCustomizer() {

                @Override
                public ClientOverrideConfiguration overrideConfiguration() {
                    return ClientOverrideConfiguration.builder().apiCallTimeout(Duration.ofMillis(500))
                            .build();
                }

                @Override
                public SdkHttpClient httpClient() {
                    return ApacheHttpClient.builder().connectionTimeout(Duration.ofMillis(1000)).build();
                }
            });
    }
}

10.4. Configuration

The Spring Boot Starter for Parameter Store provides the following configuration options:

Name Description Required Default value

spring.cloud.aws.parameterstore.enabled

Enables the Parameter Store integration.

No

true

spring.cloud.aws.parameterstore.endpoint

Configures endpoint used by SsmClient.

No

null

spring.cloud.aws.parameterstore.region

Configures region used by SsmClient.

No

null

10.5. IAM Permissions

Following IAM permissions are required by Spring Cloud AWS:

Get parameter from specific path

ssm:GetParametersByPath

Sample IAM policy granting access to Parameter Store:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "ssm:GetParametersByPath",
            "Resource": "yourArn"
        }
    ]
}

11. CloudWatch Metrics

Spring Cloud AWS provides Spring Boot auto-configuration for Micrometer CloudWatch integration. To send metrics to CloudWatch add a dependency to micrometer-registry-cloudwatch module:

<dependency>
    <groupId>io.micrometer</groupId>
    <artifactId>micrometer-registry-cloudwatch2</artifactId>
</dependency>

Additionally, CloudWatch integration requires a value provided for management.metrics.export.cloudwatch.namespace configuration property.

Following configuration properties are available to configure CloudWatch integration:

property default description

management.metrics.export.cloudwatch.namespace

The namespace which will be used when sending metrics to CloudWatch. This property is needed and must not be null.

management.metrics.export.cloudwatch.step

1m

The interval at which metrics are sent to CloudWatch. The default is 1 minute.

spring.cloud.aws.cloudwatch.enabled

true

If CloudWatch integration should be enabled. This property should be likely set to false for a local development profile.

spring.cloud.aws.cloudwatch.endpoint

Overrides the default endpoint.

spring.cloud.aws.cloudwatch.region

The specific region for CloudWatch integration.

12. Migration from 2.x to 3.x

Properties that have changed from 2.x to 3.x are listed below:

Version 2.x Version 3.x cloud.aws.credentials.access-key

spring.cloud.aws.credentials.access-key

cloud.aws.credentials.secret-key

spring.cloud.aws.credentials.secret-key

cloud.aws.region.static

spring.cloud.aws.region.static

aws.paramstore.enabled

spring.cloud.aws.parameterstore.enabled

aws.paramstore.region

spring.cloud.aws.parameterstore.region

aws.secretsmanager.enabled

spring.cloud.aws.secretsmanager.enabled

aws.secretsmanager.region

Properties that have been removed in 3.x are listed below:

  • Cognito Properties

    • spring.cloud.aws.security.cognito.app-client-id

    • spring.cloud.aws.security.cognito.user-pool-id

    • spring.cloud.aws.security.algorithm

    • spring.cloud.aws.security.region

    • spring.cloud.aws.security.enabled

The same behaviour can be enabled using Spring Boot integration with Spring Security OAuth2 support:

spring.security.oauth2.resourceserver.jwt.issuer-uri=http://127.0.0.1:4566/us-east-1_f865f8979c4d4361b6af703db533dbb4
spring.security.oauth2.resourceserver.jwt.jwk-set-uri=http://127.0.0.1:4566/us-east-1_f865f8979c4d4361b6af703db533dbb4/.well-known/jwks.json

The example above set the URIs to LocalStack’s Cognito Server.

13. Configuration properties

To see the list of all Spring Cloud AWS related configuration properties please check the Appendix page.