From 0b89d7bba71219a5bf6476e9c63ab6f244612be4 Mon Sep 17 00:00:00 2001 From: Jeremy Bernard Date: Mon, 27 Apr 2026 09:06:35 +0200 Subject: [PATCH 1/3] fix: replace WebSocket listener mechanism with a slower polling mechanism --- .../iexec/core/chain/BlockchainListener.java | 67 +++++++++++++ .../chain/WebSocketBlockchainListener.java | 96 ------------------- ...ests.java => BlockchainListenerTests.java} | 4 +- 3 files changed, 69 insertions(+), 98 deletions(-) create mode 100644 src/main/java/com/iexec/core/chain/BlockchainListener.java delete mode 100644 src/main/java/com/iexec/core/chain/WebSocketBlockchainListener.java rename src/test/java/com/iexec/core/chain/{WebSocketBlockchainListenerTests.java => BlockchainListenerTests.java} (96%) diff --git a/src/main/java/com/iexec/core/chain/BlockchainListener.java b/src/main/java/com/iexec/core/chain/BlockchainListener.java new file mode 100644 index 00000000..42fcec96 --- /dev/null +++ b/src/main/java/com/iexec/core/chain/BlockchainListener.java @@ -0,0 +1,67 @@ +/* + * Copyright 2025-2026 IEXEC BLOCKCHAIN TECH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.iexec.core.chain; + +import com.iexec.core.chain.event.LatestBlockEvent; +import io.micrometer.core.instrument.Metrics; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Service; +import org.web3j.protocol.Web3j; +import org.web3j.protocol.core.DefaultBlockParameterName; +import org.web3j.protocol.core.methods.response.EthBlock; +import org.web3j.protocol.http.HttpService; +import org.web3j.utils.Async; + +import java.time.Instant; +import java.util.concurrent.atomic.AtomicLong; + +@Slf4j +@Service +public class BlockchainListener { + static final String LATEST_BLOCK_METRIC_NAME = "iexec.chain.block.latest"; + + private final ApplicationEventPublisher applicationEventPublisher; + private final Web3j web3Client; + private final AtomicLong lastSeenBlock; + + public BlockchainListener(final ApplicationEventPublisher applicationEventPublisher, + final ChainConfig chainConfig) { + this.applicationEventPublisher = applicationEventPublisher; + this.web3Client = Web3j.build(new HttpService(chainConfig.getNodeAddress()), + chainConfig.getBlockTime().toMillis(), Async.defaultExecutorService()); + lastSeenBlock = Metrics.gauge(LATEST_BLOCK_METRIC_NAME, new AtomicLong(0)); + } + + @Scheduled(fixedRate = 5000) + private void run() { + try { + final EthBlock.Block ethBlock = web3Client.ethGetBlockByNumber(DefaultBlockParameterName.LATEST, false).send().getBlock(); + final long blockNumber = ethBlock.getNumber().longValue(); + final String blockHash = ethBlock.getHash(); + final long blockTimestamp = ethBlock.getTimestamp().longValue(); + final Instant blockTimestampInstant = Instant.ofEpochSecond(blockTimestamp); + log.info("Last seen block [number:{}, hash:{}, timestamp:{}, instant:{}]", + blockNumber, blockHash, blockTimestamp, blockTimestampInstant); + lastSeenBlock.set(blockNumber); + applicationEventPublisher.publishEvent(new LatestBlockEvent(this, blockNumber, blockHash, blockTimestamp)); + } catch (Exception e) { + log.error("An error happened while fetching data on-chain", e); + } + } +} diff --git a/src/main/java/com/iexec/core/chain/WebSocketBlockchainListener.java b/src/main/java/com/iexec/core/chain/WebSocketBlockchainListener.java deleted file mode 100644 index e8ab3fab..00000000 --- a/src/main/java/com/iexec/core/chain/WebSocketBlockchainListener.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Copyright 2025 IEXEC BLOCKCHAIN TECH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.iexec.core.chain; - -import com.iexec.core.chain.event.LatestBlockEvent; -import io.micrometer.core.instrument.Metrics; -import io.reactivex.Flowable; -import io.reactivex.disposables.Disposable; -import lombok.extern.slf4j.Slf4j; -import org.springframework.context.ApplicationEventPublisher; -import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Service; -import org.web3j.protocol.core.Request; -import org.web3j.protocol.core.methods.response.EthSubscribe; -import org.web3j.protocol.websocket.WebSocketService; -import org.web3j.protocol.websocket.events.NewHeadsNotification; -import org.web3j.utils.Numeric; - -import java.net.ConnectException; -import java.time.Instant; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; - -@Slf4j -@Service -public class WebSocketBlockchainListener { - static final String LATEST_BLOCK_METRIC_NAME = "iexec.chain.block.latest"; - - private static final String SUBSCRIBE_METHOD = "eth_subscribe"; - private static final String UNSUBSCRIBE_METHOD = "eth_unsubscribe"; - - private final ApplicationEventPublisher applicationEventPublisher; - private final WebSocketService webSocketService; - - private final AtomicLong lastSeenBlock; - private Disposable newHeads; - - public WebSocketBlockchainListener(final ApplicationEventPublisher applicationEventPublisher, - final ChainConfig chainConfig) { - this.applicationEventPublisher = applicationEventPublisher; - final String wsUrl = chainConfig.getNodeAddress().replace("http", "ws"); - this.webSocketService = new WebSocketService(wsUrl, false); - lastSeenBlock = Metrics.gauge(LATEST_BLOCK_METRIC_NAME, new AtomicLong(0)); - } - - @Scheduled(fixedRate = 5000) - private void run() throws ConnectException { - if (newHeads != null && !newHeads.isDisposed()) { - return; - } - - log.warn("web socket disconnection detected"); - webSocketService.connect(); - - final Request newHeadsRequest = new Request<>( - SUBSCRIBE_METHOD, - List.of("newHeads", Map.of()), - webSocketService, - EthSubscribe.class - ); - - final Flowable newHeadsEvents = webSocketService.subscribe( - newHeadsRequest, UNSUBSCRIBE_METHOD, NewHeadsNotification.class); - newHeads = newHeadsEvents.subscribe(this::processHead, this::handleError); - } - - private void processHead(final NewHeadsNotification event) { - final long blockNumber = Numeric.toBigInt(event.getParams().getResult().getNumber()).longValue(); - final String blockHash = event.getParams().getResult().getHash(); - final long blockTimestamp = Numeric.toBigInt(event.getParams().getResult().getTimestamp()).longValue(); - final Instant blockTimestampInstant = Instant.ofEpochSecond(blockTimestamp); - log.info("Last seen block [number:{}, hash:{}, timestamp:{}, instant:{}]", - blockNumber, blockHash, blockTimestamp, blockTimestampInstant); - lastSeenBlock.set(blockNumber); - applicationEventPublisher.publishEvent(new LatestBlockEvent(this, blockNumber, blockHash, blockTimestamp)); - } - - private void handleError(final Throwable t) { - log.error("An error happened during subscription", t); - } -} diff --git a/src/test/java/com/iexec/core/chain/WebSocketBlockchainListenerTests.java b/src/test/java/com/iexec/core/chain/BlockchainListenerTests.java similarity index 96% rename from src/test/java/com/iexec/core/chain/WebSocketBlockchainListenerTests.java rename to src/test/java/com/iexec/core/chain/BlockchainListenerTests.java index 82dbf2fe..e5999c4b 100644 --- a/src/test/java/com/iexec/core/chain/WebSocketBlockchainListenerTests.java +++ b/src/test/java/com/iexec/core/chain/BlockchainListenerTests.java @@ -33,13 +33,13 @@ import java.util.Objects; import java.util.concurrent.TimeUnit; -import static com.iexec.core.chain.WebSocketBlockchainListener.LATEST_BLOCK_METRIC_NAME; +import static com.iexec.core.chain.BlockchainListener.LATEST_BLOCK_METRIC_NAME; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; @Testcontainers @SpringBootTest(properties = "chain.out-of-service-threshold=PT30S") -class WebSocketBlockchainListenerTests { +class BlockchainListenerTests { private static final String CHAIN_SVC_NAME = "chain"; private static final int CHAIN_SVC_PORT = 8545; private static final String CONFIG_SVC_NAME = "config-server"; From 571217f7f1c174228fb3468b361c3eb07c5b80cd Mon Sep 17 00:00:00 2001 From: Jeremy Bernard Date: Mon, 27 Apr 2026 10:46:30 +0200 Subject: [PATCH 2/3] fix: improved algorithm and error handling --- .../iexec/core/chain/BlockchainListener.java | 25 ++++++++++++------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/src/main/java/com/iexec/core/chain/BlockchainListener.java b/src/main/java/com/iexec/core/chain/BlockchainListener.java index 42fcec96..4438b473 100644 --- a/src/main/java/com/iexec/core/chain/BlockchainListener.java +++ b/src/main/java/com/iexec/core/chain/BlockchainListener.java @@ -25,6 +25,7 @@ import org.web3j.protocol.Web3j; import org.web3j.protocol.core.DefaultBlockParameterName; import org.web3j.protocol.core.methods.response.EthBlock; +import org.web3j.protocol.exceptions.JsonRpcError; import org.web3j.protocol.http.HttpService; import org.web3j.utils.Async; @@ -51,15 +52,21 @@ public BlockchainListener(final ApplicationEventPublisher applicationEventPublis @Scheduled(fixedRate = 5000) private void run() { try { - final EthBlock.Block ethBlock = web3Client.ethGetBlockByNumber(DefaultBlockParameterName.LATEST, false).send().getBlock(); - final long blockNumber = ethBlock.getNumber().longValue(); - final String blockHash = ethBlock.getHash(); - final long blockTimestamp = ethBlock.getTimestamp().longValue(); - final Instant blockTimestampInstant = Instant.ofEpochSecond(blockTimestamp); - log.info("Last seen block [number:{}, hash:{}, timestamp:{}, instant:{}]", - blockNumber, blockHash, blockTimestamp, blockTimestampInstant); - lastSeenBlock.set(blockNumber); - applicationEventPublisher.publishEvent(new LatestBlockEvent(this, blockNumber, blockHash, blockTimestamp)); + final EthBlock ethBlock = web3Client.ethGetBlockByNumber(DefaultBlockParameterName.LATEST, false).send(); + if (ethBlock.hasError()) { + throw new JsonRpcError(ethBlock.getError()); + } + final long blockNumber = ethBlock.getBlock().getNumber().longValue(); + final long lastSeenBlockNumber = lastSeenBlock.get(); + if (blockNumber > lastSeenBlockNumber) { + final String blockHash = ethBlock.getBlock().getHash(); + final long blockTimestamp = ethBlock.getBlock().getTimestamp().longValue(); + final Instant blockTimestampInstant = Instant.ofEpochSecond(blockTimestamp); + log.info("Last seen block [number:{}, hash:{}, timestamp:{}, instant:{}]", + blockNumber, blockHash, blockTimestamp, blockTimestampInstant); + lastSeenBlock.set(blockNumber); + applicationEventPublisher.publishEvent(new LatestBlockEvent(this, blockNumber, blockHash, blockTimestamp)); + } } catch (Exception e) { log.error("An error happened while fetching data on-chain", e); } From cb88d0951151ddaa2d3e64f52350fca3e4bf9a58 Mon Sep 17 00:00:00 2001 From: Jeremy Bernard Date: Mon, 27 Apr 2026 16:10:41 +0200 Subject: [PATCH 3/3] fix: unconstrain out-of-service-threshold value --- src/main/resources/application.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index b6623917..53650536 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -52,7 +52,7 @@ wallet: chain: node-address: ${IEXEC_BLOCKCHAIN_NODE_ADDRESS:http://localhost:8545} - out-of-service-threshold: PT5S + out-of-service-threshold: PT15S pool-address: ${POOL_ADDRESS:0x365E7BABAa85eC61Dffe5b520763062e6C29dA27} start-block-number: ${IEXEC_START_BLOCK_NUMBER:0} gas-price-multiplier: ${IEXEC_GAS_PRICE_MULTIPLIER:1.0} # txs will be sent with networkGasPrice*gasPriceMultiplier, 4.0 means super fast