Skip to content

Commit 9725e38

Browse files
TEZ-4689: Introduce Node abstraction for DAGAppMaster instead of sepagrate NodeManager-related fields (#461) (Raghav Aggarwal reviewed by Laszlo Bodor)
1 parent ea890f8 commit 9725e38

13 files changed

Lines changed: 205 additions & 78 deletions

File tree

tez-dag/src/main/java/org/apache/tez/client/LocalClient.java

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@
6767
import org.apache.tez.dag.app.DAGAppMaster;
6868
import org.apache.tez.dag.app.DAGAppMasterState;
6969
import org.apache.tez.dag.app.LocalDAGAppMaster;
70+
import org.apache.tez.dag.app.LocalNodeContext;
71+
import org.apache.tez.dag.app.NodeContext;
7072
import org.apache.tez.dag.app.dag.DAG;
7173

7274
import com.google.common.annotations.VisibleForTesting;
@@ -369,10 +371,11 @@ public void run() {
369371
long appSubmitTime = System.currentTimeMillis();
370372

371373
dagAppMaster =
372-
createDAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort,
374+
createDAGAppMaster(applicationAttemptId, cId,
373375
SystemClock.getInstance(), appSubmitTime, isSession, userDir.toUri().getPath(),
374376
new String[] {localDir.toUri().getPath()}, new String[] {logDir.toUri().getPath()},
375-
amCredentials, UserGroupInformation.getCurrentUser().getShortUserName());
377+
amCredentials, UserGroupInformation.getCurrentUser().getShortUserName(),
378+
new LocalNodeContext(currentHost, nmPort, nmHttpPort));
376379
DAGAppMaster.initAndStartAppMaster(dagAppMaster, conf);
377380
clientHandler = new DAGClientHandler(dagAppMaster);
378381
((AsyncDispatcher)dagAppMaster.getDispatcher()).setDrainEventsOnStop();
@@ -395,27 +398,32 @@ public void run() {
395398

396399
// this can be overridden by test code to create a mock app
397400
@VisibleForTesting
398-
protected DAGAppMaster createDAGAppMaster(ApplicationAttemptId applicationAttemptId,
399-
ContainerId cId, String currentHost, int nmPort,
400-
int nmHttpPort,
401-
Clock clock, long appSubmitTime, boolean isSession,
402-
String userDir,
403-
String[] localDirs, String[] logDirs,
404-
Credentials credentials, String jobUserName) throws
405-
IOException {
401+
protected DAGAppMaster createDAGAppMaster(
402+
ApplicationAttemptId applicationAttemptId,
403+
ContainerId cId,
404+
Clock clock,
405+
long appSubmitTime,
406+
boolean isSession,
407+
String userDir,
408+
String[] localDirs,
409+
String[] logDirs,
410+
Credentials credentials,
411+
String jobUserName,
412+
NodeContext nodeContext)
413+
throws IOException {
406414

407415
// Read in additional information about external services
408416
AMPluginDescriptorProto amPluginDescriptorProto =
409417
TezUtilsInternal.readUserSpecifiedTezConfiguration(userDir)
410418
.getAmPluginDescriptor();
411419

412420
return isLocalWithoutNetwork
413-
? new LocalDAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort,
421+
? new LocalDAGAppMaster(applicationAttemptId, cId,
414422
SystemClock.getInstance(), appSubmitTime, isSession, userDir, localDirs, logDirs,
415-
versionInfo.getVersion(), credentials, jobUserName, amPluginDescriptorProto)
416-
: new DAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort,
423+
versionInfo.getVersion(), credentials, jobUserName, amPluginDescriptorProto, nodeContext)
424+
: new DAGAppMaster(applicationAttemptId, cId,
417425
SystemClock.getInstance(), appSubmitTime, isSession, userDir, localDirs, logDirs,
418-
versionInfo.getVersion(), credentials, jobUserName, amPluginDescriptorProto);
426+
versionInfo.getVersion(), credentials, jobUserName, amPluginDescriptorProto, nodeContext);
419427
}
420428

421429
@Override

tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java

Lines changed: 45 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -209,16 +209,16 @@
209209
* The state machine is encapsulated in the implementation of Job interface.
210210
* All state changes happens via Job interface. Each event
211211
* results in a Finite State Transition in Job.
212-
*
212+
* <p>
213213
* Tez DAG AppMaster is the composition of loosely coupled services. The services
214214
* interact with each other via events. The components resembles the
215215
* Actors model. The component acts on received event and send out the
216216
* events to other components.
217217
* This keeps it highly concurrent with no or minimal synchronization needs.
218-
*
218+
* <p>
219219
* The events are dispatched by a central Dispatch mechanism. All components
220220
* register to the Dispatcher.
221-
*
221+
* <p>
222222
* The information is shared across different components using AppContext.
223223
*/
224224

@@ -245,9 +245,6 @@ public class DAGAppMaster extends AbstractService {
245245
private String appName;
246246
private final ApplicationAttemptId appAttemptID;
247247
private final ContainerId containerID;
248-
private final String nmHost;
249-
private final int nmPort;
250-
private final int nmHttpPort;
251248
private final String workingDirectory;
252249
private final String[] localDirs;
253250
private final String[] logDirs;
@@ -309,6 +306,7 @@ public class DAGAppMaster extends AbstractService {
309306

310307
private ListeningExecutorService execService;
311308
private final PluginManager pluginManager;
309+
private final NodeContext nodeContext;
312310

313311

314312
/**
@@ -344,20 +342,19 @@ public class DAGAppMaster extends AbstractService {
344342
private TezDAGHook[] hooks = {};
345343

346344
public DAGAppMaster(ApplicationAttemptId applicationAttemptId,
347-
ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
345+
ContainerId containerId,
348346
Clock clock, long appSubmitTime, boolean isSession, String workingDirectory,
349347
String [] localDirs, String[] logDirs, String clientVersion,
350-
Credentials credentials, String jobUserName, AMPluginDescriptorProto pluginDescriptorProto) {
348+
Credentials credentials, String jobUserName, AMPluginDescriptorProto pluginDescriptorProto,
349+
NodeContext nodeContext) {
351350
super(DAGAppMaster.class.getName());
352351
this.mdcContext = LoggingUtils.setupLog4j();
353352
this.clock = clock;
354353
this.startTime = clock.getTime();
355354
this.appSubmitTime = appSubmitTime;
356355
this.appAttemptID = applicationAttemptId;
357356
this.containerID = containerId;
358-
this.nmHost = nmHost;
359-
this.nmPort = nmPort;
360-
this.nmHttpPort = nmHttpPort;
357+
this.nodeContext = nodeContext;
361358
this.state = DAGAppMasterState.NEW;
362359
this.isSession = isSession;
363360
this.workingDirectory = workingDirectory;
@@ -371,9 +368,6 @@ public DAGAppMaster(ApplicationAttemptId applicationAttemptId,
371368
.createRemoteUser(jobUserName);
372369
this.appMasterUgi.addCredentials(amCredentials);
373370

374-
this.containerLogs = getRunningLogURL(this.nmHost + ":" + this.nmHttpPort,
375-
this.containerID.toString(), this.appMasterUgi.getShortUserName());
376-
377371
LOG.info("Created DAGAppMaster for application " + applicationAttemptId
378372
+ ", versionInfo=" + dagVersionInfo);
379373
TezCommonUtils.logCredentials(LOG, this.appMasterUgi.getCredentials(), "am");
@@ -443,6 +437,14 @@ protected void serviceInit(final Configuration conf) throws Exception {
443437
this.isLocal = conf.getBoolean(TezConfiguration.TEZ_LOCAL_MODE,
444438
TezConfiguration.TEZ_LOCAL_MODE_DEFAULT);
445439

440+
if (!isLocal) {
441+
this.containerLogs =
442+
getRunningLogURL(
443+
nodeContext.nodeHost() + ":" + nodeContext.nodeHttpPort(),
444+
this.containerID.toString(),
445+
this.appMasterUgi.getShortUserName());
446+
}
447+
446448
UserPayload defaultPayload = TezUtils.createUserPayloadFromConf(amConf);
447449

448450
PluginManager.PluginDescriptors pluginDescriptors = pluginManager.parseAllPlugins(isLocal, defaultPayload);
@@ -1207,15 +1209,7 @@ public ContainerId getAppContainerId() {
12071209
}
12081210

12091211
public String getAppNMHost() {
1210-
return nmHost;
1211-
}
1212-
1213-
public int getAppNMPort() {
1214-
return nmPort;
1215-
}
1216-
1217-
public int getAppNMHttpPort() {
1218-
return nmHttpPort;
1212+
return nodeContext.nodeHost();
12191213
}
12201214

12211215
public int getRpcPort() {
@@ -2415,13 +2409,15 @@ public static void main(String[] args) {
24152409
// Install the tez class loader, which can be used add new resources
24162410
TezClassLoader.setupTezClassLoader();
24172411
Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
2418-
final String pid = System.getenv().get("JVM_PID");
24192412

2420-
String nodeHostString = System.getenv(ApplicationConstants.Environment.NM_HOST.name());
2421-
String nodePortString = System.getenv(ApplicationConstants.Environment.NM_PORT.name());
2422-
String nodeHttpPortString = System.getenv(ApplicationConstants.Environment.NM_HTTP_PORT.name());
2413+
final String pid = System.getenv().get("JVM_PID");
24232414
String appSubmitTimeStr = System.getenv(ApplicationConstants.APP_SUBMIT_TIME_ENV);
24242415
String clientVersion = System.getenv(TezConstants.TEZ_CLIENT_VERSION_ENV);
2416+
String jobUserName = System.getenv(ApplicationConstants.Environment.USER.name());
2417+
String pwd = System.getenv(ApplicationConstants.Environment.PWD.name());
2418+
String localDirs = System.getenv(ApplicationConstants.Environment.LOCAL_DIRS.name());
2419+
String logDirs = System.getenv(ApplicationConstants.Environment.LOG_DIRS.name());
2420+
24252421
if (clientVersion == null) {
24262422
clientVersion = VersionInfo.UNKNOWN;
24272423
}
@@ -2431,18 +2427,20 @@ public static void main(String[] args) {
24312427

24322428
Configuration conf = new Configuration();
24332429

2434-
AMExtensions amExtensions = getFrameworkService(conf).getAMExtensions();
2430+
ServerFrameworkService frameworkService = getFrameworkService(conf);
2431+
AMExtensions amExtensions = frameworkService.getAMExtensions();
24352432
DAGProtos.ConfigurationProto confProto = amExtensions.loadConfigurationProto();
24362433
TezUtilsInternal.addUserSpecifiedTezConfiguration(conf, confProto.getConfKeyValuesList());
24372434

2435+
NodeContext nodeContext = frameworkService.getNodeContext();
2436+
24382437
ContainerId containerId = amExtensions.allocateContainerId(conf);
24392438

24402439
ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId();
24412440
org.apache.hadoop.ipc.CallerContext.setCurrent(new org.apache.hadoop.ipc.CallerContext
24422441
.Builder("tez_appmaster_" + containerId.getApplicationAttemptId()
24432442
).build());
24442443
long appSubmitTime = Long.parseLong(appSubmitTimeStr);
2445-
String jobUserName = System.getenv(ApplicationConstants.Environment.USER.name());
24462444

24472445
// Command line options
24482446
Option option = Option.builder()
@@ -2462,9 +2460,9 @@ public static void main(String[] args) {
24622460
+ ", jvmPid=" + pid
24632461
+ ", userFromEnv=" + jobUserName
24642462
+ ", cliSessionOption=" + sessionModeCliOption
2465-
+ ", pwd=" + System.getenv(ApplicationConstants.Environment.PWD.name())
2466-
+ ", localDirs=" + System.getenv(ApplicationConstants.Environment.LOCAL_DIRS.name())
2467-
+ ", logDirs=" + System.getenv(ApplicationConstants.Environment.LOG_DIRS.name()));
2463+
+ ", pwd=" + pwd
2464+
+ ", localDirs=" + localDirs
2465+
+ ", logDirs=" + logDirs);
24682466

24692467
AMPluginDescriptorProto amPluginDescriptorProto = null;
24702468
if (confProto.hasAmPluginDescriptor()) {
@@ -2477,20 +2475,26 @@ public static void main(String[] args) {
24772475
TezUtilsInternal.setSecurityUtilConfigration(LOG, conf);
24782476

24792477
DAGAppMaster appMaster =
2480-
new DAGAppMaster(applicationAttemptId, containerId, nodeHostString, Integer.parseInt(nodePortString),
2481-
Integer.parseInt(nodeHttpPortString), new SystemClock(), appSubmitTime, sessionModeCliOption,
2482-
System.getenv(ApplicationConstants.Environment.PWD.name()),
2483-
TezCommonUtils.getTrimmedStrings(System.getenv(ApplicationConstants.Environment.LOCAL_DIRS.name())),
2484-
TezCommonUtils.getTrimmedStrings(System.getenv(ApplicationConstants.Environment.LOG_DIRS.name())),
2485-
clientVersion, credentials, jobUserName, amPluginDescriptorProto);
2478+
new DAGAppMaster(
2479+
applicationAttemptId,
2480+
containerId,
2481+
new SystemClock(),
2482+
appSubmitTime,
2483+
sessionModeCliOption,
2484+
pwd,
2485+
TezCommonUtils.getTrimmedStrings(localDirs),
2486+
TezCommonUtils.getTrimmedStrings(logDirs),
2487+
clientVersion,
2488+
credentials,
2489+
jobUserName,
2490+
amPluginDescriptorProto,
2491+
nodeContext);
24862492
ShutdownHookManager.get().addShutdownHook(new DAGAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY);
24872493

24882494
// log the system properties
24892495
if (LOG.isInfoEnabled()) {
24902496
String systemPropsToLog = TezCommonUtils.getSystemPropertiesToLog(conf);
2491-
if (systemPropsToLog != null) {
2492-
LOG.info(systemPropsToLog);
2493-
}
2497+
LOG.info(systemPropsToLog);
24942498
}
24952499

24962500
initAndStartAppMaster(appMaster, conf);

tez-dag/src/main/java/org/apache/tez/dag/app/LocalDAGAppMaster.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,13 @@
3232
public class LocalDAGAppMaster extends DAGAppMaster {
3333

3434
public LocalDAGAppMaster(ApplicationAttemptId applicationAttemptId, ContainerId containerId,
35-
String nmHost, int nmPort, int nmHttpPort, Clock clock, long appSubmitTime, boolean isSession,
35+
Clock clock, long appSubmitTime, boolean isSession,
3636
String workingDirectory, String[] localDirs, String[] logDirs, String clientVersion,
37-
Credentials credentials, String jobUserName, AMPluginDescriptorProto pluginDescriptorProto) {
38-
super(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort, clock, appSubmitTime,
37+
Credentials credentials, String jobUserName, AMPluginDescriptorProto pluginDescriptorProto,
38+
NodeContext nodeContext) {
39+
super(applicationAttemptId, containerId, clock, appSubmitTime,
3940
isSession, workingDirectory, localDirs, logDirs, clientVersion, credentials, jobUserName,
40-
pluginDescriptorProto);
41+
pluginDescriptorProto, nodeContext);
4142
}
4243

4344
@Override
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
3+
* agreements. See the NOTICE file distributed with this work for additional information regarding
4+
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
5+
* "License"); you may not use this file except in compliance with the License. You may obtain a
6+
* copy of the License at
7+
*
8+
* <p>http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* <p>Unless required by applicable law or agreed to in writing, software distributed under the
11+
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
package org.apache.tez.dag.app;
16+
17+
/** Local implementation of NodeContext. */
18+
public record LocalNodeContext(String nodeHost, int nodePort, int nodeHttpPort)
19+
implements NodeContext {}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
3+
* agreements. See the NOTICE file distributed with this work for additional information regarding
4+
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
5+
* "License"); you may not use this file except in compliance with the License. You may obtain a
6+
* copy of the License at
7+
*
8+
* <p>http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* <p>Unless required by applicable law or agreed to in writing, software distributed under the
11+
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
package org.apache.tez.dag.app;
16+
17+
/** Provides context information about the node on which the DAGAppMaster is running. */
18+
public sealed interface NodeContext permits YarnNodeManagerContext, LocalNodeContext {
19+
20+
/**
21+
* @return The node host string
22+
*/
23+
String nodeHost();
24+
25+
/**
26+
* @return The node port string
27+
*/
28+
int nodePort();
29+
30+
/**
31+
* @return The node HTTP port string
32+
*/
33+
int nodeHttpPort();
34+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
3+
* agreements. See the NOTICE file distributed with this work for additional information regarding
4+
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
5+
* "License"); you may not use this file except in compliance with the License. You may obtain a
6+
* copy of the License at
7+
*
8+
* <p>http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* <p>Unless required by applicable law or agreed to in writing, software distributed under the
11+
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
package org.apache.tez.dag.app;
16+
17+
import org.apache.hadoop.yarn.api.ApplicationConstants;
18+
19+
/** YARN specific implementation of NodeContext. Resolves YARN environment variables on demand. */
20+
public final class YarnNodeManagerContext implements NodeContext {
21+
22+
@Override
23+
public String nodeHost() {
24+
return System.getenv(ApplicationConstants.Environment.NM_HOST.name());
25+
}
26+
27+
@Override
28+
public int nodePort() {
29+
String port = System.getenv(ApplicationConstants.Environment.NM_PORT.name());
30+
return Integer.parseInt(port);
31+
}
32+
33+
@Override
34+
public int nodeHttpPort() {
35+
String port = System.getenv(ApplicationConstants.Environment.NM_HTTP_PORT.name());
36+
return Integer.parseInt(port);
37+
}
38+
}

tez-dag/src/main/java/org/apache/tez/frameworkplugins/ServerFrameworkService.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.hadoop.conf.Configuration;
2121
import org.apache.tez.client.registry.AMRegistry;
22+
import org.apache.tez.dag.app.NodeContext;
2223

2324
/**
2425
* A {@code FrameworkService} that runs inside the Application Master (AM) process.
@@ -30,4 +31,5 @@
3031
public interface ServerFrameworkService extends FrameworkService {
3132
AMRegistry getAMRegistry(Configuration conf);
3233
AMExtensions getAMExtensions();
34+
NodeContext getNodeContext();
3335
}

0 commit comments

Comments
 (0)