diff --git a/pom.xml b/pom.xml index 70f1954dd..dacbc2334 100644 --- a/pom.xml +++ b/pom.xml @@ -64,6 +64,9 @@ spring-cloud-aws-starters/spring-cloud-aws-starter-sqs spring-cloud-aws-starters/spring-cloud-aws-starter-integration-sqs spring-cloud-aws-starters/spring-cloud-aws-starter-imds + spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis + spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis-producer + spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis-client-library spring-cloud-aws-samples spring-cloud-aws-test spring-cloud-aws-modulith diff --git a/spring-cloud-aws-autoconfigure/pom.xml b/spring-cloud-aws-autoconfigure/pom.xml index be8982859..a356c9c47 100644 --- a/spring-cloud-aws-autoconfigure/pom.xml +++ b/spring-cloud-aws-autoconfigure/pom.xml @@ -191,6 +191,21 @@ kms true + + software.amazon.awssdk + kinesis + true + + + software.amazon.kinesis + amazon-kinesis-client + true + + + software.amazon.kinesis + amazon-kinesis-producer + true + software.amazon.encryption.s3 amazon-s3-encryption-client-java diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAsyncClientCustomizer.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAsyncClientCustomizer.java new file mode 100644 index 000000000..60bdb3d74 --- /dev/null +++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAsyncClientCustomizer.java @@ -0,0 +1,29 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.autoconfigure.kinesis; + +import io.awspring.cloud.autoconfigure.AwsClientCustomizer; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder; + +/** + * Callback interface that can be used to customize a {@link KinesisAsyncClientBuilder}. + * + * @author Matej Nedic + * @since 4.0.0 + */ +@FunctionalInterface +public interface KinesisAsyncClientCustomizer extends AwsClientCustomizer { +} diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAutoConfiguration.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAutoConfiguration.java new file mode 100644 index 000000000..2a221a3b3 --- /dev/null +++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAutoConfiguration.java @@ -0,0 +1,52 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.autoconfigure.kinesis; + +import io.awspring.cloud.autoconfigure.AwsAsyncClientCustomizer; +import io.awspring.cloud.autoconfigure.core.AwsClientBuilderConfigurer; +import io.awspring.cloud.autoconfigure.core.AwsConnectionDetails; +import io.awspring.cloud.autoconfigure.core.CredentialsProviderAutoConfiguration; +import io.awspring.cloud.autoconfigure.core.RegionProviderAutoConfiguration; +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.boot.autoconfigure.AutoConfiguration; +import org.springframework.boot.autoconfigure.AutoConfigureAfter; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; + +@AutoConfiguration +@ConditionalOnClass({ KinesisAsyncClient.class }) +@EnableConfigurationProperties({ KinesisProperties.class }) +@AutoConfigureAfter({ CredentialsProviderAutoConfiguration.class, RegionProviderAutoConfiguration.class }) +@ConditionalOnProperty(value = "spring.cloud.aws.kinesis.enabled", havingValue = "true", matchIfMissing = true) +public class KinesisAutoConfiguration { + + @ConditionalOnMissingBean + @Bean + public KinesisAsyncClient kinesisAsyncClient(KinesisProperties properties, + AwsClientBuilderConfigurer awsClientBuilderConfigurer, + ObjectProvider connectionDetails, + ObjectProvider kinesisAsyncClientCustomizer, + ObjectProvider awsSyncClientCustomizers) { + return awsClientBuilderConfigurer + .configureAsyncClient(KinesisAsyncClient.builder(), properties, connectionDetails.getIfAvailable(), + kinesisAsyncClientCustomizer.orderedStream(), awsSyncClientCustomizers.orderedStream()) + .build(); + } +} diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProducerAutoConfiguration.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProducerAutoConfiguration.java new file mode 100644 index 000000000..99418ea67 --- /dev/null +++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProducerAutoConfiguration.java @@ -0,0 +1,100 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.autoconfigure.kinesis; + +import io.awspring.cloud.autoconfigure.core.AwsClientBuilderConfigurer; +import io.awspring.cloud.autoconfigure.core.AwsConnectionDetails; +import io.awspring.cloud.autoconfigure.core.CredentialsProviderAutoConfiguration; +import io.awspring.cloud.autoconfigure.core.RegionProviderAutoConfiguration; +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.boot.autoconfigure.AutoConfiguration; +import org.springframework.boot.autoconfigure.AutoConfigureAfter; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.boot.context.properties.PropertyMapper; +import org.springframework.context.annotation.Bean; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.regions.providers.AwsRegionProvider; +import software.amazon.kinesis.producer.KinesisProducer; +import software.amazon.kinesis.producer.KinesisProducerConfiguration; + +@AutoConfiguration +@ConditionalOnClass({ KinesisProducer.class, KinesisProducerConfiguration.class }) +@EnableConfigurationProperties({ KinesisProducerProperties.class }) +@AutoConfigureAfter({ CredentialsProviderAutoConfiguration.class, RegionProviderAutoConfiguration.class }) +@ConditionalOnProperty(value = "spring.cloud.aws.kinesis.producer.enabled", havingValue = "true", matchIfMissing = true) +public class KinesisProducerAutoConfiguration { + + @ConditionalOnMissingBean + @Bean + public KinesisProducerConfiguration kinesisProducerConfiguration(KinesisProducerProperties prop, + AwsCredentialsProvider credentialsProvider, AwsRegionProvider awsRegionProvider, + ObjectProvider connectionDetails) { + PropertyMapper propertyMapper = PropertyMapper.get(); + KinesisProducerConfiguration config = new KinesisProducerConfiguration(); + propertyMapper.from(prop::getAggregationEnabled).to(config::setAggregationEnabled); + propertyMapper.from(prop::getAggregationMaxCount).to(config::setAggregationMaxCount); + propertyMapper.from(prop::getAggregationMaxSize).to(config::setAggregationMaxSize); + propertyMapper.from(prop::getCloudwatchEndpoint).whenHasText().to(config::setCloudwatchEndpoint); + propertyMapper.from(prop::getCloudwatchPort).to(config::setCloudwatchPort); + propertyMapper.from(prop::getCollectionMaxCount).to(config::setCollectionMaxCount); + propertyMapper.from(prop::getCollectionMaxSize).to(config::setCollectionMaxSize); + propertyMapper.from(prop::getConnectTimeout).to(config::setConnectTimeout); + propertyMapper.from(prop::getCredentialsRefreshDelay).to(config::setCredentialsRefreshDelay); + propertyMapper.from(prop::getEnableCoreDumps).to(config::setEnableCoreDumps); + propertyMapper.from(prop::getFailIfThrottled).to(config::setFailIfThrottled); + propertyMapper.from(prop::getLogLevel).whenHasText().to(config::setLogLevel); + propertyMapper.from(prop::getMaxConnections).to(config::setMaxConnections); + propertyMapper.from(prop::getMetricsGranularity).whenHasText().to(config::setMetricsGranularity); + propertyMapper.from(prop::getMetricsLevel).whenHasText().to(config::setMetricsLevel); + propertyMapper.from(prop::getMetricsNamespace).whenHasText().to(config::setMetricsNamespace); + propertyMapper.from(prop::getMetricsUploadDelay).to(config::setMetricsUploadDelay); + propertyMapper.from(prop::getMinConnections).to(config::setMinConnections); + propertyMapper.from(prop::getNativeExecutable).to(config::setNativeExecutable); + propertyMapper.from(prop::getRateLimit).to(config::setRateLimit); + propertyMapper.from(prop::getRecordMaxBufferedTime).to(config::setRecordMaxBufferedTime); + propertyMapper.from(prop::getRecordTtl).to(config::setRecordTtl); + propertyMapper.from(prop::getRequestTimeout).to(config::setRequestTimeout); + propertyMapper.from(prop::getTempDirectory).to(config::setTempDirectory); + propertyMapper.from(prop::getVerifyCertificate).to(config::setVerifyCertificate); + propertyMapper.from(prop.getProxyHost()).to(config::setProxyHost); + propertyMapper.from(prop.getProxyPort()).to(config::setProxyPort); + propertyMapper.from(prop.getProxyUserName()).whenHasText().to(config::setProxyUserName); + propertyMapper.from(prop.getProxyPassword()).whenHasText().to(config::setProxyPassword); + propertyMapper.from(prop.getStsEndpoint()).whenHasText().to(config::setStsEndpoint); + propertyMapper.from(prop.getStsPort()).to(config::setStsPort); + propertyMapper.from(prop.getThreadingModel()).to(config::setThreadingModel); + propertyMapper.from(prop.getThreadPoolSize()).to(config::setThreadPoolSize); + propertyMapper.from(prop.getUserRecordTimeoutInMillis()).to(config::setUserRecordTimeoutInMillis); + + config.setCredentialsProvider(credentialsProvider); + config.setRegion(AwsClientBuilderConfigurer + .resolveRegion(prop, connectionDetails.getIfAvailable(), awsRegionProvider).toString()); + connectionDetails.ifAvailable(cd -> { + config.setKinesisPort(cd.getEndpoint().getPort()); + config.setKinesisEndpoint(cd.getEndpoint().getHost()); + }); + return config; + } + + @ConditionalOnMissingBean + @Bean + public KinesisProducer kinesisProducer(KinesisProducerConfiguration kinesisProducerConfiguration) { + return new KinesisProducer(kinesisProducerConfiguration); + } +} diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProducerProperties.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProducerProperties.java new file mode 100644 index 000000000..5d90fde67 --- /dev/null +++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProducerProperties.java @@ -0,0 +1,479 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.autoconfigure.kinesis; + +import static io.awspring.cloud.autoconfigure.kinesis.KinesisProducerProperties.PREFIX; + +import io.awspring.cloud.autoconfigure.AwsClientProperties; +import org.springframework.boot.context.properties.ConfigurationProperties; +import software.amazon.kinesis.producer.KinesisProducerConfiguration; + +/** + * Properties related to KinesisProducer + * + * @author Matej Nedic + * @since 4.0.0 + */ +@ConfigurationProperties(prefix = PREFIX) +public class KinesisProducerProperties extends AwsClientProperties { + + /** + * The prefix used for AWS Kinesis configuration. + */ + public static final String PREFIX = "spring.cloud.aws.kinesis.producer"; + + /** + * Whether aggregation of user records is enabled. + */ + private Boolean aggregationEnabled; + + /** + * Maximum number of user records to aggregate. Must be between 1 and Long.MAX_VALUE. + */ + private Long aggregationMaxCount; + + /** + * Maximum size in bytes of aggregated records. Must be between 64 and 1048576. + */ + private Long aggregationMaxSize; + + /** + * CloudWatch endpoint. Must match the regex: ^([A-Za-z0-9-\\.]+)?$ + */ + private String cloudwatchEndpoint; + + /** + * CloudWatch port. Must be between 1 and 65535. + */ + private Long cloudwatchPort; + + /** + * Maximum number of records to collect before sending a batch. Must be between 1 and 500. + */ + private Long collectionMaxCount; + + /** + * Maximum size in bytes of collected records before sending. Must be between 52224 and Long.MAX_VALUE. + */ + private Long collectionMaxSize; + + /** + * Connection timeout in milliseconds. Must be between 100 and 300000. + */ + private Long connectTimeout; + + /** + * Delay in milliseconds for credentials refresh. Must be between 1 and 300000. + */ + private Long credentialsRefreshDelay; + + /** + * Whether core dumps are enabled. + */ + private Boolean enableCoreDumps; + + /** + * Whether to fail if throttled by Kinesis. + */ + private Boolean failIfThrottled; + + /** + * Log level for the producer. Allowed values: trace, debug, info, warning, error. + */ + private String logLevel; + + /** + * Maximum number of connections. Must be between 1 and 256. + */ + private Long maxConnections; + + /** + * Metrics granularity. Allowed values: global, stream, shard. + */ + private String metricsGranularity; + + /** + * Metrics level. Allowed values: none, summary, detailed. + */ + private String metricsLevel; + + /** + * Metrics namespace. Must match the regex: (?!AWS/).{1,255} + */ + private String metricsNamespace; + + /** + * Delay in milliseconds for uploading metrics. Must be between 1 and 60000. + */ + private Long metricsUploadDelay; + + /** + * Minimum number of connections. Must be between 1 and 16. + */ + private Long minConnections; + + /** + * Native executable path. + */ + private String nativeExecutable; + + /** + * Rate limit for records per second. Must be between 1 and Long.MAX_VALUE. + */ + private Long rateLimit; + + /** + * Maximum buffered time for records in milliseconds. Must be between 0 and Long.MAX_VALUE. + */ + private Long recordMaxBufferedTime; + + /** + * Time to live for records in milliseconds. Must be between 100 and Long.MAX_VALUE. + */ + private Long recordTtl; + + /** + * Request timeout in milliseconds. Must be between 100 and 600000. + */ + private Long requestTimeout; + + /** + * Temporary directory path for the producer. + */ + private String tempDirectory; + + /** + * Whether to verify SSL certificates. + */ + private Boolean verifyCertificate; + + /** + * Proxy host. + */ + private String proxyHost; + + /** + * Proxy port. Must be between 1 and 65535. + */ + private Long proxyPort; + + /** + * Proxy username. + */ + private String proxyUserName; + + /** + * Proxy password. + */ + private String proxyPassword; + + /** + * STS endpoint. Must match the regex: ^([A-Za-z0-9-\\.]+)?$ + */ + private String stsEndpoint; + + /** + * STS port. Must be between 1 and 65535. + */ + private Long stsPort; + + /** + * Threading model for the producer. + */ + private KinesisProducerConfiguration.ThreadingModel threadingModel; + + /** + * Thread pool size. Must be greater than or equal to 0. + */ + private Integer threadPoolSize; + + /** + * Timeout in milliseconds for user records. Must be greater than or equal to 0. + */ + private Long userRecordTimeoutInMillis; + + public Boolean getAggregationEnabled() { + return aggregationEnabled; + } + + public void setAggregationEnabled(Boolean aggregationEnabled) { + this.aggregationEnabled = aggregationEnabled; + } + + public Long getAggregationMaxCount() { + return aggregationMaxCount; + } + + public void setAggregationMaxCount(Long aggregationMaxCount) { + this.aggregationMaxCount = aggregationMaxCount; + } + + public Long getAggregationMaxSize() { + return aggregationMaxSize; + } + + public void setAggregationMaxSize(Long aggregationMaxSize) { + this.aggregationMaxSize = aggregationMaxSize; + } + + public String getCloudwatchEndpoint() { + return cloudwatchEndpoint; + } + + public void setCloudwatchEndpoint(String cloudwatchEndpoint) { + this.cloudwatchEndpoint = cloudwatchEndpoint; + } + + public Long getCloudwatchPort() { + return cloudwatchPort; + } + + public void setCloudwatchPort(Long cloudwatchPort) { + this.cloudwatchPort = cloudwatchPort; + } + + public Long getCollectionMaxCount() { + return collectionMaxCount; + } + + public void setCollectionMaxCount(Long collectionMaxCount) { + this.collectionMaxCount = collectionMaxCount; + } + + public Long getCollectionMaxSize() { + return collectionMaxSize; + } + + public void setCollectionMaxSize(Long collectionMaxSize) { + this.collectionMaxSize = collectionMaxSize; + } + + public Long getConnectTimeout() { + return connectTimeout; + } + + public void setConnectTimeout(Long connectTimeout) { + this.connectTimeout = connectTimeout; + } + + public Long getCredentialsRefreshDelay() { + return credentialsRefreshDelay; + } + + public void setCredentialsRefreshDelay(Long credentialsRefreshDelay) { + this.credentialsRefreshDelay = credentialsRefreshDelay; + } + + public Boolean getEnableCoreDumps() { + return enableCoreDumps; + } + + public void setEnableCoreDumps(Boolean enableCoreDumps) { + this.enableCoreDumps = enableCoreDumps; + } + + public Boolean getFailIfThrottled() { + return failIfThrottled; + } + + public void setFailIfThrottled(Boolean failIfThrottled) { + this.failIfThrottled = failIfThrottled; + } + + public String getLogLevel() { + return logLevel; + } + + public void setLogLevel(String logLevel) { + this.logLevel = logLevel; + } + + public Long getMaxConnections() { + return maxConnections; + } + + public void setMaxConnections(Long maxConnections) { + this.maxConnections = maxConnections; + } + + public String getMetricsGranularity() { + return metricsGranularity; + } + + public void setMetricsGranularity(String metricsGranularity) { + this.metricsGranularity = metricsGranularity; + } + + public String getMetricsLevel() { + return metricsLevel; + } + + public void setMetricsLevel(String metricsLevel) { + this.metricsLevel = metricsLevel; + } + + public String getMetricsNamespace() { + return metricsNamespace; + } + + public void setMetricsNamespace(String metricsNamespace) { + this.metricsNamespace = metricsNamespace; + } + + public Long getMetricsUploadDelay() { + return metricsUploadDelay; + } + + public void setMetricsUploadDelay(Long metricsUploadDelay) { + this.metricsUploadDelay = metricsUploadDelay; + } + + public Long getMinConnections() { + return minConnections; + } + + public void setMinConnections(Long minConnections) { + this.minConnections = minConnections; + } + + public String getNativeExecutable() { + return nativeExecutable; + } + + public void setNativeExecutable(String nativeExecutable) { + this.nativeExecutable = nativeExecutable; + } + + public Long getRateLimit() { + return rateLimit; + } + + public void setRateLimit(Long rateLimit) { + this.rateLimit = rateLimit; + } + + public Long getRecordMaxBufferedTime() { + return recordMaxBufferedTime; + } + + public void setRecordMaxBufferedTime(Long recordMaxBufferedTime) { + this.recordMaxBufferedTime = recordMaxBufferedTime; + } + + public Long getRecordTtl() { + return recordTtl; + } + + public void setRecordTtl(Long recordTtl) { + this.recordTtl = recordTtl; + } + + public Long getRequestTimeout() { + return requestTimeout; + } + + public void setRequestTimeout(Long requestTimeout) { + this.requestTimeout = requestTimeout; + } + + public String getTempDirectory() { + return tempDirectory; + } + + public void setTempDirectory(String tempDirectory) { + this.tempDirectory = tempDirectory; + } + + public Boolean getVerifyCertificate() { + return verifyCertificate; + } + + public void setVerifyCertificate(Boolean verifyCertificate) { + this.verifyCertificate = verifyCertificate; + } + + public String getProxyHost() { + return proxyHost; + } + + public void setProxyHost(String proxyHost) { + this.proxyHost = proxyHost; + } + + public Long getProxyPort() { + return proxyPort; + } + + public void setProxyPort(Long proxyPort) { + this.proxyPort = proxyPort; + } + + public String getProxyUserName() { + return proxyUserName; + } + + public void setProxyUserName(String proxyUserName) { + this.proxyUserName = proxyUserName; + } + + public String getProxyPassword() { + return proxyPassword; + } + + public void setProxyPassword(String proxyPassword) { + this.proxyPassword = proxyPassword; + } + + public String getStsEndpoint() { + return stsEndpoint; + } + + public void setStsEndpoint(String stsEndpoint) { + this.stsEndpoint = stsEndpoint; + } + + public Long getStsPort() { + return stsPort; + } + + public void setStsPort(Long stsPort) { + this.stsPort = stsPort; + } + + public KinesisProducerConfiguration.ThreadingModel getThreadingModel() { + return threadingModel; + } + + public void setThreadingModel(KinesisProducerConfiguration.ThreadingModel threadingModel) { + this.threadingModel = threadingModel; + } + + public Integer getThreadPoolSize() { + return threadPoolSize; + } + + public void setThreadPoolSize(Integer threadPoolSize) { + this.threadPoolSize = threadPoolSize; + } + + public Long getUserRecordTimeoutInMillis() { + return userRecordTimeoutInMillis; + } + + public void setUserRecordTimeoutInMillis(Long userRecordTimeoutInMillis) { + this.userRecordTimeoutInMillis = userRecordTimeoutInMillis; + } +} diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProperties.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProperties.java new file mode 100644 index 000000000..a809a0e18 --- /dev/null +++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProperties.java @@ -0,0 +1,35 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.autoconfigure.kinesis; + +import static io.awspring.cloud.autoconfigure.kinesis.KinesisProperties.PREFIX; + +import io.awspring.cloud.autoconfigure.AwsClientProperties; +import org.springframework.boot.context.properties.ConfigurationProperties; + +/** + * Properties related to KinesisClient + * + * @author Matej Nedic + * @since 4.0.0 + */ +@ConfigurationProperties(prefix = PREFIX) +public class KinesisProperties extends AwsClientProperties { + /** + * The prefix used for AWS Kinesis configuration. + */ + public static final String PREFIX = "spring.cloud.aws.kinesis"; +} diff --git a/spring-cloud-aws-autoconfigure/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/spring-cloud-aws-autoconfigure/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports index 685d3a75a..ac5cee27b 100644 --- a/spring-cloud-aws-autoconfigure/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports +++ b/spring-cloud-aws-autoconfigure/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -16,3 +16,5 @@ io.awspring.cloud.autoconfigure.config.secretsmanager.SecretsManagerAutoConfigur io.awspring.cloud.autoconfigure.config.parameterstore.ParameterStoreReloadAutoConfiguration io.awspring.cloud.autoconfigure.config.parameterstore.ParameterStoreAutoConfiguration io.awspring.cloud.autoconfigure.config.s3.S3ReloadAutoConfiguration +io.awspring.cloud.autoconfigure.kinesis.KinesisAutoConfiguration +io.awspring.cloud.autoconfigure.kinesis.KinesisProducerAutoConfiguration diff --git a/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAutoConfigurationTest.java b/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAutoConfigurationTest.java new file mode 100644 index 000000000..03fdbf00e --- /dev/null +++ b/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAutoConfigurationTest.java @@ -0,0 +1,59 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.autoconfigure.kinesis; + +import static org.assertj.core.api.Assertions.assertThat; + +import io.awspring.cloud.autoconfigure.ConfiguredAwsClient; +import io.awspring.cloud.autoconfigure.core.AwsAutoConfiguration; +import io.awspring.cloud.autoconfigure.core.CredentialsProviderAutoConfiguration; +import io.awspring.cloud.autoconfigure.core.RegionProviderAutoConfiguration; +import java.net.URI; +import org.junit.jupiter.api.Test; +import org.springframework.boot.autoconfigure.AutoConfigurations; +import org.springframework.boot.test.context.runner.ApplicationContextRunner; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; + +/** + * Tests for {@link KinesisAutoConfiguration}. + * + * @author Matej Nedic + */ +class KinesisAutoConfigurationTest { + + private final ApplicationContextRunner contextRunner = new ApplicationContextRunner() + .withPropertyValues("spring.cloud.aws.region.static:eu-west-1", + "spring.cloud.aws.credentials.access-key:noop", "spring.cloud.aws.credentials.secret-key:noop") + .withConfiguration(AutoConfigurations.of(AwsAutoConfiguration.class, RegionProviderAutoConfiguration.class, + CredentialsProviderAutoConfiguration.class, KinesisAutoConfiguration.class)); + + @Test + void disableKinesisIntegration() { + this.contextRunner.withPropertyValues("spring.cloud.aws.kinesis.enabled:false").run(context -> { + assertThat(context).doesNotHaveBean(KinesisAsyncClient.class); + }); + } + + @Test + void withCustomEndpoint() { + this.contextRunner.withPropertyValues("spring.cloud.aws.kinesis.endpoint:http://localhost:8090") + .run(context -> { + ConfiguredAwsClient client = new ConfiguredAwsClient(context.getBean(KinesisAsyncClient.class)); + assertThat(client.getEndpoint()).isEqualTo(URI.create("http://localhost:8090")); + assertThat(client.isEndpointOverridden()).isTrue(); + }); + } +} diff --git a/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/kinesis/KinesisClientCustomizerTests.java b/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/kinesis/KinesisClientCustomizerTests.java new file mode 100644 index 000000000..a4043f14f --- /dev/null +++ b/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/kinesis/KinesisClientCustomizerTests.java @@ -0,0 +1,72 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.autoconfigure.kinesis; + +import static org.assertj.core.api.Assertions.assertThat; + +import io.awspring.cloud.autoconfigure.ConfiguredAwsClient; +import io.awspring.cloud.autoconfigure.core.AwsAutoConfiguration; +import io.awspring.cloud.autoconfigure.core.CredentialsProviderAutoConfiguration; +import io.awspring.cloud.autoconfigure.core.RegionProviderAutoConfiguration; +import java.time.Duration; +import org.junit.jupiter.api.Test; +import org.springframework.boot.autoconfigure.AutoConfigurations; +import org.springframework.boot.test.context.runner.ApplicationContextRunner; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; + +public class KinesisClientCustomizerTests { + + private final ApplicationContextRunner contextRunner = new ApplicationContextRunner() + .withPropertyValues("spring.cloud.aws.region.static:eu-west-1", + "spring.cloud.aws.credentials.access-key:noop", "spring.cloud.aws.credentials.secret-key:noop") + .withConfiguration(AutoConfigurations.of(AwsAutoConfiguration.class, RegionProviderAutoConfiguration.class, + CredentialsProviderAutoConfiguration.class, KinesisAutoConfiguration.class)); + + @Test + void customKinesisClientCustomizer() { + contextRunner.withUserConfiguration(KinesisClientCustomizerTests.CustomizerConfig.class).run(context -> { + ConfiguredAwsClient kinesisAsyncClient = new ConfiguredAwsClient(context.getBean(KinesisAsyncClient.class)); + assertThat(kinesisAsyncClient.getApiCallTimeout()).describedAs("sets property from first customizer") + .isEqualTo(Duration.ofMillis(2001)); + assertThat(kinesisAsyncClient.getApiCallAttemptTimeout()) + .describedAs("sets property from second customizer").isEqualTo(Duration.ofMillis(2002)); + }); + } + + @Configuration(proxyBeanMethods = false) + static class CustomizerConfig { + + @Bean + KinesisAsyncClientCustomizer kinesisClientCustomizer() { + return builder -> { + builder.overrideConfiguration(builder.overrideConfiguration().copy(c -> { + c.apiCallTimeout(Duration.ofMillis(2001)); + })); + }; + } + + @Bean + KinesisAsyncClientCustomizer kinesisClientCustomizer2() { + return builder -> { + builder.overrideConfiguration(builder.overrideConfiguration().copy(c -> { + c.apiCallAttemptTimeout(Duration.ofMillis(2002)); + })); + }; + } + } +} diff --git a/spring-cloud-aws-dependencies/pom.xml b/spring-cloud-aws-dependencies/pom.xml index 210d2d436..eb25f51c9 100644 --- a/spring-cloud-aws-dependencies/pom.xml +++ b/spring-cloud-aws-dependencies/pom.xml @@ -275,11 +275,13 @@ software.amazon.kinesis amazon-kinesis-client ${kcl.version} + true software.amazon.kinesis amazon-kinesis-producer ${kpl.version} + true diff --git a/spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis-client-library/pom.xml b/spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis-client-library/pom.xml new file mode 100644 index 000000000..ef90ec96b --- /dev/null +++ b/spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis-client-library/pom.xml @@ -0,0 +1,33 @@ + + + + + spring-cloud-aws + io.awspring.cloud + 4.0.0-SNAPSHOT + ../../pom.xml + + + 4.0.0 + + spring-cloud-aws-starter-kinesis-client-library + Spring Cloud AWS Kinesis Client Library + Spring Cloud AWS Kinesis Client Library + + + + io.awspring.cloud + spring-cloud-aws-starter + + + software.amazon.kinesis + amazon-kinesis-client + + + software.amazon.awssdk + kinesis + + + diff --git a/spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis-producer/pom.xml b/spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis-producer/pom.xml new file mode 100644 index 000000000..92fa24ae8 --- /dev/null +++ b/spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis-producer/pom.xml @@ -0,0 +1,29 @@ + + + + + spring-cloud-aws + io.awspring.cloud + 4.0.0-SNAPSHOT + ../../pom.xml + + + 4.0.0 + + spring-cloud-aws-starter-kinesis-producer + Spring Cloud AWS Kinesis Producer Starter + Spring Cloud AWS Kinesis Producer Starter + + + + io.awspring.cloud + spring-cloud-aws-starter + + + software.amazon.kinesis + amazon-kinesis-producer + + + diff --git a/spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis/pom.xml b/spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis/pom.xml new file mode 100644 index 000000000..a9f02f837 --- /dev/null +++ b/spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis/pom.xml @@ -0,0 +1,29 @@ + + + + + spring-cloud-aws + io.awspring.cloud + 4.0.0-SNAPSHOT + ../../pom.xml + + + 4.0.0 + + spring-cloud-aws-starter-kinesis + Spring Cloud AWS Kinesis Starter + Spring Cloud AWS Kinesis Starter + + + + io.awspring.cloud + spring-cloud-aws-starter + + + software.amazon.awssdk + kinesis + + +