diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index c45c1f533ce9..9db102852f8f 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -3998,6 +3998,15 @@ public static enum ConfVars { "true", new StringSet("true", "false", "ignore"), "Whether Tez session pool should allow submitting queries to custom queues. The options\n" + "are true, false (error out), ignore (accept the query but ignore the queue setting)."), + HIVE_SERVER2_TEZ_USE_EXTERNAL_SESSIONS("hive.server2.tez.use.external.sessions", false, + "This flag is used in HiveServer2 to use externally started tez sessions"), + HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE("hive.server2.tez.external.sessions.namespace", "", + "ZK namespace to use for tez external sessions"), + HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_WAIT_MAX_ATTEMPTS("hive.server2.tez.external.sessions.wait.max.attempts", + 60, "Number of attempts before giving up waiting for external sessions (each attempt is 1 sec long)"), + HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_REGISTRY_CLASS("hive.server2.tez.external.sessions.registry.class", + "org.apache.hadoop.hive.ql.exec.tez.DummyExternalSessionsRegistry", "Tez external sessions\n" + + "registry implementation to use"), HIVE_MAPRED_JOB_FOLLOW_TEZ_QUEUE("hive.mapred.job.follow.tez.queue", false, "Whether the MR jobs initiated by a query should be enforced to run in the queue denoted by " + "'tez.queue.name', e.g. DistCp jobs."), diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestMiniClusters.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestMiniClusters.java index e94a842d7636..452c6cb0b6d7 100644 --- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestMiniClusters.java +++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestMiniClusters.java @@ -50,7 +50,7 @@ import org.apache.hadoop.hive.llap.daemon.MiniLlapCluster; import org.apache.hadoop.hive.llap.io.api.LlapProxy; import org.apache.hadoop.hive.ql.exec.Utilities; -import org.apache.hadoop.hive.ql.exec.tez.TezSessionState; +import org.apache.hadoop.hive.ql.exec.tez.TezSession; import org.apache.hadoop.hive.ql.lockmgr.zookeeper.CuratorFrameworkSingleton; import org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager; import org.apache.hadoop.hive.ql.session.SessionState; @@ -358,7 +358,7 @@ public void restartSessions(boolean canReuseSession, CliSessionState ss, Session if (oldSs != null && canReuseSession && clusterType.getCoreClusterType() == CoreClusterType.TEZ) { // Copy the tezSessionState from the old CliSessionState. - TezSessionState tezSessionState = oldSs.getTezSession(); + TezSession tezSessionState = oldSs.getTezSession(); oldSs.setTezSession(null); ss.setTezSession(tezSessionState); oldSs.close(); diff --git a/ql/pom.xml b/ql/pom.xml index 994dbacf4e84..b44a97fc1d0f 100644 --- a/ql/pom.xml +++ b/ql/pom.xml @@ -343,7 +343,6 @@ org.apache.hadoop hadoop-yarn-api - true org.apache.hadoop diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/process/kill/KillQueriesOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/process/kill/KillQueriesOperation.java index 26c7fb8b8f3a..d5bbe1a3a1f2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/process/kill/KillQueriesOperation.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/process/kill/KillQueriesOperation.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.ddl.process.kill; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.ddl.DDLOperation; import org.apache.hadoop.hive.ql.ddl.DDLOperationContext; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -37,7 +38,12 @@ public KillQueriesOperation(DDLOperationContext context, KillQueriesDesc desc) { public int execute() throws HiveException { SessionState sessionState = SessionState.get(); for (String queryId : desc.getQueryIds()) { - sessionState.getKillQuery().killQuery(queryId, KILL_QUERY_MESSAGE, context.getDb().getConf()); + // For now, get the config setting here; we can only have one type of the session present. + // Ideally we should check each session separately. + boolean isExternal = HiveConf.getBoolVar(context.getDb().getConf(), + HiveConf.ConfVars.HIVE_SERVER2_TEZ_USE_EXTERNAL_SESSIONS); + sessionState.getKillQuery().killQuery(queryId, KILL_QUERY_MESSAGE, context.getDb().getConf(), + !isExternal); } LOG.info("kill query called ({})", desc.getQueryIds()); return 0; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/AbstractTriggerValidator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/AbstractTriggerValidator.java new file mode 100644 index 000000000000..f2d1a746254c --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/AbstractTriggerValidator.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.tez; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class AbstractTriggerValidator { + private static final Logger LOG = LoggerFactory.getLogger(AbstractTriggerValidator.class); + + private ScheduledExecutorService scheduledExecutorService = null; + abstract Runnable getTriggerValidatorRunnable(); + + void startTriggerValidator(long triggerValidationIntervalMs) { + if (scheduledExecutorService == null) { + scheduledExecutorService = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("TriggerValidator").build()); + Runnable triggerValidatorRunnable = getTriggerValidatorRunnable(); + scheduledExecutorService.scheduleWithFixedDelay(triggerValidatorRunnable, triggerValidationIntervalMs, + triggerValidationIntervalMs, TimeUnit.MILLISECONDS); + LOG.info("Started trigger validator with interval: {} ms", triggerValidationIntervalMs); + } + } + + void stopTriggerValidator() { + if (scheduledExecutorService != null) { + scheduledExecutorService.shutdownNow(); + scheduledExecutorService = null; + LOG.info("Stopped trigger validator"); + } + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DummyExternalSessionsRegistry.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DummyExternalSessionsRegistry.java new file mode 100644 index 000000000000..18e8ce527bfb --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DummyExternalSessionsRegistry.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.tez; + + +import org.apache.hadoop.hive.conf.HiveConf; + +public class DummyExternalSessionsRegistry implements ExternalSessionsRegistry { + + // This constructor is required. Reflective instantiation will invoke this constructor. + public DummyExternalSessionsRegistry(HiveConf conf) { + } + + @Override + public String getSession() throws Exception { + throw new UnsupportedOperationException("not supported in dummy external session registry"); + } + + @Override + public void returnSession(final String appId) { + throw new UnsupportedOperationException("not supported in dummy external session registry"); + } + + @Override + public void close() { + throw new UnsupportedOperationException("not supported in dummy external session registry"); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ExternalSessionsRegistry.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ExternalSessionsRegistry.java new file mode 100644 index 000000000000..7f279c3648d8 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ExternalSessionsRegistry.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.tez; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public interface ExternalSessionsRegistry { + Logger LOG = LoggerFactory.getLogger(ExternalSessionsRegistry.class); + + /** + * Returns application of id of the external session. + * @return application id + * @throws Exception in case of any exceptions + */ + String getSession() throws Exception; + + /** + * Returns external session back to registry. + * @param appId application id + */ + void returnSession(String appId); + + /** + * Closes the external session registry. + */ + void close(); +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ExternalSessionsRegistryFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ExternalSessionsRegistryFactory.java new file mode 100644 index 000000000000..d17c5ba6804e --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ExternalSessionsRegistryFactory.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.tez; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.utils.JavaUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; + +/** + * Factory for creating and managing {@code ExternalSessionsRegistry} clients. + *

+ * Instances created by this factory are cached and may be reused across calls. + * Callers do not need to manage the lifecycle or closure of these clients, + * as this factory is responsible for handling those concerns. + *

+ */ +public abstract class ExternalSessionsRegistryFactory { + private static final Logger LOG = LoggerFactory.getLogger(ExternalSessionsRegistryFactory.class); + + private static final Map CLIENTS = new HashMap<>(); + + static { + Runtime.getRuntime().addShutdownHook(new Thread(() -> CLIENTS.values().forEach(ExternalSessionsRegistry::close))); + } + + private ExternalSessionsRegistryFactory() { + } + + public static ExternalSessionsRegistry getClient(final HiveConf conf) { + HiveConf newConf = prepareConf(conf); + // TODO: change this to TezConfiguration.TEZ_AM_REGISTRY_NAMESPACE after Tez 1.0.0 is released. + String namespace = conf.get("tez.am.registry.namespace"); + + ExternalSessionsRegistry registry; + synchronized (CLIENTS) { + registry = CLIENTS.computeIfAbsent(namespace, ns -> { + try { + String clazz = HiveConf.getVar(newConf, HiveConf.ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_REGISTRY_CLASS); + + return JavaUtils.newInstance(JavaUtils.getClass(clazz, ExternalSessionsRegistry.class), + new Class[]{HiveConf.class}, new Object[]{newConf}); + } catch (MetaException e) { + throw new RuntimeException(e); + } + }); + } + LOG.info("Returning tez external AM registry ({}) for namespace '{}'", System.identityHashCode(registry), + namespace); + return registry; + } + + private static HiveConf prepareConf(HiveConf conf) { + // HS2 would need to know about all coordinators running on all compute groups for a given compute (namespace) + // Setting this config to false in client, will make registry client listen on paths under @compute instead of + // @compute/compute-group + HiveConf newConf = new HiveConf(conf, ExternalSessionsRegistryFactory.class); + + // TODO: change this to TezConfiguration.TEZ_AM_REGISTRY_ENABLE_COMPUTE_GROUPS after Tez 1.0.0 is released. + newConf.setBoolean("tez.am.registry.enable.compute.groups", false); + return newConf; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java index 70241345bfab..ae0927939f57 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java @@ -22,7 +22,6 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.DriverUtils; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.session.KillQuery; import org.apache.hadoop.hive.ql.wm.Action; import org.apache.hadoop.hive.ql.wm.Trigger; import org.apache.hadoop.hive.ql.wm.TriggerActionHandler; @@ -33,35 +32,33 @@ /** * Handles only Kill Action. */ -public class KillTriggerActionHandler implements TriggerActionHandler { +public class KillTriggerActionHandler implements TriggerActionHandler { private static final Logger LOG = LoggerFactory.getLogger(KillTriggerActionHandler.class); private final HiveConf conf; public KillTriggerActionHandler() { - this.conf = new HiveConf(); + this.conf = new HiveConf(); } @Override - public void applyAction(final Map queriesViolated) { - for (Map.Entry entry : queriesViolated.entrySet()) { - if (entry.getValue().getAction().getType() == Action.Type.KILL_QUERY) { - TezSessionState sessionState = entry.getKey(); - String queryId = sessionState.getWmContext().getQueryId(); - try { - UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); - DriverUtils.setUpAndStartSessionState(conf, ugi.getShortUserName()); - KillQuery killQuery = sessionState.getKillQuery(); - // if kill query is null then session might have been released to pool or closed already - if (killQuery != null) { - sessionState.getKillQuery().killQuery(queryId, entry.getValue().getViolationMsg(), - sessionState.getConf()); - } - } catch (HiveException | IOException e) { - LOG.warn("Unable to kill query {} for trigger violation", queryId); - } - } else { - throw new RuntimeException("Unsupported action: " + entry.getValue()); + public void applyAction(Map queriesViolated) { + for (Map.Entry entry : queriesViolated.entrySet()) { + if (entry.getValue().getAction().getType() == Action.Type.KILL_QUERY) { + TezSession sessionState = entry.getKey(); + String queryId = sessionState.getWmContext().getQueryId(); + try { + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + DriverUtils.setUpAndStartSessionState(conf, ugi.getShortUserName()); + boolean wasKilled = sessionState.killQuery(entry.getValue().getViolationMsg()); + if (!wasKilled) { + LOG.info("Didn't kill the query {}", queryId); + } + } catch (HiveException | IOException e) { + LOG.warn("Unable to kill query {} for trigger violation", queryId, e); } + } else { + throw new RuntimeException("Unsupported action: " + entry.getValue()); + } } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezExternalSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezExternalSessionState.java new file mode 100644 index 000000000000..b3103d3f5918 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezExternalSessionState.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.tez; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.tez.client.TezClient; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor; + +/** + * A {@code TezSession} implementation that represents externally managed Tez sessions. + *

+ * Unlike {@code TezSessionState}, these sessions are not created or owned by HiveServer2. + * Instead, HiveServer2 connects to an already existing Tez session. + *

+ * + *

Lifecycle

+ *
    + *
  1. An instance of {@code TezExternalSessionState} is created.
  2. + *
  3. An artificial application ID is acquired from a registry. This does not + * correspond to a real YARN application, as the session is unmanaged.
  4. + *
  5. A {@code TezClient} is instantiated but not started (unlike in + * {@code TezSessionState}), allowing the rest of the Hive codebase to + * interact with it transparently.
  6. + *
+ * + *

+ * This abstraction enables Hive components to interact with external Tez + * sessions using the same interfaces as internally managed sessions. + *

+ */ +public class TezExternalSessionState extends TezSessionState { + private static final Object DEFAULT_CONF_CREATE_LOCK = new Object(); + private static volatile TezConfiguration defaultTezConfiguration; + + private String externalAppId; + private volatile boolean isOpen = false; + private volatile boolean isDestroying = false; + private final ExternalSessionsRegistry registry; + + public TezExternalSessionState(String sessionId, HiveConf conf) { + super(sessionId, conf); + this.registry = ExternalSessionsRegistryFactory.getClient(conf); + synchronized (DEFAULT_CONF_CREATE_LOCK) { + if (defaultTezConfiguration == null) { + defaultTezConfiguration = createDefaultTezConfig(); + } + } + } + + @Override + public void ensureLocalResources(Configuration conf, String[] newFilesNotFromConf) { + /* + * No-op implementation. + * External Tez sessions are not backed by a YARN application and therefore + * do not manage or localize resources. As a result, there are no local + * resources to ensure for this session type. + */ + } + + @Override + protected void openInternal(String[] additionalFilesNotFromConf, + boolean isAsync, LogHelper console, HiveResources resources) + throws IOException, TezException { + if (isOpen) { + LOG.info("External Tez session {} is already open, skipping duplicate openInternal call", getSessionId()); + return; + } + + initQueueAndUser(); + + boolean llapMode = isLlapMode(); + + TezConfiguration tezConfig = new TezConfiguration(defaultTezConfiguration); + setupSessionAcls(tezConfig, conf); + ServicePluginsDescriptor spd = createServicePluginDescriptor(llapMode, tezConfig); + Credentials llapCredentials = createLlapCredentials(llapMode, tezConfig); + + final TezClient sessionTezClient = TezClient.newBuilder("HIVE-" + getSessionId(), tezConfig) + .setIsSession(true) + .setCredentials(llapCredentials).setServicePluginDescriptor(spd) + .build(); + + LOG.info("Opening new External Tez Session (id: {})", getSessionId()); + TezJobMonitor.initShutdownHook(); + + // External sessions doesn't support async mode (getClient should be much cheaper than open, + // and the async mode is anyway only used for CLI). + if (isAsync) { + LOG.info("Ignoring the async argument for an external session {}", getSessionId()); + } + try { + externalAppId = registry.getSession(); + } catch (TezException | IOException e) { + throw e; + } catch (Exception e) { + throw new IOException(e); + } + + sessionTezClient.getClient(ApplicationId.fromString(externalAppId)); + LOG.info("Started an external session; client name {}, app ID {}", sessionTezClient.getClientName(), externalAppId); + setTezClient(sessionTezClient); + isOpen = true; + } + + @Override + public void close(boolean keepDagFilesDir) throws Exception { + // We never close external sessions that don't have errors. + try { + if (externalAppId != null) { + registry.returnSession(externalAppId); + } + } catch (Exception e) { + LOG.warn("Caught exception while trying to return external session {}, moving on with session state closure", + externalAppId, e); + } + + externalAppId = null; + isOpen = false; + if (isDestroying) { + super.close(keepDagFilesDir); + } + } + + @Override + public TezSession reopen() throws Exception { + isDestroying = true; + // Reopen will actually close this session, and get a new external app. + // It could instead somehow communicate to the external manager that the session is bad. + return super.reopen(); + } + + @Override + public void destroy() throws Exception { + isDestroying = true; + // This will actually close the session. We assume the external manager will restart it. + // It could instead somehow communicate to the external manager that the session is bad. + super.destroy(); + } + + @Override + public boolean isOpen(){ + return isOpen; + } + + @Override + public boolean killQuery(String reason) throws HiveException { + if (killQuery == null || wmContext == null) { + return false; + } + String queryId = wmContext.getQueryId(); + if (queryId == null) { + return false; + } + LOG.info("Killing the query {}: {}", queryId, reason); + killQuery.killQuery(queryId, reason, conf, false); + return true; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezRuntimeContext.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezRuntimeContext.java index f716c5c1eb28..cfd4116048af 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezRuntimeContext.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezRuntimeContext.java @@ -41,7 +41,7 @@ public class TezRuntimeContext { // llap/container private String executionMode; - public void init(TezSessionState sessionState) { + public void init(TezSession sessionState) { this.amAddress = sessionState.getAppMasterUri(); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSession.java new file mode 100644 index 000000000000..68844bd81728 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSession.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.tez; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CancellationException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.session.KillQuery; +import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; +import org.apache.hadoop.hive.ql.wm.WmContext; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.tez.client.TezClient; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.api.client.DAGStatus; + +/** + * A bogus interface that basically describes the evolved usage patterns of TezSessionStateImpl. + * Needed due to lack of multiple inheritance in Java; probably good to have, too - may make + * TezSessionState interface a little bit clearer or even encourage some future cleanup. + * + * It's implemented in two ways - core implementations (regular session, external session), + * and extra functionality implementation (pool session, WM session, etc.) that wraps an instance + * of the core implementation (i.e. use composition). With MI, each session type would just inherit + * from one of each. + */ +public interface TezSession { + final class HiveResources { + public HiveResources(Path dagResourcesDir) { + this.dagResourcesDir = dagResourcesDir; + } + /** A directory that will contain resources related to DAGs and specified in configs. */ + final Path dagResourcesDir; + final Map additionalFilesNotFromConf = new HashMap<>(); + /** Localized resources of this session; both from conf and not from conf (above). */ + final Set localizedResources = new HashSet<>(); + + @Override + public String toString() { + return dagResourcesDir + "; " + additionalFilesNotFromConf.size() + " additional files, " + + localizedResources.size() + " localized resources"; + } + } + + // Core session operations. + void open() throws IOException, TezException; + void open(HiveResources resources) throws IOException, TezException; + void open(String[] additionalFilesNotFromConf) throws IOException, TezException; + void beginOpen(String[] additionalFiles, LogHelper console) throws IOException, TezException; + void endOpen() throws InterruptedException, CancellationException; + TezSession reopen() throws Exception; + void destroy() throws Exception; + void close(boolean keepTmpDir) throws Exception; + void returnToSessionManager() throws Exception; + + /** This is called during open and update (i.e. internally and externally) to localize conf resources. */ + void ensureLocalResources(Configuration conf, String[] newFilesNotFromConf) throws IOException; + HiveResources extractHiveResources(); + Path replaceHiveResources(HiveResources resources, boolean isAsync); + + List getLocalizedResources(); + LocalResource getAppJarLr(); + + HiveConf getConf(); + TezClient getTezClient(); + boolean isOpen(); + boolean isOpening(); + boolean getDoAsEnabled(); + String getSessionId(); + String getUser(); + WmContext getWmContext(); // Necessary for triggers, even for non-WM sessions. + void setWmContext(WmContext ctx); + void setQueueName(String queueName); + String getQueueName(); + void setDefault(); + boolean isDefault(); + boolean getLegacyLlapMode(); + void setLegacyLlapMode(boolean b); + void unsetOwnerThread(); + void setOwnerThread(); + KillQuery getKillQuery(); + void setKillQuery(KillQuery kq); + boolean killQuery(String reason) throws HiveException; + String getAppMasterUri(); + default Map getMetrics() { + return new HashMap<>(); + } + default void updateDagStatus(DAGStatus dagStatus) { + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java index 33d4210fb226..ae113ff38a3f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java @@ -103,7 +103,13 @@ public interface SessionObjectFactory { this.deltaRemaining = new AtomicInteger(initialSize); - final int threadCount = HiveConf.getIntVar(initConf, ConfVars.HIVE_SERVER2_TEZ_SESSION_MAX_INIT_THREADS); + int threadCount = 1; + if (!HiveConf.getBoolVar(initConf, ConfVars.HIVE_SERVER2_TEZ_USE_EXTERNAL_SESSIONS)) { + // Don't use multiple threads for external sessions. + threadCount = Math.min(initialSize, + HiveConf.getIntVar(initConf, ConfVars.HIVE_SERVER2_TEZ_SESSION_MAX_INIT_THREADS)); + } + this.executorService = MoreExecutors .listeningDecorator(new ThreadPoolExecutor(0, threadCount, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new ThreadFactoryBuilder().setDaemon(true).setNameFormat("tez-session-init-%d").build())); @@ -488,7 +494,7 @@ int getCurrentSize() { /** * Should be called when the session is no longer needed, to remove it from bySessionId. */ - public void notifyClosed(TezSessionState session) { + public void notifyClosed(SessionType session) { bySessionId.remove(session.getSessionId()); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java index 70a57a52531d..605a92ebc8f5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java @@ -55,7 +55,7 @@ * In case the user specifies a queue explicitly, a new session is created * on that queue and assigned to the session state. */ -public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTriggerValidator +public class TezSessionPoolManager extends AbstractTriggerValidator implements Manager, SessionExpirationTracker.RestartImpl { private enum CustomQueueAllowed { @@ -85,13 +85,14 @@ private enum CustomQueueAllowed { private static TezSessionPoolManager instance = null; /** This is used to close non-default sessions, and also all sessions when stopping. */ - private final List openSessions = new LinkedList<>(); + private final List openSessions = new LinkedList<>(); private SessionTriggerProvider sessionTriggerProvider; private TriggerActionHandler triggerActionHandler; private TriggerValidatorRunnable triggerValidatorRunnable; private YarnQueueHelper yarnQueueChecker; private TezSessionPoolManagerMetrics metrics = null; + private boolean useExternalSessions; /** Note: this is not thread-safe. */ public static TezSessionPoolManager getInstance() { @@ -200,6 +201,8 @@ public void setupNonPool(HiveConf conf) { this.yarnQueueChecker = new YarnQueueHelper(conf); } + useExternalSessions = HiveConf.getBoolVar(conf, ConfVars.HIVE_SERVER2_TEZ_USE_EXTERNAL_SESSIONS); + restrictedConfig = new RestrictedConfigChecker(conf); } @@ -232,7 +235,7 @@ private TezSessionPoolSession createAndInitSession( return sessionState; } - private TezSessionState getSession(HiveConf conf, boolean doOpen) throws Exception { + private TezSession getSession(HiveConf conf, boolean doOpen) throws Exception { // NOTE: this can be called outside of HS2, without calling setupPool. Basically it should be // able to handle not being initialized. Perhaps we should get rid of the instance and // move the setupPool code to ctor. For now, at least hasInitialSessions will be false. @@ -312,7 +315,7 @@ private TezSessionState getSession(HiveConf conf, boolean doOpen) throws Excepti * @return * @throws Exception */ - private TezSessionState getNewSessionState(HiveConf conf, + private TezSession getNewSessionState(HiveConf conf, String queueName, boolean doOpen) throws Exception { TezSessionPoolSession retTezSessionState = createAndInitSession(queueName, false, conf); if (queueName != null) { @@ -331,7 +334,7 @@ public void returnAfterUse(TezSessionPoolSession session) throws Exception { returnSession(session); } - void returnSession(TezSessionState tezSessionState) throws Exception { + void returnSession(TezSession tezSessionState) { // Ignore the interrupt status while returning the session, but set it back // on the thread in case anything else needs to deal with it. boolean isInterrupted = Thread.interrupted(); @@ -361,7 +364,7 @@ void returnSession(TezSessionState tezSessionState) throws Exception { } public static void closeIfNotDefault( - TezSessionState tezSessionState, boolean keepTmpDir) throws Exception { + TezSession tezSessionState, boolean keepTmpDir) throws Exception { LOG.info("Closing tez session if not default: " + tezSessionState); if (!tezSessionState.isDefault()) { tezSessionState.close(keepTmpDir); @@ -372,13 +375,13 @@ public void stop() throws Exception { if ((instance == null) || !this.hasInitialSessions) { return; } - List sessionsToClose = null; + List sessionsToClose = null; synchronized (openSessions) { - sessionsToClose = new ArrayList(openSessions); + sessionsToClose = new ArrayList<>(openSessions); } // we can just stop all the sessions - for (TezSessionState sessionState : sessionsToClose) { + for (TezSession sessionState : sessionsToClose) { if (sessionState.isDefault()) { sessionState.close(false); } @@ -405,7 +408,7 @@ public void stop() throws Exception { * @throws Exception */ @Override - public void destroy(TezSessionState tezSessionState) throws Exception { + public void destroy(TezSession tezSessionState) throws Exception { LOG.warn("We are closing a " + (tezSessionState.isDefault() ? "default" : "non-default") + " session because of retry failure."); tezSessionState.close(false); @@ -417,7 +420,8 @@ TriggerValidatorRunnable getTriggerValidatorRunnable() { } protected TezSessionPoolSession createSession(String sessionId, HiveConf conf) { - return new TezSessionPoolSession(sessionId, this, expirationTracker, conf); + TezSessionState base = TezSessionState.create(sessionId, conf, useExternalSessions); + return new TezSessionPoolSession(this, expirationTracker, base); } /* @@ -426,7 +430,7 @@ protected TezSessionPoolSession createSession(String sessionId, HiveConf conf) { * sessions for e.g. when a CLI session is started. The CLI session could re-use the * same tez session eliminating the latencies of new AM and containers. */ - private static boolean canWorkWithSameSession(TezSessionState session, HiveConf conf) + private static boolean canWorkWithSameSession(TezSession session, HiveConf conf) throws HiveException { if (session == null || conf == null || !session.isOpen()) { return false; @@ -452,7 +456,8 @@ private static boolean canWorkWithSameSession(TezSessionState session, HiveConf boolean doAsEnabled = conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS); // either variables will never be null because a default value is returned in case of absence - if (doAsEnabled != session.getConf().getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS)) { + if (doAsEnabled != session.getConf().getBoolVar( + HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS)) { return false; } @@ -469,12 +474,12 @@ private static boolean canWorkWithSameSession(TezSessionState session, HiveConf return (queueName == null) ? confQueueName == null : queueName.equals(confQueueName); } else { // this session should never be a default session unless something has messed up. - throw new HiveException("The pool session " + session + " should have been returned to the pool"); + throw new HiveException("The pool session " + session + " should have been returned to the pool"); } } - public TezSessionState getSession( - TezSessionState session, HiveConf conf, boolean doOpen, boolean llap) throws Exception { + public TezSession getSession( + TezSession session, HiveConf conf, boolean doOpen, boolean llap) throws Exception { if (llap && (this.numConcurrentLlapQueries > 0)) { llapQueue.acquire(); // blocks if no more llap queries can be submitted. } @@ -495,7 +500,7 @@ public TezSessionState getSession( /** Reopens the session that was found to not be running. */ @Override - public TezSessionState reopen(TezSessionState sessionState) throws Exception { + public TezSession reopen(TezSession sessionState) throws Exception { HiveConf sessionConf = sessionState.getConf(); if (sessionState.getQueueName() != null && sessionConf.get(TezConfiguration.TEZ_QUEUE_NAME) == null) { @@ -506,7 +511,7 @@ public TezSessionState reopen(TezSessionState sessionState) throws Exception { } static void reopenInternal( - TezSessionState sessionState) throws Exception { + TezSession sessionState) throws Exception { // TODO: close basically resets the object to a bunch of nulls. // We should ideally not reuse the object because it's pointless and error-prone. sessionState.close(true); @@ -516,11 +521,11 @@ static void reopenInternal( public void closeNonDefaultSessions() throws Exception { - List sessionsToClose = null; + List sessionsToClose; synchronized (openSessions) { - sessionsToClose = new ArrayList(openSessions); + sessionsToClose = new ArrayList<>(openSessions); } - for (TezSessionState sessionState : sessionsToClose) { + for (TezSession sessionState : sessionsToClose) { System.err.println("Shutting down tez session."); closeIfNotDefault(sessionState, false); } @@ -600,7 +605,7 @@ List getTriggerCounterNames() { return counterNames; } - public List getSessions() { + public List getSessions() { return new LinkedList<>(openSessions); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManagerMetrics.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManagerMetrics.java index 6e153b83ea8d..d33bc67c0a15 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManagerMetrics.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManagerMetrics.java @@ -130,7 +130,7 @@ public void stop() { } void collectMetrics() { - List sessions = poolManager.getSessions(); + List sessions = poolManager.getSessions(); LOG.debug("Updating metrics, session count: {}", sessions.size()); if (sessions.isEmpty()) { @@ -148,7 +148,7 @@ void collectMetrics() { new ThreadFactoryBuilder().setNameFormat("TezSessionPoolManagerMetrics collector thread - #%d").build()); long start = Time.monotonicNow(); - for (TezSessionState session : sessions) { + for (TezSession session : sessions) { collectTasks.add(CompletableFuture.runAsync(() -> { Map metrics = session.getMetrics(); LOG.debug("Achieved metrics from Tez session ({}): {}", session.getSessionId(), metrics); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java index 49a3211e60f0..415072f221da 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java @@ -19,19 +19,25 @@ package org.apache.hadoop.hive.ql.exec.tez; import java.io.IOException; -import java.net.URISyntaxException; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CancellationException; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.session.KillQuery; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; +import org.apache.hadoop.hive.ql.wm.WmContext; import org.apache.hadoop.hive.registry.impl.TezAmInstance; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.tez.client.TezClient; import org.apache.tez.dag.api.TezException; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.tez.dag.api.client.DAGStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * TezSession that is aware of the session pool, and also keeps track of expiration and use. @@ -41,57 +47,32 @@ * if it's time, the expiration is triggered; in that case, or if it was already triggered, the * caller gets a different session. When the session is in use when it expires, the expiration * thread ignores it and lets the return to the pool take care of the expiration. + * + * Because of the lack of multiple inheritance in Java, this uses composition. */ -@VisibleForTesting -class TezSessionPoolSession extends TezSessionState { +public class TezSessionPoolSession implements TezSession { + protected static final Logger LOG = LoggerFactory.getLogger(TezSessionPoolSession.class); private static final int STATE_NONE = 0, STATE_IN_USE = 1, STATE_EXPIRED = 2; public interface Manager { void registerOpenSession(TezSessionPoolSession session); - void unregisterOpenSession(TezSessionPoolSession session); - void returnAfterUse(TezSessionPoolSession session) throws Exception; - - TezSessionState reopen(TezSessionState session) throws Exception; - - void destroy(TezSessionState session) throws Exception; - } - - public static abstract class AbstractTriggerValidator { - private ScheduledExecutorService scheduledExecutorService = null; - abstract Runnable getTriggerValidatorRunnable(); - - void startTriggerValidator(long triggerValidationIntervalMs) { - if (scheduledExecutorService == null) { - scheduledExecutorService = Executors.newSingleThreadScheduledExecutor( - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("TriggerValidator").build()); - Runnable triggerValidatorRunnable = getTriggerValidatorRunnable(); - scheduledExecutorService.scheduleWithFixedDelay(triggerValidatorRunnable, triggerValidationIntervalMs, - triggerValidationIntervalMs, TimeUnit.MILLISECONDS); - LOG.info("Started trigger validator with interval: {} ms", triggerValidationIntervalMs); - } - } - - void stopTriggerValidator() { - if (scheduledExecutorService != null) { - scheduledExecutorService.shutdownNow(); - scheduledExecutorService = null; - LOG.info("Stopped trigger validator"); - } - } + TezSession reopen(TezSession session) throws Exception; + void destroy(TezSession session) throws Exception; } private final AtomicInteger sessionState = new AtomicInteger(STATE_NONE); private Long expirationNs; - private final Manager parent; + private final Manager manager; private final SessionExpirationTracker expirationTracker; + private final TezSession baseSession; - public TezSessionPoolSession(String sessionId, Manager parent, - SessionExpirationTracker tracker, HiveConf conf) { - super(sessionId, conf); - this.parent = parent; + public TezSessionPoolSession(Manager manager, + SessionExpirationTracker tracker, TezSession baseSession) { + this.baseSession = baseSession; + this.manager = manager; this.expirationTracker = tracker; } @@ -104,11 +85,11 @@ Long getExpirationNs() { } @Override - void close(boolean keepTmpDir) throws Exception { + public void close(boolean keepTmpDir) throws Exception { try { - super.close(keepTmpDir); + baseSession.close(keepTmpDir); } finally { - parent.unregisterOpenSession(this); + manager.unregisterOpenSession(this); if (expirationTracker != null) { expirationTracker.removeFromExpirationQueue(this); } @@ -116,23 +97,67 @@ void close(boolean keepTmpDir) throws Exception { } @Override - protected void openInternal(String[] additionalFiles, - boolean isAsync, LogHelper console, HiveResources resources) - throws IOException, URISyntaxException, TezException { - super.openInternal(additionalFiles, isAsync, console, resources); - parent.registerOpenSession(this); + public void open(String[] additionalFilesNotFromConf) + throws IOException, TezException { + baseSession.open(additionalFilesNotFromConf); + afterOpen(); + } + + @Override + public void open() throws IOException, TezException { + baseSession.open(); + afterOpen(); + } + + private void afterOpen() { + manager.registerOpenSession(this); if (expirationTracker != null) { expirationTracker.addToExpirationQueue(this); } } @Override - public String toString() { - if (expirationNs == null) return super.toString(); - long expiresInMs = (expirationNs - System.nanoTime()) / 1000000L; - return super.toString() + ", expires in " + expiresInMs + "ms"; + public void open(HiveResources resources) + throws IOException, TezException { + baseSession.open(resources); + afterOpen(); + } + + // TODO: this is only supported in CLI, might be good to try to remove it. + @Override + public void beginOpen(String[] additionalFiles, LogHelper console) + throws IOException, TezException { + baseSession.beginOpen(additionalFiles, console); + afterOpen(); } + @Override + public void endOpen() throws InterruptedException, CancellationException { + baseSession.endOpen(); + } + + @Override + public void ensureLocalResources(Configuration conf, String[] newFilesNotFromConf) throws IOException { + baseSession.ensureLocalResources(conf, newFilesNotFromConf); + } + + @Override + public HiveResources extractHiveResources() { + return baseSession.extractHiveResources(); + } + + @Override + public Path replaceHiveResources(HiveResources resources, boolean isAsync) { + return baseSession.replaceHiveResources(resources, isAsync); + } + + @Override + public boolean killQuery(String reason) throws HiveException { + return baseSession.killQuery(reason); + } + + // *********** Methods specific to a pool session. + /** * Tries to use this session. When the session is in use, it will not expire. * @return true if the session can be used; false if it has already expired. @@ -157,7 +182,9 @@ boolean stopUsing() { if (!sessionState.compareAndSet(STATE_IN_USE, finalState)) { throw new AssertionError("Unexpected state change; currently " + sessionState.get()); } - if (finalState == STATE_NONE) return true; + if (finalState == STATE_NONE){ + return true; + } expirationTracker.closeAndRestartExpiredSessionAsync(this); return false; } @@ -190,24 +217,164 @@ private final boolean shouldExpire() { @Override public void returnToSessionManager() throws Exception { - parent.returnAfterUse(this); + manager.returnAfterUse(this); } @Override - public TezSessionState reopen() throws Exception { - return parent.reopen(this); + public TezSession reopen() throws Exception { + return manager.reopen(this); } @Override public void destroy() throws Exception { - parent.destroy(this); + manager.destroy(this); } - boolean isOwnedBy(Manager parent) { - return this.parent == parent; + public boolean isOwnedBy(Manager parent) { + return this.manager == parent; } - void updateFromRegistry(TezAmInstance si, int ephSeqVersion) { + public void updateFromRegistry(TezAmInstance si, int ephSeqVersion) { // Nothing to do. } + + @Override + public String toString() { + return baseSession.toString() + getExpirationString(); + } + + private String getExpirationString() { + if (expirationNs == null){ + return ""; + } + long expiresInMs = (expirationNs - System.nanoTime()) / 1000000L; + return ", expires in " + expiresInMs + "ms"; + } + + // ********** The methods that we redirect to base. + // We could instead have a separate "data" interface that would "return superr" here, and + // "return this" in the actual session implementation; however that would require everyone to + // call session.getData().method() for some arbitrary set of methods. Let's keep all the + // ugliness in one place. + + @Override + public HiveConf getConf() { + return baseSession.getConf(); + } + + @Override + public String getSessionId() { + return baseSession.getSessionId(); + } + + @Override + public String getUser() { + return baseSession.getUser(); + } + + @Override + public boolean isOpen() { + return baseSession.isOpen(); + } + + @Override + public void setQueueName(String queueName) { + baseSession.setQueueName(queueName); + } + + @Override + public String getQueueName() { + return baseSession.getQueueName(); + } + + @Override + public void setDefault() { + baseSession.setDefault(); + } + + @Override + public boolean isDefault() { + return baseSession.isDefault(); + } + + @Override + public boolean getDoAsEnabled() { + return baseSession.getDoAsEnabled(); + } + + @Override + public boolean getLegacyLlapMode() { + return baseSession.getLegacyLlapMode(); + } + + @Override + public void setLegacyLlapMode(boolean b) { + baseSession.setLegacyLlapMode(b); + } + + @Override + public WmContext getWmContext() { + return baseSession.getWmContext(); + } + + @Override + public void setWmContext(WmContext ctx) { + baseSession.setWmContext(ctx); + } + + @Override + public LocalResource getAppJarLr() { + return baseSession.getAppJarLr(); + } + + @Override + public List getLocalizedResources() { + return baseSession.getLocalizedResources(); + } + + @Override + public TezClient getTezClient() { + return baseSession.getTezClient(); + } + + @Override + public boolean isOpening() { + return baseSession.isOpening(); + } + + @Override + public void setOwnerThread() { + baseSession.setOwnerThread(); + } + + @Override + public void unsetOwnerThread() { + baseSession.unsetOwnerThread(); + } + + @Override + public KillQuery getKillQuery() { + return baseSession.getKillQuery(); + } + + @Override + public void setKillQuery(KillQuery kq) { + baseSession.setKillQuery(kq); + } + + @Override + public String getAppMasterUri() { + return baseSession.getAppMasterUri(); + } + + @Override + public Map getMetrics(){ + return baseSession.getMetrics(); + } + + @Override + public void updateDagStatus(DAGStatus dagStatus) { + baseSession.updateDagStatus(dagStatus); + } + // ********** End of the methods that we redirect to base. } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java index 1b6965018a88..2924416ad480 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java @@ -18,18 +18,15 @@ package org.apache.hadoop.hive.ql.exec.tez; import org.apache.hadoop.hive.common.JavaVersionUtils; -import org.apache.hadoop.registry.client.api.RegistryOperations; import java.io.File; import java.io.IOException; import java.net.URISyntaxException; import java.util.ArrayList; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Set; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; @@ -62,12 +59,15 @@ import org.apache.hadoop.hive.llap.tezplugins.LlapTaskCommunicator; import org.apache.hadoop.hive.llap.tezplugins.LlapTaskSchedulerService; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor; +import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.session.KillQuery; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; import org.apache.hadoop.hive.ql.wm.WmContext; import org.apache.hadoop.hive.shims.Utils; import org.apache.hadoop.mapred.JobContext; +import org.apache.hadoop.registry.client.api.RegistryOperations; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; @@ -92,7 +92,6 @@ import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.annotation.JsonSerialize; @@ -101,10 +100,10 @@ import com.google.common.cache.CacheBuilder; /** - * Holds session state related to Tez + * The basic implementation of TezSession. */ @JsonSerialize -public class TezSessionState { +public class TezSessionState implements TezSession { protected static final Logger LOG = LoggerFactory.getLogger(TezSessionState.class.getName()); private static final String TEZ_DIR = "_tez_session_dir"; @@ -113,17 +112,17 @@ public class TezSessionState { private static final String LLAP_LAUNCHER = LlapContainerLauncher.class.getName(); private static final String LLAP_TASK_COMMUNICATOR = LlapTaskCommunicator.class.getName(); - private final HiveConf conf; + protected final HiveConf conf; @VisibleForTesting Path tezScratchDir; - private LocalResource appJarLr; + protected LocalResource appJarLr; private TezClient session; private Future sessionFuture; /** Console used for user feedback during async session opening. */ private LogHelper console; @JsonProperty("sessionId") private String sessionId; - private final DagUtils utils; + protected final DagUtils utils; @JsonProperty("queueName") private String queueName; @JsonProperty("defaultQueue") @@ -133,29 +132,13 @@ public class TezSessionState { private AtomicReference ownerThread = new AtomicReference<>(null); - public static final class HiveResources { - public HiveResources(Path dagResourcesDir) { - this.dagResourcesDir = dagResourcesDir; - } - /** A directory that will contain resources related to DAGs and specified in configs. */ - public final Path dagResourcesDir; - public final Map additionalFilesNotFromConf = new HashMap(); - /** Localized resources of this session; both from conf and not from conf (above). */ - public final Set localizedResources = new HashSet<>(); - - @Override - public String toString() { - return dagResourcesDir + "; " + additionalFilesNotFromConf.size() + " additional files, " - + localizedResources.size() + " localized resources"; - } - } private HiveResources resources; @JsonProperty("doAsEnabled") private boolean doAsEnabled; private boolean isLegacyLlapMode; - private WmContext wmContext; - private KillQuery killQuery; + protected WmContext wmContext; + protected KillQuery killQuery; private static final Cache shaCache = CacheBuilder.newBuilder().maximumSize(100).build(); @@ -185,6 +168,12 @@ public TezSessionState(String sessionId, HiveConf conf) { this.sessionId = sessionId; } + public static TezSessionState create(String sessionId, HiveConf conf, boolean useExternalSessions) { + return useExternalSessions ? new TezExternalSessionState(sessionId, conf) : + new TezSessionState(sessionId, conf); + } + + @Override public boolean isOpening() { if (session != null || sessionFuture == null) { return false; @@ -208,6 +197,7 @@ public boolean isOpening() { return false; } + @Override public boolean isOpen() { if (session != null) { return true; @@ -236,58 +226,45 @@ public static String makeSessionId() { return UUID.randomUUID().toString(); } - public void open() throws IOException, URISyntaxException, TezException { - String[] noFiles = null; - open(noFiles); + @Override + public void open() throws IOException, TezException { + openInternal(null, false, null, null); } /** * Creates a tez session. A session is tied to either a cli/hs2 session. You can * submit multiple DAGs against a session (as long as they are executed serially). */ - public void open(String[] additionalFilesNotFromConf) - throws IOException, URISyntaxException, TezException { + @Override + public void open(String[] additionalFilesNotFromConf) throws IOException, TezException { openInternal(additionalFilesNotFromConf, false, null, null); } - public void open(HiveResources resources) - throws IOException, URISyntaxException, TezException { + @Override + public void open(HiveResources resources) throws IOException, TezException { openInternal(null, false, null, resources); } - public void beginOpen(String[] additionalFiles, LogHelper console) - throws IOException, URISyntaxException, TezException { + @Override + public void beginOpen(String[] additionalFiles, LogHelper console) throws IOException, TezException { openInternal(additionalFiles, true, console, null); } protected void openInternal(String[] additionalFilesNotFromConf, - boolean isAsync, LogHelper console, HiveResources resources) - throws IOException, URISyntaxException, TezException { - // TODO Why is the queue name set again. It has already been setup via setQueueName. Do only one of the two. - String confQueueName = conf.get(TezConfiguration.TEZ_QUEUE_NAME); - if (queueName != null && !queueName.equals(confQueueName)) { - LOG.warn("Resetting a queue name that was already set: was " - + queueName + ", now " + confQueueName); - } - this.queueName = confQueueName; - this.doAsEnabled = conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS); - - // TODO This - at least for the session pool - will always be the hive user. How does doAs above this affect things ? - UserGroupInformation ugi = Utils.getUGI(); - user = ugi.getShortUserName(); - LOG.info("User of session id " + sessionId + " is " + user); + boolean isAsync, LogHelper console, HiveResources resources) throws IOException, TezException { + initQueueAndUser(); // Create the tez tmp dir and a directory for Hive resources. tezScratchDir = createTezDir(sessionId, null); if (resources != null) { // If we are getting the resources externally, don't relocalize anything. this.resources = resources; - LOG.info("Setting resources to " + resources); + LOG.info("Setting resources to {}", resources); } else { this.resources = new HiveResources(createTezDir(sessionId, "resources")); ensureLocalResources(conf, additionalFilesNotFromConf); - LOG.info("Created new resources: " + this.resources); + LOG.info("Created new resources: {}", this.resources); } // Unless already installed on all the cluster nodes, we'll have to localize hive-exec.jar as well. @@ -333,14 +310,10 @@ Map buildCommonLocalResources() { @VisibleForTesting void openInternalUnsafe(boolean isAsync, LogHelper console) throws TezException, IOException { final Map commonLocalResources = buildCommonLocalResources(); - final boolean llapMode = "llap".equalsIgnoreCase(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE)); + final boolean llapMode = isLlapMode(); if (llapMode) { - // localize llap client jars - addJarLRByClass(LlapTaskSchedulerService.class, commonLocalResources); - addJarLRByClass(LlapProtocolClientImpl.class, commonLocalResources); - addJarLRByClass(LlapProtocolClientProxy.class, commonLocalResources); - addJarLRByClass(RegistryOperations.class, commonLocalResources); + addLlapJars(commonLocalResources); } // Create environment for AM. @@ -349,36 +322,11 @@ void openInternalUnsafe(boolean isAsync, LogHelper console) throws TezException, // and finally we're ready to create and start the session // generate basic tez config - final TezConfiguration tezConfig = new TezConfiguration(true); - tezConfig.addResource(conf); - - setupTezParamsBasedOnMR(tezConfig); - - // set up the staging directory to use + final TezConfiguration tezConfig = createTezConfig(); tezConfig.set(TezConfiguration.TEZ_AM_STAGING_DIR, tezScratchDir.toUri().toString()); - conf.stripHiddenConfigurations(tezConfig); - ServicePluginsDescriptor servicePluginsDescriptor; - - Credentials llapCredentials = null; - if (llapMode) { - if (isKerberosEnabled(tezConfig)) { - llapCredentials = new Credentials(); - llapCredentials.addToken(LlapTokenIdentifier.KIND_NAME, getLlapToken(user, tezConfig)); - } - // TODO Change this to not serialize the entire Configuration - minor. - UserPayload servicePluginPayload = TezUtils.createUserPayloadFromConf(tezConfig); - // we need plugins to handle llap and uber mode - servicePluginsDescriptor = ServicePluginsDescriptor.create(true, - new TaskSchedulerDescriptor[] { TaskSchedulerDescriptor.create( - LLAP_SERVICE, LLAP_SCHEDULER).setUserPayload(servicePluginPayload) }, - new ContainerLauncherDescriptor[] { ContainerLauncherDescriptor.create( - LLAP_SERVICE, LLAP_LAUNCHER) }, - new TaskCommunicatorDescriptor[] { TaskCommunicatorDescriptor.create( - LLAP_SERVICE, LLAP_TASK_COMMUNICATOR).setUserPayload(servicePluginPayload) }); - } else { - servicePluginsDescriptor = ServicePluginsDescriptor.create(true); - } + Credentials llapCredentials = createLlapCredentials(llapMode, tezConfig); + ServicePluginsDescriptor spd = createServicePluginDescriptor(llapMode, tezConfig); // container prewarming. tell the am how many containers we need if (HiveConf.getBoolVar(conf, ConfVars.HIVE_PREWARM_ENABLED)) { @@ -389,8 +337,6 @@ void openInternalUnsafe(boolean isAsync, LogHelper console) throws TezException, tezConfig.setInt(TezConfiguration.TEZ_AM_SESSION_MIN_HELD_CONTAINERS, n); } - setupSessionAcls(tezConfig, conf); - /* * Update HADOOP_CREDSTORE_PASSWORD for the TezAM. * If there is a job specific credential store, it will be set. @@ -404,16 +350,15 @@ void openInternalUnsafe(boolean isAsync, LogHelper console) throws TezException, String tezJobNameFormat = HiveConf.getVar(conf, ConfVars.HIVE_TEZ_JOB_NAME); final TezClient session = TezClient.newBuilder(String.format(tezJobNameFormat, sessionId), tezConfig) .setIsSession(true).setLocalResources(commonLocalResources) - .setCredentials(llapCredentials).setServicePluginDescriptor(servicePluginsDescriptor) + .setCredentials(llapCredentials).setServicePluginDescriptor(spd) .build(); - LOG.info("Opening new Tez Session (id: " + sessionId - + ", scratch dir: " + tezScratchDir + ")"); + LOG.info("Opening new Tez Session (id: {} scratch dir: {})", sessionId, tezScratchDir); TezJobMonitor.initShutdownHook(); if (!isAsync) { startSessionAndContainers(session, conf, commonLocalResources, tezConfig, false); - this.session = session; + setTezClient(session); } else { FutureTask sessionFuture = new FutureTask<>(new Callable() { @Override @@ -425,7 +370,7 @@ public TezClient call() throws Exception { } catch (Throwable t) { // The caller has already stopped the session. LOG.error("Failed to start Tez session", t); - throw (t instanceof Exception) ? (Exception)t : new Exception(t); + throw (t instanceof Exception) ? (Exception) t : new Exception(t); } // Check interrupt at the last moment in case we get cancelled quickly. // This is not bulletproof but should allow us to close session in most cases. @@ -444,21 +389,70 @@ public TezClient call() throws Exception { } } - /** - * Check if Kerberos authentication is enabled. - * This is used by: - * - HS2 (upon Tez session creation) - * In secure scenarios HS2 might either be logged on (by Kerberos) by itself or by a launcher - * script it was forked from. In the latter case UGI.getLoginUser().isFromKeytab() returns false, - * hence UGI.getLoginUser().hasKerberosCredentials() is a tightest setting we can check against. - */ - private boolean isKerberosEnabled(Configuration conf) { - try { - return UserGroupInformation.getLoginUser().hasKerberosCredentials() && - HiveConf.getBoolVar(conf, ConfVars.LLAP_USE_KERBEROS); - } catch (IOException e) { - return false; + protected static ServicePluginsDescriptor createServicePluginDescriptor(boolean llapMode, TezConfiguration tezConfig) + throws IOException { + if (!llapMode) { + return ServicePluginsDescriptor.create(true); + } + // TODO Change this to not serialize the entire Configuration - minor. + UserPayload servicePluginPayload = TezUtils.createUserPayloadFromConf(tezConfig); + // we need plugins to handle llap and uber mode + return ServicePluginsDescriptor.create(true, + new TaskSchedulerDescriptor[]{TaskSchedulerDescriptor.create(LLAP_SERVICE, LLAP_SCHEDULER).setUserPayload( + servicePluginPayload)}, + new ContainerLauncherDescriptor[]{ContainerLauncherDescriptor.create(LLAP_SERVICE, LLAP_LAUNCHER)}, + new TaskCommunicatorDescriptor[]{TaskCommunicatorDescriptor.create(LLAP_SERVICE, + LLAP_TASK_COMMUNICATOR).setUserPayload(servicePluginPayload)}); + } + + protected final Credentials createLlapCredentials(boolean llapMode, TezConfiguration tezConfig) throws IOException { + if (!llapMode || !UserGroupInformation.isSecurityEnabled()){ + return null; + } + Credentials llapCredentials = new Credentials(); + llapCredentials.addToken(LlapTokenIdentifier.KIND_NAME, getLlapToken(user, tezConfig)); + return llapCredentials; + } + + protected final TezConfiguration createDefaultTezConfig() { + TezConfiguration tezConfig = new TezConfiguration(true); + tezConfig.addResource(conf); + setupTezParamsBasedOnMR(tezConfig); + conf.stripHiddenConfigurations(tezConfig); + return tezConfig; + } + + protected final TezConfiguration createTezConfig() throws IOException { + TezConfiguration tezConfig = createDefaultTezConfig(); + setupSessionAcls(tezConfig, conf); + return tezConfig; + } + + protected boolean isLlapMode() { + return "llap".equalsIgnoreCase(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE)); + } + + protected final void addLlapJars(Map commonLocalResources) + throws IOException { + addJarLRByClass(LlapTaskSchedulerService.class, commonLocalResources); + addJarLRByClass(LlapProtocolClientImpl.class, commonLocalResources); + addJarLRByClass(LlapProtocolClientProxy.class, commonLocalResources); + addJarLRByClass(RegistryOperations.class, commonLocalResources); + } + + protected final void initQueueAndUser() throws IOException { + // TODO Why is the queue name set again. It has already been setup via setQueueName. Do only one of the two. + String confQueueName = conf.get(TezConfiguration.TEZ_QUEUE_NAME); + if (queueName != null && !queueName.equals(confQueueName)) { + LOG.warn("Resetting a queue name that was already set: was {} now {}", queueName, confQueueName); } + this.queueName = confQueueName; + this.doAsEnabled = conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS); + + // TODO This - at least for the session pool - will always be the hive user. How does doAs above this affect things ? + UserGroupInformation ugi = Utils.getUGI(); + user = ugi.getShortUserName(); + LOG.info("User of session id {} is {}", sessionId, user); } private static Token getLlapToken( @@ -563,7 +557,7 @@ public void endOpen() throws InterruptedException, CancellationException { if (session == null) { throw new RuntimeException("Initialization was interrupted"); } - this.session = session; + setTezClient(session); } catch (ExecutionException e) { throw new RuntimeException(e); } @@ -636,7 +630,7 @@ private void setupTezParamsBasedOnMR(TezConfiguration conf) { } } } - private void setupSessionAcls(Configuration tezConf, HiveConf hiveConf) throws + protected void setupSessionAcls(Configuration tezConf, HiveConf hiveConf) throws IOException { // TODO: De-link from SessionState. A TezSession can be linked to different Hive Sessions via the pool. @@ -664,10 +658,10 @@ private void setupSessionAcls(Configuration tezConf, HiveConf hiveConf) throws } /** This is called in openInternal and in TezTask.updateSession to localize conf resources. */ - public void ensureLocalResources(Configuration conf, String[] newFilesNotFromConf) - throws IOException, URISyntaxException, TezException { + @Override + public void ensureLocalResources(Configuration conf, String[] newFilesNotFromConf) throws IOException { if (resources == null) { - throw new AssertionError("Ensure called on an unitialized (or closed) session " + sessionId); + throw new AssertionError("Ensure called on an uninitialized (or closed) session " + sessionId); } String dir = resources.dagResourcesDir.toString(); resources.localizedResources.clear(); @@ -729,20 +723,21 @@ public void ensureLocalResources(Configuration conf, String[] newFilesNotFromCon * whether or not to remove the scratch dir at the same time. * @throws Exception */ - void close(boolean keepDagFilesDir) throws Exception { + @Override + public void close(boolean keepDagFilesDir) throws Exception { console = null; appJarLr = null; try { if (session != null) { - LOG.info("Closing Tez Session"); + LOG.info("Closing Tez Session (sync)"); closeClient(session); session = null; } else if (sessionFuture != null) { sessionFuture.cancel(true); TezClient asyncSession = null; try { - asyncSession = sessionFuture.get(); // In case it was done and noone looked at it. + asyncSession = sessionFuture.get(); // In case it was done and none looked at it. } catch (ExecutionException | CancellationException e) { // ignore } catch (InterruptedException e) { @@ -751,7 +746,7 @@ void close(boolean keepDagFilesDir) throws Exception { } sessionFuture = null; if (asyncSession != null) { - LOG.info("Closing Tez Session"); + LOG.info("Closing Tez Session (async)"); closeClient(asyncSession); } } @@ -793,11 +788,17 @@ protected final void cleanupDagResources() throws IOException { } } + @Override public String getSessionId() { return sessionId; } - public TezClient getSession() { + protected final void setTezClient(TezClient session) { + this.session = session; + } + + @Override + public TezClient getTezClient() { if (session == null && sessionFuture != null) { if (!sessionFuture.isDone()) { console.printInfo("Waiting for Tez session and AM to be ready..."); @@ -819,6 +820,7 @@ public TezClient getSession() { return session; } + @Override public LocalResource getAppJarLr() { return appJarLr; } @@ -918,47 +920,58 @@ private String getSha(final Path localFile) throws IOException, IllegalArgumentE } return sha256; } + + @Override public void setQueueName(String queueName) { this.queueName = queueName; } + @Override public String getQueueName() { return queueName; } + @Override public void setDefault() { defaultQueue = true; } + @Override public boolean isDefault() { return defaultQueue; } + @Override public HiveConf getConf() { return conf; } + @Override public List getLocalizedResources() { - return new ArrayList<>(resources.localizedResources); + return resources == null ? new ArrayList<>() : new ArrayList<>(resources.localizedResources); } + @Override public String getUser() { return user; } + @Override public boolean getDoAsEnabled() { return doAsEnabled; } /** Mark session as free for use from TezTask, for safety/debugging purposes. */ - public void markFree() { + @Override + public void unsetOwnerThread() { if (ownerThread.getAndSet(null) == null) { throw new AssertionError("Not in use"); } } /** Mark session as being in use from TezTask, for safety/debugging purposes. */ - public void markInUse() { + @Override + public void setOwnerThread() { String newName = Thread.currentThread().getName(); do { String oldName = ownerThread.get(); @@ -969,51 +982,62 @@ public void markInUse() { } while (!ownerThread.compareAndSet(null, newName)); } - void setLegacyLlapMode(boolean value) { + @Override + public void setLegacyLlapMode(boolean value) { this.isLegacyLlapMode = value; } - boolean getLegacyLlapMode() { + @Override + public boolean getLegacyLlapMode() { return this.isLegacyLlapMode; } + @Override public void returnToSessionManager() throws Exception { // By default, TezSessionPoolManager handles this for both pool and non-pool session. TezSessionPoolManager.getInstance().returnSession(this); } - public TezSessionState reopen() throws Exception { + @Override + public TezSession reopen() throws Exception { // By default, TezSessionPoolManager handles this for both pool and non-pool session. return TezSessionPoolManager.getInstance().reopen(this); } + @Override public void destroy() throws Exception { // By default, TezSessionPoolManager handles this for both pool and non-pool session. TezSessionPoolManager.getInstance().destroy(this); } + @Override public WmContext getWmContext() { return wmContext; } + @Override public void setWmContext(final WmContext wmContext) { this.wmContext = wmContext; } - public void setKillQuery(final KillQuery killQuery) { - this.killQuery = killQuery; - } - + @Override public KillQuery getKillQuery() { return killQuery; } + @Override + public void setKillQuery(final KillQuery killQuery) { + this.killQuery = killQuery; + } + + @Override public HiveResources extractHiveResources() { HiveResources result = resources; resources = null; return result; } + @Override public Path replaceHiveResources(HiveResources resources, boolean isAsync) { Path dir = null; if (this.resources != null) { @@ -1032,10 +1056,10 @@ public Path replaceHiveResources(HiveResources resources, boolean isAsync) { return dir; } + @Override public String getAppMasterUri() { - return Optional.of(getSession()).map( - tezClient -> tezClient.getAmHost() + ":" + tezClient.getAmPort()) - .get(); + return Optional.ofNullable(session).map( + tezClient -> tezClient.getAmHost() + ":" + tezClient.getAmPort()).orElse(null); } /** @@ -1046,6 +1070,7 @@ public String getAppMasterUri() { * * @return A map containing metrics for TezSessionPoolManagerMetrics. */ + @Override public Map getMetrics() { Map metrics = new HashMap<>(); if (dagStatus == null) { @@ -1063,7 +1088,22 @@ public Map getMetrics() { * TezSessionState receives periodic updates from the current DAG's status. * @param dagStatus status of the current DAG */ + @Override public void updateDagStatus(DAGStatus dagStatus) { this.dagStatus = dagStatus; } + + @Override + public boolean killQuery(String reason) throws HiveException { + if (killQuery == null || wmContext == null) { + return false; + } + String queryId = wmContext.getQueryId(); + if (queryId == null) { + return false; + } + LOG.info("Killing the query {}: {}", queryId, reason); + killQuery.killQuery(queryId, reason, conf, true); + return true; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index 75d11a070bef..6bd801dd64c3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -157,7 +157,7 @@ public int execute() { int rc = 1; boolean cleanContext = false; Context ctx = null; - Ref sessionRef = Ref.from(null); + Ref sessionRef = Ref.from(null); final String queryId = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_QUERY_ID); @@ -177,7 +177,7 @@ public int execute() { SessionState ss = SessionState.get(); // Note: given that we return pool sessions to the pool in the finally block below, and that // we need to set the global to null to do that, this "reuse" may be pointless. - TezSessionState session = sessionRef.value = ss.getTezSession(); + TezSession session = sessionRef.value = ss.getTezSession(); if (session != null && !session.isOpen()) { LOG.warn("The session: " + session + " has not been opened"); } @@ -269,7 +269,7 @@ public int execute() { String dagId = this.dagClient.getDagIdentifierString(); String appId = this.dagClient.getSessionIdentifierString(); LOG.info("HS2 Host: [{}], Query ID: [{}], Dag ID: [{}], DAG App ID: [{}], DAG App address: [{}]", - ServerUtils.hostname(), queryId, dagId, appId, session.getSession().getAmHost()); + ServerUtils.hostname(), queryId, dagId, appId, session.getTezClient().getAmHost()); LogUtils.putToMDC(LogUtils.DAGID_KEY, dagId); this.jobID = dagId; runtimeContext.setDagId(dagId); @@ -469,8 +469,8 @@ private void logResources(List additionalLr) { */ @VisibleForTesting void ensureSessionHasResources( - TezSessionState session, String[] nonConfResources) throws Exception { - TezClient client = session.getSession(); + TezSession session, String[] nonConfResources) throws Exception { + TezClient client = session.getTezClient(); // TODO null can also mean that this operation was interrupted. Should we really try to re-create the session in that case ? if (client == null) { // Note: the only sane case where this can happen is the non-pool one. We should get rid @@ -642,19 +642,19 @@ private static void setAccessControlsForCurrentUser(DAG dag, String queryId, dag.setAccessControls(ac); } - private TezSessionState getNewTezSessionOnError( - TezSessionState oldSession) throws Exception { + private TezSession getNewTezSessionOnError( + TezSession oldSession) throws Exception { // Note: we don't pass the config to reopen. If the session was already open, it would // have kept running with its current config - preserve that behavior. - TezSessionState newSession = oldSession.reopen(); + TezSession newSession = oldSession.reopen(); console.printInfo("Session re-established."); return newSession; } - DAGClient submit(DAG dag, Ref sessionStateRef) throws Exception { + DAGClient submit(DAG dag, Ref sessionStateRef) throws Exception { perfLogger.perfLogBegin(CLASS_NAME, PerfLogger.TEZ_SUBMIT_DAG); DAGClient dagClient = null; - TezSessionState sessionState = sessionStateRef.value; + TezSession sessionState = sessionStateRef.value; try { try { // ready to start execution on the cluster @@ -690,13 +690,13 @@ DAGClient submit(DAG dag, Ref sessionStateRef) throws Exception return new SyncDagClient(dagClient); } - private DAGClient submitInternal(DAG dag, TezSessionState sessionState) throws TezException, IOException { + private DAGClient submitInternal(DAG dag, TezSession sessionState) throws TezException, IOException { runtimeContext.init(sessionState); - return sessionState.getSession().submitDAG(dag); + return sessionState.getTezClient().submitDAG(dag); } - private void sessionDestroyOrReturnToPool(Ref sessionStateRef, - TezSessionState sessionState) throws Exception{ + private void sessionDestroyOrReturnToPool(Ref sessionStateRef, + TezSession sessionState) throws Exception{ sessionStateRef.value = null; if (sessionState.isDefault() && sessionState instanceof TezSessionPoolSession) { sessionState.returnToSessionManager(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java index 867e31c7314c..8c759170c54e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java @@ -43,10 +43,10 @@ public class TriggerValidatorRunnable implements Runnable { @Override public void run() { try { - Map violatedSessions = new HashMap<>(); - final List sessions = sessionTriggerProvider.getSessions(); + Map violatedSessions = new HashMap<>(); + final List sessions = sessionTriggerProvider.getSessions(); final List triggers = sessionTriggerProvider.getTriggers(); - for (TezSessionState sessionState : sessions) { + for (TezSession sessionState : sessions) { WmContext wmContext = sessionState.getWmContext(); if (wmContext != null && !wmContext.isQueryCompleted() && !wmContext.getSubscribedCounters().isEmpty()) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java index cd6bb37a4e27..bf4fce11c874 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java @@ -22,7 +22,6 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.registry.impl.TezAmInstance; import org.apache.hive.common.util.Ref; @@ -56,7 +55,7 @@ public class WmTezSession extends TezSessionPoolSession implements AmPluginNode private SettableFuture returnFuture = null; private boolean isDelayedMove; - private final WorkloadManager wmParent; + private final WorkloadManager wmManager; /** The actual state of the guaranteed task, and the update state, for the session. */ // Note: hypothetically, a generic WM-aware-session should not know about guaranteed tasks. @@ -68,18 +67,17 @@ private final static class ActualWmState { } private final ActualWmState actualState = new ActualWmState(); - public WmTezSession(String sessionId, WorkloadManager parent, - SessionExpirationTracker expiration, HiveConf conf) { - super(sessionId, parent, expiration, conf); - wmParent = parent; + public WmTezSession( + WorkloadManager manager, SessionExpirationTracker expiration, TezSessionState baseSession) { + super(manager, expiration, baseSession); + wmManager = manager; isDelayedMove = false; } @VisibleForTesting - WmTezSession(String sessionId, Manager testParent, - SessionExpirationTracker expiration, HiveConf conf) { - super(sessionId, testParent, expiration, conf); - wmParent = null; + WmTezSession(Manager testParent, SessionExpirationTracker expiration, TezSessionState baseSession) { + super(testParent, expiration, baseSession); + wmManager = null; isDelayedMove = false; } @@ -107,7 +105,7 @@ public ListenableFuture waitForAmRegistryAsync( @Override - void updateFromRegistry(TezAmInstance si, int ephSeqVersion) { + public void updateFromRegistry(TezAmInstance si, int ephSeqVersion) { updateAmEndpointInfo(si, ephSeqVersion); if (si != null) { handleGuaranteedTasksChange(si.getGuaranteedCount()); @@ -151,7 +149,7 @@ private void handleGuaranteedTasksChange(int guaranteedCount) { doNotify = actualState.target != guaranteedCount; } if (!doNotify) return; - wmParent.notifyOfInconsistentAllocation(this); + wmManager.notifyOfInconsistentAllocation(this); } @Override @@ -238,7 +236,7 @@ boolean setFailedToSendGuaranteed() { } public void handleUpdateError(int endpointVersion) { - wmParent.addUpdateError(this, endpointVersion); + wmManager.addUpdateError(this, endpointVersion); } @Override @@ -305,5 +303,4 @@ public String toString() { return super.toString() + ", WM state poolName=" + poolName + ", clusterFraction=" + clusterFraction + ", queryId=" + queryId + ", killReason=" + killReason; } - } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java index c8c40e386172..148e9159f96e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java @@ -69,11 +69,10 @@ import org.apache.hadoop.hive.metastore.api.WMTrigger; import org.apache.hadoop.hive.ql.DriverUtils; import org.apache.hadoop.hive.ql.exec.tez.AmPluginNode.AmPluginInfo; -import org.apache.hadoop.hive.ql.exec.tez.TezSessionState.HiveResources; +import org.apache.hadoop.hive.ql.exec.tez.TezSession.HiveResources; import org.apache.hadoop.hive.ql.exec.tez.UserPoolMapping.MappingInput; import org.apache.hadoop.hive.ql.exec.tez.WmEvent.EventType; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.session.KillQuery; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.wm.ExecutionTrigger; import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider; @@ -100,7 +99,7 @@ * none of that state can be accessed directly - most changes that touch pool state, or interact * with background operations like init, need to go thru eventstate; see e.g. returnAfterUse. */ -public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValidator +public class WorkloadManager extends AbstractTriggerValidator implements TezSessionPoolSession.Manager, SessionExpirationTracker.RestartImpl, WorkloadManagerMxBean { private static final Logger LOG = LoggerFactory.getLogger(WorkloadManager.class); @@ -118,6 +117,8 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida private final int amRegistryTimeoutMs; private final boolean allowAnyPool; private final MetricsSystem metricsSystem; + private final boolean useExternalSessions; + // Note: it's not clear that we need to track this - unlike PoolManager we don't have non-pool // sessions, so the pool itself could internally track the ses sions it gave out, since // calling close on an unopened session is probably harmless. @@ -239,6 +240,8 @@ public static WorkloadManager create(String yarnQueue, HiveConf conf, WMFullReso metricsSystem = null; } + useExternalSessions = HiveConf.getBoolVar(conf, ConfVars.HIVE_SERVER2_TEZ_USE_EXTERNAL_SESSIONS); + wmThread = new Thread(() -> runWmThread(), "Workload management master"); wmThread.setDaemon(true); wmThread.start(); @@ -305,9 +308,9 @@ private void initTriggers() { public void stop() throws Exception { List sessionsToClose = null; synchronized (openSessions) { - sessionsToClose = new ArrayList(openSessions.keySet()); + sessionsToClose = new ArrayList<>(openSessions.keySet()); } - for (TezSessionState sessionState : sessionsToClose) { + for (TezSessionPoolSession sessionState : sessionsToClose) { sessionState.close(false); } if (expirationTracker != null) { @@ -339,7 +342,7 @@ private void updateSessionTriggerProvidersOnMasterThread() { String poolName = entry.getKey(); PoolState poolState = entry.getValue(); final List triggers = Collections.unmodifiableList(poolState.getTriggers()); - final List sessionStates = Collections.unmodifiableList(poolState.getSessions()); + final List sessionStates = Collections.unmodifiableList(poolState.getSessions()); SessionTriggerProvider sessionTriggerProvider = perPoolProviders.get(poolName); if (sessionTriggerProvider != null) { perPoolProviders.get(poolName).setTriggers(triggers); @@ -479,26 +482,24 @@ private void scheduleWork(WmThreadSyncWork context) { // because we know which exact query we intend to kill. This is valid because we // are not expecting query ID to change - we never reuse the session for which a // query is being killed until both the kill, and the user, return it. - String queryId = toKill.getQueryId(); - KillQuery kq = toKill.getKillQuery(); try { - if (kq != null && queryId != null) { - WmEvent wmEvent = new WmEvent(WmEvent.EventType.KILL); - LOG.info("Invoking KillQuery for " + queryId + ": " + reason); - try { - UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); - DriverUtils.setUpAndStartSessionState(conf, ugi.getShortUserName()); - kq.killQuery(queryId, reason, toKill.getConf()); - addKillQueryResult(toKill, true); - killCtx.killSessionFuture.set(true); - wmEvent.endEvent(toKill); - LOG.debug("Killed " + queryId); - return; - } catch (HiveException|IOException ex) { - LOG.error("Failed to kill " + queryId + "; will try to restart AM instead" , ex); - } + boolean wasKilled = false; + String queryId = toKill.getQueryId(); + WmEvent wmEvent = new WmEvent(WmEvent.EventType.KILL); + try { + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + DriverUtils.setUpAndStartSessionState(conf, ugi.getShortUserName()); + wasKilled = toKill.killQuery(reason); + addKillQueryResult(toKill, true); + killCtx.killSessionFuture.set(true); + wmEvent.endEvent(toKill); + } catch (HiveException | IOException ex) { + LOG.error("Failed to kill " + queryId + "; will try to restart AM instead" , ex); + } + if (wasKilled) { + LOG.debug("Killed " + queryId); } else { - LOG.info("Will queue restart for {}; queryId {}, killQuery {}", toKill, queryId, kq); + LOG.info("Will queue restart for {}; queryId {}", toKill, queryId); } } finally { toKill.setQueryId(null); @@ -630,7 +631,8 @@ private void processCurrentEvents(EventState e, WmThreadSyncWork syncWork) throw if (!wasReturned) { syncWork.toDestroyNoRestart.add(sessionToReturn); } else { - if (sessionToReturn.getWmContext() != null && sessionToReturn.getWmContext().isQueryCompleted()) { + WmContext ctx = sessionToReturn.getWmContext(); + if (ctx != null && ctx.isQueryCompleted()) { sessionToReturn.resolveReturnFuture(); } wmEvent.endEvent(sessionToReturn); @@ -724,7 +726,8 @@ private void processCurrentEvents(EventState e, WmThreadSyncWork syncWork) throw if (!tezAmPool.returnSessionAsync(ctx.session)) { syncWork.toDestroyNoRestart.add(ctx.session); } else { - if (ctx.session.getWmContext() != null && ctx.session.getWmContext().isQueryCompleted()) { + WmContext wmCtx = ctx.session.getWmContext(); + if (wmCtx != null && wmCtx.isQueryCompleted()) { ctx.session.resolveReturnFuture(); } wmEvent.endEvent(ctx.session); @@ -1391,7 +1394,8 @@ private void returnSessionOnFailedReuse( if (!tezAmPool.returnSessionAsync(session)) { syncWork.toDestroyNoRestart.add(session); } else { - if (session.getWmContext() != null && session.getWmContext().isQueryCompleted()) { + WmContext wmCtx = session.getWmContext(); + if (wmCtx != null && wmCtx.isQueryCompleted()) { session.resolveReturnFuture(); } wmEvent.endEvent(session); @@ -1557,11 +1561,11 @@ public String toString() { @VisibleForTesting public WmTezSession getSession( - TezSessionState session, MappingInput input, HiveConf conf) throws Exception { + TezSession session, MappingInput input, HiveConf conf) throws Exception { return getSession(session, input, conf, null); } - public WmTezSession getSession(TezSessionState session, MappingInput input, HiveConf conf, + public WmTezSession getSession(TezSession session, MappingInput input, HiveConf conf, final WmContext wmContext) throws Exception { WmEvent wmEvent = new WmEvent(WmEvent.EventType.GET); // Note: not actually used for pool sessions; verify some things like doAs are not set. @@ -1593,7 +1597,7 @@ public WmTezSession getSession(TezSessionState session, MappingInput input, Hive } @Override - public void destroy(TezSessionState session) throws Exception { + public void destroy(TezSession session) throws Exception { WmTezSession wmTezSession = ensureOwnedSession(session); resetGlobalTezSession(wmTezSession); currentLock.lock(); @@ -1727,7 +1731,7 @@ public void notifyInitializationCompleted(SessionInitContext initCtx) { @Override - public TezSessionState reopen(TezSessionState session) throws Exception { + public TezSession reopen(TezSession session) throws Exception { WmTezSession wmTezSession = ensureOwnedSession(session); HiveConf sessionConf = wmTezSession.getConf(); if (sessionConf == null) { @@ -1765,7 +1769,7 @@ private void notifyWmThreadUnderLock() { hasChangesCondition.signalAll(); } - private WmTezSession checkSessionForReuse(TezSessionState session) throws Exception { + private WmTezSession checkSessionForReuse(TezSession session) throws Exception { if (session == null) return null; WmTezSession result = null; if (session instanceof WmTezSession) { @@ -1814,10 +1818,11 @@ private WmTezSession createSession(HiveConf conf) { protected WmTezSession createSessionObject(String sessionId, HiveConf conf) { conf = (conf == null) ? new HiveConf(this.conf) : conf; conf.set(LlapTaskSchedulerService.LLAP_PLUGIN_ENDPOINT_ENABLED, "true"); - return new WmTezSession(sessionId, this, expirationTracker, conf); + TezSessionState base = TezSessionState.create(sessionId, conf, useExternalSessions); + return new WmTezSession(this, expirationTracker, base); } - private WmTezSession ensureOwnedSession(TezSessionState oldSession) { + private WmTezSession ensureOwnedSession(TezSession oldSession) { if (!(oldSession instanceof WmTezSession) || !((WmTezSession)oldSession).isOwnedBy(this)) { throw new AssertionError("Not a WM session " + oldSession); } @@ -1839,7 +1844,7 @@ public void unregisterOpenSession(TezSessionPoolSession session) { synchronized (openSessions) { openSessions.remove(session); } - tezAmPool.notifyClosed(session); + tezAmPool.notifyClosed(ensureOwnedSession(session)); } @VisibleForTesting diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerFederation.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerFederation.java index adde440bc79b..2a9aecb6297d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerFederation.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerFederation.java @@ -29,7 +29,7 @@ public class WorkloadManagerFederation { private static final Logger LOG = LoggerFactory.getLogger(WorkloadManagerFederation.class); - public static TezSessionState getSession(TezSessionState session, HiveConf conf, + public static TezSession getSession(TezSession session, HiveConf conf, MappingInput input, boolean isUnmanagedLlapMode, WmContext wmContext) throws Exception { Set desiredCounters = new HashSet<>(); // 1. If WM is not present just go to unmanaged. @@ -60,8 +60,8 @@ public static TezSessionState getSession(TezSessionState session, HiveConf conf, } } - private static TezSessionState getUnmanagedSession( - TezSessionState session, HiveConf conf, Set desiredCounters, boolean isWorkLlapNode, + private static TezSession getUnmanagedSession( + TezSession session, HiveConf conf, Set desiredCounters, boolean isWorkLlapNode, final WmContext wmContext) throws Exception { TezSessionPoolManager pm = TezSessionPoolManager.getInstance(); session = pm.getSession(session, conf, false, isWorkLlapNode); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java new file mode 100644 index 000000000000..550c77e573ab --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.tez; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.imps.CuratorFrameworkState; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.CuratorCache; +import org.apache.curator.framework.recipes.cache.CuratorCacheListener; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +// TODO: tez should provide this registry +public class ZookeeperExternalSessionsRegistryClient implements ExternalSessionsRegistry { + private static final Logger LOG = LoggerFactory.getLogger(ZookeeperExternalSessionsRegistryClient.class); + + private final HiveConf initConf; + private final Set available = new HashSet<>(); + private final Set taken = new HashSet<>(); + private final Object lock = new Object(); + private final int maxAttempts; + + private CuratorCache cache; + private volatile boolean isInitialized; + + + public ZookeeperExternalSessionsRegistryClient(final HiveConf initConf) { + this.initConf = initConf; + this.maxAttempts = HiveConf.getIntVar(initConf, ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_WAIT_MAX_ATTEMPTS); + } + + private static String getApplicationId(final ChildData childData) { + return childData.getPath().substring(childData.getPath().lastIndexOf("/") + 1); + } + + private void init() { + String zkServer = HiveConf.getVar(initConf, ConfVars.HIVE_ZOOKEEPER_QUORUM); + String zkNamespace = HiveConf.getVar(initConf, ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE); + String effectivePath = normalizeZkPath(zkNamespace); + CuratorFramework client = CuratorFrameworkFactory.newClient(zkServer, new ExponentialBackoffRetry(1000, 3)); + synchronized (lock) { + client.start(); + this.cache = CuratorCache.build(client, effectivePath); + CuratorCacheListener listener = CuratorCacheListener.builder() + .forPathChildrenCache(effectivePath, client, new ExternalSessionsPathListener()) + .build(); + cache.listenable().addListener(listener); + cache.start(); + cache.stream() + .filter(childData -> childData.getPath() != null + && childData.getPath().startsWith(effectivePath + "/")) + .forEach(childData -> available.add(getApplicationId(childData))); + LOG.info("Initial external sessions: {}", available); + isInitialized = true; + } + } + + @VisibleForTesting + static String normalizeZkPath(String zkNamespace) { + return (zkNamespace.startsWith("/") ? zkNamespace : "/" + zkNamespace); + } + + @Override + public String getSession() throws Exception { + synchronized (lock) { + if (!isInitialized) { + init(); + } + long endTimeNs = System.nanoTime() + (1000000000L * maxAttempts); + while (available.isEmpty() && ((endTimeNs - System.nanoTime()) > 0)) { + lock.wait(1000L); + } + Iterator iter = available.iterator(); + if (!iter.hasNext()) { + throw new IOException("Cannot get a session after " + maxAttempts + " attempts"); + } + String appId = iter.next(); + iter.remove(); + taken.add(appId); + return appId; + } + } + + @Override + public void returnSession(String appId) { + synchronized (lock) { + if (!isInitialized) { + throw new IllegalStateException("Not initialized"); + } + if (!taken.remove(appId)) { + return; // Session has been removed from ZK. + } + available.add(appId); + lock.notifyAll(); + } + } + + @Override + public void close() { + if (cache != null) { + cache.close(); + } + } + + private final class ExternalSessionsPathListener implements PathChildrenCacheListener { + + @Override + public void childEvent(final CuratorFramework client, final PathChildrenCacheEvent event) throws Exception { + Preconditions.checkArgument(client != null && client.getState() == CuratorFrameworkState.STARTED, + "client is not started"); + ChildData childData = event.getData(); + if (childData == null) { + return; + } + String applicationId = getApplicationId(childData); + LOG.info("{} for external session {}", event.getType(), applicationId); + + synchronized (lock) { + switch (event.getType()) { + case CHILD_UPDATED, CHILD_ADDED: + if (available.contains(applicationId) || taken.contains(applicationId)) { + return; // We do not expect updates to existing sessions; ignore them for now. + } + available.add(applicationId); + break; + case CHILD_REMOVED: + if (taken.remove(applicationId)) { + LOG.warn("The session in use has disappeared from the registry ({})", applicationId); + } else if (!available.remove(applicationId)) { + LOG.warn("An unknown session has been removed ({})", applicationId); + } + break; + default: + // Ignore all the other events; logged above. + } + } + } + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java index 81ef890dcb0b..92844f4d5716 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java @@ -39,8 +39,8 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.tez.TezSession; import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager; -import org.apache.hadoop.hive.ql.exec.tez.TezSessionState; import org.apache.hadoop.hive.ql.exec.tez.Utils; import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.plan.BaseWork; @@ -78,7 +78,7 @@ public class TezJobMonitor { private static final int MAX_RETRY_INTERVAL = 2500; private static final int MAX_RETRY_FAILURES = (MAX_RETRY_INTERVAL / MAX_CHECK_INTERVAL) + 1; - private final TezSessionState session; + private final TezSession session; private final PerfLogger perfLogger; private static final List shutdownList; private final List topSortedWorks; @@ -120,7 +120,7 @@ public static void initShutdownHook() { // compile time tez counters private final TezCounters counters; - public TezJobMonitor(TezSessionState session, List topSortedWorks, final DAGClient dagClient, HiveConf conf, + public TezJobMonitor(TezSession session, List topSortedWorks, final DAGClient dagClient, HiveConf conf, DAG dag, Context ctx, final TezCounters counters, PerfLogger perfLogger) { this.session = session; this.topSortedWorks = topSortedWorks; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/KillQuery.java b/ql/src/java/org/apache/hadoop/hive/ql/session/KillQuery.java index 01dc7e2cd794..bbbd457e9ed7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/session/KillQuery.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/session/KillQuery.java @@ -22,5 +22,6 @@ import org.apache.hadoop.hive.ql.metadata.HiveException; public interface KillQuery { - void killQuery(String queryId, String errMsg, HiveConf conf) throws HiveException; + void killQuery(String queryId, String errMsg, HiveConf conf, boolean isYarn) + throws HiveException; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/NullKillQuery.java b/ql/src/java/org/apache/hadoop/hive/ql/session/NullKillQuery.java index eac2936719a7..78598e6fdadc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/session/NullKillQuery.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/session/NullKillQuery.java @@ -23,7 +23,8 @@ public class NullKillQuery implements KillQuery { @Override - public void killQuery(String queryId, String errMsg, HiveConf conf) throws HiveException { + public void killQuery(String queryId, String errMsg, HiveConf conf, boolean isYarn) + throws HiveException { // Do nothing } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java index d921c662ac27..53ee29743449 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java @@ -86,7 +86,9 @@ import org.apache.hadoop.hive.ql.exec.FunctionInfo; import org.apache.hadoop.hive.ql.exec.Registry; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.tez.TezExternalSessionState; import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager; +import org.apache.hadoop.hive.ql.exec.tez.TezSession; import org.apache.hadoop.hive.ql.exec.tez.TezSessionState; import org.apache.hadoop.hive.ql.history.HiveHistory; import org.apache.hadoop.hive.ql.history.HiveHistoryImpl; @@ -255,7 +257,7 @@ public enum AuthorizationMode{V1, V2}; private Map> localMapRedErrors; - private TezSessionState tezSessionState; + private TezSession tezSessionState; private String currentDatabase; @@ -761,9 +763,16 @@ private static void start(SessionState startSs, boolean isAsync, LogHelper conso return; } + boolean useExternalSessions = HiveConf.getBoolVar(startSs.getConf(), + ConfVars.HIVE_SERVER2_TEZ_USE_EXTERNAL_SESSIONS); + try { if (startSs.tezSessionState == null) { - startSs.setTezSession(new TezSessionState(startSs.getSessionId(), startSs.sessionConf)); + if (useExternalSessions) { + startSs.setTezSession(new TezExternalSessionState(startSs.getSessionId(), startSs.sessionConf)); + } else { + startSs.setTezSession(new TezSessionState(startSs.getSessionId(), startSs.sessionConf)); + } } else { // Only TezTask sets this, and then removes when done, so we don't expect to see it. LOG.warn("Tez session was already present in SessionState before start: " @@ -2083,23 +2092,23 @@ public static PerfLogger getPerfLogger(boolean resetPerfLogger) { } } - public TezSessionState getTezSession() { + public TezSession getTezSession() { return tezSessionState; } /** Called from TezTask to attach a TezSession to use to the threadlocal. Ugly pattern... */ - public void setTezSession(TezSessionState session) { + public void setTezSession(TezSession session) { if (tezSessionState == session) { return; // The same object. } if (tezSessionState != null) { - tezSessionState.markFree(); + tezSessionState.unsetOwnerThread(); tezSessionState.setKillQuery(null); tezSessionState = null; } tezSessionState = session; if (session != null) { - session.markInUse(); + session.setOwnerThread(); tezSessionState.setKillQuery(getKillQuery()); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/SessionTriggerProvider.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/SessionTriggerProvider.java index 16106f481b8a..616304f3b478 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/wm/SessionTriggerProvider.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/SessionTriggerProvider.java @@ -17,21 +17,21 @@ import java.util.List; -import org.apache.hadoop.hive.ql.exec.tez.TezSessionState; +import org.apache.hadoop.hive.ql.exec.tez.TezSession; /** * Implementation for providing current open sessions and active trigger. */ public class SessionTriggerProvider { - private List sessions; + private List sessions; private List triggers; - public SessionTriggerProvider(List openSessions, List triggers) { + public SessionTriggerProvider(List openSessions, List triggers) { this.sessions = openSessions; this.triggers = triggers; } - public List getSessions() { + public List getSessions() { return sessions; } @@ -39,7 +39,7 @@ public List getTriggers() { return triggers; } - public void setSessions(final List sessions) { + public void setSessions(final List sessions) { this.sessions = sessions; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerActionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerActionHandler.java index 7995a8f639b2..da99f0957315 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerActionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerActionHandler.java @@ -17,8 +17,6 @@ import java.util.Map; -import org.apache.hadoop.hive.ql.exec.tez.TezSessionState; - /** * Interface for handling rule violations by queries and for performing actions defined in the rules. */ diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java index f108160a80f3..502ee06c2521 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java @@ -24,6 +24,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; + import java.io.IOException; import java.util.concurrent.ScheduledExecutorService; import org.apache.hadoop.hive.conf.HiveConf; @@ -31,7 +32,6 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.tez.dag.api.TezException; - /** * This class is needed for writing junit tests. For testing the multi-session * use case from hive server 2, we need a session simulation. @@ -48,8 +48,9 @@ public class SampleTezSessionState extends WmTezSession { public SampleTezSessionState( String sessionId, TezSessionPoolSession.Manager parent, HiveConf conf) { - super(sessionId, parent, (parent instanceof TezSessionPoolManager) - ? ((TezSessionPoolManager)parent).getExpirationTracker() : null, conf); + super(parent, (parent instanceof TezSessionPoolManager) + ? ((TezSessionPoolManager)parent).getExpirationTracker() : null, + new TezSessionState(sessionId, conf)); this.sessionId = sessionId; this.hiveConf = conf; waitForAmRegFuture = createDefaultWaitForAmRegistryFuture(); @@ -89,7 +90,7 @@ public void open(String[] additionalFiles) throws IOException { } @Override - void close(boolean keepTmpDir) throws TezException, IOException { + public void close(boolean keepTmpDir) throws TezException, IOException { open = keepTmpDir; } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezExternalSessionState.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezExternalSessionState.java new file mode 100644 index 000000000000..1d4b5b10e5ef --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezExternalSessionState.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.tez; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.when; + +import java.util.concurrent.atomic.AtomicInteger; +import java.lang.reflect.Field; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConfForTest; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.tez.client.TezClient; +import org.apache.tez.client.TezClient.TezClientBuilder; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor; +import org.junit.Test; +import org.mockito.MockedStatic; + +public class TestTezExternalSessionState { + + private static SessionState createSessionState() { + HiveConf hiveConf = new HiveConfForTest(TestTezExternalSessionState.class); + hiveConf.set("hive.security.authorization.manager", + "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdConfOnlyAuthorizerFactory"); + return SessionState.start(hiveConf); + } + + public static class CountingExternalSessionsRegistry implements ExternalSessionsRegistry { + private final AtomicInteger getSessionCalls = new AtomicInteger(0); + private final AtomicInteger returnSessionCalls = new AtomicInteger(0); + + public CountingExternalSessionsRegistry(HiveConf conf) { + } + + @Override + public String getSession() { + int callId = getSessionCalls.incrementAndGet(); + return "application_1_" + callId; + } + + @Override + public void returnSession(String appId) { + returnSessionCalls.incrementAndGet(); + } + + @Override + public void close() { + // Testing registry; nothing specific to clean up. + } + + int getGetSessionCalls() { + return getSessionCalls.get(); + } + + int getReturnSessionCalls() { + return returnSessionCalls.get(); + } + } + + @Test + public void testConsecutiveOpenInternalCallsAreIdempotent() throws Exception { + SessionState ss = createSessionState(); + HiveConf conf = ss.getConf(); + conf.setVar(HiveConf.ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_REGISTRY_CLASS, + CountingExternalSessionsRegistry.class.getName()); + + TezExternalSessionState session = new TezExternalSessionState("test-session", conf); + CountingExternalSessionsRegistry registry = + (CountingExternalSessionsRegistry) ExternalSessionsRegistryFactory.getClient(conf); + assertFalse("Session should start closed", session.isOpen()); + + TezClientBuilder builder = mock(TezClientBuilder.class); + TezClient tezClient = mock(TezClient.class); + when(builder.setIsSession(anyBoolean())).thenReturn(builder); + when(builder.setCredentials(any())).thenReturn(builder); + when(builder.setServicePluginDescriptor(any(ServicePluginsDescriptor.class))).thenReturn(builder); + when(builder.build()).thenReturn(tezClient); + when(tezClient.getClientName()).thenReturn("mock-client"); + when(tezClient.getClient(any(ApplicationId.class))).thenReturn(null); + + try (MockedStatic tezClientMock = mockStatic(TezClient.class)) { + tezClientMock.when(() -> TezClient.newBuilder(anyString(), any(TezConfiguration.class))).thenReturn(builder); + + session.openInternal(null, false, null, null); + TezClient firstClient = session.getTezClient(); + assertNotNull(firstClient); + assertTrue("Session should be open after first openInternal", session.isOpen()); + String firstAppId = getExternalAppId(session); + + session.openInternal(null, false, null, null); + TezClient secondClient = session.getTezClient(); + assertNotNull(secondClient); + assertTrue("Session should remain open after second openInternal", session.isOpen()); + String secondAppId = getExternalAppId(session); + + // Idempotent behavior expectation: second openInternal should be a no-op for externalAppId. + assertSame("Consecutive openInternal calls should keep the same external app id", + firstAppId, secondAppId); + assertSame("Consecutive openInternal calls should keep the same Tez client instance", + firstClient, secondClient); + } + + assertEquals("Idempotent openInternal should only acquire one external session", + 1, registry.getGetSessionCalls()); + assertEquals("openInternal should not return sessions", 0, registry.getReturnSessionCalls()); + } + + private static String getExternalAppId(TezExternalSessionState session) throws Exception { + Field field = TezExternalSessionState.class.getDeclaredField("externalAppId"); + field.setAccessible(true); + return (String) field.get(session); + } +} diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezExternalSessionsRegistryClient.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezExternalSessionsRegistryClient.java new file mode 100644 index 000000000000..b0ee7042a122 --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezExternalSessionsRegistryClient.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.tez; + +import static org.junit.Assert.assertEquals; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.junit.Test; + +public class TestTezExternalSessionsRegistryClient { + @Test + public void testDummyExternalSessionsRegistry() { + HiveConf conf = new HiveConf(); + conf.setVar(HiveConf.ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_REGISTRY_CLASS, + DummyExternalSessionsRegistry.class.getName()); + // TODO: change this to TezConfiguration.TEZ_AM_REGISTRY_NAMESPACE after Tez 1.0.0 is released. + conf.set("tez.am.registry.namespace", "dummy"); + ExternalSessionsRegistry externalSessionsRegistry = ExternalSessionsRegistryFactory.getClient(conf); + assertEquals(DummyExternalSessionsRegistry.class, externalSessionsRegistry.getClass()); + } + + @Test + public void testTezExternalSessionsRegistry() { + HiveConf conf = new HiveConf(); + // TODO: change this to TezConfiguration.TEZ_AM_ZOOKEEPER_QUORUM after Tez 1.0.0 is released. + conf.set("tez.am.zookeeper.quorum", "test-quorum"); + // TODO: change this to TezConfiguration.TEZ_AM_REGISTRY_NAMESPACE after Tez 1.0.0 is released. + conf.set("tez.am.registry.namespace", "tez"); + conf.setVar(HiveConf.ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_REGISTRY_CLASS, + ZookeeperExternalSessionsRegistryClient.class.getName()); + ExternalSessionsRegistry externalSessionsRegistry = ExternalSessionsRegistryFactory.getClient(conf); + assertEquals(ZookeeperExternalSessionsRegistryClient.class, externalSessionsRegistry.getClass()); + } +} diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java index ee32d166101f..0729584aa338 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java @@ -22,7 +22,6 @@ import static org.junit.Assert.fail; import java.util.ArrayList; -import java.util.HashSet; import java.util.List; import java.util.Random; @@ -67,13 +66,13 @@ public void setUp() { public void testGetNonDefaultSession() { poolManager = new TestTezSessionPoolManager(); try { - TezSessionState sessionState = poolManager.getSession(null, conf, true, false); - TezSessionState sessionState1 = poolManager.getSession(sessionState, conf, true, false); + TezSession sessionState = poolManager.getSession(null, conf, true, false); + TezSession sessionState1 = poolManager.getSession(sessionState, conf, true, false); if (sessionState1 != sessionState) { fail(); } conf.set("tez.queue.name", "nondefault"); - TezSessionState sessionState2 = poolManager.getSession(sessionState, conf, true, false); + TezSession sessionState2 = poolManager.getSession(sessionState, conf, true, false); if (sessionState2 == sessionState) { fail(); } @@ -97,7 +96,7 @@ public void testSessionPoolGetInOrder() { // this is now a LIFO operation // draw 1 and replace - TezSessionState sessionState = poolManager.getSession(null, conf, true, false); + TezSession sessionState = poolManager.getSession(null, conf, true, false); assertEquals("a", sessionState.getQueueName()); poolManager.returnSession(sessionState); @@ -108,13 +107,13 @@ public void testSessionPoolGetInOrder() { // [a,b,c,a,b,c] // draw 2 and return in order - further run should return last returned - TezSessionState first = poolManager.getSession(null, conf, true, false); - TezSessionState second = poolManager.getSession(null, conf, true, false); + TezSession first = poolManager.getSession(null, conf, true, false); + TezSession second = poolManager.getSession(null, conf, true, false); assertEquals("a", first.getQueueName()); assertEquals("b", second.getQueueName()); poolManager.returnSession(first); poolManager.returnSession(second); - TezSessionState third = poolManager.getSession(null, conf, true, false); + TezSession third = poolManager.getSession(null, conf, true, false); assertEquals("b", third.getQueueName()); poolManager.returnSession(third); @@ -157,7 +156,7 @@ public void testSessionPoolThreads() { poolManager = new TestTezSessionPoolManager(); poolManager.setupPool(conf); poolManager.startPool(conf, null); - TezSessionState[] sessions = new TezSessionState[12]; + TezSession[] sessions = new TezSession[12]; int[] queueCounts = new int[3]; for (int i = 0; i < sessions.length; ++i) { sessions[i] = poolManager.getSession(null, conf, true, false); @@ -184,7 +183,7 @@ public void testSessionReopen() { conf.setIntVar(ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE, 1); poolManager = new TestTezSessionPoolManager(); - TezSessionState session = Mockito.mock(TezSessionState.class); + TezSession session = Mockito.mock(TezSession.class); Mockito.when(session.getQueueName()).thenReturn("default"); Mockito.when(session.isDefault()).thenReturn(false); Mockito.when(session.getConf()).thenReturn(conf); @@ -192,7 +191,7 @@ public void testSessionReopen() { poolManager.reopen(session); Mockito.verify(session).close(true); - Mockito.verify(session).open(Mockito.any()); + Mockito.verify(session).open(Mockito.any()); // mocked session starts with default queue assertEquals("default", session.getQueueName()); @@ -278,7 +277,7 @@ public void run() { tmpConf.set("tez.queue.name", ""); } - TezSessionState session = poolManager.getSession(null, tmpConf, true, llap); + TezSession session = poolManager.getSession(null, tmpConf, true, llap); Thread.sleep((random.nextInt(9) % 10) * 1000); session.setLegacyLlapMode(llap); poolManager.returnSession(session); @@ -323,20 +322,20 @@ public void testReturn() { @Test public void testCloseAndOpenDefault() throws Exception { poolManager = new TestTezSessionPoolManager(); - TezSessionState session = Mockito.mock(TezSessionState.class); + TezSession session = Mockito.mock(TezSession.class); Mockito.when(session.isDefault()).thenReturn(false); Mockito.when(session.getConf()).thenReturn(conf); poolManager.reopen(session); Mockito.verify(session).close(true); - Mockito.verify(session).open(Mockito.any()); + Mockito.verify(session).open(Mockito.any()); } @Test public void testSessionDestroy() throws Exception { poolManager = new TestTezSessionPoolManager(); - TezSessionState session = Mockito.mock(TezSessionState.class); + TezSession session = Mockito.mock(TezSession.class); Mockito.when(session.isDefault()).thenReturn(false); poolManager.destroy(session); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPoolManagerMetrics.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPoolManagerMetrics.java index 3c79a34bee08..a34a1e2a27b9 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPoolManagerMetrics.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPoolManagerMetrics.java @@ -42,7 +42,7 @@ public class TestTezSessionPoolManagerMetrics { @Test public void testRefreshMetrics(){ TezSessionPoolManager poolManager = mock(TezSessionPoolManager.class); - List sessions = testSessions(); + List sessions = testSessions(); when(poolManager.getSessions()).thenReturn(sessions); TezSessionPoolManagerMetrics metrics = new TezSessionPoolManagerMetrics(poolManager); @@ -56,7 +56,7 @@ public void testRefreshMetrics(){ @Test public void testBasicMetricsUpdate() { TezSessionPoolManager poolManager = mock(TezSessionPoolManager.class); - List sessions = testSessions(); + List sessions = testSessions(); when(poolManager.getSessions()).thenReturn(sessions); TezSessionPoolManagerMetrics metrics = new TezSessionPoolManagerMetrics(poolManager); @@ -93,9 +93,9 @@ public void testMetricsBeforeFirstUpdateAndStart(){ Assert.assertEquals(0.0, metrics.taskBacklogRatio.value, 0.0001); } - private List testSessions() { - List sessions = new ArrayList<>(); - TezSessionState session = mock(TezSessionState.class); + private List testSessions() { + List sessions = new ArrayList<>(); + TezSession session = mock(TezSession.class); when(session.getMetrics()).thenReturn(testMetrics()); IntStream.range(0, NO_OF_SESSIONS).forEach(i -> sessions.add(session)); return sessions; diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionState.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionState.java index bafd0e547dc1..ffd1081d56f6 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionState.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionState.java @@ -75,7 +75,8 @@ public void testScratchDirDeletedInTheEventOfExceptionWhileOpeningSession() thro TezSessionState sessionState = new TezSessionState(ss.getSessionId(), hiveConf) { @Override - void openInternalUnsafe(boolean isAsync, SessionState.LogHelper console) throws TezException, IOException { + void openInternalUnsafe(boolean isAsync, SessionState.LogHelper console) + throws TezException, IOException { super.openInternalUnsafe(isAsync, console); // save scratch dir here as it's nullified while calling the cleanup scratchDirPath.set(tezScratchDir.toUri().getPath()); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java index 5afa13c6da30..fa71845ece27 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java @@ -178,7 +178,7 @@ public Edge answer(InvocationOnMock invocation) throws Throwable { SessionState.start(hiveConf); session = mock(TezClient.class); sessionState = mock(TezSessionState.class); - when(sessionState.getSession()).thenReturn(session); + when(sessionState.getTezClient()).thenReturn(session); when(sessionState.reopen()).thenReturn(sessionState); when(session.submitDAG(any(DAG.class))) .thenThrow(new SessionNotRunning("")) @@ -240,7 +240,7 @@ public void testSubmitOnNonPoolSession() throws Exception { TezSessionState tezSessionState = mock(TezSessionState.class); TezClient tezClient = mock(TezClient.class); when(tezSessionState.reopen()).thenThrow(new HiveException("Dag cannot be submitted")); - when(tezSessionState.getSession()).thenReturn(tezClient); + when(tezSessionState.getTezClient()).thenReturn(tezClient); when(tezClient.submitDAG(any(DAG.class))).thenThrow(new SessionNotRunning("")); doNothing().when(tezSessionState).destroy(); boolean isException = false; @@ -264,7 +264,7 @@ public void testSubmitOnPoolSession() throws Exception { TezClient tezClient = mock(TezClient.class); when(tezSessionPoolSession.reopen()).thenThrow(new HiveException("Dag cannot be submitted")); doNothing().when(tezSessionPoolSession).returnToSessionManager(); - when(tezSessionPoolSession.getSession()).thenReturn(tezClient); + when(tezSessionPoolSession.getTezClient()).thenReturn(tezClient); when(tezSessionPoolSession.isDefault()).thenReturn(true); when(tezClient.submitDAG(any(DAG.class))).thenThrow(new SessionNotRunning("")); boolean isException = false; diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java index 8ce58bb45cc2..082a8b8b97d4 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java @@ -62,7 +62,6 @@ @RunWith(RetryTestRunner.class) public class TestWorkloadManager { - @SuppressWarnings("unused") private static final Logger LOG = LoggerFactory.getLogger(TestWorkloadManager.class); private final class GetSessionRunnable implements Runnable { @@ -227,7 +226,7 @@ protected WmTezSession createSessionObject(String sessionId, HiveConf conf) { @Override public WmTezSession getSession( - TezSessionState session, MappingInput input, HiveConf conf, + TezSession session, MappingInput input, HiveConf conf, final WmContext wmContext) throws Exception { // We want to wait for the iteration to finish and set the cluster fraction. WmTezSession state = super.getSession(session, input, conf, null); @@ -236,7 +235,7 @@ public WmTezSession getSession( } @Override - public void destroy(TezSessionState session) throws Exception { + public void destroy(TezSession session) throws Exception { super.destroy(session); ensureWm(); } @@ -252,7 +251,7 @@ public void returnAfterUse(TezSessionPoolSession session) throws Exception { } @Override - public TezSessionState reopen(TezSessionState session) throws Exception { + public TezSession reopen(TezSession session) throws Exception { session = super.reopen(session); ensureWm(); return session; @@ -269,10 +268,10 @@ public void testReuse() throws Exception { MockQam qam = new MockQam(); WorkloadManager wm = new WorkloadManagerForTest("test", conf, 1, qam); wm.start(); - TezSessionState nonPool = mock(TezSessionState.class); + TezSession nonPool = mock(TezSession.class); when(nonPool.getConf()).thenReturn(conf); doNothing().when(nonPool).close(anyBoolean()); - TezSessionState session = wm.getSession(nonPool, mappingInput("user"), conf); + TezSession session = wm.getSession(nonPool, mappingInput("user"), conf); verify(nonPool).close(anyBoolean()); assertNotSame(nonPool, session); session.returnToSessionManager(); @@ -282,7 +281,7 @@ public void testReuse() throws Exception { session = wm.getSession(diffPool, mappingInput("user"), conf); verify(diffPool).returnToSessionManager(); assertNotSame(diffPool, session); - TezSessionState session2 = wm.getSession(session, mappingInput("user"), conf); + TezSession session2 = wm.getSession(session, mappingInput("user"), conf); assertSame(session, session2); } @@ -294,7 +293,7 @@ public void testQueueName() throws Exception { wm.start(); // The queue should be ignored. conf.set(TezConfiguration.TEZ_QUEUE_NAME, "test2"); - TezSessionState session = wm.getSession(null, mappingInput("user"), conf); + TezSession session = wm.getSession(null, mappingInput("user"), conf); assertEquals("test", session.getQueueName()); assertEquals("test", conf.get(TezConfiguration.TEZ_QUEUE_NAME)); session.setQueueName("test2"); @@ -415,7 +414,7 @@ public void testMappings() throws Exception { verifyMapping(wm, conf, mappingInput("zzz", groups("g0", "g1"), "g1"), "g1"); // Explicit pool specification - invalid - there's no mapping that matches. try { - TezSessionState r = wm.getSession( + TezSession r = wm.getSession( null, mappingInput("u0", groups("g0", "g1"), "u2"), conf); fail("Expected failure, but got " + r); } catch (Exception ex) { @@ -428,7 +427,7 @@ public void testMappings() throws Exception { verifyMapping(wm, conf, mappingInput("u0", groups("g0", "g1"), "u2"), "u2"); // The mapping that doesn't exist still shouldn't work. try { - TezSessionState r = wm.getSession( + TezSession r = wm.getSession( null, mappingInput("u0", groups("g0", "g1"), "zzz"), conf); fail("Expected failure, but got " + r); } catch (Exception ex) { @@ -824,7 +823,7 @@ public void testDisableEnable() throws Exception { assertEquals(0, tezAmPool.getCurrentSize()); try { - TezSessionState r = wm.getSession(null, mappingInput("A", null), conf, null); + TezSession r = wm.getSession(null, mappingInput("A", null), conf, null); fail("Expected an error but got " + r); } catch (WorkloadManager.NoPoolMappingException ex) { // Ignore, this particular error is expected. @@ -1316,7 +1315,7 @@ public void testAsyncSessionInitFailures() throws Exception { SettableFuture failedWait = SettableFuture.create(); failedWait.setException(new Exception("foo")); theOnlySession.setWaitForAmRegistryFuture(failedWait); - TezSessionState retriedSession = wm.getSession(null, mappingInput("A"), conf); + TezSession retriedSession = wm.getSession(null, mappingInput("A"), conf); assertNotNull(retriedSession); assertNotSame(theOnlySession, retriedSession); // Should have been replaced. retriedSession.returnToSessionManager(); @@ -1326,7 +1325,7 @@ public void testAsyncSessionInitFailures() throws Exception { theOnlySession.setWaitForAmRegistryFuture(failedWait); wm.setNextWaitForAmRegistryFuture(failedWait); // Fail the retry. try { - TezSessionState r = wm.getSession(null, mappingInput("A"), conf); + TezSession r = wm.getSession(null, mappingInput("A"), conf); fail("Expected an error but got " + r); } catch (Exception ex) { // Expected. diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestZookeeperExternalSessionsRegistryClient.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestZookeeperExternalSessionsRegistryClient.java new file mode 100644 index 000000000000..8274e87187b0 --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestZookeeperExternalSessionsRegistryClient.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.tez; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.test.TestingServer; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.junit.Test; + +/** + * Tests for {@link ZookeeperExternalSessionsRegistryClient}. + */ +public class TestZookeeperExternalSessionsRegistryClient { + + /** + * Integration-style unit test that ensures {@link ZookeeperExternalSessionsRegistryClient} + * can discover sessions from ZooKeeper and hand them out via {@link ExternalSessionsRegistry#getSession()}. + */ + @Test + public void testGetAndReturnSession() throws Exception { + CuratorFramework client = null; + ZookeeperExternalSessionsRegistryClient registry = null; + + try (TestingServer server = new TestingServer()) { + String connectString = server.getConnectString(); + + HiveConf conf = new HiveConf(); + conf.setVar(ConfVars.HIVE_ZOOKEEPER_QUORUM, connectString); + conf.setVar(ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE, "/tez_ns"); + conf.setIntVar(ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_WAIT_MAX_ATTEMPTS, 1); + + String namespace = HiveConf.getVar(conf, ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE); + String effectivePath = ZookeeperExternalSessionsRegistryClient.normalizeZkPath(namespace); + + CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder(); + client = builder.connectString(connectString).retryPolicy(new RetryOneTime(1)).build(); + client.start(); + + client.create().creatingParentsIfNeeded().forPath(effectivePath + "/app_1"); + client.create().forPath(effectivePath + "/app_2"); + + registry = new ZookeeperExternalSessionsRegistryClient(conf); + + String first = registry.getSession(); + assertTrue("app_1".equals(first) || "app_2".equals(first)); + + registry.returnSession(first); + String second = registry.getSession(); + assertNotNull(second); + assertTrue("app_1".equals(second) || "app_2".equals(second)); + } finally { + if (registry != null) { + registry.close(); + } + if (client != null) { + client.close(); + } + } + } + + /** + * Verifies that the same external session ID can be obtained and returned + * multiple times in a row from {@link ZookeeperExternalSessionsRegistryClient}. + */ + @Test + public void testReuseSameSession() throws Exception { + CuratorFramework client = null; + ZookeeperExternalSessionsRegistryClient registry = null; + + try (TestingServer server = new TestingServer()) { + String connectString = server.getConnectString(); + + HiveConf conf = new HiveConf(); + conf.setVar(ConfVars.HIVE_ZOOKEEPER_QUORUM, connectString); + conf.setVar(ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE, "/tez_ns_reuse"); + conf.setIntVar(ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_WAIT_MAX_ATTEMPTS, 1); + + String namespace = HiveConf.getVar(conf, ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE); + String effectivePath = ZookeeperExternalSessionsRegistryClient.normalizeZkPath(namespace); + + CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder(); + client = builder.connectString(connectString).retryPolicy(new RetryOneTime(1)).build(); + client.start(); + + client.create().creatingParentsIfNeeded().forPath(effectivePath + "/app_reuse"); + + registry = new ZookeeperExternalSessionsRegistryClient(conf); + + String first = registry.getSession(); + assertEquals("app_reuse", first); + + registry.returnSession(first); + String second = registry.getSession(); + assertEquals("app_reuse", second); + + registry.returnSession(second); + String third = registry.getSession(); + assertEquals("app_reuse", third); + } finally { + if (registry != null) { + registry.close(); + } + if (client != null) { + client.close(); + } + } + } +} + diff --git a/service/src/java/org/apache/hive/service/server/KillQueryImpl.java b/service/src/java/org/apache/hive/service/server/KillQueryImpl.java index a237bf16a98e..5d9a7649465f 100644 --- a/service/src/java/org/apache/hive/service/server/KillQueryImpl.java +++ b/service/src/java/org/apache/hive/service/server/KillQueryImpl.java @@ -165,16 +165,16 @@ public boolean isLocalQuery(String queryIdOrTag) { } @Override - public void killQuery(String queryIdOrTag, String errMsg, HiveConf conf) throws HiveException { - killQuery(queryIdOrTag, errMsg, conf, false, SessionState.get().getUserName(), isAdmin()); + public void killQuery(String queryIdOrTag, String errMsg, HiveConf conf, boolean isYarn) throws HiveException { + killQuery(queryIdOrTag, errMsg, conf, false, SessionState.get().getUserName(), isAdmin(), isYarn); } public void killLocalQuery(String queryIdOrTag, HiveConf conf, String doAs, boolean doAsAdmin) throws HiveException { - killQuery(queryIdOrTag, null, conf, true, doAs, doAsAdmin); + killQuery(queryIdOrTag, null, conf, true, doAs, doAsAdmin, true); } private void killQuery(String queryIdOrTag, String errMsg, HiveConf conf, boolean onlyLocal, String doAs, - boolean doAsAdmin) throws HiveException { + boolean doAsAdmin, boolean isYarn) throws HiveException { errMsg = StringUtils.defaultString(errMsg, KillQueriesOperation.KILL_QUERY_MESSAGE); TagOrId tagOrId = TagOrId.UNKNOWN; Set operationsToKill = new HashSet<>(); @@ -190,7 +190,7 @@ private void killQuery(String queryIdOrTag, String errMsg, HiveConf conf, boolea } } if (!operationsToKill.isEmpty()) { - killOperations(queryIdOrTag, errMsg, conf, tagOrId, operationsToKill, doAs, doAsAdmin); + killOperations(queryIdOrTag, errMsg, conf, tagOrId, operationsToKill, doAs, doAsAdmin, isYarn); } else { LOG.debug("Query not found with tag/id: {}", queryIdOrTag); if (!onlyLocal && killQueryZookeeperManager != null && @@ -210,7 +210,7 @@ private void killQuery(String queryIdOrTag, String errMsg, HiveConf conf, boolea } private void killOperations(String queryIdOrTag, String errMsg, HiveConf conf, TagOrId tagOrId, - Set operationsToKill, String doAs, boolean doAsAdmin) throws HiveException { + Set operationsToKill, String doAs, boolean doAsAdmin, boolean isYarn) throws HiveException { try { switch (tagOrId) { case ID: @@ -221,7 +221,9 @@ private void killOperations(String queryIdOrTag, String errMsg, HiveConf conf, T if (queryTag == null) { queryTag = queryIdOrTag; } - killChildYarnJobs(conf, queryTag, doAs, doAsAdmin); + if (isYarn) { + killChildYarnJobs(conf, queryTag, doAs, doAsAdmin); + } } else { // no privilege to cancel throw new HiveSQLException("No privilege to kill query id"); @@ -237,7 +239,9 @@ private void killOperations(String queryIdOrTag, String errMsg, HiveConf conf, T if (numCanceled == 0) { throw new HiveSQLException("No privilege to kill query tag"); } else { - killChildYarnJobs(conf, queryIdOrTag, doAs, doAsAdmin); + if (isYarn) { + killChildYarnJobs(conf, queryIdOrTag, doAs, doAsAdmin); + } } break; case UNKNOWN: