Class KinesisMessageDrivenChannelAdapter
java.lang.Object
org.springframework.integration.context.IntegrationObjectSupport
org.springframework.integration.endpoint.AbstractEndpoint
org.springframework.integration.endpoint.MessageProducerSupport
io.awspring.cloud.kinesis.integration.KinesisMessageDrivenChannelAdapter
- All Implemented Interfaces:
Aware,BeanFactoryAware,BeanNameAware,DisposableBean,InitializingBean,SmartInitializingSingleton,ApplicationContextAware,ApplicationEventPublisherAware,Lifecycle,Phased,SmartLifecycle,org.springframework.integration.context.ComponentSourceAware,org.springframework.integration.context.ExpressionCapable,org.springframework.integration.core.MessageProducer,org.springframework.integration.IntegrationPattern,org.springframework.integration.support.context.NamedComponent,org.springframework.integration.support.management.IntegrationInboundManagement,org.springframework.integration.support.management.IntegrationManagement,org.springframework.integration.support.management.ManageableLifecycle,org.springframework.integration.support.management.ManageableSmartLifecycle,org.springframework.integration.support.management.TrackableComponent
@ManagedResource
@IntegrationManagedResource
public class KinesisMessageDrivenChannelAdapter
extends org.springframework.integration.endpoint.MessageProducerSupport
implements DisposableBean, ApplicationEventPublisherAware
The
MessageProducerSupport implementation for receiving data from Amazon Kinesis stream(s).- Since:
- 4.0
- Author:
- Artem Bilan, Krzysztof Witkowski, Hervé Fortin, Dirk Bonhomme, Greg Eales, Asiel Caballero, Jonathan Nagayoshi
-
Nested Class Summary
Nested classes/interfaces inherited from interface org.springframework.integration.support.management.IntegrationManagement
org.springframework.integration.support.management.IntegrationManagement.ManagementOverrides -
Field Summary
Fields inherited from class org.springframework.integration.endpoint.AbstractEndpoint
lifecycleLockFields inherited from class org.springframework.integration.context.IntegrationObjectSupport
EXPRESSION_PARSER, loggerFields inherited from interface org.springframework.integration.support.management.IntegrationManagement
METER_PREFIX, RECEIVE_COUNTER_NAME, SEND_TIMER_NAMEFields inherited from interface org.springframework.context.SmartLifecycle
DEFAULT_PHASE -
Constructor Summary
ConstructorsConstructorDescriptionKinesisMessageDrivenChannelAdapter(software.amazon.awssdk.services.kinesis.KinesisAsyncClient amazonKinesis, KinesisShardOffset... shardOffsets) KinesisMessageDrivenChannelAdapter(software.amazon.awssdk.services.kinesis.KinesisAsyncClient amazonKinesis, String... streams) -
Method Summary
Modifier and TypeMethodDescriptionvoiddestroy()protected voiddoStart()protected voiddoStop()protected AttributeAccessorgetErrorMessageAttributes(Message<?> message) protected voidonInit()voidresetCheckpointForShardAtTimestamp(String stream, String shard, long timestamp) voidresetCheckpointForShardToLatest(String stream, String shard) voidresetCheckpointForShardToSequenceNumber(String stream, String shard, String sequenceNumber) voidresetCheckpointForShardToTrimHorizon(String stream, String shard) voidvoidsetApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) voidsetBindSourceRecord(boolean bindSourceRecord) Set to true to bind the source consumer record in the header namedIntegrationMessageHeaderAccessor.SOURCE_DATA.voidsetCheckpointMode(CheckpointMode checkpointMode) voidsetCheckpointsInterval(long checkpointsInterval) Sets the interval between 2 checkpoints.voidsetCheckpointStore(org.springframework.integration.metadata.ConcurrentMetadataStore checkpointStore) voidsetConcurrency(int concurrency) The maximum number of concurrentKinesisMessageDrivenChannelAdapter.ConsumerInvokers running.voidsetConsumerBackoff(int consumerBackoff) voidsetConsumerExecutor(Executor executor) voidsetConsumerGroup(String consumerGroup) voidsetConverter(Converter<byte[], Object> converter) Specify aConverterto deserialize thebyte[]from record's body.voidsetDescribeStreamBackoff(int describeStreamBackoff) voidsetDescribeStreamRetries(int describeStreamRetries) voidsetDispatcherExecutor(Executor dispatcherExecutor) voidsetEmbeddedHeadersMapper(org.springframework.integration.mapping.InboundMessageMapper<byte[]> embeddedHeadersMapper) Specify anInboundMessageMapperto extract message headers embedded into the record data.voidsetIdleBetweenPolls(int idleBetweenPolls) The sleep interval in milliseconds used in the main loop between shards polling cycles.voidsetListenerMode(ListenerMode listenerMode) voidsetLockRegistry(org.springframework.integration.support.locks.LockRegistry<?> lockRegistry) Specify aLockRegistryfor exclusive access to provided streams.voidsetLockRenewalTimeout(long lockRenewalTimeout) Configure a timeout in milliseconds to wait for lock on shard renewal.voidsetRecordsLimit(int recordsLimit) The maximum record to poll per on get-records request.voidsetShardListFilter(Function<List<software.amazon.awssdk.services.kinesis.model.Shard>, List<software.amazon.awssdk.services.kinesis.model.Shard>> shardListFilter) Specify aFunction<List<Shard>, List<Shard>>to filter the shards which will be read from.voidsetStartTimeout(int startTimeout) voidsetStreamInitialSequence(KinesisShardOffset streamInitialSequence) voidstartConsumer(String stream, String shard) voidstopConsumer(String stream, String shard) toString()Methods inherited from class org.springframework.integration.endpoint.MessageProducerSupport
afterSingletonsInstantiated, buildErrorMessage, getComponentType, getErrorChannel, getErrorMessageStrategy, getIntegrationPatternType, getMessagingTemplate, getOutputChannel, getRequiredOutputChannel, isObserved, registerObservationRegistry, sendErrorMessageIfNecessary, sendMessage, setErrorChannel, setErrorChannelName, setErrorMessageStrategy, setObservationConvention, setOutputChannel, setOutputChannelName, setSendTimeout, setShouldTrack, subscribeToPublisherMethods inherited from class org.springframework.integration.endpoint.AbstractEndpoint
doStop, getPhase, getRole, isActive, isAutoStartup, isRunning, setAutoStartup, setPhase, setRole, start, stop, stopMethods inherited from class org.springframework.integration.context.IntegrationObjectSupport
afterPropertiesSet, extractTypeIfPossible, generateId, getApplicationContext, getApplicationContextId, getBeanDescription, getBeanFactory, getBeanName, getChannelResolver, getComponentDescription, getComponentName, getComponentSource, getConversionService, getExpression, getIntegrationProperties, getMessageBuilderFactory, getTaskScheduler, isInitialized, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentDescription, setComponentName, setComponentSource, setConversionService, setMessageBuilderFactory, setPrimaryExpression, setTaskSchedulerMethods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, waitMethods inherited from interface org.springframework.integration.support.management.IntegrationManagement
getManagedName, getManagedType, getOverrides, getThisAs, isLoggingEnabled, registerMetricsCaptor, setLoggingEnabled, setManagedName, setManagedTypeMethods inherited from interface org.springframework.integration.support.context.NamedComponent
getBeanName, getComponentNameMethods inherited from interface org.springframework.context.SmartLifecycle
isPauseable
-
Constructor Details
-
KinesisMessageDrivenChannelAdapter
public KinesisMessageDrivenChannelAdapter(software.amazon.awssdk.services.kinesis.KinesisAsyncClient amazonKinesis, String... streams) -
KinesisMessageDrivenChannelAdapter
public KinesisMessageDrivenChannelAdapter(software.amazon.awssdk.services.kinesis.KinesisAsyncClient amazonKinesis, KinesisShardOffset... shardOffsets)
-
-
Method Details
-
setApplicationEventPublisher
- Specified by:
setApplicationEventPublisherin interfaceApplicationEventPublisherAware
-
setConsumerGroup
-
setCheckpointStore
public void setCheckpointStore(org.springframework.integration.metadata.ConcurrentMetadataStore checkpointStore) -
setConsumerExecutor
-
setDispatcherExecutor
-
setStreamInitialSequence
-
setConverter
-
setListenerMode
-
setCheckpointMode
-
setCheckpointsInterval
public void setCheckpointsInterval(long checkpointsInterval) Sets the interval between 2 checkpoints. Only used when checkpointMode is periodic.- Parameters:
checkpointsInterval- interval between 2 checkpoints (in milliseconds)
-
setRecordsLimit
public void setRecordsLimit(int recordsLimit) The maximum record to poll per on get-records request. Not greater then10000.- Parameters:
recordsLimit- the number of records to for per on get-records request.- See Also:
-
setConsumerBackoff
public void setConsumerBackoff(int consumerBackoff) -
setDescribeStreamBackoff
public void setDescribeStreamBackoff(int describeStreamBackoff) -
setDescribeStreamRetries
public void setDescribeStreamRetries(int describeStreamRetries) -
setStartTimeout
public void setStartTimeout(int startTimeout) -
setLockRenewalTimeout
public void setLockRenewalTimeout(long lockRenewalTimeout) Configure a timeout in milliseconds to wait for lock on shard renewal.- Parameters:
lockRenewalTimeout- the timeout to wait for lock renew in milliseconds.
-
setConcurrency
public void setConcurrency(int concurrency) The maximum number of concurrentKinesisMessageDrivenChannelAdapter.ConsumerInvokers running. TheKinesisMessageDrivenChannelAdapter.ShardConsumers are evenly distributed betweenKinesisMessageDrivenChannelAdapter.ConsumerInvokers. Messages from within the same shard will be processed sequentially. In other words each shard is tied with the particular thread. By default, the concurrency is unlimited and shard is processed in theconsumerExecutordirectly.- Parameters:
concurrency- the concurrency maximum number
-
setIdleBetweenPolls
public void setIdleBetweenPolls(int idleBetweenPolls) The sleep interval in milliseconds used in the main loop between shards polling cycles. Defaults to1000l minimum250.- Parameters:
idleBetweenPolls- the interval to sleep between shards polling cycles.
-
setEmbeddedHeadersMapper
public void setEmbeddedHeadersMapper(org.springframework.integration.mapping.InboundMessageMapper<byte[]> embeddedHeadersMapper) Specify anInboundMessageMapperto extract message headers embedded into the record data.- Parameters:
embeddedHeadersMapper- theInboundMessageMapperto use.
-
setLockRegistry
public void setLockRegistry(org.springframework.integration.support.locks.LockRegistry<?> lockRegistry) Specify aLockRegistryfor exclusive access to provided streams. This is not used when shards-based configuration is provided.- Parameters:
lockRegistry- theLockRegistryto use.
-
setBindSourceRecord
public void setBindSourceRecord(boolean bindSourceRecord) Set to true to bind the source consumer record in the header namedIntegrationMessageHeaderAccessor.SOURCE_DATA. Does not apply to batch listeners.- Parameters:
bindSourceRecord- true to bind.
-
setShardListFilter
public void setShardListFilter(Function<List<software.amazon.awssdk.services.kinesis.model.Shard>, List<software.amazon.awssdk.services.kinesis.model.Shard>> shardListFilter) Specify aFunction<List<Shard>, List<Shard>>to filter the shards which will be read from.- Parameters:
shardListFilter- the filterFunction<List<Shard>, List<Shard>>
-
onInit
protected void onInit()- Overrides:
onInitin classorg.springframework.integration.endpoint.MessageProducerSupport
-
destroy
public void destroy()- Specified by:
destroyin interfaceDisposableBean- Specified by:
destroyin interfaceorg.springframework.integration.support.management.IntegrationManagement- Overrides:
destroyin classorg.springframework.integration.endpoint.AbstractEndpoint
-
stopConsumer
-
startConsumer
-
resetCheckpointForShardToLatest
-
resetCheckpointForShardToTrimHorizon
-
resetCheckpointForShardToSequenceNumber
@ManagedOperation public void resetCheckpointForShardToSequenceNumber(String stream, String shard, String sequenceNumber) -
resetCheckpointForShardAtTimestamp
@ManagedOperation public void resetCheckpointForShardAtTimestamp(String stream, String shard, long timestamp) -
resetCheckpoints
-
doStart
protected void doStart()- Overrides:
doStartin classorg.springframework.integration.endpoint.MessageProducerSupport
-
doStop
protected void doStop()- Overrides:
doStopin classorg.springframework.integration.endpoint.MessageProducerSupport
-
getErrorMessageAttributes
- Overrides:
getErrorMessageAttributesin classorg.springframework.integration.endpoint.MessageProducerSupport
-
toString
- Overrides:
toStringin classorg.springframework.integration.context.IntegrationObjectSupport
-