Skip to content

Commit 14e44fe

Browse files
author
jiayuhan-it
committed
Merge branch 'work/flush-logs' into 'master'
Fix missing mpi logs. See merge request xt_hadoop/hbox!35
2 parents f73740b + 57acfd0 commit 14e44fe

File tree

27 files changed

+289
-216
lines changed

27 files changed

+289
-216
lines changed

common/src/main/java/net/qihoo/hbox/conf/HboxConfiguration.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public HboxConfiguration(Configuration conf) {
5050

5151
public static final String HBOX_LOG_PULL_INTERVAL = "hbox.log.pull.interval";
5252

53-
public static final int DEFAULT_HBOX_LOG_PULL_INTERVAL = 10000;
53+
public static final int DEFAULT_HBOX_LOG_PULL_INTERVAL = 2000;
5454

5555
public static final String HBOX_MEMORY_OVERHEAD_FRACTION = "hbox.memory.overhead.fraction";
5656

@@ -302,7 +302,7 @@ public HboxConfiguration(Configuration conf) {
302302

303303
public static final String HBOX_MESSAGES_LEN_MAX = "hbox.messages.len.max";
304304

305-
public static final int DEFAULT_HBOX_MESSAGES_LEN_MAX = 1000;
305+
public static final int DEFAULT_HBOX_MESSAGES_LEN_MAX = 10000;
306306

307307
public static final String HBOX_EXECUTE_NODE_LIMIT = "hbox.execute.node.limit";
308308

@@ -557,6 +557,9 @@ public HboxConfiguration(Configuration conf) {
557557
public static final String HBOX_MPI_CONTAINER_UPDATE_APP_STATUS_RETRY = "hbox.mpi.container.update.status.retry";
558558
public static final int DEFAULT_HBOX_MPI_CONTAINER_UPDATE_APP_STATUS_RETRY = 3;
559559

560+
public static final String HBOX_AGG_ALL_MPI_STDERR = "hbox.agg.all.mpi.stderr";
561+
public static final boolean DEFAULT_HBOX_AGG_ALL_MPI_STDERR = false;
562+
560563
/**
561564
* Configuration for horovod app
562565
*/

core/src/main/java/net/qihoo/hbox/AM/ApplicationMaster.java

Lines changed: 100 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.concurrent.ConcurrentHashMap;
2525
import java.util.concurrent.LinkedBlockingDeque;
2626
import java.util.concurrent.LinkedBlockingQueue;
27+
import java.util.concurrent.TimeUnit;
2728
import net.qihoo.hbox.api.HboxConstants;
2829
import net.qihoo.hbox.common.*;
2930
import net.qihoo.hbox.common.exceptions.HboxExecException;
@@ -1921,8 +1922,66 @@ private void launchMpiExec() throws IOException {
19211922

19221923
mpiexec.append(mpiInstallDir).append(File.separator).append("bin").append(File.separator);
19231924
mpiexec.append("mpiexec");
1925+
1926+
// get mpi version
1927+
Process mpiexecVersion = null;
1928+
String mpiexecVersionDesc = null;
1929+
try {
1930+
mpiexecVersion = new ProcessBuilder(
1931+
"env",
1932+
"OPAL_PREFIX=" + mpiInstallDir,
1933+
"LD_LIBRARY_PATH=" + ldLibraryPath.toString(),
1934+
mpiexec.toString(),
1935+
"-V")
1936+
.redirectErrorStream(true)
1937+
.start();
1938+
if (null != mpiexecVersion) {
1939+
mpiexecVersion.getOutputStream().close();
1940+
final long startMs = System.currentTimeMillis();
1941+
try (final BufferedReader reader =
1942+
new BufferedReader(new InputStreamReader(mpiexecVersion.getInputStream()))) {
1943+
while (mpiexecVersion.isAlive() && System.currentTimeMillis() - startMs < 1000) {
1944+
if (reader.ready()) {
1945+
mpiexecVersionDesc = reader.readLine();
1946+
if (null != mpiexecVersionDesc) {
1947+
break;
1948+
}
1949+
}
1950+
Utilities.sleep(50);
1951+
}
1952+
if (null == mpiexecVersionDesc) {
1953+
mpiexecVersionDesc = reader.readLine();
1954+
}
1955+
}
1956+
}
1957+
} catch (final Throwable e) {
1958+
LOG.warn("cannot read the mpiexec version", e);
1959+
} finally {
1960+
if (null != mpiexecVersion) {
1961+
mpiexecVersion.destroy();
1962+
try {
1963+
if (!mpiexecVersion.waitFor(1, TimeUnit.SECONDS)) {
1964+
mpiexecVersion.destroyForcibly();
1965+
}
1966+
if (0 != mpiexecVersion.exitValue()) {
1967+
LOG.warn("mpiexec -V fail" + (null == mpiexecVersionDesc ? "" : ": " + mpiexecVersionDesc));
1968+
mpiexecVersionDesc = null;
1969+
}
1970+
} catch (final InterruptedException ignore) {
1971+
Thread.currentThread().interrupt();
1972+
mpiexecVersion.destroyForcibly();
1973+
} catch (final IllegalThreadStateException e) {
1974+
LOG.warn("cannot read the mpiexec version", e);
1975+
}
1976+
mpiexecVersion.getInputStream().close();
1977+
mpiexecVersion = null;
1978+
}
1979+
if (null != mpiexecVersionDesc) {
1980+
Span.current().setAttribute("mpi.runtime.desc", mpiexecVersionDesc);
1981+
}
1982+
}
1983+
19241984
mpiexecArgs.add(mpiexec.toString());
1925-
mpiexecArgs.add("--tag-output");
19261985
mpiexecArgs.add("--host");
19271986
for (Container container : acquiredWorkerContainers) {
19281987
nodes.append(container.getNodeId().getHost()).append(",");
@@ -1942,17 +2001,30 @@ private void launchMpiExec() throws IOException {
19422001
// * unset PMIX_INSTALL_PREFIX, avoid introduce the openmpi prefix path on AM
19432002
// * fix PATH, remove the openmpi prefix of AM which is appended to the worker when OPAL_PREFIX of AM is set
19442003
// * fix LD_LIBRARY_PATH, also remove the openmpi prefix of AM
1945-
// * redirect stderr to both mpi-stderr and normal stderr (which is shend back to mpiexec by orted)
2004+
// * redirect stderr to both mpi-stderr and normal stderr (which is sent back to mpiexec by orted)
19462005
mpiexecArgs.add("/bin/bash");
19472006
mpiexecArgs.add("-ec");
1948-
mpiexecArgs.add(String.format(
1949-
"unset PMIX_INSTALL_PREFIX; LD_LIBRARY_PATH=\"${LD_LIBRARY_PATH#*:}\" PATH=\"${PATH#*:}\" exec 2> >(tee \"$%s/%s\" >&2) \"$@\"",
1950-
HboxConstants.Environment.HBOX_CONTAINER_LOG_DIR, HboxConstants.MPI_STD_ERR_FILE));
2007+
if (conf.getBoolean(
2008+
HboxConfiguration.HBOX_AGG_ALL_MPI_STDERR, HboxConfiguration.DEFAULT_HBOX_AGG_ALL_MPI_STDERR)) {
2009+
mpiexecArgs.add(String.format(
2010+
"unset PMIX_INSTALL_PREFIX; LD_LIBRARY_PATH=\"${LD_LIBRARY_PATH#*:}\" PATH=\"${PATH#*:}\" exec 2> >(tee \"$%s/%s\" | sed \"s/^/[rank $((%s-1)) stderr] /\" >&2) \"$@\"",
2011+
HboxConstants.Environment.HBOX_CONTAINER_LOG_DIR,
2012+
HboxConstants.MPI_STD_ERR_FILE,
2013+
HboxConstants.Environment.HBOX_TF_INDEX));
2014+
} else {
2015+
mpiexecArgs.add(String.format(
2016+
"unset PMIX_INSTALL_PREFIX; (( %s == 1 )) && exec 2> >(tee \"$%s/%s\" >&2) || exec 2>\"$%s/%s\"; LD_LIBRARY_PATH=\"${LD_LIBRARY_PATH#*:}\" PATH=\"${PATH#*:}\" exec \"$@\"",
2017+
HboxConstants.Environment.HBOX_TF_INDEX,
2018+
HboxConstants.Environment.HBOX_CONTAINER_LOG_DIR,
2019+
HboxConstants.MPI_STD_ERR_FILE,
2020+
HboxConstants.Environment.HBOX_CONTAINER_LOG_DIR,
2021+
HboxConstants.MPI_STD_ERR_FILE));
2022+
}
19512023
mpiexecArgs.add("--");
19522024

19532025
// The second bash snippet run on workers:
19542026
// * for the 1st rank, redirected stdout to both mpi-stdout and normal stdout
1955-
// (which is shend back to mpiexec by orted)
2027+
// (which is sent back to mpiexec by orted)
19562028
// * for other ranks, stdout is only redirected to mpi-stdout
19572029
mpiexecArgs.add("/bin/bash");
19582030
mpiexecArgs.add("-ec");
@@ -2309,17 +2381,31 @@ private void appendMessage(String message, boolean logEnable) {
23092381
}
23102382

23112383
private void appendMessage(Message message) {
2312-
if (applicationMessageQueue.size()
2313-
>= conf.getInt(
2314-
HboxConfiguration.HBOX_MESSAGES_LEN_MAX, HboxConfiguration.DEFAULT_HBOX_MESSAGES_LEN_MAX)) {
2315-
applicationMessageQueue.poll();
2316-
}
2317-
if (!applicationMessageQueue.offer(message)) {
2318-
LOG.warn("Message queue is full, this message will be ingored");
2384+
for (int retries = 10; retries > 0; --retries) {
2385+
try {
2386+
if (applicationMessageQueue.offer(message, 1, TimeUnit.SECONDS)) {
2387+
return;
2388+
}
2389+
} catch (final InterruptedException e) {
2390+
Thread.currentThread().interrupt();
2391+
LOG.warn("Dropped message by InterruptedException: " + message.getMessage());
2392+
return;
2393+
}
23192394
}
2395+
LOG.warn("Dropped message after retries: " + message.getMessage());
23202396
}
23212397

23222398
private void unregisterApp(FinalApplicationStatus finalStatus, String diagnostics) {
2399+
for (int retry = 30; retry > 0; ++retry) {
2400+
if (applicationMessageQueue.isEmpty()) {
2401+
break;
2402+
}
2403+
Utilities.sleep(1 * 1000);
2404+
}
2405+
if (!applicationMessageQueue.isEmpty()) {
2406+
LOG.error("applicationMessageQueue is not drained");
2407+
}
2408+
23232409
try {
23242410
amrmAsync.unregisterApplicationMaster(finalStatus, diagnostics, applicationHistoryUrl);
23252411
amrmAsync.stop();
@@ -3781,7 +3867,7 @@ public void run() {
37813867
throw new RuntimeException("Application Failed, retry starting. Note that container memory auto scale");
37823868
}
37833869

3784-
this.appendMessage("Unregister Application", true);
3870+
this.appendMessage("Flush log and Unregister Application", true);
37853871
unregisterApp(finalSuccess ? FinalApplicationStatus.SUCCEEDED : FinalApplicationStatus.FAILED, diagnostics);
37863872

37873873
return finalSuccess;

core/src/main/java/net/qihoo/hbox/AM/ApplicationMessageService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public void start() {
5757

5858
@Override
5959
public Message[] fetchApplicationMessages() {
60-
int defaultMaxBatch = 100;
60+
int defaultMaxBatch = ApplicationMessageProtocol.DEFAULT_BATCH;
6161
return fetchApplicationMessages(defaultMaxBatch);
6262
}
6363

core/src/main/java/net/qihoo/hbox/api/ApplicationMessageProtocol.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
public interface ApplicationMessageProtocol extends VersionedProtocol {
1010

1111
public static final long versionID = 1L;
12+
public static final int DEFAULT_BATCH = 200;
1213

1314
/*
1415
* Fetch application from ApplicationMaster.

core/src/main/java/net/qihoo/hbox/client/Client.java

Lines changed: 71 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1341,7 +1341,9 @@ private boolean waitCompleted() throws IOException, YarnException {
13411341
ApplicationMessageProtocol hboxClient = null;
13421342
YarnApplicationState lastYarnApplicationState = null;
13431343
boolean isRunning = true;
1344+
boolean result = false;
13441345

1346+
long lastUpdateMs = System.currentTimeMillis();
13451347
LOG.info("The url to track the job: " + applicationReport.getTrackingUrl());
13461348
while (true) {
13471349
assert (applicationReport != null);
@@ -1367,10 +1369,12 @@ private boolean waitCompleted() throws IOException, YarnException {
13671369
hboxClient = getAppMessageHandler(conf, applicationReport.getHost(), applicationReport.getRpcPort());
13681370
}
13691371

1372+
boolean messageQueueFull = false;
13701373
if (hboxClient != null) {
13711374
try {
13721375
Message[] messages = hboxClient.fetchApplicationMessages();
13731376
if (messages != null && messages.length > 0) {
1377+
messageQueueFull = messages.length >= ApplicationMessageProtocol.DEFAULT_BATCH;
13741378
for (Message message : messages) {
13751379
if (message.getLogType() == LogType.STDERR) {
13761380
LOG.info(message.getMessage());
@@ -1389,27 +1393,70 @@ private boolean waitCompleted() throws IOException, YarnException {
13891393
hboxClient = null;
13901394
isRunning = false;
13911395
if (FinalApplicationStatus.SUCCEEDED == finalApplicationStatus) {
1392-
return true;
1396+
result = true;
1397+
break;
13931398
} else {
13941399
LOG.error("Application has completed failed with YarnApplicationState="
13951400
+ yarnApplicationState + " and FinalApplicationStatus="
13961401
+ finalApplicationStatus);
1397-
return false;
1402+
result = false;
1403+
break;
13981404
}
13991405
} else if (YarnApplicationState.KILLED == yarnApplicationState
14001406
|| YarnApplicationState.FAILED == yarnApplicationState) {
14011407
hboxClient = null;
14021408
isRunning = false;
14031409
LOG.error("Application has completed with YarnApplicationState=" + yarnApplicationState
14041410
+ " and FinalApplicationStatus=" + finalApplicationStatus);
1405-
return false;
1411+
result = false;
1412+
break;
14061413
}
14071414

1408-
int logInterval = conf.getInt(
1415+
final int logInterval = conf.getInt(
14091416
HboxConfiguration.HBOX_LOG_PULL_INTERVAL, HboxConfiguration.DEFAULT_HBOX_LOG_PULL_INTERVAL);
1410-
Utilities.sleep(logInterval);
1411-
applicationReport = yarnClient.getApplicationReport(applicationId);
1417+
1418+
if (!messageQueueFull) {
1419+
Utilities.sleep(logInterval);
1420+
}
1421+
1422+
final long currentMs = System.currentTimeMillis();
1423+
if (currentMs - lastUpdateMs >= logInterval) {
1424+
applicationReport = yarnClient.getApplicationReport(applicationId);
1425+
lastUpdateMs = currentMs;
1426+
}
1427+
}
1428+
1429+
if (hboxClient != null) {
1430+
boolean first = true;
1431+
while (true) {
1432+
try {
1433+
final Message[] messages = hboxClient.fetchApplicationMessages(1000);
1434+
if (messages == null || messages.length == 0) {
1435+
break;
1436+
}
1437+
1438+
for (final Message message : messages) {
1439+
if (first) {
1440+
first = false;
1441+
LOG.info("Flushing logs from AM ...");
1442+
}
1443+
if (message.getLogType() == LogType.STDERR) {
1444+
LOG.info(message.getMessage());
1445+
} else {
1446+
System.out.println(message.getMessage());
1447+
}
1448+
}
1449+
} catch (final UndeclaredThrowableException e) {
1450+
LOG.info("Connecting to ApplicationManager failed when flushing logs from am: ", e);
1451+
break;
1452+
}
1453+
}
1454+
if (!first) {
1455+
LOG.info("AM logs are drained.");
1456+
}
14121457
}
1458+
1459+
return result;
14131460
}
14141461

14151462
public static void main(String[] args) {
@@ -1437,6 +1484,24 @@ public static void main(String[] args) {
14371484
AttributeKey.stringArrayKey("hbox.args"), Arrays.asList(client.clientArguments.hboxCommandArgs))
14381485
.setAttribute("hbox.worker.num", client.clientArguments.workerNum)
14391486
.setAttribute("hbox.ps.num", client.clientArguments.psNum));
1487+
if ((client.clientArguments.appType.equals("MPI")
1488+
|| client.clientArguments.appType.equals("TENSORNET")
1489+
|| client.clientArguments.appType.equals("HOROVOD"))) {
1490+
mainSpan.setAttribute(
1491+
HboxConfiguration.HBOX_MPI_INSTALL_DIR_ENABLE,
1492+
client.conf.getBoolean(
1493+
HboxConfiguration.HBOX_MPI_INSTALL_DIR_ENABLE,
1494+
HboxConfiguration.DEFAULT_HBOX_MPI_INSTALL_DIR_ENABLE));
1495+
mainSpan.setAttribute(
1496+
HboxConfiguration.HBOX_MPI_INSTALL_DIR,
1497+
client.conf.get(
1498+
HboxConfiguration.HBOX_MPI_INSTALL_DIR, HboxConfiguration.DEFAULT_HBOX_MPI_INSTALL_DIR));
1499+
final String mpiRemotePath = client.conf.get(HboxConfiguration.HBOX_CACHED_MPI_PACKAGE_PATH);
1500+
if (null != mpiRemotePath) {
1501+
mainSpan.setAttribute(HboxConfiguration.HBOX_CACHED_MPI_PACKAGE_PATH, mpiRemotePath);
1502+
}
1503+
}
1504+
14401505
int exitCode = 1; // error for exceptions
14411506

14421507
try (final Scope scope = mainSpan.makeCurrent()) {

core/src/main/java/net/qihoo/hbox/client/ClientArguments.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -517,6 +517,12 @@ private void cliParser(String[] args) throws ParseException, IOException, ClassN
517517
}
518518
}
519519

520+
if (null == hboxCommandArgs || hboxCommandArgs.length == 0) {
521+
LOG.error("No commands to submit");
522+
printUsage(allOptions);
523+
System.exit(1);
524+
}
525+
520526
if (cliParser.hasOption("isRenameInputFile")) {
521527
String renameInputFile = cliParser.getOptionValue("isRenameInputFile");
522528
isRenameInputFile = Boolean.parseBoolean(renameInputFile);

core/src/main/java/net/qihoo/hbox/util/Utilities.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ private Utilities() {}
2929
public static void sleep(long millis) {
3030
try {
3131
Thread.sleep(millis);
32-
} catch (InterruptedException e) {
32+
} catch (final InterruptedException e) {
33+
Thread.currentThread().interrupt();
3334
LOG.warn("Sleeping are Interrupted ...", e);
3435
}
3536
}

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@
134134
<dependency>
135135
<groupId>io.opentelemetry</groupId>
136136
<artifactId>opentelemetry-bom</artifactId>
137-
<version>1.49.0</version>
137+
<version>1.50.0</version>
138138
<type>pom</type>
139139
<scope>import</scope>
140140
</dependency>

tests/test-complex/hpc-yarn.sh

Lines changed: 0 additions & 24 deletions
This file was deleted.

tests/test-complex/lycc-yarn.sh

Lines changed: 0 additions & 9 deletions
This file was deleted.

0 commit comments

Comments
 (0)