Skip to content
Draft
Prev Previous commit
Next Next commit
feat(push): add maxLifetime maxOpen and maxIdle options parameters
  • Loading branch information
adrienaury committed Mar 13, 2024
commit 22d7c6f56dcb29f7b82004982d92a33d4da04e98
6 changes: 3 additions & 3 deletions cmd/lino/dep_pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,14 @@ func traceListner(file *os.File) domain.TraceListener {
return infra.NewJSONTraceListener(file)
}

func maxLifeTime(maxLifetimeInSeconds int64) domain.DataSourceOption {
func pullMaxLifeTime(maxLifetimeInSeconds int64) domain.DataSourceOption {
return infra.WithMaxLifetime(time.Duration(maxLifetimeInSeconds) * time.Second)
}

func maxOpenConns(maxOpenConns int) domain.DataSourceOption {
func pullMaxOpenConns(maxOpenConns int) domain.DataSourceOption {
return infra.WithMaxOpenConns(maxOpenConns)
}

func maxIdleConns(maxIdleConns int) domain.DataSourceOption {
func pullMaxIdleConns(maxIdleConns int) domain.DataSourceOption {
return infra.WithMaxIdleConns(maxIdleConns)
}
13 changes: 13 additions & 0 deletions cmd/lino/dep_push.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package main

import (
"io"
"time"

infra "github.com/cgi-fr/lino/internal/infra/push"
domain "github.com/cgi-fr/lino/pkg/push"
Expand Down Expand Up @@ -48,3 +49,15 @@ func pushRowExporterFactory() func(io.Writer) domain.RowWriter {
func pushTranslator() domain.Translator {
return infra.NewFileTranslator()
}

func pushMaxLifeTime(maxLifetimeInSeconds int64) domain.DataDestinationOption {
return infra.WithMaxLifetime(time.Duration(maxLifetimeInSeconds) * time.Second)
}

func pushMaxOpenConns(maxOpenConns int) domain.DataDestinationOption {
return infra.WithMaxOpenConns(maxOpenConns)
}

func pushMaxIdleConns(maxIdleConns int) domain.DataDestinationOption {
return infra.WithMaxIdleConns(maxIdleConns)
}
4 changes: 2 additions & 2 deletions cmd/lino/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,8 @@ func initConfig() {
table.Inject(dataconnectorStorage(), tableStorage(), tableExtractorFactory())
sequence.Inject(dataconnectorStorage(), tableStorage(), sequenceStorage(), sequenceUpdatorFactory())
id.Inject(idStorageFile, relationStorage(), idExporter(), idJSONStorage(*os.Stdout))
pull.Inject(dataconnectorStorage(), relationStorage(), tableStorage(), idStorageFactory(), pullDataSourceFactory(), pullRowExporterFactory(), pullRowReaderFactory(), pullKeyStoreFactory(), traceListner(os.Stderr), maxLifeTime, maxOpenConns, maxIdleConns)
push.Inject(dataconnectorStorage(), relationStorage(), tableStorage(), idStorageFactory(), pushDataDestinationFactory(), pushRowIteratorFactory(), pushRowExporterFactory(), pushTranslator())
pull.Inject(dataconnectorStorage(), relationStorage(), tableStorage(), idStorageFactory(), pullDataSourceFactory(), pullRowExporterFactory(), pullRowReaderFactory(), pullKeyStoreFactory(), traceListner(os.Stderr), pullMaxLifeTime, pullMaxOpenConns, pullMaxIdleConns)
push.Inject(dataconnectorStorage(), relationStorage(), tableStorage(), idStorageFactory(), pushDataDestinationFactory(), pushRowIteratorFactory(), pushRowExporterFactory(), pushTranslator(), pushMaxLifeTime, pushMaxOpenConns, pushMaxIdleConns)
}

func writeMetricsToFile(statsFile string, statsByte []byte) {
Expand Down
57 changes: 44 additions & 13 deletions internal/app/push/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ var (
rowIteratorFactory func(io.ReadCloser) push.RowIterator
rowExporterFactory func(io.Writer) push.RowWriter
translator push.Translator
maxLifeTimeOption func(int64) push.DataDestinationOption
maxOpenConnsOption func(int) push.DataDestinationOption
maxIdleConnsOption func(int) push.DataDestinationOption
)

// Inject dependencies
Expand All @@ -57,6 +60,9 @@ func Inject(
rif func(io.ReadCloser) push.RowIterator,
ref func(io.Writer) push.RowWriter,
trnsltor push.Translator,
mltOpt func(int64) push.DataDestinationOption,
mocOpt func(int) push.DataDestinationOption,
micOpt func(int) push.DataDestinationOption,
) {
dataconnectorStorage = dbas
relStorage = rs
Expand All @@ -66,21 +72,26 @@ func Inject(
rowIteratorFactory = rif
rowExporterFactory = ref
translator = trnsltor
maxLifeTimeOption = mltOpt
maxOpenConnsOption = mocOpt
maxIdleConnsOption = micOpt
}

// NewCommand implements the cli pull command
func NewCommand(fullName string, err *os.File, out *os.File, in *os.File) *cobra.Command {
var (
commitSize uint
disableConstraints bool
catchErrors string
table string
ingressDescriptor string
rowExporter push.RowWriter
pkTranslations map[string]string
whereField string
savepoint string
autoTruncate bool
commitSize uint
disableConstraints bool
catchErrors string
table string
ingressDescriptor string
rowExporter push.RowWriter
pkTranslations map[string]string
whereField string
savepoint string
autoTruncate bool
maxLifeTimeInSeconds int64
maxOpenConns, maxIdleConns int
)

cmd := &cobra.Command{
Expand All @@ -106,6 +117,9 @@ func NewCommand(fullName string, err *os.File, out *os.File, in *os.File) *cobra
Bool("disable-constraints", disableConstraints).
Str("catch-errors", catchErrors).
Str("table", table).
Int64("maxLifeTimeInSeconds", maxLifeTimeInSeconds).
Int("maxOpenConns", maxOpenConns).
Int("maxIdleConns", maxIdleConns).
Msg("Push mode")
},
Run: func(cmd *cobra.Command, args []string) {
Expand All @@ -122,7 +136,7 @@ func NewCommand(fullName string, err *os.File, out *os.File, in *os.File) *cobra
mode, _ = push.ParseMode(args[0])
}

datadestination, e1 := getDataDestination(dcDestination)
datadestination, e1 := getDataDestination(dcDestination, maxLifeTimeInSeconds, maxOpenConns, maxIdleConns)
if e1 != nil {
fmt.Fprintln(err, e1.Error())
os.Exit(1)
Expand Down Expand Up @@ -174,6 +188,9 @@ func NewCommand(fullName string, err *os.File, out *os.File, in *os.File) *cobra
cmd.Flags().StringVar(&whereField, "using-pk-field", "__usingpk__", "Name of the data field that can be used as pk for update queries")
cmd.Flags().StringVar(&savepoint, "savepoint", "", "Name of a file to write primary keys of effectively processed lines (commit to database)")
cmd.Flags().BoolVarP(&autoTruncate, "autotruncate", "a", false, "Automatically truncate values to the maximum length defined in table.yaml")
cmd.Flags().Int64Var(&maxLifeTimeInSeconds, "conn-max-lifetime", -1, "sets the maximum amount of time (in seconds) a connection may be reused")
cmd.Flags().IntVar(&maxOpenConns, "conn-max-open", -1, "sets the maximum number of open connections to the database")
cmd.Flags().IntVar(&maxIdleConns, "conn-max-idle", -1, "sets the maximum number of connections in the idle connection pool")
cmd.SetOut(out)
cmd.SetErr(err)
cmd.SetIn(in)
Expand Down Expand Up @@ -221,7 +238,7 @@ func loadTranslator(pkTranslations map[string]string) error {
return nil
}

func getDataDestination(dataconnectorName string) (push.DataDestination, *push.Error) {
func getDataDestination(dataconnectorName string, maxLifeTimeInSeconds int64, maxOpenConns, maxIdleConns int) (push.DataDestination, *push.Error) {
alias, e1 := dataconnector.Get(dataconnectorStorage, dataconnectorName)
if e1 != nil {
return nil, &push.Error{Description: e1.Error()}
Expand All @@ -240,7 +257,21 @@ func getDataDestination(dataconnectorName string) (push.DataDestination, *push.E
return nil, &push.Error{Description: "no datadestination found for database type " + u.UnaliasedDriver}
}

return datadestinationFactory.New(u.URL.String(), alias.Schema), nil
options := []push.DataDestinationOption{}

if maxLifeTimeInSeconds >= 0 {
options = append(options, maxLifeTimeOption(maxLifeTimeInSeconds))
}

if maxOpenConns >= 0 {
options = append(options, maxOpenConnsOption(maxOpenConns))
}

if maxIdleConns >= 0 {
options = append(options, maxIdleConnsOption(maxIdleConns))
}

return datadestinationFactory.New(u.URL.String(), alias.Schema, options...), nil
}

func getPlan(idStorage id.Storage, autoTruncate bool) (push.Plan, *push.Error) {
Expand Down
5 changes: 4 additions & 1 deletion internal/app/push/cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ func Test_getDataDestination(t *testing.T) {
func(io.ReadCloser) push.RowIterator { return &push.MockRowIterator{} },
func(io.Writer) push.RowWriter { return &push.MockRowWriter{} },
push.NewMockTranslator(),
maxLifeTimeOption,
maxOpenConnsOption,
maxIdleConnsOption,
)

type args struct {
Expand All @@ -63,7 +66,7 @@ func Test_getDataDestination(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, got1 := getDataDestination(tt.args.dataconnectorName)
got, got1 := getDataDestination(tt.args.dataconnectorName, -1, -1, -1)
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("getDataDestination() got = %v, want %v", got, tt.want)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/app/push/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func Handler(w http.ResponseWriter, r *http.Request, mode push.Mode, ingressDesc
return
}

datadestination, err := getDataDestination(dcDestination)
datadestination, err := getDataDestination(dcDestination, -1, -1, -1)
if err != nil {
log.Error().Err(err).Msg("")
w.WriteHeader(http.StatusNotFound)
Expand Down
4 changes: 2 additions & 2 deletions internal/infra/push/datadestination_db2.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ func NewDb2DataDestinationFactory() *Db2DataDestinationFactory {
}

// New return a Db2 pusher
func (e *Db2DataDestinationFactory) New(url string, schema string) push.DataDestination {
return NewSQLDataDestination(url, schema, Db2Dialect{})
func (e *Db2DataDestinationFactory) New(url string, schema string, options ...push.DataDestinationOption) push.DataDestination {
return NewSQLDataDestination(url, schema, Db2Dialect{}, options...)
}

// Db2Dialect inject oracle variations
Expand Down
4 changes: 2 additions & 2 deletions internal/infra/push/datadestination_db2_dummy.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ func NewDb2DataDestinationFactory() *Db2DataDestinationFactory {
}

// New return a Db2 pusher
func (e *Db2DataDestinationFactory) New(url string, schema string) push.DataDestination {
return NewSQLDataDestination(url, schema, Db2Dialect{})
func (e *Db2DataDestinationFactory) New(url string, schema string, options ...push.DataDestinationOption) push.DataDestination {
return NewSQLDataDestination(url, schema, Db2Dialect{}, options...)
}

// Db2Dialect inject oracle variations
Expand Down
2 changes: 1 addition & 1 deletion internal/infra/push/datadestination_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func NewHTTPDataDestinationFactory() *HTTPDataDestinationFactory {
}

// New return a HTTP pusher
func (e *HTTPDataDestinationFactory) New(url string, schema string) push.DataDestination {
func (e *HTTPDataDestinationFactory) New(url string, schema string, options ...push.DataDestinationOption) push.DataDestination {
return NewHTTPDataDestination(url, schema)
}

Expand Down
4 changes: 2 additions & 2 deletions internal/infra/push/datadestination_mariadb.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ func NewMariadbDataDestinationFactory() *MariadbDataDestinationFactory {
}

// New return a Mariadb pusher
func (e *MariadbDataDestinationFactory) New(url string, schema string) push.DataDestination {
return NewSQLDataDestination(url, schema, MariadbDialect{})
func (e *MariadbDataDestinationFactory) New(url string, schema string, options ...push.DataDestinationOption) push.DataDestination {
return NewSQLDataDestination(url, schema, MariadbDialect{}, options...)
}

// MariadbDialect inject mariadb variations
Expand Down
4 changes: 2 additions & 2 deletions internal/infra/push/datadestination_oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ func NewOracleDataDestinationFactory() *OracleDataDestinationFactory {
}

// New return a Oracle pusher
func (e *OracleDataDestinationFactory) New(url string, schema string) push.DataDestination {
return NewSQLDataDestination(url, schema, OracleDialect{})
func (e *OracleDataDestinationFactory) New(url string, schema string, options ...push.DataDestinationOption) push.DataDestination {
return NewSQLDataDestination(url, schema, OracleDialect{}, options...)
}

// OracleDialect inject oracle variations
Expand Down
4 changes: 2 additions & 2 deletions internal/infra/push/datadestination_postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ func NewPostgresDataDestinationFactory() *PostgresDataDestinationFactory {
}

// New return a Postgres pusher
func (e *PostgresDataDestinationFactory) New(url string, schema string) push.DataDestination {
return NewSQLDataDestination(url, schema, PostgresDialect{})
func (e *PostgresDataDestinationFactory) New(url string, schema string, options ...push.DataDestinationOption) push.DataDestination {
return NewSQLDataDestination(url, schema, PostgresDialect{}, options...)
}

// PostgresDialect inject postgres variations
Expand Down
40 changes: 38 additions & 2 deletions internal/infra/push/datadestination_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,35 @@ import (
"database/sql"
"fmt"
"strings"
"time"

"github.com/cgi-fr/lino/pkg/push"
"github.com/jmoiron/sqlx"
"github.com/rs/zerolog/log"
"github.com/xo/dburl"
)

func WithMaxLifetime(maxLifeTime time.Duration) push.DataDestinationOption {
return func(ds push.DataDestination) {
log.Info().Int64("maxLifetime", int64(maxLifeTime.Seconds())).Msg("setting database connection parameter")
ds.(*SQLDataDestination).maxLifetime = maxLifeTime
}
}

func WithMaxOpenConns(maxOpenConns int) push.DataDestinationOption {
return func(ds push.DataDestination) {
log.Info().Int("maxOpenConns", maxOpenConns).Msg("setting database connection parameter")
ds.(*SQLDataDestination).maxOpenConns = maxOpenConns
}
}

func WithMaxIdleConns(maxIdleConns int) push.DataDestinationOption {
return func(ds push.DataDestination) {
log.Info().Int("maxIdleConns", maxIdleConns).Msg("setting database connection parameter")
ds.(*SQLDataDestination).maxIdleConns = maxIdleConns
}
}

// SQLDataDestination read data from a SQL database.
type SQLDataDestination struct {
url string
Expand All @@ -38,16 +60,25 @@ type SQLDataDestination struct {
mode push.Mode
disableConstraints bool
dialect SQLDialect
maxLifetime time.Duration
maxOpenConns int
maxIdleConns int
}

// NewSQLDataDestination creates a new SQL datadestination.
func NewSQLDataDestination(url string, schema string, dialect SQLDialect) *SQLDataDestination {
return &SQLDataDestination{
func NewSQLDataDestination(url string, schema string, dialect SQLDialect, options ...push.DataDestinationOption) *SQLDataDestination {
dd := &SQLDataDestination{
url: url,
schema: schema,
rowWriter: map[string]*SQLRowWriter{},
dialect: dialect,
}

for _, option := range options {
option(dd)
}

return dd
}

// Close SQL connections
Expand Down Expand Up @@ -130,6 +161,11 @@ func (dd *SQLDataDestination) Open(plan push.Plan, mode push.Mode, disableConstr
return &push.Error{Description: err.Error()}
}

// database handle settings
db.SetConnMaxLifetime(dd.maxLifetime)
db.SetMaxOpenConns(dd.maxOpenConns)
db.SetMaxIdleConns(dd.maxIdleConns)

u, err := dburl.Parse(dd.url)
if err != nil {
return &push.Error{Description: err.Error()}
Expand Down
4 changes: 2 additions & 2 deletions internal/infra/push/datadestination_sqlserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ func NewSQLServerDataDestinationFactory() *SQLServerDataDestinationFactory {
}

// New return a SQLServer pusher
func (e *SQLServerDataDestinationFactory) New(url string, schema string) push.DataDestination {
return NewSQLDataDestination(url, schema, SQLServerDialect{})
func (e *SQLServerDataDestinationFactory) New(url string, schema string, options ...push.DataDestinationOption) push.DataDestination {
return NewSQLDataDestination(url, schema, SQLServerDialect{}, options...)
}

// SQLServerDialect inject SQLServer variations
Expand Down
4 changes: 2 additions & 2 deletions internal/infra/push/datadestination_ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (

type Action string

// "pull_open", "push_open", "push_data", "push_commit", "push_close"
// "pull_open", "push_open", "push_data", "push_commit", "push_close"
const (
PushOpen Action = "push_open"
PushData Action = "push_data"
Expand Down Expand Up @@ -77,7 +77,7 @@ func NewWebSocketDataDestinationFactory() *WebSocketDataDestinationFactory {
}

// New return a web socket pusher
func (e *WebSocketDataDestinationFactory) New(url string, schema string) push.DataDestination {
func (e *WebSocketDataDestinationFactory) New(url string, schema string, options ...push.DataDestinationOption) push.DataDestination {
return NewWebSocketDataDestination(url, schema)
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/push/driven.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

package push

type DataDestinationOption func(DataDestination)

// DataDestinationFactory exposes methods to create new datadestinations.
type DataDestinationFactory interface {
New(url string, schema string) DataDestination
New(url string, schema string, options ...DataDestinationOption) DataDestination
}

// DataDestination to write in the push process.
Expand Down