diff --git a/pom.xml b/pom.xml
index 70f1954dd..dacbc2334 100644
--- a/pom.xml
+++ b/pom.xml
@@ -64,6 +64,9 @@
spring-cloud-aws-starters/spring-cloud-aws-starter-sqs
spring-cloud-aws-starters/spring-cloud-aws-starter-integration-sqs
spring-cloud-aws-starters/spring-cloud-aws-starter-imds
+ spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis
+ spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis-producer
+ spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis-client-library
spring-cloud-aws-samples
spring-cloud-aws-test
spring-cloud-aws-modulith
diff --git a/spring-cloud-aws-autoconfigure/pom.xml b/spring-cloud-aws-autoconfigure/pom.xml
index be8982859..a356c9c47 100644
--- a/spring-cloud-aws-autoconfigure/pom.xml
+++ b/spring-cloud-aws-autoconfigure/pom.xml
@@ -191,6 +191,21 @@
kms
true
+
+ software.amazon.awssdk
+ kinesis
+ true
+
+
+ software.amazon.kinesis
+ amazon-kinesis-client
+ true
+
+
+ software.amazon.kinesis
+ amazon-kinesis-producer
+ true
+
software.amazon.encryption.s3
amazon-s3-encryption-client-java
diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAsyncClientCustomizer.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAsyncClientCustomizer.java
new file mode 100644
index 000000000..60bdb3d74
--- /dev/null
+++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAsyncClientCustomizer.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2013-2025 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.awspring.cloud.autoconfigure.kinesis;
+
+import io.awspring.cloud.autoconfigure.AwsClientCustomizer;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder;
+
+/**
+ * Callback interface that can be used to customize a {@link KinesisAsyncClientBuilder}.
+ *
+ * @author Matej Nedic
+ * @since 4.0.0
+ */
+@FunctionalInterface
+public interface KinesisAsyncClientCustomizer extends AwsClientCustomizer {
+}
diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAutoConfiguration.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAutoConfiguration.java
new file mode 100644
index 000000000..2a221a3b3
--- /dev/null
+++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAutoConfiguration.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2013-2025 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.awspring.cloud.autoconfigure.kinesis;
+
+import io.awspring.cloud.autoconfigure.AwsAsyncClientCustomizer;
+import io.awspring.cloud.autoconfigure.core.AwsClientBuilderConfigurer;
+import io.awspring.cloud.autoconfigure.core.AwsConnectionDetails;
+import io.awspring.cloud.autoconfigure.core.CredentialsProviderAutoConfiguration;
+import io.awspring.cloud.autoconfigure.core.RegionProviderAutoConfiguration;
+import org.springframework.beans.factory.ObjectProvider;
+import org.springframework.boot.autoconfigure.AutoConfiguration;
+import org.springframework.boot.autoconfigure.AutoConfigureAfter;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+
+@AutoConfiguration
+@ConditionalOnClass({ KinesisAsyncClient.class })
+@EnableConfigurationProperties({ KinesisProperties.class })
+@AutoConfigureAfter({ CredentialsProviderAutoConfiguration.class, RegionProviderAutoConfiguration.class })
+@ConditionalOnProperty(value = "spring.cloud.aws.kinesis.enabled", havingValue = "true", matchIfMissing = true)
+public class KinesisAutoConfiguration {
+
+ @ConditionalOnMissingBean
+ @Bean
+ public KinesisAsyncClient kinesisAsyncClient(KinesisProperties properties,
+ AwsClientBuilderConfigurer awsClientBuilderConfigurer,
+ ObjectProvider connectionDetails,
+ ObjectProvider kinesisAsyncClientCustomizer,
+ ObjectProvider awsSyncClientCustomizers) {
+ return awsClientBuilderConfigurer
+ .configureAsyncClient(KinesisAsyncClient.builder(), properties, connectionDetails.getIfAvailable(),
+ kinesisAsyncClientCustomizer.orderedStream(), awsSyncClientCustomizers.orderedStream())
+ .build();
+ }
+}
diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProducerAutoConfiguration.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProducerAutoConfiguration.java
new file mode 100644
index 000000000..99418ea67
--- /dev/null
+++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProducerAutoConfiguration.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2013-2025 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.awspring.cloud.autoconfigure.kinesis;
+
+import io.awspring.cloud.autoconfigure.core.AwsClientBuilderConfigurer;
+import io.awspring.cloud.autoconfigure.core.AwsConnectionDetails;
+import io.awspring.cloud.autoconfigure.core.CredentialsProviderAutoConfiguration;
+import io.awspring.cloud.autoconfigure.core.RegionProviderAutoConfiguration;
+import org.springframework.beans.factory.ObjectProvider;
+import org.springframework.boot.autoconfigure.AutoConfiguration;
+import org.springframework.boot.autoconfigure.AutoConfigureAfter;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.boot.context.properties.PropertyMapper;
+import org.springframework.context.annotation.Bean;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.regions.providers.AwsRegionProvider;
+import software.amazon.kinesis.producer.KinesisProducer;
+import software.amazon.kinesis.producer.KinesisProducerConfiguration;
+
+@AutoConfiguration
+@ConditionalOnClass({ KinesisProducer.class, KinesisProducerConfiguration.class })
+@EnableConfigurationProperties({ KinesisProducerProperties.class })
+@AutoConfigureAfter({ CredentialsProviderAutoConfiguration.class, RegionProviderAutoConfiguration.class })
+@ConditionalOnProperty(value = "spring.cloud.aws.kinesis.producer.enabled", havingValue = "true", matchIfMissing = true)
+public class KinesisProducerAutoConfiguration {
+
+ @ConditionalOnMissingBean
+ @Bean
+ public KinesisProducerConfiguration kinesisProducerConfiguration(KinesisProducerProperties prop,
+ AwsCredentialsProvider credentialsProvider, AwsRegionProvider awsRegionProvider,
+ ObjectProvider connectionDetails) {
+ PropertyMapper propertyMapper = PropertyMapper.get();
+ KinesisProducerConfiguration config = new KinesisProducerConfiguration();
+ propertyMapper.from(prop::getAggregationEnabled).to(config::setAggregationEnabled);
+ propertyMapper.from(prop::getAggregationMaxCount).to(config::setAggregationMaxCount);
+ propertyMapper.from(prop::getAggregationMaxSize).to(config::setAggregationMaxSize);
+ propertyMapper.from(prop::getCloudwatchEndpoint).whenHasText().to(config::setCloudwatchEndpoint);
+ propertyMapper.from(prop::getCloudwatchPort).to(config::setCloudwatchPort);
+ propertyMapper.from(prop::getCollectionMaxCount).to(config::setCollectionMaxCount);
+ propertyMapper.from(prop::getCollectionMaxSize).to(config::setCollectionMaxSize);
+ propertyMapper.from(prop::getConnectTimeout).to(config::setConnectTimeout);
+ propertyMapper.from(prop::getCredentialsRefreshDelay).to(config::setCredentialsRefreshDelay);
+ propertyMapper.from(prop::getEnableCoreDumps).to(config::setEnableCoreDumps);
+ propertyMapper.from(prop::getFailIfThrottled).to(config::setFailIfThrottled);
+ propertyMapper.from(prop::getLogLevel).whenHasText().to(config::setLogLevel);
+ propertyMapper.from(prop::getMaxConnections).to(config::setMaxConnections);
+ propertyMapper.from(prop::getMetricsGranularity).whenHasText().to(config::setMetricsGranularity);
+ propertyMapper.from(prop::getMetricsLevel).whenHasText().to(config::setMetricsLevel);
+ propertyMapper.from(prop::getMetricsNamespace).whenHasText().to(config::setMetricsNamespace);
+ propertyMapper.from(prop::getMetricsUploadDelay).to(config::setMetricsUploadDelay);
+ propertyMapper.from(prop::getMinConnections).to(config::setMinConnections);
+ propertyMapper.from(prop::getNativeExecutable).to(config::setNativeExecutable);
+ propertyMapper.from(prop::getRateLimit).to(config::setRateLimit);
+ propertyMapper.from(prop::getRecordMaxBufferedTime).to(config::setRecordMaxBufferedTime);
+ propertyMapper.from(prop::getRecordTtl).to(config::setRecordTtl);
+ propertyMapper.from(prop::getRequestTimeout).to(config::setRequestTimeout);
+ propertyMapper.from(prop::getTempDirectory).to(config::setTempDirectory);
+ propertyMapper.from(prop::getVerifyCertificate).to(config::setVerifyCertificate);
+ propertyMapper.from(prop.getProxyHost()).to(config::setProxyHost);
+ propertyMapper.from(prop.getProxyPort()).to(config::setProxyPort);
+ propertyMapper.from(prop.getProxyUserName()).whenHasText().to(config::setProxyUserName);
+ propertyMapper.from(prop.getProxyPassword()).whenHasText().to(config::setProxyPassword);
+ propertyMapper.from(prop.getStsEndpoint()).whenHasText().to(config::setStsEndpoint);
+ propertyMapper.from(prop.getStsPort()).to(config::setStsPort);
+ propertyMapper.from(prop.getThreadingModel()).to(config::setThreadingModel);
+ propertyMapper.from(prop.getThreadPoolSize()).to(config::setThreadPoolSize);
+ propertyMapper.from(prop.getUserRecordTimeoutInMillis()).to(config::setUserRecordTimeoutInMillis);
+
+ config.setCredentialsProvider(credentialsProvider);
+ config.setRegion(AwsClientBuilderConfigurer
+ .resolveRegion(prop, connectionDetails.getIfAvailable(), awsRegionProvider).toString());
+ connectionDetails.ifAvailable(cd -> {
+ config.setKinesisPort(cd.getEndpoint().getPort());
+ config.setKinesisEndpoint(cd.getEndpoint().getHost());
+ });
+ return config;
+ }
+
+ @ConditionalOnMissingBean
+ @Bean
+ public KinesisProducer kinesisProducer(KinesisProducerConfiguration kinesisProducerConfiguration) {
+ return new KinesisProducer(kinesisProducerConfiguration);
+ }
+}
diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProducerProperties.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProducerProperties.java
new file mode 100644
index 000000000..5d90fde67
--- /dev/null
+++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProducerProperties.java
@@ -0,0 +1,479 @@
+/*
+ * Copyright 2013-2025 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.awspring.cloud.autoconfigure.kinesis;
+
+import static io.awspring.cloud.autoconfigure.kinesis.KinesisProducerProperties.PREFIX;
+
+import io.awspring.cloud.autoconfigure.AwsClientProperties;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import software.amazon.kinesis.producer.KinesisProducerConfiguration;
+
+/**
+ * Properties related to KinesisProducer
+ *
+ * @author Matej Nedic
+ * @since 4.0.0
+ */
+@ConfigurationProperties(prefix = PREFIX)
+public class KinesisProducerProperties extends AwsClientProperties {
+
+ /**
+ * The prefix used for AWS Kinesis configuration.
+ */
+ public static final String PREFIX = "spring.cloud.aws.kinesis.producer";
+
+ /**
+ * Whether aggregation of user records is enabled.
+ */
+ private Boolean aggregationEnabled;
+
+ /**
+ * Maximum number of user records to aggregate. Must be between 1 and Long.MAX_VALUE.
+ */
+ private Long aggregationMaxCount;
+
+ /**
+ * Maximum size in bytes of aggregated records. Must be between 64 and 1048576.
+ */
+ private Long aggregationMaxSize;
+
+ /**
+ * CloudWatch endpoint. Must match the regex: ^([A-Za-z0-9-\\.]+)?$
+ */
+ private String cloudwatchEndpoint;
+
+ /**
+ * CloudWatch port. Must be between 1 and 65535.
+ */
+ private Long cloudwatchPort;
+
+ /**
+ * Maximum number of records to collect before sending a batch. Must be between 1 and 500.
+ */
+ private Long collectionMaxCount;
+
+ /**
+ * Maximum size in bytes of collected records before sending. Must be between 52224 and Long.MAX_VALUE.
+ */
+ private Long collectionMaxSize;
+
+ /**
+ * Connection timeout in milliseconds. Must be between 100 and 300000.
+ */
+ private Long connectTimeout;
+
+ /**
+ * Delay in milliseconds for credentials refresh. Must be between 1 and 300000.
+ */
+ private Long credentialsRefreshDelay;
+
+ /**
+ * Whether core dumps are enabled.
+ */
+ private Boolean enableCoreDumps;
+
+ /**
+ * Whether to fail if throttled by Kinesis.
+ */
+ private Boolean failIfThrottled;
+
+ /**
+ * Log level for the producer. Allowed values: trace, debug, info, warning, error.
+ */
+ private String logLevel;
+
+ /**
+ * Maximum number of connections. Must be between 1 and 256.
+ */
+ private Long maxConnections;
+
+ /**
+ * Metrics granularity. Allowed values: global, stream, shard.
+ */
+ private String metricsGranularity;
+
+ /**
+ * Metrics level. Allowed values: none, summary, detailed.
+ */
+ private String metricsLevel;
+
+ /**
+ * Metrics namespace. Must match the regex: (?!AWS/).{1,255}
+ */
+ private String metricsNamespace;
+
+ /**
+ * Delay in milliseconds for uploading metrics. Must be between 1 and 60000.
+ */
+ private Long metricsUploadDelay;
+
+ /**
+ * Minimum number of connections. Must be between 1 and 16.
+ */
+ private Long minConnections;
+
+ /**
+ * Native executable path.
+ */
+ private String nativeExecutable;
+
+ /**
+ * Rate limit for records per second. Must be between 1 and Long.MAX_VALUE.
+ */
+ private Long rateLimit;
+
+ /**
+ * Maximum buffered time for records in milliseconds. Must be between 0 and Long.MAX_VALUE.
+ */
+ private Long recordMaxBufferedTime;
+
+ /**
+ * Time to live for records in milliseconds. Must be between 100 and Long.MAX_VALUE.
+ */
+ private Long recordTtl;
+
+ /**
+ * Request timeout in milliseconds. Must be between 100 and 600000.
+ */
+ private Long requestTimeout;
+
+ /**
+ * Temporary directory path for the producer.
+ */
+ private String tempDirectory;
+
+ /**
+ * Whether to verify SSL certificates.
+ */
+ private Boolean verifyCertificate;
+
+ /**
+ * Proxy host.
+ */
+ private String proxyHost;
+
+ /**
+ * Proxy port. Must be between 1 and 65535.
+ */
+ private Long proxyPort;
+
+ /**
+ * Proxy username.
+ */
+ private String proxyUserName;
+
+ /**
+ * Proxy password.
+ */
+ private String proxyPassword;
+
+ /**
+ * STS endpoint. Must match the regex: ^([A-Za-z0-9-\\.]+)?$
+ */
+ private String stsEndpoint;
+
+ /**
+ * STS port. Must be between 1 and 65535.
+ */
+ private Long stsPort;
+
+ /**
+ * Threading model for the producer.
+ */
+ private KinesisProducerConfiguration.ThreadingModel threadingModel;
+
+ /**
+ * Thread pool size. Must be greater than or equal to 0.
+ */
+ private Integer threadPoolSize;
+
+ /**
+ * Timeout in milliseconds for user records. Must be greater than or equal to 0.
+ */
+ private Long userRecordTimeoutInMillis;
+
+ public Boolean getAggregationEnabled() {
+ return aggregationEnabled;
+ }
+
+ public void setAggregationEnabled(Boolean aggregationEnabled) {
+ this.aggregationEnabled = aggregationEnabled;
+ }
+
+ public Long getAggregationMaxCount() {
+ return aggregationMaxCount;
+ }
+
+ public void setAggregationMaxCount(Long aggregationMaxCount) {
+ this.aggregationMaxCount = aggregationMaxCount;
+ }
+
+ public Long getAggregationMaxSize() {
+ return aggregationMaxSize;
+ }
+
+ public void setAggregationMaxSize(Long aggregationMaxSize) {
+ this.aggregationMaxSize = aggregationMaxSize;
+ }
+
+ public String getCloudwatchEndpoint() {
+ return cloudwatchEndpoint;
+ }
+
+ public void setCloudwatchEndpoint(String cloudwatchEndpoint) {
+ this.cloudwatchEndpoint = cloudwatchEndpoint;
+ }
+
+ public Long getCloudwatchPort() {
+ return cloudwatchPort;
+ }
+
+ public void setCloudwatchPort(Long cloudwatchPort) {
+ this.cloudwatchPort = cloudwatchPort;
+ }
+
+ public Long getCollectionMaxCount() {
+ return collectionMaxCount;
+ }
+
+ public void setCollectionMaxCount(Long collectionMaxCount) {
+ this.collectionMaxCount = collectionMaxCount;
+ }
+
+ public Long getCollectionMaxSize() {
+ return collectionMaxSize;
+ }
+
+ public void setCollectionMaxSize(Long collectionMaxSize) {
+ this.collectionMaxSize = collectionMaxSize;
+ }
+
+ public Long getConnectTimeout() {
+ return connectTimeout;
+ }
+
+ public void setConnectTimeout(Long connectTimeout) {
+ this.connectTimeout = connectTimeout;
+ }
+
+ public Long getCredentialsRefreshDelay() {
+ return credentialsRefreshDelay;
+ }
+
+ public void setCredentialsRefreshDelay(Long credentialsRefreshDelay) {
+ this.credentialsRefreshDelay = credentialsRefreshDelay;
+ }
+
+ public Boolean getEnableCoreDumps() {
+ return enableCoreDumps;
+ }
+
+ public void setEnableCoreDumps(Boolean enableCoreDumps) {
+ this.enableCoreDumps = enableCoreDumps;
+ }
+
+ public Boolean getFailIfThrottled() {
+ return failIfThrottled;
+ }
+
+ public void setFailIfThrottled(Boolean failIfThrottled) {
+ this.failIfThrottled = failIfThrottled;
+ }
+
+ public String getLogLevel() {
+ return logLevel;
+ }
+
+ public void setLogLevel(String logLevel) {
+ this.logLevel = logLevel;
+ }
+
+ public Long getMaxConnections() {
+ return maxConnections;
+ }
+
+ public void setMaxConnections(Long maxConnections) {
+ this.maxConnections = maxConnections;
+ }
+
+ public String getMetricsGranularity() {
+ return metricsGranularity;
+ }
+
+ public void setMetricsGranularity(String metricsGranularity) {
+ this.metricsGranularity = metricsGranularity;
+ }
+
+ public String getMetricsLevel() {
+ return metricsLevel;
+ }
+
+ public void setMetricsLevel(String metricsLevel) {
+ this.metricsLevel = metricsLevel;
+ }
+
+ public String getMetricsNamespace() {
+ return metricsNamespace;
+ }
+
+ public void setMetricsNamespace(String metricsNamespace) {
+ this.metricsNamespace = metricsNamespace;
+ }
+
+ public Long getMetricsUploadDelay() {
+ return metricsUploadDelay;
+ }
+
+ public void setMetricsUploadDelay(Long metricsUploadDelay) {
+ this.metricsUploadDelay = metricsUploadDelay;
+ }
+
+ public Long getMinConnections() {
+ return minConnections;
+ }
+
+ public void setMinConnections(Long minConnections) {
+ this.minConnections = minConnections;
+ }
+
+ public String getNativeExecutable() {
+ return nativeExecutable;
+ }
+
+ public void setNativeExecutable(String nativeExecutable) {
+ this.nativeExecutable = nativeExecutable;
+ }
+
+ public Long getRateLimit() {
+ return rateLimit;
+ }
+
+ public void setRateLimit(Long rateLimit) {
+ this.rateLimit = rateLimit;
+ }
+
+ public Long getRecordMaxBufferedTime() {
+ return recordMaxBufferedTime;
+ }
+
+ public void setRecordMaxBufferedTime(Long recordMaxBufferedTime) {
+ this.recordMaxBufferedTime = recordMaxBufferedTime;
+ }
+
+ public Long getRecordTtl() {
+ return recordTtl;
+ }
+
+ public void setRecordTtl(Long recordTtl) {
+ this.recordTtl = recordTtl;
+ }
+
+ public Long getRequestTimeout() {
+ return requestTimeout;
+ }
+
+ public void setRequestTimeout(Long requestTimeout) {
+ this.requestTimeout = requestTimeout;
+ }
+
+ public String getTempDirectory() {
+ return tempDirectory;
+ }
+
+ public void setTempDirectory(String tempDirectory) {
+ this.tempDirectory = tempDirectory;
+ }
+
+ public Boolean getVerifyCertificate() {
+ return verifyCertificate;
+ }
+
+ public void setVerifyCertificate(Boolean verifyCertificate) {
+ this.verifyCertificate = verifyCertificate;
+ }
+
+ public String getProxyHost() {
+ return proxyHost;
+ }
+
+ public void setProxyHost(String proxyHost) {
+ this.proxyHost = proxyHost;
+ }
+
+ public Long getProxyPort() {
+ return proxyPort;
+ }
+
+ public void setProxyPort(Long proxyPort) {
+ this.proxyPort = proxyPort;
+ }
+
+ public String getProxyUserName() {
+ return proxyUserName;
+ }
+
+ public void setProxyUserName(String proxyUserName) {
+ this.proxyUserName = proxyUserName;
+ }
+
+ public String getProxyPassword() {
+ return proxyPassword;
+ }
+
+ public void setProxyPassword(String proxyPassword) {
+ this.proxyPassword = proxyPassword;
+ }
+
+ public String getStsEndpoint() {
+ return stsEndpoint;
+ }
+
+ public void setStsEndpoint(String stsEndpoint) {
+ this.stsEndpoint = stsEndpoint;
+ }
+
+ public Long getStsPort() {
+ return stsPort;
+ }
+
+ public void setStsPort(Long stsPort) {
+ this.stsPort = stsPort;
+ }
+
+ public KinesisProducerConfiguration.ThreadingModel getThreadingModel() {
+ return threadingModel;
+ }
+
+ public void setThreadingModel(KinesisProducerConfiguration.ThreadingModel threadingModel) {
+ this.threadingModel = threadingModel;
+ }
+
+ public Integer getThreadPoolSize() {
+ return threadPoolSize;
+ }
+
+ public void setThreadPoolSize(Integer threadPoolSize) {
+ this.threadPoolSize = threadPoolSize;
+ }
+
+ public Long getUserRecordTimeoutInMillis() {
+ return userRecordTimeoutInMillis;
+ }
+
+ public void setUserRecordTimeoutInMillis(Long userRecordTimeoutInMillis) {
+ this.userRecordTimeoutInMillis = userRecordTimeoutInMillis;
+ }
+}
diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProperties.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProperties.java
new file mode 100644
index 000000000..a809a0e18
--- /dev/null
+++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProperties.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2013-2025 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.awspring.cloud.autoconfigure.kinesis;
+
+import static io.awspring.cloud.autoconfigure.kinesis.KinesisProperties.PREFIX;
+
+import io.awspring.cloud.autoconfigure.AwsClientProperties;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+/**
+ * Properties related to KinesisClient
+ *
+ * @author Matej Nedic
+ * @since 4.0.0
+ */
+@ConfigurationProperties(prefix = PREFIX)
+public class KinesisProperties extends AwsClientProperties {
+ /**
+ * The prefix used for AWS Kinesis configuration.
+ */
+ public static final String PREFIX = "spring.cloud.aws.kinesis";
+}
diff --git a/spring-cloud-aws-autoconfigure/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/spring-cloud-aws-autoconfigure/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
index 685d3a75a..ac5cee27b 100644
--- a/spring-cloud-aws-autoconfigure/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
+++ b/spring-cloud-aws-autoconfigure/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
@@ -16,3 +16,5 @@ io.awspring.cloud.autoconfigure.config.secretsmanager.SecretsManagerAutoConfigur
io.awspring.cloud.autoconfigure.config.parameterstore.ParameterStoreReloadAutoConfiguration
io.awspring.cloud.autoconfigure.config.parameterstore.ParameterStoreAutoConfiguration
io.awspring.cloud.autoconfigure.config.s3.S3ReloadAutoConfiguration
+io.awspring.cloud.autoconfigure.kinesis.KinesisAutoConfiguration
+io.awspring.cloud.autoconfigure.kinesis.KinesisProducerAutoConfiguration
diff --git a/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAutoConfigurationTest.java b/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAutoConfigurationTest.java
new file mode 100644
index 000000000..03fdbf00e
--- /dev/null
+++ b/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAutoConfigurationTest.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2013-2025 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.awspring.cloud.autoconfigure.kinesis;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import io.awspring.cloud.autoconfigure.ConfiguredAwsClient;
+import io.awspring.cloud.autoconfigure.core.AwsAutoConfiguration;
+import io.awspring.cloud.autoconfigure.core.CredentialsProviderAutoConfiguration;
+import io.awspring.cloud.autoconfigure.core.RegionProviderAutoConfiguration;
+import java.net.URI;
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.autoconfigure.AutoConfigurations;
+import org.springframework.boot.test.context.runner.ApplicationContextRunner;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+
+/**
+ * Tests for {@link KinesisAutoConfiguration}.
+ *
+ * @author Matej Nedic
+ */
+class KinesisAutoConfigurationTest {
+
+ private final ApplicationContextRunner contextRunner = new ApplicationContextRunner()
+ .withPropertyValues("spring.cloud.aws.region.static:eu-west-1",
+ "spring.cloud.aws.credentials.access-key:noop", "spring.cloud.aws.credentials.secret-key:noop")
+ .withConfiguration(AutoConfigurations.of(AwsAutoConfiguration.class, RegionProviderAutoConfiguration.class,
+ CredentialsProviderAutoConfiguration.class, KinesisAutoConfiguration.class));
+
+ @Test
+ void disableKinesisIntegration() {
+ this.contextRunner.withPropertyValues("spring.cloud.aws.kinesis.enabled:false").run(context -> {
+ assertThat(context).doesNotHaveBean(KinesisAsyncClient.class);
+ });
+ }
+
+ @Test
+ void withCustomEndpoint() {
+ this.contextRunner.withPropertyValues("spring.cloud.aws.kinesis.endpoint:http://localhost:8090")
+ .run(context -> {
+ ConfiguredAwsClient client = new ConfiguredAwsClient(context.getBean(KinesisAsyncClient.class));
+ assertThat(client.getEndpoint()).isEqualTo(URI.create("http://localhost:8090"));
+ assertThat(client.isEndpointOverridden()).isTrue();
+ });
+ }
+}
diff --git a/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/kinesis/KinesisClientCustomizerTests.java b/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/kinesis/KinesisClientCustomizerTests.java
new file mode 100644
index 000000000..a4043f14f
--- /dev/null
+++ b/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/kinesis/KinesisClientCustomizerTests.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2013-2025 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.awspring.cloud.autoconfigure.kinesis;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import io.awspring.cloud.autoconfigure.ConfiguredAwsClient;
+import io.awspring.cloud.autoconfigure.core.AwsAutoConfiguration;
+import io.awspring.cloud.autoconfigure.core.CredentialsProviderAutoConfiguration;
+import io.awspring.cloud.autoconfigure.core.RegionProviderAutoConfiguration;
+import java.time.Duration;
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.autoconfigure.AutoConfigurations;
+import org.springframework.boot.test.context.runner.ApplicationContextRunner;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+
+public class KinesisClientCustomizerTests {
+
+ private final ApplicationContextRunner contextRunner = new ApplicationContextRunner()
+ .withPropertyValues("spring.cloud.aws.region.static:eu-west-1",
+ "spring.cloud.aws.credentials.access-key:noop", "spring.cloud.aws.credentials.secret-key:noop")
+ .withConfiguration(AutoConfigurations.of(AwsAutoConfiguration.class, RegionProviderAutoConfiguration.class,
+ CredentialsProviderAutoConfiguration.class, KinesisAutoConfiguration.class));
+
+ @Test
+ void customKinesisClientCustomizer() {
+ contextRunner.withUserConfiguration(KinesisClientCustomizerTests.CustomizerConfig.class).run(context -> {
+ ConfiguredAwsClient kinesisAsyncClient = new ConfiguredAwsClient(context.getBean(KinesisAsyncClient.class));
+ assertThat(kinesisAsyncClient.getApiCallTimeout()).describedAs("sets property from first customizer")
+ .isEqualTo(Duration.ofMillis(2001));
+ assertThat(kinesisAsyncClient.getApiCallAttemptTimeout())
+ .describedAs("sets property from second customizer").isEqualTo(Duration.ofMillis(2002));
+ });
+ }
+
+ @Configuration(proxyBeanMethods = false)
+ static class CustomizerConfig {
+
+ @Bean
+ KinesisAsyncClientCustomizer kinesisClientCustomizer() {
+ return builder -> {
+ builder.overrideConfiguration(builder.overrideConfiguration().copy(c -> {
+ c.apiCallTimeout(Duration.ofMillis(2001));
+ }));
+ };
+ }
+
+ @Bean
+ KinesisAsyncClientCustomizer kinesisClientCustomizer2() {
+ return builder -> {
+ builder.overrideConfiguration(builder.overrideConfiguration().copy(c -> {
+ c.apiCallAttemptTimeout(Duration.ofMillis(2002));
+ }));
+ };
+ }
+ }
+}
diff --git a/spring-cloud-aws-dependencies/pom.xml b/spring-cloud-aws-dependencies/pom.xml
index 210d2d436..eb25f51c9 100644
--- a/spring-cloud-aws-dependencies/pom.xml
+++ b/spring-cloud-aws-dependencies/pom.xml
@@ -275,11 +275,13 @@
software.amazon.kinesis
amazon-kinesis-client
${kcl.version}
+ true
software.amazon.kinesis
amazon-kinesis-producer
${kpl.version}
+ true
diff --git a/spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis-client-library/pom.xml b/spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis-client-library/pom.xml
new file mode 100644
index 000000000..ef90ec96b
--- /dev/null
+++ b/spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis-client-library/pom.xml
@@ -0,0 +1,33 @@
+
+
+
+
+ spring-cloud-aws
+ io.awspring.cloud
+ 4.0.0-SNAPSHOT
+ ../../pom.xml
+
+
+ 4.0.0
+
+ spring-cloud-aws-starter-kinesis-client-library
+ Spring Cloud AWS Kinesis Client Library
+ Spring Cloud AWS Kinesis Client Library
+
+
+
+ io.awspring.cloud
+ spring-cloud-aws-starter
+
+
+ software.amazon.kinesis
+ amazon-kinesis-client
+
+
+ software.amazon.awssdk
+ kinesis
+
+
+
diff --git a/spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis-producer/pom.xml b/spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis-producer/pom.xml
new file mode 100644
index 000000000..92fa24ae8
--- /dev/null
+++ b/spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis-producer/pom.xml
@@ -0,0 +1,29 @@
+
+
+
+
+ spring-cloud-aws
+ io.awspring.cloud
+ 4.0.0-SNAPSHOT
+ ../../pom.xml
+
+
+ 4.0.0
+
+ spring-cloud-aws-starter-kinesis-producer
+ Spring Cloud AWS Kinesis Producer Starter
+ Spring Cloud AWS Kinesis Producer Starter
+
+
+
+ io.awspring.cloud
+ spring-cloud-aws-starter
+
+
+ software.amazon.kinesis
+ amazon-kinesis-producer
+
+
+
diff --git a/spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis/pom.xml b/spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis/pom.xml
new file mode 100644
index 000000000..a9f02f837
--- /dev/null
+++ b/spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis/pom.xml
@@ -0,0 +1,29 @@
+
+
+
+
+ spring-cloud-aws
+ io.awspring.cloud
+ 4.0.0-SNAPSHOT
+ ../../pom.xml
+
+
+ 4.0.0
+
+ spring-cloud-aws-starter-kinesis
+ Spring Cloud AWS Kinesis Starter
+ Spring Cloud AWS Kinesis Starter
+
+
+
+ io.awspring.cloud
+ spring-cloud-aws-starter
+
+
+ software.amazon.awssdk
+ kinesis
+
+
+