Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ The Cloud Foundry Java Client has two active versions. The `5.x` line is compati
> .build());
> ```

> [!NOTE]
> **Operations API users:** `Applications.logs(ApplicationLogsRequest)` now uses Log Cache under the hood for recent logs (the default). No migration is needed at the Operations layer.

[loggregator]: https://github.com/cloudfoundry/loggregator
[log-cache-api]: https://github.com/cloudfoundry/log-cache

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
package org.cloudfoundry.operations.applications;

import org.cloudfoundry.doppler.LogMessage;
import org.cloudfoundry.logcache.v1.Log;
import org.cloudfoundry.logcache.v1.ReadRequest;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand Down Expand Up @@ -128,19 +126,11 @@ public interface Applications {
@Deprecated
Flux<LogMessage> logs(LogsRequest request);

/**
* List the applications logs from logCacheClient.
* If no messages are available, an empty Flux is returned.
*
* @param request the application logs request
* @return the applications logs
*/
Flux<Log> logsRecent(ReadRequest request);

/**
* List the applications logs.
* Only works with {@code Loggregator < 107.0}, shipped in {@code CFD < 24.3}
* and {@code TAS < 4.0}.
* Uses Log Cache under the hood when {@link ApplicationLogsRequest#getRecent()} is {@code true}.
* Streaming logs still use Doppler, which is not available in deployments following
* <a href="https://docs.cloudfoundry.org/loggregator/architecture.html#shared-nothing-architecture">shared-nothing architecture</a>.
*
* @param request the application logs request
* @return the applications logs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,6 @@
import org.cloudfoundry.doppler.RecentLogsRequest;
import org.cloudfoundry.doppler.StreamRequest;
import org.cloudfoundry.logcache.v1.EnvelopeBatch;
import org.cloudfoundry.logcache.v1.Log;
import org.cloudfoundry.logcache.v1.LogCacheClient;
import org.cloudfoundry.logcache.v1.ReadRequest;
import org.cloudfoundry.operations.util.OperationsLogging;
Expand Down Expand Up @@ -558,31 +557,25 @@ public Flux<LogMessage> logs(LogsRequest request) {
.checkpoint();
}

@Override
public Flux<Log> logsRecent(ReadRequest request) {
return getRecentLogsLogCache(this.logCacheClient, request)
.transform(OperationsLogging.log("Get Application Logs"))
.checkpoint();
}

@Override
public Flux<ApplicationLog> logs(ApplicationLogsRequest request) {
return logs(LogsRequest.builder()
.name(request.getName())
.recent(request.getRecent())
.build())
.map(
logMessage ->
ApplicationLog.builder()
.sourceId(logMessage.getApplicationId())
.sourceType(logMessage.getSourceType())
.instanceId(logMessage.getSourceInstance())
.message(logMessage.getMessage())
.timestamp(logMessage.getTimestamp())
.logType(
ApplicationLogType.from(
logMessage.getMessageType().name()))
.build());
if (request.getRecent() == null || request.getRecent()) {
return Mono.zip(this.cloudFoundryClient, this.spaceId)
.flatMap(
function(
(cloudFoundryClient, spaceId) ->
getApplicationId(
cloudFoundryClient,
request.getName(),
spaceId)))
.flatMapMany(
applicationId -> getLogsLogCache(this.logCacheClient, applicationId))
.transform(OperationsLogging.log("Get Application Logs"))
.checkpoint();
} else {
return logs(LogsRequest.builder().name(request.getName()).recent(false).build())
.map(DefaultApplications::toApplicationLog);
}
}

@Override
Expand Down Expand Up @@ -1637,12 +1630,30 @@ private static Flux<LogMessage> getLogs(
}
}

private static Flux<Log> getRecentLogsLogCache(
Mono<LogCacheClient> logCacheClient, ReadRequest readRequest) {
return requestLogsRecentLogCache(logCacheClient, readRequest)
.flatMapIterable(EnvelopeBatch::getBatch)
private static Flux<ApplicationLog> getLogsLogCache(
Mono<LogCacheClient> logCacheClient, String applicationId) {
return requestLogsRecentLogCache(logCacheClient, applicationId)
.filter(e -> e.getLog() != null)
.sort(LOG_MESSAGE_COMPARATOR_LOG_CACHE)
.mapNotNull(org.cloudfoundry.logcache.v1.Envelope::getLog);
.map(
envelope ->
ApplicationLog.builder()
.sourceId(
Optional.ofNullable(envelope.getSourceId())
.orElse(""))
.sourceType(
envelope.getTags().getOrDefault("source_type", ""))
.instanceId(
Optional.ofNullable(envelope.getInstanceId())
.orElse(""))
.message(envelope.getLog().getPayloadAsText())
.timestamp(
Optional.ofNullable(envelope.getTimestamp())
.orElse(0L))
.logType(
ApplicationLogType.from(
envelope.getLog().getType().name()))
.build());
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -2538,12 +2549,14 @@ private static Flux<Envelope> requestLogsRecent(
RecentLogsRequest.builder().applicationId(applicationId).build()));
}

private static Mono<EnvelopeBatch> requestLogsRecentLogCache(
Mono<LogCacheClient> logCacheClient, ReadRequest readRequest) {
return logCacheClient.flatMap(
client ->
client.read(readRequest)
.flatMap(response -> Mono.justOrEmpty(response.getEnvelopes())));
private static Flux<org.cloudfoundry.logcache.v1.Envelope> requestLogsRecentLogCache(
Mono<LogCacheClient> logCacheClient, String applicationId) {
return logCacheClient
.flatMap(
client ->
client.read(ReadRequest.builder().sourceId(applicationId).build()))
.flatMap(response -> Mono.justOrEmpty(response.getEnvelopes()))
.flatMapIterable(EnvelopeBatch::getBatch);
}

private static Flux<Envelope> requestLogsStream(
Expand Down Expand Up @@ -2951,6 +2964,17 @@ private static Mono<AbstractApplicationResource> stopApplicationIfNotStopped(
: Mono.just(resource);
}

private static ApplicationLog toApplicationLog(LogMessage logMessage) {
return ApplicationLog.builder()
.sourceId(logMessage.getApplicationId())
.sourceType(logMessage.getSourceType())
.instanceId(logMessage.getSourceInstance())
.message(logMessage.getMessage())
.timestamp(logMessage.getTimestamp())
.logType(ApplicationLogType.from(logMessage.getMessageType().name()))
.build();
}

private static ApplicationDetail toApplicationDetail(
List<String> buildpacks,
SummaryApplicationResponse summaryApplicationResponse,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
Expand Down Expand Up @@ -1367,18 +1369,25 @@ void logsRecentDoppler() {
}

@Test
void logsRecentLogCache() {
void logsLogCache() {
requestApplications(
this.cloudFoundryClient,
"test-application-name",
TEST_SPACE_ID,
"test-metadata-id");
requestLogsRecentLogCache(this.logCacheClient, "test-metadata-id", "test-payload");
requestLogsRecentLogCache(this.logCacheClient, "test-metadata-id");

this.applications
.logsRecent(ReadRequest.builder().sourceId("test-metadata-id").build())
.logs(ApplicationLogsRequest.builder().name("test-application-name").build())
.as(StepVerifier::create)
.expectNext(fill(Log.builder()).type(LogType.OUT).build())
.expectNextMatches(
log ->
log.getMessage().equals("test-payload")
&& log.getLogType() == ApplicationLogType.OUT
&& log.getSourceId().equals("test-sourceId")
&& log.getInstanceId().equals("test-instanceId")
&& log.getSourceType().equals("APP/PROC/WEB")
&& log.getTimestamp() == 1L)
.expectComplete()
.verify(Duration.ofSeconds(5));
}
Expand Down Expand Up @@ -5359,8 +5368,9 @@ private static void requestLogsRecent(DopplerClient dopplerClient, String applic
.build()));
}

private static void requestLogsRecentLogCache(
LogCacheClient logCacheClient, String sourceId, String payload) {
private static void requestLogsRecentLogCache(LogCacheClient logCacheClient, String sourceId) {
String base64Payload =
Base64.getEncoder().encodeToString("test-payload".getBytes(StandardCharsets.UTF_8));
when(logCacheClient.read(ReadRequest.builder().sourceId(sourceId).build()))
.thenReturn(
Mono.just(
Expand All @@ -5370,11 +5380,16 @@ private static void requestLogsRecentLogCache(
.batch(
Arrays.asList(
fill(Envelope.builder())
.tags(
Collections
.singletonMap(
"source_type",
"APP/PROC/WEB"))
.log(
Log
.builder()
.payload(
payload)
base64Payload)
.type(
LogType
.OUT)
Expand Down
Loading