[SPARK-43033][SQL] Avoid task retries due to AssertNotNull checks#40707
[SPARK-43033][SQL] Avoid task retries due to AssertNotNull checks#40707xiaochen-zhou wants to merge 34 commits into
Conversation
| messageParameters = Map( | ||
| "field" -> errMsg | ||
| ), | ||
| cause = new NullPointerException) |
There was a problem hiding this comment.
I have not followed the error classes changes much - but this is counter intuitive - why are we not passing the actual exception here ? Instead of creating a dummy exception ?
There was a problem hiding this comment.
I have not followed the error classes changes much - but this is counter intuitive - why are we not passing the actual exception here ? Instead of creating a dummy exception ?
Thank you very much for your review, I have modified the code, can you re-review the code when you are free, and make some comments.
|
cc @Ngone51 @jiangxb1987 too FYI |
| "User exception: <msg>" | ||
| ] | ||
| }, | ||
| "_LEGACY_ERROR_TEMP_3044" : { |
There was a problem hiding this comment.
Shall we define a proper error class? There is NOT_NULL_CONSTRAINT_VIOLATION that seems relevant. It is currently used to validate NOT NULL constraints for array elements and map values.
| info.id, taskSet.id, tid, ef.description)) | ||
| return | ||
| } | ||
| if (ef.className == classOf[SparkUserException].getName) { |
There was a problem hiding this comment.
I wonder whether this logic can be generic and apply to all exceptions that extend SparkThrowable and have an error class defined?
There was a problem hiding this comment.
+1, we should skip retry if the exception is SparkThrowable and the error class is present.
There was a problem hiding this comment.
One thing we should think about is how to differentiate user-facing error and user-triggered error. We may still need to retry for user-facing error, e.g. file read error which can be transient.
One idea is to have a special prefix for error classes that should still trigger retry, such as file read error and OOM, which shouldn't be many.
There was a problem hiding this comment.
Or we can create a base trait for transient errors.
There was a problem hiding this comment.
Thank you very much for review and sorry for the late response. I will try to modify the code according to your suggestions @aokolnychyi @cloud-fan
| /** | ||
| * User error exception thrown from Spark with an error class. | ||
| */ | ||
| private[spark] class SparkUserException( |
There was a problem hiding this comment.
Question: Are we sure a custom exception is needed for this case? Is there any existing exception we can reuse with NPE as cause?
If we want to have a brand new exception, what about SparkNotNullConstraintViolationException to be more specific? I guess it will depend whether we want to skip retries only for this exception type as opposed to all Spark exceptions with known error codes.
There was a problem hiding this comment.
It's too complicated to use both error class and exception type to differentiate errors. I think in principle we should always use SparkException with different error classes, except for some places that need to be compatible with old code.
There was a problem hiding this comment.
It's too complicated to use both error class and exception type to differentiate errors. I think in principle we should always use
SparkExceptionwith different error classes, except for some places that need to be compatible with old code.
Thank you very much for review, I try to change the code according to this idea.
There was a problem hiding this comment.
Question: Are we sure a custom exception is needed for this case? Is there any existing exception we can reuse with NPE as cause?
If we want to have a brand new exception, what about
SparkNotNullConstraintViolationExceptionto be more specific? I guess it will depend whether we want to skip retries only for this exception type as opposed to all Spark exceptions with known error codes.
Thank you very much for review, My understanding is that we want to skip retry logic of user-triggered error, not only NPE, So I defined a new exception SparkUserException.
There was a problem hiding this comment.
There can be thousands of user-triggered errors while the transient errors are likely to be less than 10. I think it's better to define a new exception for transient errors.
There was a problem hiding this comment.
There can be thousands of user-triggered errors while the transient errors are likely to be less than 10. I think it's better to define a new exception for transient errors.
I see, thank you. I try to change the code according to this idea.
There was a problem hiding this comment.
There can be thousands of user-triggered errors while the transient errors are likely to be less than 10. I think it's better to define a new exception for transient errors.
During the implementation process, I think that if the idea of define a new exception is adopted, the exception type of error_class may be changed, such as _UNABLE_TO_ACQUIRE_MEMORY and its exception type may be changed from SparkOutOfMemoryError to SparkTransientError, but we need to use SparkOutOfMemoryError in many places. (SparkOutOfMemoryError cannot extend SparkTransientError)
} catch (SparkOutOfMemoryError e) {
// should have trigger spilling
if (!inMemSorter.hasSpaceForAnotherRecord()) {
logger.error("Unable to grow the pointer array");
throw e;
}So I think having a special prefix may be a more good idea. I don't know if my idea is right, hope you leave some comments in your free time. @cloud-fan
|
According to the two ideas provided by @cloud-fan on how to differentiate user-facing errors and user-triggered errors ([have a special prefix] or [create a base trait for transient errors]), in the implementation process, I think having a special prefix may be a more good idea. def isInternalError(errorClass: String): Boolean = {
errorClass == "INTERNAL_ERROR"
}
def isTransientError(errorClass: String): Boolean = {
errorClass.startsWith("TRANSIENT")
}can you re-review the code when you are free, and make some comments. @cloud-fan @aokolnychyi |
| "sqlState" : "22003" | ||
| }, | ||
| "INVALID_BUCKET_FILE" : { | ||
| "TRANSIENT_INVALID_BUCKET_FILE" : { |
There was a problem hiding this comment.
Optional: Instead of changing the error class name (since that error class is shown in the error message), I'd consider adding a field to indicate whether the error is transient (i.e. should be retried), similar to sqlCode we have today. We would need more feedback from folks who worked on the error framework.
There was a problem hiding this comment.
Optional: Instead of changing the error class name (since that error class is shown in the error message), I'd consider adding a field to indicate whether the error is transient (i.e. should be retried), similar to
sqlCodewe have today. We would need more feedback from folks who worked on the error framework.
I am now changing the code based on this idea, and the next step may be to seek help from people who worked on the error framework.
|
OK, I'm on the fence now. On one hand, the number of transient errors should be much smaller than the number of user-triggered errors, so it's better to find out these transient errors and mark them. On the other hand, not retrying the task can be a regression that leads to job failure, so we should make sure we only skip task retry when the error is definitely user-triggered. To be conservative, now I'm leaning towards picking some errors and marking them as "can skip task retry". I like the idea from @aokolnychyi that we can add a JSON field for it. |
|
I would be on the conservative side and skip retry when we are absolutely certain that a retry will not help. |
|
+1 for being safe |
I'm trying to change the code now |
I see, I'm trying to change the code now |
…c-SPARK-43033 # Conflicts: # project/MimaExcludes.scala
|
I have modified the code, can you re-review the code when you are free, and make some comments. @cloud-fan @aokolnychyi @mridulm |
|
According to the suggestions provided by @cloud-fan @aokolnychyi .I modified the code.
"CANNOT_PARSE_DECIMAL" : {
"message" : [
"Cannot parse decimal."
],
"sqlState" : "22018",
"isTransient" : false
},When these errors occur, the retry logic is skipped. if (!ef.isTransient) {
// if the exception has an error class which means a non-transient error, not retry
logError(s"$task has a non-transient exception: ${ef.description}; not retrying")
sched.dagScheduler.taskEnded(tasks(index), reason, null, accumUpdates, metricPeaks, info)
abort(s"$task has a non-transient exception: ${ef.description}", ef.exception)
return
}hope you leave some comments in your free time. @cloud-fan @aokolnychyi @mridulm, thanks a lot. |
|
useful feature, any updates here? |
I modified the code according to the suggestions provided by @cloud-fan @aokolnychyi @mridulm , next step may be to seek help from people who worked on the error framework. Can you give some suggestions on the next work? |
|
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. |
What changes were proposed in this pull request?
This PR update the task retry logic to not retry if the exception has an error class which means a user error.
Why are the changes needed?
As discussed #40655 (comment), tasks that failed because of exceptions generated by AssertNotNull should not be retried.
Does this PR introduce any user-facing change?
No
How was this patch tested?
This PR comes with tests.