From a933093f194e6457151bca6ef0ea1e77a90f671f Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Thu, 12 Sep 2019 16:32:43 +0200 Subject: [PATCH 1/8] [SPARK-21869][SS] Apply Apache Commons Pool to Kafka producer --- .../sql/kafka010/CachedKafkaProducer.scala | 114 +++++----- .../kafka010/InternalKafkaConnectorPool.scala | 203 +++++++++++++++++ .../kafka010/InternalKafkaConsumerPool.scala | 210 +++--------------- .../kafka010/InternalKafkaProducerPool.scala | 72 ++++++ .../sql/kafka010/KafkaDataConsumer.scala | 6 +- .../spark/sql/kafka010/KafkaDataWriter.scala | 19 +- .../spark/sql/kafka010/KafkaWriteTask.scala | 20 +- .../apache/spark/sql/kafka010/package.scala | 34 ++- .../kafka010/CachedKafkaProducerSuite.scala | 162 ++++++++++---- ... => InternalKafkaConnectorPoolSuite.scala} | 9 +- .../sql/kafka010/KafkaDataConsumerSuite.scala | 6 +- .../apache/spark/sql/kafka010/KafkaTest.scala | 10 +- 12 files changed, 551 insertions(+), 314 deletions(-) create mode 100644 external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConnectorPool.scala create mode 100644 external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaProducerPool.scala rename external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/{InternalKafkaConsumerPoolSuite.scala => InternalKafkaConnectorPoolSuite.scala} (96%) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala index fc177cdc9037e..6958cf126d1cf 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala @@ -18,60 +18,70 @@ package org.apache.spark.sql.kafka010 import java.{util => ju} -import java.util.concurrent.{ConcurrentMap, ExecutionException, TimeUnit} +import java.io.Closeable +import java.util.concurrent.ExecutionException -import com.google.common.cache._ -import com.google.common.util.concurrent.{ExecutionError, UncheckedExecutionException} -import org.apache.kafka.clients.producer.KafkaProducer import scala.collection.JavaConverters._ import scala.util.control.NonFatal +import com.google.common.util.concurrent.{ExecutionError, UncheckedExecutionException} +import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord} + import org.apache.spark.SparkEnv import org.apache.spark.internal.Logging import org.apache.spark.kafka010.{KafkaConfigUpdater, KafkaRedactionUtil} +import org.apache.spark.sql.kafka010.InternalKafkaProducerPool._ +import org.apache.spark.util.ShutdownHookManager -private[kafka010] object CachedKafkaProducer extends Logging { +private[kafka010] class CachedKafkaProducer(val kafkaParams: ju.Map[String, Object]) + extends Closeable with Logging { private type Producer = KafkaProducer[Array[Byte], Array[Byte]] - private val defaultCacheExpireTimeout = TimeUnit.MINUTES.toMillis(10) - - private lazy val cacheExpireTimeout: Long = Option(SparkEnv.get) - .map(_.conf.get(PRODUCER_CACHE_TIMEOUT)) - .getOrElse(defaultCacheExpireTimeout) + private val producer = createProducer() - private val cacheLoader = new CacheLoader[Seq[(String, Object)], Producer] { - override def load(config: Seq[(String, Object)]): Producer = { - createKafkaProducer(config) + private def createProducer(): Producer = { + val producer: Producer = new Producer(kafkaParams) + if (log.isDebugEnabled()) { + val redactedParamsSeq = KafkaRedactionUtil.redactParams(toCacheKey(kafkaParams)) + logDebug(s"Created a new instance of kafka producer for $redactedParamsSeq.") } + producer } - private val removalListener = new RemovalListener[Seq[(String, Object)], Producer]() { - override def onRemoval( - notification: RemovalNotification[Seq[(String, Object)], Producer]): Unit = { - val paramsSeq: Seq[(String, Object)] = notification.getKey - val producer: Producer = notification.getValue - if (log.isDebugEnabled()) { - val redactedParamsSeq = KafkaRedactionUtil.redactParams(paramsSeq) - logDebug(s"Evicting kafka producer $producer params: $redactedParamsSeq, " + - s"due to ${notification.getCause}") + override def close(): Unit = { + try { + if (log.isInfoEnabled()) { + val redactedParamsSeq = KafkaRedactionUtil.redactParams(toCacheKey(kafkaParams)) + logInfo(s"Closing the KafkaProducer with params: ${redactedParamsSeq.mkString("\n")}.") } - close(paramsSeq, producer) + producer.close() + } catch { + case NonFatal(e) => logWarning("Error while closing kafka producer.", e) } } - private lazy val guavaCache: LoadingCache[Seq[(String, Object)], Producer] = - CacheBuilder.newBuilder().expireAfterAccess(cacheExpireTimeout, TimeUnit.MILLISECONDS) - .removalListener(removalListener) - .build[Seq[(String, Object)], Producer](cacheLoader) + def send(record: ProducerRecord[Array[Byte], Array[Byte]], callback: Callback): Unit = { + producer.send(record, callback) + } - private def createKafkaProducer(paramsSeq: Seq[(String, Object)]): Producer = { - val kafkaProducer: Producer = new Producer(paramsSeq.toMap.asJava) - if (log.isDebugEnabled()) { - val redactedParamsSeq = KafkaRedactionUtil.redactParams(paramsSeq) - logDebug(s"Created a new instance of KafkaProducer for $redactedParamsSeq.") + def flush(): Unit = { + producer.flush() + } +} + +private[kafka010] object CachedKafkaProducer extends Logging { + + private val sparkConf = SparkEnv.get.conf + private val producerPool = new InternalKafkaProducerPool(sparkConf) + + ShutdownHookManager.addShutdownHook { () => + try { + producerPool.close() + } catch { + case e: Throwable => + logWarning("Ignoring exception while shutting down pool from shutdown hook", e) } - kafkaProducer } /** @@ -79,14 +89,14 @@ private[kafka010] object CachedKafkaProducer extends Logging { * exist, a new KafkaProducer will be created. KafkaProducer is thread safe, it is best to keep * one instance per specified kafkaParams. */ - private[kafka010] def getOrCreate(kafkaParams: ju.Map[String, Object]): Producer = { - val updatedKafkaProducerConfiguration = + def acquire(kafkaParams: ju.Map[String, Object]): CachedKafkaProducer = { + val updatedKafkaParams = KafkaConfigUpdater("executor", kafkaParams.asScala.toMap) .setAuthenticationConfigIfNeeded() .build() - val paramsSeq: Seq[(String, Object)] = paramsToSeq(updatedKafkaProducerConfiguration) + val key = toCacheKey(updatedKafkaParams) try { - guavaCache.get(paramsSeq) + producerPool.borrowObject(key, updatedKafkaParams) } catch { case e @ (_: ExecutionException | _: UncheckedExecutionException | _: ExecutionError) if e.getCause != null => @@ -94,35 +104,11 @@ private[kafka010] object CachedKafkaProducer extends Logging { } } - private def paramsToSeq(kafkaParams: ju.Map[String, Object]): Seq[(String, Object)] = { - val paramsSeq: Seq[(String, Object)] = kafkaParams.asScala.toSeq.sortBy(x => x._1) - paramsSeq - } - - /** For explicitly closing kafka producer */ - private[kafka010] def close(kafkaParams: ju.Map[String, Object]): Unit = { - val paramsSeq = paramsToSeq(kafkaParams) - guavaCache.invalidate(paramsSeq) - } - - /** Auto close on cache evict */ - private def close(paramsSeq: Seq[(String, Object)], producer: Producer): Unit = { - try { - if (log.isInfoEnabled()) { - val redactedParamsSeq = KafkaRedactionUtil.redactParams(paramsSeq) - logInfo(s"Closing the KafkaProducer with params: ${redactedParamsSeq.mkString("\n")}.") - } - producer.close() - } catch { - case NonFatal(e) => logWarning("Error while closing kafka producer.", e) - } + def release(producer: CachedKafkaProducer): Unit = { + producerPool.returnObject(producer) } private[kafka010] def clear(): Unit = { - logInfo("Cleaning up guava cache.") - guavaCache.invalidateAll() + producerPool.reset() } - - // Intended for testing purpose only. - private def getAsMap: ConcurrentMap[Seq[(String, Object)], Producer] = guavaCache.asMap() } diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConnectorPool.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConnectorPool.scala new file mode 100644 index 0000000000000..51e629f85c2bb --- /dev/null +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConnectorPool.scala @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import java.{util => ju} +import java.util.concurrent.ConcurrentHashMap + +import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, PooledObject, SwallowedExceptionListener} +import org.apache.commons.pool2.impl.{DefaultEvictionPolicy, DefaultPooledObject, GenericKeyedObjectPool, GenericKeyedObjectPoolConfig} + +import org.apache.spark.internal.Logging + +/** + * Provides object pool for objects which is grouped by a key. + * + * This class leverages [[GenericKeyedObjectPool]] internally, hence providing methods based on + * the class, and same contract applies: after using the borrowed object, you must either call + * returnObject() if the object is healthy to return to pool, or invalidateObject() if the object + * should be destroyed. + * + * The soft capacity of pool is determined by "poolConfig.capacity" config value, + * and the pool will have reasonable default value if the value is not provided. + * (The instance will do its best effort to respect soft capacity but it can exceed when there's + * a borrowing request and there's neither free space nor idle object to clear.) + * + * This class guarantees that no caller will get pooled object once the object is borrowed and + * not yet returned, hence provide thread-safety usage of non-thread-safe objects unless caller + * shares the object to multiple threads. + */ +private[kafka010] abstract class InternalKafkaConnectorPool[K, V]( + objectFactory: ObjectFactory[K, V], + poolConfig: PoolConfig[V], + swallowedExceptionListener: SwallowedExceptionListener) extends Logging { + + // the class is intended to have only soft capacity + assert(poolConfig.getMaxTotal < 0) + + private val pool = { + val internalPool = new GenericKeyedObjectPool[K, V](objectFactory, poolConfig) + internalPool.setSwallowedExceptionListener(swallowedExceptionListener) + internalPool + } + + /** + * Borrows object from the pool. If there's no idle object for the key, + * the pool will create the object. + * + * If the pool doesn't have idle object for the key and also exceeds the soft capacity, + * pool will try to clear some of idle objects. + * + * Borrowed object must be returned by either calling returnObject or invalidateObject, otherwise + * the object will be kept in pool as active object. + */ + def borrowObject(key: K, kafkaParams: ju.Map[String, Object]): V = { + updateKafkaParamForKey(key, kafkaParams) + + if (size >= poolConfig.softMaxSize) { + logWarning("Pool exceeds its soft max size, cleaning up idle objects...") + pool.clearOldest() + } + + pool.borrowObject(key) + } + + /** Returns borrowed object to the pool. */ + def returnObject(connector: V): Unit = { + pool.returnObject(createKey(connector), connector) + } + + /** Invalidates (destroy) borrowed object to the pool. */ + def invalidateObject(connector: V): Unit = { + pool.invalidateObject(createKey(connector), connector) + } + + /** Invalidates all idle values for the key */ + def invalidateKey(key: K): Unit = { + pool.clear(key) + } + + /** + * Closes the keyed object pool. Once the pool is closed, + * borrowObject will fail with [[IllegalStateException]], but returnObject and invalidateObject + * will continue to work, with returned objects destroyed on return. + * + * Also destroys idle instances in the pool. + */ + def close(): Unit = { + pool.close() + } + + def reset(): Unit = { + // this is the best-effort of clearing up. otherwise we should close the pool and create again + // but we don't want to make it "var" only because of tests. + pool.clear() + } + + def numIdle: Int = pool.getNumIdle + + def numIdle(key: K): Int = pool.getNumIdle(key) + + def numActive: Int = pool.getNumActive + + def numActive(key: K): Int = pool.getNumActive(key) + + def size: Int = numIdle + numActive + + def size(key: K): Int = numIdle(key) + numActive(key) + + private def updateKafkaParamForKey(key: K, kafkaParams: ju.Map[String, Object]): Unit = { + // We can assume that kafkaParam should not be different for same cache key, + // otherwise we can't reuse the cached object and cache key should contain kafkaParam. + // So it should be safe to put the key/value pair only when the key doesn't exist. + val oldKafkaParams = objectFactory.keyToKafkaParams.putIfAbsent(key, kafkaParams) + require(oldKafkaParams == null || kafkaParams == oldKafkaParams, "Kafka parameters for same " + + s"cache key should be equal. old parameters: $oldKafkaParams new parameters: $kafkaParams") + } + + protected def createKey(connector: V): K +} + +private[kafka010] abstract class PoolConfig[V] extends GenericKeyedObjectPoolConfig[V] { + + init() + + def softMaxSize: Int + + def jmxEnabled: Boolean + + def minEvictableIdleTimeMillis: Long + + def evictorThreadRunIntervalMillis: Long + + def jmxNamePrefix: String + + def init(): Unit = { + // NOTE: Below lines define the behavior, so do not modify unless you know what you are + // doing, and update the class doc accordingly if necessary when you modify. + + // 1. Set min idle objects per key to 0 to avoid creating unnecessary object. + // 2. Set max idle objects per key to 3 but set total objects per key to infinite + // which ensures borrowing per key is not restricted. + // 3. Set max total objects to infinite which ensures all objects are managed in this pool. + setMinIdlePerKey(0) + setMaxIdlePerKey(3) + setMaxTotalPerKey(-1) + setMaxTotal(-1) + + // Set minimum evictable idle time which will be referred from evictor thread + setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis) + setSoftMinEvictableIdleTimeMillis(-1) + + // evictor thread will run test with ten idle objects + setTimeBetweenEvictionRunsMillis(evictorThreadRunIntervalMillis) + setNumTestsPerEvictionRun(10) + setEvictionPolicy(new DefaultEvictionPolicy[V]()) + + // Immediately fail on exhausted pool while borrowing + setBlockWhenExhausted(false) + + setJmxEnabled(jmxEnabled) + setJmxNamePrefix(jmxNamePrefix) + } +} + +private[kafka010] abstract class ObjectFactory[K, V] extends BaseKeyedPooledObjectFactory[K, V] { + val keyToKafkaParams = new ConcurrentHashMap[K, ju.Map[String, Object]]() + + override def create(key: K): V = { + Option(keyToKafkaParams.get(key)) match { + case Some(kafkaParams) => createValue(key, kafkaParams) + case None => throw new IllegalStateException("Kafka params should be set before " + + "borrowing object.") + } + } + + override def wrap(value: V): PooledObject[V] = { + new DefaultPooledObject[V](value) + } + + protected def createValue(key: K, kafkaParams: ju.Map[String, Object]): V +} + +private[kafka010] class CustomSwallowedExceptionListener(connectorType: String) + extends SwallowedExceptionListener with Logging { + override def onSwallowException(e: Exception): Unit = { + logError(s"Error closing Kafka $connectorType", e) + } +} diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala index 276a942742b8e..0831869134c24 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala @@ -18,204 +18,50 @@ package org.apache.spark.sql.kafka010 import java.{util => ju} -import java.util.concurrent.ConcurrentHashMap -import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, PooledObject, SwallowedExceptionListener} -import org.apache.commons.pool2.impl.{DefaultEvictionPolicy, DefaultPooledObject, GenericKeyedObjectPool, GenericKeyedObjectPoolConfig} +import org.apache.commons.pool2.PooledObject import org.apache.spark.SparkConf -import org.apache.spark.internal.Logging -import org.apache.spark.sql.kafka010.InternalKafkaConsumerPool._ import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey -/** - * Provides object pool for [[InternalKafkaConsumer]] which is grouped by [[CacheKey]]. - * - * This class leverages [[GenericKeyedObjectPool]] internally, hence providing methods based on - * the class, and same contract applies: after using the borrowed object, you must either call - * returnObject() if the object is healthy to return to pool, or invalidateObject() if the object - * should be destroyed. - * - * The soft capacity of pool is determined by "spark.kafka.consumer.cache.capacity" config value, - * and the pool will have reasonable default value if the value is not provided. - * (The instance will do its best effort to respect soft capacity but it can exceed when there's - * a borrowing request and there's neither free space nor idle object to clear.) - * - * This class guarantees that no caller will get pooled object once the object is borrowed and - * not yet returned, hence provide thread-safety usage of non-thread-safe [[InternalKafkaConsumer]] - * unless caller shares the object to multiple threads. - */ +// TODO: revisit the relation between CacheKey and kafkaParams - for now it looks a bit weird +// as we force all consumers having same (groupId, topicPartition) to have same kafkaParams +// which might be viable in performance perspective (kafkaParams might be too huge to use +// as a part of key), but there might be the case kafkaParams could be different - +// cache key should be differentiated for both kafkaParams. private[kafka010] class InternalKafkaConsumerPool( - objectFactory: ObjectFactory, - poolConfig: PoolConfig) extends Logging { + objectFactory: ConsumerObjectFactory, + poolConfig: ConsumerPoolConfig) + extends InternalKafkaConnectorPool[CacheKey, InternalKafkaConsumer]( + objectFactory, + poolConfig, + new CustomSwallowedExceptionListener("consumer")) { def this(conf: SparkConf) = { - this(new ObjectFactory, new PoolConfig(conf)) - } - - // the class is intended to have only soft capacity - assert(poolConfig.getMaxTotal < 0) - - private val pool = { - val internalPool = new GenericKeyedObjectPool[CacheKey, InternalKafkaConsumer]( - objectFactory, poolConfig) - internalPool.setSwallowedExceptionListener(CustomSwallowedExceptionListener) - internalPool - } - - /** - * Borrows [[InternalKafkaConsumer]] object from the pool. If there's no idle object for the key, - * the pool will create the [[InternalKafkaConsumer]] object. - * - * If the pool doesn't have idle object for the key and also exceeds the soft capacity, - * pool will try to clear some of idle objects. - * - * Borrowed object must be returned by either calling returnObject or invalidateObject, otherwise - * the object will be kept in pool as active object. - */ - def borrowObject(key: CacheKey, kafkaParams: ju.Map[String, Object]): InternalKafkaConsumer = { - updateKafkaParamForKey(key, kafkaParams) - - if (size >= poolConfig.softMaxSize) { - logWarning("Pool exceeds its soft max size, cleaning up idle objects...") - pool.clearOldest() - } - - pool.borrowObject(key) + this(new ConsumerObjectFactory, new ConsumerPoolConfig(conf)) } - /** Returns borrowed object to the pool. */ - def returnObject(consumer: InternalKafkaConsumer): Unit = { - pool.returnObject(extractCacheKey(consumer), consumer) - } - - /** Invalidates (destroy) borrowed object to the pool. */ - def invalidateObject(consumer: InternalKafkaConsumer): Unit = { - pool.invalidateObject(extractCacheKey(consumer), consumer) - } - - /** Invalidates all idle consumers for the key */ - def invalidateKey(key: CacheKey): Unit = { - pool.clear(key) - } - - /** - * Closes the keyed object pool. Once the pool is closed, - * borrowObject will fail with [[IllegalStateException]], but returnObject and invalidateObject - * will continue to work, with returned objects destroyed on return. - * - * Also destroys idle instances in the pool. - */ - def close(): Unit = { - pool.close() - } - - def reset(): Unit = { - // this is the best-effort of clearing up. otherwise we should close the pool and create again - // but we don't want to make it "var" only because of tests. - pool.clear() - } - - def numIdle: Int = pool.getNumIdle - - def numIdle(key: CacheKey): Int = pool.getNumIdle(key) - - def numActive: Int = pool.getNumActive - - def numActive(key: CacheKey): Int = pool.getNumActive(key) - - def size: Int = numIdle + numActive - - def size(key: CacheKey): Int = numIdle(key) + numActive(key) - - // TODO: revisit the relation between CacheKey and kafkaParams - for now it looks a bit weird - // as we force all consumers having same (groupId, topicPartition) to have same kafkaParams - // which might be viable in performance perspective (kafkaParams might be too huge to use - // as a part of key), but there might be the case kafkaParams could be different - - // cache key should be differentiated for both kafkaParams. - private def updateKafkaParamForKey(key: CacheKey, kafkaParams: ju.Map[String, Object]): Unit = { - // We can assume that kafkaParam should not be different for same cache key, - // otherwise we can't reuse the cached object and cache key should contain kafkaParam. - // So it should be safe to put the key/value pair only when the key doesn't exist. - val oldKafkaParams = objectFactory.keyToKafkaParams.putIfAbsent(key, kafkaParams) - require(oldKafkaParams == null || kafkaParams == oldKafkaParams, "Kafka parameters for same " + - s"cache key should be equal. old parameters: $oldKafkaParams new parameters: $kafkaParams") - } - - private def extractCacheKey(consumer: InternalKafkaConsumer): CacheKey = { + protected def createKey(consumer: InternalKafkaConsumer): CacheKey = { new CacheKey(consumer.topicPartition, consumer.kafkaParams) } } -private[kafka010] object InternalKafkaConsumerPool { - object CustomSwallowedExceptionListener extends SwallowedExceptionListener with Logging { - override def onSwallowException(e: Exception): Unit = { - logError(s"Error closing Kafka consumer", e) - } - } - - class PoolConfig(conf: SparkConf) extends GenericKeyedObjectPoolConfig[InternalKafkaConsumer] { - private var _softMaxSize = Int.MaxValue - - def softMaxSize: Int = _softMaxSize - - init() - - def init(): Unit = { - _softMaxSize = conf.get(CONSUMER_CACHE_CAPACITY) - - val jmxEnabled = conf.get(CONSUMER_CACHE_JMX_ENABLED) - val minEvictableIdleTimeMillis = conf.get(CONSUMER_CACHE_TIMEOUT) - val evictorThreadRunIntervalMillis = conf.get( - CONSUMER_CACHE_EVICTOR_THREAD_RUN_INTERVAL) - - // NOTE: Below lines define the behavior, so do not modify unless you know what you are - // doing, and update the class doc accordingly if necessary when you modify. - - // 1. Set min idle objects per key to 0 to avoid creating unnecessary object. - // 2. Set max idle objects per key to 3 but set total objects per key to infinite - // which ensures borrowing per key is not restricted. - // 3. Set max total objects to infinite which ensures all objects are managed in this pool. - setMinIdlePerKey(0) - setMaxIdlePerKey(3) - setMaxTotalPerKey(-1) - setMaxTotal(-1) - - // Set minimum evictable idle time which will be referred from evictor thread - setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis) - setSoftMinEvictableIdleTimeMillis(-1) - - // evictor thread will run test with ten idle objects - setTimeBetweenEvictionRunsMillis(evictorThreadRunIntervalMillis) - setNumTestsPerEvictionRun(10) - setEvictionPolicy(new DefaultEvictionPolicy[InternalKafkaConsumer]()) - - // Immediately fail on exhausted pool while borrowing - setBlockWhenExhausted(false) +private class ConsumerPoolConfig(conf: SparkConf) extends PoolConfig[InternalKafkaConsumer] { + def softMaxSize: Int = conf.get(CONSUMER_CACHE_CAPACITY) + def jmxEnabled: Boolean = conf.get(CONSUMER_CACHE_JMX_ENABLED) + def minEvictableIdleTimeMillis: Long = conf.get(CONSUMER_CACHE_TIMEOUT) + def evictorThreadRunIntervalMillis: Long = conf.get(CONSUMER_CACHE_EVICTOR_THREAD_RUN_INTERVAL) + def jmxNamePrefix: String = "kafka010-cached-simple-kafka-consumer-pool" +} - setJmxEnabled(jmxEnabled) - setJmxNamePrefix("kafka010-cached-simple-kafka-consumer-pool") - } +private class ConsumerObjectFactory extends ObjectFactory[CacheKey, InternalKafkaConsumer] { + override def destroyObject(key: CacheKey, p: PooledObject[InternalKafkaConsumer]): Unit = { + p.getObject.close() } - class ObjectFactory extends BaseKeyedPooledObjectFactory[CacheKey, InternalKafkaConsumer] { - val keyToKafkaParams = new ConcurrentHashMap[CacheKey, ju.Map[String, Object]]() - - override def create(key: CacheKey): InternalKafkaConsumer = { - Option(keyToKafkaParams.get(key)) match { - case Some(kafkaParams) => new InternalKafkaConsumer(key.topicPartition, kafkaParams) - case None => throw new IllegalStateException("Kafka params should be set before " + - "borrowing object.") - } - } - - override def wrap(value: InternalKafkaConsumer): PooledObject[InternalKafkaConsumer] = { - new DefaultPooledObject[InternalKafkaConsumer](value) - } - - override def destroyObject(key: CacheKey, p: PooledObject[InternalKafkaConsumer]): Unit = { - p.getObject.close() - } + protected def createValue( + key: CacheKey, + kafkaParams: ju.Map[String, Object]): InternalKafkaConsumer = { + new InternalKafkaConsumer(key.topicPartition, kafkaParams) } } - diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaProducerPool.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaProducerPool.scala new file mode 100644 index 0000000000000..03d206eb8021f --- /dev/null +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaProducerPool.scala @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import java.{util => ju} + +import scala.collection.JavaConverters._ + +import org.apache.commons.pool2.PooledObject + +import org.apache.spark.SparkConf +import org.apache.spark.sql.kafka010.InternalKafkaProducerPool.CacheKey + +private[kafka010] class InternalKafkaProducerPool( + objectFactory: ProducerObjectFactory, + poolConfig: ProducerPoolConfig) + extends InternalKafkaConnectorPool[CacheKey, CachedKafkaProducer]( + objectFactory, + poolConfig, + new CustomSwallowedExceptionListener("producer")) { + + def this(conf: SparkConf) = { + this(new ProducerObjectFactory, new ProducerPoolConfig(conf)) + } + + protected def createKey(producer: CachedKafkaProducer): CacheKey = { + producer.kafkaParams.asScala.toSeq.sortBy(x => x._1) + } +} + +private class ProducerPoolConfig(conf: SparkConf) extends PoolConfig[CachedKafkaProducer] { + def softMaxSize: Int = conf.get(PRODUCER_CACHE_CAPACITY) + def jmxEnabled: Boolean = conf.get(PRODUCER_CACHE_JMX_ENABLED) + def minEvictableIdleTimeMillis: Long = conf.get(PRODUCER_CACHE_TIMEOUT) + def evictorThreadRunIntervalMillis: Long = conf.get(PRODUCER_CACHE_EVICTOR_THREAD_RUN_INTERVAL) + def jmxNamePrefix: String = "kafka010-cached-simple-kafka-producer-pool" +} + +private class ProducerObjectFactory extends ObjectFactory[CacheKey, CachedKafkaProducer] { + override def destroyObject(key: CacheKey, p: PooledObject[CachedKafkaProducer]): Unit = { + p.getObject.close() + } + + protected def createValue( + key: CacheKey, + kafkaParams: ju.Map[String, Object]): CachedKafkaProducer = { + new CachedKafkaProducer(kafkaParams) + } +} + +private[kafka010] object InternalKafkaProducerPool { + type CacheKey = Seq[(String, Object)] + + def toCacheKey(params: ju.Map[String, Object]): CacheKey = { + params.asScala.toSeq.sortBy(x => x._1) + } +} diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala index 87036beb9a252..285c93d344fc5 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala @@ -593,7 +593,7 @@ private[kafka010] object KafkaDataConsumer extends Logging { consumerPool.close() } catch { case e: Throwable => - logWarning("Ignoring Exception while shutting down pools from shutdown hook", e) + logWarning("Ignoring exception while shutting down pools from shutdown hook", e) } } @@ -619,6 +619,10 @@ private[kafka010] object KafkaDataConsumer extends Logging { new KafkaDataConsumer(topicPartition, kafkaParams, consumerPool, fetchedDataPool) } + private[kafka010] def clear(): Unit = { + consumerPool.reset() + } + private def reportDataLoss0( failOnDataLoss: Boolean, finalMessage: String, diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataWriter.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataWriter.scala index 3f8d3d2da5797..2c946f7855a1d 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataWriter.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataWriter.scala @@ -44,7 +44,7 @@ private[kafka010] class KafkaDataWriter( inputSchema: Seq[Attribute]) extends KafkaRowWriter(inputSchema, targetTopic) with DataWriter[InternalRow] { - private lazy val producer = CachedKafkaProducer.getOrCreate(producerParams) + private lazy val producer = CachedKafkaProducer.acquire(producerParams) def write(row: InternalRow): Unit = { checkForErrors() @@ -61,14 +61,21 @@ private[kafka010] class KafkaDataWriter( KafkaDataWriterCommitMessage } - def abort(): Unit = {} + def abort(): Unit = { + close() + } def close(): Unit = { - checkForErrors() - if (producer != null) { - producer.flush() + try { checkForErrors() - CachedKafkaProducer.close(producerParams) + if (producer != null) { + producer.flush() + checkForErrors() + } + } finally { + if (producer != null) { + CachedKafkaProducer.release(producer) + } } } } diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala index b423ddc959c1b..eb95603cf9943 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala @@ -39,13 +39,13 @@ private[kafka010] class KafkaWriteTask( inputSchema: Seq[Attribute], topic: Option[String]) extends KafkaRowWriter(inputSchema, topic) { // used to synchronize with Kafka callbacks - private var producer: KafkaProducer[Array[Byte], Array[Byte]] = _ + private var producer: CachedKafkaProducer = _ /** * Writes key value data out to topics. */ def execute(iterator: Iterator[InternalRow]): Unit = { - producer = CachedKafkaProducer.getOrCreate(producerConfiguration) + producer = CachedKafkaProducer.acquire(producerConfiguration) while (iterator.hasNext && failedWrite == null) { val currentRow = iterator.next() sendRow(currentRow, producer) @@ -53,11 +53,17 @@ private[kafka010] class KafkaWriteTask( } def close(): Unit = { - checkForErrors() - if (producer != null) { - producer.flush() + try { checkForErrors() - producer = null + if (producer != null) { + producer.flush() + checkForErrors() + } + } finally { + if (producer != null) { + CachedKafkaProducer.release(producer) + producer = null + } } } } @@ -83,7 +89,7 @@ private[kafka010] abstract class KafkaRowWriter( * assuming the row is in Kafka. */ protected def sendRow( - row: InternalRow, producer: KafkaProducer[Array[Byte], Array[Byte]]): Unit = { + row: InternalRow, producer: CachedKafkaProducer): Unit = { val projectedRow = projection(row) val topic = projectedRow.getUTF8String(0) val key = projectedRow.getBinary(1) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/package.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/package.scala index 6f6ae55fc4971..f103b5b69b583 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/package.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/package.scala @@ -26,12 +26,6 @@ package object kafka010 { // scalastyle:ignore // ^^ scalastyle:ignore is for ignoring warnings about digits in package name type PartitionOffsetMap = Map[TopicPartition, Long] - private[kafka010] val PRODUCER_CACHE_TIMEOUT = - ConfigBuilder("spark.kafka.producer.cache.timeout") - .doc("The expire time to remove the unused producers.") - .timeConf(TimeUnit.MILLISECONDS) - .createWithDefaultString("10m") - private[kafka010] val CONSUMER_CACHE_CAPACITY = ConfigBuilder("spark.kafka.consumer.cache.capacity") .doc("The maximum number of consumers cached. Please note it's a soft limit" + @@ -74,4 +68,32 @@ package object kafka010 { // scalastyle:ignore "When non-positive, no idle evictor thread will be run.") .timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString("1m") + + private[kafka010] val PRODUCER_CACHE_CAPACITY = + ConfigBuilder("spark.kafka.producer.cache.capacity") + .doc("The maximum number of producers cached. Please note it's a soft limit" + + " (check Structured Streaming Kafka integration guide for further details).") + .intConf + .createWithDefault(64) + + private[kafka010] val PRODUCER_CACHE_JMX_ENABLED = + ConfigBuilder("spark.kafka.producer.cache.jmx.enable") + .doc("Enable or disable JMX for pools created with this configuration instance.") + .booleanConf + .createWithDefault(false) + + private[kafka010] val PRODUCER_CACHE_TIMEOUT = + ConfigBuilder("spark.kafka.producer.cache.timeout") + .doc("The minimum amount of time a producer may sit idle in the pool before " + + "it is eligible for eviction by the evictor. " + + "When non-positive, no producers will be evicted from the pool due to idle time alone.") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefaultString("5m") + + private[kafka010] val PRODUCER_CACHE_EVICTOR_THREAD_RUN_INTERVAL = + ConfigBuilder("spark.kafka.producer.cache.evictorThreadRunInterval") + .doc("The interval of time between runs of the idle evictor thread for producer pool. " + + "When non-positive, no idle evictor thread will be run.") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefaultString("1m") } diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala index 35c1379de160b..a5763e6a42d50 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala @@ -17,61 +17,141 @@ package org.apache.spark.sql.kafka010 -import java.{util => ju} -import java.util.concurrent.ConcurrentMap +import java.util.concurrent.{Executors, TimeUnit} -import org.apache.kafka.clients.producer.KafkaProducer +import scala.collection.JavaConverters._ +import scala.util.Random + +import org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG +import org.apache.kafka.clients.producer.{Callback, ProducerRecord, RecordMetadata} +import org.apache.kafka.clients.producer.ProducerConfig.{KEY_SERIALIZER_CLASS_CONFIG, VALUE_SERIALIZER_CLASS_CONFIG} import org.apache.kafka.common.serialization.ByteArraySerializer import org.scalatest.PrivateMethodTester +import org.apache.spark.{TaskContext, TaskContextImpl} +import org.apache.spark.sql.kafka010.InternalKafkaProducerPool._ import org.apache.spark.sql.test.SharedSparkSession class CachedKafkaProducerSuite extends SharedSparkSession with PrivateMethodTester with KafkaTest { - type KP = KafkaProducer[Array[Byte], Array[Byte]] + private var testUtils: KafkaTestUtils = _ + private val topic = "topic" + Random.nextInt() + private var producerPool: InternalKafkaProducerPool = _ + + override def beforeAll(): Unit = { + super.beforeAll() + testUtils = new KafkaTestUtils(Map[String, Object]()) + testUtils.setup() + } + + override def afterAll(): Unit = { + if (testUtils != null) { + testUtils.teardown() + testUtils = null + } + super.afterAll() + } - protected override def beforeEach(): Unit = { + override def beforeEach(): Unit = { super.beforeEach() - CachedKafkaProducer.clear() + + producerPool = { + val internalKafkaConsumerPoolMethod = PrivateMethod[InternalKafkaProducerPool]('producerPool) + CachedKafkaProducer.invokePrivate(internalKafkaConsumerPoolMethod()) + } + + producerPool.reset() } - test("Should return the cached instance on calling getOrCreate with same params.") { - val kafkaParams = new ju.HashMap[String, Object]() - kafkaParams.put("acks", "0") + private def getKafkaParams(acks: Int = 0) = Map[String, Object]( + "acks" -> acks.toString, // Here only host should be resolvable, it does not need a running instance of kafka server. - kafkaParams.put("bootstrap.servers", "127.0.0.1:9022") - kafkaParams.put("key.serializer", classOf[ByteArraySerializer].getName) - kafkaParams.put("value.serializer", classOf[ByteArraySerializer].getName) - val producer = CachedKafkaProducer.getOrCreate(kafkaParams) - val producer2 = CachedKafkaProducer.getOrCreate(kafkaParams) - assert(producer == producer2) - - val cacheMap = PrivateMethod[ConcurrentMap[Seq[(String, Object)], KP]]('getAsMap) - val map = CachedKafkaProducer.invokePrivate(cacheMap()) - assert(map.size == 1) + BOOTSTRAP_SERVERS_CONFIG -> testUtils.brokerAddress, + KEY_SERIALIZER_CLASS_CONFIG -> classOf[ByteArraySerializer].getName, + VALUE_SERIALIZER_CLASS_CONFIG -> classOf[ByteArraySerializer].getName + ).asJava + + test("acquire should return the cached instance with same params") { + val kafkaParams = getKafkaParams() + + val producer1 = CachedKafkaProducer.acquire(kafkaParams) + CachedKafkaProducer.release(producer1) + val producer2 = CachedKafkaProducer.acquire(kafkaParams) + CachedKafkaProducer.release(producer2) + + assert(producer1 === producer2) + assert(producerPool.size(toCacheKey(kafkaParams)) === 1) + assert(producerPool.size === 1) } - test("Should close the correct kafka producer for the given kafkaPrams.") { - val kafkaParams = new ju.HashMap[String, Object]() - kafkaParams.put("acks", "0") - kafkaParams.put("bootstrap.servers", "127.0.0.1:9022") - kafkaParams.put("key.serializer", classOf[ByteArraySerializer].getName) - kafkaParams.put("value.serializer", classOf[ByteArraySerializer].getName) - val producer: KP = CachedKafkaProducer.getOrCreate(kafkaParams) - kafkaParams.put("acks", "1") - val producer2: KP = CachedKafkaProducer.getOrCreate(kafkaParams) - // With updated conf, a new producer instance should be created. - assert(producer != producer2) - - val cacheMap = PrivateMethod[ConcurrentMap[Seq[(String, Object)], KP]]('getAsMap) - val map = CachedKafkaProducer.invokePrivate(cacheMap()) - assert(map.size == 2) - - CachedKafkaProducer.close(kafkaParams) - val map2 = CachedKafkaProducer.invokePrivate(cacheMap()) - assert(map2.size == 1) - import scala.collection.JavaConverters._ - val (seq: Seq[(String, Object)], _producer: KP) = map2.asScala.toArray.apply(0) - assert(_producer == producer) + test("acquire should return a new instance with different params") { + val kafkaParams1 = getKafkaParams() + val kafkaParams2 = getKafkaParams(1) + + val producer1 = CachedKafkaProducer.acquire(kafkaParams1) + CachedKafkaProducer.release(producer1) + val producer2 = CachedKafkaProducer.acquire(kafkaParams2) + CachedKafkaProducer.release(producer2) + + assert(producer1 !== producer2) + assert(producerPool.size(toCacheKey(kafkaParams1)) === 1) + assert(producerPool.size(toCacheKey(kafkaParams2)) === 1) + assert(producerPool.size === 2) + } + + test("Concurrent use of CachedKafkaProducer") { + val data = (1 to 1000).map(_.toString) + testUtils.createTopic(topic, 1) + + val kafkaParams = getKafkaParams() + val numThreads = 100 + val numProducerUsages = 500 + + @volatile var error: Throwable = null + + val callback = new Callback() { + override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = { + if (error == null && e != null) { + error = e + } + } + } + + def produce(): Unit = { + val taskContext = if (Random.nextBoolean) { + new TaskContextImpl(0, 0, 0, 0, attemptNumber = Random.nextInt(2), null, null, null) + } else { + null + } + TaskContext.setTaskContext(taskContext) + val producer = CachedKafkaProducer.acquire(kafkaParams) + try { + data.foreach { d => + val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, null, d.getBytes) + producer.send(record, callback) + } + } catch { + case e: Throwable => + if (error == null) { + error = e + } + throw e + } finally { + CachedKafkaProducer.release(producer) + } + } + + val threadpool = Executors.newFixedThreadPool(numThreads) + try { + val futures = (1 to numProducerUsages).map { i => + threadpool.submit(new Runnable { + override def run(): Unit = { produce() } + }) + } + futures.foreach(_.get(1, TimeUnit.MINUTES)) + assert(error == null) + } finally { + threadpool.shutdown() + } } } diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPoolSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/InternalKafkaConnectorPoolSuite.scala similarity index 96% rename from external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPoolSuite.scala rename to external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/InternalKafkaConnectorPoolSuite.scala index 78d7feef58519..367b8a9231d90 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPoolSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/InternalKafkaConnectorPoolSuite.scala @@ -29,7 +29,14 @@ import org.apache.spark.SparkConf import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey import org.apache.spark.sql.test.SharedSparkSession -class InternalKafkaConsumerPoolSuite extends SharedSparkSession { + +/* + * There are multiple implementations of [[InternalKafkaConnectorPool]] but they don't differ + * significantly. Because of that only [[InternalKafkaConsumerPool]] used to test all the + * functionality. If the behavior of implementations starts to differ it worth to add further + * tests but for now it would be mainly copy-paste. + */ +class InternalKafkaConnectorPoolSuite extends SharedSparkSession { test("basic multiple borrows and returns for single key") { val pool = new InternalKafkaConsumerPool(new SparkConf()) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDataConsumerSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDataConsumerSuite.scala index d97f627fbac08..372c60db474d8 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDataConsumerSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDataConsumerSuite.scala @@ -33,7 +33,7 @@ import org.apache.spark.{TaskContext, TaskContextImpl} import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey import org.apache.spark.sql.test.SharedSparkSession -class KafkaDataConsumerSuite extends SharedSparkSession with PrivateMethodTester { +class KafkaDataConsumerSuite extends SharedSparkSession with KafkaTest with PrivateMethodTester { protected var testUtils: KafkaTestUtils = _ private val topic = "topic" + Random.nextInt() @@ -148,7 +148,7 @@ class KafkaDataConsumerSuite extends SharedSparkSession with PrivateMethodTester @volatile var error: Throwable = null - def consume(i: Int): Unit = { + def consume(): Unit = { val taskContext = if (Random.nextBoolean) { new TaskContextImpl(0, 0, 0, 0, attemptNumber = Random.nextInt(2), null, null, null) } else { @@ -188,7 +188,7 @@ class KafkaDataConsumerSuite extends SharedSparkSession with PrivateMethodTester try { val futures = (1 to numConsumerUsages).map { i => threadpool.submit(new Runnable { - override def run(): Unit = { consume(i) } + override def run(): Unit = { consume() } }) } futures.foreach(_.get(1, TimeUnit.MINUTES)) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTest.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTest.scala index 19acda95c707c..2900322b947be 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTest.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTest.scala @@ -21,12 +21,16 @@ import org.scalatest.BeforeAndAfterAll import org.apache.spark.SparkFunSuite -/** A trait to clean cached Kafka producers in `afterAll` */ +/** A trait to clean cached Kafka connector in `afterAll` */ trait KafkaTest extends BeforeAndAfterAll { self: SparkFunSuite => override def afterAll(): Unit = { - super.afterAll() - CachedKafkaProducer.clear() + try { + KafkaDataConsumer.clear() + CachedKafkaProducer.clear() + } finally { + super.afterAll() + } } } From 481160b833e6b8adda730119e4ef737fa1ef68d5 Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Thu, 19 Sep 2019 15:08:17 +0200 Subject: [PATCH 2/8] Since pool.reset() is best effort there is no guarantee it will be fully cleared --- .../apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala index a5763e6a42d50..c7a1a2e58340b 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala @@ -81,7 +81,6 @@ class CachedKafkaProducerSuite extends SharedSparkSession with PrivateMethodTest assert(producer1 === producer2) assert(producerPool.size(toCacheKey(kafkaParams)) === 1) - assert(producerPool.size === 1) } test("acquire should return a new instance with different params") { @@ -96,7 +95,6 @@ class CachedKafkaProducerSuite extends SharedSparkSession with PrivateMethodTest assert(producer1 !== producer2) assert(producerPool.size(toCacheKey(kafkaParams1)) === 1) assert(producerPool.size(toCacheKey(kafkaParams2)) === 1) - assert(producerPool.size === 2) } test("Concurrent use of CachedKafkaProducer") { From 9a3aef811dff567f000799060d79c5aec7686143 Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Tue, 24 Sep 2019 13:03:15 +0200 Subject: [PATCH 3/8] Review fix --- .../spark/sql/kafka010/CachedKafkaProducer.scala | 10 +--------- .../sql/kafka010/InternalKafkaConnectorPool.scala | 10 ++++++++-- .../spark/sql/kafka010/InternalKafkaConsumerPool.scala | 4 ---- .../spark/sql/kafka010/InternalKafkaProducerPool.scala | 6 +----- .../apache/spark/sql/kafka010/KafkaDataConsumer.scala | 1 + .../sql/kafka010/InternalKafkaConnectorPoolSuite.scala | 1 - 6 files changed, 11 insertions(+), 21 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala index 6958cf126d1cf..907440ab3731b 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala @@ -19,12 +19,10 @@ package org.apache.spark.sql.kafka010 import java.{util => ju} import java.io.Closeable -import java.util.concurrent.ExecutionException import scala.collection.JavaConverters._ import scala.util.control.NonFatal -import com.google.common.util.concurrent.{ExecutionError, UncheckedExecutionException} import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord} import org.apache.spark.SparkEnv @@ -95,13 +93,7 @@ private[kafka010] object CachedKafkaProducer extends Logging { .setAuthenticationConfigIfNeeded() .build() val key = toCacheKey(updatedKafkaParams) - try { - producerPool.borrowObject(key, updatedKafkaParams) - } catch { - case e @ (_: ExecutionException | _: UncheckedExecutionException | _: ExecutionError) - if e.getCause != null => - throw e.getCause - } + producerPool.borrowObject(key, updatedKafkaParams) } def release(producer: CachedKafkaProducer): Unit = { diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConnectorPool.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConnectorPool.scala index 51e629f85c2bb..b2bb20af78369 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConnectorPool.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConnectorPool.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.kafka010 import java.{util => ju} +import java.io.Closeable import java.util.concurrent.ConcurrentHashMap import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, PooledObject, SwallowedExceptionListener} @@ -42,7 +43,7 @@ import org.apache.spark.internal.Logging * not yet returned, hence provide thread-safety usage of non-thread-safe objects unless caller * shares the object to multiple threads. */ -private[kafka010] abstract class InternalKafkaConnectorPool[K, V]( +private[kafka010] abstract class InternalKafkaConnectorPool[K, V <: Closeable]( objectFactory: ObjectFactory[K, V], poolConfig: PoolConfig[V], swallowedExceptionListener: SwallowedExceptionListener) extends Logging { @@ -177,7 +178,8 @@ private[kafka010] abstract class PoolConfig[V] extends GenericKeyedObjectPoolCon } } -private[kafka010] abstract class ObjectFactory[K, V] extends BaseKeyedPooledObjectFactory[K, V] { +private[kafka010] abstract class ObjectFactory[K, V <: Closeable] + extends BaseKeyedPooledObjectFactory[K, V] { val keyToKafkaParams = new ConcurrentHashMap[K, ju.Map[String, Object]]() override def create(key: K): V = { @@ -192,6 +194,10 @@ private[kafka010] abstract class ObjectFactory[K, V] extends BaseKeyedPooledObje new DefaultPooledObject[V](value) } + override def destroyObject(key: K, p: PooledObject[V]): Unit = { + p.getObject.close() + } + protected def createValue(key: K, kafkaParams: ju.Map[String, Object]): V } diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala index 0831869134c24..5f978c9497afc 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala @@ -55,10 +55,6 @@ private class ConsumerPoolConfig(conf: SparkConf) extends PoolConfig[InternalKaf } private class ConsumerObjectFactory extends ObjectFactory[CacheKey, InternalKafkaConsumer] { - override def destroyObject(key: CacheKey, p: PooledObject[InternalKafkaConsumer]): Unit = { - p.getObject.close() - } - protected def createValue( key: CacheKey, kafkaParams: ju.Map[String, Object]): InternalKafkaConsumer = { diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaProducerPool.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaProducerPool.scala index 03d206eb8021f..0c56eeb185cc0 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaProducerPool.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaProducerPool.scala @@ -39,7 +39,7 @@ private[kafka010] class InternalKafkaProducerPool( } protected def createKey(producer: CachedKafkaProducer): CacheKey = { - producer.kafkaParams.asScala.toSeq.sortBy(x => x._1) + InternalKafkaProducerPool.toCacheKey(producer.kafkaParams) } } @@ -52,10 +52,6 @@ private class ProducerPoolConfig(conf: SparkConf) extends PoolConfig[CachedKafka } private class ProducerObjectFactory extends ObjectFactory[CacheKey, CachedKafkaProducer] { - override def destroyObject(key: CacheKey, p: PooledObject[CachedKafkaProducer]): Unit = { - p.getObject.close() - } - protected def createValue( key: CacheKey, kafkaParams: ju.Map[String, Object]): CachedKafkaProducer = { diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala index 285c93d344fc5..604d7e3f06ec0 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala @@ -621,6 +621,7 @@ private[kafka010] object KafkaDataConsumer extends Logging { private[kafka010] def clear(): Unit = { consumerPool.reset() + fetchedDataPool.reset() } private def reportDataLoss0( diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/InternalKafkaConnectorPoolSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/InternalKafkaConnectorPoolSuite.scala index 367b8a9231d90..3143429abd711 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/InternalKafkaConnectorPoolSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/InternalKafkaConnectorPoolSuite.scala @@ -29,7 +29,6 @@ import org.apache.spark.SparkConf import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey import org.apache.spark.sql.test.SharedSparkSession - /* * There are multiple implementations of [[InternalKafkaConnectorPool]] but they don't differ * significantly. Because of that only [[InternalKafkaConsumerPool]] used to test all the From 408cd3c3c05b0fd221bb7f7f888c4781e0a61647 Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Mon, 14 Oct 2019 18:08:10 +0200 Subject: [PATCH 4/8] KafkaDataWriter producer lifecycle fix --- .../spark/sql/kafka010/KafkaDataWriter.scala | 23 +++++++++++++------ 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataWriter.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataWriter.scala index 2c946f7855a1d..870ed7aef4deb 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataWriter.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataWriter.scala @@ -44,7 +44,7 @@ private[kafka010] class KafkaDataWriter( inputSchema: Seq[Attribute]) extends KafkaRowWriter(inputSchema, targetTopic) with DataWriter[InternalRow] { - private lazy val producer = CachedKafkaProducer.acquire(producerParams) + private var producer = CachedKafkaProducer.acquire(producerParams) def write(row: InternalRow): Unit = { checkForErrors() @@ -55,9 +55,13 @@ private[kafka010] class KafkaDataWriter( // Send is asynchronous, but we can't commit until all rows are actually in Kafka. // This requires flushing and then checking that no callbacks produced errors. // We also check for errors before to fail as soon as possible - the check is cheap. - checkForErrors() - producer.flush() - checkForErrors() + try { + checkForErrors() + producer.flush() + checkForErrors() + } finally { + releaseProducer() + } KafkaDataWriterCommitMessage } @@ -73,9 +77,14 @@ private[kafka010] class KafkaDataWriter( checkForErrors() } } finally { - if (producer != null) { - CachedKafkaProducer.release(producer) - } + releaseProducer() + } + } + + private def releaseProducer(): Unit = { + if (producer != null) { + CachedKafkaProducer.release(producer) + producer = null } } } From 3046e6c6217dfcc947dbe5616c17dd939b64cd73 Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Wed, 30 Oct 2019 15:04:40 +0100 Subject: [PATCH 5/8] Review fix --- .../apache/spark/sql/kafka010/InternalKafkaConnectorPool.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConnectorPool.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConnectorPool.scala index b2bb20af78369..55b1d20e0ff00 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConnectorPool.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConnectorPool.scala @@ -203,6 +203,7 @@ private[kafka010] abstract class ObjectFactory[K, V <: Closeable] private[kafka010] class CustomSwallowedExceptionListener(connectorType: String) extends SwallowedExceptionListener with Logging { + override def onSwallowException(e: Exception): Unit = { logError(s"Error closing Kafka $connectorType", e) } From aacd16ccaa406df14179460e3a72cb237b7bdc16 Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Tue, 5 Nov 2019 10:28:46 +0100 Subject: [PATCH 6/8] Log type fix --- .../apache/spark/sql/kafka010/InternalKafkaConnectorPool.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConnectorPool.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConnectorPool.scala index 55b1d20e0ff00..0fb250e8d05ae 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConnectorPool.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConnectorPool.scala @@ -205,6 +205,6 @@ private[kafka010] class CustomSwallowedExceptionListener(connectorType: String) extends SwallowedExceptionListener with Logging { override def onSwallowException(e: Exception): Unit = { - logError(s"Error closing Kafka $connectorType", e) + logWarning(s"Error closing Kafka $connectorType", e) } } From 1c86f0442dd86ef6387d3c5189fd6e5a9ec90627 Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Wed, 6 Nov 2019 11:18:13 +0100 Subject: [PATCH 7/8] Review fix --- .../spark/sql/kafka010/CachedKafkaProducerSuite.scala | 6 ------ .../apache/spark/sql/kafka010/KafkaDataConsumerSuite.scala | 2 +- .../spark/streaming/kafka010/KafkaDataConsumerSuite.scala | 7 ------- 3 files changed, 1 insertion(+), 14 deletions(-) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala index c7a1a2e58340b..4506a4029d88d 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala @@ -128,12 +128,6 @@ class CachedKafkaProducerSuite extends SharedSparkSession with PrivateMethodTest val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, null, d.getBytes) producer.send(record, callback) } - } catch { - case e: Throwable => - if (error == null) { - error = e - } - throw e } finally { CachedKafkaProducer.release(producer) } diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDataConsumerSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDataConsumerSuite.scala index 039fc9c92b829..6e1f10e7f6d74 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDataConsumerSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDataConsumerSuite.scala @@ -233,7 +233,7 @@ class KafkaDataConsumerSuite val threadpool = Executors.newFixedThreadPool(numThreads) try { - val futures = (1 to numConsumerUsages).map { i => + val futures = (1 to numConsumerUsages).map { _ => threadpool.submit(new Runnable { override def run(): Unit = { consume() } }) diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumerSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumerSuite.scala index 82913cf416a5f..246672bcbacfc 100644 --- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumerSuite.scala +++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumerSuite.scala @@ -121,8 +121,6 @@ class KafkaDataConsumerSuite extends SparkFunSuite with MockitoSugar with Before val numThreads = 100 val numConsumerUsages = 500 - @volatile var error: Throwable = null - def consume(i: Int): Unit = { val useCache = Random.nextBoolean val taskContext = if (Random.nextBoolean) { @@ -138,10 +136,6 @@ class KafkaDataConsumerSuite extends SparkFunSuite with MockitoSugar with Before new String(bytes) } assert(rcvd == data) - } catch { - case e: Throwable => - error = e - throw e } finally { consumer.release() } @@ -155,7 +149,6 @@ class KafkaDataConsumerSuite extends SparkFunSuite with MockitoSugar with Before }) } futures.foreach(_.get(1, TimeUnit.MINUTES)) - assert(error == null) } finally { threadPool.shutdown() } From ebe9708c96e8127b9ef5c43fc26cff2811a79d82 Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Thu, 7 Nov 2019 14:50:57 +0100 Subject: [PATCH 8/8] override added --- .../apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala | 4 ++-- .../apache/spark/sql/kafka010/InternalKafkaProducerPool.scala | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala index 5f978c9497afc..a8e6045646451 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala @@ -41,7 +41,7 @@ private[kafka010] class InternalKafkaConsumerPool( this(new ConsumerObjectFactory, new ConsumerPoolConfig(conf)) } - protected def createKey(consumer: InternalKafkaConsumer): CacheKey = { + override protected def createKey(consumer: InternalKafkaConsumer): CacheKey = { new CacheKey(consumer.topicPartition, consumer.kafkaParams) } } @@ -55,7 +55,7 @@ private class ConsumerPoolConfig(conf: SparkConf) extends PoolConfig[InternalKaf } private class ConsumerObjectFactory extends ObjectFactory[CacheKey, InternalKafkaConsumer] { - protected def createValue( + override protected def createValue( key: CacheKey, kafkaParams: ju.Map[String, Object]): InternalKafkaConsumer = { new InternalKafkaConsumer(key.topicPartition, kafkaParams) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaProducerPool.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaProducerPool.scala index 0c56eeb185cc0..165b64313abb5 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaProducerPool.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaProducerPool.scala @@ -38,7 +38,7 @@ private[kafka010] class InternalKafkaProducerPool( this(new ProducerObjectFactory, new ProducerPoolConfig(conf)) } - protected def createKey(producer: CachedKafkaProducer): CacheKey = { + override protected def createKey(producer: CachedKafkaProducer): CacheKey = { InternalKafkaProducerPool.toCacheKey(producer.kafkaParams) } } @@ -52,7 +52,7 @@ private class ProducerPoolConfig(conf: SparkConf) extends PoolConfig[CachedKafka } private class ProducerObjectFactory extends ObjectFactory[CacheKey, CachedKafkaProducer] { - protected def createValue( + override protected def createValue( key: CacheKey, kafkaParams: ju.Map[String, Object]): CachedKafkaProducer = { new CachedKafkaProducer(kafkaParams)