Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.storm.messaging.netty;

import org.apache.storm.utils.Utils;
Expand All @@ -33,10 +34,9 @@ public void uncaughtException(Thread t, Throwable e) {

try {
Utils.handleUncaughtException(e);
} catch (Throwable throwable) {
LOG.error("Exception thrown while handling uncaught exception " + throwable.getCause());
} catch (Error error) {
LOG.error("Exception thrown while handling uncaught exception " + error.getCause());
Runtime.getRuntime().exit(1);
}
LOG.info("Received error in netty thread.. terminating server...");

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a thread hits the exception handler will it still die? If so who is responsible for replacing it? As far as I can tell this handler is used by a HashedWheelTimer in Context, and it doesn't look to me like that class will replace a dead thread.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like the HashedWheelTimer will not be replaced - I'll take a second pass at this to confirm

Runtime.getRuntime().exit(1);
}
}
46 changes: 30 additions & 16 deletions storm-client/src/jvm/org/apache/storm/utils/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -534,29 +534,43 @@ public static boolean isZkAuthenticationConfiguredTopology(Map<String, Object> c
&& !((String)conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME)).isEmpty());
}

public static void handleUncaughtException(Throwable t) {
handleUncaughtException(t, defaultAllowedExceptions);
public static void handleUncaughtExceptionWithoutKillingProcess(Throwable t) {
handleUncaughtException(t, defaultAllowedExceptions, true);
}

public static void handleUncaughtException(Throwable t, Set<Class> allowedExceptions) {
handleUncaughtException(t, allowedExceptions, false);
}

public static void handleUncaughtException(Throwable t) {
handleUncaughtException(t, defaultAllowedExceptions, false);
}

public static void handleUncaughtException(Throwable t, Set<Class> allowedExceptions, Boolean alwaysSwallow) {
if (t != null) {
if (t instanceof OutOfMemoryError) {
try {
System.err.println("Halting due to Out Of Memory Error..." + Thread.currentThread().getName());
} catch (Throwable err) {
//Again we don't want to exit because of logging issues.
if (t instanceof Error) {
if (t instanceof OutOfMemoryError) {
try {
System.err.println("Halting due to Out Of Memory Error..." + Thread.currentThread().getName());
} catch (Throwable err) {
//Again we don't want to exit because of logging issues.
}
Runtime.getRuntime().halt(-1);
} else {
LOG.info("Bubble up the Error {} {}", t.getClass(), t);
throw new Error(t);
}
Runtime.getRuntime().halt(-1);
}
}

if(allowedExceptions.contains(t.getClass())) {
LOG.info("Swallowing {} {}", t.getClass(), t);
return;
}
if(alwaysSwallow || allowedExceptions.contains(t.getClass())) {
LOG.info("Swallowing {} {}", t.getClass(), t);
return;
}

//Running in daemon mode, we would pass Error to calling thread.
throw new Error(t);
//Running in daemon mode, we would pass Error to calling thread.
LOG.info("Bubble up the Error {} {}", t.getClass(), t);
throw new Error(t);
}
}

public static byte[] thriftSerialize(TBase t) {
Expand Down Expand Up @@ -865,7 +879,7 @@ public static void setupDefaultUncaughtExceptionHandler() {
Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
public void uncaughtException(Thread thread, Throwable thrown) {
try {
handleUncaughtException(thrown);
handleUncaughtExceptionWithoutKillingProcess(thrown);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will swallow all the Throwables including Errors excluding OOME. Is it intended? We may still want to make it crash when there's Error thrown from somewhere, given that it normally indicates unrecoverable state.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question, if a thread gets an exception and the process is not shut down, will the thread be replaced?

} catch (Error err) {
LOG.error("Received error in main thread.. terminating server...", err);
Runtime.getRuntime().exit(-2);
Expand Down