Class KclMessageDrivenChannelAdapter

java.lang.Object
org.springframework.integration.context.IntegrationObjectSupport
org.springframework.integration.endpoint.AbstractEndpoint
org.springframework.integration.endpoint.MessageProducerSupport
io.awspring.cloud.kinesis.integration.KclMessageDrivenChannelAdapter
All Implemented Interfaces:
Aware, BeanFactoryAware, BeanNameAware, DisposableBean, InitializingBean, SmartInitializingSingleton, ApplicationContextAware, ApplicationEventPublisherAware, Lifecycle, Phased, SmartLifecycle, org.springframework.integration.context.ComponentSourceAware, org.springframework.integration.context.ExpressionCapable, org.springframework.integration.core.MessageProducer, org.springframework.integration.IntegrationPattern, org.springframework.integration.support.context.NamedComponent, org.springframework.integration.support.management.IntegrationInboundManagement, org.springframework.integration.support.management.IntegrationManagement, org.springframework.integration.support.management.ManageableLifecycle, org.springframework.integration.support.management.ManageableSmartLifecycle, org.springframework.integration.support.management.TrackableComponent

@ManagedResource @IntegrationManagedResource public class KclMessageDrivenChannelAdapter extends org.springframework.integration.endpoint.MessageProducerSupport implements ApplicationEventPublisherAware
The MessageProducerSupport implementation for receiving data from Amazon Kinesis stream(s) using AWS KCL.
Since:
4.0
Author:
Hervé Fortin, Artem Bilan, Dirk Bonhomme, Siddharth Jain, Minkyu Moon
  • Nested Class Summary

    Nested classes/interfaces inherited from interface org.springframework.integration.support.management.IntegrationManagement

    org.springframework.integration.support.management.IntegrationManagement.ManagementOverrides
  • Field Summary

    Fields inherited from class org.springframework.integration.endpoint.AbstractEndpoint

    lifecycleLock

    Fields inherited from class org.springframework.integration.context.IntegrationObjectSupport

    EXPRESSION_PARSER, logger

    Fields inherited from interface org.springframework.integration.support.management.IntegrationManagement

    METER_PREFIX, RECEIVE_COUNTER_NAME, SEND_TIMER_NAME

    Fields inherited from interface org.springframework.context.SmartLifecycle

    DEFAULT_PHASE
  • Constructor Summary

    Constructors
    Constructor
    Description
     
    KclMessageDrivenChannelAdapter(software.amazon.awssdk.regions.Region region, String... streams)
     
    KclMessageDrivenChannelAdapter(software.amazon.awssdk.services.kinesis.KinesisAsyncClient kinesisClient, software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient cloudWatchClient, software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient dynamoDBClient, String... streams)
     
  • Method Summary

    Modifier and Type
    Method
    Description
    void
     
    protected void
     
    protected void
    Takes no action by default.
     
     
    protected void
     
    void
     
    void
    setBindSourceRecord(boolean bindSourceRecord)
    Set to true to bind the source consumer record in the header named IntegrationMessageHeaderAccessor.SOURCE_DATA.
    void
     
    void
    setCheckpointsInterval(long checkpointsInterval)
    Sets the interval between 2 checkpoints.
    void
    setConsumerBackoff(int consumerBackoff)
     
    void
    setConsumerGroup(String consumerGroup)
     
    void
    setConverter(Converter<byte[],Object> converter)
    Specify a Converter to deserialize the byte[] from record's body.
    void
    setCoordinatorConfigCustomizer(Consumer<software.amazon.kinesis.coordinator.CoordinatorConfig> coordinatorConfigCustomizer)
    Set a Consumer to configure a CoordinatorConfig.
    void
    setDynamoDBStreams(software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient dynamoDBStreams)
     
    void
    setEmbeddedHeadersMapper(org.springframework.integration.mapping.InboundMessageMapper<byte[]> embeddedHeadersMapper)
    Specify an InboundMessageMapper to extract message headers embedded into the record data.
    void
    setEmptyRecordList(boolean emptyRecordList)
    Whether to return an empty record list from the consumer to the processor.
    void
     
    void
    setFanOut(boolean fanOut)
    Specify a retrieval strategy: fan-out (true; default) or polling (false).
    void
    setGlueSchemaRegistryDeserializer(com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializer glueSchemaRegistryDeserializer)
     
    void
    setGracefulShutdownTimeout(long gracefulShutdownTimeout)
    The timeout for Scheduler.startGracefulShutdown().
    void
    setLeaseManagementConfigCustomizer(Consumer<software.amazon.kinesis.leases.LeaseManagementConfig> leaseManagementConfigCustomizer)
    Set a Consumer to configure a LeaseManagementConfig.
    void
    setLeaseTableName(String leaseTableName)
    Set a name of the DynamoDB table name for leases.
    void
    setLifecycleConfigCustomizer(Consumer<software.amazon.kinesis.lifecycle.LifecycleConfig> lifecycleConfigCustomizer)
    Set a Consumer to configure a LifecycleConfig.
    void
     
    void
    setMetricsConfigCustomizer(Consumer<software.amazon.kinesis.metrics.MetricsConfig> metricsConfigCustomizer)
    Set a Consumer to configure a MetricsConfig.
    void
    setMetricsLevel(software.amazon.kinesis.metrics.MetricsLevel metricsLevel)
    Specify a metrics level to emit.
    void
    setPollingIdleTime(long pollingIdleTime)
    The idle timeout between polls when using PollingConfig.
    void
    setPollingMaxRecords(int pollingMaxRecords)
    The number of records to poll from Kinesis when using PollingConfig.
    void
    setStreamInitialSequence(software.amazon.kinesis.common.InitialPositionInStreamExtended streamInitialSequence)
     
    void
    setWorkerId(String workerId)
    Sets the worker identifier used to distinguish different workers/processes of a Kinesis application.
     

    Methods inherited from class org.springframework.integration.endpoint.MessageProducerSupport

    afterSingletonsInstantiated, buildErrorMessage, getComponentType, getErrorChannel, getErrorMessageStrategy, getIntegrationPatternType, getMessagingTemplate, getOutputChannel, getRequiredOutputChannel, isObserved, registerObservationRegistry, sendErrorMessageIfNecessary, sendMessage, setErrorChannel, setErrorChannelName, setErrorMessageStrategy, setObservationConvention, setOutputChannel, setOutputChannelName, setSendTimeout, setShouldTrack, subscribeToPublisher

    Methods inherited from class org.springframework.integration.endpoint.AbstractEndpoint

    doStop, getPhase, getRole, isActive, isAutoStartup, isRunning, setAutoStartup, setPhase, setRole, start, stop, stop

    Methods inherited from class org.springframework.integration.context.IntegrationObjectSupport

    afterPropertiesSet, extractTypeIfPossible, generateId, getApplicationContext, getApplicationContextId, getBeanDescription, getBeanFactory, getBeanName, getChannelResolver, getComponentDescription, getComponentName, getComponentSource, getConversionService, getExpression, getIntegrationProperties, getMessageBuilderFactory, getTaskScheduler, isInitialized, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentDescription, setComponentName, setComponentSource, setConversionService, setMessageBuilderFactory, setPrimaryExpression, setTaskScheduler

    Methods inherited from class java.lang.Object

    clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait

    Methods inherited from interface org.springframework.integration.support.management.IntegrationManagement

    getManagedName, getManagedType, getOverrides, getThisAs, isLoggingEnabled, registerMetricsCaptor, setLoggingEnabled, setManagedName, setManagedType

    Methods inherited from interface org.springframework.integration.support.context.NamedComponent

    getBeanName, getComponentName

    Methods inherited from interface org.springframework.context.SmartLifecycle

    isPauseable
  • Constructor Details

    • KclMessageDrivenChannelAdapter

      public KclMessageDrivenChannelAdapter(String... streams)
    • KclMessageDrivenChannelAdapter

      public KclMessageDrivenChannelAdapter(software.amazon.awssdk.regions.Region region, String... streams)
    • KclMessageDrivenChannelAdapter

      public KclMessageDrivenChannelAdapter(software.amazon.awssdk.services.kinesis.KinesisAsyncClient kinesisClient, software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient cloudWatchClient, software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient dynamoDBClient, String... streams)
  • Method Details

    • setApplicationEventPublisher

      public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher)
      Specified by:
      setApplicationEventPublisher in interface ApplicationEventPublisherAware
    • setExecutor

      public void setExecutor(TaskExecutor executor)
    • setConsumerGroup

      public void setConsumerGroup(String consumerGroup)
    • getConsumerGroup

      public String getConsumerGroup()
    • setLeaseTableName

      public void setLeaseTableName(String leaseTableName)
      Set a name of the DynamoDB table name for leases. Defaults to consumerGroup.
      Parameters:
      leaseTableName - the DynamoDB table name for leases.
    • setEmbeddedHeadersMapper

      public void setEmbeddedHeadersMapper(org.springframework.integration.mapping.InboundMessageMapper<byte[]> embeddedHeadersMapper)
      Specify an InboundMessageMapper to extract message headers embedded into the record data.
      Parameters:
      embeddedHeadersMapper - the InboundMessageMapper to use.
    • setStreamInitialSequence

      public void setStreamInitialSequence(software.amazon.kinesis.common.InitialPositionInStreamExtended streamInitialSequence)
    • setConsumerBackoff

      public void setConsumerBackoff(int consumerBackoff)
    • setConverter

      public void setConverter(Converter<byte[],Object> converter)
      Specify a Converter to deserialize the byte[] from record's body. Can be null meaning no deserialization.
      Parameters:
      converter - the Converter to use or null
    • setListenerMode

      public void setListenerMode(ListenerMode listenerMode)
    • setCheckpointsInterval

      public void setCheckpointsInterval(long checkpointsInterval)
      Sets the interval between 2 checkpoints.
      Parameters:
      checkpointsInterval - interval between 2 checkpoints (in milliseconds)
    • setCheckpointMode

      public void setCheckpointMode(CheckpointMode checkpointMode)
    • setWorkerId

      public void setWorkerId(String workerId)
      Sets the worker identifier used to distinguish different workers/processes of a Kinesis application.
      Parameters:
      workerId - the worker identifier to use
    • setGlueSchemaRegistryDeserializer

      public void setGlueSchemaRegistryDeserializer(com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializer glueSchemaRegistryDeserializer)
    • setBindSourceRecord

      public void setBindSourceRecord(boolean bindSourceRecord)
      Set to true to bind the source consumer record in the header named IntegrationMessageHeaderAccessor.SOURCE_DATA. Does not apply to batch listeners.
      Parameters:
      bindSourceRecord - true to bind.
    • setFanOut

      public void setFanOut(boolean fanOut)
      Specify a retrieval strategy: fan-out (true; default) or polling (false).
      Parameters:
      fanOut - false for a polling retrieval strategy.
    • setMetricsLevel

      public void setMetricsLevel(software.amazon.kinesis.metrics.MetricsLevel metricsLevel)
      Specify a metrics level to emit. Defaults to MetricsLevel.DETAILED.
      Parameters:
      metricsLevel - the MetricsLevel for emitting (or not) metrics into Cloud Watch.
    • setCoordinatorConfigCustomizer

      public void setCoordinatorConfigCustomizer(Consumer<software.amazon.kinesis.coordinator.CoordinatorConfig> coordinatorConfigCustomizer)
      Set a Consumer to configure a CoordinatorConfig.
      Parameters:
      coordinatorConfigCustomizer - the Consumer to configure a CoordinatorConfig.
      See Also:
      • CoordinatorConfig
    • setLifecycleConfigCustomizer

      public void setLifecycleConfigCustomizer(Consumer<software.amazon.kinesis.lifecycle.LifecycleConfig> lifecycleConfigCustomizer)
      Set a Consumer to configure a LifecycleConfig.
      Parameters:
      lifecycleConfigCustomizer - the Consumer to configure a LifecycleConfig.
      See Also:
      • LifecycleConfig
    • setMetricsConfigCustomizer

      public void setMetricsConfigCustomizer(Consumer<software.amazon.kinesis.metrics.MetricsConfig> metricsConfigCustomizer)
      Set a Consumer to configure a MetricsConfig. May override whatever could be set individually, like setMetricsLevel(MetricsLevel).
      Parameters:
      metricsConfigCustomizer - the Consumer to configure a MetricsConfig.
      See Also:
      • MetricsConfig
    • setLeaseManagementConfigCustomizer

      public void setLeaseManagementConfigCustomizer(Consumer<software.amazon.kinesis.leases.LeaseManagementConfig> leaseManagementConfigCustomizer)
      Set a Consumer to configure a LeaseManagementConfig.
      Parameters:
      leaseManagementConfigCustomizer - the Consumer to configure a LeaseManagementConfig.
      See Also:
      • LeaseManagementConfig
    • setEmptyRecordList

      public void setEmptyRecordList(boolean emptyRecordList)
      Whether to return an empty record list from the consumer to the processor. Works only in ListenerMode.batch mode. The message will be sent into the output channel with an empty List as a payload.
      Parameters:
      emptyRecordList - true to return an empty record list.
      See Also:
      • ProcessorConfig.callProcessRecordsEvenForEmptyRecordList(boolean)
    • setPollingMaxRecords

      public void setPollingMaxRecords(int pollingMaxRecords)
      The number of records to poll from Kinesis when using PollingConfig.
      Parameters:
      pollingMaxRecords - the number of records to poll from Kinesis.
      See Also:
      • PollingConfig.maxRecords(int)
    • setPollingIdleTime

      public void setPollingIdleTime(long pollingIdleTime)
      The idle timeout between polls when using PollingConfig.
      Parameters:
      pollingIdleTime - idle timeout between polls.
      See Also:
      • PollingConfig.idleTimeBetweenReadsInMillis(long)
    • setGracefulShutdownTimeout

      public void setGracefulShutdownTimeout(long gracefulShutdownTimeout)
      The timeout for Scheduler.startGracefulShutdown(). Defaults to 0 with the meaning to call Scheduler.shutdown().
      Parameters:
      gracefulShutdownTimeout - the timeout for Scheduler.startGracefulShutdown().
      See Also:
      • Scheduler.startGracefulShutdown()
    • setDynamoDBStreams

      public void setDynamoDBStreams(software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient dynamoDBStreams)
    • onInit

      protected void onInit()
      Overrides:
      onInit in class org.springframework.integration.endpoint.MessageProducerSupport
    • doStart

      protected void doStart()
      Overrides:
      doStart in class org.springframework.integration.endpoint.MessageProducerSupport
    • doStop

      protected void doStop()
      Takes no action by default. Subclasses may override this if they need lifecycle-managed behavior.
      Overrides:
      doStop in class org.springframework.integration.endpoint.MessageProducerSupport
    • destroy

      public void destroy()
      Specified by:
      destroy in interface DisposableBean
      Specified by:
      destroy in interface org.springframework.integration.support.management.IntegrationManagement
      Overrides:
      destroy in class org.springframework.integration.endpoint.AbstractEndpoint
    • getErrorMessageAttributes

      protected AttributeAccessor getErrorMessageAttributes(Message<?> message)
      Overrides:
      getErrorMessageAttributes in class org.springframework.integration.endpoint.MessageProducerSupport
    • toString

      public String toString()
      Overrides:
      toString in class org.springframework.integration.context.IntegrationObjectSupport