Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ allprojects {
testImplementation("junit:junit:${junitVersion}")
testImplementation("org.apache.commons:commons-collections4:${commonsCollections4Version}")
testImplementation("org.mockito:mockito-core:${mockitoVersion}")
testImplementation("org.objenesis:objenesis:3.2")
}

clean.doLast {
Expand Down
22 changes: 17 additions & 5 deletions src/main/java/org/fisco/bcos/sdk/v3/client/ClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ public class ClientImpl implements Client {
private GroupNodeIniConfig groupNodeIniConfig;
private CryptoSuite cryptoSuite;
private RpcJniObj rpcJniObj;
private boolean started;
private boolean stopped;
private boolean destroyed;

protected final ObjectMapper objectMapper = getObjectMapper();

Expand Down Expand Up @@ -1594,21 +1597,27 @@ public void getFilterLogsAsync(LogFilterResponse filter, RespCallback<LogWrapper
}

@Override
public void start() {
if (rpcJniObj != null) {
public synchronized void start() {
if (!destroyed && rpcJniObj != null && (!started || stopped)) {
rpcJniObj.start();
started = true;
stopped = false;
}
}

@Override
public void stop() {
if (rpcJniObj != null) {
public synchronized void stop() {
if (!destroyed && started && !stopped && rpcJniObj != null) {
rpcJniObj.stop();
stopped = true;
}
}

@Override
public void destroy() {
public synchronized void destroy() {
if (destroyed) {
return;
}
if (rpcJniObj != null) {
BcosSDKJniObj.destroy(rpcJniObj.getNativePointer());
rpcJniObj = null;
Expand All @@ -1617,6 +1626,9 @@ public void destroy() {
cryptoSuite.destroy();
cryptoSuite = null;
}
started = false;
stopped = true;
destroyed = true;
}

public static <T extends JsonRpcResponse<?>> ResponseCallback createResponseCallback(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public interface EventSubscribe {
*/
static EventSubscribe build(String group, ConfigOption configOption) throws JniException {
Client client = Client.build(group, configOption);
return new EventSubscribeImp(client, configOption);
return new EventSubscribeImp(client, configOption, true);
}

/**
Expand Down
65 changes: 55 additions & 10 deletions src/main/java/org/fisco/bcos/sdk/v3/eventsub/EventSubscribeImp.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.math.BigInteger;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.fisco.bcos.sdk.jni.BcosSDKJniObj;
import org.fisco.bcos.sdk.jni.common.JniException;
import org.fisco.bcos.sdk.jni.event.EventSubJniObj;
import org.fisco.bcos.sdk.v3.client.Client;
Expand All @@ -37,14 +38,25 @@ public class EventSubscribeImp implements EventSubscribe {
private String groupId;
private ConfigOption configOption;
private CryptoSuite cryptoSuite;
private final Client ownerClient;
private final boolean ownsClient;
private EventSubJniObj eventSubJniObj;
private boolean stopped;
private boolean destroyed;

private final ObjectMapper objectMapper = ObjectMapperFactory.getObjectMapper();

public EventSubscribeImp(Client client, ConfigOption configOption) throws JniException {
this(client, configOption, false);
}

EventSubscribeImp(Client client, ConfigOption configOption, boolean ownsClient)
throws JniException {
this.groupId = client.getGroup();
this.configOption = configOption;
this.cryptoSuite = client.getCryptoSuite();
this.ownerClient = client;
this.ownsClient = ownsClient;
this.eventSubJniObj = EventSubJniObj.build(client.getNativePointer());
this.configOption = client.getConfigOption();

Expand Down Expand Up @@ -186,30 +198,63 @@ public String subscribeEvent(EventSubParams params, EventSubCallback callback) {

@Override
public void unsubscribeEvent(String eventId) {
eventSubJniObj.unsubscribeEvent(eventId);
if (eventSubJniObj != null) {
eventSubJniObj.unsubscribeEvent(eventId);
}
}

@Override
public Set<String> getAllSubscribedEvents() {
// TODO: impl
return null;
if (eventSubJniObj == null) {
return Collections.emptySet();
}
Set<String> subscribedEvents = eventSubJniObj.getAllSubscribedEvents();
return subscribedEvents == null ? Collections.emptySet() : subscribedEvents;
}

@Override
public void start() {
eventSubJniObj.start();
public synchronized void start() {
if (destroyed) {
return;
}
ownerClient.start();
stopped = false;
}
Comment on lines 215 to 222

@Override
public void stop() {
eventSubJniObj.stop();
public synchronized void stop() {
if (destroyed || stopped) {
return;
}
unsubscribeAllEvents();
if (ownsClient) {
ownerClient.stop();
}
stopped = true;
}

@Override
public void destroy() {
public synchronized void destroy() {
if (destroyed) {
return;
}
stop();
if (eventSubJniObj != null) {
BcosSDKJniObj.destroy(eventSubJniObj.getNativePointer());
eventSubJniObj = null;
}
if (ownsClient) {
ownerClient.destroy();
}
destroyed = true;
}

private void unsubscribeAllEvents() {
Set<String> subscribedEvents = getAllSubscribedEvents();
if (subscribedEvents.isEmpty()) {
return;
}
for (String eventId : new HashSet<>(subscribedEvents)) {
unsubscribeEvent(eventId);
}
}
}
58 changes: 58 additions & 0 deletions src/test/java/org/fisco/bcos/sdk/v3/client/ClientImplTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package org.fisco.bcos.sdk.v3.client;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.lang.reflect.Field;
import org.fisco.bcos.sdk.jni.rpc.RpcJniObj;
import org.fisco.bcos.sdk.v3.crypto.CryptoSuite;
import org.junit.Test;
import org.objenesis.ObjenesisStd;

public class ClientImplTest {

private static ClientImpl allocateClientImpl() {
return new ObjenesisStd().newInstance(ClientImpl.class);
}

@Test
public void testStopIsIdempotent() throws Exception {
ClientImpl client = allocateClientImpl();
RpcJniObj rpcJniObj = mock(RpcJniObj.class);
setField(client, "rpcJniObj", rpcJniObj);
setBooleanField(client, "started", true);

client.stop();
client.stop();

verify(rpcJniObj, times(1)).stop();
}

@Test
public void testDestroyIsIdempotent() throws Exception {
ClientImpl client = allocateClientImpl();
CryptoSuite cryptoSuite = mock(CryptoSuite.class);
// rpcJniObj is left null so BcosSDKJniObj.destroy() static native call is skipped
setField(client, "cryptoSuite", cryptoSuite);
setBooleanField(client, "started", true);

client.destroy();
client.destroy();

verify(cryptoSuite, times(1)).destroy();
}

private static void setField(Object target, String fieldName, Object value) throws Exception {
Field field = target.getClass().getDeclaredField(fieldName);
field.setAccessible(true);
field.set(target, value);
}

private static void setBooleanField(Object target, String fieldName, boolean value)
throws Exception {
Field field = target.getClass().getDeclaredField(fieldName);
field.setAccessible(true);
field.setBoolean(target, value);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package org.fisco.bcos.sdk.v3.eventsub;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.HashSet;
import org.fisco.bcos.sdk.jni.event.EventSubJniObj;
import org.fisco.bcos.sdk.v3.client.Client;
import org.junit.Test;
import org.objenesis.ObjenesisStd;

public class EventSubscribeImpTest {

/**
* Allocates an {@link EventSubscribeImp} without running its constructor (which would trigger a
* JNI call to {@code EventSubJniObj.build}), then injects the supplied fields via reflection.
*/
private static EventSubscribeImp allocateEventSubscribeImp(
Client client, EventSubJniObj eventSubJniObj, boolean ownsClient) throws Exception {
EventSubscribeImp instance = new ObjenesisStd().newInstance(EventSubscribeImp.class);
setField(instance, "ownerClient", client);
setBooleanField(instance, "ownsClient", ownsClient);
setField(instance, "eventSubJniObj", eventSubJniObj);
setField(instance, "groupId", "group0");
return instance;
}

@Test
public void testStartActivatesEventChannelForBothCases() throws Exception {
Client client = mock(Client.class);

// Borrowed client: start() should still delegate to ownerClient.start()
EventSubscribeImp borrowed =
allocateEventSubscribeImp(client, mock(EventSubJniObj.class), false);
borrowed.start();
verify(client, times(1)).start();

// Owned client: start() should also delegate to ownerClient.start()
EventSubscribeImp owned =
allocateEventSubscribeImp(client, mock(EventSubJniObj.class), true);
owned.start();
verify(client, times(2)).start();
}

@Test
public void testStopOnSharedClientOnlyUnsubscribesOnce() throws Exception {
Client client = mock(Client.class);
EventSubJniObj eventSubJniObj = mock(EventSubJniObj.class);
when(eventSubJniObj.getAllSubscribedEvents())
.thenReturn(new HashSet<>(Arrays.asList("event-a", "event-b")));

EventSubscribeImp eventSubscribe =
allocateEventSubscribeImp(client, eventSubJniObj, false);

eventSubscribe.stop();
eventSubscribe.stop();

verify(eventSubJniObj, times(1)).getAllSubscribedEvents();
verify(eventSubJniObj, times(1)).unsubscribeEvent("event-a");
verify(eventSubJniObj, times(1)).unsubscribeEvent("event-b");
verify(eventSubJniObj, never()).stop();
verify(client, never()).stop();
}

@Test
public void testDestroyOnOwnedClientDelegatesLifecycleOnce() throws Exception {
Client client = mock(Client.class);
EventSubJniObj eventSubJniObj = mock(EventSubJniObj.class);
when(eventSubJniObj.getAllSubscribedEvents())
.thenReturn(new HashSet<>(Arrays.asList("event-a")));

EventSubscribeImp eventSubscribe =
allocateEventSubscribeImp(client, eventSubJniObj, true);

eventSubscribe.destroy();
eventSubscribe.destroy();

verify(eventSubJniObj, times(1)).unsubscribeEvent("event-a");
verify(eventSubJniObj, never()).stop();
verify(client, times(1)).stop();
verify(client, times(1)).destroy();
}

private static void setField(Object target, String fieldName, Object value) throws Exception {
Field field = target.getClass().getDeclaredField(fieldName);
field.setAccessible(true);
field.set(target, value);
}

private static void setBooleanField(Object target, String fieldName, boolean value)
throws Exception {
Field field = target.getClass().getDeclaredField(fieldName);
field.setAccessible(true);
field.setBoolean(target, value);
}
}
Loading