Skip to content
Merged
Prev Previous commit
Fix error in SQL query
  • Loading branch information
agodnic committed Oct 23, 2024
commit d83ddd9f35334c319bdb6592e6adc5ca85aad9a4
112 changes: 61 additions & 51 deletions idb/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -723,77 +723,82 @@ func buildTransactionQuery(tf idb.TransactionFilter) (query string, whereArgs []
return
}

// buildBlockWhereTerms generates some of the terms that go in the WHERE clause for the block search query.
//
// It only generates terms that filter blocks based on round and/or timestamp.
// buildBlockFilters generates filters based on a block's round and/or timestamp.
//
// Filters related to participation are generated elsewhere.
func buildBlockWhereTerms(bf idb.BlockFilter) []string {

var terms []string
//
// Some of the filters are meant to be used as boolean conditions in the WHERE clause.
// Others, for performance reasons, have to be used as INNER JOIN terms in the FROM clause.
func buildBlockFilters(bf idb.BlockFilter) (whereTerms []string, joinTerms []string) {

// Round-based filters
if bf.MaxRound != nil {
terms = append(
terms,
fmt.Sprintf("round <= %d", *bf.MaxRound),
whereTerms = append(
whereTerms,
fmt.Sprintf("bh.round <= %d", *bf.MaxRound),
)
}
if bf.MinRound != nil {
terms = append(
terms,
fmt.Sprintf("round >= %d", *bf.MinRound),
whereTerms = append(
whereTerms,
fmt.Sprintf("bh.round >= %d", *bf.MinRound),
)
}

// Timestamp-based filters
//
// Converting the timestamp into a round usually results in faster execution plans
// (compared to the execution plans that would result from using the `block_header.realtime` column directly)
//
// Unfortunately, writing this condition in the WHERE clause results in poor execution plans.
// Expressing the filter as an INNER JOIN results in an efficient query execution plan.
if !bf.AfterTime.IsZero() {
tmpl := `
round >= (
SELECT tmp.round
INNER JOIN (
SELECT COALESCE(tmp.round, 0) AS round
FROM block_header tmp
WHERE tmp.realtime > (to_timestamp(%d) AT TIME ZONE 'UTC')
ORDER BY tmp.realtime ASC, tmp.round ASC
LIMIT 1
)`
terms = append(
terms,
) bh_at ON bh.round >= bh_at.round
`
joinTerms = append(
joinTerms,
fmt.Sprintf(tmpl, bf.AfterTime.UTC().Unix()),
)
}
if !bf.BeforeTime.IsZero() {
tmpl := `
round <= (
SELECT tmp.round
INNER JOIN (
SELECT COALESCE(tmp.round, 0) AS round
FROM block_header tmp
WHERE tmp.realtime < (to_timestamp(%d) AT TIME ZONE 'UTC')
ORDER BY tmp.realtime DESC, tmp.round DESC
LIMIT 1
)`
terms = append(
terms,
) bh_bt ON bh.round <= bh_bt.round
`
joinTerms = append(
joinTerms,
fmt.Sprintf(tmpl, bf.BeforeTime.UTC().Unix()),
)
}

return terms
return whereTerms, joinTerms
}

func buildBlockQuery(bf idb.BlockFilter) (query string, err error) {

// helper function to build CTEs
buildCte := func(cteName string, whereTerms []string) string {
buildCte := func(cteName string, whereTerms []string, joinTerms []string) string {
tmpl := `%s AS (
SELECT round, header
FROM block_header
SELECT bh.round, bh.header
FROM block_header bh
%s
WHERE %s
ORDER BY round ASC
ORDER BY bh.round ASC
LIMIT %d
)`
return fmt.Sprintf(tmpl, cteName, strings.Join(whereTerms, " AND "), bf.Limit)
return fmt.Sprintf(tmpl, cteName, strings.Join(joinTerms, "\n"), strings.Join(whereTerms, " AND "), bf.Limit)
}

// Build auxiliary CTEs for participation-related parameters.
Expand All @@ -807,52 +812,52 @@ func buildBlockQuery(bf idb.BlockFilter) (query string, err error) {
var CteNames []string
{
if len(bf.Proposers) > 0 {
terms := buildBlockWhereTerms(bf)
whereTerms, joinTerms := buildBlockFilters(bf)
var proposersStr []string
for addr := range bf.Proposers {
proposersStr = append(proposersStr, `'"`+addr.String()+`"'`)
}
terms = append(
terms,
fmt.Sprintf("( (header->'prp') IS NOT NULL AND ((header->'prp')::TEXT IN (%s)) )", strings.Join(proposersStr, ",")),
whereTerms = append(
whereTerms,
fmt.Sprintf("( (bh.header->'prp') IS NOT NULL AND ((bh.header->'prp')::TEXT IN (%s)) )", strings.Join(proposersStr, ",")),
)

cteName := "prp"
cte := buildCte(cteName, terms)
cte := buildCte(cteName, whereTerms, joinTerms)
CTEs = append(CTEs, cte)
CteNames = append(CteNames, cteName)

}
if len(bf.ExpiredParticipationAccounts) > 0 {
terms := buildBlockWhereTerms(bf)
whereTerms, joinTerms := buildBlockFilters(bf)
var expiredStr []string
for addr := range bf.ExpiredParticipationAccounts {
expiredStr = append(expiredStr, `'`+addr.String()+`'`)
}
terms = append(
terms,
fmt.Sprintf("( (header->'partupdrmv') IS NOT NULL AND (header->'partupdrmv') ?| array[%s] )", strings.Join(expiredStr, ",")),
whereTerms = append(
whereTerms,
fmt.Sprintf("( (bh.header->'partupdrmv') IS NOT NULL AND (bh.header->'partupdrmv') ?| array[%s] )", strings.Join(expiredStr, ",")),
)

cteName := "expired"
CTE := buildCte(cteName, terms)
CTE := buildCte(cteName, whereTerms, joinTerms)
CTEs = append(CTEs, CTE)
CteNames = append(CteNames, "expired")

}
if len(bf.AbsentParticipationAccounts) > 0 {
terms := buildBlockWhereTerms(bf)
whereTerms, joinTerms := buildBlockFilters(bf)
var absentStr []string
for addr := range bf.AbsentParticipationAccounts {
absentStr = append(absentStr, `'`+addr.String()+`'`)
}
terms = append(
terms,
fmt.Sprintf("( (header->'partupdabs') IS NOT NULL AND (header->'partupdabs') ?| array[%s] )", strings.Join(absentStr, ",")),
whereTerms = append(
whereTerms,
fmt.Sprintf("( (bh.header->'partupdabs') IS NOT NULL AND (bh.header->'partupdabs') ?| array[%s] )", strings.Join(absentStr, ",")),
)

cteName := "absent"
CTE := buildCte(cteName, terms)
CTE := buildCte(cteName, whereTerms, joinTerms)
CTEs = append(CTEs, CTE)
CteNames = append(CteNames, cteName)
}
Expand All @@ -875,28 +880,33 @@ func buildBlockQuery(bf idb.BlockFilter) (query string, err error) {

var fromTable string
if len(CTEs) > 0 {
fromTable = "tmp"
fromTable = "tmp bh"
} else {
fromTable = "block_header"
fromTable = "block_header bh"
}

var whereClause string
var joinClause string
if len(CTEs) == 0 {
terms := buildBlockWhereTerms(bf)
if len(terms) > 0 {
whereClause = "WHERE " + strings.Join(terms, " AND ") + "\n"
whereTerms, joinTerms := buildBlockFilters(bf)
if len(whereTerms) > 0 {
whereClause = "WHERE " + strings.Join(whereTerms, " AND ") + "\n"
}
if len(joinTerms) > 0 {
joinClause = strings.Join(joinTerms, "\n")
}
}

tmpl := `
%s
SELECT header
SELECT bh.header
FROM %s
%s
ORDER BY round ASC
%s
ORDER BY bh.round ASC
LIMIT %d`

query = fmt.Sprintf(tmpl, withClause, fromTable, whereClause, bf.Limit)
query = fmt.Sprintf(tmpl, withClause, fromTable, joinClause, whereClause, bf.Limit)
}

return query, nil
Expand Down