Skip to content
This repository was archived by the owner on Jun 6, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/fluentd/build/fluentd.k8s.dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ RUN apk add --no-cache --update --virtual .build-deps \
sudo build-base ruby-dev make gcc libc-dev postgresql-dev git \
&& apk add --no-cache --update libpq \
&& sudo gem install fluent-plugin-concat \
&& sudo gem install rake bundler pg
&& sudo gem install rake bundler pg \
&& sudo apk add ruby-bigdecimal

# Build fluent-plugin-pgjson from scratch
# Original fluent-plugin-pgjson is from https://github.com/fluent-plugins-nursery/fluent-plugin-pgjson
Expand Down
2 changes: 0 additions & 2 deletions src/fluentd/deploy/fluentd-config.yaml.template
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,6 @@ data:
user {{ cluster_cfg['postgresql']['user'] }}
password {{ cluster_cfg['postgresql']['passwd'] }}
database {{ cluster_cfg['postgresql']['db'] }}
table fc_objectsnapshots
record_col record
<buffer>
@type memory
flush_mode immediate
Expand Down
173 changes: 90 additions & 83 deletions src/fluentd/src/fluent-plugin-pgjson/lib/fluent/plugin/out_pgjson.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,30 @@ class PgJsonOutput < Fluent::Plugin::Output
config_param :sslmode, :enum, list: %i[disable allow prefer require verify-ca verify-full], default: :prefer
desc "The database name to connect"
config_param :database, :string
desc "The table name to insert records"
config_param :table, :string
desc "The user name to connect database"
config_param :user, :string, default: nil
desc "The password to connect database"
config_param :password, :string, default: nil, secret: true
desc "The column name for the time"
config_param :time_col, :string, default: "time"
desc "The column name for the tag"
config_param :tag_col, :string, default: "tag"
desc "The column name for the record"
config_param :record_col, :string, default: "record"
desc "The column name for the insertedAt"
config_param :insertedAt_col, :string, default: "insertedAt"
desc "The column name for the updatedAt"
config_param :updatedAt_col, :string, default: "updatedAt"
desc "The column name for the uid"
config_param :uid_col, :string, default: "uid"
desc "The column name for the frameworkName"
config_param :frameworkName_col, :string, default: "frameworkName"
desc "The column name for the attemptIndex"
config_param :attemptIndex_col, :string, default: "attemptIndex"
desc "The column name for the historyType"
config_param :historyType_col, :string, default: "historyType"
desc "The column name for the taskroleName"
config_param :taskroleName_col, :string, default: "taskroleName"
desc "The column name for the taskroleIndex"
config_param :taskroleIndex_col, :string, default: "taskroleIndex"
desc "The column name for the taskAttemptIndex"
config_param :taskAttemptIndex_col, :string, default: "taskAttemptIndex"
desc "The column name for the snapshot"
config_param :snapshot_col, :string, default: "snapshot"
desc "If true, insert records formatted as msgpack"
config_param :msgpack, :bool, default: false
desc "JSON encoder (yajl/json)"
Expand All @@ -44,6 +56,7 @@ class PgJsonOutput < Fluent::Plugin::Output
config_param :time_format, :string, default: "%F %T.%N %z"

# Keep `desc` ahead of its `config_param`, as every other parameter in this
# class does, so the description is attached to reset_connection_interval
# rather than left dangling after it.
desc "Control the reset connection interval to be greater than 5 seconds globally"
config_param :reset_connection_interval, :integer, default: 5

config_section :buffer do
config_set_default :@type, DEFAULT_BUFFER_TYPE
Expand All @@ -52,10 +65,6 @@ class PgJsonOutput < Fluent::Plugin::Output

def initialize
super
# if @conn == nil: The connection has not been established yet.
# if @conn != nil: The connection has been established, but it could be a broken connection.
@conn = nil
@thread_lock = Mutex.new
@last_reset_ts = 0
end

Expand All @@ -74,67 +83,48 @@ def configure(conf)
end

def init_connection
# This function is used to init the connection (first connecting).
# If @conn == nil, try to establish a connection, and set @conn.
# If @conn != nil, skip this function.
# @conn could be a nil or non-nil value after this call.
if @conn.nil?
begin
log.debug "[pgjson] [init_connection] Connecting to PostgreSQL server #{@host}:#{@port}, database #{@database}..."
@conn = PG::Connection.new(dbname: @database, host: @host, port: @port, sslmode: @sslmode, user: @user, password: @password)
rescue PG::Error
log.debug "[pgjson] [init_connection] Failed to initialize a connection."
if ! @conn.nil?
@conn.close()
@conn = nil
end
rescue => err
log.debug "#{err}"
end
end
end

# Returns the current wall-clock time as whole seconds since the Unix epoch.
def timestamp
  now_utc = Time.now.utc
  now_utc.to_i
end

def reset_connection
# This function tries to fix a broken connection to the database.
# if @conn == nil, call init_connection
# if @conn != nil, call @conn.reset
# This function must be protected by thread_lock to ensure thread safety.
# This function is used to create a connection.
thread = Thread.current
begin
if timestamp - @last_reset_ts > @reset_connection_interval
if @conn.nil?
log.debug "[pgjson] [reset_connection] Call init_connection."
init_connection
else
log.debug "[pgjson] [reset_connection] Reset Connection."
@conn.reset
end
else
log.debug "[pgjson] [reset_connection] Skip reset."
log.debug "[pgjson] [init_connection] Connecting to PostgreSQL server #{@host}:#{@port}, database #{@database}..."
thread[:conn] = PG::Connection.new(dbname: @database, host: @host, port: @port, sslmode: @sslmode, user: @user, password: @password)
rescue PG::Error
log.debug "[pgjson] [init_connection] Failed to initialize a connection."
if ! thread[:conn].nil?
thread[:conn].close()
thread[:conn] = nil
end
rescue => err
log.debug "[pgjson] [reset_connection] #{err.class}, #{err.message}"
ensure
@last_reset_ts = timestamp
log.debug "#{err}"
end
end

def shutdown
begin
@thread_lock.lock()
if ! @conn.nil?
@conn.close()
@conn = nil
end
rescue => err
log.debug "[pgjson] [shutdown] #{err.class}, #{err.message}"
ensure
@thread_lock.unlock()
def reset_connection
# This function tries to fix a broken connection to the database.
# if conn == nil, call init_connection
# if conn != nil, call conn.reset
thread = Thread.current
begin
if timestamp - @last_reset_ts > @reset_connection_interval
if thread[:conn].nil?
log.debug "[pgjson] [reset_connection] Call init_connection."
init_connection
else
log.debug "[pgjson] [reset_connection] Reset Connection."
thread[:conn].reset
end
else
log.debug "[pgjson] [reset_connection] Skip reset."
end
rescue => err
log.debug "[pgjson] [reset_connection] #{err.class}, #{err.message}"
ensure
@last_reset_ts = timestamp
end
super
end

# Returns the current wall-clock time as whole seconds since the Unix epoch.
def timestamp
  now_utc = Time.now.utc
  now_utc.to_i
end

def formatted_to_msgpack_binary
Expand All @@ -151,43 +141,60 @@ def format(tag, time, record)

def write(chunk)
log.debug "[pgjson] in write, chunk id #{dump_unique_id_hex chunk.unique_id}"
@thread_lock.lock()
init_connection
if ! @conn.nil?
thread = Thread.current
if ! thread.key?(:conn)
init_connection
end
if ! thread[:conn].nil?
begin
@conn.exec("COPY #{@table} (#{@tag_col}, #{@time_col}, #{@record_col}) FROM STDIN WITH DELIMITER E'\\x01'")
tag = chunk.metadata.tag
chunk.msgpack_each do |time, record|
@conn.put_copy_data "#{tag}\x01#{time}\x01#{record_value(record)}\n"
kind = record["objectSnapshot"]["kind"]
log.debug "log type: #{kind}"
if kind == "Framework"
thread[:conn].exec("COPY framework_history (#{@insertedAt_col}, #{@frameworkName_col}, #{@attemptIndex_col}, #{@historyType_col}, #{@snapshot_col}) FROM STDIN WITH DELIMITER E'\\x01'")
frameworkName = record["objectSnapshot"]["metadata"]["name"]
attemptIndex = record["objectSnapshot"]["status"]["attemptStatus"]["id"]
historyType = "retry"
snapshot = record_value(record["objectSnapshot"])
thread[:conn].put_copy_data "#{time}\x01#{frameworkName}\x01#{attemptIndex}\x01#{historyType}\x01#{snapshot}\n"
elsif kind == "Pod"
thread[:conn].exec("COPY pods (#{@insertedAt_col}, #{@updatedAt_col}, #{@uid_col}, #{@frameworkName_col}, #{@attemptIndex_col}, #{@taskroleName_col}, #{@taskroleIndex_col}, #{@taskAttemptIndex_col}, #{@snapshot_col}) FROM STDIN WITH DELIMITER E'\\x01'")
uid = record["objectSnapshot"]["metadata"]["uid"]
frameworkName = record["objectSnapshot"]["metadata"]["name"]
attemptIndex = record["objectSnapshot"]["metadata"]["annotations"]["FC_FRAMEWORK_ATTEMPT_ID"]
taskroleName = record["objectSnapshot"]["metadata"]["annotations"]["FC_TASKROLE_NAME"]
taskroleIndex = record["objectSnapshot"]["metadata"]["annotations"]["FC_TASK_INDEX"]
taskAttemptIndex = record["objectSnapshot"]["metadata"]["annotations"]["FC_TASK_ATTEMPT_ID"]
snapshot = record_value(record["objectSnapshot"])
thread[:conn].put_copy_data "#{time}\x01#{time}\x01#{uid}\x01#{frameworkName}\x01#{attemptIndex}\x01#{taskroleName}\x01#{taskroleIndex}\x01#{taskAttemptIndex}\x01#{snapshot}\n"
end
end
rescue PG::ConnectionBad, PG::UnableToSend => err
# connection error
reset_connection # try to reset broken connection, and wait for next retry
errmsg = "%s while copy data: %s" % [ err.class.name, err.message ]
raise errmsg
reset_connection # try to reset broken connection, and wait for next retry
log.debug "%s while copy data: %s" % [ err.class.name, err.message ]
retry
rescue PG::Error => err
log.debug "[pgjson] [write] Error while writing, error is #{err.class}"
errmsg = "%s while copy data: %s" % [ err.class.name, err.message ]
@conn.put_copy_end( errmsg )
@conn.get_result
thread[:conn].put_copy_end( errmsg )
thread[:conn].get_result
raise errmsg
else
@conn.put_copy_end
res = @conn.get_result
thread[:conn].put_copy_end
res = thread[:conn].get_result
raise res.result_error_message if res.result_status != PG::PGRES_COMMAND_OK
log.debug "[pgjson] write successfully, chunk id #{dump_unique_id_hex chunk.unique_id}"
ensure
@thread_lock.unlock()
end
else
@thread_lock.unlock()
raise "Cannot connect to db host."
end
end

def record_value(record)
thread = Thread.current
if @msgpack
"\\#{@conn.escape_bytea(record.to_msgpack)}"
"\\#{thread[:conn].escape_bytea(record.to_msgpack)}"
else
json = @encoder.dump(record)
json.gsub!(/\\/){ '\\\\' }
Expand Down
23 changes: 18 additions & 5 deletions src/postgresql/src/init_table.sql
Original file line number Diff line number Diff line change
@@ -1,8 +1,21 @@
/* This init script will be run every time when the database container is started. */

CREATE TABLE IF NOT EXISTS fc_objectsnapshots (
tag Text,
time Timestamptz,
record JsonB
CREATE TABLE IF NOT EXISTS framework_history (
insertedAt Timestamptz,
uid SERIAL,
frameworkName VARCHAR(64),
attemptIndex INTEGER,
historyType VARCHAR(16),
snapshot TEXT
);
CREATE TABLE IF NOT EXISTS pods (
insertedAt Timestamptz,
updatedAt Timestamptz,
uid VARCHAR(36),
frameworkName VARCHAR(64),
attemptIndex INTEGER,
taskroleName VARCHAR(256),
taskroleIndex INTEGER,
taskAttemptIndex INTEGER,
snapshot TEXT
);
CREATE INDEX IF NOT EXISTS uidindex ON fc_objectsnapshots USING gin ((record -> 'objectSnapshot' -> 'metadata' -> 'uid'));
23 changes: 11 additions & 12 deletions src/rest-server/src/models/v2/job-attempt.js
Original file line number Diff line number Diff line change
Expand Up @@ -92,29 +92,29 @@ if (sequelize && launcherConfig.enabledJobHistory) {
return {status: 404, data: null};
}

const sqlSentence = `SELECT (record->'objectSnapshot') as data FROM fc_objectsnapshots WHERE ` +
`record->'objectSnapshot'->'metadata'->'uid' ? '${uid}' and ` +
`record->'objectSnapshot'->'kind' ? 'Framework' ` +
`ORDER BY cast(record->'objectSnapshot'->'status'->'attemptStatus'->>'id' as INTEGER) ASC;`;
const sqlSentence = `SELECT snapshot as data FROM framework_history WHERE ` +
`frameworkName = '${encodeName(frameworkName)}' ` +
`ORDER BY uid ASC;`;
const pgResult = (await sequelize.query(sqlSentence))[0];

const jobRetries = await Promise.all(
pgResult.map((row) => {
return convertToJobAttempt(row.data);
return convertToJobAttempt(JSON.parse(row.data));
}),
);
attemptData.push(
...jobRetries.map((jobRetry) => {
return {...jobRetry, isLatest: false};
}),
);

return {status: 200, data: attemptData};
};

const get = async (frameworkName, jobAttemptIndex) => {
let uid;
let attemptFramework;
let response;

try {
response = await k8sModel.getClient().get(
launcherConfig.frameworkPath(encodeName(frameworkName)),
Expand Down Expand Up @@ -146,17 +146,16 @@ if (sequelize && launcherConfig.enabledJobHistory) {
if (isNil(uid)) {
return {status: 404, data: null};
}
const sqlSentence = `SELECT (record->'objectSnapshot') as data FROM fc_objectsnapshots WHERE ` +
`record->'objectSnapshot'->'metadata'->'uid' ? '${uid}' and ` +
`record->'objectSnapshot'->'status'->'attemptStatus'->>'id' = '${jobAttemptIndex}' and ` +
`record->'objectSnapshot'->'kind' ? 'Framework' ` +
`ORDER BY cast(record->'objectSnapshot'->'status'->'attemptStatus'->>'id' as INTEGER) ASC;`;
const sqlSentence = `SELECT snapshot as data FROM framework_history WHERE ` +
`frameworkName = '${encodeName(frameworkName)}' and ` +
`attemptIndex = '${jobAttemptIndex}' ` +
`ORDER BY uid ASC;`;
const pgResult = (await sequelize.query(sqlSentence))[0];

if (pgResult.length === 0) {
return {status: 404, data: null};
} else {
attemptFramework = pgResult[0].data;
attemptFramework = JSON.parse(pgResult[0].data);
const attemptDetail = await convertToJobAttempt(attemptFramework);
return {status: 200, data: {...attemptDetail, isLatest: false}};
}
Expand Down