From aab4a64e2a424c064854ba928a45733455afb7e4 Mon Sep 17 00:00:00 2001 From: Ayush Saxena Date: Tue, 24 Oct 2023 08:22:47 +0530 Subject: [PATCH 1/4] TEZ-3821: Ability to fail fast tasks that write too much to local disk. --- .../apache/tez/dag/api/TezConfiguration.java | 5 ++ .../org/apache/tez/runtime/RuntimeTask.java | 45 +++++++++++++++++ .../apache/tez/runtime/task/TaskReporter.java | 7 +++ .../tez/runtime/task/TestTaskReporter.java | 49 +++++++++++++++++++ 4 files changed, 106 insertions(+) diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index 9e2e2d89cf..a0dd294a24 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -2304,4 +2304,9 @@ static Set getPropertySet() { public static final String TEZ_THREAD_DUMP_INTERVAL = "tez.thread.dump.interval"; public static final String TEZ_THREAD_DUMP_INTERVAL_DEFAULT = "0ms"; + @ConfigurationScope(Scope.DAG) + @ConfigurationProperty(type = "long") + public static final String TEZ_TASK_LOCAL_WRITE_LIMIT_BYTES = "tez.thread.dump.interval"; + public static final long TEZ_TASK_LOCAL_WRITE_LIMIT_BYTES_DEFAULT = -1; + } diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java index a53d0d2e7e..7cf28ea767 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java @@ -18,6 +18,7 @@ package org.apache.tez.runtime; +import java.io.IOException; import java.util.Collection; import java.util.EnumSet; import java.util.Map; @@ -26,6 +27,8 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.runtime.api.impl.TaskSpec; @@ -35,6 +38,11 @@ import org.apache.tez.runtime.metrics.TaskCounterUpdater; import com.google.common.collect.Maps; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.tez.dag.api.TezConfiguration.TEZ_TASK_LOCAL_WRITE_LIMIT_BYTES; +import static org.apache.tez.dag.api.TezConfiguration.TEZ_TASK_LOCAL_WRITE_LIMIT_BYTES_DEFAULT; public abstract class RuntimeTask { @@ -54,6 +62,9 @@ public abstract class RuntimeTask { private final TaskStatistics statistics; private final AtomicBoolean progressNotified = new AtomicBoolean(false); + private final long lfsBytesWriteLimit; + private static final Logger LOG = LoggerFactory.getLogger(RuntimeTask.class); + protected RuntimeTask(TaskSpec taskSpec, Configuration tezConf, TezUmbilical tezUmbilical, String pid, boolean setupSysCounterUpdater) { this.taskSpec = taskSpec; @@ -71,6 +82,8 @@ protected RuntimeTask(TaskSpec taskSpec, Configuration tezConf, } else { this.counterUpdater = null; } + this.lfsBytesWriteLimit = + tezConf.getLong(TEZ_TASK_LOCAL_WRITE_LIMIT_BYTES, TEZ_TASK_LOCAL_WRITE_LIMIT_BYTES_DEFAULT); } protected enum State { @@ -182,4 +195,36 @@ protected void setTaskDone() { protected final boolean isUpdatingSystemCounters() { return counterUpdater != null; } + + /** + * Check whether the task has exceeded any configured limits. + * + * @throws TaskLimitException in case the limit is exceeded. + */ + public void checkTaskLimits() throws TaskLimitException { + // check the limit for writing to local file system + if (lfsBytesWriteLimit >= 0) { + Long lfsBytesWritten = null; + try { + LocalFileSystem localFS = FileSystem.getLocal(tezConf); + lfsBytesWritten = FileSystem.getGlobalStorageStatistics().get(localFS.getScheme()).getLong("bytesWritten"); + } catch (IOException e) { + LOG.warn("Could not get LocalFileSystem bytesWritten counter"); + } + if (lfsBytesWritten != null && lfsBytesWritten > lfsBytesWriteLimit) { + throw new TaskLimitException( + "Too much write to local file system." + " current value is " + lfsBytesWritten + " the limit is " + + lfsBytesWriteLimit); + } + } + } + + /** + * Exception thrown when the task exceeds some configured limits. + */ + public class TaskLimitException extends IOException { + public TaskLimitException(String str) { + super(str); + } + } } diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java index 99d8bbca47..5ceaf3e2d6 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java @@ -44,6 +44,7 @@ import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.runtime.RuntimeTask; +import org.apache.tez.runtime.RuntimeTask.TaskLimitException; import org.apache.tez.runtime.api.*; import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent; import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent; @@ -262,6 +263,12 @@ private synchronized ResponseWrapper heartbeat(Collection eventsArg) t sendCounters = true; prevCounterSendHeartbeatNum = nonOobHeartbeatCounter.get(); } + try { + task.checkTaskLimits(); + } catch (TaskLimitException tle) { + askedToDie.set(true); + return new ResponseWrapper(true, 1); + } updateEvent = new TezEvent(getStatusUpdateEvent(sendCounters), updateEventMetadata); events.add(updateEvent); } diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java index 147d17655b..5560433ee0 100644 --- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java +++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java @@ -18,6 +18,7 @@ package org.apache.tez.runtime.task; +import static org.apache.tez.dag.api.TezConfiguration.TEZ_TASK_LOCAL_WRITE_LIMIT_BYTES; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; @@ -27,7 +28,11 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.io.File; +import java.io.IOException; +import java.util.Collections; import java.util.List; +import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -36,12 +41,21 @@ import com.google.common.collect.Lists; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; import org.apache.tez.common.TezTaskUmbilicalProtocol; import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask; +import org.apache.tez.runtime.RuntimeTask.TaskLimitException; import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent; +import org.apache.tez.runtime.api.impl.InputSpec; +import org.apache.tez.runtime.api.impl.OutputSpec; +import org.apache.tez.runtime.api.impl.TaskSpec; import org.apache.tez.runtime.api.impl.TaskStatistics; import org.apache.tez.runtime.api.impl.TezEvent; import org.apache.tez.runtime.api.impl.TezHeartbeatRequest; @@ -55,6 +69,9 @@ @SuppressWarnings("rawtypes") public class TestTaskReporter { + private static final File TEST_DIR = + new File(System.getProperty("test.build.data"), TestTaskReporter.class.getName()).getAbsoluteFile(); + @Test(timeout = 10000) public void testContinuousHeartbeatsOnMaxEvents() throws Exception { @@ -218,6 +235,38 @@ public void testStatusUpdateAfterInitializationAndCounterFlag() { } + @Test + public void testLocalFileSystemBytesWrittenLimit() throws IOException { + TaskSpec mockSpec = mock(TaskSpec.class); + when(mockSpec.getInputs()).thenReturn(Collections.singletonList(mock(InputSpec.class))); + when(mockSpec.getOutputs()).thenReturn(Collections.singletonList(mock(OutputSpec.class))); + TezConfiguration tezConf = new TezConfiguration(); + LogicalIOProcessorRuntimeTask lio1 = + new LogicalIOProcessorRuntimeTask(mockSpec, 0, tezConf, null, null, null, null, null, null, "", null, + Runtime.getRuntime().maxMemory(), true, null, null); + + LocalFileSystem localFS = FileSystem.getLocal(tezConf); + FileSystem.clearStatistics(); + Path tmpPath = + new Path(TEST_DIR + "/testLocalFileSystemBytesWrittenLimit" + new Random(System.currentTimeMillis()).nextInt()); + try (FSDataOutputStream out = localFS.create(tmpPath, true)) { + out.write(new byte[1024]); + } + // Check limits with default shouldn't throw exception. + lio1.checkTaskLimits(); + + tezConf.setLong(TEZ_TASK_LOCAL_WRITE_LIMIT_BYTES, 10); + lio1 = new LogicalIOProcessorRuntimeTask(mockSpec, 0, tezConf, null, null, null, null, null, null, "", null, + Runtime.getRuntime().maxMemory(), true, null, null); + + try { + lio1.checkTaskLimits(); + Assert.fail("Expected to throw TaskLimitException"); + } catch (TaskLimitException taskLimitException) { + Assert.assertTrue(taskLimitException.getMessage().contains("Too much write to local file system")); + } + } + private List createEvents(int numEvents) { List list = Lists.newArrayListWithCapacity(numEvents); for (int i = 0; i < numEvents; i++) { From 96839f17285303b74183d88c08f179ef5ced8e36 Mon Sep 17 00:00:00 2001 From: Ayush Saxena Date: Tue, 24 Oct 2023 16:09:40 +0530 Subject: [PATCH 2/4] Fix FindBugs & Some Refactor. --- .../java/org/apache/tez/dag/api/TezConfiguration.java | 7 +++++-- .../src/main/java/org/apache/tez/runtime/RuntimeTask.java | 8 ++++---- .../java/org/apache/tez/runtime/task/TaskReporter.java | 1 + .../org/apache/tez/runtime/task/TestTaskReporter.java | 4 ++-- 4 files changed, 12 insertions(+), 8 deletions(-) diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index a0dd294a24..3dc6fe4745 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -2304,9 +2304,12 @@ static Set getPropertySet() { public static final String TEZ_THREAD_DUMP_INTERVAL = "tez.thread.dump.interval"; public static final String TEZ_THREAD_DUMP_INTERVAL_DEFAULT = "0ms"; + /** + * Limits the amount of data that can be written to LocalFileSystem by a Task. + */ @ConfigurationScope(Scope.DAG) @ConfigurationProperty(type = "long") - public static final String TEZ_TASK_LOCAL_WRITE_LIMIT_BYTES = "tez.thread.dump.interval"; - public static final long TEZ_TASK_LOCAL_WRITE_LIMIT_BYTES_DEFAULT = -1; + public static final String TEZ_TASK_LOCAL_FS_WRITE_LIMIT_BYTES = "tez.task.local-fs.write-limit.bytes"; + public static final long TEZ_TASK_LOCAL_FS_WRITE_LIMIT_BYTES_DEFAULT = -1; } diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java index 7cf28ea767..d78ba1954a 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java @@ -41,8 +41,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.tez.dag.api.TezConfiguration.TEZ_TASK_LOCAL_WRITE_LIMIT_BYTES; -import static org.apache.tez.dag.api.TezConfiguration.TEZ_TASK_LOCAL_WRITE_LIMIT_BYTES_DEFAULT; +import static org.apache.tez.dag.api.TezConfiguration.TEZ_TASK_LOCAL_FS_WRITE_LIMIT_BYTES; +import static org.apache.tez.dag.api.TezConfiguration.TEZ_TASK_LOCAL_FS_WRITE_LIMIT_BYTES_DEFAULT; public abstract class RuntimeTask { @@ -83,7 +83,7 @@ protected RuntimeTask(TaskSpec taskSpec, Configuration tezConf, this.counterUpdater = null; } this.lfsBytesWriteLimit = - tezConf.getLong(TEZ_TASK_LOCAL_WRITE_LIMIT_BYTES, TEZ_TASK_LOCAL_WRITE_LIMIT_BYTES_DEFAULT); + tezConf.getLong(TEZ_TASK_LOCAL_FS_WRITE_LIMIT_BYTES, TEZ_TASK_LOCAL_FS_WRITE_LIMIT_BYTES_DEFAULT); } protected enum State { @@ -222,7 +222,7 @@ public void checkTaskLimits() throws TaskLimitException { /** * Exception thrown when the task exceeds some configured limits. */ - public class TaskLimitException extends IOException { + public static class TaskLimitException extends IOException { public TaskLimitException(String str) { super(str); } diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java index 5ceaf3e2d6..80308dc36b 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java @@ -266,6 +266,7 @@ private synchronized ResponseWrapper heartbeat(Collection eventsArg) t try { task.checkTaskLimits(); } catch (TaskLimitException tle) { + LOG.error("Task limit exceeded", tle); askedToDie.set(true); return new ResponseWrapper(true, 1); } diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java index 5560433ee0..2013d6a893 100644 --- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java +++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java @@ -18,7 +18,7 @@ package org.apache.tez.runtime.task; -import static org.apache.tez.dag.api.TezConfiguration.TEZ_TASK_LOCAL_WRITE_LIMIT_BYTES; +import static org.apache.tez.dag.api.TezConfiguration.TEZ_TASK_LOCAL_FS_WRITE_LIMIT_BYTES; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; @@ -255,7 +255,7 @@ public void testLocalFileSystemBytesWrittenLimit() throws IOException { // Check limits with default shouldn't throw exception. lio1.checkTaskLimits(); - tezConf.setLong(TEZ_TASK_LOCAL_WRITE_LIMIT_BYTES, 10); + tezConf.setLong(TEZ_TASK_LOCAL_FS_WRITE_LIMIT_BYTES, 10); lio1 = new LogicalIOProcessorRuntimeTask(mockSpec, 0, tezConf, null, null, null, null, null, null, "", null, Runtime.getRuntime().maxMemory(), true, null, null); From 84aa1015ce1750206f28d683a291497fc68683c3 Mon Sep 17 00:00:00 2001 From: Ayush Saxena Date: Wed, 25 Oct 2023 10:01:29 +0530 Subject: [PATCH 3/4] Add 10 Seconds Interval like HEAP_MEMORY_USAGE_UPDATE_INTERVAL --- .../org/apache/tez/runtime/task/TaskReporter.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java index 80308dc36b..5f9856b900 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java @@ -142,6 +142,8 @@ static class HeartbeatCallable implements Callable { private static final float LOG_COUNTER_BACKOFF = 1.3f; private static final int HEAP_MEMORY_USAGE_UPDATE_INTERVAL = 5000; // 5 seconds + private static final int LOCAL_FILE_SYSTEM_BYTES_WRITTEN_CHECK_INTERVAL = 10000; // 10 seconds + private final RuntimeTask task; private final EventMetaData updateEventMetadata; @@ -166,6 +168,9 @@ static class HeartbeatCallable implements Callable { private long usedMemory = 0; private long heapMemoryUsageUpdatedTime = System.currentTimeMillis() - HEAP_MEMORY_USAGE_UPDATE_INTERVAL; + private long localFileSystemBytesWrittenCheckInterval = + System.currentTimeMillis() - LOCAL_FILE_SYSTEM_BYTES_WRITTEN_CHECK_INTERVAL; + /* * Keeps track of regular timed heartbeats. Is primarily used as a timing mechanism to send / * log counters. @@ -264,7 +269,11 @@ private synchronized ResponseWrapper heartbeat(Collection eventsArg) t prevCounterSendHeartbeatNum = nonOobHeartbeatCounter.get(); } try { - task.checkTaskLimits(); + long now = System.currentTimeMillis(); + if (now - localFileSystemBytesWrittenCheckInterval > LOCAL_FILE_SYSTEM_BYTES_WRITTEN_CHECK_INTERVAL) { + task.checkTaskLimits(); + localFileSystemBytesWrittenCheckInterval = now; + } } catch (TaskLimitException tle) { LOG.error("Task limit exceeded", tle); askedToDie.set(true); From bb8643dd35f295e1abc1a55ba1c0aa0c6f9b3b86 Mon Sep 17 00:00:00 2001 From: Ayush Saxena Date: Fri, 27 Oct 2023 00:28:11 +0530 Subject: [PATCH 4/4] Change Exception Name. --- .../main/java/org/apache/tez/runtime/RuntimeTask.java | 10 +++++----- .../java/org/apache/tez/runtime/task/TaskReporter.java | 6 +++--- .../org/apache/tez/runtime/task/TestTaskReporter.java | 8 ++++---- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java index d78ba1954a..4c44985eed 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java @@ -199,9 +199,9 @@ protected final boolean isUpdatingSystemCounters() { /** * Check whether the task has exceeded any configured limits. * - * @throws TaskLimitException in case the limit is exceeded. + * @throws LocalWriteLimitException in case the limit is exceeded. */ - public void checkTaskLimits() throws TaskLimitException { + public void checkTaskLimits() throws LocalWriteLimitException { // check the limit for writing to local file system if (lfsBytesWriteLimit >= 0) { Long lfsBytesWritten = null; @@ -212,7 +212,7 @@ public void checkTaskLimits() throws TaskLimitException { LOG.warn("Could not get LocalFileSystem bytesWritten counter"); } if (lfsBytesWritten != null && lfsBytesWritten > lfsBytesWriteLimit) { - throw new TaskLimitException( + throw new LocalWriteLimitException( "Too much write to local file system." + " current value is " + lfsBytesWritten + " the limit is " + lfsBytesWriteLimit); } @@ -222,8 +222,8 @@ public void checkTaskLimits() throws TaskLimitException { /** * Exception thrown when the task exceeds some configured limits. */ - public static class TaskLimitException extends IOException { - public TaskLimitException(String str) { + public static class LocalWriteLimitException extends IOException { + public LocalWriteLimitException(String str) { super(str); } } diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java index 5f9856b900..5b1a9544b1 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java @@ -44,7 +44,7 @@ import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.runtime.RuntimeTask; -import org.apache.tez.runtime.RuntimeTask.TaskLimitException; +import org.apache.tez.runtime.RuntimeTask.LocalWriteLimitException; import org.apache.tez.runtime.api.*; import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent; import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent; @@ -274,8 +274,8 @@ private synchronized ResponseWrapper heartbeat(Collection eventsArg) t task.checkTaskLimits(); localFileSystemBytesWrittenCheckInterval = now; } - } catch (TaskLimitException tle) { - LOG.error("Task limit exceeded", tle); + } catch (LocalWriteLimitException lwle) { + LOG.error("Local FileSystem write limit exceeded", lwle); askedToDie.set(true); return new ResponseWrapper(true, 1); } diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java index 2013d6a893..7ecd74fb72 100644 --- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java +++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java @@ -51,7 +51,7 @@ import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask; -import org.apache.tez.runtime.RuntimeTask.TaskLimitException; +import org.apache.tez.runtime.RuntimeTask.LocalWriteLimitException; import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent; import org.apache.tez.runtime.api.impl.InputSpec; import org.apache.tez.runtime.api.impl.OutputSpec; @@ -261,9 +261,9 @@ public void testLocalFileSystemBytesWrittenLimit() throws IOException { try { lio1.checkTaskLimits(); - Assert.fail("Expected to throw TaskLimitException"); - } catch (TaskLimitException taskLimitException) { - Assert.assertTrue(taskLimitException.getMessage().contains("Too much write to local file system")); + Assert.fail("Expected to throw LocalWriteLimitException"); + } catch (LocalWriteLimitException localWriteLimitException) { + Assert.assertTrue(localWriteLimitException.getMessage().contains("Too much write to local file system")); } }