diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 1539293..edf85ea 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -16,7 +16,7 @@ jobs: services: throttr: - image: ghcr.io/throttr/throttr:4.0.17-debug-${{ matrix.size }} + image: ghcr.io/throttr/throttr:5.0.8-debug-${{ matrix.size }}-AMD64-metrics-enabled ports: - 9000:9000 diff --git a/pom.xml b/pom.xml index 13d1fde..443717f 100644 --- a/pom.xml +++ b/pom.xml @@ -1,12 +1,11 @@ + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 cl.throttr sdk - 4.1.1 + 5.0.0 jar Throttr SDK for Java @@ -20,13 +19,6 @@ - - 21 - 21 - UTF-8 - - - zen0x7 @@ -49,12 +41,11 @@ - - - central - https://repo.maven.apache.org/maven2 - - + + 21 + 21 + UTF-8 + @@ -68,7 +59,6 @@ javax.annotation javax.annotation-api 1.3.2 - compile @@ -79,56 +69,40 @@ - org.jacoco - org.jacoco.agent - 0.8.13 - runtime - test + io.netty + netty-all + 4.1.122.Final + + org.apache.maven.plugins + maven-surefire-plugin + 3.5.3 + + org.jacoco jacoco-maven-plugin 0.8.13 - instrument + prepare-agent - instrument + prepare-agent - - restore-classes - - restore-instrumented-classes - - prepare-package - report verify report - - ${project.basedir}/jacoco.exec - - - - org.apache.maven.plugins - maven-surefire-plugin - 3.5.3 - - false - 1 - - - \ No newline at end of file + diff --git a/src/main/java/cl/throttr/ByteBufAccumulator.java b/src/main/java/cl/throttr/ByteBufAccumulator.java new file mode 100644 index 0000000..ffe61a7 --- /dev/null +++ b/src/main/java/cl/throttr/ByteBufAccumulator.java @@ -0,0 +1,141 @@ +// Copyright (C) 2025 Ian Torres +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cl.throttr; + +import cl.throttr.enums.ValueSize; +import cl.throttr.parsers.*; +import cl.throttr.requests.PendingRequest; +import cl.throttr.utils.Binary; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; + +import java.util.Map; +import java.util.Queue; +import java.util.function.Consumer; + +public class ByteBufAccumulator extends SimpleChannelInboundHandler { + private final Queue pending; + private final Map> subscriptions; + private final ValueSize size; + private ByteBuf buffer; + + private final Map parsers; + + public ByteBufAccumulator(Queue pending, Map> subscriptions, ValueSize size) { + this.pending = pending; + this.subscriptions = subscriptions; + this.size = size; + this.parsers = Map.ofEntries( + Map.entry(0x01, new StatusParser()), + Map.entry(0x02, new QueryParser(size)), + Map.entry(0x03, new StatusParser()), + Map.entry(0x04, new StatusParser()), + Map.entry(0x05, new StatusParser()), + Map.entry(0x06, new GetParser(size)), + Map.entry(0x07, new ListParser(size)), + Map.entry(0x08, new InfoParser()), + Map.entry(0x09, new StatParser()), + Map.entry(0x10, new StatsParser()), + Map.entry(0x11, new StatusParser()), // SUBSCRIBE + Map.entry(0x12, new StatusParser()), // UNSUBSCRIBE + Map.entry(0x13, new StatusParser()), // PUBLISH + Map.entry(0x14, new ConnectionsParser()), + Map.entry(0x15, new ConnectionParser()), + Map.entry(0x16, new ChannelsParser()), + Map.entry(0x17, new ChannelParser()), + Map.entry(0x18, new WhoamiParser()) + ); + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, ByteBuf incoming) { + if (buffer == null) { + buffer = incoming.alloc().buffer(); + } + buffer.writeBytes(incoming); + + while (true) { + if (buffer.readableBytes() < 1) return; + + buffer.markReaderIndex(); + int type = buffer.getUnsignedByte(buffer.readerIndex()); + + if (type == 0x19) { + if (!handleChannelMessage()) return; + continue; + } + + if (!handlePendingRequest()) return; + } + } + + private boolean handleChannelMessage() { + int readerIndex = buffer.readerIndex(); + + if (buffer.readableBytes() < 1 + size.getValue()) return false; + + int channelSize = Byte.toUnsignedInt(buffer.getByte(readerIndex + 1)); + int headerSize = 1 + 1 + size.getValue() + channelSize; + + if (buffer.readableBytes() < headerSize) return false; + + long payloadLength = Binary.read(buffer, readerIndex + 2, size); + if (buffer.readableBytes() < headerSize + payloadLength) return false; + + byte[] channelBytes = new byte[channelSize]; + buffer.getBytes(readerIndex + 2 + size.getValue(), channelBytes); + String channel = new String(channelBytes); + + byte[] payloadBytes = new byte[(int) payloadLength]; + buffer.getBytes(readerIndex + headerSize, payloadBytes); + String payload = new String(payloadBytes); + + buffer.readerIndex(readerIndex + headerSize + (int) payloadLength); + + Consumer callback = subscriptions.get(channel); + if (callback != null) { + callback.accept(payload); + } + + return true; + } + + private boolean handlePendingRequest() { + PendingRequest pendingRequest = pending.peek(); + if (pendingRequest == null) { + buffer.resetReaderIndex(); + return false; + } + + int expectedType = pendingRequest.type(); + ResponseParser parser = parsers.get(expectedType); + ReadResult result = parser.tryParse(buffer); + if (result == null) { + buffer.resetReaderIndex(); + return false; + } + + buffer.skipBytes(result.consumed()); + pending.poll().future().complete(result.value()); + return true; + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) { + if (buffer != null) buffer.release(); + } +} diff --git a/src/main/java/cl/throttr/Connection.java b/src/main/java/cl/throttr/Connection.java index 69055a2..8c086ba 100644 --- a/src/main/java/cl/throttr/Connection.java +++ b/src/main/java/cl/throttr/Connection.java @@ -15,248 +15,74 @@ package cl.throttr; -import cl.throttr.enums.TTLType; import cl.throttr.enums.ValueSize; import cl.throttr.requests.*; -import cl.throttr.responses.GetResponse; -import cl.throttr.responses.QueryResponse; -import cl.throttr.responses.StatusResponse; -import cl.throttr.utils.Binary; +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.Unpooled; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; -import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.Socket; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; +import java.util.*; +import java.util.concurrent.*; +import java.util.function.Consumer; -/** - * Connection - */ public class Connection implements AutoCloseable { - /** - * Socket - */ - private final Socket socket; - - /** - * Size - */ private final ValueSize size; + private final Channel channel; + private final EventLoopGroup group; + private final Queue pending = new ConcurrentLinkedQueue<>(); + private final Map> subscriptions = new ConcurrentHashMap<>(); + private final ByteBufAccumulator accumulator; - /** - * Out - */ - private final OutputStream out; - - /** - * In - */ - private final InputStream in; - - /** - * Constructor - * - * @param host - * @param port - * @param size - * @throws IOException - */ - public Connection(String host, int port, ValueSize size) throws IOException { - this.socket = new Socket(host, port); - this.socket.setTcpNoDelay(true); - this.out = socket.getOutputStream(); - this.in = socket.getInputStream(); + public Connection(String host, int port, ValueSize size) throws InterruptedException { this.size = size; + this.accumulator = new ByteBufAccumulator(this.pending, this.subscriptions, size); + this.group = new NioEventLoopGroup(); + + Bootstrap bootstrap = new Bootstrap(); + bootstrap.group(group) + .channel(NioSocketChannel.class) + .option(ChannelOption.TCP_NODELAY, true) + .handler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) { + ch.pipeline() + .addLast(accumulator); + } + }); + + ChannelFuture future = bootstrap.connect(host, port).sync(); + this.channel = future.channel(); } - /** - * Send - * - * @param request - * @return - * @throws IOException - */ - public Object send(Object request) throws IOException { - if (socket.isClosed()) { - throw new IOException("Socket is already closed"); - } - - if (request instanceof List list) { - ByteArrayOutputStream totalBuffer = new ByteArrayOutputStream(); - List types = new ArrayList<>(); - - for (Object req : list) { - byte[] buffer = getRequestBuffer(req, size); - totalBuffer.write(buffer); - types.add(Byte.toUnsignedInt(buffer[0])); - } - - byte[] finalBuffer = totalBuffer.toByteArray(); - out.write(finalBuffer); - out.flush(); - - List responses = new ArrayList<>(); - for (int type : types) { - int head = in.read(); - if (head == -1) { - throw new IOException("Connection closed while reading response."); - } - - Object response = switch (type) { - case 0x02 -> readQueryResponse(head); - case 0x06 -> readGetResponse(head); - default -> readStatusResponse(head); - }; - responses.add(response); - } - - return responses; - } - - byte[] buffer = getRequestBuffer(request, size); - - out.write(buffer); - out.flush(); - - int head = in.read(); - if (head == -1) { - throw new IOException("Connection closed while reading response head."); - } - - int type = Byte.toUnsignedInt(buffer[0]); - - return switch (type) { - case 0x02 -> readQueryResponse(head); - case 0x06 -> readGetResponse(head); - default -> readStatusResponse(head); - }; - } - - /** - * Get request buffer - * - * @param request - * @param size - * @return byte[] - */ - public static byte[] getRequestBuffer(Object request, ValueSize size) { - return switch (request) { - case InsertRequest insert -> insert.toBytes(size); - case QueryRequest query -> query.toBytes(); - case UpdateRequest update -> update.toBytes(size); - case PurgeRequest purge -> purge.toBytes(); - case SetRequest set -> set.toBytes(size); - case GetRequest get -> get.toBytes(); - case null, default -> throw new IllegalArgumentException("Unsupported request type"); - }; + public Object send(Object request) throws IOException, InterruptedException, ExecutionException { + return Dispatcher.dispatch(channel, pending, request, size); } - /** - * Read full response - * - * @param head - * @return StatusResponse - * @throws IOException - */ - private QueryResponse readQueryResponse(int head) throws IOException { - if (head != 0x01) { - return new QueryResponse(false, 0, TTLType.SECONDS, 0); - } - - int expected = size.getValue() * 2 + 1; - byte[] merged = new byte[expected]; - int offset = 0; + public void subscribe(String name, Consumer callback) { + subscriptions.put(name, callback); - while (offset < expected) { - int read = in.read(merged, offset, expected - offset); - if (read == -1) { - throw new IOException("Unexpected EOF while reading full response."); - } - offset += read; - } + SubscribeRequest request = new SubscribeRequest(name); + byte[] buffer = request.toBytes(); - byte[] full = new byte[1 + expected]; - full[0] = (byte) head; - System.arraycopy(merged, 0, full, 1, expected); - - return QueryResponse.fromBytes(full, size); + channel.writeAndFlush(Unpooled.wrappedBuffer(buffer)).syncUninterruptibly(); } - /** - * Read get response - * - * @param head - * @return GetResponse - * @throws IOException - */ - private GetResponse readGetResponse(int head) throws IOException { - if (head != 0x01) { - return new GetResponse(false, null, 0, null); - } - - // Total: 1 byte (ttlType) + N bytes (ttl) + N bytes (valueSize) - int headerSize = 1 + size.getValue() + size.getValue(); - byte[] header = new byte[headerSize]; - int offset = 0; - - while (offset < headerSize) { - int read = in.read(header, offset, headerSize - offset); - if (read == -1) { - throw new IOException("Unexpected EOF while reading GET header"); - } - offset += read; - } + public void unsubscribe(String name) { + subscriptions.remove(name); - ByteBuffer buffer = ByteBuffer.wrap(header); - buffer.order(java.nio.ByteOrder.LITTLE_ENDIAN); - - TTLType.fromByte(buffer.get()); - Binary.read(buffer, size); - long valueSize = Binary.read(buffer, size); - - if (valueSize > Integer.MAX_VALUE) { - throw new IOException("Value too large to handle in memory: " + valueSize); - } - - byte[] value = new byte[(int) valueSize]; - offset = 0; - while (offset < value.length) { - int read = in.read(value, offset, value.length - offset); - if (read == -1) { - throw new IOException("Unexpected EOF while reading GET value"); - } - offset += read; - } - - byte[] full = new byte[1 + header.length + value.length]; - full[0] = (byte) head; - System.arraycopy(header, 0, full, 1, header.length); - System.arraycopy(value, 0, full, 1 + header.length, value.length); - - return GetResponse.fromBytes(full, size); - } + UnsubscribeRequest request = new UnsubscribeRequest(name); + byte[] buffer = request.toBytes(); - /** - * Read simple response - * - * @param head - * @return StatusResponse - * @throws IOException - */ - private StatusResponse readStatusResponse(int head) { - return new StatusResponse(head == 0x01); + channel.writeAndFlush(Unpooled.wrappedBuffer(buffer)).syncUninterruptibly(); } - /** - * Close - * - * @throws IOException - */ @Override - public void close() throws IOException { - socket.close(); + public void close() throws InterruptedException { + if (channel != null) channel.close().sync(); + group.shutdownGracefully(); } -} \ No newline at end of file +} diff --git a/src/main/java/cl/throttr/ReadResult.java b/src/main/java/cl/throttr/ReadResult.java new file mode 100644 index 0000000..c5ebc94 --- /dev/null +++ b/src/main/java/cl/throttr/ReadResult.java @@ -0,0 +1,18 @@ +// Copyright (C) 2025 Ian Torres +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cl.throttr; + +public record ReadResult(Object value, int consumed) {} \ No newline at end of file diff --git a/src/main/java/cl/throttr/Service.java b/src/main/java/cl/throttr/Service.java index ea10a08..36c87a1 100644 --- a/src/main/java/cl/throttr/Service.java +++ b/src/main/java/cl/throttr/Service.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicInteger; /** @@ -76,9 +77,9 @@ public Service(String host, int port, ValueSize size, int maxConnections) { /** * Connect * - * @throws IOException Sockets can fail + * @throws InterruptedException Can be interrupted */ - public void connect() throws IOException { + public void connect() throws InterruptedException { for (int i = 0; i < maxConnections; i++) { Connection conn = new Connection(host, port, size); connections.add(conn); @@ -91,7 +92,7 @@ public void connect() throws IOException { * @param request Requests * @return Object */ - public Object send(Object request) throws IOException { + public Object send(Object request) throws IOException, InterruptedException, ExecutionException { if (connections.isEmpty()) { throw new IllegalStateException("There are no available connections."); } @@ -102,11 +103,21 @@ public Object send(Object request) throws IOException { } } + /** + * Get a direct connection (for subscription or fixed binding) + * + * @return Connection + */ + public Connection getConnection() { + int index = roundRobinIndex.getAndUpdate(i -> (i + 1) % connections.size()); + return connections.get(index); + } + /** * Close */ @Override - public void close() throws IOException { + public void close() throws InterruptedException { for (Connection conn : connections) { conn.close(); } diff --git a/src/main/java/cl/throttr/enums/RequestType.java b/src/main/java/cl/throttr/enums/RequestType.java index e406ef1..5c03a12 100644 --- a/src/main/java/cl/throttr/enums/RequestType.java +++ b/src/main/java/cl/throttr/enums/RequestType.java @@ -47,7 +47,72 @@ public enum RequestType { /** * Get */ - GET(0x06); + GET(0x06), + + /** + * List + */ + LIST(0x07), + + /** + * List + */ + INFO(0x08), + + /** + * Stat + */ + STAT(0x09), + + /** + * Stats + */ + STATS(0x10), + + /** + * Subscribe + */ + SUBSCRIBE(0x11), + + /** + * Unsubscribe + */ + UNSUBSCRIBE(0x12), + + /** + * Publish + */ + PUBLISH(0x13), + + /** + * Connections + */ + CONNECTIONS(0x14), + + /** + * Connection + */ + CONNECTION(0x15), + + /** + * Channels + */ + CHANNELS(0x16), + + /** + * Channel + */ + CHANNEL(0x17), + + /** + * WhoAmI + */ + WHOAMI(0x18), + + /** + * Event + */ + EVENT(0x19); /** * Value diff --git a/src/main/java/cl/throttr/parsers/ChannelParser.java b/src/main/java/cl/throttr/parsers/ChannelParser.java new file mode 100644 index 0000000..286add7 --- /dev/null +++ b/src/main/java/cl/throttr/parsers/ChannelParser.java @@ -0,0 +1,69 @@ +// Copyright (C) 2025 Ian Torres +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cl.throttr.parsers; + +import cl.throttr.ReadResult; +import cl.throttr.enums.ValueSize; +import cl.throttr.responses.ChannelConnectionItem; +import cl.throttr.responses.ChannelResponse; +import cl.throttr.utils.Binary; +import io.netty.buffer.ByteBuf; + +import java.util.ArrayList; +import java.util.HexFormat; +import java.util.List; + +public class ChannelParser implements ResponseParser { + private static final int CONNECTION_ENTRY_SIZE = 40; + + @Override + public ReadResult tryParse(ByteBuf buf) { + int start = buf.readerIndex(); + + if (buf.readableBytes() < 1) return null; + + byte status = buf.getByte(start); + if (status != 0x01) { + return new ReadResult(new ChannelResponse(false, List.of()), 1); + } + + if (buf.readableBytes() < 1 + ValueSize.UINT64.getValue()) return null; + long connectionCount = Binary.read(buf, start + 1, ValueSize.UINT64); + + int requiredBytes = 1 + ValueSize.UINT64.getValue() + Math.toIntExact(connectionCount) * CONNECTION_ENTRY_SIZE; + if (buf.readableBytes() < requiredBytes) return null; + + int offset = start + 1 + ValueSize.UINT64.getValue(); + List connections = new ArrayList<>(); + + for (int i = 0; i < connectionCount; i++) { + byte[] uuidBytes = new byte[16]; + buf.getBytes(offset, uuidBytes); offset += 16; + + String id = HexFormat.of().formatHex(uuidBytes); + + long subscribedAt = Binary.read(buf, offset, ValueSize.UINT64); offset += 8; + long readBytes = Binary.read(buf, offset, ValueSize.UINT64); offset += 8; + long writeBytes = Binary.read(buf, offset, ValueSize.UINT64); offset += 8; + + connections.add(new ChannelConnectionItem(id, subscribedAt, readBytes, writeBytes)); + } + + int totalRead = offset - start; + + return new ReadResult(new ChannelResponse(true, connections), totalRead); + } +} \ No newline at end of file diff --git a/src/main/java/cl/throttr/parsers/ChannelsParser.java b/src/main/java/cl/throttr/parsers/ChannelsParser.java new file mode 100644 index 0000000..72805b3 --- /dev/null +++ b/src/main/java/cl/throttr/parsers/ChannelsParser.java @@ -0,0 +1,98 @@ +// Copyright (C) 2025 Ian Torres +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cl.throttr.parsers; + +import cl.throttr.ReadResult; +import cl.throttr.enums.ValueSize; +import cl.throttr.responses.ChannelsItem; +import cl.throttr.responses.ChannelsResponse; +import cl.throttr.utils.Binary; +import io.netty.buffer.ByteBuf; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; + +public class ChannelsParser implements ResponseParser { + private static final int HEADER_SIZE = 8; // FRAGMENTS (P) + private static final int FRAGMENT_HEADER_SIZE = 16; // FRAGMENT ID + Q + private static final int ENTRY_SIZE = 25; // QL (1 byte) + 3 x UINT64 (24 bytes) + + @Override + public ReadResult tryParse(ByteBuf buf) { + int start = buf.readerIndex(); + + // Validar mínimo 1 byte para status + if (buf.readableBytes() < 1) return null; + + // Validar header de fragments + if (buf.readableBytes() < 1 + HEADER_SIZE) return null; + long fragments = Binary.read(buf, start + 1, ValueSize.UINT64); + int offset = start + 1 + HEADER_SIZE; + + List channels = new ArrayList<>(); + + for (long f = 0; f < fragments; f++) { + // Validar fragment ID + Q + if (buf.readableBytes() < offset - start + FRAGMENT_HEADER_SIZE) return null; + + offset += 8; // fragment ID (ignored) + long q = Binary.read(buf, offset, ValueSize.UINT64); + offset += 8; + + // Validar entradas + int entriesSize = Math.toIntExact(q) * ENTRY_SIZE; + if (buf.readableBytes() < offset - start + entriesSize) return null; + + List sizes = new ArrayList<>(); + List read = new ArrayList<>(); + List write = new ArrayList<>(); + List subs = new ArrayList<>(); + int totalQL = 0; + + for (int c = 0; c < q; c++) { + byte ql = buf.getByte(offset); offset++; + sizes.add(ql); + totalQL += Byte.toUnsignedInt(ql); + + read.add(Binary.read(buf, offset, ValueSize.UINT64)); offset += 8; + write.add(Binary.read(buf, offset, ValueSize.UINT64)); offset += 8; + subs.add(Binary.read(buf, offset, ValueSize.UINT64)); offset += 8; + } + + // Validar que todos los nombres estén + if (buf.readableBytes() < offset - start + totalQL) return null; + + for (int c = 0; c < q; c++) { + int len = Byte.toUnsignedInt(sizes.get(c)); + byte[] nameBytes = new byte[len]; + buf.getBytes(offset, nameBytes); offset += len; + + String name = new String(nameBytes, StandardCharsets.UTF_8); + + channels.add(new ChannelsItem( + name, + read.get(c), + write.get(c), + subs.get(c) + )); + } + } + + int totalRead = offset - start; + return new ReadResult(new ChannelsResponse(true, channels), totalRead); + } +} diff --git a/src/main/java/cl/throttr/parsers/ConnectionParser.java b/src/main/java/cl/throttr/parsers/ConnectionParser.java new file mode 100644 index 0000000..1420ea9 --- /dev/null +++ b/src/main/java/cl/throttr/parsers/ConnectionParser.java @@ -0,0 +1,44 @@ +// Copyright (C) 2025 Ian Torres +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cl.throttr.parsers; + +import cl.throttr.ReadResult; +import cl.throttr.responses.ConnectionResponse; +import cl.throttr.responses.ConnectionsItem; +import io.netty.buffer.ByteBuf; + +public class ConnectionParser implements ResponseParser { + private static final int ENTRY_SIZE = 237; + + @Override + public ReadResult tryParse(ByteBuf buf) { + int index = buf.readerIndex(); + + byte status = buf.getByte(index); + + if (status == 0x00) { + return new ReadResult(new ConnectionResponse(false, null), 1); + } + + if (buf.readableBytes() < 1 + ENTRY_SIZE) return null; + + byte[] data = new byte[ENTRY_SIZE]; + buf.getBytes(index + 1, data, 0, ENTRY_SIZE); + + ConnectionsItem item = ConnectionsItem.fromBytes(data); + return new ReadResult(new ConnectionResponse(true, item), 1 + ENTRY_SIZE); + } +} diff --git a/src/main/java/cl/throttr/parsers/ConnectionsParser.java b/src/main/java/cl/throttr/parsers/ConnectionsParser.java new file mode 100644 index 0000000..caabcaa --- /dev/null +++ b/src/main/java/cl/throttr/parsers/ConnectionsParser.java @@ -0,0 +1,68 @@ +// Copyright (C) 2025 Ian Torres +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cl.throttr.parsers; + +import cl.throttr.ReadResult; +import cl.throttr.enums.ValueSize; +import cl.throttr.responses.ConnectionsItem; +import cl.throttr.responses.ConnectionsResponse; +import cl.throttr.utils.Binary; +import io.netty.buffer.ByteBuf; + +import java.util.ArrayList; +import java.util.List; + +public class ConnectionsParser implements ResponseParser { + private static final int ENTRY_SIZE = 237; + private static final int HEADER_SIZE = 1 + 8; // status + fragments + + @Override + public ReadResult tryParse(ByteBuf buf) { + int index = buf.readerIndex(); + + if (buf.readableBytes() < HEADER_SIZE) return null; + + int i = index + 1; + long fragments = Binary.read(buf, i, ValueSize.UINT64); + i += 8; + + List items = new ArrayList<>(); + + for (long f = 0; f < fragments; f++) { + if (buf.readableBytes() < i - index + 8 + 8) return null; + + i += 8; // skip fragment ID + long count = Binary.read(buf, i, ValueSize.UINT64); + i += 8; + + int totalFragmentBytes = Math.toIntExact(count) * ENTRY_SIZE; + if (buf.readableBytes() < i - index + totalFragmentBytes) return null; + + for (int c = 0; c < count; c++) { + if (buf.readableBytes() < i - index + ENTRY_SIZE) return null; + + byte[] entryData = new byte[ENTRY_SIZE]; + buf.getBytes(i, entryData); + i += ENTRY_SIZE; + + items.add(ConnectionsItem.fromBytes(entryData)); + } + } + + int totalConsumed = i - index; + return new ReadResult(new ConnectionsResponse(true, items), totalConsumed); + } +} diff --git a/src/main/java/cl/throttr/parsers/GetParser.java b/src/main/java/cl/throttr/parsers/GetParser.java new file mode 100644 index 0000000..f985cdb --- /dev/null +++ b/src/main/java/cl/throttr/parsers/GetParser.java @@ -0,0 +1,60 @@ +// Copyright (C) 2025 Ian Torres +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cl.throttr.parsers; + +import cl.throttr.ReadResult; +import cl.throttr.enums.ValueSize; +import cl.throttr.responses.GetResponse; +import cl.throttr.utils.Binary; +import io.netty.buffer.ByteBuf; + +public class GetParser implements ResponseParser { + private final ValueSize size; + + public GetParser(ValueSize size) { + this.size = size; + } + + @Override + public ReadResult tryParse(ByteBuf buf) { + int index = buf.readerIndex(); + + if (buf.readableBytes() < 1) return null; + byte success = buf.getByte(index); + + if (success == 0) { + byte[] data = new byte[1]; + buf.getBytes(index, data); + return new ReadResult(GetResponse.fromBytes(data, size), 1); + } + + int minHeader = 1 + 1 + size.getValue() + size.getValue(); // success + ttlType + ttl + valueSize + if (buf.readableBytes() < minHeader) return null; + + int valueSizeOffset = index + 1 + 1 + size.getValue(); // skip success + ttlType + ttl + + if (buf.readableBytes() < (valueSizeOffset + size.getValue() - index)) return null; + + long valueSize = Binary.read(buf, valueSizeOffset, size); + long total = minHeader + valueSize; + + if (buf.readableBytes() < total) return null; + + byte[] data = new byte[(int) total]; + buf.getBytes(index, data); + return new ReadResult(GetResponse.fromBytes(data, size), (int) total); + } +} \ No newline at end of file diff --git a/src/main/java/cl/throttr/parsers/InfoParser.java b/src/main/java/cl/throttr/parsers/InfoParser.java new file mode 100644 index 0000000..be850c4 --- /dev/null +++ b/src/main/java/cl/throttr/parsers/InfoParser.java @@ -0,0 +1,38 @@ +// Copyright (C) 2025 Ian Torres +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cl.throttr.parsers; + +import cl.throttr.ReadResult; +import cl.throttr.responses.InfoResponse; +import io.netty.buffer.ByteBuf; + +public class InfoParser implements ResponseParser { + private static final int EXPECTED_LENGTH = 432; + + @Override + public ReadResult tryParse(ByteBuf buf) { + int index = buf.readerIndex(); + if (buf.readableBytes() < 1 + EXPECTED_LENGTH) return null; + + buf.getByte(index); + + byte[] merged = new byte[EXPECTED_LENGTH + 1]; + buf.getBytes(index, merged); + + var response = InfoResponse.fromBytes(merged); + return new ReadResult(response, 1 + EXPECTED_LENGTH); + } +} \ No newline at end of file diff --git a/src/main/java/cl/throttr/parsers/ListParser.java b/src/main/java/cl/throttr/parsers/ListParser.java new file mode 100644 index 0000000..29d303a --- /dev/null +++ b/src/main/java/cl/throttr/parsers/ListParser.java @@ -0,0 +1,90 @@ +// Copyright (C) 2025 Ian Torres +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cl.throttr.parsers; + +import cl.throttr.ReadResult; +import cl.throttr.enums.ValueSize; +import cl.throttr.responses.ListItem; +import cl.throttr.responses.ListResponse; +import cl.throttr.utils.Binary; +import io.netty.buffer.ByteBuf; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; + +public class ListParser implements ResponseParser { + private final ValueSize size; + + public ListParser(ValueSize size) { + this.size = size; + } + + @Override + public ReadResult tryParse(ByteBuf buf) { + int index = buf.readerIndex(); + if (buf.readableBytes() < 1 + 8) return null; + + int i = index + 1; + long fragments = Binary.read(buf, i, ValueSize.UINT64); + i += 8; + + List items = new ArrayList<>(); + + for (long f = 0; f < fragments; f++) { + if (buf.readableBytes() < i - index + 8 + 8) return null; + + i += 8; // skip fragment ID + long keysInFragment = Binary.read(buf, i, ValueSize.UINT64); + i += 8; + + int perKeyHeader = 3 + 8 + size.getValue(); + int keyHeadersSize = Math.toIntExact(keysInFragment) * perKeyHeader; + + if (buf.readableBytes() < i - index + keyHeadersSize) return null; + + List keyLengths = new ArrayList<>(); + List scopedItems = new ArrayList<>(); + + for (int j = 0; j < keysInFragment; j++) { + int keyLength = buf.getUnsignedByte(i++); + int keyType = buf.getUnsignedByte(i++); + int ttlType = buf.getUnsignedByte(i++); + long expiresAt = Binary.read(buf, i, ValueSize.UINT64); i += 8; + long bytesUsed = Binary.read(buf, i, size); i += size.getValue(); + + scopedItems.add(new ListItem("", keyLength, keyType, ttlType, expiresAt, bytesUsed)); + keyLengths.add(keyLength); + } + + int totalKeyBytes = keyLengths.stream().mapToInt(Integer::intValue).sum(); + if (buf.readableBytes() < i - index + totalKeyBytes) return null; + + for (int j = 0; j < scopedItems.size(); j++) { + int len = keyLengths.get(j); + byte[] keyBytes = new byte[len]; + buf.getBytes(i, keyBytes); + i += len; + scopedItems.get(j).setKey(new String(keyBytes, StandardCharsets.UTF_8)); + } + + items.addAll(scopedItems); + } + + int totalConsumed = i - index; + return new ReadResult(new ListResponse(true, items), totalConsumed); + } +} diff --git a/src/main/java/cl/throttr/parsers/QueryParser.java b/src/main/java/cl/throttr/parsers/QueryParser.java new file mode 100644 index 0000000..8ec0edd --- /dev/null +++ b/src/main/java/cl/throttr/parsers/QueryParser.java @@ -0,0 +1,52 @@ +// Copyright (C) 2025 Ian Torres +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cl.throttr.parsers; + +import cl.throttr.ReadResult; +import cl.throttr.enums.ValueSize; +import cl.throttr.responses.QueryResponse; +import io.netty.buffer.ByteBuf; + +public class QueryParser implements ResponseParser { + private final ValueSize size; + + public QueryParser(ValueSize size) { + this.size = size; + } + + @Override + public ReadResult tryParse(ByteBuf buf) { + int index = buf.readerIndex(); + if (buf.readableBytes() < 1) return null; + + byte success = buf.getByte(index); + + if (success == 0x00) { + byte[] data = new byte[1]; + buf.getBytes(index, data); + return new ReadResult(QueryResponse.fromBytes(data, size), 1); + } + + int expected = 1 + size.getValue() + 1 + size.getValue(); + if (buf.readableBytes() < expected) return null; + + byte[] data = new byte[expected]; + buf.getBytes(index, data); + QueryResponse response = QueryResponse.fromBytes(data, size); + + return new ReadResult(response, expected); + } +} diff --git a/src/main/java/cl/throttr/parsers/ResponseParser.java b/src/main/java/cl/throttr/parsers/ResponseParser.java new file mode 100644 index 0000000..7547988 --- /dev/null +++ b/src/main/java/cl/throttr/parsers/ResponseParser.java @@ -0,0 +1,23 @@ +// Copyright (C) 2025 Ian Torres +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cl.throttr.parsers; + +import cl.throttr.ReadResult; +import io.netty.buffer.ByteBuf; + +public interface ResponseParser { + ReadResult tryParse(ByteBuf buf); +} \ No newline at end of file diff --git a/src/main/java/cl/throttr/parsers/StatParser.java b/src/main/java/cl/throttr/parsers/StatParser.java new file mode 100644 index 0000000..ddb433e --- /dev/null +++ b/src/main/java/cl/throttr/parsers/StatParser.java @@ -0,0 +1,38 @@ +// Copyright (C) 2025 Ian Torres +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cl.throttr.parsers; + +import cl.throttr.ReadResult; +import cl.throttr.responses.StatResponse; +import io.netty.buffer.ByteBuf; + +public class StatParser implements ResponseParser { + @Override + public ReadResult tryParse(ByteBuf buf) { + int index = buf.readerIndex(); + byte status = buf.getByte(index); + if (status == 0x00) { + return new ReadResult(new StatResponse(false, 0, 0, 0, 0), 1); + } + + int expected = 1 + 8 * 4; // status + 4 campos uint64 + if (buf.readableBytes() < expected) return null; + + byte[] data = new byte[expected]; + buf.getBytes(index, data); + return new ReadResult(StatResponse.fromBytes(data), expected); + } +} \ No newline at end of file diff --git a/src/main/java/cl/throttr/parsers/StatsParser.java b/src/main/java/cl/throttr/parsers/StatsParser.java new file mode 100644 index 0000000..8f2711f --- /dev/null +++ b/src/main/java/cl/throttr/parsers/StatsParser.java @@ -0,0 +1,84 @@ +// Copyright (C) 2025 Ian Torres +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cl.throttr.parsers; + +import cl.throttr.ReadResult; +import cl.throttr.enums.ValueSize; +import cl.throttr.responses.StatsItem; +import cl.throttr.responses.StatsResponse; +import cl.throttr.utils.Binary; +import io.netty.buffer.ByteBuf; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; + +public class StatsParser implements ResponseParser { + @Override + public ReadResult tryParse(ByteBuf buf) { + int index = buf.readerIndex(); + if (buf.readableBytes() < 1 + 8) return null; + + int i = index + 1; + long fragments = Binary.read(buf, i, ValueSize.UINT64); + i += 8; + + List items = new ArrayList<>(); + + for (long f = 0; f < fragments; f++) { + if (buf.readableBytes() < i - index + 8 + 8) return null; + + i += 8; // skip timestamp + long keysInFragment = Binary.read(buf, i, ValueSize.UINT64); + i += 8; + + long perKeyHeader = 33; + + long keyHeadersSize = keysInFragment * perKeyHeader; + if (buf.readableBytes() < i - index + keyHeadersSize) return null; + + List scopedItems = new ArrayList<>(); + List keyLengths = new ArrayList<>(); + + for (long j = 0; j < keysInFragment; j++) { + int keyLength = buf.getUnsignedByte(i++); + long readsPerMin = Binary.read(buf, i, ValueSize.UINT64); i += 8; + long writesPerMin = Binary.read(buf, i, ValueSize.UINT64); i += 8; + long totalReads = Binary.read(buf, i, ValueSize.UINT64); i += 8; + long totalWrites = Binary.read(buf, i, ValueSize.UINT64); i += 8; + + scopedItems.add(new StatsItem("", keyLength, readsPerMin, writesPerMin, totalReads, totalWrites)); + keyLengths.add(keyLength); + } + + long totalKeyBytes = keyLengths.stream().mapToInt(Integer::intValue).sum(); + if (buf.readableBytes() < i - index + totalKeyBytes) return null; + + for (int j = 0; j < scopedItems.size(); j++) { + int len = keyLengths.get(j); + byte[] keyBytes = new byte[len]; + buf.getBytes(i, keyBytes); + i += len; + scopedItems.get(j).setKey(new String(keyBytes, StandardCharsets.UTF_8)); + } + + items.addAll(scopedItems); + } + + int totalConsumed = i - index; + return new ReadResult(new StatsResponse(true, items), totalConsumed); + } +} diff --git a/src/main/java/cl/throttr/parsers/StatusParser.java b/src/main/java/cl/throttr/parsers/StatusParser.java new file mode 100644 index 0000000..626e292 --- /dev/null +++ b/src/main/java/cl/throttr/parsers/StatusParser.java @@ -0,0 +1,33 @@ +// Copyright (C) 2025 Ian Torres +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cl.throttr.parsers; + +import cl.throttr.ReadResult; +import cl.throttr.responses.StatusResponse; +import io.netty.buffer.ByteBuf; + +public class StatusParser implements ResponseParser { + public static final int SIZE = 1; + + @Override + public ReadResult tryParse(ByteBuf buf) { + byte[] data = new byte[1]; + buf.getBytes(buf.readerIndex(), data); + StatusResponse response = StatusResponse.fromBytes(data); + return new ReadResult(response, SIZE); + + } +} \ No newline at end of file diff --git a/src/main/java/cl/throttr/parsers/WhoamiParser.java b/src/main/java/cl/throttr/parsers/WhoamiParser.java new file mode 100644 index 0000000..b825deb --- /dev/null +++ b/src/main/java/cl/throttr/parsers/WhoamiParser.java @@ -0,0 +1,34 @@ +// Copyright (C) 2025 Ian Torres +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cl.throttr.parsers; + +import cl.throttr.ReadResult; +import cl.throttr.responses.WhoamiResponse; +import io.netty.buffer.ByteBuf; + +public class WhoamiParser implements ResponseParser { + private static final int TOTAL_SIZE = 1 + 16; + + @Override + public ReadResult tryParse(ByteBuf buf) { + int index = buf.readerIndex(); + if (buf.readableBytes() < TOTAL_SIZE) return null; + + byte[] uuid = new byte[16]; + buf.getBytes(index + 1, uuid); + return new ReadResult(new WhoamiResponse(true, uuid), TOTAL_SIZE); + } +} \ No newline at end of file diff --git a/src/main/java/cl/throttr/requests/ChannelRequest.java b/src/main/java/cl/throttr/requests/ChannelRequest.java new file mode 100644 index 0000000..a334cf8 --- /dev/null +++ b/src/main/java/cl/throttr/requests/ChannelRequest.java @@ -0,0 +1,49 @@ +// Copyright (C) 2025 Ian Torres +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cl.throttr.requests; + +import cl.throttr.enums.RequestType; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.charset.StandardCharsets; + +/** + * Stat request + */ +public record ChannelRequest( + String channel +) { + /** + * To bytes + * + * @return byte[] + */ + public byte[] toBytes() { + byte[] channelBytes = channel.getBytes(StandardCharsets.UTF_8); + + var buffer = ByteBuffer.allocate( + 2 + channelBytes.length + ); + buffer.order(ByteOrder.LITTLE_ENDIAN); + + buffer.put((byte) RequestType.CHANNEL.getValue()); + buffer.put((byte) channelBytes.length); + buffer.put(channelBytes); + + return buffer.array(); + } +} diff --git a/src/main/java/cl/throttr/requests/ChannelsRequest.java b/src/main/java/cl/throttr/requests/ChannelsRequest.java new file mode 100644 index 0000000..b70e6f8 --- /dev/null +++ b/src/main/java/cl/throttr/requests/ChannelsRequest.java @@ -0,0 +1,38 @@ +// Copyright (C) 2025 Ian Torres +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cl.throttr.requests; + +import cl.throttr.enums.RequestType; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +/** + * Channels request + */ +public record ChannelsRequest() { + /** + * To bytes + * + * @return byte[] + */ + public byte[] toBytes() { + var buffer = ByteBuffer.allocate(1); + buffer.order(ByteOrder.LITTLE_ENDIAN); + buffer.put((byte) RequestType.CHANNELS.getValue()); + return buffer.array(); + } +} diff --git a/src/main/java/cl/throttr/requests/ConnectionRequest.java b/src/main/java/cl/throttr/requests/ConnectionRequest.java new file mode 100644 index 0000000..512a43a --- /dev/null +++ b/src/main/java/cl/throttr/requests/ConnectionRequest.java @@ -0,0 +1,56 @@ +// Copyright (C) 2025 Ian Torres +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cl.throttr.requests; + +import cl.throttr.enums.RequestType; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +/** + * Stat request + */ +public record ConnectionRequest( + String id +) { + /** + * To bytes + * + * @return byte[] + */ + public byte[] toBytes() { + byte[] idBytes = hexStringToByteArray(id); + + var buffer = ByteBuffer.allocate(1 + 16); + buffer.order(ByteOrder.LITTLE_ENDIAN); + + buffer.put((byte) RequestType.CONNECTION.getValue()); + buffer.put(idBytes); + + return buffer.array(); + } + + private static byte[] hexStringToByteArray(String s) { + int len = s.length(); + byte[] data = new byte[len / 2]; + for (int i = 0; i < len; i += 2) { + int hi = Character.digit(s.charAt(i), 16); + int lo = Character.digit(s.charAt(i + 1), 16); + data[i / 2] = (byte) ((hi << 4) + lo); + } + return data; + } +} diff --git a/src/main/java/cl/throttr/requests/ConnectionsRequest.java b/src/main/java/cl/throttr/requests/ConnectionsRequest.java new file mode 100644 index 0000000..0797d84 --- /dev/null +++ b/src/main/java/cl/throttr/requests/ConnectionsRequest.java @@ -0,0 +1,38 @@ +// Copyright (C) 2025 Ian Torres +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cl.throttr.requests; + +import cl.throttr.enums.RequestType; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +/** + * Connections request + */ +public record ConnectionsRequest() { + /** + * To bytes + * + * @return byte[] + */ + public byte[] toBytes() { + var buffer = ByteBuffer.allocate(1); + buffer.order(ByteOrder.LITTLE_ENDIAN); + buffer.put((byte) RequestType.CONNECTIONS.getValue()); + return buffer.array(); + } +} diff --git a/src/main/java/cl/throttr/requests/Dispatcher.java b/src/main/java/cl/throttr/requests/Dispatcher.java new file mode 100644 index 0000000..dde04a0 --- /dev/null +++ b/src/main/java/cl/throttr/requests/Dispatcher.java @@ -0,0 +1,83 @@ +// Copyright (C) 2025 Ian Torres +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cl.throttr.requests; + +import cl.throttr.enums.ValueSize; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +public final class Dispatcher { + + private Dispatcher() {} + + public static Object dispatch(Channel channel, Queue pending, Object request, ValueSize size) throws IOException, InterruptedException, ExecutionException { + if (!channel.isActive()) { + throw new IOException("Socket is already closed"); + } + + if (request instanceof List list) { + return dispatchBatch(channel, pending, list, size); + } + + return dispatchSingle(channel, pending, request, size); + } + + private static Object dispatchBatch(Channel channel, Queue pending, List list, ValueSize size) throws InterruptedException, ExecutionException { + ByteArrayOutputStream totalBuffer = new ByteArrayOutputStream(); + List types = new ArrayList<>(); + + for (Object req : list) { + byte[] buffer = Serializer.invoke(req, size); + totalBuffer.writeBytes(buffer); + types.add(Byte.toUnsignedInt(buffer[0])); + } + + byte[] finalBuffer = totalBuffer.toByteArray(); + List> futures = new ArrayList<>(); + + for (int type : types) { + CompletableFuture f = new CompletableFuture<>(); + pending.add(new PendingRequest(f, type)); + futures.add(f); + } + + channel.writeAndFlush(Unpooled.wrappedBuffer(finalBuffer)).syncUninterruptibly(); + + List responses = new ArrayList<>(); + for (CompletableFuture f : futures) { + responses.add(f.get()); + } + + return responses; + } + + private static Object dispatchSingle(Channel channel, Queue pending, Object request, ValueSize size) throws InterruptedException, ExecutionException { + byte[] buffer = Serializer.invoke(request, size); + CompletableFuture future = new CompletableFuture<>(); + int type = Byte.toUnsignedInt(buffer[0]); + pending.add(new PendingRequest(future, type)); + + channel.writeAndFlush(Unpooled.wrappedBuffer(buffer)).syncUninterruptibly(); + + return future.get(); + } +} diff --git a/src/main/java/cl/throttr/requests/InfoRequest.java b/src/main/java/cl/throttr/requests/InfoRequest.java new file mode 100644 index 0000000..efefadc --- /dev/null +++ b/src/main/java/cl/throttr/requests/InfoRequest.java @@ -0,0 +1,38 @@ +// Copyright (C) 2025 Ian Torres +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cl.throttr.requests; + +import cl.throttr.enums.RequestType; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +/** + * Info request + */ +public record InfoRequest() { + /** + * To bytes + * + * @return byte[] + */ + public byte[] toBytes() { + var buffer = ByteBuffer.allocate(1); + buffer.order(ByteOrder.LITTLE_ENDIAN); + buffer.put((byte) RequestType.INFO.getValue()); + return buffer.array(); + } +} diff --git a/src/main/java/cl/throttr/requests/ListRequest.java b/src/main/java/cl/throttr/requests/ListRequest.java new file mode 100644 index 0000000..8e38a5f --- /dev/null +++ b/src/main/java/cl/throttr/requests/ListRequest.java @@ -0,0 +1,38 @@ +// Copyright (C) 2025 Ian Torres +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cl.throttr.requests; + +import cl.throttr.enums.RequestType; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +/** + * List request + */ +public record ListRequest() { + /** + * To bytes + * + * @return byte[] + */ + public byte[] toBytes() { + var buffer = ByteBuffer.allocate(1); + buffer.order(ByteOrder.LITTLE_ENDIAN); + buffer.put((byte) RequestType.LIST.getValue()); + return buffer.array(); + } +} diff --git a/src/main/java/cl/throttr/requests/PendingRequest.java b/src/main/java/cl/throttr/requests/PendingRequest.java new file mode 100644 index 0000000..6b3841c --- /dev/null +++ b/src/main/java/cl/throttr/requests/PendingRequest.java @@ -0,0 +1,20 @@ +// Copyright (C) 2025 Ian Torres +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cl.throttr.requests; + +import java.util.concurrent.CompletableFuture; + +public record PendingRequest(CompletableFuture future, int type) {} diff --git a/src/main/java/cl/throttr/requests/PublishRequest.java b/src/main/java/cl/throttr/requests/PublishRequest.java new file mode 100644 index 0000000..c6f85fc --- /dev/null +++ b/src/main/java/cl/throttr/requests/PublishRequest.java @@ -0,0 +1,56 @@ +// Copyright (C) 2025 Ian Torres +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cl.throttr.requests; + +import cl.throttr.enums.RequestType; +import cl.throttr.enums.ValueSize; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.charset.StandardCharsets; + +import static cl.throttr.utils.Binary.put; + +/** + * Publish request + */ +public record PublishRequest( + String channel, + String payload +) { + /** + * To bytes + * + * @return byte[] + */ + public byte[] toBytes(ValueSize size) { + byte[] channelBytes = channel.getBytes(StandardCharsets.UTF_8); + byte[] payloadBytes = payload.getBytes(StandardCharsets.UTF_8); + + var buffer = ByteBuffer.allocate( + 2 + channelBytes.length + size.getValue() + payloadBytes.length + ); + buffer.order(ByteOrder.LITTLE_ENDIAN); + + buffer.put((byte) RequestType.PUBLISH.getValue()); + buffer.put((byte) channelBytes.length); + put(buffer, payloadBytes.length, size); + buffer.put(channelBytes); + buffer.put(payloadBytes); + + return buffer.array(); + } +} diff --git a/src/main/java/cl/throttr/requests/Serializer.java b/src/main/java/cl/throttr/requests/Serializer.java new file mode 100644 index 0000000..a4a7cc6 --- /dev/null +++ b/src/main/java/cl/throttr/requests/Serializer.java @@ -0,0 +1,30 @@ +package cl.throttr.requests; + +import cl.throttr.enums.ValueSize; + +public class Serializer { + + private Serializer() {} + + public static byte[] invoke(Object request, ValueSize size) { + return switch (request) { + case InsertRequest insert -> insert.toBytes(size); + case QueryRequest query -> query.toBytes(); + case UpdateRequest update -> update.toBytes(size); + case PurgeRequest purge -> purge.toBytes(); + case SetRequest set -> set.toBytes(size); + case GetRequest get -> get.toBytes(); + case ListRequest list -> list.toBytes(); + case InfoRequest info -> info.toBytes(); + case StatRequest stat -> stat.toBytes(); + case StatsRequest stats -> stats.toBytes(); + case PublishRequest publish -> publish.toBytes(size); + case ConnectionsRequest connections -> connections.toBytes(); + case ConnectionRequest connection -> connection.toBytes(); + case ChannelsRequest channels -> channels.toBytes(); + case ChannelRequest channel -> channel.toBytes(); + case WhoAmiRequest whoami -> whoami.toBytes(); + case null, default -> throw new IllegalArgumentException("Unsupported request type"); + }; + } +} diff --git a/src/main/java/cl/throttr/requests/StatRequest.java b/src/main/java/cl/throttr/requests/StatRequest.java new file mode 100644 index 0000000..7b52072 --- /dev/null +++ b/src/main/java/cl/throttr/requests/StatRequest.java @@ -0,0 +1,49 @@ +// Copyright (C) 2025 Ian Torres +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cl.throttr.requests; + +import cl.throttr.enums.RequestType; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.charset.StandardCharsets; + +/** + * Stat request + */ +public record StatRequest( + String key +) { + /** + * To bytes + * + * @return byte[] + */ + public byte[] toBytes() { + byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8); + + var buffer = ByteBuffer.allocate( + 2 + keyBytes.length + ); + buffer.order(ByteOrder.LITTLE_ENDIAN); + + buffer.put((byte) RequestType.STAT.getValue()); + buffer.put((byte) keyBytes.length); + buffer.put(keyBytes); + + return buffer.array(); + } +} diff --git a/src/main/java/cl/throttr/requests/StatsRequest.java b/src/main/java/cl/throttr/requests/StatsRequest.java new file mode 100644 index 0000000..058fee7 --- /dev/null +++ b/src/main/java/cl/throttr/requests/StatsRequest.java @@ -0,0 +1,38 @@ +// Copyright (C) 2025 Ian Torres +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cl.throttr.requests; + +import cl.throttr.enums.RequestType; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +/** + * Stats request + */ +public record StatsRequest() { + /** + * To bytes + * + * @return byte[] + */ + public byte[] toBytes() { + var buffer = ByteBuffer.allocate(1); + buffer.order(ByteOrder.LITTLE_ENDIAN); + buffer.put((byte) RequestType.STATS.getValue()); + return buffer.array(); + } +} diff --git a/src/main/java/cl/throttr/requests/SubscribeRequest.java b/src/main/java/cl/throttr/requests/SubscribeRequest.java new file mode 100644 index 0000000..0eafa42 --- /dev/null +++ b/src/main/java/cl/throttr/requests/SubscribeRequest.java @@ -0,0 +1,49 @@ +// Copyright (C) 2025 Ian Torres +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cl.throttr.requests; + +import cl.throttr.enums.RequestType; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.charset.StandardCharsets; + +/** + * Subscribe request + */ +public record SubscribeRequest( + String channel +) { + /** + * To bytes + * + * @return byte[] + */ + public byte[] toBytes() { + byte[] channelBytes = channel.getBytes(StandardCharsets.UTF_8); + + var buffer = ByteBuffer.allocate( + 2 + channelBytes.length + ); + buffer.order(ByteOrder.LITTLE_ENDIAN); + + buffer.put((byte) RequestType.SUBSCRIBE.getValue()); + buffer.put((byte) channelBytes.length); + buffer.put(channelBytes); + + return buffer.array(); + } +} diff --git a/src/main/java/cl/throttr/requests/UnsubscribeRequest.java b/src/main/java/cl/throttr/requests/UnsubscribeRequest.java new file mode 100644 index 0000000..92b04b4 --- /dev/null +++ b/src/main/java/cl/throttr/requests/UnsubscribeRequest.java @@ -0,0 +1,49 @@ +// Copyright (C) 2025 Ian Torres +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cl.throttr.requests; + +import cl.throttr.enums.RequestType; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.charset.StandardCharsets; + +/** + * Unsubscribe request + */ +public record UnsubscribeRequest( + String channel +) { + /** + * To bytes + * + * @return byte[] + */ + public byte[] toBytes() { + byte[] channelBytes = channel.getBytes(StandardCharsets.UTF_8); + + var buffer = ByteBuffer.allocate( + 2 + channelBytes.length + ); + buffer.order(ByteOrder.LITTLE_ENDIAN); + + buffer.put((byte) RequestType.UNSUBSCRIBE.getValue()); + buffer.put((byte) channelBytes.length); + buffer.put(channelBytes); + + return buffer.array(); + } +} diff --git a/src/main/java/cl/throttr/requests/WhoAmiRequest.java b/src/main/java/cl/throttr/requests/WhoAmiRequest.java new file mode 100644 index 0000000..a035b50 --- /dev/null +++ b/src/main/java/cl/throttr/requests/WhoAmiRequest.java @@ -0,0 +1,38 @@ +// Copyright (C) 2025 Ian Torres +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cl.throttr.requests; + +import cl.throttr.enums.RequestType; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +/** + * WhoAmI request + */ +public record WhoAmiRequest() { + /** + * To bytes + * + * @return byte[] + */ + public byte[] toBytes() { + var buffer = ByteBuffer.allocate(1); + buffer.order(ByteOrder.LITTLE_ENDIAN); + buffer.put((byte) RequestType.WHOAMI.getValue()); + return buffer.array(); + } +} diff --git a/src/main/java/cl/throttr/responses/ChannelConnectionItem.java b/src/main/java/cl/throttr/responses/ChannelConnectionItem.java new file mode 100644 index 0000000..ca8893b --- /dev/null +++ b/src/main/java/cl/throttr/responses/ChannelConnectionItem.java @@ -0,0 +1,30 @@ +// Copyright (C) 2025 Ian Torres +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cl.throttr.responses; + +public class ChannelConnectionItem { + public final String id; + public final long subscribedAt; + public final long readBytes; + public final long writeBytes; + + public ChannelConnectionItem(String id, long subscribedAt, long readBytes, long writeBytes) { + this.id = id; + this.subscribedAt = subscribedAt; + this.readBytes = readBytes; + this.writeBytes = writeBytes; + } +} diff --git a/src/main/java/cl/throttr/responses/ChannelResponse.java b/src/main/java/cl/throttr/responses/ChannelResponse.java new file mode 100644 index 0000000..e2e576a --- /dev/null +++ b/src/main/java/cl/throttr/responses/ChannelResponse.java @@ -0,0 +1,28 @@ +// Copyright (C) 2025 Ian Torres +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cl.throttr.responses; + +import java.util.List; + +public class ChannelResponse { + public final boolean success; + public final List connections; + + public ChannelResponse(boolean success, List connections) { + this.success = success; + this.connections = connections; + } +} \ No newline at end of file diff --git a/src/main/java/cl/throttr/responses/ChannelsItem.java b/src/main/java/cl/throttr/responses/ChannelsItem.java new file mode 100644 index 0000000..2349e21 --- /dev/null +++ b/src/main/java/cl/throttr/responses/ChannelsItem.java @@ -0,0 +1,30 @@ +// Copyright (C) 2025 Ian Torres +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cl.throttr.responses; + +public class ChannelsItem { + public final String name; + public final long readBytes; + public final long writeBytes; + public final long subscribers; + + public ChannelsItem(String name, long readBytes, long writeBytes, long subscribers) { + this.name = name; + this.readBytes = readBytes; + this.writeBytes = writeBytes; + this.subscribers = subscribers; + } +} \ No newline at end of file diff --git a/src/main/java/cl/throttr/responses/ChannelsResponse.java b/src/main/java/cl/throttr/responses/ChannelsResponse.java new file mode 100644 index 0000000..21a444c --- /dev/null +++ b/src/main/java/cl/throttr/responses/ChannelsResponse.java @@ -0,0 +1,28 @@ +// Copyright (C) 2025 Ian Torres +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cl.throttr.responses; + +import java.util.List; + +public class ChannelsResponse { + public final boolean success; + public final List items; + + public ChannelsResponse(boolean success, List items) { + this.success = success; + this.items = items; + } +} \ No newline at end of file diff --git a/src/main/java/cl/throttr/responses/ConnectionResponse.java b/src/main/java/cl/throttr/responses/ConnectionResponse.java new file mode 100644 index 0000000..b807c33 --- /dev/null +++ b/src/main/java/cl/throttr/responses/ConnectionResponse.java @@ -0,0 +1,26 @@ +// Copyright (C) 2025 Ian Torres +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cl.throttr.responses; + +public class ConnectionResponse { + public final boolean found; + public final ConnectionsItem item; + + public ConnectionResponse(boolean found, ConnectionsItem item) { + this.found = found; + this.item = item; + } +} \ No newline at end of file diff --git a/src/main/java/cl/throttr/responses/ConnectionsItem.java b/src/main/java/cl/throttr/responses/ConnectionsItem.java new file mode 100644 index 0000000..350cede --- /dev/null +++ b/src/main/java/cl/throttr/responses/ConnectionsItem.java @@ -0,0 +1,167 @@ +// Copyright (C) 2025 Ian Torres +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cl.throttr.responses; + +import cl.throttr.enums.ValueSize; +import cl.throttr.utils.Binary; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; + +public class ConnectionsItem { + public final String id; + public final byte type; + public final byte kind; + public final byte ipVersion; + public final byte[] ip; + public final int port; + public final long connectedAt; + public final long readBytes; + public final long writeBytes; + public final long publishedBytes; + public final long receivedBytes; + public final long allocatedBytes; + public final long consumedBytes; + public final long insertRequests; + public final long setRequests; + public final long queryRequests; + public final long getRequests; + public final long updateRequests; + public final long purgeRequests; + public final long listRequests; + public final long infoRequests; + public final long statRequests; + public final long statsRequests; + public final long publishRequests; + public final long subscribeRequests; + public final long unsubscribeRequests; + public final long connectionsRequests; + public final long connectionRequests; + public final long channelsRequests; + public final long channelRequests; + public final long whoamiRequests; + + public ConnectionsItem( + String id, byte type, byte kind, byte ipVersion, byte[] ip, int port, long[] v + ) { + this.id = id; + this.type = type; + this.kind = kind; + this.ipVersion = ipVersion; + this.ip = ip; + this.port = port; + this.connectedAt = v[0]; + this.readBytes = v[1]; + this.writeBytes = v[2]; + this.publishedBytes = v[3]; + this.receivedBytes = v[4]; + this.allocatedBytes = v[5]; + this.consumedBytes = v[6]; + this.insertRequests = v[7]; + this.setRequests = v[8]; + this.queryRequests = v[9]; + this.getRequests = v[10]; + this.updateRequests = v[11]; + this.purgeRequests = v[12]; + this.listRequests = v[13]; + this.infoRequests = v[14]; + this.statRequests = v[15]; + this.statsRequests = v[16]; + this.publishRequests = v[17]; + this.subscribeRequests = v[18]; + this.unsubscribeRequests = v[19]; + this.connectionsRequests = v[20]; + this.connectionRequests = v[21]; + this.channelsRequests = v[22]; + this.channelRequests = v[23]; + this.whoamiRequests = v[24]; + } + + public static ConnectionsItem fromBytes(byte[] data) { + ByteBuf buf = Unpooled.wrappedBuffer(data); + int i = 0; + + byte[] idBytes = new byte[16]; + buf.getBytes(i, idBytes); i += 16; + StringBuilder idBuilder = new StringBuilder(32); + for (byte b : idBytes) { + idBuilder.append(String.format("%02x", b)); + } + String id = idBuilder.toString(); + + byte type = buf.getByte(i++); + byte kind = buf.getByte(i++); + byte ipVersion = buf.getByte(i++); + + byte[] ip = new byte[16]; + buf.getBytes(i, ip); i += 16; + + int port = buf.getUnsignedShortLE(i); i += 2; + + long[] v = new long[25]; + long connectedAt = Binary.read(buf, i, ValueSize.UINT64); i += 8; + v[0] = connectedAt; + long readBytes = Binary.read(buf, i, ValueSize.UINT64); i += 8; + v[1] = readBytes; + long writeBytes = Binary.read(buf, i, ValueSize.UINT64); i += 8; + v[2] = writeBytes; + long publishedBytes = Binary.read(buf, i, ValueSize.UINT64); i += 8; + v[3] = publishedBytes; + long receivedBytes = Binary.read(buf, i, ValueSize.UINT64); i += 8; + v[4] = receivedBytes; + long allocatedBytes = Binary.read(buf, i, ValueSize.UINT64); i += 8; + v[5] = allocatedBytes; + long consumedBytes = Binary.read(buf, i, ValueSize.UINT64); i += 8; + v[6] = consumedBytes; + long insertRequests = Binary.read(buf, i, ValueSize.UINT64); i += 8; + v[7] = insertRequests; + long setRequests = Binary.read(buf, i, ValueSize.UINT64); i += 8; + v[8] = setRequests; + long queryRequests = Binary.read(buf, i, ValueSize.UINT64); i += 8; + v[9] = queryRequests; + long getRequests = Binary.read(buf, i, ValueSize.UINT64); i += 8; + v[10] = getRequests; + long updateRequests = Binary.read(buf, i, ValueSize.UINT64); i += 8; + v[11] = updateRequests; + long purgeRequests = Binary.read(buf, i, ValueSize.UINT64); i += 8; + v[12] = purgeRequests; + long listRequests = Binary.read(buf, i, ValueSize.UINT64); i += 8; + v[13] = listRequests; + long infoRequests = Binary.read(buf, i, ValueSize.UINT64); i += 8; + v[14] = infoRequests; + long statRequests = Binary.read(buf, i, ValueSize.UINT64); i += 8; + v[15] = statRequests; + long statsRequests = Binary.read(buf, i, ValueSize.UINT64); i += 8; + v[16] = statsRequests; + long publishRequests = Binary.read(buf, i, ValueSize.UINT64); i += 8; + v[17] = publishRequests; + long subscribeRequests = Binary.read(buf, i, ValueSize.UINT64); i += 8; + v[18] = subscribeRequests; + long unsubscribeRequests = Binary.read(buf, i, ValueSize.UINT64); i += 8; + v[19] = unsubscribeRequests; + long connectionsRequests = Binary.read(buf, i, ValueSize.UINT64); i += 8; + v[20] = connectionsRequests; + long connectionRequests = Binary.read(buf, i, ValueSize.UINT64); i += 8; + v[21] = connectionRequests; + long channelsRequests = Binary.read(buf, i, ValueSize.UINT64); i += 8; + v[22] = channelsRequests; + long channelRequests = Binary.read(buf, i, ValueSize.UINT64); i += 8; + v[23] = channelRequests; + long whoamiRequests = Binary.read(buf, i, ValueSize.UINT64); + v[24] = whoamiRequests; + + return new ConnectionsItem(id, type, kind, ipVersion, ip, port, v); + } +} diff --git a/src/main/java/cl/throttr/responses/ConnectionsResponse.java b/src/main/java/cl/throttr/responses/ConnectionsResponse.java new file mode 100644 index 0000000..538cb1b --- /dev/null +++ b/src/main/java/cl/throttr/responses/ConnectionsResponse.java @@ -0,0 +1,36 @@ +// Copyright (C) 2025 Ian Torres +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cl.throttr.responses; + +import java.util.List; + +public class ConnectionsResponse { + private final boolean success; + private final List items; + + public ConnectionsResponse(boolean success, List items) { + this.success = success; + this.items = items; + } + + public boolean isSuccess() { + return success; + } + + public List getItems() { + return items; + } +} \ No newline at end of file diff --git a/src/main/java/cl/throttr/responses/GetResponse.java b/src/main/java/cl/throttr/responses/GetResponse.java index 5fae917..440b3b8 100644 --- a/src/main/java/cl/throttr/responses/GetResponse.java +++ b/src/main/java/cl/throttr/responses/GetResponse.java @@ -31,49 +31,6 @@ public record GetResponse( long ttl, byte[] value ) { - /** - * Equals - * - * @param o the reference object with which to compare. - * @return - */ - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (!(o instanceof GetResponse(var success, var ttlType, var ttl, var value))) return false; - return this.success == success && - this.ttl == ttl && - this.ttlType == ttlType && - java.util.Arrays.equals(this.value, value); - } - - /** - * Hashcode - * - * @return - */ - @Override - public int hashCode() { - int result = java.util.Objects.hash(success, ttlType, ttl); - result = 31 * result + java.util.Arrays.hashCode(value); - return result; - } - - /** - * To string - * - * @return - */ - @Override - public String toString() { - return "GetResponse{" + - "success=" + success + - ", ttlType=" + ttlType + - ", ttl=" + ttl + - ", value=" + java.util.Arrays.toString(value) + - '}'; - } - /** * Parse from bytes * @@ -81,10 +38,6 @@ public String toString() { * @return QueryResponse */ public static GetResponse fromBytes(byte[] data, ValueSize size) { - if (data.length < 1) { - throw new IllegalArgumentException("Invalid GetResponse: empty response"); - } - ByteBuffer buffer = ByteBuffer.wrap(data); buffer.order(ByteOrder.LITTLE_ENDIAN); @@ -97,10 +50,6 @@ public static GetResponse fromBytes(byte[] data, ValueSize size) { long ttl = Binary.read(buffer, size); long valueSize = Binary.read(buffer, size); - if (buffer.remaining() != valueSize) { - throw new IllegalArgumentException("Expected " + valueSize + " bytes for value but got " + buffer.remaining()); - } - byte[] value = new byte[(int) valueSize]; buffer.get(value); diff --git a/src/main/java/cl/throttr/responses/InfoResponse.java b/src/main/java/cl/throttr/responses/InfoResponse.java new file mode 100644 index 0000000..c4dd3e6 --- /dev/null +++ b/src/main/java/cl/throttr/responses/InfoResponse.java @@ -0,0 +1,152 @@ +// Copyright (C) 2025 Ian Torres +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cl.throttr.responses; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +public class InfoResponse { + public final boolean success; + public final long timestamp; + public final long totalRequests; + public final long totalRequestsPerMinute; + public final long totalInsertRequests; + public final long totalInsertRequestsPerMinute; + public final long totalQueryRequests; + public final long totalQueryRequestsPerMinute; + public final long totalUpdateRequests; + public final long totalUpdateRequestsPerMinute; + public final long totalPurgeRequests; + public final long totalPurgeRequestsPerMinute; + public final long totalGetRequests; + public final long totalGetRequestsPerMinute; + public final long totalSetRequests; + public final long totalSetRequestsPerMinute; + public final long totalListRequests; + public final long totalListRequestsPerMinute; + public final long totalInfoRequests; + public final long totalInfoRequestsPerMinute; + public final long totalStatsRequests; + public final long totalStatsRequestsPerMinute; + public final long totalStatRequests; + public final long totalStatRequestsPerMinute; + public final long totalSubscribeRequests; + public final long totalSubscribeRequestsPerMinute; + public final long totalUnsubscribeRequests; + public final long totalUnsubscribeRequestsPerMinute; + public final long totalPublishRequests; + public final long totalPublishRequestsPerMinute; + public final long totalChannelRequests; + public final long totalChannelRequestsPerMinute; + public final long totalChannelsRequests; + public final long totalChannelsRequestsPerMinute; + public final long totalWhoamiRequests; + public final long totalWhoamiRequestsPerMinute; + public final long totalConnectionRequests; + public final long totalConnectionRequestsPerMinute; + public final long totalConnectionsRequests; + public final long totalConnectionsRequestsPerMinute; + public final long totalReadBytes; + public final long totalReadBytesPerMinute; + public final long totalWriteBytes; + public final long totalWriteBytesPerMinute; + public final long totalKeys; + public final long totalCounters; + public final long totalBuffers; + public final long totalAllocatedBytesOnCounters; + public final long totalAllocatedBytesOnBuffers; + public final long totalSubscriptions; + public final long totalChannels; + public final long startupTimestamp; + public final long totalConnections; + public final String version; + + public InfoResponse( + boolean success, + long[] v, + String version + ) { + this.success = success; + this.timestamp = v[0]; + this.totalRequests = v[1]; + this.totalRequestsPerMinute = v[2]; + this.totalInsertRequests = v[3]; + this.totalInsertRequestsPerMinute = v[4]; + this.totalQueryRequests = v[5]; + this.totalQueryRequestsPerMinute = v[6]; + this.totalUpdateRequests = v[7]; + this.totalUpdateRequestsPerMinute = v[8]; + this.totalPurgeRequests = v[9]; + this.totalPurgeRequestsPerMinute = v[10]; + this.totalGetRequests = v[11]; + this.totalGetRequestsPerMinute = v[12]; + this.totalSetRequests = v[13]; + this.totalSetRequestsPerMinute = v[14]; + this.totalListRequests = v[15]; + this.totalListRequestsPerMinute = v[16]; + this.totalInfoRequests = v[17]; + this.totalInfoRequestsPerMinute = v[18]; + this.totalStatsRequests = v[19]; + this.totalStatsRequestsPerMinute = v[20]; + this.totalStatRequests = v[21]; + this.totalStatRequestsPerMinute = v[22]; + this.totalSubscribeRequests = v[23]; + this.totalSubscribeRequestsPerMinute = v[24]; + this.totalUnsubscribeRequests = v[25]; + this.totalUnsubscribeRequestsPerMinute = v[26]; + this.totalPublishRequests = v[27]; + this.totalPublishRequestsPerMinute = v[28]; + this.totalChannelRequests = v[29]; + this.totalChannelRequestsPerMinute = v[30]; + this.totalChannelsRequests = v[31]; + this.totalChannelsRequestsPerMinute = v[32]; + this.totalWhoamiRequests = v[33]; + this.totalWhoamiRequestsPerMinute = v[34]; + this.totalConnectionRequests = v[35]; + this.totalConnectionRequestsPerMinute = v[36]; + this.totalConnectionsRequests = v[37]; + this.totalConnectionsRequestsPerMinute = v[38]; + this.totalReadBytes = v[39]; + this.totalReadBytesPerMinute = v[40]; + this.totalWriteBytes = v[41]; + this.totalWriteBytesPerMinute = v[42]; + this.totalKeys = v[43]; + this.totalCounters = v[44]; + this.totalBuffers = v[45]; + this.totalAllocatedBytesOnCounters = v[46]; + this.totalAllocatedBytesOnBuffers = v[47]; + this.totalSubscriptions = v[48]; + this.totalChannels = v[49]; + this.startupTimestamp = v[50]; + this.totalConnections = v[51]; + this.version = version; + } + + public static InfoResponse fromBytes(byte[] full) { + boolean success = full[0] == 0x01; + ByteBuffer bb = ByteBuffer.wrap(full, 1, 432).order(ByteOrder.LITTLE_ENDIAN); + long[] values = new long[52]; + for (int i = 0; i < 52; i++) { + values[i] = bb.getLong(); + } + + byte[] versionBytes = new byte[16]; + bb.get(versionBytes); + String version = new String(versionBytes).replaceAll("\u0000+$", ""); + + return new InfoResponse(success, values, version); + } +} diff --git a/src/main/java/cl/throttr/responses/ListItem.java b/src/main/java/cl/throttr/responses/ListItem.java new file mode 100644 index 0000000..f20292c --- /dev/null +++ b/src/main/java/cl/throttr/responses/ListItem.java @@ -0,0 +1,42 @@ +// Copyright (C) 2025 Ian Torres +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cl.throttr.responses; + +public class ListItem { + private String key; + public final int keyLength; + public final int keyType; + public final int ttlType; + public final long expiresAt; + public final long bytesUsed; + + public ListItem(String key, int keyLength, int keyType, int ttlType, long expiresAt, long bytesUsed) { + this.key = key; + this.keyLength = keyLength; + this.keyType = keyType; + this.ttlType = ttlType; + this.expiresAt = expiresAt; + this.bytesUsed = bytesUsed; + } + + public String getKey() { + return key; + } + + public void setKey(String key) { + this.key = key; + } +} \ No newline at end of file diff --git a/src/main/java/cl/throttr/responses/ListResponse.java b/src/main/java/cl/throttr/responses/ListResponse.java new file mode 100644 index 0000000..8ab1fee --- /dev/null +++ b/src/main/java/cl/throttr/responses/ListResponse.java @@ -0,0 +1,36 @@ +// Copyright (C) 2025 Ian Torres +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cl.throttr.responses; + +import java.util.List; + +public class ListResponse { + private final boolean success; + private final List items; + + public ListResponse(boolean success, List items) { + this.success = success; + this.items = items; + } + + public boolean isSuccess() { + return success; + } + + public List getItems() { + return items; + } +} \ No newline at end of file diff --git a/src/main/java/cl/throttr/responses/QueryResponse.java b/src/main/java/cl/throttr/responses/QueryResponse.java index 295c048..fb9bc0a 100644 --- a/src/main/java/cl/throttr/responses/QueryResponse.java +++ b/src/main/java/cl/throttr/responses/QueryResponse.java @@ -38,15 +38,14 @@ public record QueryResponse( * @return QueryResponse */ public static QueryResponse fromBytes(byte[] data, ValueSize size) { - int expected = 1 + size.getValue() + 1 + size.getValue(); - if (data.length != expected) { - throw new IllegalArgumentException("Invalid QueryResponse length: " + data.length); - } - var buffer = ByteBuffer.wrap(data); buffer.order(ByteOrder.LITTLE_ENDIAN); boolean success = buffer.get() == 1; + if (!success) { + return new QueryResponse(false, 0, null, 0); + } + long quota = Binary.read(buffer, size); TTLType ttlType = TTLType.fromByte(buffer.get()); long ttl = Binary.read(buffer, size); diff --git a/src/main/java/cl/throttr/responses/StatResponse.java b/src/main/java/cl/throttr/responses/StatResponse.java new file mode 100644 index 0000000..e0bd856 --- /dev/null +++ b/src/main/java/cl/throttr/responses/StatResponse.java @@ -0,0 +1,41 @@ +// Copyright (C) 2025 Ian Torres +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cl.throttr.responses; + +import cl.throttr.enums.ValueSize; +import cl.throttr.utils.Binary; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +public record StatResponse( + boolean success, + long readsPerMinute, + long writesPerMinute, + long totalReads, + long totalWrites +) { + public static StatResponse fromBytes(byte[] data) { + ByteBuffer buffer = ByteBuffer.wrap(data).order(ByteOrder.LITTLE_ENDIAN); + boolean success = Binary.read(buffer, ValueSize.UINT8) == 0x01; + long rpm = Binary.read(buffer, ValueSize.UINT64); + long wpm = Binary.read(buffer, ValueSize.UINT64); + long tr = Binary.read(buffer, ValueSize.UINT64); + long tw = Binary.read(buffer, ValueSize.UINT64); + + return new StatResponse(success, rpm, wpm, tr, tw); + } +} \ No newline at end of file diff --git a/src/main/java/cl/throttr/responses/StatsItem.java b/src/main/java/cl/throttr/responses/StatsItem.java new file mode 100644 index 0000000..4d8470b --- /dev/null +++ b/src/main/java/cl/throttr/responses/StatsItem.java @@ -0,0 +1,42 @@ +// Copyright (C) 2025 Ian Torres +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cl.throttr.responses; + +public class StatsItem { + private String key; + public final int keyLength; + public final long readsPerMinute; + public final long writesPerMinute; + public final long totalReads; + public final long totalWrites; + + public StatsItem(String key, int keyLength, long readsPerMinute, long writesPerMinute, long totalReads, long totalWrites) { + this.key = key; + this.keyLength = keyLength; + this.readsPerMinute = readsPerMinute; + this.writesPerMinute = writesPerMinute; + this.totalReads = totalReads; + this.totalWrites = totalWrites; + } + + public String getKey() { + return this.key; + } + + public void setKey(String key) { + this.key = key; + } +} \ No newline at end of file diff --git a/src/main/java/cl/throttr/responses/StatsResponse.java b/src/main/java/cl/throttr/responses/StatsResponse.java new file mode 100644 index 0000000..e8754f4 --- /dev/null +++ b/src/main/java/cl/throttr/responses/StatsResponse.java @@ -0,0 +1,36 @@ +// Copyright (C) 2025 Ian Torres +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cl.throttr.responses; + +import java.util.List; + +public class StatsResponse { + private final boolean success; + private final List items; + + public StatsResponse(boolean success, List items) { + this.success = success; + this.items = items; + } + + public boolean isSuccess() { + return success; + } + + public List getItems() { + return items; + } +} \ No newline at end of file diff --git a/src/main/java/cl/throttr/responses/WhoamiResponse.java b/src/main/java/cl/throttr/responses/WhoamiResponse.java new file mode 100644 index 0000000..caa91fc --- /dev/null +++ b/src/main/java/cl/throttr/responses/WhoamiResponse.java @@ -0,0 +1,28 @@ +// Copyright (C) 2025 Ian Torres +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cl.throttr.responses; + +import java.util.HexFormat; + +public class WhoamiResponse { + public final boolean success; + public final String uuid; + + public WhoamiResponse(boolean success, byte[] uuidBytes) { + this.success = success; + this.uuid = HexFormat.of().formatHex(uuidBytes); + } +} \ No newline at end of file diff --git a/src/main/java/cl/throttr/utils/Binary.java b/src/main/java/cl/throttr/utils/Binary.java index d32610d..4d6a369 100644 --- a/src/main/java/cl/throttr/utils/Binary.java +++ b/src/main/java/cl/throttr/utils/Binary.java @@ -16,6 +16,7 @@ package cl.throttr.utils; import cl.throttr.enums.ValueSize; +import io.netty.buffer.ByteBuf; import java.nio.ByteBuffer; @@ -52,4 +53,13 @@ public static long read(ByteBuffer buffer, ValueSize size) { case UINT64 -> buffer.getLong(); }; } + + public static long read(ByteBuf buffer, int index, ValueSize size) { + return switch (size) { + case UINT8 -> buffer.getUnsignedByte(index); + case UINT16 -> buffer.getUnsignedShortLE(index); + case UINT32 -> buffer.getUnsignedIntLE(index); + case UINT64 -> buffer.getLongLE(index); + }; + } } \ No newline at end of file diff --git a/src/test/java/cl/throttr/BinaryTest.java b/src/test/java/cl/throttr/BinaryTest.java index 6ceacea..0c411ad 100644 --- a/src/test/java/cl/throttr/BinaryTest.java +++ b/src/test/java/cl/throttr/BinaryTest.java @@ -2,6 +2,8 @@ import cl.throttr.enums.ValueSize; import cl.throttr.utils.Binary; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import org.junit.jupiter.api.Test; import java.nio.ByteBuffer; @@ -49,4 +51,40 @@ void shouldWriteAndReadUInt64() { long result = Binary.read(buffer, ValueSize.UINT64); assertEquals(value, result); } + + @Test + void shouldReadUInt8FromByteBuf() { + ByteBuf buf = Unpooled.buffer(1); + buf.writeByte(0xAB); + long result = Binary.read(buf, 0, ValueSize.UINT8); + assertEquals(0xAB, result); + } + + @Test + void shouldReadUInt16FromByteBuf() { + ByteBuf buf = Unpooled.buffer(2); + buf.writeByte(0xCD); // little endian: LSB first + buf.writeByte(0xAB); + long result = Binary.read(buf, 0, ValueSize.UINT16); + assertEquals(0xABCD, result); + } + + @Test + void shouldReadUInt32FromByteBuf() { + ByteBuf buf = Unpooled.buffer(4); + buf.writeByte(0xEF); + buf.writeByte(0xBE); + buf.writeByte(0xAD); + buf.writeByte(0xDE); + long result = Binary.read(buf, 0, ValueSize.UINT32); + assertEquals(0xDEADBEEFL, result); + } + + @Test + void shouldReadUInt64FromByteBuf() { + ByteBuf buf = Unpooled.buffer(8); + buf.writeLongLE(0x0123456789ABCDEFL); + long result = Binary.read(buf, 0, ValueSize.UINT64); + assertEquals(0x0123456789ABCDEFL, result); + } } \ No newline at end of file diff --git a/src/test/java/cl/throttr/ConnectionTest.java b/src/test/java/cl/throttr/ConnectionTest.java index d8df244..eaba694 100644 --- a/src/test/java/cl/throttr/ConnectionTest.java +++ b/src/test/java/cl/throttr/ConnectionTest.java @@ -1,6 +1,7 @@ package cl.throttr; import cl.throttr.enums.ValueSize; +import cl.throttr.requests.Serializer; import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.*; @@ -15,7 +16,7 @@ void shouldThrowIfUnsupportedRequestTypeGiven() { IllegalArgumentException ex = assertThrows( IllegalArgumentException.class, - () -> Connection.getRequestBuffer(invalidRequest, ValueSize.UINT16) + () -> Serializer.invoke(invalidRequest, ValueSize.UINT16) ); assertEquals("Unsupported request type", ex.getMessage()); @@ -25,7 +26,7 @@ void shouldThrowIfUnsupportedRequestTypeGiven() { void shouldThrowIfNullRequestGiven() { IllegalArgumentException ex = assertThrows( IllegalArgumentException.class, - () -> Connection.getRequestBuffer(null, ValueSize.UINT16) + () -> Serializer.invoke(null, ValueSize.UINT16) ); assertEquals("Unsupported request type", ex.getMessage()); diff --git a/src/test/java/cl/throttr/GetResponseTest.java b/src/test/java/cl/throttr/GetResponseTest.java deleted file mode 100644 index 15a117e..0000000 --- a/src/test/java/cl/throttr/GetResponseTest.java +++ /dev/null @@ -1,52 +0,0 @@ -package cl.throttr; - -import cl.throttr.enums.TTLType; -import cl.throttr.responses.GetResponse; -import org.junit.jupiter.api.Test; - -import java.util.HashSet; -import java.util.Set; - -import static org.junit.jupiter.api.Assertions.*; - -class GetResponseTest { - - @Test - void shouldCompareContentCorrectly() { - byte[] value1 = new byte[]{0x45, 0x48, 0x4C, 0x4F}; // EHLO - byte[] value2 = new byte[]{0x45, 0x48, 0x4C, 0x4F}; // EHLO (otra instancia) - byte[] different = new byte[]{0x48, 0x4F, 0x4C, 0x41}; // HOLA - - GetResponse r1 = new GetResponse(true, TTLType.SECONDS, 30, value1); - GetResponse r2 = new GetResponse(true, TTLType.SECONDS, 30, value2); - GetResponse r3 = new GetResponse(true, TTLType.SECONDS, 30, different); - - assertEquals(r1, r2); - assertNotEquals(r1, r3); - } - - @Test - void shouldGenerateConsistentHashCode() { - byte[] value = "EHLO".getBytes(); - GetResponse r1 = new GetResponse(true, TTLType.SECONDS, 30, value); - GetResponse r2 = new GetResponse(true, TTLType.SECONDS, 30, "EHLO".getBytes()); - - assertEquals(r1.hashCode(), r2.hashCode()); - - Set set = new HashSet<>(); - set.add(r1); - assertTrue(set.contains(r2)); - } - - @Test - void shouldPrintContentInToString() { - byte[] value = "EHLO".getBytes(); - GetResponse response = new GetResponse(true, TTLType.SECONDS, 30, value); - - String printed = response.toString(); - assertTrue(printed.contains("success=true")); - assertTrue(printed.contains("ttlType=SECONDS")); - assertTrue(printed.contains("ttl=30")); - assertTrue(printed.contains("value=[69, 72, 76, 79]")); // EHLO en decimal - } -} \ No newline at end of file diff --git a/src/test/java/cl/throttr/QueryResponseTest.java b/src/test/java/cl/throttr/QueryResponseTest.java index 8c63036..7c1239d 100644 --- a/src/test/java/cl/throttr/QueryResponseTest.java +++ b/src/test/java/cl/throttr/QueryResponseTest.java @@ -27,32 +27,4 @@ void shouldParseValidQueryResponseWithSuccessTrue() { assertEquals(TTLType.SECONDS, response.ttlType()); assertEquals(5678, response.ttl()); } - - @Test - void shouldParseValidQueryResponseWithSuccessFalse() { - ByteBuffer buffer = ByteBuffer.allocate(1 + 2 + 1 + 2).order(ByteOrder.LITTLE_ENDIAN); - buffer.put((byte) 0); // success - buffer.putShort((short) 4321); // quota - buffer.put((byte) 3); // TTLType.MILLISECONDS - buffer.putShort((short) 8765); // ttl - - QueryResponse response = QueryResponse.fromBytes(buffer.array(), ValueSize.UINT16); - - assertFalse(response.success()); - assertEquals(4321, response.quota()); - assertEquals(TTLType.MILLISECONDS, response.ttlType()); - assertEquals(8765, response.ttl()); - } - - @Test - void shouldThrowIfLengthIsInvalid() { - byte[] data = new byte[5]; // intentionally incorrect size - - IllegalArgumentException ex = assertThrows( - IllegalArgumentException.class, - () -> QueryResponse.fromBytes(data, ValueSize.UINT16) - ); - - assertEquals("Invalid QueryResponse length: 5", ex.getMessage()); - } } \ No newline at end of file diff --git a/src/test/java/cl/throttr/ServiceTest.java b/src/test/java/cl/throttr/ServiceTest.java index cff4365..d3a272d 100644 --- a/src/test/java/cl/throttr/ServiceTest.java +++ b/src/test/java/cl/throttr/ServiceTest.java @@ -25,6 +25,8 @@ import java.time.Duration; import java.util.List; import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import static org.junit.jupiter.api.Assertions.*; @@ -215,7 +217,115 @@ void shouldSupportBatchInsertAndQuery() throws Exception { service.close(); } + @Test + void shouldSupportListAfterInsert() throws Exception { + ValueSize sized = ValueSize.UINT8; + String size = System.getenv().getOrDefault("THROTTR_SIZE", "uint16"); + if ("uint16".equals(size)) sized = ValueSize.UINT16; + if ("uint32".equals(size)) sized = ValueSize.UINT32; + if ("uint64".equals(size)) sized = ValueSize.UINT64; + Service service = new Service("127.0.0.1", 9000, sized, 1); + service.connect(); + + String key = UUID.randomUUID().toString(); + + StatusResponse insert = (StatusResponse) service.send(new InsertRequest(99, TTLType.SECONDS, 60, key)); + assertTrue(insert.success()); + + // LIST + ListResponse list = (ListResponse) service.send(new ListRequest()); + assertTrue(list.isSuccess()); + assertNotNull(list.getItems()); + assertTrue(list.getItems().stream().anyMatch(item -> item.getKey().equals(key))); + + service.close(); + } + + @Test + void shouldSupportStatsAfterSet() throws Exception { + ValueSize sized = ValueSize.UINT8; + String size = System.getenv().getOrDefault("THROTTR_SIZE", "uint16"); + if ("uint16".equals(size)) sized = ValueSize.UINT16; + if ("uint32".equals(size)) sized = ValueSize.UINT32; + if ("uint64".equals(size)) sized = ValueSize.UINT64; + + Service service = new Service("127.0.0.1", 9000, sized, 1); + service.connect(); + + String key = UUID.randomUUID().toString(); + String value = "EHLO"; + + StatusResponse set = (StatusResponse) service.send(new SetRequest(TTLType.SECONDS, 30, key, value)); + assertTrue(set.success()); + + Awaitility.await().atMost(Duration.ofMillis(30000)).untilAsserted(() -> { + // STATS + StatsResponse stats = (StatsResponse) service.send(new StatsRequest()); + assertTrue(stats.isSuccess()); + assertNotNull(stats.getItems()); + assertTrue(stats.getItems().stream().anyMatch(item -> item.getKey().equals(key))); + + service.close(); + }); + + } + + @Test + void shouldSupportInfoAfterInsert() throws Exception { + ValueSize sized = ValueSize.UINT8; + String size = System.getenv().getOrDefault("THROTTR_SIZE", "uint16"); + if ("uint16".equals(size)) sized = ValueSize.UINT16; + if ("uint32".equals(size)) sized = ValueSize.UINT32; + if ("uint64".equals(size)) sized = ValueSize.UINT64; + Service service = new Service("127.0.0.1", 9000, sized, 1); + service.connect(); + + String key = UUID.randomUUID().toString(); + StatusResponse insert = (StatusResponse) service.send(new InsertRequest(99, TTLType.SECONDS, 60, key)); + assertTrue(insert.success()); + + InfoResponse info = (InfoResponse) service.send(new InfoRequest()); + assertTrue(info.success); + + assertTrue(info.totalRequests >= 0); + assertTrue(info.totalInsertRequests >= 0); + assertTrue(info.totalRequestsPerMinute >= 0); + assertTrue(info.totalReadBytes >= 0); + assertTrue(info.totalWriteBytes >= 0); + + service.close(); + } + + @Test + void shouldSupportStatAfterInsert() throws Exception { + ValueSize sized = ValueSize.UINT8; + String size = System.getenv().getOrDefault("THROTTR_SIZE", "uint16"); + if ("uint16".equals(size)) sized = ValueSize.UINT16; + if ("uint32".equals(size)) sized = ValueSize.UINT32; + if ("uint64".equals(size)) sized = ValueSize.UINT64; + + Service service = new Service("127.0.0.1", 9000, sized, 1); + service.connect(); + + String key = UUID.randomUUID().toString(); + StatusResponse insert = (StatusResponse) service.send(new InsertRequest(42, TTLType.SECONDS, 30, key)); + assertTrue(insert.success()); + + StatResponse errorStat = (StatResponse) service.send(new StatRequest("MISSING_KEY")); + assertFalse(errorStat.success()); + + Awaitility.await().atMost(Duration.ofMillis(200)).untilAsserted(() -> { + StatResponse stat = (StatResponse) service.send(new StatRequest(key)); + assertTrue(stat.success()); + assertTrue(stat.readsPerMinute() >= 0); + assertTrue(stat.writesPerMinute() >= 0); + assertTrue(stat.totalReads() >= 0); + assertTrue(stat.totalWrites() >= 0); + + service.close(); + }); + } @Test void shouldSupportBatchSetAndGet() throws Exception { @@ -253,6 +363,221 @@ void shouldSupportBatchSetAndGet() throws Exception { service.close(); } + @Test + void shouldSupportConnectionsRequest() throws Exception { + ValueSize sized = ValueSize.UINT8; + String size = System.getenv().getOrDefault("THROTTR_SIZE", "uint16"); + if ("uint16".equals(size)) sized = ValueSize.UINT16; + if ("uint32".equals(size)) sized = ValueSize.UINT32; + if ("uint64".equals(size)) sized = ValueSize.UINT64; + + Service service = new Service("127.0.0.1", 9000, sized, 1); + service.connect(); + + ConnectionsResponse res = (ConnectionsResponse) service.send(new ConnectionsRequest()); + assertTrue(res.isSuccess()); + assertNotNull(res.getItems()); + + for (ConnectionsItem item : res.getItems()) { + assertNotNull(item); + assertNotNull(item.id); + assertEquals(32, item.id.length()); + assertTrue(item.type == 0x00 || item.type == 0x01); + assertTrue(item.kind == 0x00 || item.kind == 0x01); + assertTrue(item.ipVersion == 0x04 || item.ipVersion == 0x06); + assertTrue(item.port > 0); + assertTrue(item.connectedAt > 0); + } + + service.close(); + } + + @Test + void shouldSupportWhoamiRequest() throws Exception { + ValueSize sized = ValueSize.UINT8; + String size = System.getenv().getOrDefault("THROTTR_SIZE", "uint16"); + if ("uint16".equals(size)) sized = ValueSize.UINT16; + if ("uint32".equals(size)) sized = ValueSize.UINT32; + if ("uint64".equals(size)) sized = ValueSize.UINT64; + + Service service = new Service("127.0.0.1", 9000, sized, 1); + service.connect(); + + WhoamiResponse res = (WhoamiResponse) service.send(new WhoAmiRequest()); + assertTrue(res.success); + assertNotNull(res.uuid); + assertEquals(32, res.uuid.length()); + + service.close(); + } + + @Test + void shouldSupportConnectionRequest() throws Exception { + ValueSize sized = ValueSize.UINT8; + String size = System.getenv().getOrDefault("THROTTR_SIZE", "uint16"); + if ("uint16".equals(size)) sized = ValueSize.UINT16; + if ("uint32".equals(size)) sized = ValueSize.UINT32; + if ("uint64".equals(size)) sized = ValueSize.UINT64; + + Service service = new Service("127.0.0.1", 9000, sized, 1); + service.connect(); + + // Primero hacemos WHOAMI para obtener nuestro propio ID de conexión + WhoamiResponse whoami = (WhoamiResponse) service.send(new WhoAmiRequest()); + assertTrue(whoami.success); + assertNotNull(whoami.uuid); + assertEquals(32, whoami.uuid.length()); + + // Enviamos la solicitud CONNECTION con el mismo índice de conexión + ConnectionResponse response = (ConnectionResponse) service.send(new ConnectionRequest(whoami.uuid)); + assertTrue(response.found); + assertNotNull(response.item); + + ConnectionResponse errorResponse = (ConnectionResponse) service.send(new ConnectionRequest("b7e0f7c8b6a04c678727303c3a90b341")); + assertFalse(errorResponse.found); + assertNull(errorResponse.item); + + ConnectionsItem item = response.item; + assertEquals(32, item.id.length()); + assertTrue(item.type == 0x00 || item.type == 0x01); + assertTrue(item.kind == 0x00 || item.kind == 0x01); + assertTrue(item.ipVersion == 0x04 || item.ipVersion == 0x06); + assertTrue(item.port > 0); + assertTrue(item.connectedAt > 0); + + service.close(); + } + + @Test + void shouldSupportChannelsRequest() throws Exception { + ValueSize sized = ValueSize.UINT8; + String size = System.getenv().getOrDefault("THROTTR_SIZE", "uint16"); + if ("uint16".equals(size)) sized = ValueSize.UINT16; + if ("uint32".equals(size)) sized = ValueSize.UINT32; + if ("uint64".equals(size)) sized = ValueSize.UINT64; + + Service service = new Service("127.0.0.1", 9000, sized, 1); + service.connect(); + + ChannelsResponse response = (ChannelsResponse) service.send(new ChannelsRequest()); + assertTrue(response.success); + assertNotNull(response.items); + assertTrue(response.items.size() >= 2); + + service.close(); + } + + @Test + void shouldSupportChannelRequest() throws Exception { + ValueSize sized = ValueSize.UINT8; + String size = System.getenv().getOrDefault("THROTTR_SIZE", "uint16"); + if ("uint16".equals(size)) sized = ValueSize.UINT16; + if ("uint32".equals(size)) sized = ValueSize.UINT32; + if ("uint64".equals(size)) sized = ValueSize.UINT64; + + Service service = new Service("127.0.0.1", 9000, sized, 1); + service.connect(); + + // Primero obtenemos nuestro UUID con WHOAMI + WhoamiResponse whoami = (WhoamiResponse) service.send(new WhoAmiRequest()); + assertTrue(whoami.success); + assertNotNull(whoami.uuid); + assertEquals(32, whoami.uuid.length()); + + // Ahora pedimos el CHANNEL de nuestro propio UUID + ChannelResponse response = (ChannelResponse) service.send(new ChannelRequest(whoami.uuid)); + assertTrue(response.success); + assertNotNull(response.connections); + assertTrue(response.connections.size() >= 1); + + ChannelResponse errorResponse = (ChannelResponse) service.send(new ChannelRequest("ABCCDEEF")); + assertFalse(errorResponse.success); + + for (ChannelConnectionItem item : response.connections) { + assertNotNull(item.id); + assertEquals(32, item.id.length()); + assertTrue(item.subscribedAt >= 0); + assertTrue(item.readBytes >= 0); + assertTrue(item.writeBytes >= 0); + } + + service.close(); + } + + @Test + void shouldReceivePublishedMessageAfterSubscribe() throws Exception { + ValueSize sized = ValueSize.UINT8; + String size = System.getenv().getOrDefault("THROTTR_SIZE", "uint16"); + if ("uint16".equals(size)) sized = ValueSize.UINT16; + if ("uint32".equals(size)) sized = ValueSize.UINT32; + if ("uint64".equals(size)) sized = ValueSize.UINT64; + + Service service = new Service("127.0.0.1", 9000, sized, 1); + service.connect(); + + String channel = "test-channel-" + UUID.randomUUID(); + String payload = "hola mundo"; + + CountDownLatch latch = new CountDownLatch(1); + StringBuilder received = new StringBuilder(); + + service.getConnection().subscribe(channel, msg -> { + received.append(msg); + latch.countDown(); + }); + + + Awaitility.await().atMost(Duration.ofMillis(200)).untilAsserted(() -> { + StatusResponse pub = (StatusResponse) service.send(new PublishRequest(channel, payload)); + assertTrue(pub.success()); + + boolean success = latch.await(2, TimeUnit.SECONDS); + assertTrue(success, "No se recibió el mensaje a tiempo"); + assertEquals(payload, received.toString()); + + service.close(); + }); + } + + @Test + void shouldUnsubscribeAndNotReceiveMessages() throws Exception { + ValueSize sized = ValueSize.UINT8; + String size = System.getenv().getOrDefault("THROTTR_SIZE", "uint16"); + if ("uint16".equals(size)) sized = ValueSize.UINT16; + if ("uint32".equals(size)) sized = ValueSize.UINT32; + if ("uint64".equals(size)) sized = ValueSize.UINT64; + + Service service = new Service("127.0.0.1", 9000, sized, 1); + service.connect(); + + String channel = "test-channel-" + UUID.randomUUID(); + String payload = "hola mundo"; + + CountDownLatch latch = new CountDownLatch(1); + StringBuilder received = new StringBuilder(); + + var conn = service.getConnection(); + + conn.subscribe(channel, msg -> { + received.append(msg); + latch.countDown(); + }); + + Awaitility.await().atMost(Duration.ofMillis(3000)).untilAsserted(() -> { + conn.unsubscribe(channel); + + StatusResponse pub = (StatusResponse) service.send(new PublishRequest(channel, payload)); + assertTrue(pub.success()); + + boolean success = latch.await(2, TimeUnit.SECONDS); + assertFalse(success, "Se recibió mensaje después de desuscribirse"); + + service.close(); + }); + } + + + @Test void shouldThrowIfMaxConnectionsIsZero() { IllegalArgumentException ex = assertThrows(