Class KclMessageDrivenChannelAdapter
java.lang.Object
org.springframework.integration.context.IntegrationObjectSupport
org.springframework.integration.endpoint.AbstractEndpoint
org.springframework.integration.endpoint.MessageProducerSupport
io.awspring.cloud.kinesis.integration.KclMessageDrivenChannelAdapter
- All Implemented Interfaces:
Aware,BeanFactoryAware,BeanNameAware,DisposableBean,InitializingBean,SmartInitializingSingleton,ApplicationContextAware,ApplicationEventPublisherAware,Lifecycle,Phased,SmartLifecycle,org.springframework.integration.context.ComponentSourceAware,org.springframework.integration.context.ExpressionCapable,org.springframework.integration.core.MessageProducer,org.springframework.integration.IntegrationPattern,org.springframework.integration.support.context.NamedComponent,org.springframework.integration.support.management.IntegrationInboundManagement,org.springframework.integration.support.management.IntegrationManagement,org.springframework.integration.support.management.ManageableLifecycle,org.springframework.integration.support.management.ManageableSmartLifecycle,org.springframework.integration.support.management.TrackableComponent
@ManagedResource
@IntegrationManagedResource
public class KclMessageDrivenChannelAdapter
extends org.springframework.integration.endpoint.MessageProducerSupport
implements ApplicationEventPublisherAware
The
MessageProducerSupport implementation for receiving data from Amazon Kinesis stream(s) using AWS KCL.- Since:
- 4.0
- Author:
- Hervé Fortin, Artem Bilan, Dirk Bonhomme, Siddharth Jain, Minkyu Moon
-
Nested Class Summary
Nested classes/interfaces inherited from interface org.springframework.integration.support.management.IntegrationManagement
org.springframework.integration.support.management.IntegrationManagement.ManagementOverrides -
Field Summary
Fields inherited from class org.springframework.integration.endpoint.AbstractEndpoint
lifecycleLockFields inherited from class org.springframework.integration.context.IntegrationObjectSupport
EXPRESSION_PARSER, loggerFields inherited from interface org.springframework.integration.support.management.IntegrationManagement
METER_PREFIX, RECEIVE_COUNTER_NAME, SEND_TIMER_NAMEFields inherited from interface org.springframework.context.SmartLifecycle
DEFAULT_PHASE -
Constructor Summary
ConstructorsConstructorDescriptionKclMessageDrivenChannelAdapter(String... streams) KclMessageDrivenChannelAdapter(software.amazon.awssdk.regions.Region region, String... streams) KclMessageDrivenChannelAdapter(software.amazon.awssdk.services.kinesis.KinesisAsyncClient kinesisClient, software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient cloudWatchClient, software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient dynamoDBClient, String... streams) -
Method Summary
Modifier and TypeMethodDescriptionvoiddestroy()protected voiddoStart()protected voiddoStop()Takes no action by default.protected AttributeAccessorgetErrorMessageAttributes(Message<?> message) protected voidonInit()voidsetApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) voidsetBindSourceRecord(boolean bindSourceRecord) Set to true to bind the source consumer record in the header namedIntegrationMessageHeaderAccessor.SOURCE_DATA.voidsetCheckpointMode(CheckpointMode checkpointMode) voidsetCheckpointsInterval(long checkpointsInterval) Sets the interval between 2 checkpoints.voidsetConsumerBackoff(int consumerBackoff) voidsetConsumerGroup(String consumerGroup) voidsetConverter(Converter<byte[], Object> converter) Specify aConverterto deserialize thebyte[]from record's body.voidsetCoordinatorConfigCustomizer(Consumer<software.amazon.kinesis.coordinator.CoordinatorConfig> coordinatorConfigCustomizer) Set aConsumerto configure aCoordinatorConfig.voidsetDynamoDBStreams(software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient dynamoDBStreams) voidsetEmbeddedHeadersMapper(org.springframework.integration.mapping.InboundMessageMapper<byte[]> embeddedHeadersMapper) Specify anInboundMessageMapperto extract message headers embedded into the record data.voidsetEmptyRecordList(boolean emptyRecordList) Whether to return an empty record list from the consumer to the processor.voidsetExecutor(TaskExecutor executor) voidsetFanOut(boolean fanOut) Specify a retrieval strategy: fan-out (true; default) or polling (false).voidsetGlueSchemaRegistryDeserializer(com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializer glueSchemaRegistryDeserializer) voidsetGracefulShutdownTimeout(long gracefulShutdownTimeout) The timeout forScheduler.startGracefulShutdown().voidsetLeaseManagementConfigCustomizer(Consumer<software.amazon.kinesis.leases.LeaseManagementConfig> leaseManagementConfigCustomizer) Set aConsumerto configure aLeaseManagementConfig.voidsetLeaseTableName(String leaseTableName) Set a name of the DynamoDB table name for leases.voidsetLifecycleConfigCustomizer(Consumer<software.amazon.kinesis.lifecycle.LifecycleConfig> lifecycleConfigCustomizer) Set aConsumerto configure aLifecycleConfig.voidsetListenerMode(ListenerMode listenerMode) voidsetMetricsConfigCustomizer(Consumer<software.amazon.kinesis.metrics.MetricsConfig> metricsConfigCustomizer) Set aConsumerto configure aMetricsConfig.voidsetMetricsLevel(software.amazon.kinesis.metrics.MetricsLevel metricsLevel) Specify a metrics level to emit.voidsetPollingIdleTime(long pollingIdleTime) The idle timeout between polls when usingPollingConfig.voidsetPollingMaxRecords(int pollingMaxRecords) The number of records to poll from Kinesis when usingPollingConfig.voidsetStreamInitialSequence(software.amazon.kinesis.common.InitialPositionInStreamExtended streamInitialSequence) voidsetWorkerId(String workerId) Sets the worker identifier used to distinguish different workers/processes of a Kinesis application.toString()Methods inherited from class org.springframework.integration.endpoint.MessageProducerSupport
afterSingletonsInstantiated, buildErrorMessage, getComponentType, getErrorChannel, getErrorMessageStrategy, getIntegrationPatternType, getMessagingTemplate, getOutputChannel, getRequiredOutputChannel, isObserved, registerObservationRegistry, sendErrorMessageIfNecessary, sendMessage, setErrorChannel, setErrorChannelName, setErrorMessageStrategy, setObservationConvention, setOutputChannel, setOutputChannelName, setSendTimeout, setShouldTrack, subscribeToPublisherMethods inherited from class org.springframework.integration.endpoint.AbstractEndpoint
doStop, getPhase, getRole, isActive, isAutoStartup, isRunning, setAutoStartup, setPhase, setRole, start, stop, stopMethods inherited from class org.springframework.integration.context.IntegrationObjectSupport
afterPropertiesSet, extractTypeIfPossible, generateId, getApplicationContext, getApplicationContextId, getBeanDescription, getBeanFactory, getBeanName, getChannelResolver, getComponentDescription, getComponentName, getComponentSource, getConversionService, getExpression, getIntegrationProperties, getMessageBuilderFactory, getTaskScheduler, isInitialized, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentDescription, setComponentName, setComponentSource, setConversionService, setMessageBuilderFactory, setPrimaryExpression, setTaskSchedulerMethods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, waitMethods inherited from interface org.springframework.integration.support.management.IntegrationManagement
getManagedName, getManagedType, getOverrides, getThisAs, isLoggingEnabled, registerMetricsCaptor, setLoggingEnabled, setManagedName, setManagedTypeMethods inherited from interface org.springframework.integration.support.context.NamedComponent
getBeanName, getComponentNameMethods inherited from interface org.springframework.context.SmartLifecycle
isPauseable
-
Constructor Details
-
KclMessageDrivenChannelAdapter
-
KclMessageDrivenChannelAdapter
public KclMessageDrivenChannelAdapter(software.amazon.awssdk.regions.Region region, String... streams) -
KclMessageDrivenChannelAdapter
public KclMessageDrivenChannelAdapter(software.amazon.awssdk.services.kinesis.KinesisAsyncClient kinesisClient, software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient cloudWatchClient, software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient dynamoDBClient, String... streams)
-
-
Method Details
-
setApplicationEventPublisher
- Specified by:
setApplicationEventPublisherin interfaceApplicationEventPublisherAware
-
setExecutor
-
setConsumerGroup
-
getConsumerGroup
-
setLeaseTableName
Set a name of the DynamoDB table name for leases. Defaults toconsumerGroup.- Parameters:
leaseTableName- the DynamoDB table name for leases.
-
setEmbeddedHeadersMapper
public void setEmbeddedHeadersMapper(org.springframework.integration.mapping.InboundMessageMapper<byte[]> embeddedHeadersMapper) Specify anInboundMessageMapperto extract message headers embedded into the record data.- Parameters:
embeddedHeadersMapper- theInboundMessageMapperto use.
-
setStreamInitialSequence
public void setStreamInitialSequence(software.amazon.kinesis.common.InitialPositionInStreamExtended streamInitialSequence) -
setConsumerBackoff
public void setConsumerBackoff(int consumerBackoff) -
setConverter
-
setListenerMode
-
setCheckpointsInterval
public void setCheckpointsInterval(long checkpointsInterval) Sets the interval between 2 checkpoints.- Parameters:
checkpointsInterval- interval between 2 checkpoints (in milliseconds)
-
setCheckpointMode
-
setWorkerId
Sets the worker identifier used to distinguish different workers/processes of a Kinesis application.- Parameters:
workerId- the worker identifier to use
-
setGlueSchemaRegistryDeserializer
public void setGlueSchemaRegistryDeserializer(com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializer glueSchemaRegistryDeserializer) -
setBindSourceRecord
public void setBindSourceRecord(boolean bindSourceRecord) Set to true to bind the source consumer record in the header namedIntegrationMessageHeaderAccessor.SOURCE_DATA. Does not apply to batch listeners.- Parameters:
bindSourceRecord- true to bind.
-
setFanOut
public void setFanOut(boolean fanOut) Specify a retrieval strategy: fan-out (true; default) or polling (false).- Parameters:
fanOut- false for a polling retrieval strategy.
-
setMetricsLevel
public void setMetricsLevel(software.amazon.kinesis.metrics.MetricsLevel metricsLevel) Specify a metrics level to emit. Defaults toMetricsLevel.DETAILED.- Parameters:
metricsLevel- theMetricsLevelfor emitting (or not) metrics into Cloud Watch.
-
setCoordinatorConfigCustomizer
-
setLifecycleConfigCustomizer
-
setMetricsConfigCustomizer
public void setMetricsConfigCustomizer(Consumer<software.amazon.kinesis.metrics.MetricsConfig> metricsConfigCustomizer) Set aConsumerto configure aMetricsConfig. May override whatever could be set individually, likesetMetricsLevel(MetricsLevel).- Parameters:
metricsConfigCustomizer- theConsumerto configure aMetricsConfig.- See Also:
-
setLeaseManagementConfigCustomizer
public void setLeaseManagementConfigCustomizer(Consumer<software.amazon.kinesis.leases.LeaseManagementConfig> leaseManagementConfigCustomizer) Set aConsumerto configure aLeaseManagementConfig.- Parameters:
leaseManagementConfigCustomizer- theConsumerto configure aLeaseManagementConfig.- See Also:
-
setEmptyRecordList
public void setEmptyRecordList(boolean emptyRecordList) Whether to return an empty record list from the consumer to the processor. Works only inListenerMode.batchmode. The message will be sent into the output channel with an emptyListas a payload.- Parameters:
emptyRecordList- true to return an empty record list.- See Also:
-
setPollingMaxRecords
public void setPollingMaxRecords(int pollingMaxRecords) The number of records to poll from Kinesis when usingPollingConfig.- Parameters:
pollingMaxRecords- the number of records to poll from Kinesis.- See Also:
-
setPollingIdleTime
public void setPollingIdleTime(long pollingIdleTime) The idle timeout between polls when usingPollingConfig.- Parameters:
pollingIdleTime- idle timeout between polls.- See Also:
-
setGracefulShutdownTimeout
public void setGracefulShutdownTimeout(long gracefulShutdownTimeout) The timeout forScheduler.startGracefulShutdown(). Defaults to0with the meaning to callScheduler.shutdown().- Parameters:
gracefulShutdownTimeout- the timeout forScheduler.startGracefulShutdown().- See Also:
-
setDynamoDBStreams
public void setDynamoDBStreams(software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient dynamoDBStreams) -
onInit
protected void onInit()- Overrides:
onInitin classorg.springframework.integration.endpoint.MessageProducerSupport
-
doStart
protected void doStart()- Overrides:
doStartin classorg.springframework.integration.endpoint.MessageProducerSupport
-
doStop
protected void doStop()Takes no action by default. Subclasses may override this if they need lifecycle-managed behavior.- Overrides:
doStopin classorg.springframework.integration.endpoint.MessageProducerSupport
-
destroy
public void destroy()- Specified by:
destroyin interfaceDisposableBean- Specified by:
destroyin interfaceorg.springframework.integration.support.management.IntegrationManagement- Overrides:
destroyin classorg.springframework.integration.endpoint.AbstractEndpoint
-
getErrorMessageAttributes
- Overrides:
getErrorMessageAttributesin classorg.springframework.integration.endpoint.MessageProducerSupport
-
toString
- Overrides:
toStringin classorg.springframework.integration.context.IntegrationObjectSupport
-