diff --git a/client-v2/src/main/java/com/clickhouse/client/api/Client.java b/client-v2/src/main/java/com/clickhouse/client/api/Client.java index e76788388..9287b76fc 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/Client.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/Client.java @@ -40,7 +40,6 @@ import com.clickhouse.data.ClickHouseDataType; import com.clickhouse.data.ClickHouseFormat; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; import net.jpountz.lz4.LZ4Factory; import org.apache.hc.core5.concurrent.DefaultThreadFactory; import org.apache.hc.core5.http.ClassicHttpResponse; @@ -56,7 +55,6 @@ import java.lang.reflect.InvocationTargetException; import java.net.MalformedURLException; import java.net.URL; -import java.nio.charset.StandardCharsets; import java.time.Duration; import java.time.ZoneId; import java.time.temporal.ChronoUnit; @@ -186,7 +184,6 @@ private Client(Collection endpoints, Map configuration, } this.endpoints = tmpEndpoints.build(); - this.httpClientHelper = new HttpAPIClientHelper(this.configuration, metricsRegistry, initSslContext); String retry = configuration.get(ClientConfigProperties.RETRY_ON_FAILURE.getKey()); this.retries = retry == null ? 0 : Integer.parseInt(retry); @@ -197,6 +194,7 @@ private Client(Collection endpoints, Map configuration, this.lz4Factory = LZ4Factory.fastestJavaInstance(); } + this.httpClientHelper = new HttpAPIClientHelper(this.configuration, metricsRegistry, initSslContext, lz4Factory); this.serverVersion = configuration.getOrDefault(ClientConfigProperties.SERVER_VERSION.getKey(), "unknown"); this.dbUser = configuration.getOrDefault(ClientConfigProperties.USER.getKey(), ClientConfigProperties.USER.getDefObjVal()); this.typeHintMapping = (Map>) this.configuration.get(ClientConfigProperties.TYPE_HINT_MAPPING.getKey()); @@ -1073,6 +1071,20 @@ public Builder sslSocketSNI(String sni) { return this; } + /** + * Make sending statement parameters as HTTP Form data (in the body of a request). + * Note: work only with Server side compression. If client compression is enabled it will be disabled + * for query requests with parameters. It is because each parameter is sent as part of multipart content + * what would require compressions of them separately. + * + * @param enable - if feature enabled + * @return this builder instance + */ + public Builder useHttpFormDataForQuery(boolean enable) { + this.configuration.put(ClientConfigProperties.HTTP_SEND_PARAMS_IN_BODY.getKey(), String.valueOf(enable)); + return this; + } + public Client build() { // check if endpoint are empty. so can not initiate client if (this.endpoints.isEmpty()) { @@ -1279,7 +1291,7 @@ public CompletableFuture insert(String tableName, List data, for (int i = 0; i <= maxRetries; i++) { // Execute request try (ClassicHttpResponse httpResponse = - httpClientHelper.executeRequest(selectedEndpoint, requestSettings.getAllSettings(), lz4Factory, + httpClientHelper.executeRequest(selectedEndpoint, requestSettings.getAllSettings(), out -> { out.write("INSERT INTO ".getBytes()); out.write(tableName.getBytes()); @@ -1496,7 +1508,7 @@ public CompletableFuture insert(String tableName, for (int i = 0; i <= retries; i++) { // Execute request try (ClassicHttpResponse httpResponse = - httpClientHelper.executeRequest(selectedEndpoint, requestSettings.getAllSettings(), lz4Factory, + httpClientHelper.executeRequest(selectedEndpoint, requestSettings.getAllSettings(), out -> { writer.onOutput(out); out.close(); @@ -1607,12 +1619,11 @@ public CompletableFuture query(String sqlQuery, Map responseSupplier; + if (queryParams != null) { + requestSettings.setOption(HttpAPIClientHelper.KEY_STATEMENT_PARAMS, queryParams); + } - if (queryParams != null) { - requestSettings.setOption(HttpAPIClientHelper.KEY_STATEMENT_PARAMS, queryParams); - } - responseSupplier = () -> { + Supplier responseSupplier = () -> { long startTime = System.nanoTime(); // Selecting some node Endpoint selectedEndpoint = getNextAliveNode(); @@ -1620,12 +1631,15 @@ public CompletableFuture query(String sqlQuery, Map { - output.write(sqlQuery.getBytes(StandardCharsets.UTF_8)); - output.close(); - }); - + boolean useMultipart = ClientConfigProperties.HTTP_SEND_PARAMS_IN_BODY.getOrDefault(requestSettings.getAllSettings()); + if (queryParams != null && useMultipart) { + httpResponse = httpClientHelper.executeMultiPartRequest(selectedEndpoint, + requestSettings.getAllSettings(), sqlQuery); + } else { + httpResponse = httpClientHelper.executeRequest(selectedEndpoint, + requestSettings.getAllSettings(), + sqlQuery); + } // Check response if (httpResponse.getCode() == HttpStatus.SC_SERVICE_UNAVAILABLE) { LOG.warn("Failed to get response. Server returned {}. Retrying. (Duration: {})", httpResponse.getCode(), durationSince(startTime)); diff --git a/client-v2/src/main/java/com/clickhouse/client/api/ClientConfigProperties.java b/client-v2/src/main/java/com/clickhouse/client/api/ClientConfigProperties.java index 131fc0f20..e4e1d5842 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/ClientConfigProperties.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/ClientConfigProperties.java @@ -182,6 +182,14 @@ public Object parseValue(String value) { * SNI SSL parameter that will be set for each outbound SSL socket. */ SSL_SOCKET_SNI("ssl_socket_sni", String.class,""), + + /** + * If parameters should be sent in request body. + * Note: work only with Server side compression. If compression is enabled on client level it will be disabled + * for query requests with parameters. + */ + HTTP_SEND_PARAMS_IN_BODY("client.http.use_form_request_for_query", Boolean.class, "false"), + ; private static final Logger LOG = LoggerFactory.getLogger(ClientConfigProperties.class); diff --git a/client-v2/src/main/java/com/clickhouse/client/api/internal/CompressedEntity.java b/client-v2/src/main/java/com/clickhouse/client/api/internal/CompressedEntity.java index 83b2ac885..2e61c7d65 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/internal/CompressedEntity.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/internal/CompressedEntity.java @@ -51,8 +51,8 @@ public void writeTo(OutputStream outStream) throws IOException { throw new UnsupportedOperationException("Unsupported: writing compressed response to elsewhere"); } - try { - httpEntity.writeTo(compressorStreamFactory.createCompressorOutputStream(compressionAlgo, outStream)); + try (OutputStream compressingStream = compressorStreamFactory.createCompressorOutputStream(compressionAlgo, outStream)){ + httpEntity.writeTo(compressingStream); } catch (CompressorException e) { throw new IOException("Failed to create compressing output stream", e); } @@ -75,7 +75,8 @@ public void close() throws IOException { @Override public long getContentLength() { - return httpEntity.getContentLength(); + // compressed request length is unknown even if it is a byte[] + return isResponse ? httpEntity.getContentLength() : -1; } @Override diff --git a/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java b/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java index 363222c26..675b37fdd 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java @@ -21,6 +21,8 @@ import org.apache.hc.client5.http.classic.methods.HttpPost; import org.apache.hc.client5.http.config.ConnectionConfig; import org.apache.hc.client5.http.config.RequestConfig; +import org.apache.hc.client5.http.entity.mime.MultipartEntityBuilder; +import org.apache.hc.client5.http.entity.mime.MultipartPartBuilder; import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; import org.apache.hc.client5.http.impl.classic.HttpClientBuilder; import org.apache.hc.client5.http.impl.io.BasicHttpClientConnectionManager; @@ -49,6 +51,7 @@ import org.apache.hc.core5.http.config.RegistryBuilder; import org.apache.hc.core5.http.impl.io.DefaultHttpResponseParserFactory; import org.apache.hc.core5.http.io.SocketConfig; +import org.apache.hc.core5.http.io.entity.ByteArrayEntity; import org.apache.hc.core5.http.io.entity.EntityTemplate; import org.apache.hc.core5.http.protocol.HttpContext; import org.apache.hc.core5.io.CloseMode; @@ -95,6 +98,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiConsumer; import java.util.function.Function; import java.util.regex.Pattern; @@ -122,9 +126,13 @@ public class HttpAPIClientHelper { ConnPoolControl poolControl; - public HttpAPIClientHelper(Map configuration, Object metricsRegistry, boolean initSslContext) { + LZ4Factory lz4Factory; + + public HttpAPIClientHelper(Map configuration, Object metricsRegistry, boolean initSslContext, LZ4Factory lz4Factory) { this.metricsRegistry = metricsRegistry; this.httpClient = createHttpClient(initSslContext, configuration); + this.lz4Factory = lz4Factory; + assert this.lz4Factory != null; boolean usingClientCompression = ClientConfigProperties.COMPRESS_CLIENT_REQUEST.getOrDefault(configuration); boolean usingServerCompression = ClientConfigProperties.COMPRESS_SERVER_RESPONSE.getOrDefault(configuration); @@ -410,51 +418,108 @@ public Exception readError(ClassicHttpResponse httpResponse) { private static final long POOL_VENT_TIMEOUT = 10000L; private final AtomicLong timeToPoolVent = new AtomicLong(0); - public ClassicHttpResponse executeRequest(Endpoint server, Map requestConfig, LZ4Factory lz4Factory, - IOCallback writeCallback) throws Exception { + private void doPoolVent() { if (poolControl != null && timeToPoolVent.get() < System.currentTimeMillis()) { timeToPoolVent.set(System.currentTimeMillis() + POOL_VENT_TIMEOUT); poolControl.closeExpired(); } + } - if (requestConfig == null) { - requestConfig = Collections.emptyMap(); - } + private HttpContext createRequestHttpContext(Map requestConfig) { + HttpClientContext context = HttpClientContext.create(); + Number responseTimeout = ClientConfigProperties.SOCKET_OPERATION_TIMEOUT.getOrDefault(requestConfig); + Number connectionReqTimeout = ClientConfigProperties.CONNECTION_REQUEST_TIMEOUT.getOrDefault(requestConfig); + RequestConfig reqHttpConf = RequestConfig.custom() + .setResponseTimeout(responseTimeout.longValue(), TimeUnit.MILLISECONDS) + .setConnectionRequestTimeout(connectionReqTimeout.longValue(), TimeUnit.MILLISECONDS) + .build(); + context.setRequestConfig(reqHttpConf); + + return context; + } + + private URI createRequestURI(Endpoint server, Map requestConfig, boolean addParameters) { URI uri; try { URIBuilder uriBuilder = new URIBuilder(server.getURI()); - addQueryParams(uriBuilder, requestConfig); - uri = uriBuilder.normalizeSyntax().build(); + addRequestParams(requestConfig, uriBuilder::addParameter); + + if (addParameters) { + addStatementParams(requestConfig, uriBuilder::addParameter); + } + + uri = uriBuilder.optimize().build(); } catch (URISyntaxException e) { throw new RuntimeException(e); } + return uri; + } + + private HttpPost createPostRequest(URI uri, Map requestConfig) { HttpPost req = new HttpPost(uri); // req.setVersion(new ProtocolVersion("HTTP", 1, 0)); // to disable chunk transfer encoding addHeaders(req, requestConfig); + return req; + } + + public ClassicHttpResponse executeRequest(Endpoint server, Map requestConfig, + String body) throws Exception { + + final URI uri = createRequestURI(server, requestConfig, true); + final HttpPost req = createPostRequest(uri, requestConfig); + final String contentEncoding = req.containsHeader(HttpHeaders.CONTENT_ENCODING) ? req.getHeader(HttpHeaders.CONTENT_ENCODING).getValue() : null; + + HttpEntity httpEntity = new ByteArrayEntity(body.getBytes(StandardCharsets.UTF_8.name()), CONTENT_TYPE, contentEncoding); + req.setEntity(wrapRequestEntity(httpEntity, requestConfig)); + + return doPostRequest(requestConfig, req); + } + + public ClassicHttpResponse executeMultiPartRequest(Endpoint server, Map requestConfig, String sqlQuery) throws Exception { + + requestConfig.put(ClientConfigProperties.COMPRESS_CLIENT_REQUEST.getKey(), false); + final URI uri = createRequestURI(server, requestConfig, false); + final HttpPost req = createPostRequest(uri, requestConfig); - // setting entity. wrapping if compression is enabled + MultipartEntityBuilder multipartEntityBuilder = MultipartEntityBuilder.create(); + addStatementParams(requestConfig, multipartEntityBuilder::addTextBody); + multipartEntityBuilder.addTextBody(ClickHouseHttpProto.QPARAM_QUERY_STMT, sqlQuery); + + + HttpEntity httpEntity = multipartEntityBuilder.build(); + req.setHeader(HttpHeaders.CONTENT_TYPE, httpEntity.getContentType()); // set proper content type with generated boundary value + req.setEntity(wrapRequestEntity(httpEntity, requestConfig)); + + return doPostRequest(requestConfig, req); + + + } + + public ClassicHttpResponse executeRequest(Endpoint server, Map requestConfig, + IOCallback writeCallback) throws Exception { + + final URI uri = createRequestURI(server, requestConfig, true); + final HttpPost req = createPostRequest(uri, requestConfig); String contentEncoding = req.containsHeader(HttpHeaders.CONTENT_ENCODING) ? req.getHeader(HttpHeaders.CONTENT_ENCODING).getValue() : null; - req.setEntity(wrapRequestEntity(new EntityTemplate(-1, CONTENT_TYPE, contentEncoding , writeCallback), - lz4Factory, + req.setEntity(wrapRequestEntity( + new EntityTemplate(-1, CONTENT_TYPE, contentEncoding , writeCallback), requestConfig)); - HttpClientContext context = HttpClientContext.create(); - Number responseTimeout = ClientConfigProperties.SOCKET_OPERATION_TIMEOUT.getOrDefault(requestConfig); - Number connectionReqTimeout = ClientConfigProperties.CONNECTION_REQUEST_TIMEOUT.getOrDefault(requestConfig); - RequestConfig reqHttpConf = RequestConfig.custom() - .setResponseTimeout(responseTimeout.longValue(), TimeUnit.MILLISECONDS) - .setConnectionRequestTimeout(connectionReqTimeout.longValue(), TimeUnit.MILLISECONDS) - .build(); - context.setRequestConfig(reqHttpConf); + return doPostRequest(requestConfig, req); + } + + private ClassicHttpResponse doPostRequest(Map requestConfig, HttpPost req) throws Exception { + + doPoolVent(); ClassicHttpResponse httpResponse = null; + HttpContext context = createRequestHttpContext(requestConfig); try { httpResponse = httpClient.executeOpen(null, req, context); httpResponse.setEntity(wrapResponseEntity(httpResponse.getEntity(), httpResponse.getCode(), - lz4Factory, requestConfig)); if (httpResponse.getCode() == HttpStatus.SC_PROXY_AUTHENTICATION_REQUIRED) { @@ -473,15 +538,15 @@ public ClassicHttpResponse executeRequest(Endpoint server, Map r } catch (UnknownHostException e) { closeQuietly(httpResponse); - LOG.warn("Host '{}' unknown", server); + LOG.warn("Host '{}' unknown", req.getAuthority()); throw e; } catch (ConnectException | NoRouteToHostException e) { closeQuietly(httpResponse); - LOG.warn("Failed to connect to '{}': {}", server, e.getMessage()); + LOG.warn("Failed to connect to '{}': {}", req.getAuthority(), e.getMessage()); throw e; } catch (Exception e) { closeQuietly(httpResponse); - LOG.debug("Failed to execute request to '{}': {}", server, e.getMessage(), e); + LOG.debug("Failed to execute request to '{}': {}", req.getAuthority(), e.getMessage(), e); throw e; } } @@ -591,13 +656,9 @@ private void addHeaders(HttpPost req, Map requestConfig) { correctUserAgentHeader(req, requestConfig); } - private void addQueryParams(URIBuilder req, Map requestConfig) { + private void addRequestParams(Map requestConfig, BiConsumer consumer) { if (requestConfig.containsKey(ClientConfigProperties.QUERY_ID.getKey())) { - req.addParameter(ClickHouseHttpProto.QPARAM_QUERY_ID, requestConfig.get(ClientConfigProperties.QUERY_ID.getKey()).toString()); - } - if (requestConfig.containsKey(KEY_STATEMENT_PARAMS)) { - Map params = (Map) requestConfig.get(KEY_STATEMENT_PARAMS); - params.forEach((k, v) -> req.addParameter("param_" + k, String.valueOf(v))); + consumer.accept(ClickHouseHttpProto.QPARAM_QUERY_ID, requestConfig.get(ClientConfigProperties.QUERY_ID.getKey()).toString()); } boolean clientCompression = ClientConfigProperties.COMPRESS_CLIENT_REQUEST.getOrDefault(requestConfig); @@ -608,32 +669,39 @@ private void addQueryParams(URIBuilder req, Map requestConfig) { // enable_http_compression make server react on http header // for client side compression Content-Encoding should be set // for server side compression Accept-Encoding should be set - req.addParameter("enable_http_compression", "1"); + consumer.accept("enable_http_compression", "1"); } else { if (serverCompression) { - req.addParameter("compress", "1"); + consumer.accept("compress", "1"); } if (clientCompression) { - req.addParameter("decompress", "1"); + consumer.accept("decompress", "1"); } } Collection sessionRoles = ClientConfigProperties.SESSION_DB_ROLES.getOrDefault(requestConfig); if (!(sessionRoles == null || sessionRoles.isEmpty())) { - sessionRoles.forEach(r -> req.addParameter(ClickHouseHttpProto.QPARAM_ROLE, r)); + sessionRoles.forEach(r -> consumer.accept(ClickHouseHttpProto.QPARAM_ROLE, r)); } for (String key : requestConfig.keySet()) { if (key.startsWith(ClientConfigProperties.SERVER_SETTING_PREFIX)) { Object val = requestConfig.get(key); if (val != null) { - req.addParameter(key.substring(ClientConfigProperties.SERVER_SETTING_PREFIX.length()), String.valueOf(requestConfig.get(key))); + consumer.accept(key.substring(ClientConfigProperties.SERVER_SETTING_PREFIX.length()), String.valueOf(requestConfig.get(key))); } } } } - private HttpEntity wrapRequestEntity(HttpEntity httpEntity, LZ4Factory lz4Factory, Map requestConfig) { + private void addStatementParams(Map requestConfig, BiConsumer consumer) { + if (requestConfig.containsKey(KEY_STATEMENT_PARAMS)) { + Map params = (Map) requestConfig.get(KEY_STATEMENT_PARAMS); + params.forEach((k, v) -> consumer.accept("param_" + k, String.valueOf(v))); + } + } + + private HttpEntity wrapRequestEntity(HttpEntity httpEntity, Map requestConfig) { boolean clientCompression = ClientConfigProperties.COMPRESS_CLIENT_REQUEST.getOrDefault(requestConfig); boolean useHttpCompression = ClientConfigProperties.USE_HTTP_COMPRESSION.getOrDefault(requestConfig); @@ -654,7 +722,7 @@ private HttpEntity wrapRequestEntity(HttpEntity httpEntity, LZ4Factory lz4Factor } } - private HttpEntity wrapResponseEntity(HttpEntity httpEntity, int httpStatus, LZ4Factory lz4Factory, Map requestConfig) { + private HttpEntity wrapResponseEntity(HttpEntity httpEntity, int httpStatus, Map requestConfig) { boolean serverCompression = ClientConfigProperties.COMPRESS_SERVER_RESPONSE.getOrDefault(requestConfig); boolean useHttpCompression = ClientConfigProperties.USE_HTTP_COMPRESSION.getOrDefault(requestConfig); diff --git a/client-v2/src/main/java/com/clickhouse/client/api/internal/LZ4Entity.java b/client-v2/src/main/java/com/clickhouse/client/api/internal/LZ4Entity.java index c9822b1de..1cd75c7e3 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/internal/LZ4Entity.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/internal/LZ4Entity.java @@ -77,12 +77,17 @@ public void writeTo(OutputStream outStream) throws IOException { throw new UnsupportedOperationException("Unsupported: writing compressed response to elsewhere"); } else if (clientCompression) { // called by client to send data + OutputStream compressingStream; if (useHttpCompression) { - httpEntity.writeTo(new FramedLZ4CompressorOutputStream(outStream)); + compressingStream = new FramedLZ4CompressorOutputStream(outStream); } else { + compressingStream = new ClickHouseLZ4OutputStream(outStream, lz4Factory.fastCompressor(), bufferSize); + } - httpEntity.writeTo(new ClickHouseLZ4OutputStream(outStream, lz4Factory.fastCompressor(), - bufferSize)); + try { + httpEntity.writeTo(compressingStream); + } finally { + compressingStream.close(); } } else { httpEntity.writeTo(outStream); @@ -106,7 +111,8 @@ public void close() throws IOException { @Override public long getContentLength() { - return httpEntity.getContentLength(); + // compressed request length is unknown event if it is a byte[] + return isResponse ? httpEntity.getContentLength() : -1; } @Override diff --git a/client-v2/src/test/java/com/clickhouse/client/ClientTests.java b/client-v2/src/test/java/com/clickhouse/client/ClientTests.java index 60f2bc59b..6e03ecb2e 100644 --- a/client-v2/src/test/java/com/clickhouse/client/ClientTests.java +++ b/client-v2/src/test/java/com/clickhouse/client/ClientTests.java @@ -223,7 +223,7 @@ public void testDefaultSettings() { Assert.assertEquals(config.get(p.getKey()), p.getDefaultValue(), "Default value doesn't match"); } } - Assert.assertEquals(config.size(), 32); // to check everything is set. Increment when new added. + Assert.assertEquals(config.size(), 33); // to check everything is set. Increment when new added. } try (Client client = new Client.Builder() @@ -256,7 +256,7 @@ public void testDefaultSettings() { .setSocketSndbuf(100000) .build()) { Map config = client.getConfiguration(); - Assert.assertEquals(config.size(), 33); // to check everything is set. Increment when new added. + Assert.assertEquals(config.size(), 34); // to check everything is set. Increment when new added. Assert.assertEquals(config.get(ClientConfigProperties.DATABASE.getKey()), "mydb"); Assert.assertEquals(config.get(ClientConfigProperties.MAX_EXECUTION_TIME.getKey()), "10"); Assert.assertEquals(config.get(ClientConfigProperties.COMPRESSION_LZ4_UNCOMPRESSED_BUF_SIZE.getKey()), "300000"); @@ -323,7 +323,7 @@ public void testWithOldDefaults() { Assert.assertEquals(config.get(p.getKey()), p.getDefaultValue(), "Default value doesn't match"); } } - Assert.assertEquals(config.size(), 32); // to check everything is set. Increment when new added. + Assert.assertEquals(config.size(), 33); // to check everything is set. Increment when new added. } } diff --git a/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java b/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java index c8572e3cc..2da5af59e 100644 --- a/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java +++ b/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java @@ -12,6 +12,7 @@ import com.clickhouse.client.api.enums.Protocol; import com.clickhouse.client.api.enums.ProxyType; import com.clickhouse.client.api.insert.InsertResponse; +import com.clickhouse.client.api.internal.DataTypeConverter; import com.clickhouse.client.api.internal.ServerSettings; import com.clickhouse.client.api.query.GenericRecord; import com.clickhouse.client.api.query.QueryResponse; @@ -43,7 +44,9 @@ import java.util.Arrays; import java.util.Base64; import java.util.EnumSet; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -1185,6 +1188,44 @@ public void testEndpointUrlPathIsPreserved() throws Exception { } } + @Test(groups = {"integration"}) + public void testMultiPartRequest() { + final Map params = new HashMap<>(); + params.put("database_name", "system"); + params.put("table_names", + DataTypeConverter.INSTANCE.arrayToString(Arrays.asList("COLLATIONS", "ENGINES"), "Array(String)")); + + // Use http compression + try (Client client = newClient().useHttpCompression(true).setOption(ClientConfigProperties.HTTP_SEND_PARAMS_IN_BODY.getKey(), "true").build()) { + List records = client.queryAll("SELECT database, name FROM system.tables WHERE name IN {table_names:Array(String)}", + params); + + Assert.assertEquals(records.get(0).getString("name"), "COLLATIONS"); + Assert.assertEquals(records.get(1).getString("name"), "ENGINES"); + } + + // Use http compression + try (Client client = newClient().useHttpCompression(false).setOption(ClientConfigProperties.HTTP_SEND_PARAMS_IN_BODY.getKey(), "true").build()) { + List records = client.queryAll("SELECT database, name FROM system.tables WHERE name IN {table_names:Array(String)}", + params); + + Assert.assertEquals(records.get(0).getString("name"), "COLLATIONS"); + Assert.assertEquals(records.get(1).getString("name"), "ENGINES"); + } + + // compress request + try (Client client = newClient() + .compressClientRequest(true) + .setOption(ClientConfigProperties.HTTP_SEND_PARAMS_IN_BODY.getKey(), "true") + .useHttpCompression(true).build()) { + List records = client.queryAll("SELECT database, name FROM system.tables WHERE name IN {table_names:Array(String)}", + params); + + Assert.assertEquals(records.get(0).getString("name"), "COLLATIONS"); + Assert.assertEquals(records.get(1).getString("name"), "ENGINES"); + } + } + protected Client.Builder newClient() { ClickHouseNode node = getServer(ClickHouseProtocol.HTTP); boolean isSecure = isCloud();