diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index 1539293..edf85ea 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -16,7 +16,7 @@ jobs:
services:
throttr:
- image: ghcr.io/throttr/throttr:4.0.17-debug-${{ matrix.size }}
+ image: ghcr.io/throttr/throttr:5.0.8-debug-${{ matrix.size }}-AMD64-metrics-enabled
ports:
- 9000:9000
diff --git a/pom.xml b/pom.xml
index 13d1fde..443717f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1,12 +1,11 @@
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0
cl.throttr
sdk
- 4.1.1
+ 5.0.0
jar
Throttr SDK for Java
@@ -20,13 +19,6 @@
-
- 21
- 21
- UTF-8
-
-
-
zen0x7
@@ -49,12 +41,11 @@
-
-
- central
- https://repo.maven.apache.org/maven2
-
-
+
+ 21
+ 21
+ UTF-8
+
@@ -68,7 +59,6 @@
javax.annotation
javax.annotation-api
1.3.2
- compile
@@ -79,56 +69,40 @@
- org.jacoco
- org.jacoco.agent
- 0.8.13
- runtime
- test
+ io.netty
+ netty-all
+ 4.1.122.Final
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+ 3.5.3
+
+
org.jacoco
jacoco-maven-plugin
0.8.13
- instrument
+ prepare-agent
- instrument
+ prepare-agent
-
- restore-classes
-
- restore-instrumented-classes
-
- prepare-package
-
report
verify
report
-
- ${project.basedir}/jacoco.exec
-
-
-
- org.apache.maven.plugins
- maven-surefire-plugin
- 3.5.3
-
- false
- 1
-
-
-
\ No newline at end of file
+
diff --git a/src/main/java/cl/throttr/ByteBufAccumulator.java b/src/main/java/cl/throttr/ByteBufAccumulator.java
new file mode 100644
index 0000000..ffe61a7
--- /dev/null
+++ b/src/main/java/cl/throttr/ByteBufAccumulator.java
@@ -0,0 +1,141 @@
+// Copyright (C) 2025 Ian Torres
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package cl.throttr;
+
+import cl.throttr.enums.ValueSize;
+import cl.throttr.parsers.*;
+import cl.throttr.requests.PendingRequest;
+import cl.throttr.utils.Binary;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+
+import java.util.Map;
+import java.util.Queue;
+import java.util.function.Consumer;
+
+public class ByteBufAccumulator extends SimpleChannelInboundHandler {
+ private final Queue pending;
+ private final Map> subscriptions;
+ private final ValueSize size;
+ private ByteBuf buffer;
+
+ private final Map parsers;
+
+ public ByteBufAccumulator(Queue pending, Map> subscriptions, ValueSize size) {
+ this.pending = pending;
+ this.subscriptions = subscriptions;
+ this.size = size;
+ this.parsers = Map.ofEntries(
+ Map.entry(0x01, new StatusParser()),
+ Map.entry(0x02, new QueryParser(size)),
+ Map.entry(0x03, new StatusParser()),
+ Map.entry(0x04, new StatusParser()),
+ Map.entry(0x05, new StatusParser()),
+ Map.entry(0x06, new GetParser(size)),
+ Map.entry(0x07, new ListParser(size)),
+ Map.entry(0x08, new InfoParser()),
+ Map.entry(0x09, new StatParser()),
+ Map.entry(0x10, new StatsParser()),
+ Map.entry(0x11, new StatusParser()), // SUBSCRIBE
+ Map.entry(0x12, new StatusParser()), // UNSUBSCRIBE
+ Map.entry(0x13, new StatusParser()), // PUBLISH
+ Map.entry(0x14, new ConnectionsParser()),
+ Map.entry(0x15, new ConnectionParser()),
+ Map.entry(0x16, new ChannelsParser()),
+ Map.entry(0x17, new ChannelParser()),
+ Map.entry(0x18, new WhoamiParser())
+ );
+ }
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf incoming) {
+ if (buffer == null) {
+ buffer = incoming.alloc().buffer();
+ }
+ buffer.writeBytes(incoming);
+
+ while (true) {
+ if (buffer.readableBytes() < 1) return;
+
+ buffer.markReaderIndex();
+ int type = buffer.getUnsignedByte(buffer.readerIndex());
+
+ if (type == 0x19) {
+ if (!handleChannelMessage()) return;
+ continue;
+ }
+
+ if (!handlePendingRequest()) return;
+ }
+ }
+
+ private boolean handleChannelMessage() {
+ int readerIndex = buffer.readerIndex();
+
+ if (buffer.readableBytes() < 1 + size.getValue()) return false;
+
+ int channelSize = Byte.toUnsignedInt(buffer.getByte(readerIndex + 1));
+ int headerSize = 1 + 1 + size.getValue() + channelSize;
+
+ if (buffer.readableBytes() < headerSize) return false;
+
+ long payloadLength = Binary.read(buffer, readerIndex + 2, size);
+ if (buffer.readableBytes() < headerSize + payloadLength) return false;
+
+ byte[] channelBytes = new byte[channelSize];
+ buffer.getBytes(readerIndex + 2 + size.getValue(), channelBytes);
+ String channel = new String(channelBytes);
+
+ byte[] payloadBytes = new byte[(int) payloadLength];
+ buffer.getBytes(readerIndex + headerSize, payloadBytes);
+ String payload = new String(payloadBytes);
+
+ buffer.readerIndex(readerIndex + headerSize + (int) payloadLength);
+
+ Consumer callback = subscriptions.get(channel);
+ if (callback != null) {
+ callback.accept(payload);
+ }
+
+ return true;
+ }
+
+ private boolean handlePendingRequest() {
+ PendingRequest pendingRequest = pending.peek();
+ if (pendingRequest == null) {
+ buffer.resetReaderIndex();
+ return false;
+ }
+
+ int expectedType = pendingRequest.type();
+ ResponseParser parser = parsers.get(expectedType);
+ ReadResult result = parser.tryParse(buffer);
+ if (result == null) {
+ buffer.resetReaderIndex();
+ return false;
+ }
+
+ buffer.skipBytes(result.consumed());
+ pending.poll().future().complete(result.value());
+ return true;
+ }
+
+ @Override
+ public void handlerRemoved(ChannelHandlerContext ctx) {
+ if (buffer != null) buffer.release();
+ }
+}
diff --git a/src/main/java/cl/throttr/Connection.java b/src/main/java/cl/throttr/Connection.java
index 69055a2..8c086ba 100644
--- a/src/main/java/cl/throttr/Connection.java
+++ b/src/main/java/cl/throttr/Connection.java
@@ -15,248 +15,74 @@
package cl.throttr;
-import cl.throttr.enums.TTLType;
import cl.throttr.enums.ValueSize;
import cl.throttr.requests.*;
-import cl.throttr.responses.GetResponse;
-import cl.throttr.responses.QueryResponse;
-import cl.throttr.responses.StatusResponse;
-import cl.throttr.utils.Binary;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.*;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.Socket;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.function.Consumer;
-/**
- * Connection
- */
public class Connection implements AutoCloseable {
- /**
- * Socket
- */
- private final Socket socket;
-
- /**
- * Size
- */
private final ValueSize size;
+ private final Channel channel;
+ private final EventLoopGroup group;
+ private final Queue pending = new ConcurrentLinkedQueue<>();
+ private final Map> subscriptions = new ConcurrentHashMap<>();
+ private final ByteBufAccumulator accumulator;
- /**
- * Out
- */
- private final OutputStream out;
-
- /**
- * In
- */
- private final InputStream in;
-
- /**
- * Constructor
- *
- * @param host
- * @param port
- * @param size
- * @throws IOException
- */
- public Connection(String host, int port, ValueSize size) throws IOException {
- this.socket = new Socket(host, port);
- this.socket.setTcpNoDelay(true);
- this.out = socket.getOutputStream();
- this.in = socket.getInputStream();
+ public Connection(String host, int port, ValueSize size) throws InterruptedException {
this.size = size;
+ this.accumulator = new ByteBufAccumulator(this.pending, this.subscriptions, size);
+ this.group = new NioEventLoopGroup();
+
+ Bootstrap bootstrap = new Bootstrap();
+ bootstrap.group(group)
+ .channel(NioSocketChannel.class)
+ .option(ChannelOption.TCP_NODELAY, true)
+ .handler(new ChannelInitializer() {
+ @Override
+ protected void initChannel(SocketChannel ch) {
+ ch.pipeline()
+ .addLast(accumulator);
+ }
+ });
+
+ ChannelFuture future = bootstrap.connect(host, port).sync();
+ this.channel = future.channel();
}
- /**
- * Send
- *
- * @param request
- * @return
- * @throws IOException
- */
- public Object send(Object request) throws IOException {
- if (socket.isClosed()) {
- throw new IOException("Socket is already closed");
- }
-
- if (request instanceof List> list) {
- ByteArrayOutputStream totalBuffer = new ByteArrayOutputStream();
- List types = new ArrayList<>();
-
- for (Object req : list) {
- byte[] buffer = getRequestBuffer(req, size);
- totalBuffer.write(buffer);
- types.add(Byte.toUnsignedInt(buffer[0]));
- }
-
- byte[] finalBuffer = totalBuffer.toByteArray();
- out.write(finalBuffer);
- out.flush();
-
- List