[SPARK-9990][SQL]Create local hash join operator #8535

Closed. zsxwing wants to merge 8 commits into apache:master from zsxwing:SPARK-9990

zsxwing commented Aug 31, 2015

Member

This PR includes the following changes:

  • Add SQLConf to LocalNode
  • Add HashJoinNode
  • Add ConvertToUnsafeNode and ConvertToSafeNode to test the unsafe hash join.

Member Author

Need to use a new method name because the default parameter is in conflict with overloading.
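
For context, Scala allows at most one overloaded alternative of a method to declare default arguments, which is one likely reading of the conflict mentioned here. A minimal sketch of that rule follows; `RelationBuilder`, `build`, and `buildFromSeq` are hypothetical names chosen for illustration and are not taken from this PR.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical names throughout; this only illustrates the language rule.
object RelationBuilder {
  // Existing entry point: one alternative may declare a default argument.
  def build(rows: Iterator[Int], sizeEstimate: Int = 64): Vector[Int] = {
    val buf = new ArrayBuffer[Int](sizeEstimate)
    rows.foreach(buf += _)
    buf.toVector
  }

  // A second `build(rows: Seq[Int], sizeEstimate: Int = 64)` would be rejected
  // by the compiler ("multiple overloaded alternatives of method build define
  // default arguments"), so the new variant gets its own name instead.
  def buildFromSeq(rows: Seq[Int], sizeEstimate: Int = 64): Vector[Int] =
    build(rows.iterator, sizeEstimate)
}
```

Giving the new variant a distinct name keeps the default value available on both entry points, at the cost of a slightly less uniform API.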

Contributor

Can you add some Javadoc to explain what this method is doing?

SparkQA commented Aug 31, 2015

Test build #41824 has finished for PR 8535 at commit 2ca5778.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class ConvertToSafeNode(conf: SQLConf, child: LocalNode) extends UnaryLocalNode(conf)
    • case class ConvertToUnsafeNode(conf: SQLConf, child: LocalNode) extends UnaryLocalNode(conf)
    • case class FilterNode(conf: SQLConf, condition: Expression, child: LocalNode)
    • case class HashJoinNode (
    • case class LimitNode(conf: SQLConf, limit: Int, child: LocalNode) extends UnaryLocalNode(conf)
    • abstract class LocalNode(conf: SQLConf) extends TreeNode[LocalNode] with Logging
    • abstract class LeafLocalNode(conf: SQLConf) extends LocalNode(conf)
    • abstract class UnaryLocalNode(conf: SQLConf) extends LocalNode(conf)
    • abstract class BinaryLocalNode(conf: SQLConf) extends LocalNode(conf)
    • case class ProjectNode(conf: SQLConf, projectList: Seq[NamedExpression], child: LocalNode)
    • case class SeqScanNode(conf: SQLConf, output: Seq[Attribute], data: Seq[InternalRow])
    • case class UnionNode(conf: SQLConf, children: Seq[LocalNode]) extends LocalNode(conf)

zsxwing commented Sep 1, 2015

Member Author

@rxin do we need to make these local classes private[sql]?

Contributor

shall we just use tungstenEnabled here?

Contributor

and the ProjectNode always uses unsafe projection, should we control that by this config?

Member Author

> and the ProjectNode always uses unsafe projection, should we control that by this config?

Agreed.

Member Author

> shall we just use tungstenEnabled here?

Just followed SparkPlan.

SparkQA commented Sep 2, 2015

Test build #41902 has finished for PR 8535 at commit aa928fd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class ConvertToSafeNode(conf: SQLConf, child: LocalNode) extends UnaryLocalNode(conf)
    • case class ConvertToUnsafeNode(conf: SQLConf, child: LocalNode) extends UnaryLocalNode(conf)
    • case class FilterNode(conf: SQLConf, condition: Expression, child: LocalNode)
    • case class HashJoinNode (
    • case class LimitNode(conf: SQLConf, limit: Int, child: LocalNode) extends UnaryLocalNode(conf)
    • abstract class LocalNode(conf: SQLConf) extends TreeNode[LocalNode] with Logging
    • abstract class LeafLocalNode(conf: SQLConf) extends LocalNode(conf)
    • abstract class UnaryLocalNode(conf: SQLConf) extends LocalNode(conf)
    • abstract class BinaryLocalNode(conf: SQLConf) extends LocalNode(conf)
    • case class ProjectNode(conf: SQLConf, projectList: Seq[NamedExpression], child: LocalNode)
    • case class SeqScanNode(conf: SQLConf, output: Seq[Attribute], data: Seq[InternalRow])
    • case class UnionNode(conf: SQLConf, children: Seq[LocalNode]) extends LocalNode(conf)

Contributor

I actually think implementing the wrapper is better, since it's not very complicated. Duplicated code is generally bad and hard to maintain. We can have something like the following in LocalNode:

def asIterator: Iterator[InternalRow] = new LocalNodeIterator(this)

and then provide the dummy SQLMetrics.nullLongMetric.

Contributor

I wrote some code for this. Feel free to steal or come up with something better. (not tested!)

/**
 * A thin wrapper around a [[LocalNode]] that provides an iterator interface.
 */
private class LocalNodeIterator(localNode: LocalNode) extends Iterator[InternalRow] {
  // Row fetched by hasNext but not yet returned by next(); null when nothing is buffered.
  private var nextRow: InternalRow = _

  override def hasNext: Boolean = {
    if (nextRow == null) {
      // Advance the node and buffer its row so that repeated hasNext calls skip nothing.
      val res = localNode.next()
      if (res) {
        nextRow = localNode.fetch()
      }
      res
    } else {
      true
    }
  }

  override def next(): InternalRow = {
    if (hasNext) {
      // Hand out the buffered row and clear the buffer for the next call.
      val res = nextRow
      nextRow = null
      res
    } else {
      throw new NoSuchElementException
    }
  }
}
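
For anyone who wants to try the idea outside Spark, here is a minimal self-contained sketch of the same next()/fetch() to Iterator adaptation. `ToyNode`, `ToySeqScan`, and `ToyNodeIterator` are hypothetical stand-ins, rows are plain Ints rather than InternalRow, and the buffer uses Option instead of null; none of this is the code that ended up in the PR.

```scala
// Toy stand-ins for illustration only (not Spark classes).
trait ToyNode {
  def next(): Boolean   // advance; returns false once the input is exhausted
  def fetch(): Int      // current row; valid only after next() returned true
  def asIterator: Iterator[Int] = new ToyNodeIterator(this)
}

// Scans an in-memory sequence one element at a time.
final class ToySeqScan(data: Seq[Int]) extends ToyNode {
  private var pos = -1
  override def next(): Boolean = { pos += 1; pos < data.length }
  override def fetch(): Int = data(pos)
}

// Same buffering idea as the wrapper above, using Option instead of null.
final class ToyNodeIterator(node: ToyNode) extends Iterator[Int] {
  private var buffered: Option[Int] = None

  override def hasNext: Boolean = buffered.isDefined || {
    val advanced = node.next()
    if (advanced) buffered = Some(node.fetch())
    advanced
  }

  override def next(): Int = {
    if (!hasNext) throw new NoSuchElementException("end of ToyNode")
    val row = buffered.get
    buffered = None
    row
  }
}
```

Because hasNext buffers the fetched row, calling it several times in a row does not skip input; for example, `new ToySeqScan(Seq(10, 20, 30)).asIterator.toList` walks every element exactly once.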

Member Author

Thanks. Added LocalNodeIterator to this PR :)
