entry : queriesViolated.entrySet()) {
+ if (entry.getValue().getAction().getType() == Action.Type.KILL_QUERY) {
+ TezSession sessionState = entry.getKey();
+ String queryId = sessionState.getWmContext().getQueryId();
+ try {
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+ DriverUtils.setUpAndStartSessionState(conf, ugi.getShortUserName());
+ boolean wasKilled = sessionState.killQuery(entry.getValue().getViolationMsg());
+ if (!wasKilled) {
+ LOG.info("Didn't kill the query {}", queryId);
+ }
+ } catch (HiveException | IOException e) {
+ LOG.warn("Unable to kill query {} for trigger violation", queryId, e);
}
+ } else {
+ throw new RuntimeException("Unsupported action: " + entry.getValue());
+ }
}
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezExternalSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezExternalSessionState.java
new file mode 100644
index 000000000000..b3103d3f5918
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezExternalSessionState.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
+
+/**
+ * A {@code TezSession} implementation that represents externally managed Tez sessions.
+ *
+ * Unlike {@code TezSessionState}, these sessions are not created or owned by HiveServer2.
+ * Instead, HiveServer2 connects to an already existing Tez session.
+ *
+ *
+ * Lifecycle
+ *
+ * - An instance of {@code TezExternalSessionState} is created.
+ * - An artificial application ID is acquired from a registry. This does not
+ * correspond to a real YARN application, as the session is unmanaged.
+ * - A {@code TezClient} is instantiated but not started (unlike in
+ * {@code TezSessionState}), allowing the rest of the Hive codebase to
+ * interact with it transparently.
+ *
+ *
+ *
+ * This abstraction enables Hive components to interact with external Tez
+ * sessions using the same interfaces as internally managed sessions.
+ *
+ */
+public class TezExternalSessionState extends TezSessionState {
+ private static final Object DEFAULT_CONF_CREATE_LOCK = new Object();
+ private static volatile TezConfiguration defaultTezConfiguration;
+
+ private String externalAppId;
+ private volatile boolean isOpen = false;
+ private volatile boolean isDestroying = false;
+ private final ExternalSessionsRegistry registry;
+
+ public TezExternalSessionState(String sessionId, HiveConf conf) {
+ super(sessionId, conf);
+ this.registry = ExternalSessionsRegistryFactory.getClient(conf);
+ synchronized (DEFAULT_CONF_CREATE_LOCK) {
+ if (defaultTezConfiguration == null) {
+ defaultTezConfiguration = createDefaultTezConfig();
+ }
+ }
+ }
+
+ @Override
+ public void ensureLocalResources(Configuration conf, String[] newFilesNotFromConf) {
+ /*
+ * No-op implementation.
+ * External Tez sessions are not backed by a YARN application and therefore
+ * do not manage or localize resources. As a result, there are no local
+ * resources to ensure for this session type.
+ */
+ }
+
+ @Override
+ protected void openInternal(String[] additionalFilesNotFromConf,
+ boolean isAsync, LogHelper console, HiveResources resources)
+ throws IOException, TezException {
+ if (isOpen) {
+ LOG.info("External Tez session {} is already open, skipping duplicate openInternal call", getSessionId());
+ return;
+ }
+
+ initQueueAndUser();
+
+ boolean llapMode = isLlapMode();
+
+ TezConfiguration tezConfig = new TezConfiguration(defaultTezConfiguration);
+ setupSessionAcls(tezConfig, conf);
+ ServicePluginsDescriptor spd = createServicePluginDescriptor(llapMode, tezConfig);
+ Credentials llapCredentials = createLlapCredentials(llapMode, tezConfig);
+
+ final TezClient sessionTezClient = TezClient.newBuilder("HIVE-" + getSessionId(), tezConfig)
+ .setIsSession(true)
+ .setCredentials(llapCredentials).setServicePluginDescriptor(spd)
+ .build();
+
+ LOG.info("Opening new External Tez Session (id: {})", getSessionId());
+ TezJobMonitor.initShutdownHook();
+
+ // External sessions doesn't support async mode (getClient should be much cheaper than open,
+ // and the async mode is anyway only used for CLI).
+ if (isAsync) {
+ LOG.info("Ignoring the async argument for an external session {}", getSessionId());
+ }
+ try {
+ externalAppId = registry.getSession();
+ } catch (TezException | IOException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+
+ sessionTezClient.getClient(ApplicationId.fromString(externalAppId));
+ LOG.info("Started an external session; client name {}, app ID {}", sessionTezClient.getClientName(), externalAppId);
+ setTezClient(sessionTezClient);
+ isOpen = true;
+ }
+
+ @Override
+ public void close(boolean keepDagFilesDir) throws Exception {
+ // We never close external sessions that don't have errors.
+ try {
+ if (externalAppId != null) {
+ registry.returnSession(externalAppId);
+ }
+ } catch (Exception e) {
+ LOG.warn("Caught exception while trying to return external session {}, moving on with session state closure",
+ externalAppId, e);
+ }
+
+ externalAppId = null;
+ isOpen = false;
+ if (isDestroying) {
+ super.close(keepDagFilesDir);
+ }
+ }
+
+ @Override
+ public TezSession reopen() throws Exception {
+ isDestroying = true;
+ // Reopen will actually close this session, and get a new external app.
+ // It could instead somehow communicate to the external manager that the session is bad.
+ return super.reopen();
+ }
+
+ @Override
+ public void destroy() throws Exception {
+ isDestroying = true;
+ // This will actually close the session. We assume the external manager will restart it.
+ // It could instead somehow communicate to the external manager that the session is bad.
+ super.destroy();
+ }
+
+ @Override
+ public boolean isOpen(){
+ return isOpen;
+ }
+
+ @Override
+ public boolean killQuery(String reason) throws HiveException {
+ if (killQuery == null || wmContext == null) {
+ return false;
+ }
+ String queryId = wmContext.getQueryId();
+ if (queryId == null) {
+ return false;
+ }
+ LOG.info("Killing the query {}: {}", queryId, reason);
+ killQuery.killQuery(queryId, reason, conf, false);
+ return true;
+ }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezRuntimeContext.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezRuntimeContext.java
index f716c5c1eb28..cfd4116048af 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezRuntimeContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezRuntimeContext.java
@@ -41,7 +41,7 @@ public class TezRuntimeContext {
// llap/container
private String executionMode;
- public void init(TezSessionState sessionState) {
+ public void init(TezSession sessionState) {
this.amAddress = sessionState.getAppMasterUri();
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSession.java
new file mode 100644
index 000000000000..68844bd81728
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSession.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CancellationException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.session.KillQuery;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.hive.ql.wm.WmContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.client.DAGStatus;
+
+/**
+ * A bogus interface that basically describes the evolved usage patterns of TezSessionStateImpl.
+ * Needed due to lack of multiple inheritance in Java; probably good to have, too - may make
+ * TezSessionState interface a little bit clearer or even encourage some future cleanup.
+ *
+ * It's implemented in two ways - core implementations (regular session, external session),
+ * and extra functionality implementation (pool session, WM session, etc.) that wraps an instance
+ * of the core implementation (i.e. use composition). With MI, each session type would just inherit
+ * from one of each.
+ */
+public interface TezSession {
+ final class HiveResources {
+ public HiveResources(Path dagResourcesDir) {
+ this.dagResourcesDir = dagResourcesDir;
+ }
+ /** A directory that will contain resources related to DAGs and specified in configs. */
+ final Path dagResourcesDir;
+ final Map additionalFilesNotFromConf = new HashMap<>();
+ /** Localized resources of this session; both from conf and not from conf (above). */
+ final Set localizedResources = new HashSet<>();
+
+ @Override
+ public String toString() {
+ return dagResourcesDir + "; " + additionalFilesNotFromConf.size() + " additional files, "
+ + localizedResources.size() + " localized resources";
+ }
+ }
+
+ // Core session operations.
+ void open() throws IOException, TezException;
+ void open(HiveResources resources) throws IOException, TezException;
+ void open(String[] additionalFilesNotFromConf) throws IOException, TezException;
+ void beginOpen(String[] additionalFiles, LogHelper console) throws IOException, TezException;
+ void endOpen() throws InterruptedException, CancellationException;
+ TezSession reopen() throws Exception;
+ void destroy() throws Exception;
+ void close(boolean keepTmpDir) throws Exception;
+ void returnToSessionManager() throws Exception;
+
+ /** This is called during open and update (i.e. internally and externally) to localize conf resources. */
+ void ensureLocalResources(Configuration conf, String[] newFilesNotFromConf) throws IOException;
+ HiveResources extractHiveResources();
+ Path replaceHiveResources(HiveResources resources, boolean isAsync);
+
+ List getLocalizedResources();
+ LocalResource getAppJarLr();
+
+ HiveConf getConf();
+ TezClient getTezClient();
+ boolean isOpen();
+ boolean isOpening();
+ boolean getDoAsEnabled();
+ String getSessionId();
+ String getUser();
+ WmContext getWmContext(); // Necessary for triggers, even for non-WM sessions.
+ void setWmContext(WmContext ctx);
+ void setQueueName(String queueName);
+ String getQueueName();
+ void setDefault();
+ boolean isDefault();
+ boolean getLegacyLlapMode();
+ void setLegacyLlapMode(boolean b);
+ void unsetOwnerThread();
+ void setOwnerThread();
+ KillQuery getKillQuery();
+ void setKillQuery(KillQuery kq);
+ boolean killQuery(String reason) throws HiveException;
+ String getAppMasterUri();
+ default Map getMetrics() {
+ return new HashMap<>();
+ }
+ default void updateDagStatus(DAGStatus dagStatus) {
+ }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
index 33d4210fb226..ae113ff38a3f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
@@ -103,7 +103,13 @@ public interface SessionObjectFactory {
this.deltaRemaining = new AtomicInteger(initialSize);
- final int threadCount = HiveConf.getIntVar(initConf, ConfVars.HIVE_SERVER2_TEZ_SESSION_MAX_INIT_THREADS);
+ int threadCount = 1;
+ if (!HiveConf.getBoolVar(initConf, ConfVars.HIVE_SERVER2_TEZ_USE_EXTERNAL_SESSIONS)) {
+ // Don't use multiple threads for external sessions.
+ threadCount = Math.min(initialSize,
+ HiveConf.getIntVar(initConf, ConfVars.HIVE_SERVER2_TEZ_SESSION_MAX_INIT_THREADS));
+ }
+
this.executorService = MoreExecutors
.listeningDecorator(new ThreadPoolExecutor(0, threadCount, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("tez-session-init-%d").build()));
@@ -488,7 +494,7 @@ int getCurrentSize() {
/**
* Should be called when the session is no longer needed, to remove it from bySessionId.
*/
- public void notifyClosed(TezSessionState session) {
+ public void notifyClosed(SessionType session) {
bySessionId.remove(session.getSessionId());
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
index 70a57a52531d..605a92ebc8f5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
@@ -55,7 +55,7 @@
* In case the user specifies a queue explicitly, a new session is created
* on that queue and assigned to the session state.
*/
-public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTriggerValidator
+public class TezSessionPoolManager extends AbstractTriggerValidator
implements Manager, SessionExpirationTracker.RestartImpl {
private enum CustomQueueAllowed {
@@ -85,13 +85,14 @@ private enum CustomQueueAllowed {
private static TezSessionPoolManager instance = null;
/** This is used to close non-default sessions, and also all sessions when stopping. */
- private final List openSessions = new LinkedList<>();
+ private final List openSessions = new LinkedList<>();
private SessionTriggerProvider sessionTriggerProvider;
private TriggerActionHandler> triggerActionHandler;
private TriggerValidatorRunnable triggerValidatorRunnable;
private YarnQueueHelper yarnQueueChecker;
private TezSessionPoolManagerMetrics metrics = null;
+ private boolean useExternalSessions;
/** Note: this is not thread-safe. */
public static TezSessionPoolManager getInstance() {
@@ -200,6 +201,8 @@ public void setupNonPool(HiveConf conf) {
this.yarnQueueChecker = new YarnQueueHelper(conf);
}
+ useExternalSessions = HiveConf.getBoolVar(conf, ConfVars.HIVE_SERVER2_TEZ_USE_EXTERNAL_SESSIONS);
+
restrictedConfig = new RestrictedConfigChecker(conf);
}
@@ -232,7 +235,7 @@ private TezSessionPoolSession createAndInitSession(
return sessionState;
}
- private TezSessionState getSession(HiveConf conf, boolean doOpen) throws Exception {
+ private TezSession getSession(HiveConf conf, boolean doOpen) throws Exception {
// NOTE: this can be called outside of HS2, without calling setupPool. Basically it should be
// able to handle not being initialized. Perhaps we should get rid of the instance and
// move the setupPool code to ctor. For now, at least hasInitialSessions will be false.
@@ -312,7 +315,7 @@ private TezSessionState getSession(HiveConf conf, boolean doOpen) throws Excepti
* @return
* @throws Exception
*/
- private TezSessionState getNewSessionState(HiveConf conf,
+ private TezSession getNewSessionState(HiveConf conf,
String queueName, boolean doOpen) throws Exception {
TezSessionPoolSession retTezSessionState = createAndInitSession(queueName, false, conf);
if (queueName != null) {
@@ -331,7 +334,7 @@ public void returnAfterUse(TezSessionPoolSession session) throws Exception {
returnSession(session);
}
- void returnSession(TezSessionState tezSessionState) throws Exception {
+ void returnSession(TezSession tezSessionState) {
// Ignore the interrupt status while returning the session, but set it back
// on the thread in case anything else needs to deal with it.
boolean isInterrupted = Thread.interrupted();
@@ -361,7 +364,7 @@ void returnSession(TezSessionState tezSessionState) throws Exception {
}
public static void closeIfNotDefault(
- TezSessionState tezSessionState, boolean keepTmpDir) throws Exception {
+ TezSession tezSessionState, boolean keepTmpDir) throws Exception {
LOG.info("Closing tez session if not default: " + tezSessionState);
if (!tezSessionState.isDefault()) {
tezSessionState.close(keepTmpDir);
@@ -372,13 +375,13 @@ public void stop() throws Exception {
if ((instance == null) || !this.hasInitialSessions) {
return;
}
- List sessionsToClose = null;
+ List sessionsToClose = null;
synchronized (openSessions) {
- sessionsToClose = new ArrayList(openSessions);
+ sessionsToClose = new ArrayList<>(openSessions);
}
// we can just stop all the sessions
- for (TezSessionState sessionState : sessionsToClose) {
+ for (TezSession sessionState : sessionsToClose) {
if (sessionState.isDefault()) {
sessionState.close(false);
}
@@ -405,7 +408,7 @@ public void stop() throws Exception {
* @throws Exception
*/
@Override
- public void destroy(TezSessionState tezSessionState) throws Exception {
+ public void destroy(TezSession tezSessionState) throws Exception {
LOG.warn("We are closing a " + (tezSessionState.isDefault() ? "default" : "non-default")
+ " session because of retry failure.");
tezSessionState.close(false);
@@ -417,7 +420,8 @@ TriggerValidatorRunnable getTriggerValidatorRunnable() {
}
protected TezSessionPoolSession createSession(String sessionId, HiveConf conf) {
- return new TezSessionPoolSession(sessionId, this, expirationTracker, conf);
+ TezSessionState base = TezSessionState.create(sessionId, conf, useExternalSessions);
+ return new TezSessionPoolSession(this, expirationTracker, base);
}
/*
@@ -426,7 +430,7 @@ protected TezSessionPoolSession createSession(String sessionId, HiveConf conf) {
* sessions for e.g. when a CLI session is started. The CLI session could re-use the
* same tez session eliminating the latencies of new AM and containers.
*/
- private static boolean canWorkWithSameSession(TezSessionState session, HiveConf conf)
+ private static boolean canWorkWithSameSession(TezSession session, HiveConf conf)
throws HiveException {
if (session == null || conf == null || !session.isOpen()) {
return false;
@@ -452,7 +456,8 @@ private static boolean canWorkWithSameSession(TezSessionState session, HiveConf
boolean doAsEnabled = conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS);
// either variables will never be null because a default value is returned in case of absence
- if (doAsEnabled != session.getConf().getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS)) {
+ if (doAsEnabled != session.getConf().getBoolVar(
+ HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS)) {
return false;
}
@@ -469,12 +474,12 @@ private static boolean canWorkWithSameSession(TezSessionState session, HiveConf
return (queueName == null) ? confQueueName == null : queueName.equals(confQueueName);
} else {
// this session should never be a default session unless something has messed up.
- throw new HiveException("The pool session " + session + " should have been returned to the pool");
+ throw new HiveException("The pool session " + session + " should have been returned to the pool");
}
}
- public TezSessionState getSession(
- TezSessionState session, HiveConf conf, boolean doOpen, boolean llap) throws Exception {
+ public TezSession getSession(
+ TezSession session, HiveConf conf, boolean doOpen, boolean llap) throws Exception {
if (llap && (this.numConcurrentLlapQueries > 0)) {
llapQueue.acquire(); // blocks if no more llap queries can be submitted.
}
@@ -495,7 +500,7 @@ public TezSessionState getSession(
/** Reopens the session that was found to not be running. */
@Override
- public TezSessionState reopen(TezSessionState sessionState) throws Exception {
+ public TezSession reopen(TezSession sessionState) throws Exception {
HiveConf sessionConf = sessionState.getConf();
if (sessionState.getQueueName() != null
&& sessionConf.get(TezConfiguration.TEZ_QUEUE_NAME) == null) {
@@ -506,7 +511,7 @@ public TezSessionState reopen(TezSessionState sessionState) throws Exception {
}
static void reopenInternal(
- TezSessionState sessionState) throws Exception {
+ TezSession sessionState) throws Exception {
// TODO: close basically resets the object to a bunch of nulls.
// We should ideally not reuse the object because it's pointless and error-prone.
sessionState.close(true);
@@ -516,11 +521,11 @@ static void reopenInternal(
public void closeNonDefaultSessions() throws Exception {
- List sessionsToClose = null;
+ List sessionsToClose;
synchronized (openSessions) {
- sessionsToClose = new ArrayList(openSessions);
+ sessionsToClose = new ArrayList<>(openSessions);
}
- for (TezSessionState sessionState : sessionsToClose) {
+ for (TezSession sessionState : sessionsToClose) {
System.err.println("Shutting down tez session.");
closeIfNotDefault(sessionState, false);
}
@@ -600,7 +605,7 @@ List getTriggerCounterNames() {
return counterNames;
}
- public List getSessions() {
+ public List getSessions() {
return new LinkedList<>(openSessions);
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManagerMetrics.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManagerMetrics.java
index 6e153b83ea8d..d33bc67c0a15 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManagerMetrics.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManagerMetrics.java
@@ -130,7 +130,7 @@ public void stop() {
}
void collectMetrics() {
- List sessions = poolManager.getSessions();
+ List sessions = poolManager.getSessions();
LOG.debug("Updating metrics, session count: {}", sessions.size());
if (sessions.isEmpty()) {
@@ -148,7 +148,7 @@ void collectMetrics() {
new ThreadFactoryBuilder().setNameFormat("TezSessionPoolManagerMetrics collector thread - #%d").build());
long start = Time.monotonicNow();
- for (TezSessionState session : sessions) {
+ for (TezSession session : sessions) {
collectTasks.add(CompletableFuture.runAsync(() -> {
Map metrics = session.getMetrics();
LOG.debug("Achieved metrics from Tez session ({}): {}", session.getSessionId(), metrics);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
index 49a3211e60f0..415072f221da 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
@@ -19,19 +19,25 @@
package org.apache.hadoop.hive.ql.exec.tez;
import java.io.IOException;
-import java.net.URISyntaxException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.session.KillQuery;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.hive.ql.wm.WmContext;
import org.apache.hadoop.hive.registry.impl.TezAmInstance;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.tez.client.TezClient;
import org.apache.tez.dag.api.TezException;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* TezSession that is aware of the session pool, and also keeps track of expiration and use.
@@ -41,57 +47,32 @@
* if it's time, the expiration is triggered; in that case, or if it was already triggered, the
* caller gets a different session. When the session is in use when it expires, the expiration
* thread ignores it and lets the return to the pool take care of the expiration.
+ *
+ * Because of the lack of multiple inheritance in Java, this uses composition.
*/
-@VisibleForTesting
-class TezSessionPoolSession extends TezSessionState {
+public class TezSessionPoolSession implements TezSession {
+ protected static final Logger LOG = LoggerFactory.getLogger(TezSessionPoolSession.class);
private static final int STATE_NONE = 0, STATE_IN_USE = 1, STATE_EXPIRED = 2;
public interface Manager {
void registerOpenSession(TezSessionPoolSession session);
-
void unregisterOpenSession(TezSessionPoolSession session);
-
void returnAfterUse(TezSessionPoolSession session) throws Exception;
-
- TezSessionState reopen(TezSessionState session) throws Exception;
-
- void destroy(TezSessionState session) throws Exception;
- }
-
- public static abstract class AbstractTriggerValidator {
- private ScheduledExecutorService scheduledExecutorService = null;
- abstract Runnable getTriggerValidatorRunnable();
-
- void startTriggerValidator(long triggerValidationIntervalMs) {
- if (scheduledExecutorService == null) {
- scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
- new ThreadFactoryBuilder().setDaemon(true).setNameFormat("TriggerValidator").build());
- Runnable triggerValidatorRunnable = getTriggerValidatorRunnable();
- scheduledExecutorService.scheduleWithFixedDelay(triggerValidatorRunnable, triggerValidationIntervalMs,
- triggerValidationIntervalMs, TimeUnit.MILLISECONDS);
- LOG.info("Started trigger validator with interval: {} ms", triggerValidationIntervalMs);
- }
- }
-
- void stopTriggerValidator() {
- if (scheduledExecutorService != null) {
- scheduledExecutorService.shutdownNow();
- scheduledExecutorService = null;
- LOG.info("Stopped trigger validator");
- }
- }
+ TezSession reopen(TezSession session) throws Exception;
+ void destroy(TezSession session) throws Exception;
}
private final AtomicInteger sessionState = new AtomicInteger(STATE_NONE);
private Long expirationNs;
- private final Manager parent;
+ private final Manager manager;
private final SessionExpirationTracker expirationTracker;
+ private final TezSession baseSession;
- public TezSessionPoolSession(String sessionId, Manager parent,
- SessionExpirationTracker tracker, HiveConf conf) {
- super(sessionId, conf);
- this.parent = parent;
+ public TezSessionPoolSession(Manager manager,
+ SessionExpirationTracker tracker, TezSession baseSession) {
+ this.baseSession = baseSession;
+ this.manager = manager;
this.expirationTracker = tracker;
}
@@ -104,11 +85,11 @@ Long getExpirationNs() {
}
@Override
- void close(boolean keepTmpDir) throws Exception {
+ public void close(boolean keepTmpDir) throws Exception {
try {
- super.close(keepTmpDir);
+ baseSession.close(keepTmpDir);
} finally {
- parent.unregisterOpenSession(this);
+ manager.unregisterOpenSession(this);
if (expirationTracker != null) {
expirationTracker.removeFromExpirationQueue(this);
}
@@ -116,23 +97,67 @@ void close(boolean keepTmpDir) throws Exception {
}
@Override
- protected void openInternal(String[] additionalFiles,
- boolean isAsync, LogHelper console, HiveResources resources)
- throws IOException, URISyntaxException, TezException {
- super.openInternal(additionalFiles, isAsync, console, resources);
- parent.registerOpenSession(this);
+ public void open(String[] additionalFilesNotFromConf)
+ throws IOException, TezException {
+ baseSession.open(additionalFilesNotFromConf);
+ afterOpen();
+ }
+
+ @Override
+ public void open() throws IOException, TezException {
+ baseSession.open();
+ afterOpen();
+ }
+
+ private void afterOpen() {
+ manager.registerOpenSession(this);
if (expirationTracker != null) {
expirationTracker.addToExpirationQueue(this);
}
}
@Override
- public String toString() {
- if (expirationNs == null) return super.toString();
- long expiresInMs = (expirationNs - System.nanoTime()) / 1000000L;
- return super.toString() + ", expires in " + expiresInMs + "ms";
+ public void open(HiveResources resources)
+ throws IOException, TezException {
+ baseSession.open(resources);
+ afterOpen();
+ }
+
+ // TODO: this is only supported in CLI, might be good to try to remove it.
+ @Override
+ public void beginOpen(String[] additionalFiles, LogHelper console)
+ throws IOException, TezException {
+ baseSession.beginOpen(additionalFiles, console);
+ afterOpen();
}
+ @Override
+ public void endOpen() throws InterruptedException, CancellationException {
+ baseSession.endOpen();
+ }
+
+ @Override
+ public void ensureLocalResources(Configuration conf, String[] newFilesNotFromConf) throws IOException {
+ baseSession.ensureLocalResources(conf, newFilesNotFromConf);
+ }
+
+ @Override
+ public HiveResources extractHiveResources() {
+ return baseSession.extractHiveResources();
+ }
+
+ @Override
+ public Path replaceHiveResources(HiveResources resources, boolean isAsync) {
+ return baseSession.replaceHiveResources(resources, isAsync);
+ }
+
+ @Override
+ public boolean killQuery(String reason) throws HiveException {
+ return baseSession.killQuery(reason);
+ }
+
+ // *********** Methods specific to a pool session.
+
/**
* Tries to use this session. When the session is in use, it will not expire.
* @return true if the session can be used; false if it has already expired.
@@ -157,7 +182,9 @@ boolean stopUsing() {
if (!sessionState.compareAndSet(STATE_IN_USE, finalState)) {
throw new AssertionError("Unexpected state change; currently " + sessionState.get());
}
- if (finalState == STATE_NONE) return true;
+ if (finalState == STATE_NONE){
+ return true;
+ }
expirationTracker.closeAndRestartExpiredSessionAsync(this);
return false;
}
@@ -190,24 +217,164 @@ private final boolean shouldExpire() {
@Override
public void returnToSessionManager() throws Exception {
- parent.returnAfterUse(this);
+ manager.returnAfterUse(this);
}
@Override
- public TezSessionState reopen() throws Exception {
- return parent.reopen(this);
+ public TezSession reopen() throws Exception {
+ return manager.reopen(this);
}
@Override
public void destroy() throws Exception {
- parent.destroy(this);
+ manager.destroy(this);
}
- boolean isOwnedBy(Manager parent) {
- return this.parent == parent;
+ public boolean isOwnedBy(Manager parent) {
+ return this.manager == parent;
}
- void updateFromRegistry(TezAmInstance si, int ephSeqVersion) {
+ public void updateFromRegistry(TezAmInstance si, int ephSeqVersion) {
// Nothing to do.
}
+
+ @Override
+ public String toString() {
+ return baseSession.toString() + getExpirationString();
+ }
+
+ private String getExpirationString() {
+ if (expirationNs == null){
+ return "";
+ }
+ long expiresInMs = (expirationNs - System.nanoTime()) / 1000000L;
+ return ", expires in " + expiresInMs + "ms";
+ }
+
+ // ********** The methods that we redirect to base.
+ // We could instead have a separate "data" interface that would "return superr" here, and
+ // "return this" in the actual session implementation; however that would require everyone to
+ // call session.getData().method() for some arbitrary set of methods. Let's keep all the
+ // ugliness in one place.
+
+ @Override
+ public HiveConf getConf() {
+ return baseSession.getConf();
+ }
+
+ @Override
+ public String getSessionId() {
+ return baseSession.getSessionId();
+ }
+
+ @Override
+ public String getUser() {
+ return baseSession.getUser();
+ }
+
+ @Override
+ public boolean isOpen() {
+ return baseSession.isOpen();
+ }
+
+ @Override
+ public void setQueueName(String queueName) {
+ baseSession.setQueueName(queueName);
+ }
+
+ @Override
+ public String getQueueName() {
+ return baseSession.getQueueName();
+ }
+
+ @Override
+ public void setDefault() {
+ baseSession.setDefault();
+ }
+
+ @Override
+ public boolean isDefault() {
+ return baseSession.isDefault();
+ }
+
+ @Override
+ public boolean getDoAsEnabled() {
+ return baseSession.getDoAsEnabled();
+ }
+
+ @Override
+ public boolean getLegacyLlapMode() {
+ return baseSession.getLegacyLlapMode();
+ }
+
+ @Override
+ public void setLegacyLlapMode(boolean b) {
+ baseSession.setLegacyLlapMode(b);
+ }
+
+ @Override
+ public WmContext getWmContext() {
+ return baseSession.getWmContext();
+ }
+
+ @Override
+ public void setWmContext(WmContext ctx) {
+ baseSession.setWmContext(ctx);
+ }
+
+ @Override
+ public LocalResource getAppJarLr() {
+ return baseSession.getAppJarLr();
+ }
+
+ @Override
+ public List getLocalizedResources() {
+ return baseSession.getLocalizedResources();
+ }
+
+ @Override
+ public TezClient getTezClient() {
+ return baseSession.getTezClient();
+ }
+
+ @Override
+ public boolean isOpening() {
+ return baseSession.isOpening();
+ }
+
+ @Override
+ public void setOwnerThread() {
+ baseSession.setOwnerThread();
+ }
+
+ @Override
+ public void unsetOwnerThread() {
+ baseSession.unsetOwnerThread();
+ }
+
+ @Override
+ public KillQuery getKillQuery() {
+ return baseSession.getKillQuery();
+ }
+
+ @Override
+ public void setKillQuery(KillQuery kq) {
+ baseSession.setKillQuery(kq);
+ }
+
+ @Override
+ public String getAppMasterUri() {
+ return baseSession.getAppMasterUri();
+ }
+
+ @Override
+ public Map getMetrics(){
+ return baseSession.getMetrics();
+ }
+
+ @Override
+ public void updateDagStatus(DAGStatus dagStatus) {
+ baseSession.updateDagStatus(dagStatus);
+ }
+ // ********** End of the methods that we redirect to base.
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index 1b6965018a88..2924416ad480 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -18,18 +18,15 @@
package org.apache.hadoop.hive.ql.exec.tez;
import org.apache.hadoop.hive.common.JavaVersionUtils;
-import org.apache.hadoop.registry.client.api.RegistryOperations;
import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
@@ -62,12 +59,15 @@
import org.apache.hadoop.hive.llap.tezplugins.LlapTaskCommunicator;
import org.apache.hadoop.hive.llap.tezplugins.LlapTaskSchedulerService;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.session.KillQuery;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
import org.apache.hadoop.hive.ql.wm.WmContext;
import org.apache.hadoop.hive.shims.Utils;
import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.registry.client.api.RegistryOperations;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
@@ -92,7 +92,6 @@
import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
@@ -101,10 +100,10 @@
import com.google.common.cache.CacheBuilder;
/**
- * Holds session state related to Tez
+ * The basic implementation of TezSession.
*/
@JsonSerialize
-public class TezSessionState {
+public class TezSessionState implements TezSession {
protected static final Logger LOG = LoggerFactory.getLogger(TezSessionState.class.getName());
private static final String TEZ_DIR = "_tez_session_dir";
@@ -113,17 +112,17 @@ public class TezSessionState {
private static final String LLAP_LAUNCHER = LlapContainerLauncher.class.getName();
private static final String LLAP_TASK_COMMUNICATOR = LlapTaskCommunicator.class.getName();
- private final HiveConf conf;
+ protected final HiveConf conf;
@VisibleForTesting
Path tezScratchDir;
- private LocalResource appJarLr;
+ protected LocalResource appJarLr;
private TezClient session;
private Future sessionFuture;
/** Console used for user feedback during async session opening. */
private LogHelper console;
@JsonProperty("sessionId")
private String sessionId;
- private final DagUtils utils;
+ protected final DagUtils utils;
@JsonProperty("queueName")
private String queueName;
@JsonProperty("defaultQueue")
@@ -133,29 +132,13 @@ public class TezSessionState {
private AtomicReference ownerThread = new AtomicReference<>(null);
- public static final class HiveResources {
- public HiveResources(Path dagResourcesDir) {
- this.dagResourcesDir = dagResourcesDir;
- }
- /** A directory that will contain resources related to DAGs and specified in configs. */
- public final Path dagResourcesDir;
- public final Map additionalFilesNotFromConf = new HashMap();
- /** Localized resources of this session; both from conf and not from conf (above). */
- public final Set localizedResources = new HashSet<>();
-
- @Override
- public String toString() {
- return dagResourcesDir + "; " + additionalFilesNotFromConf.size() + " additional files, "
- + localizedResources.size() + " localized resources";
- }
- }
private HiveResources resources;
@JsonProperty("doAsEnabled")
private boolean doAsEnabled;
private boolean isLegacyLlapMode;
- private WmContext wmContext;
- private KillQuery killQuery;
+ protected WmContext wmContext;
+ protected KillQuery killQuery;
private static final Cache shaCache = CacheBuilder.newBuilder().maximumSize(100).build();
@@ -185,6 +168,12 @@ public TezSessionState(String sessionId, HiveConf conf) {
this.sessionId = sessionId;
}
+ public static TezSessionState create(String sessionId, HiveConf conf, boolean useExternalSessions) {
+ return useExternalSessions ? new TezExternalSessionState(sessionId, conf) :
+ new TezSessionState(sessionId, conf);
+ }
+
+ @Override
public boolean isOpening() {
if (session != null || sessionFuture == null) {
return false;
@@ -208,6 +197,7 @@ public boolean isOpening() {
return false;
}
+ @Override
public boolean isOpen() {
if (session != null) {
return true;
@@ -236,58 +226,45 @@ public static String makeSessionId() {
return UUID.randomUUID().toString();
}
- public void open() throws IOException, URISyntaxException, TezException {
- String[] noFiles = null;
- open(noFiles);
+ @Override
+ public void open() throws IOException, TezException {
+ openInternal(null, false, null, null);
}
/**
* Creates a tez session. A session is tied to either a cli/hs2 session. You can
* submit multiple DAGs against a session (as long as they are executed serially).
*/
- public void open(String[] additionalFilesNotFromConf)
- throws IOException, URISyntaxException, TezException {
+ @Override
+ public void open(String[] additionalFilesNotFromConf) throws IOException, TezException {
openInternal(additionalFilesNotFromConf, false, null, null);
}
- public void open(HiveResources resources)
- throws IOException, URISyntaxException, TezException {
+ @Override
+ public void open(HiveResources resources) throws IOException, TezException {
openInternal(null, false, null, resources);
}
- public void beginOpen(String[] additionalFiles, LogHelper console)
- throws IOException, URISyntaxException, TezException {
+ @Override
+ public void beginOpen(String[] additionalFiles, LogHelper console) throws IOException, TezException {
openInternal(additionalFiles, true, console, null);
}
protected void openInternal(String[] additionalFilesNotFromConf,
- boolean isAsync, LogHelper console, HiveResources resources)
- throws IOException, URISyntaxException, TezException {
- // TODO Why is the queue name set again. It has already been setup via setQueueName. Do only one of the two.
- String confQueueName = conf.get(TezConfiguration.TEZ_QUEUE_NAME);
- if (queueName != null && !queueName.equals(confQueueName)) {
- LOG.warn("Resetting a queue name that was already set: was "
- + queueName + ", now " + confQueueName);
- }
- this.queueName = confQueueName;
- this.doAsEnabled = conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS);
-
- // TODO This - at least for the session pool - will always be the hive user. How does doAs above this affect things ?
- UserGroupInformation ugi = Utils.getUGI();
- user = ugi.getShortUserName();
- LOG.info("User of session id " + sessionId + " is " + user);
+ boolean isAsync, LogHelper console, HiveResources resources) throws IOException, TezException {
+ initQueueAndUser();
// Create the tez tmp dir and a directory for Hive resources.
tezScratchDir = createTezDir(sessionId, null);
if (resources != null) {
// If we are getting the resources externally, don't relocalize anything.
this.resources = resources;
- LOG.info("Setting resources to " + resources);
+ LOG.info("Setting resources to {}", resources);
} else {
this.resources = new HiveResources(createTezDir(sessionId, "resources"));
ensureLocalResources(conf, additionalFilesNotFromConf);
- LOG.info("Created new resources: " + this.resources);
+ LOG.info("Created new resources: {}", this.resources);
}
// Unless already installed on all the cluster nodes, we'll have to localize hive-exec.jar as well.
@@ -333,14 +310,10 @@ Map buildCommonLocalResources() {
@VisibleForTesting
void openInternalUnsafe(boolean isAsync, LogHelper console) throws TezException, IOException {
final Map commonLocalResources = buildCommonLocalResources();
- final boolean llapMode = "llap".equalsIgnoreCase(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE));
+ final boolean llapMode = isLlapMode();
if (llapMode) {
- // localize llap client jars
- addJarLRByClass(LlapTaskSchedulerService.class, commonLocalResources);
- addJarLRByClass(LlapProtocolClientImpl.class, commonLocalResources);
- addJarLRByClass(LlapProtocolClientProxy.class, commonLocalResources);
- addJarLRByClass(RegistryOperations.class, commonLocalResources);
+ addLlapJars(commonLocalResources);
}
// Create environment for AM.
@@ -349,36 +322,11 @@ void openInternalUnsafe(boolean isAsync, LogHelper console) throws TezException,
// and finally we're ready to create and start the session
// generate basic tez config
- final TezConfiguration tezConfig = new TezConfiguration(true);
- tezConfig.addResource(conf);
-
- setupTezParamsBasedOnMR(tezConfig);
-
- // set up the staging directory to use
+ final TezConfiguration tezConfig = createTezConfig();
tezConfig.set(TezConfiguration.TEZ_AM_STAGING_DIR, tezScratchDir.toUri().toString());
- conf.stripHiddenConfigurations(tezConfig);
- ServicePluginsDescriptor servicePluginsDescriptor;
-
- Credentials llapCredentials = null;
- if (llapMode) {
- if (isKerberosEnabled(tezConfig)) {
- llapCredentials = new Credentials();
- llapCredentials.addToken(LlapTokenIdentifier.KIND_NAME, getLlapToken(user, tezConfig));
- }
- // TODO Change this to not serialize the entire Configuration - minor.
- UserPayload servicePluginPayload = TezUtils.createUserPayloadFromConf(tezConfig);
- // we need plugins to handle llap and uber mode
- servicePluginsDescriptor = ServicePluginsDescriptor.create(true,
- new TaskSchedulerDescriptor[] { TaskSchedulerDescriptor.create(
- LLAP_SERVICE, LLAP_SCHEDULER).setUserPayload(servicePluginPayload) },
- new ContainerLauncherDescriptor[] { ContainerLauncherDescriptor.create(
- LLAP_SERVICE, LLAP_LAUNCHER) },
- new TaskCommunicatorDescriptor[] { TaskCommunicatorDescriptor.create(
- LLAP_SERVICE, LLAP_TASK_COMMUNICATOR).setUserPayload(servicePluginPayload) });
- } else {
- servicePluginsDescriptor = ServicePluginsDescriptor.create(true);
- }
+ Credentials llapCredentials = createLlapCredentials(llapMode, tezConfig);
+ ServicePluginsDescriptor spd = createServicePluginDescriptor(llapMode, tezConfig);
// container prewarming. tell the am how many containers we need
if (HiveConf.getBoolVar(conf, ConfVars.HIVE_PREWARM_ENABLED)) {
@@ -389,8 +337,6 @@ void openInternalUnsafe(boolean isAsync, LogHelper console) throws TezException,
tezConfig.setInt(TezConfiguration.TEZ_AM_SESSION_MIN_HELD_CONTAINERS, n);
}
- setupSessionAcls(tezConfig, conf);
-
/*
* Update HADOOP_CREDSTORE_PASSWORD for the TezAM.
* If there is a job specific credential store, it will be set.
@@ -404,16 +350,15 @@ void openInternalUnsafe(boolean isAsync, LogHelper console) throws TezException,
String tezJobNameFormat = HiveConf.getVar(conf, ConfVars.HIVE_TEZ_JOB_NAME);
final TezClient session = TezClient.newBuilder(String.format(tezJobNameFormat, sessionId), tezConfig)
.setIsSession(true).setLocalResources(commonLocalResources)
- .setCredentials(llapCredentials).setServicePluginDescriptor(servicePluginsDescriptor)
+ .setCredentials(llapCredentials).setServicePluginDescriptor(spd)
.build();
- LOG.info("Opening new Tez Session (id: " + sessionId
- + ", scratch dir: " + tezScratchDir + ")");
+ LOG.info("Opening new Tez Session (id: {} scratch dir: {})", sessionId, tezScratchDir);
TezJobMonitor.initShutdownHook();
if (!isAsync) {
startSessionAndContainers(session, conf, commonLocalResources, tezConfig, false);
- this.session = session;
+ setTezClient(session);
} else {
FutureTask sessionFuture = new FutureTask<>(new Callable() {
@Override
@@ -425,7 +370,7 @@ public TezClient call() throws Exception {
} catch (Throwable t) {
// The caller has already stopped the session.
LOG.error("Failed to start Tez session", t);
- throw (t instanceof Exception) ? (Exception)t : new Exception(t);
+ throw (t instanceof Exception) ? (Exception) t : new Exception(t);
}
// Check interrupt at the last moment in case we get cancelled quickly.
// This is not bulletproof but should allow us to close session in most cases.
@@ -444,21 +389,70 @@ public TezClient call() throws Exception {
}
}
- /**
- * Check if Kerberos authentication is enabled.
- * This is used by:
- * - HS2 (upon Tez session creation)
- * In secure scenarios HS2 might either be logged on (by Kerberos) by itself or by a launcher
- * script it was forked from. In the latter case UGI.getLoginUser().isFromKeytab() returns false,
- * hence UGI.getLoginUser().hasKerberosCredentials() is a tightest setting we can check against.
- */
- private boolean isKerberosEnabled(Configuration conf) {
- try {
- return UserGroupInformation.getLoginUser().hasKerberosCredentials() &&
- HiveConf.getBoolVar(conf, ConfVars.LLAP_USE_KERBEROS);
- } catch (IOException e) {
- return false;
+ protected static ServicePluginsDescriptor createServicePluginDescriptor(boolean llapMode, TezConfiguration tezConfig)
+ throws IOException {
+ if (!llapMode) {
+ return ServicePluginsDescriptor.create(true);
+ }
+ // TODO Change this to not serialize the entire Configuration - minor.
+ UserPayload servicePluginPayload = TezUtils.createUserPayloadFromConf(tezConfig);
+ // we need plugins to handle llap and uber mode
+ return ServicePluginsDescriptor.create(true,
+ new TaskSchedulerDescriptor[]{TaskSchedulerDescriptor.create(LLAP_SERVICE, LLAP_SCHEDULER).setUserPayload(
+ servicePluginPayload)},
+ new ContainerLauncherDescriptor[]{ContainerLauncherDescriptor.create(LLAP_SERVICE, LLAP_LAUNCHER)},
+ new TaskCommunicatorDescriptor[]{TaskCommunicatorDescriptor.create(LLAP_SERVICE,
+ LLAP_TASK_COMMUNICATOR).setUserPayload(servicePluginPayload)});
+ }
+
+ protected final Credentials createLlapCredentials(boolean llapMode, TezConfiguration tezConfig) throws IOException {
+ if (!llapMode || !UserGroupInformation.isSecurityEnabled()){
+ return null;
+ }
+ Credentials llapCredentials = new Credentials();
+ llapCredentials.addToken(LlapTokenIdentifier.KIND_NAME, getLlapToken(user, tezConfig));
+ return llapCredentials;
+ }
+
+ protected final TezConfiguration createDefaultTezConfig() {
+ TezConfiguration tezConfig = new TezConfiguration(true);
+ tezConfig.addResource(conf);
+ setupTezParamsBasedOnMR(tezConfig);
+ conf.stripHiddenConfigurations(tezConfig);
+ return tezConfig;
+ }
+
+ protected final TezConfiguration createTezConfig() throws IOException {
+ TezConfiguration tezConfig = createDefaultTezConfig();
+ setupSessionAcls(tezConfig, conf);
+ return tezConfig;
+ }
+
+ protected boolean isLlapMode() {
+ return "llap".equalsIgnoreCase(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE));
+ }
+
+ protected final void addLlapJars(Map commonLocalResources)
+ throws IOException {
+ addJarLRByClass(LlapTaskSchedulerService.class, commonLocalResources);
+ addJarLRByClass(LlapProtocolClientImpl.class, commonLocalResources);
+ addJarLRByClass(LlapProtocolClientProxy.class, commonLocalResources);
+ addJarLRByClass(RegistryOperations.class, commonLocalResources);
+ }
+
+ protected final void initQueueAndUser() throws IOException {
+ // TODO Why is the queue name set again. It has already been setup via setQueueName. Do only one of the two.
+ String confQueueName = conf.get(TezConfiguration.TEZ_QUEUE_NAME);
+ if (queueName != null && !queueName.equals(confQueueName)) {
+ LOG.warn("Resetting a queue name that was already set: was {} now {}", queueName, confQueueName);
}
+ this.queueName = confQueueName;
+ this.doAsEnabled = conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS);
+
+ // TODO This - at least for the session pool - will always be the hive user. How does doAs above this affect things ?
+ UserGroupInformation ugi = Utils.getUGI();
+ user = ugi.getShortUserName();
+ LOG.info("User of session id {} is {}", sessionId, user);
}
private static Token getLlapToken(
@@ -563,7 +557,7 @@ public void endOpen() throws InterruptedException, CancellationException {
if (session == null) {
throw new RuntimeException("Initialization was interrupted");
}
- this.session = session;
+ setTezClient(session);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
@@ -636,7 +630,7 @@ private void setupTezParamsBasedOnMR(TezConfiguration conf) {
}
}
}
- private void setupSessionAcls(Configuration tezConf, HiveConf hiveConf) throws
+ protected void setupSessionAcls(Configuration tezConf, HiveConf hiveConf) throws
IOException {
// TODO: De-link from SessionState. A TezSession can be linked to different Hive Sessions via the pool.
@@ -664,10 +658,10 @@ private void setupSessionAcls(Configuration tezConf, HiveConf hiveConf) throws
}
/** This is called in openInternal and in TezTask.updateSession to localize conf resources. */
- public void ensureLocalResources(Configuration conf, String[] newFilesNotFromConf)
- throws IOException, URISyntaxException, TezException {
+ @Override
+ public void ensureLocalResources(Configuration conf, String[] newFilesNotFromConf) throws IOException {
if (resources == null) {
- throw new AssertionError("Ensure called on an unitialized (or closed) session " + sessionId);
+ throw new AssertionError("Ensure called on an uninitialized (or closed) session " + sessionId);
}
String dir = resources.dagResourcesDir.toString();
resources.localizedResources.clear();
@@ -729,20 +723,21 @@ public void ensureLocalResources(Configuration conf, String[] newFilesNotFromCon
* whether or not to remove the scratch dir at the same time.
* @throws Exception
*/
- void close(boolean keepDagFilesDir) throws Exception {
+ @Override
+ public void close(boolean keepDagFilesDir) throws Exception {
console = null;
appJarLr = null;
try {
if (session != null) {
- LOG.info("Closing Tez Session");
+ LOG.info("Closing Tez Session (sync)");
closeClient(session);
session = null;
} else if (sessionFuture != null) {
sessionFuture.cancel(true);
TezClient asyncSession = null;
try {
- asyncSession = sessionFuture.get(); // In case it was done and noone looked at it.
+ asyncSession = sessionFuture.get(); // In case it was done and none looked at it.
} catch (ExecutionException | CancellationException e) {
// ignore
} catch (InterruptedException e) {
@@ -751,7 +746,7 @@ void close(boolean keepDagFilesDir) throws Exception {
}
sessionFuture = null;
if (asyncSession != null) {
- LOG.info("Closing Tez Session");
+ LOG.info("Closing Tez Session (async)");
closeClient(asyncSession);
}
}
@@ -793,11 +788,17 @@ protected final void cleanupDagResources() throws IOException {
}
}
+ @Override
public String getSessionId() {
return sessionId;
}
- public TezClient getSession() {
+ protected final void setTezClient(TezClient session) {
+ this.session = session;
+ }
+
+ @Override
+ public TezClient getTezClient() {
if (session == null && sessionFuture != null) {
if (!sessionFuture.isDone()) {
console.printInfo("Waiting for Tez session and AM to be ready...");
@@ -819,6 +820,7 @@ public TezClient getSession() {
return session;
}
+ @Override
public LocalResource getAppJarLr() {
return appJarLr;
}
@@ -918,47 +920,58 @@ private String getSha(final Path localFile) throws IOException, IllegalArgumentE
}
return sha256;
}
+
+ @Override
public void setQueueName(String queueName) {
this.queueName = queueName;
}
+ @Override
public String getQueueName() {
return queueName;
}
+ @Override
public void setDefault() {
defaultQueue = true;
}
+ @Override
public boolean isDefault() {
return defaultQueue;
}
+ @Override
public HiveConf getConf() {
return conf;
}
+ @Override
public List getLocalizedResources() {
- return new ArrayList<>(resources.localizedResources);
+ return resources == null ? new ArrayList<>() : new ArrayList<>(resources.localizedResources);
}
+ @Override
public String getUser() {
return user;
}
+ @Override
public boolean getDoAsEnabled() {
return doAsEnabled;
}
/** Mark session as free for use from TezTask, for safety/debugging purposes. */
- public void markFree() {
+ @Override
+ public void unsetOwnerThread() {
if (ownerThread.getAndSet(null) == null) {
throw new AssertionError("Not in use");
}
}
/** Mark session as being in use from TezTask, for safety/debugging purposes. */
- public void markInUse() {
+ @Override
+ public void setOwnerThread() {
String newName = Thread.currentThread().getName();
do {
String oldName = ownerThread.get();
@@ -969,51 +982,62 @@ public void markInUse() {
} while (!ownerThread.compareAndSet(null, newName));
}
- void setLegacyLlapMode(boolean value) {
+ @Override
+ public void setLegacyLlapMode(boolean value) {
this.isLegacyLlapMode = value;
}
- boolean getLegacyLlapMode() {
+ @Override
+ public boolean getLegacyLlapMode() {
return this.isLegacyLlapMode;
}
+ @Override
public void returnToSessionManager() throws Exception {
// By default, TezSessionPoolManager handles this for both pool and non-pool session.
TezSessionPoolManager.getInstance().returnSession(this);
}
- public TezSessionState reopen() throws Exception {
+ @Override
+ public TezSession reopen() throws Exception {
// By default, TezSessionPoolManager handles this for both pool and non-pool session.
return TezSessionPoolManager.getInstance().reopen(this);
}
+ @Override
public void destroy() throws Exception {
// By default, TezSessionPoolManager handles this for both pool and non-pool session.
TezSessionPoolManager.getInstance().destroy(this);
}
+ @Override
public WmContext getWmContext() {
return wmContext;
}
+ @Override
public void setWmContext(final WmContext wmContext) {
this.wmContext = wmContext;
}
- public void setKillQuery(final KillQuery killQuery) {
- this.killQuery = killQuery;
- }
-
+ @Override
public KillQuery getKillQuery() {
return killQuery;
}
+ @Override
+ public void setKillQuery(final KillQuery killQuery) {
+ this.killQuery = killQuery;
+ }
+
+ @Override
public HiveResources extractHiveResources() {
HiveResources result = resources;
resources = null;
return result;
}
+ @Override
public Path replaceHiveResources(HiveResources resources, boolean isAsync) {
Path dir = null;
if (this.resources != null) {
@@ -1032,10 +1056,10 @@ public Path replaceHiveResources(HiveResources resources, boolean isAsync) {
return dir;
}
+ @Override
public String getAppMasterUri() {
- return Optional.of(getSession()).map(
- tezClient -> tezClient.getAmHost() + ":" + tezClient.getAmPort())
- .get();
+ return Optional.ofNullable(session).map(
+ tezClient -> tezClient.getAmHost() + ":" + tezClient.getAmPort()).orElse(null);
}
/**
@@ -1046,6 +1070,7 @@ public String getAppMasterUri() {
*
* @return A map containing metrics for TezSessionPoolManagerMetrics.
*/
+ @Override
public Map getMetrics() {
Map metrics = new HashMap<>();
if (dagStatus == null) {
@@ -1063,7 +1088,22 @@ public Map getMetrics() {
* TezSessionState receives periodic updates from the current DAG's status.
* @param dagStatus status of the current DAG
*/
+ @Override
public void updateDagStatus(DAGStatus dagStatus) {
this.dagStatus = dagStatus;
}
+
+ @Override
+ public boolean killQuery(String reason) throws HiveException {
+ if (killQuery == null || wmContext == null) {
+ return false;
+ }
+ String queryId = wmContext.getQueryId();
+ if (queryId == null) {
+ return false;
+ }
+ LOG.info("Killing the query {}: {}", queryId, reason);
+ killQuery.killQuery(queryId, reason, conf, true);
+ return true;
+ }
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index 75d11a070bef..6bd801dd64c3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -157,7 +157,7 @@ public int execute() {
int rc = 1;
boolean cleanContext = false;
Context ctx = null;
- Ref sessionRef = Ref.from(null);
+ Ref sessionRef = Ref.from(null);
final String queryId = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_QUERY_ID);
@@ -177,7 +177,7 @@ public int execute() {
SessionState ss = SessionState.get();
// Note: given that we return pool sessions to the pool in the finally block below, and that
// we need to set the global to null to do that, this "reuse" may be pointless.
- TezSessionState session = sessionRef.value = ss.getTezSession();
+ TezSession session = sessionRef.value = ss.getTezSession();
if (session != null && !session.isOpen()) {
LOG.warn("The session: " + session + " has not been opened");
}
@@ -269,7 +269,7 @@ public int execute() {
String dagId = this.dagClient.getDagIdentifierString();
String appId = this.dagClient.getSessionIdentifierString();
LOG.info("HS2 Host: [{}], Query ID: [{}], Dag ID: [{}], DAG App ID: [{}], DAG App address: [{}]",
- ServerUtils.hostname(), queryId, dagId, appId, session.getSession().getAmHost());
+ ServerUtils.hostname(), queryId, dagId, appId, session.getTezClient().getAmHost());
LogUtils.putToMDC(LogUtils.DAGID_KEY, dagId);
this.jobID = dagId;
runtimeContext.setDagId(dagId);
@@ -469,8 +469,8 @@ private void logResources(List additionalLr) {
*/
@VisibleForTesting
void ensureSessionHasResources(
- TezSessionState session, String[] nonConfResources) throws Exception {
- TezClient client = session.getSession();
+ TezSession session, String[] nonConfResources) throws Exception {
+ TezClient client = session.getTezClient();
// TODO null can also mean that this operation was interrupted. Should we really try to re-create the session in that case ?
if (client == null) {
// Note: the only sane case where this can happen is the non-pool one. We should get rid
@@ -642,19 +642,19 @@ private static void setAccessControlsForCurrentUser(DAG dag, String queryId,
dag.setAccessControls(ac);
}
- private TezSessionState getNewTezSessionOnError(
- TezSessionState oldSession) throws Exception {
+ private TezSession getNewTezSessionOnError(
+ TezSession oldSession) throws Exception {
// Note: we don't pass the config to reopen. If the session was already open, it would
// have kept running with its current config - preserve that behavior.
- TezSessionState newSession = oldSession.reopen();
+ TezSession newSession = oldSession.reopen();
console.printInfo("Session re-established.");
return newSession;
}
- DAGClient submit(DAG dag, Ref sessionStateRef) throws Exception {
+ DAGClient submit(DAG dag, Ref sessionStateRef) throws Exception {
perfLogger.perfLogBegin(CLASS_NAME, PerfLogger.TEZ_SUBMIT_DAG);
DAGClient dagClient = null;
- TezSessionState sessionState = sessionStateRef.value;
+ TezSession sessionState = sessionStateRef.value;
try {
try {
// ready to start execution on the cluster
@@ -690,13 +690,13 @@ DAGClient submit(DAG dag, Ref sessionStateRef) throws Exception
return new SyncDagClient(dagClient);
}
- private DAGClient submitInternal(DAG dag, TezSessionState sessionState) throws TezException, IOException {
+ private DAGClient submitInternal(DAG dag, TezSession sessionState) throws TezException, IOException {
runtimeContext.init(sessionState);
- return sessionState.getSession().submitDAG(dag);
+ return sessionState.getTezClient().submitDAG(dag);
}
- private void sessionDestroyOrReturnToPool(Ref sessionStateRef,
- TezSessionState sessionState) throws Exception{
+ private void sessionDestroyOrReturnToPool(Ref sessionStateRef,
+ TezSession sessionState) throws Exception{
sessionStateRef.value = null;
if (sessionState.isDefault() && sessionState instanceof TezSessionPoolSession) {
sessionState.returnToSessionManager();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java
index 867e31c7314c..8c759170c54e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java
@@ -43,10 +43,10 @@ public class TriggerValidatorRunnable implements Runnable {
@Override
public void run() {
try {
- Map violatedSessions = new HashMap<>();
- final List sessions = sessionTriggerProvider.getSessions();
+ Map violatedSessions = new HashMap<>();
+ final List sessions = sessionTriggerProvider.getSessions();
final List triggers = sessionTriggerProvider.getTriggers();
- for (TezSessionState sessionState : sessions) {
+ for (TezSession sessionState : sessions) {
WmContext wmContext = sessionState.getWmContext();
if (wmContext != null && !wmContext.isQueryCompleted()
&& !wmContext.getSubscribedCounters().isEmpty()) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
index cd6bb37a4e27..bf4fce11c874 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
@@ -22,7 +22,6 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.registry.impl.TezAmInstance;
import org.apache.hive.common.util.Ref;
@@ -56,7 +55,7 @@ public class WmTezSession extends TezSessionPoolSession implements AmPluginNode
private SettableFuture returnFuture = null;
private boolean isDelayedMove;
- private final WorkloadManager wmParent;
+ private final WorkloadManager wmManager;
/** The actual state of the guaranteed task, and the update state, for the session. */
// Note: hypothetically, a generic WM-aware-session should not know about guaranteed tasks.
@@ -68,18 +67,17 @@ private final static class ActualWmState {
}
private final ActualWmState actualState = new ActualWmState();
- public WmTezSession(String sessionId, WorkloadManager parent,
- SessionExpirationTracker expiration, HiveConf conf) {
- super(sessionId, parent, expiration, conf);
- wmParent = parent;
+ public WmTezSession(
+ WorkloadManager manager, SessionExpirationTracker expiration, TezSessionState baseSession) {
+ super(manager, expiration, baseSession);
+ wmManager = manager;
isDelayedMove = false;
}
@VisibleForTesting
- WmTezSession(String sessionId, Manager testParent,
- SessionExpirationTracker expiration, HiveConf conf) {
- super(sessionId, testParent, expiration, conf);
- wmParent = null;
+ WmTezSession(Manager testParent, SessionExpirationTracker expiration, TezSessionState baseSession) {
+ super(testParent, expiration, baseSession);
+ wmManager = null;
isDelayedMove = false;
}
@@ -107,7 +105,7 @@ public ListenableFuture waitForAmRegistryAsync(
@Override
- void updateFromRegistry(TezAmInstance si, int ephSeqVersion) {
+ public void updateFromRegistry(TezAmInstance si, int ephSeqVersion) {
updateAmEndpointInfo(si, ephSeqVersion);
if (si != null) {
handleGuaranteedTasksChange(si.getGuaranteedCount());
@@ -151,7 +149,7 @@ private void handleGuaranteedTasksChange(int guaranteedCount) {
doNotify = actualState.target != guaranteedCount;
}
if (!doNotify) return;
- wmParent.notifyOfInconsistentAllocation(this);
+ wmManager.notifyOfInconsistentAllocation(this);
}
@Override
@@ -238,7 +236,7 @@ boolean setFailedToSendGuaranteed() {
}
public void handleUpdateError(int endpointVersion) {
- wmParent.addUpdateError(this, endpointVersion);
+ wmManager.addUpdateError(this, endpointVersion);
}
@Override
@@ -305,5 +303,4 @@ public String toString() {
return super.toString() + ", WM state poolName=" + poolName + ", clusterFraction="
+ clusterFraction + ", queryId=" + queryId + ", killReason=" + killReason;
}
-
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
index c8c40e386172..148e9159f96e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
@@ -69,11 +69,10 @@
import org.apache.hadoop.hive.metastore.api.WMTrigger;
import org.apache.hadoop.hive.ql.DriverUtils;
import org.apache.hadoop.hive.ql.exec.tez.AmPluginNode.AmPluginInfo;
-import org.apache.hadoop.hive.ql.exec.tez.TezSessionState.HiveResources;
+import org.apache.hadoop.hive.ql.exec.tez.TezSession.HiveResources;
import org.apache.hadoop.hive.ql.exec.tez.UserPoolMapping.MappingInput;
import org.apache.hadoop.hive.ql.exec.tez.WmEvent.EventType;
import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.session.KillQuery;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.wm.ExecutionTrigger;
import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider;
@@ -100,7 +99,7 @@
* none of that state can be accessed directly - most changes that touch pool state, or interact
* with background operations like init, need to go thru eventstate; see e.g. returnAfterUse.
*/
-public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValidator
+public class WorkloadManager extends AbstractTriggerValidator
implements TezSessionPoolSession.Manager, SessionExpirationTracker.RestartImpl,
WorkloadManagerMxBean {
private static final Logger LOG = LoggerFactory.getLogger(WorkloadManager.class);
@@ -118,6 +117,8 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
private final int amRegistryTimeoutMs;
private final boolean allowAnyPool;
private final MetricsSystem metricsSystem;
+ private final boolean useExternalSessions;
+
// Note: it's not clear that we need to track this - unlike PoolManager we don't have non-pool
// sessions, so the pool itself could internally track the ses sions it gave out, since
// calling close on an unopened session is probably harmless.
@@ -239,6 +240,8 @@ public static WorkloadManager create(String yarnQueue, HiveConf conf, WMFullReso
metricsSystem = null;
}
+ useExternalSessions = HiveConf.getBoolVar(conf, ConfVars.HIVE_SERVER2_TEZ_USE_EXTERNAL_SESSIONS);
+
wmThread = new Thread(() -> runWmThread(), "Workload management master");
wmThread.setDaemon(true);
wmThread.start();
@@ -305,9 +308,9 @@ private void initTriggers() {
public void stop() throws Exception {
List sessionsToClose = null;
synchronized (openSessions) {
- sessionsToClose = new ArrayList(openSessions.keySet());
+ sessionsToClose = new ArrayList<>(openSessions.keySet());
}
- for (TezSessionState sessionState : sessionsToClose) {
+ for (TezSessionPoolSession sessionState : sessionsToClose) {
sessionState.close(false);
}
if (expirationTracker != null) {
@@ -339,7 +342,7 @@ private void updateSessionTriggerProvidersOnMasterThread() {
String poolName = entry.getKey();
PoolState poolState = entry.getValue();
final List triggers = Collections.unmodifiableList(poolState.getTriggers());
- final List sessionStates = Collections.unmodifiableList(poolState.getSessions());
+ final List sessionStates = Collections.unmodifiableList(poolState.getSessions());
SessionTriggerProvider sessionTriggerProvider = perPoolProviders.get(poolName);
if (sessionTriggerProvider != null) {
perPoolProviders.get(poolName).setTriggers(triggers);
@@ -479,26 +482,24 @@ private void scheduleWork(WmThreadSyncWork context) {
// because we know which exact query we intend to kill. This is valid because we
// are not expecting query ID to change - we never reuse the session for which a
// query is being killed until both the kill, and the user, return it.
- String queryId = toKill.getQueryId();
- KillQuery kq = toKill.getKillQuery();
try {
- if (kq != null && queryId != null) {
- WmEvent wmEvent = new WmEvent(WmEvent.EventType.KILL);
- LOG.info("Invoking KillQuery for " + queryId + ": " + reason);
- try {
- UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
- DriverUtils.setUpAndStartSessionState(conf, ugi.getShortUserName());
- kq.killQuery(queryId, reason, toKill.getConf());
- addKillQueryResult(toKill, true);
- killCtx.killSessionFuture.set(true);
- wmEvent.endEvent(toKill);
- LOG.debug("Killed " + queryId);
- return;
- } catch (HiveException|IOException ex) {
- LOG.error("Failed to kill " + queryId + "; will try to restart AM instead" , ex);
- }
+ boolean wasKilled = false;
+ String queryId = toKill.getQueryId();
+ WmEvent wmEvent = new WmEvent(WmEvent.EventType.KILL);
+ try {
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+ DriverUtils.setUpAndStartSessionState(conf, ugi.getShortUserName());
+ wasKilled = toKill.killQuery(reason);
+ addKillQueryResult(toKill, true);
+ killCtx.killSessionFuture.set(true);
+ wmEvent.endEvent(toKill);
+ } catch (HiveException | IOException ex) {
+ LOG.error("Failed to kill " + queryId + "; will try to restart AM instead" , ex);
+ }
+ if (wasKilled) {
+ LOG.debug("Killed " + queryId);
} else {
- LOG.info("Will queue restart for {}; queryId {}, killQuery {}", toKill, queryId, kq);
+ LOG.info("Will queue restart for {}; queryId {}", toKill, queryId);
}
} finally {
toKill.setQueryId(null);
@@ -630,7 +631,8 @@ private void processCurrentEvents(EventState e, WmThreadSyncWork syncWork) throw
if (!wasReturned) {
syncWork.toDestroyNoRestart.add(sessionToReturn);
} else {
- if (sessionToReturn.getWmContext() != null && sessionToReturn.getWmContext().isQueryCompleted()) {
+ WmContext ctx = sessionToReturn.getWmContext();
+ if (ctx != null && ctx.isQueryCompleted()) {
sessionToReturn.resolveReturnFuture();
}
wmEvent.endEvent(sessionToReturn);
@@ -724,7 +726,8 @@ private void processCurrentEvents(EventState e, WmThreadSyncWork syncWork) throw
if (!tezAmPool.returnSessionAsync(ctx.session)) {
syncWork.toDestroyNoRestart.add(ctx.session);
} else {
- if (ctx.session.getWmContext() != null && ctx.session.getWmContext().isQueryCompleted()) {
+ WmContext wmCtx = ctx.session.getWmContext();
+ if (wmCtx != null && wmCtx.isQueryCompleted()) {
ctx.session.resolveReturnFuture();
}
wmEvent.endEvent(ctx.session);
@@ -1391,7 +1394,8 @@ private void returnSessionOnFailedReuse(
if (!tezAmPool.returnSessionAsync(session)) {
syncWork.toDestroyNoRestart.add(session);
} else {
- if (session.getWmContext() != null && session.getWmContext().isQueryCompleted()) {
+ WmContext wmCtx = session.getWmContext();
+ if (wmCtx != null && wmCtx.isQueryCompleted()) {
session.resolveReturnFuture();
}
wmEvent.endEvent(session);
@@ -1557,11 +1561,11 @@ public String toString() {
@VisibleForTesting
public WmTezSession getSession(
- TezSessionState session, MappingInput input, HiveConf conf) throws Exception {
+ TezSession session, MappingInput input, HiveConf conf) throws Exception {
return getSession(session, input, conf, null);
}
- public WmTezSession getSession(TezSessionState session, MappingInput input, HiveConf conf,
+ public WmTezSession getSession(TezSession session, MappingInput input, HiveConf conf,
final WmContext wmContext) throws Exception {
WmEvent wmEvent = new WmEvent(WmEvent.EventType.GET);
// Note: not actually used for pool sessions; verify some things like doAs are not set.
@@ -1593,7 +1597,7 @@ public WmTezSession getSession(TezSessionState session, MappingInput input, Hive
}
@Override
- public void destroy(TezSessionState session) throws Exception {
+ public void destroy(TezSession session) throws Exception {
WmTezSession wmTezSession = ensureOwnedSession(session);
resetGlobalTezSession(wmTezSession);
currentLock.lock();
@@ -1727,7 +1731,7 @@ public void notifyInitializationCompleted(SessionInitContext initCtx) {
@Override
- public TezSessionState reopen(TezSessionState session) throws Exception {
+ public TezSession reopen(TezSession session) throws Exception {
WmTezSession wmTezSession = ensureOwnedSession(session);
HiveConf sessionConf = wmTezSession.getConf();
if (sessionConf == null) {
@@ -1765,7 +1769,7 @@ private void notifyWmThreadUnderLock() {
hasChangesCondition.signalAll();
}
- private WmTezSession checkSessionForReuse(TezSessionState session) throws Exception {
+ private WmTezSession checkSessionForReuse(TezSession session) throws Exception {
if (session == null) return null;
WmTezSession result = null;
if (session instanceof WmTezSession) {
@@ -1814,10 +1818,11 @@ private WmTezSession createSession(HiveConf conf) {
protected WmTezSession createSessionObject(String sessionId, HiveConf conf) {
conf = (conf == null) ? new HiveConf(this.conf) : conf;
conf.set(LlapTaskSchedulerService.LLAP_PLUGIN_ENDPOINT_ENABLED, "true");
- return new WmTezSession(sessionId, this, expirationTracker, conf);
+ TezSessionState base = TezSessionState.create(sessionId, conf, useExternalSessions);
+ return new WmTezSession(this, expirationTracker, base);
}
- private WmTezSession ensureOwnedSession(TezSessionState oldSession) {
+ private WmTezSession ensureOwnedSession(TezSession oldSession) {
if (!(oldSession instanceof WmTezSession) || !((WmTezSession)oldSession).isOwnedBy(this)) {
throw new AssertionError("Not a WM session " + oldSession);
}
@@ -1839,7 +1844,7 @@ public void unregisterOpenSession(TezSessionPoolSession session) {
synchronized (openSessions) {
openSessions.remove(session);
}
- tezAmPool.notifyClosed(session);
+ tezAmPool.notifyClosed(ensureOwnedSession(session));
}
@VisibleForTesting
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerFederation.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerFederation.java
index adde440bc79b..2a9aecb6297d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerFederation.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerFederation.java
@@ -29,7 +29,7 @@
public class WorkloadManagerFederation {
private static final Logger LOG = LoggerFactory.getLogger(WorkloadManagerFederation.class);
- public static TezSessionState getSession(TezSessionState session, HiveConf conf,
+ public static TezSession getSession(TezSession session, HiveConf conf,
MappingInput input, boolean isUnmanagedLlapMode, WmContext wmContext) throws Exception {
Set desiredCounters = new HashSet<>();
// 1. If WM is not present just go to unmanaged.
@@ -60,8 +60,8 @@ public static TezSessionState getSession(TezSessionState session, HiveConf conf,
}
}
- private static TezSessionState getUnmanagedSession(
- TezSessionState session, HiveConf conf, Set desiredCounters, boolean isWorkLlapNode,
+ private static TezSession getUnmanagedSession(
+ TezSession session, HiveConf conf, Set desiredCounters, boolean isWorkLlapNode,
final WmContext wmContext) throws Exception {
TezSessionPoolManager pm = TezSessionPoolManager.getInstance();
session = pm.getSession(session, conf, false, isWorkLlapNode);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java
new file mode 100644
index 000000000000..550c77e573ab
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.CuratorFrameworkState;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+// TODO: tez should provide this registry
+public class ZookeeperExternalSessionsRegistryClient implements ExternalSessionsRegistry {
+ private static final Logger LOG = LoggerFactory.getLogger(ZookeeperExternalSessionsRegistryClient.class);
+
+ private final HiveConf initConf;
+ private final Set available = new HashSet<>();
+ private final Set taken = new HashSet<>();
+ private final Object lock = new Object();
+ private final int maxAttempts;
+
+ private CuratorCache cache;
+ private volatile boolean isInitialized;
+
+
+ public ZookeeperExternalSessionsRegistryClient(final HiveConf initConf) {
+ this.initConf = initConf;
+ this.maxAttempts = HiveConf.getIntVar(initConf, ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_WAIT_MAX_ATTEMPTS);
+ }
+
+ private static String getApplicationId(final ChildData childData) {
+ return childData.getPath().substring(childData.getPath().lastIndexOf("/") + 1);
+ }
+
+ private void init() {
+ String zkServer = HiveConf.getVar(initConf, ConfVars.HIVE_ZOOKEEPER_QUORUM);
+ String zkNamespace = HiveConf.getVar(initConf, ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE);
+ String effectivePath = normalizeZkPath(zkNamespace);
+ CuratorFramework client = CuratorFrameworkFactory.newClient(zkServer, new ExponentialBackoffRetry(1000, 3));
+ synchronized (lock) {
+ client.start();
+ this.cache = CuratorCache.build(client, effectivePath);
+ CuratorCacheListener listener = CuratorCacheListener.builder()
+ .forPathChildrenCache(effectivePath, client, new ExternalSessionsPathListener())
+ .build();
+ cache.listenable().addListener(listener);
+ cache.start();
+ cache.stream()
+ .filter(childData -> childData.getPath() != null
+ && childData.getPath().startsWith(effectivePath + "/"))
+ .forEach(childData -> available.add(getApplicationId(childData)));
+ LOG.info("Initial external sessions: {}", available);
+ isInitialized = true;
+ }
+ }
+
+ @VisibleForTesting
+ static String normalizeZkPath(String zkNamespace) {
+ return (zkNamespace.startsWith("/") ? zkNamespace : "/" + zkNamespace);
+ }
+
+ @Override
+ public String getSession() throws Exception {
+ synchronized (lock) {
+ if (!isInitialized) {
+ init();
+ }
+ long endTimeNs = System.nanoTime() + (1000000000L * maxAttempts);
+ while (available.isEmpty() && ((endTimeNs - System.nanoTime()) > 0)) {
+ lock.wait(1000L);
+ }
+ Iterator iter = available.iterator();
+ if (!iter.hasNext()) {
+ throw new IOException("Cannot get a session after " + maxAttempts + " attempts");
+ }
+ String appId = iter.next();
+ iter.remove();
+ taken.add(appId);
+ return appId;
+ }
+ }
+
+ @Override
+ public void returnSession(String appId) {
+ synchronized (lock) {
+ if (!isInitialized) {
+ throw new IllegalStateException("Not initialized");
+ }
+ if (!taken.remove(appId)) {
+ return; // Session has been removed from ZK.
+ }
+ available.add(appId);
+ lock.notifyAll();
+ }
+ }
+
+ @Override
+ public void close() {
+ if (cache != null) {
+ cache.close();
+ }
+ }
+
+ private final class ExternalSessionsPathListener implements PathChildrenCacheListener {
+
+ @Override
+ public void childEvent(final CuratorFramework client, final PathChildrenCacheEvent event) throws Exception {
+ Preconditions.checkArgument(client != null && client.getState() == CuratorFrameworkState.STARTED,
+ "client is not started");
+ ChildData childData = event.getData();
+ if (childData == null) {
+ return;
+ }
+ String applicationId = getApplicationId(childData);
+ LOG.info("{} for external session {}", event.getType(), applicationId);
+
+ synchronized (lock) {
+ switch (event.getType()) {
+ case CHILD_UPDATED, CHILD_ADDED:
+ if (available.contains(applicationId) || taken.contains(applicationId)) {
+ return; // We do not expect updates to existing sessions; ignore them for now.
+ }
+ available.add(applicationId);
+ break;
+ case CHILD_REMOVED:
+ if (taken.remove(applicationId)) {
+ LOG.warn("The session in use has disappeared from the registry ({})", applicationId);
+ } else if (!available.remove(applicationId)) {
+ LOG.warn("An unknown session has been removed ({})", applicationId);
+ }
+ break;
+ default:
+ // Ignore all the other events; logged above.
+ }
+ }
+ }
+ }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java
index 81ef890dcb0b..92844f4d5716 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java
@@ -39,8 +39,8 @@
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.tez.TezSession;
import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager;
-import org.apache.hadoop.hive.ql.exec.tez.TezSessionState;
import org.apache.hadoop.hive.ql.exec.tez.Utils;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.plan.BaseWork;
@@ -78,7 +78,7 @@ public class TezJobMonitor {
private static final int MAX_RETRY_INTERVAL = 2500;
private static final int MAX_RETRY_FAILURES = (MAX_RETRY_INTERVAL / MAX_CHECK_INTERVAL) + 1;
- private final TezSessionState session;
+ private final TezSession session;
private final PerfLogger perfLogger;
private static final List shutdownList;
private final List topSortedWorks;
@@ -120,7 +120,7 @@ public static void initShutdownHook() {
// compile time tez counters
private final TezCounters counters;
- public TezJobMonitor(TezSessionState session, List topSortedWorks, final DAGClient dagClient, HiveConf conf,
+ public TezJobMonitor(TezSession session, List topSortedWorks, final DAGClient dagClient, HiveConf conf,
DAG dag, Context ctx, final TezCounters counters, PerfLogger perfLogger) {
this.session = session;
this.topSortedWorks = topSortedWorks;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/KillQuery.java b/ql/src/java/org/apache/hadoop/hive/ql/session/KillQuery.java
index 01dc7e2cd794..bbbd457e9ed7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/KillQuery.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/KillQuery.java
@@ -22,5 +22,6 @@
import org.apache.hadoop.hive.ql.metadata.HiveException;
public interface KillQuery {
- void killQuery(String queryId, String errMsg, HiveConf conf) throws HiveException;
+ void killQuery(String queryId, String errMsg, HiveConf conf, boolean isYarn)
+ throws HiveException;
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/NullKillQuery.java b/ql/src/java/org/apache/hadoop/hive/ql/session/NullKillQuery.java
index eac2936719a7..78598e6fdadc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/NullKillQuery.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/NullKillQuery.java
@@ -23,7 +23,8 @@
public class NullKillQuery implements KillQuery {
@Override
- public void killQuery(String queryId, String errMsg, HiveConf conf) throws HiveException {
+ public void killQuery(String queryId, String errMsg, HiveConf conf, boolean isYarn)
+ throws HiveException {
// Do nothing
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index d921c662ac27..53ee29743449 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -86,7 +86,9 @@
import org.apache.hadoop.hive.ql.exec.FunctionInfo;
import org.apache.hadoop.hive.ql.exec.Registry;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.tez.TezExternalSessionState;
import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager;
+import org.apache.hadoop.hive.ql.exec.tez.TezSession;
import org.apache.hadoop.hive.ql.exec.tez.TezSessionState;
import org.apache.hadoop.hive.ql.history.HiveHistory;
import org.apache.hadoop.hive.ql.history.HiveHistoryImpl;
@@ -255,7 +257,7 @@ public enum AuthorizationMode{V1, V2};
private Map> localMapRedErrors;
- private TezSessionState tezSessionState;
+ private TezSession tezSessionState;
private String currentDatabase;
@@ -761,9 +763,16 @@ private static void start(SessionState startSs, boolean isAsync, LogHelper conso
return;
}
+ boolean useExternalSessions = HiveConf.getBoolVar(startSs.getConf(),
+ ConfVars.HIVE_SERVER2_TEZ_USE_EXTERNAL_SESSIONS);
+
try {
if (startSs.tezSessionState == null) {
- startSs.setTezSession(new TezSessionState(startSs.getSessionId(), startSs.sessionConf));
+ if (useExternalSessions) {
+ startSs.setTezSession(new TezExternalSessionState(startSs.getSessionId(), startSs.sessionConf));
+ } else {
+ startSs.setTezSession(new TezSessionState(startSs.getSessionId(), startSs.sessionConf));
+ }
} else {
// Only TezTask sets this, and then removes when done, so we don't expect to see it.
LOG.warn("Tez session was already present in SessionState before start: "
@@ -2083,23 +2092,23 @@ public static PerfLogger getPerfLogger(boolean resetPerfLogger) {
}
}
- public TezSessionState getTezSession() {
+ public TezSession getTezSession() {
return tezSessionState;
}
/** Called from TezTask to attach a TezSession to use to the threadlocal. Ugly pattern... */
- public void setTezSession(TezSessionState session) {
+ public void setTezSession(TezSession session) {
if (tezSessionState == session) {
return; // The same object.
}
if (tezSessionState != null) {
- tezSessionState.markFree();
+ tezSessionState.unsetOwnerThread();
tezSessionState.setKillQuery(null);
tezSessionState = null;
}
tezSessionState = session;
if (session != null) {
- session.markInUse();
+ session.setOwnerThread();
tezSessionState.setKillQuery(getKillQuery());
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/SessionTriggerProvider.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/SessionTriggerProvider.java
index 16106f481b8a..616304f3b478 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/wm/SessionTriggerProvider.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/SessionTriggerProvider.java
@@ -17,21 +17,21 @@
import java.util.List;
-import org.apache.hadoop.hive.ql.exec.tez.TezSessionState;
+import org.apache.hadoop.hive.ql.exec.tez.TezSession;
/**
* Implementation for providing current open sessions and active trigger.
*/
public class SessionTriggerProvider {
- private List sessions;
+ private List sessions;
private List triggers;
- public SessionTriggerProvider(List openSessions, List triggers) {
+ public SessionTriggerProvider(List openSessions, List triggers) {
this.sessions = openSessions;
this.triggers = triggers;
}
- public List getSessions() {
+ public List getSessions() {
return sessions;
}
@@ -39,7 +39,7 @@ public List getTriggers() {
return triggers;
}
- public void setSessions(final List sessions) {
+ public void setSessions(final List sessions) {
this.sessions = sessions;
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerActionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerActionHandler.java
index 7995a8f639b2..da99f0957315 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerActionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerActionHandler.java
@@ -17,8 +17,6 @@
import java.util.Map;
-import org.apache.hadoop.hive.ql.exec.tez.TezSessionState;
-
/**
* Interface for handling rule violations by queries and for performing actions defined in the rules.
*/
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java
index f108160a80f3..502ee06c2521 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java
@@ -24,6 +24,7 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
+
import java.io.IOException;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -31,7 +32,6 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.tez.dag.api.TezException;
-
/**
* This class is needed for writing junit tests. For testing the multi-session
* use case from hive server 2, we need a session simulation.
@@ -48,8 +48,9 @@ public class SampleTezSessionState extends WmTezSession {
public SampleTezSessionState(
String sessionId, TezSessionPoolSession.Manager parent, HiveConf conf) {
- super(sessionId, parent, (parent instanceof TezSessionPoolManager)
- ? ((TezSessionPoolManager)parent).getExpirationTracker() : null, conf);
+ super(parent, (parent instanceof TezSessionPoolManager)
+ ? ((TezSessionPoolManager)parent).getExpirationTracker() : null,
+ new TezSessionState(sessionId, conf));
this.sessionId = sessionId;
this.hiveConf = conf;
waitForAmRegFuture = createDefaultWaitForAmRegistryFuture();
@@ -89,7 +90,7 @@ public void open(String[] additionalFiles) throws IOException {
}
@Override
- void close(boolean keepTmpDir) throws TezException, IOException {
+ public void close(boolean keepTmpDir) throws TezException, IOException {
open = keepTmpDir;
}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezExternalSessionState.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezExternalSessionState.java
new file mode 100644
index 000000000000..1d4b5b10e5ef
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezExternalSessionState.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.lang.reflect.Field;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConfForTest;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.client.TezClient.TezClientBuilder;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
+import org.junit.Test;
+import org.mockito.MockedStatic;
+
+public class TestTezExternalSessionState {
+
+ private static SessionState createSessionState() {
+ HiveConf hiveConf = new HiveConfForTest(TestTezExternalSessionState.class);
+ hiveConf.set("hive.security.authorization.manager",
+ "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdConfOnlyAuthorizerFactory");
+ return SessionState.start(hiveConf);
+ }
+
+ public static class CountingExternalSessionsRegistry implements ExternalSessionsRegistry {
+ private final AtomicInteger getSessionCalls = new AtomicInteger(0);
+ private final AtomicInteger returnSessionCalls = new AtomicInteger(0);
+
+ public CountingExternalSessionsRegistry(HiveConf conf) {
+ }
+
+ @Override
+ public String getSession() {
+ int callId = getSessionCalls.incrementAndGet();
+ return "application_1_" + callId;
+ }
+
+ @Override
+ public void returnSession(String appId) {
+ returnSessionCalls.incrementAndGet();
+ }
+
+ @Override
+ public void close() {
+ // Testing registry; nothing specific to clean up.
+ }
+
+ int getGetSessionCalls() {
+ return getSessionCalls.get();
+ }
+
+ int getReturnSessionCalls() {
+ return returnSessionCalls.get();
+ }
+ }
+
+ @Test
+ public void testConsecutiveOpenInternalCallsAreIdempotent() throws Exception {
+ SessionState ss = createSessionState();
+ HiveConf conf = ss.getConf();
+ conf.setVar(HiveConf.ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_REGISTRY_CLASS,
+ CountingExternalSessionsRegistry.class.getName());
+
+ TezExternalSessionState session = new TezExternalSessionState("test-session", conf);
+ CountingExternalSessionsRegistry registry =
+ (CountingExternalSessionsRegistry) ExternalSessionsRegistryFactory.getClient(conf);
+ assertFalse("Session should start closed", session.isOpen());
+
+ TezClientBuilder builder = mock(TezClientBuilder.class);
+ TezClient tezClient = mock(TezClient.class);
+ when(builder.setIsSession(anyBoolean())).thenReturn(builder);
+ when(builder.setCredentials(any())).thenReturn(builder);
+ when(builder.setServicePluginDescriptor(any(ServicePluginsDescriptor.class))).thenReturn(builder);
+ when(builder.build()).thenReturn(tezClient);
+ when(tezClient.getClientName()).thenReturn("mock-client");
+ when(tezClient.getClient(any(ApplicationId.class))).thenReturn(null);
+
+ try (MockedStatic tezClientMock = mockStatic(TezClient.class)) {
+ tezClientMock.when(() -> TezClient.newBuilder(anyString(), any(TezConfiguration.class))).thenReturn(builder);
+
+ session.openInternal(null, false, null, null);
+ TezClient firstClient = session.getTezClient();
+ assertNotNull(firstClient);
+ assertTrue("Session should be open after first openInternal", session.isOpen());
+ String firstAppId = getExternalAppId(session);
+
+ session.openInternal(null, false, null, null);
+ TezClient secondClient = session.getTezClient();
+ assertNotNull(secondClient);
+ assertTrue("Session should remain open after second openInternal", session.isOpen());
+ String secondAppId = getExternalAppId(session);
+
+ // Idempotent behavior expectation: second openInternal should be a no-op for externalAppId.
+ assertSame("Consecutive openInternal calls should keep the same external app id",
+ firstAppId, secondAppId);
+ assertSame("Consecutive openInternal calls should keep the same Tez client instance",
+ firstClient, secondClient);
+ }
+
+ assertEquals("Idempotent openInternal should only acquire one external session",
+ 1, registry.getGetSessionCalls());
+ assertEquals("openInternal should not return sessions", 0, registry.getReturnSessionCalls());
+ }
+
+ private static String getExternalAppId(TezExternalSessionState session) throws Exception {
+ Field field = TezExternalSessionState.class.getDeclaredField("externalAppId");
+ field.setAccessible(true);
+ return (String) field.get(session);
+ }
+}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezExternalSessionsRegistryClient.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezExternalSessionsRegistryClient.java
new file mode 100644
index 000000000000..b0ee7042a122
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezExternalSessionsRegistryClient.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.Test;
+
+public class TestTezExternalSessionsRegistryClient {
+ @Test
+ public void testDummyExternalSessionsRegistry() {
+ HiveConf conf = new HiveConf();
+ conf.setVar(HiveConf.ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_REGISTRY_CLASS,
+ DummyExternalSessionsRegistry.class.getName());
+ // TODO: change this to TezConfiguration.TEZ_AM_REGISTRY_NAMESPACE after Tez 1.0.0 is released.
+ conf.set("tez.am.registry.namespace", "dummy");
+ ExternalSessionsRegistry externalSessionsRegistry = ExternalSessionsRegistryFactory.getClient(conf);
+ assertEquals(DummyExternalSessionsRegistry.class, externalSessionsRegistry.getClass());
+ }
+
+ @Test
+ public void testTezExternalSessionsRegistry() {
+ HiveConf conf = new HiveConf();
+ // TODO: change this to TezConfiguration.TEZ_AM_ZOOKEEPER_QUORUM after Tez 1.0.0 is released.
+ conf.set("tez.am.zookeeper.quorum", "test-quorum");
+ // TODO: change this to TezConfiguration.TEZ_AM_REGISTRY_NAMESPACE after Tez 1.0.0 is released.
+ conf.set("tez.am.registry.namespace", "tez");
+ conf.setVar(HiveConf.ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_REGISTRY_CLASS,
+ ZookeeperExternalSessionsRegistryClient.class.getName());
+ ExternalSessionsRegistry externalSessionsRegistry = ExternalSessionsRegistryFactory.getClient(conf);
+ assertEquals(ZookeeperExternalSessionsRegistryClient.class, externalSessionsRegistry.getClass());
+ }
+}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
index ee32d166101f..0729584aa338 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
@@ -22,7 +22,6 @@
import static org.junit.Assert.fail;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.List;
import java.util.Random;
@@ -67,13 +66,13 @@ public void setUp() {
public void testGetNonDefaultSession() {
poolManager = new TestTezSessionPoolManager();
try {
- TezSessionState sessionState = poolManager.getSession(null, conf, true, false);
- TezSessionState sessionState1 = poolManager.getSession(sessionState, conf, true, false);
+ TezSession sessionState = poolManager.getSession(null, conf, true, false);
+ TezSession sessionState1 = poolManager.getSession(sessionState, conf, true, false);
if (sessionState1 != sessionState) {
fail();
}
conf.set("tez.queue.name", "nondefault");
- TezSessionState sessionState2 = poolManager.getSession(sessionState, conf, true, false);
+ TezSession sessionState2 = poolManager.getSession(sessionState, conf, true, false);
if (sessionState2 == sessionState) {
fail();
}
@@ -97,7 +96,7 @@ public void testSessionPoolGetInOrder() {
// this is now a LIFO operation
// draw 1 and replace
- TezSessionState sessionState = poolManager.getSession(null, conf, true, false);
+ TezSession sessionState = poolManager.getSession(null, conf, true, false);
assertEquals("a", sessionState.getQueueName());
poolManager.returnSession(sessionState);
@@ -108,13 +107,13 @@ public void testSessionPoolGetInOrder() {
// [a,b,c,a,b,c]
// draw 2 and return in order - further run should return last returned
- TezSessionState first = poolManager.getSession(null, conf, true, false);
- TezSessionState second = poolManager.getSession(null, conf, true, false);
+ TezSession first = poolManager.getSession(null, conf, true, false);
+ TezSession second = poolManager.getSession(null, conf, true, false);
assertEquals("a", first.getQueueName());
assertEquals("b", second.getQueueName());
poolManager.returnSession(first);
poolManager.returnSession(second);
- TezSessionState third = poolManager.getSession(null, conf, true, false);
+ TezSession third = poolManager.getSession(null, conf, true, false);
assertEquals("b", third.getQueueName());
poolManager.returnSession(third);
@@ -157,7 +156,7 @@ public void testSessionPoolThreads() {
poolManager = new TestTezSessionPoolManager();
poolManager.setupPool(conf);
poolManager.startPool(conf, null);
- TezSessionState[] sessions = new TezSessionState[12];
+ TezSession[] sessions = new TezSession[12];
int[] queueCounts = new int[3];
for (int i = 0; i < sessions.length; ++i) {
sessions[i] = poolManager.getSession(null, conf, true, false);
@@ -184,7 +183,7 @@ public void testSessionReopen() {
conf.setIntVar(ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE, 1);
poolManager = new TestTezSessionPoolManager();
- TezSessionState session = Mockito.mock(TezSessionState.class);
+ TezSession session = Mockito.mock(TezSession.class);
Mockito.when(session.getQueueName()).thenReturn("default");
Mockito.when(session.isDefault()).thenReturn(false);
Mockito.when(session.getConf()).thenReturn(conf);
@@ -192,7 +191,7 @@ public void testSessionReopen() {
poolManager.reopen(session);
Mockito.verify(session).close(true);
- Mockito.verify(session).open(Mockito.any());
+ Mockito.verify(session).open(Mockito.any());
// mocked session starts with default queue
assertEquals("default", session.getQueueName());
@@ -278,7 +277,7 @@ public void run() {
tmpConf.set("tez.queue.name", "");
}
- TezSessionState session = poolManager.getSession(null, tmpConf, true, llap);
+ TezSession session = poolManager.getSession(null, tmpConf, true, llap);
Thread.sleep((random.nextInt(9) % 10) * 1000);
session.setLegacyLlapMode(llap);
poolManager.returnSession(session);
@@ -323,20 +322,20 @@ public void testReturn() {
@Test
public void testCloseAndOpenDefault() throws Exception {
poolManager = new TestTezSessionPoolManager();
- TezSessionState session = Mockito.mock(TezSessionState.class);
+ TezSession session = Mockito.mock(TezSession.class);
Mockito.when(session.isDefault()).thenReturn(false);
Mockito.when(session.getConf()).thenReturn(conf);
poolManager.reopen(session);
Mockito.verify(session).close(true);
- Mockito.verify(session).open(Mockito.any());
+ Mockito.verify(session).open(Mockito.any());
}
@Test
public void testSessionDestroy() throws Exception {
poolManager = new TestTezSessionPoolManager();
- TezSessionState session = Mockito.mock(TezSessionState.class);
+ TezSession session = Mockito.mock(TezSession.class);
Mockito.when(session.isDefault()).thenReturn(false);
poolManager.destroy(session);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPoolManagerMetrics.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPoolManagerMetrics.java
index 3c79a34bee08..a34a1e2a27b9 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPoolManagerMetrics.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPoolManagerMetrics.java
@@ -42,7 +42,7 @@ public class TestTezSessionPoolManagerMetrics {
@Test
public void testRefreshMetrics(){
TezSessionPoolManager poolManager = mock(TezSessionPoolManager.class);
- List sessions = testSessions();
+ List sessions = testSessions();
when(poolManager.getSessions()).thenReturn(sessions);
TezSessionPoolManagerMetrics metrics = new TezSessionPoolManagerMetrics(poolManager);
@@ -56,7 +56,7 @@ public void testRefreshMetrics(){
@Test
public void testBasicMetricsUpdate() {
TezSessionPoolManager poolManager = mock(TezSessionPoolManager.class);
- List sessions = testSessions();
+ List sessions = testSessions();
when(poolManager.getSessions()).thenReturn(sessions);
TezSessionPoolManagerMetrics metrics = new TezSessionPoolManagerMetrics(poolManager);
@@ -93,9 +93,9 @@ public void testMetricsBeforeFirstUpdateAndStart(){
Assert.assertEquals(0.0, metrics.taskBacklogRatio.value, 0.0001);
}
- private List testSessions() {
- List sessions = new ArrayList<>();
- TezSessionState session = mock(TezSessionState.class);
+ private List testSessions() {
+ List sessions = new ArrayList<>();
+ TezSession session = mock(TezSession.class);
when(session.getMetrics()).thenReturn(testMetrics());
IntStream.range(0, NO_OF_SESSIONS).forEach(i -> sessions.add(session));
return sessions;
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionState.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionState.java
index bafd0e547dc1..ffd1081d56f6 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionState.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionState.java
@@ -75,7 +75,8 @@ public void testScratchDirDeletedInTheEventOfExceptionWhileOpeningSession() thro
TezSessionState sessionState = new TezSessionState(ss.getSessionId(), hiveConf) {
@Override
- void openInternalUnsafe(boolean isAsync, SessionState.LogHelper console) throws TezException, IOException {
+ void openInternalUnsafe(boolean isAsync, SessionState.LogHelper console)
+ throws TezException, IOException {
super.openInternalUnsafe(isAsync, console);
// save scratch dir here as it's nullified while calling the cleanup
scratchDirPath.set(tezScratchDir.toUri().getPath());
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
index 5afa13c6da30..fa71845ece27 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
@@ -178,7 +178,7 @@ public Edge answer(InvocationOnMock invocation) throws Throwable {
SessionState.start(hiveConf);
session = mock(TezClient.class);
sessionState = mock(TezSessionState.class);
- when(sessionState.getSession()).thenReturn(session);
+ when(sessionState.getTezClient()).thenReturn(session);
when(sessionState.reopen()).thenReturn(sessionState);
when(session.submitDAG(any(DAG.class)))
.thenThrow(new SessionNotRunning(""))
@@ -240,7 +240,7 @@ public void testSubmitOnNonPoolSession() throws Exception {
TezSessionState tezSessionState = mock(TezSessionState.class);
TezClient tezClient = mock(TezClient.class);
when(tezSessionState.reopen()).thenThrow(new HiveException("Dag cannot be submitted"));
- when(tezSessionState.getSession()).thenReturn(tezClient);
+ when(tezSessionState.getTezClient()).thenReturn(tezClient);
when(tezClient.submitDAG(any(DAG.class))).thenThrow(new SessionNotRunning(""));
doNothing().when(tezSessionState).destroy();
boolean isException = false;
@@ -264,7 +264,7 @@ public void testSubmitOnPoolSession() throws Exception {
TezClient tezClient = mock(TezClient.class);
when(tezSessionPoolSession.reopen()).thenThrow(new HiveException("Dag cannot be submitted"));
doNothing().when(tezSessionPoolSession).returnToSessionManager();
- when(tezSessionPoolSession.getSession()).thenReturn(tezClient);
+ when(tezSessionPoolSession.getTezClient()).thenReturn(tezClient);
when(tezSessionPoolSession.isDefault()).thenReturn(true);
when(tezClient.submitDAG(any(DAG.class))).thenThrow(new SessionNotRunning(""));
boolean isException = false;
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
index 8ce58bb45cc2..082a8b8b97d4 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
@@ -62,7 +62,6 @@
@RunWith(RetryTestRunner.class)
public class TestWorkloadManager {
- @SuppressWarnings("unused")
private static final Logger LOG = LoggerFactory.getLogger(TestWorkloadManager.class);
private final class GetSessionRunnable implements Runnable {
@@ -227,7 +226,7 @@ protected WmTezSession createSessionObject(String sessionId, HiveConf conf) {
@Override
public WmTezSession getSession(
- TezSessionState session, MappingInput input, HiveConf conf,
+ TezSession session, MappingInput input, HiveConf conf,
final WmContext wmContext) throws Exception {
// We want to wait for the iteration to finish and set the cluster fraction.
WmTezSession state = super.getSession(session, input, conf, null);
@@ -236,7 +235,7 @@ public WmTezSession getSession(
}
@Override
- public void destroy(TezSessionState session) throws Exception {
+ public void destroy(TezSession session) throws Exception {
super.destroy(session);
ensureWm();
}
@@ -252,7 +251,7 @@ public void returnAfterUse(TezSessionPoolSession session) throws Exception {
}
@Override
- public TezSessionState reopen(TezSessionState session) throws Exception {
+ public TezSession reopen(TezSession session) throws Exception {
session = super.reopen(session);
ensureWm();
return session;
@@ -269,10 +268,10 @@ public void testReuse() throws Exception {
MockQam qam = new MockQam();
WorkloadManager wm = new WorkloadManagerForTest("test", conf, 1, qam);
wm.start();
- TezSessionState nonPool = mock(TezSessionState.class);
+ TezSession nonPool = mock(TezSession.class);
when(nonPool.getConf()).thenReturn(conf);
doNothing().when(nonPool).close(anyBoolean());
- TezSessionState session = wm.getSession(nonPool, mappingInput("user"), conf);
+ TezSession session = wm.getSession(nonPool, mappingInput("user"), conf);
verify(nonPool).close(anyBoolean());
assertNotSame(nonPool, session);
session.returnToSessionManager();
@@ -282,7 +281,7 @@ public void testReuse() throws Exception {
session = wm.getSession(diffPool, mappingInput("user"), conf);
verify(diffPool).returnToSessionManager();
assertNotSame(diffPool, session);
- TezSessionState session2 = wm.getSession(session, mappingInput("user"), conf);
+ TezSession session2 = wm.getSession(session, mappingInput("user"), conf);
assertSame(session, session2);
}
@@ -294,7 +293,7 @@ public void testQueueName() throws Exception {
wm.start();
// The queue should be ignored.
conf.set(TezConfiguration.TEZ_QUEUE_NAME, "test2");
- TezSessionState session = wm.getSession(null, mappingInput("user"), conf);
+ TezSession session = wm.getSession(null, mappingInput("user"), conf);
assertEquals("test", session.getQueueName());
assertEquals("test", conf.get(TezConfiguration.TEZ_QUEUE_NAME));
session.setQueueName("test2");
@@ -415,7 +414,7 @@ public void testMappings() throws Exception {
verifyMapping(wm, conf, mappingInput("zzz", groups("g0", "g1"), "g1"), "g1");
// Explicit pool specification - invalid - there's no mapping that matches.
try {
- TezSessionState r = wm.getSession(
+ TezSession r = wm.getSession(
null, mappingInput("u0", groups("g0", "g1"), "u2"), conf);
fail("Expected failure, but got " + r);
} catch (Exception ex) {
@@ -428,7 +427,7 @@ public void testMappings() throws Exception {
verifyMapping(wm, conf, mappingInput("u0", groups("g0", "g1"), "u2"), "u2");
// The mapping that doesn't exist still shouldn't work.
try {
- TezSessionState r = wm.getSession(
+ TezSession r = wm.getSession(
null, mappingInput("u0", groups("g0", "g1"), "zzz"), conf);
fail("Expected failure, but got " + r);
} catch (Exception ex) {
@@ -824,7 +823,7 @@ public void testDisableEnable() throws Exception {
assertEquals(0, tezAmPool.getCurrentSize());
try {
- TezSessionState r = wm.getSession(null, mappingInput("A", null), conf, null);
+ TezSession r = wm.getSession(null, mappingInput("A", null), conf, null);
fail("Expected an error but got " + r);
} catch (WorkloadManager.NoPoolMappingException ex) {
// Ignore, this particular error is expected.
@@ -1316,7 +1315,7 @@ public void testAsyncSessionInitFailures() throws Exception {
SettableFuture failedWait = SettableFuture.create();
failedWait.setException(new Exception("foo"));
theOnlySession.setWaitForAmRegistryFuture(failedWait);
- TezSessionState retriedSession = wm.getSession(null, mappingInput("A"), conf);
+ TezSession retriedSession = wm.getSession(null, mappingInput("A"), conf);
assertNotNull(retriedSession);
assertNotSame(theOnlySession, retriedSession); // Should have been replaced.
retriedSession.returnToSessionManager();
@@ -1326,7 +1325,7 @@ public void testAsyncSessionInitFailures() throws Exception {
theOnlySession.setWaitForAmRegistryFuture(failedWait);
wm.setNextWaitForAmRegistryFuture(failedWait); // Fail the retry.
try {
- TezSessionState r = wm.getSession(null, mappingInput("A"), conf);
+ TezSession r = wm.getSession(null, mappingInput("A"), conf);
fail("Expected an error but got " + r);
} catch (Exception ex) {
// Expected.
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestZookeeperExternalSessionsRegistryClient.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestZookeeperExternalSessionsRegistryClient.java
new file mode 100644
index 000000000000..8274e87187b0
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestZookeeperExternalSessionsRegistryClient.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.junit.Test;
+
+/**
+ * Tests for {@link ZookeeperExternalSessionsRegistryClient}.
+ */
+public class TestZookeeperExternalSessionsRegistryClient {
+
+ /**
+ * Integration-style unit test that ensures {@link ZookeeperExternalSessionsRegistryClient}
+ * can discover sessions from ZooKeeper and hand them out via {@link ExternalSessionsRegistry#getSession()}.
+ */
+ @Test
+ public void testGetAndReturnSession() throws Exception {
+ CuratorFramework client = null;
+ ZookeeperExternalSessionsRegistryClient registry = null;
+
+ try (TestingServer server = new TestingServer()) {
+ String connectString = server.getConnectString();
+
+ HiveConf conf = new HiveConf();
+ conf.setVar(ConfVars.HIVE_ZOOKEEPER_QUORUM, connectString);
+ conf.setVar(ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE, "/tez_ns");
+ conf.setIntVar(ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_WAIT_MAX_ATTEMPTS, 1);
+
+ String namespace = HiveConf.getVar(conf, ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE);
+ String effectivePath = ZookeeperExternalSessionsRegistryClient.normalizeZkPath(namespace);
+
+ CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
+ client = builder.connectString(connectString).retryPolicy(new RetryOneTime(1)).build();
+ client.start();
+
+ client.create().creatingParentsIfNeeded().forPath(effectivePath + "/app_1");
+ client.create().forPath(effectivePath + "/app_2");
+
+ registry = new ZookeeperExternalSessionsRegistryClient(conf);
+
+ String first = registry.getSession();
+ assertTrue("app_1".equals(first) || "app_2".equals(first));
+
+ registry.returnSession(first);
+ String second = registry.getSession();
+ assertNotNull(second);
+ assertTrue("app_1".equals(second) || "app_2".equals(second));
+ } finally {
+ if (registry != null) {
+ registry.close();
+ }
+ if (client != null) {
+ client.close();
+ }
+ }
+ }
+
+ /**
+ * Verifies that the same external session ID can be obtained and returned
+ * multiple times in a row from {@link ZookeeperExternalSessionsRegistryClient}.
+ */
+ @Test
+ public void testReuseSameSession() throws Exception {
+ CuratorFramework client = null;
+ ZookeeperExternalSessionsRegistryClient registry = null;
+
+ try (TestingServer server = new TestingServer()) {
+ String connectString = server.getConnectString();
+
+ HiveConf conf = new HiveConf();
+ conf.setVar(ConfVars.HIVE_ZOOKEEPER_QUORUM, connectString);
+ conf.setVar(ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE, "/tez_ns_reuse");
+ conf.setIntVar(ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_WAIT_MAX_ATTEMPTS, 1);
+
+ String namespace = HiveConf.getVar(conf, ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE);
+ String effectivePath = ZookeeperExternalSessionsRegistryClient.normalizeZkPath(namespace);
+
+ CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
+ client = builder.connectString(connectString).retryPolicy(new RetryOneTime(1)).build();
+ client.start();
+
+ client.create().creatingParentsIfNeeded().forPath(effectivePath + "/app_reuse");
+
+ registry = new ZookeeperExternalSessionsRegistryClient(conf);
+
+ String first = registry.getSession();
+ assertEquals("app_reuse", first);
+
+ registry.returnSession(first);
+ String second = registry.getSession();
+ assertEquals("app_reuse", second);
+
+ registry.returnSession(second);
+ String third = registry.getSession();
+ assertEquals("app_reuse", third);
+ } finally {
+ if (registry != null) {
+ registry.close();
+ }
+ if (client != null) {
+ client.close();
+ }
+ }
+ }
+}
+
diff --git a/service/src/java/org/apache/hive/service/server/KillQueryImpl.java b/service/src/java/org/apache/hive/service/server/KillQueryImpl.java
index a237bf16a98e..5d9a7649465f 100644
--- a/service/src/java/org/apache/hive/service/server/KillQueryImpl.java
+++ b/service/src/java/org/apache/hive/service/server/KillQueryImpl.java
@@ -165,16 +165,16 @@ public boolean isLocalQuery(String queryIdOrTag) {
}
@Override
- public void killQuery(String queryIdOrTag, String errMsg, HiveConf conf) throws HiveException {
- killQuery(queryIdOrTag, errMsg, conf, false, SessionState.get().getUserName(), isAdmin());
+ public void killQuery(String queryIdOrTag, String errMsg, HiveConf conf, boolean isYarn) throws HiveException {
+ killQuery(queryIdOrTag, errMsg, conf, false, SessionState.get().getUserName(), isAdmin(), isYarn);
}
public void killLocalQuery(String queryIdOrTag, HiveConf conf, String doAs, boolean doAsAdmin) throws HiveException {
- killQuery(queryIdOrTag, null, conf, true, doAs, doAsAdmin);
+ killQuery(queryIdOrTag, null, conf, true, doAs, doAsAdmin, true);
}
private void killQuery(String queryIdOrTag, String errMsg, HiveConf conf, boolean onlyLocal, String doAs,
- boolean doAsAdmin) throws HiveException {
+ boolean doAsAdmin, boolean isYarn) throws HiveException {
errMsg = StringUtils.defaultString(errMsg, KillQueriesOperation.KILL_QUERY_MESSAGE);
TagOrId tagOrId = TagOrId.UNKNOWN;
Set operationsToKill = new HashSet<>();
@@ -190,7 +190,7 @@ private void killQuery(String queryIdOrTag, String errMsg, HiveConf conf, boolea
}
}
if (!operationsToKill.isEmpty()) {
- killOperations(queryIdOrTag, errMsg, conf, tagOrId, operationsToKill, doAs, doAsAdmin);
+ killOperations(queryIdOrTag, errMsg, conf, tagOrId, operationsToKill, doAs, doAsAdmin, isYarn);
} else {
LOG.debug("Query not found with tag/id: {}", queryIdOrTag);
if (!onlyLocal && killQueryZookeeperManager != null &&
@@ -210,7 +210,7 @@ private void killQuery(String queryIdOrTag, String errMsg, HiveConf conf, boolea
}
private void killOperations(String queryIdOrTag, String errMsg, HiveConf conf, TagOrId tagOrId,
- Set operationsToKill, String doAs, boolean doAsAdmin) throws HiveException {
+ Set operationsToKill, String doAs, boolean doAsAdmin, boolean isYarn) throws HiveException {
try {
switch (tagOrId) {
case ID:
@@ -221,7 +221,9 @@ private void killOperations(String queryIdOrTag, String errMsg, HiveConf conf, T
if (queryTag == null) {
queryTag = queryIdOrTag;
}
- killChildYarnJobs(conf, queryTag, doAs, doAsAdmin);
+ if (isYarn) {
+ killChildYarnJobs(conf, queryTag, doAs, doAsAdmin);
+ }
} else {
// no privilege to cancel
throw new HiveSQLException("No privilege to kill query id");
@@ -237,7 +239,9 @@ private void killOperations(String queryIdOrTag, String errMsg, HiveConf conf, T
if (numCanceled == 0) {
throw new HiveSQLException("No privilege to kill query tag");
} else {
- killChildYarnJobs(conf, queryIdOrTag, doAs, doAsAdmin);
+ if (isYarn) {
+ killChildYarnJobs(conf, queryIdOrTag, doAs, doAsAdmin);
+ }
}
break;
case UNKNOWN: