diff --git a/src/fluentd/build/fluentd.k8s.dockerfile b/src/fluentd/build/fluentd.k8s.dockerfile index ab2a422aaf..8649db6dea 100644 --- a/src/fluentd/build/fluentd.k8s.dockerfile +++ b/src/fluentd/build/fluentd.k8s.dockerfile @@ -22,7 +22,8 @@ RUN apk add --no-cache --update --virtual .build-deps \ sudo build-base ruby-dev make gcc libc-dev postgresql-dev git \ && apk add --no-cache --update libpq \ && sudo gem install fluent-plugin-concat \ - && sudo gem install rake bundler pg + && sudo gem install rake bundler pg \ + && sudo apk add ruby-bigdecimal # Build fluent-plugin-pgjson from scratch # Original fluent-plugin-pgjson is from https://github.com/fluent-plugins-nursery/fluent-plugin-pgjson diff --git a/src/fluentd/deploy/fluentd-config.yaml.template b/src/fluentd/deploy/fluentd-config.yaml.template index 198c8b0001..515217202a 100644 --- a/src/fluentd/deploy/fluentd-config.yaml.template +++ b/src/fluentd/deploy/fluentd-config.yaml.template @@ -118,8 +118,6 @@ data: user {{ cluster_cfg['postgresql']['user'] }} password {{ cluster_cfg['postgresql']['passwd'] }} database {{ cluster_cfg['postgresql']['db'] }} - table fc_objectsnapshots - record_col record @type memory flush_mode immediate diff --git a/src/fluentd/src/fluent-plugin-pgjson/lib/fluent/plugin/out_pgjson.rb b/src/fluentd/src/fluent-plugin-pgjson/lib/fluent/plugin/out_pgjson.rb index 75fdafa118..4c23e50c0c 100644 --- a/src/fluentd/src/fluent-plugin-pgjson/lib/fluent/plugin/out_pgjson.rb +++ b/src/fluentd/src/fluent-plugin-pgjson/lib/fluent/plugin/out_pgjson.rb @@ -24,18 +24,30 @@ class PgJsonOutput < Fluent::Plugin::Output config_param :sslmode, :enum, list: %i[disable allow prefer require verify-ca verify-full], default: :prefer desc "The database name to connect" config_param :database, :string - desc "The table name to insert records" - config_param :table, :string desc "The user name to connect database" config_param :user, :string, default: nil desc "The password to connect database" config_param :password, :string, default: nil, secret: true - desc "The column name for the time" - config_param :time_col, :string, default: "time" - desc "The column name for the tag" - config_param :tag_col, :string, default: "tag" - desc "The column name for the record" - config_param :record_col, :string, default: "record" + desc "The column name for the insertedAt" + config_param :insertedAt_col, :string, default: "insertedAt" + desc "The column name for the updatedAt" + config_param :updatedAt_col, :string, default: "updatedAt" + desc "The column name for the uid" + config_param :uid_col, :string, default: "uid" + desc "The column name for the frameworkName" + config_param :frameworkName_col, :string, default: "frameworkName" + desc "The column name for the attemptIndex" + config_param :attemptIndex_col, :string, default: "attemptIndex" + desc "The column name for the historyType" + config_param :historyType_col, :string, default: "historyType" + desc "The column name for the taskroleName" + config_param :taskroleName_col, :string, default: "taskroleName" + desc "The column name for the taskroleIndex" + config_param :taskroleIndex_col, :string, default: "taskroleIndex" + desc "The column name for the taskAttemptIndex" + config_param :taskAttemptIndex_col, :string, default: "taskAttemptIndex" + desc "The column name for the snapshot" + config_param :snapshot_col, :string, default: "snapshot" desc "If true, insert records formatted as msgpack" config_param :msgpack, :bool, default: false desc "JSON encoder (yajl/json)" @@ -44,6 +56,7 @@ class PgJsonOutput < Fluent::Plugin::Output config_param :time_format, :string, default: "%F %T.%N %z" config_param :reset_connection_interval, :integer, default: 5 + desc "Control the reset connection interval to be greater than 5 seconds globally" config_section :buffer do config_set_default :@type, DEFAULT_BUFFER_TYPE @@ -52,10 +65,6 @@ class PgJsonOutput < Fluent::Plugin::Output def initialize super - # if @conn == nil: The connection has not been established yet. - # if @conn != nil: The connection has been established, but it could be a broken connection. - @conn = nil - @thread_lock = Mutex.new @last_reset_ts = 0 end @@ -74,67 +83,48 @@ def configure(conf) end def init_connection - # This function is used to init the connection (first connecting). - # If @conn == nil, try to establish a connection, and set @conn. - # If @conn != nil, skip this function. - # @conn could be a nil or non-nil value after this call. - if @conn.nil? - begin - log.debug "[pgjson] [init_connection] Connecting to PostgreSQL server #{@host}:#{@port}, database #{@database}..." - @conn = PG::Connection.new(dbname: @database, host: @host, port: @port, sslmode: @sslmode, user: @user, password: @password) - rescue PG::Error - log.debug "[pgjson] [init_connection] Failed to initialize a connection." - if ! @conn.nil? - @conn.close() - @conn = nil - end - rescue => err - log.debug "#{err}" - end - end - end - - def timestamp - Time.now.getutc.to_i - end - - def reset_connection - # This function try to fix the broken connection to database. - # if @conn == nil, call init_connection - # if @conn != nil, call @conn.reset - # This function must be protected by thread_lock to ensure thread safety. + # This function is used to create a connection. + thread = Thread.current begin - if timestamp - @last_reset_ts > @reset_connection_interval - if @conn.nil? - log.debug "[pgjson] [reset_connection] Call init_connection." - init_connection - else - log.debug "[pgjson] [reset_connection] Reset Connection." - @conn.reset - end - else - log.debug "[pgjson] [reset_connection] Skip reset." + log.debug "[pgjson] [init_connection] Connecting to PostgreSQL server #{@host}:#{@port}, database #{@database}..." + thread[:conn] = PG::Connection.new(dbname: @database, host: @host, port: @port, sslmode: @sslmode, user: @user, password: @password) + rescue PG::Error + log.debug "[pgjson] [init_connection] Failed to initialize a connection." + if ! thread[:conn].nil? + thread[:conn].close() + thread[:conn] = nil end rescue => err - log.debug "[pgjson] [reset_connection] #{err.class}, #{err.message}" - ensure - @last_reset_ts = timestamp + log.debug "#{err}" end end - def shutdown - begin - @thread_lock.lock() - if ! @conn.nil? - @conn.close() - @conn = nil - end - rescue => err - log.debug "[pgjson] [shutdown] #{err.class}, #{err.message}" - ensure - @thread_lock.unlock() + def reset_connection + # This function try to fix the broken connection to database. + # if conn == nil, call init_connection + # if conn != nil, call conn.reset + thread = Thread.current + begin + if timestamp - @last_reset_ts > @reset_connection_interval + if thread[:conn].nil? + log.debug "[pgjson] [reset_connection] Call init_connection." + init_connection + else + log.debug "[pgjson] [reset_connection] Reset Connection." + thread[:conn].reset + end + else + log.debug "[pgjson] [reset_connection] Skip reset." + end + rescue => err + log.debug "[pgjson] [reset_connection] #{err.class}, #{err.message}" + ensure + @last_reset_ts = timestamp end - super + end + + def timestamp + Time.now.getutc.to_i end def formatted_to_msgpack_binary @@ -151,43 +141,60 @@ def format(tag, time, record) def write(chunk) log.debug "[pgjson] in write, chunk id #{dump_unique_id_hex chunk.unique_id}" - @thread_lock.lock() - init_connection - if ! @conn.nil? + thread = Thread.current + if ! thread.key?(:conn) + init_connection + end + if ! thread[:conn].nil? begin - @conn.exec("COPY #{@table} (#{@tag_col}, #{@time_col}, #{@record_col}) FROM STDIN WITH DELIMITER E'\\x01'") - tag = chunk.metadata.tag chunk.msgpack_each do |time, record| - @conn.put_copy_data "#{tag}\x01#{time}\x01#{record_value(record)}\n" + kind = record["objectSnapshot"]["kind"] + log.debug "log type: #{kind}" + if kind == "Framework" + thread[:conn].exec("COPY framework_history (#{@insertedAt_col}, #{@frameworkName_col}, #{@attemptIndex_col}, #{@historyType_col}, #{@snapshot_col}) FROM STDIN WITH DELIMITER E'\\x01'") + frameworkName = record["objectSnapshot"]["metadata"]["name"] + attemptIndex = record["objectSnapshot"]["status"]["attemptStatus"]["id"] + historyType = "retry" + snapshot = record_value(record["objectSnapshot"]) + thread[:conn].put_copy_data "#{time}\x01#{frameworkName}\x01#{attemptIndex}\x01#{historyType}\x01#{snapshot}\n" + elsif kind == "Pod" + thread[:conn].exec("COPY pods (#{@insertedAt_col}, #{@updatedAt_col}, #{@uid_col}, #{@frameworkName_col}, #{@attemptIndex_col}, #{@taskroleName_col}, #{@taskroleIndex_col}, #{@taskAttemptIndex_col}, #{@snapshot_col}) FROM STDIN WITH DELIMITER E'\\x01'") + uid = record["objectSnapshot"]["metadata"]["uid"] + frameworkName = record["objectSnapshot"]["metadata"]["name"] + attemptIndex = record["objectSnapshot"]["metadata"]["annotations"]["FC_FRAMEWORK_ATTEMPT_ID"] + taskroleName = record["objectSnapshot"]["metadata"]["annotations"]["FC_TASKROLE_NAME"] + taskroleIndex = record["objectSnapshot"]["metadata"]["annotations"]["FC_TASK_INDEX"] + taskAttemptIndex = record["objectSnapshot"]["metadata"]["annotations"]["FC_TASK_ATTEMPT_ID"] + snapshot = record_value(record["objectSnapshot"]) + thread[:conn].put_copy_data "#{time}\x01#{time}\x01#{uid}\x01#{frameworkName}\x01#{attemptIndex}\x01#{taskroleName}\x01#{taskroleIndex}\x01#{taskAttemptIndex}\x01#{snapshot}\n" + end end rescue PG::ConnectionBad, PG::UnableToSend => err # connection error - reset_connection # try to reset broken connection, and wait for next retry - errmsg = "%s while copy data: %s" % [ err.class.name, err.message ] - raise errmsg + reset_connection # try to reset broken connection, and wait for next retry + log.debug "%s while copy data: %s" % [ err.class.name, err.message ] + retry rescue PG::Error => err log.debug "[pgjson] [write] Error while writing, error is #{err.class}" errmsg = "%s while copy data: %s" % [ err.class.name, err.message ] - @conn.put_copy_end( errmsg ) - @conn.get_result + thread[:conn].put_copy_end( errmsg ) + thread[:conn].get_result raise errmsg else - @conn.put_copy_end - res = @conn.get_result + thread[:conn].put_copy_end + res = thread[:conn].get_result raise res.result_error_message if res.result_status != PG::PGRES_COMMAND_OK log.debug "[pgjson] write successfully, chunk id #{dump_unique_id_hex chunk.unique_id}" - ensure - @thread_lock.unlock() end else - @thread_lock.unlock() raise "Cannot connect to db host." end end def record_value(record) + thread = Thread.current if @msgpack - "\\#{@conn.escape_bytea(record.to_msgpack)}" + "\\#{thread[:conn].escape_bytea(record.to_msgpack)}" else json = @encoder.dump(record) json.gsub!(/\\/){ '\\\\' } diff --git a/src/postgresql/src/init_table.sql b/src/postgresql/src/init_table.sql index 65ae96b0a8..e467c5af66 100644 --- a/src/postgresql/src/init_table.sql +++ b/src/postgresql/src/init_table.sql @@ -1,8 +1,21 @@ /* This init script will be run every time when the database container is started. */ -CREATE TABLE IF NOT EXISTS fc_objectsnapshots ( - tag Text, - time Timestamptz, - record JsonB +CREATE TABLE IF NOT EXISTS framework_history ( + insertedAt Timestamptz, + uid SERIAL, + frameworkName VARCHAR(64), + attemptIndex INTEGER, + historyType VARCHAR(16), + snapshot TEXT +); +CREATE TABLE IF NOT EXISTS pods ( + insertedAt Timestamptz, + updatedAt Timestamptz, + uid VARCHAR(36), + frameworkName VARCHAR(64), + attemptIndex INTEGER, + taskroleName VARCHAR(256), + taskroleIndex INTEGER, + taskAttemptIndex INTEGER, + snapshot TEXT ); -CREATE INDEX IF NOT EXISTS uidindex ON fc_objectsnapshots USING gin ((record -> 'objectSnapshot' -> 'metadata' -> 'uid')); diff --git a/src/rest-server/src/models/v2/job-attempt.js b/src/rest-server/src/models/v2/job-attempt.js index 5c9ea86371..b621fd2ef3 100644 --- a/src/rest-server/src/models/v2/job-attempt.js +++ b/src/rest-server/src/models/v2/job-attempt.js @@ -92,14 +92,14 @@ if (sequelize && launcherConfig.enabledJobHistory) { return {status: 404, data: null}; } - const sqlSentence = `SELECT (record->'objectSnapshot') as data FROM fc_objectsnapshots WHERE ` + - `record->'objectSnapshot'->'metadata'->'uid' ? '${uid}' and ` + - `record->'objectSnapshot'->'kind' ? 'Framework' ` + - `ORDER BY cast(record->'objectSnapshot'->'status'->'attemptStatus'->>'id' as INTEGER) ASC;`; + const sqlSentence = `SELECT snapshot as data FROM framework_history WHERE ` + + `frameworkName = '${encodeName(frameworkName)}' ` + + `ORDER BY uid ASC;`; const pgResult = (await sequelize.query(sqlSentence))[0]; + const jobRetries = await Promise.all( pgResult.map((row) => { - return convertToJobAttempt(row.data); + return convertToJobAttempt(JSON.parse(row.data)); }), ); attemptData.push( @@ -107,7 +107,6 @@ if (sequelize && launcherConfig.enabledJobHistory) { return {...jobRetry, isLatest: false}; }), ); - return {status: 200, data: attemptData}; }; @@ -115,6 +114,7 @@ if (sequelize && launcherConfig.enabledJobHistory) { let uid; let attemptFramework; let response; + try { response = await k8sModel.getClient().get( launcherConfig.frameworkPath(encodeName(frameworkName)), @@ -146,17 +146,16 @@ if (sequelize && launcherConfig.enabledJobHistory) { if (isNil(uid)) { return {status: 404, data: null}; } - const sqlSentence = `SELECT (record->'objectSnapshot') as data FROM fc_objectsnapshots WHERE ` + - `record->'objectSnapshot'->'metadata'->'uid' ? '${uid}' and ` + - `record->'objectSnapshot'->'status'->'attemptStatus'->>'id' = '${jobAttemptIndex}' and ` + - `record->'objectSnapshot'->'kind' ? 'Framework' ` + - `ORDER BY cast(record->'objectSnapshot'->'status'->'attemptStatus'->>'id' as INTEGER) ASC;`; + const sqlSentence = `SELECT snapshot as data FROM framework_history WHERE ` + + `frameworkName = '${encodeName(frameworkName)}' and ` + + `attemptIndex = '${jobAttemptIndex}' ` + + `ORDER BY uid ASC;`; const pgResult = (await sequelize.query(sqlSentence))[0]; if (pgResult.length === 0) { return {status: 404, data: null}; } else { - attemptFramework = pgResult[0].data; + attemptFramework = JSON.parse(pgResult[0].data); const attemptDetail = await convertToJobAttempt(attemptFramework); return {status: 200, data: {...attemptDetail, isLatest: false}}; }