8 changes: 8 additions & 0 deletions google-cloud-pubsublite/pom.xml
@@ -119,6 +119,14 @@
<version>0.8</version>
</dependency>

<!-- Apache Kafka client (optional, for the Managed Kafka backend). Because it is marked
     <optional>, downstream projects that opt in to MANAGED_KAFKA must declare
     kafka-clients in their own build. -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.7.0</version>
<optional>true</optional>
</dependency>

<!--test dependencies-->
<dependency>
<groupId>com.google.truth</groupId>
google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/MessagingBackend.java (new file)
@@ -0,0 +1,32 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.cloudpubsub;

/** Specifies the messaging backend to use for Publisher and Subscriber clients. */
public enum MessagingBackend {
/**
* Use Google Cloud Pub/Sub Lite (default). This is the traditional backend with zonal storage and
* predictable pricing.
*/
PUBSUB_LITE,

/**
* Use Google Cloud Managed Service for Apache Kafka. Provides Kafka-compatible API with Google
* Cloud management.
*/
MANAGED_KAFKA
}
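
For orientation, a hedged sketch of how a caller would pick a backend once the PublisherSettings changes below land; the project, location, and topic names are placeholders, and omitting the kafka properties here is deliberate since they are optional:

import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.MessagingBackend;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;

// Omitting setMessagingBackend keeps the default, PUBSUB_LITE, so existing callers
// are unaffected.
PublisherSettings liteSettings =
    PublisherSettings.newBuilder()
        .setTopicPath(
            TopicPath.parse("projects/my-project/locations/us-central1-a/topics/my-topic"))
        .build();

// Opting in to Managed Kafka is a single builder call.
PublisherSettings kafkaSettings =
    PublisherSettings.newBuilder()
        .setTopicPath(
            TopicPath.parse("projects/my-project/locations/us-central1-a/topics/my-topic"))
        .setMessagingBackend(MessagingBackend.MANAGED_KAFKA)
        .build();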
google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/PublisherSettings.java
@@ -34,6 +34,7 @@
import com.google.cloud.pubsublite.MessageTransformer;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.internal.KafkaPartitionPublisherFactory;
import com.google.cloud.pubsublite.cloudpubsub.internal.WrappingPublisher;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.wire.PartitionCountWatchingPublisherSettings;
@@ -52,6 +53,7 @@
import com.google.pubsub.v1.PubsubMessage;
import io.grpc.CallOptions;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;

/**
@@ -70,7 +72,7 @@ public abstract class PublisherSettings {
// Required parameters.

/** The topic path to publish to. */
-  abstract TopicPath topicPath();
+  public abstract TopicPath topicPath();

// Optional parameters.
/** A KeyExtractor for getting the routing key from a message. */
@@ -80,16 +82,16 @@ public abstract class PublisherSettings {
abstract Optional<MessageTransformer<PubsubMessage, Message>> messageTransformer();

/** Batching settings for this publisher to use. Apply per-partition. */
-  abstract BatchingSettings batchingSettings();
+  public abstract BatchingSettings batchingSettings();

/**
* Whether idempotence is enabled, where the server will ensure that unique messages within a
* single publisher session are stored only once. Default true.
*/
-  abstract boolean enableIdempotence();
+  public abstract boolean enableIdempotence();

/** Whether request compression is enabled. Default true. */
-  abstract boolean enableCompression();
+  public abstract boolean enableCompression();

/** A provider for credentials. */
abstract CredentialsProvider credentialsProvider();
@@ -111,6 +113,17 @@ public abstract class PublisherSettings {
// For testing.
abstract SinglePartitionPublisherBuilder.Builder underlyingBuilder();

/** The messaging backend to use. Defaults to PUBSUB_LITE for backward compatibility. */
public abstract MessagingBackend messagingBackend();

  /**
   * Kafka-specific configuration properties. Only used when messagingBackend is MANAGED_KAFKA.
   * Common properties include:
   *
   * <ul>
   *   <li>"bootstrap.servers": Kafka broker addresses
   *   <li>"compression.type": Compression algorithm (e.g., "snappy", "gzip")
   *   <li>"max.in.flight.requests.per.connection": Pipelining configuration
   * </ul>
   */
  public abstract Optional<Map<String, Object>> kafkaProperties();

/** Get a new builder for a PublisherSettings. */
public static Builder newBuilder() {
return new AutoValue_PublisherSettings.Builder()
@@ -120,7 +133,8 @@ public static Builder newBuilder() {
.setBatchingSettings(DEFAULT_BATCHING_SETTINGS)
.setEnableIdempotence(true)
.setEnableCompression(true)
-        .setUnderlyingBuilder(SinglePartitionPublisherBuilder.newBuilder());
+        .setUnderlyingBuilder(SinglePartitionPublisherBuilder.newBuilder())
+        .setMessagingBackend(MessagingBackend.PUBSUB_LITE);
}

@AutoValue.Builder
@@ -169,6 +183,12 @@ public abstract Builder setMessageTransformer(
abstract Builder setUnderlyingBuilder(
SinglePartitionPublisherBuilder.Builder underlyingBuilder);

/** Sets the messaging backend. Defaults to PUBSUB_LITE. */
public abstract Builder setMessagingBackend(MessagingBackend backend);

/** Sets Kafka-specific properties. Only used when backend is MANAGED_KAFKA. */
public abstract Builder setKafkaProperties(Map<String, Object> properties);

public abstract PublisherSettings build();
}

@@ -185,6 +205,12 @@ private PublisherServiceClient newServiceClient() throws ApiException {
}

private PartitionPublisherFactory getPartitionPublisherFactory() {
// Check backend and return appropriate factory
if (messagingBackend() == MessagingBackend.MANAGED_KAFKA) {
return new KafkaPartitionPublisherFactory(this);
}

// Existing Pub/Sub Lite implementation
PublisherServiceClient client = newServiceClient();
ByteString publisherClientId = UuidBuilder.toByteString(UuidBuilder.generate());
return new PartitionPublisherFactory() {
@@ -241,6 +267,11 @@ private AdminClient getAdminClient() throws ApiException {

@SuppressWarnings("CheckReturnValue")
Publisher instantiate() throws ApiException {
    // For the Kafka backend, use a simpler publisher that doesn't need partition watching
if (messagingBackend() == MessagingBackend.MANAGED_KAFKA) {
return new com.google.cloud.pubsublite.cloudpubsub.internal.KafkaPublisher(this);
}

if (batchingSettings().getFlowControlSettings().getMaxOutstandingElementCount() != null
|| batchingSettings().getFlowControlSettings().getMaxOutstandingRequestBytes() != null) {
throw new CheckedApiException(
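To make the kafkaProperties() contract concrete, here is a hedged end-to-end sketch using the three documented keys. The broker address and property values are illustrative only, and `topic` stands in for a TopicPath constructed elsewhere:

import com.google.cloud.pubsublite.cloudpubsub.MessagingBackend;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import java.util.HashMap;
import java.util.Map;

// Keys mirror standard kafka-clients producer config names; values are illustrative.
Map<String, Object> kafkaProps = new HashMap<>();
kafkaProps.put("bootstrap.servers", "broker.example.com:9092"); // placeholder address
kafkaProps.put("compression.type", "snappy");
kafkaProps.put("max.in.flight.requests.per.connection", 5);

Publisher publisher =
    Publisher.create(
        PublisherSettings.newBuilder()
            .setTopicPath(topic) // a TopicPath constructed elsewhere (assumed)
            .setMessagingBackend(MessagingBackend.MANAGED_KAFKA)
            .setKafkaProperties(kafkaProps)
            .build());
publisher.startAsync().awaitRunning();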
google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/internal/KafkaPartitionPublisher.java (new file)
@@ -0,0 +1,160 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.cloudpubsub.internal;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.ProxyService;
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.SecurityDisabledException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;

/** Adapts a Kafka producer to the internal Publisher interface for a specific partition. */
public class KafkaPartitionPublisher extends ProxyService implements Publisher<MessageMetadata> {

private final KafkaProducer<byte[], byte[]> producer;
private final String topicName;
private final Partition partition;
private final PublisherSettings settings;
private final ConcurrentLinkedQueue<SettableApiFuture<MessageMetadata>> pendingFutures;

public KafkaPartitionPublisher(
KafkaProducer<byte[], byte[]> producer,
String topicName,
Partition partition,
PublisherSettings settings) {
this.producer = producer;
this.topicName = topicName;
this.partition = partition;
this.settings = settings;
this.pendingFutures = new ConcurrentLinkedQueue<>();
}

@Override
public ApiFuture<MessageMetadata> publish(PubSubMessage message) {
if (state() == State.FAILED) {
return ApiFutures.immediateFailedFuture(
new CheckedApiException("Publisher has failed", Code.FAILED_PRECONDITION).underlying);
}

try {
// Convert to Kafka ProducerRecord
ProducerRecord<byte[], byte[]> record = convertToKafkaRecord(message);

// Create future for response
SettableApiFuture<MessageMetadata> future = SettableApiFuture.create();
pendingFutures.add(future);

// Send to Kafka
producer.send(
record,
(metadata, exception) -> {
pendingFutures.remove(future);

if (exception != null) {
CheckedApiException apiException = new CheckedApiException(exception, Code.INTERNAL);
future.setException(apiException.underlying);

// If this is a permanent error, fail the publisher
if (isPermanentError(exception)) {
onPermanentError(apiException);
}
} else {
// Convert Kafka metadata to MessageMetadata
MessageMetadata messageMetadata =
MessageMetadata.of(
Partition.of(metadata.partition()), Offset.of(metadata.offset()));
future.set(messageMetadata);
}
});

return future;

} catch (Exception e) {
CheckedApiException apiException = new CheckedApiException(e, Code.INTERNAL);
onPermanentError(apiException);
return ApiFutures.immediateFailedFuture(apiException.underlying);
}
}

@Override
public void flush() {
producer.flush();
}

@Override
public void cancelOutstandingPublishes() {
CheckedApiException exception =
new CheckedApiException("Publisher is shutting down", Code.CANCELLED);

pendingFutures.forEach(future -> future.setException(exception.underlying));
pendingFutures.clear();
}

// Note: doStart() and doStop() are handled by ProxyService

private ProducerRecord<byte[], byte[]> convertToKafkaRecord(PubSubMessage message) {
// Extract key - use ordering key if available
byte[] key = message.getKey().isEmpty() ? null : message.getKey().toByteArray();

// Create record with explicit partition
ProducerRecord<byte[], byte[]> record =
new ProducerRecord<byte[], byte[]>(
topicName,
Integer.valueOf((int) partition.value()), // Use explicit partition
key,
message.getData().toByteArray());

    // Convert attributes to headers. An attribute may carry multiple values, so emit one
    // header per value; serializing the AttributeValues proto wrapper itself would not
    // round-trip as the raw attribute bytes.
    List<Header> headers = new ArrayList<>();
    message
        .getAttributesMap()
        .forEach(
            (k, v) ->
                v.getValuesList()
                    .forEach(value -> headers.add(new RecordHeader(k, value.toByteArray()))));

// Add event time as header if present
if (message.hasEventTime()) {
headers.add(
new RecordHeader(
"pubsublite.event_time",
String.valueOf(message.getEventTime().getSeconds()).getBytes()));
}

headers.forEach(record.headers()::add);

return record;
}

  private boolean isPermanentError(Exception e) {
    // Match non-retriable Kafka error types directly. Matching on getMessage() text is
    // unreliable because exception messages rarely embed the exception class name.
    return e instanceof InvalidTopicException
        || e instanceof AuthorizationException
        || e instanceof SecurityDisabledException;
  }
}
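
Because convertToKafkaRecord defines the wire convention here (one header per attribute value, and the event time as a decimal-seconds string under "pubsublite.event_time"), a hedged consumer-side sketch may help; the consumer setup and poll loop are assumptions, not part of this change:

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.header.Header;

// Sketch: reading back the headers written by convertToKafkaRecord.
// `consumer` is an already-configured KafkaConsumer<byte[], byte[]> (assumed).
static void readEventTimes(KafkaConsumer<byte[], byte[]> consumer) {
  for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofSeconds(1))) {
    Header eventTime = record.headers().lastHeader("pubsublite.event_time");
    if (eventTime != null) {
      // Epoch seconds, written as a decimal string by the publisher above.
      long epochSeconds = Long.parseLong(new String(eventTime.value(), StandardCharsets.UTF_8));
    }
    // Every other header carries one Pub/Sub Lite attribute value, keyed by attribute name.
  }
}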