Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import scala.collection.mutable

import org.apache.commons.io.FileUtils
import org.apache.commons.io.output.TeeOutputStream
import org.apache.commons.lang3.{JavaVersion, SystemUtils}
import org.scalactic.TolerantNumerics
import org.scalatest.PrivateMethodTester

Expand Down Expand Up @@ -382,18 +381,16 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper with PrivateM

test("write jdbc") {
assume(IntegrationTestUtils.isSparkHiveJarAvailable)
if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) {
val url = "jdbc:derby:memory:1234"
val table = "t1"
try {
spark.range(10).write.jdbc(url = s"$url;create=true", table, new Properties())
val result = spark.read.jdbc(url = url, table, new Properties()).collect()
assert(result.length == 10)
} finally {
// clean up
assertThrows[SparkException] {
spark.read.jdbc(url = s"$url;drop=true", table, new Properties()).collect()
}
val url = "jdbc:derby:memory:1234"
Comment thread
HyukjinKwon marked this conversation as resolved.
Outdated
val table = "t1"
try {
spark.range(10).write.jdbc(url = s"$url;create=true", table, new Properties())
val result = spark.read.jdbc(url = url, table, new Properties()).collect()
assert(result.length == 10)
} finally {
// clean up
assertThrows[SparkException] {
spark.read.jdbc(url = s"$url;drop=true", table, new Properties()).collect()
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicLong

import io.grpc.inprocess.InProcessChannelBuilder
import org.apache.arrow.memory.RootAllocator
import org.apache.commons.lang3.{JavaVersion, SystemUtils}
import org.apache.commons.lang3.SystemUtils
import org.scalatest.BeforeAndAfterAll

import org.apache.spark.sql.connect.client.SparkConnectClient
Expand Down Expand Up @@ -146,13 +146,12 @@ class SQLImplicitsTestSuite extends ConnectFunSuite with BeforeAndAfterAll {
testImplicit(BigDecimal(decimal))
testImplicit(Date.valueOf(LocalDate.now()))
testImplicit(LocalDate.now())
// SPARK-42770: Run `LocalDateTime.now()` and `Instant.now()` with Java 8 & 11 always
// get microseconds on both Linux and MacOS, but there are some differences when
// using Java 17, it will get accurate nanoseconds on Linux, but still get the microseconds
// on MacOS. At present, Spark always converts them to microseconds, this will cause the
// SPARK-42770: `LocalDateTime.now()` and `Instant.now()` it will get accurate
// nanoseconds on Linux, but get the microseconds on MacOS. At present,
// Spark always converts them to microseconds, this will cause the
// test fail when using Java 17 on Linux, so add `truncatedTo(ChronoUnit.MICROS)` when
// testing on Linux using Java 17 to ensure the accuracy of input data is microseconds.
if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_17) && SystemUtils.IS_OS_LINUX) {
if (SystemUtils.IS_OS_LINUX) {
testImplicit(LocalDateTime.now().truncatedTo(ChronoUnit.MICROS))
testImplicit(Instant.now().truncatedTo(ChronoUnit.MICROS))
testImplicit(Timestamp.from(Instant.now().truncatedTo(ChronoUnit.MICROS)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ package org.apache.spark.internal.config
import java.util.Locale
import java.util.concurrent.TimeUnit

import org.apache.commons.lang3.{JavaVersion, SystemUtils}

import org.apache.spark.network.util.ByteUnit

private[spark] object UI {
Expand Down Expand Up @@ -102,7 +100,7 @@ private[spark] object UI {
val UI_HEAP_HISTOGRAM_ENABLED = ConfigBuilder("spark.ui.heapHistogramEnabled")
.version("3.5.0")
.booleanConf
.createWithDefault(SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_11))
.createWithDefault(true)
Comment thread
HyukjinKwon marked this conversation as resolved.
Outdated

val UI_PROMETHEUS_ENABLED = ConfigBuilder("spark.ui.prometheus.enabled")
.internal()
Expand Down
32 changes: 8 additions & 24 deletions core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import java.nio.{ByteBuffer, MappedByteBuffer}
import scala.collection.Map
import scala.collection.mutable

import org.apache.commons.lang3.{JavaVersion, SystemUtils}
import sun.misc.Unsafe
import sun.nio.ch.DirectBuffer

Expand Down Expand Up @@ -198,29 +197,14 @@ private[spark] class StorageStatus(
/** Helper methods for storage-related objects. */
private[spark] object StorageUtils extends Logging {

// In Java 8, the type of DirectBuffer.cleaner() was sun.misc.Cleaner, and it was possible
// to access the method sun.misc.Cleaner.clean() to invoke it. The type changed to
// jdk.internal.ref.Cleaner in later JDKs, and the .clean() method is not accessible even with
// reflection. However sun.misc.Unsafe added a invokeCleaner() method in JDK 9+ and this is
// still accessible with reflection.
private val bufferCleaner: DirectBuffer => Unit =
if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) {
val cleanerMethod =
Utils.classForName("sun.misc.Unsafe").getMethod("invokeCleaner", classOf[ByteBuffer])
val unsafeField = classOf[Unsafe].getDeclaredField("theUnsafe")
unsafeField.setAccessible(true)
val unsafe = unsafeField.get(null).asInstanceOf[Unsafe]
buffer: DirectBuffer => cleanerMethod.invoke(unsafe, buffer)
} else {
val cleanerMethod = Utils.classForName("sun.misc.Cleaner").getMethod("clean")
buffer: DirectBuffer => {
// Careful to avoid the return type of .cleaner(), which changes with JDK
val cleaner: AnyRef = buffer.cleaner()
if (cleaner != null) {
cleanerMethod.invoke(cleaner)
}
}
}
private val bufferCleaner: DirectBuffer => Unit = {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this function, it should be possible to further refactor to avoid using un-exported APIs and internal APIs, but the current changes in this pr are ok.

val cleanerMethod =
Utils.classForName("sun.misc.Unsafe").getMethod("invokeCleaner", classOf[ByteBuffer])
val unsafeField = classOf[Unsafe].getDeclaredField("theUnsafe")
unsafeField.setAccessible(true)
val unsafe = unsafeField.get(null).asInstanceOf[Unsafe]
buffer: DirectBuffer => cleanerMethod.invoke(unsafe, buffer)
}

/**
* Attempt to clean up a ByteBuffer if it is direct or memory-mapped. This uses an *unsafe* Sun
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import java.lang.reflect.{Field, Modifier}
import scala.collection.JavaConverters._
import scala.collection.mutable.{Map, Set, Stack}

import org.apache.commons.lang3.{ClassUtils, JavaVersion, SystemUtils}
import org.apache.commons.lang3.ClassUtils
import org.apache.xbean.asm9.{ClassReader, ClassVisitor, Handle, MethodVisitor, Type}
import org.apache.xbean.asm9.Opcodes._
import org.apache.xbean.asm9.tree.{ClassNode, MethodNode}
Expand Down Expand Up @@ -421,8 +421,7 @@ private[spark] object ClosureCleaner extends Logging {
* This method is used to get the final modifier field when on Java 17.
*/
private def getFinalModifiersFieldForJava17(field: Field): Option[Field] = {
if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_17) &&
Modifier.isFinal(field.getModifiers)) {
if (Modifier.isFinal(field.getModifiers)) {
val methodGetDeclaredFields0 = classOf[Class[_]]
.getDeclaredMethod("getDeclaredFields0", classOf[Boolean])
methodGetDeclaredFields0.setAccessible(true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import org.apache.spark.sql.execution.command.CommandUtils
import org.apache.spark.sql.execution.datasources.{FileFormat, V1WriteCommand, V1WritesUtils}
import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.hive.client.HiveClientImpl
import org.apache.spark.sql.hive.client.hive._


/**
Expand Down Expand Up @@ -235,28 +234,6 @@ case class InsertIntoHiveTable(
.unwrapped.asInstanceOf[HiveExternalCatalog]
.client
.version
// SPARK-31684:
Comment thread
HyukjinKwon marked this conversation as resolved.
// For Hive 2.0.0 and onwards, as https://issues.apache.org/jira/browse/HIVE-11940
// has been fixed, and there is no performance issue anymore. We should leave the
// overwrite logic to hive to avoid failure in `FileSystem#checkPath` when the table
// and partition locations do not belong to the same `FileSystem`
// TODO(SPARK-31675): For Hive 2.2.0 and earlier, if the table and partition locations
// do not belong together, we will still get the same error thrown by hive encryption
// check. see https://issues.apache.org/jira/browse/HIVE-14380.
// So we still disable for Hive overwrite for Hive 1.x for better performance because
// the partition and table are on the same cluster in most cases.
if (partitionPath.nonEmpty && overwrite && hiveVersion < v2_0) {
partitionPath.foreach { path =>
val fs = path.getFileSystem(hadoopConf)
if (fs.exists(path)) {
if (!fs.delete(path, true)) {
throw QueryExecutionErrors.cannotRemovePartitionDirError(path)
}
// Don't let Hive do overwrite operation since it is slower.
doHiveOverwrite = false
}
}
}

// inheritTableSpecs is set to true. It should be set to false for an IMPORT query
// which is currently considered as a Hive native command.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,7 @@ class HiveExternalCatalogVersionsSuite extends SparkSubmitTestUtils {
private val sparkTestingDir = Option(System.getProperty(SPARK_TEST_CACHE_DIR_SYSTEM_PROPERTY))
.map(new File(_)).getOrElse(Utils.createTempDir(namePrefix = "test-spark"))
private val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
val hiveVersion = if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) {
HiveUtils.builtinHiveVersion
} else {
"1.2.1"
}
val hiveVersion = HiveUtils.builtinHiveVersion

override def afterAll(): Unit = {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import java.io.{BufferedWriter, File, FileWriter}

import scala.util.Properties

import org.apache.commons.lang3.{JavaVersion, SystemUtils}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.common.FileUtils
import org.scalatest.Assertions._
Expand Down Expand Up @@ -141,8 +140,7 @@ class HiveSparkSubmitSuite
runSparkSubmit(args)
}

test("SPARK-8020: set sql conf in spark conf") {
assume(!SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9))
ignore("SPARK-8020: set sql conf in spark conf") {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will there be a chance for these ignored cases to be restarted?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's leave it for now. I think this test does not target to test 0.12 (although the metastore version is set like that).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we change the hive version in these tests?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this and and the last one, we could. But the others don't set the Hive version apparently. I will just take a look separately.

val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
val args = Seq(
"--class", SparkSQLConfTest.getClass.getName.stripSuffix("$"),
Expand Down Expand Up @@ -180,8 +178,7 @@ class HiveSparkSubmitSuite
runSparkSubmit(args)
}

test("SPARK-9757 Persist Parquet relation with decimal column") {
assume(!SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9))
ignore("SPARK-9757 Persist Parquet relation with decimal column") {
val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
val args = Seq(
"--class", SPARK_9757.getClass.getName.stripSuffix("$"),
Expand Down Expand Up @@ -277,8 +274,7 @@ class HiveSparkSubmitSuite
runSparkSubmit(args)
}

test("SPARK-16901: set javax.jdo.option.ConnectionURL") {
assume(!SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9))
ignore("SPARK-16901: set javax.jdo.option.ConnectionURL") {
// In this test, we set javax.jdo.option.ConnectionURL and set metastore version to
// 0.13. This test will make sure that javax.jdo.option.ConnectionURL will not be
// overridden by hive's default settings when we create a HiveConf object inside
Expand Down Expand Up @@ -359,9 +355,8 @@ class HiveSparkSubmitSuite
runSparkSubmit(argsForShowTables)
}

test("SPARK-34772: RebaseDateTime loadRebaseRecords should use Spark classloader " +
ignore("SPARK-34772: RebaseDateTime loadRebaseRecords should use Spark classloader " +
"instead of context") {
assume(!SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9))
val unusedJar = TestUtils.createJarWithClasses(Seq.empty)

// We need to specify the metastore database location in case of conflict with other hive
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package org.apache.spark.sql.hive.client
import java.io.{ByteArrayOutputStream, File, PrintStream, PrintWriter}
import java.net.URI

import org.apache.commons.lang3.{JavaVersion, SystemUtils}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.common.StatsSetupConst
Expand Down Expand Up @@ -663,14 +662,12 @@ class HiveClientSuite(version: String, allVersions: Seq[String])

test("sql read hive materialized view") {
// HIVE-14249 Since Hive 2.3.0, materialized view is supported.
if (version == "2.3" || version == "3.0" || version == "3.1") {
// Since Hive 3.0(HIVE-19383), we can not run local MR by `client.runSqlHive` with JDK 11.
assume(version == "2.3" || !SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9))
// Since Hive 3.0(HIVE-19383), we can not run local MR by `client.runSqlHive` with JDK 11.
if (version == "2.3") {
// Since HIVE-18394(Hive 3.1), "Create Materialized View" should default to rewritable ones
val disableRewrite = if (version == "2.3" || version == "3.0") "" else "DISABLE REWRITE"
client.runSqlHive("CREATE TABLE materialized_view_tbl (c1 INT)")
client.runSqlHive(
s"CREATE MATERIALIZED VIEW mv1 $disableRewrite AS SELECT * FROM materialized_view_tbl")
s"CREATE MATERIALIZED VIEW mv1 AS SELECT * FROM materialized_view_tbl")
checkError(
exception = intercept[AnalysisException] {
versionSpark.table("mv1").collect()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import java.sql.Timestamp

import scala.util.Try

import org.apache.commons.lang3.{JavaVersion, SystemUtils}
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.scalatest.BeforeAndAfter

Expand Down Expand Up @@ -1640,12 +1639,7 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd
test("udf_radians") {
withSQLConf("hive.fetch.task.conversion" -> "more") {
val result = sql("select radians(57.2958) FROM src tablesample (1 rows)").collect()
if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) {
assertResult(Array(Row(1.0000003575641672))) (result)
} else {
assertResult(Array(Row(1.000000357564167))) (result)
}

assertResult(Array(Row(1.0000003575641672))) (result)
assertResult(Array(Row(2.4999991485811655))) {
sql("select radians(143.2394) FROM src tablesample (1 rows)").collect()
}
Expand Down