From 5044906d40d2d46ce1bd5adccd7ae48a36572f1a Mon Sep 17 00:00:00 2001 From: suiguoxin Date: Wed, 24 Jun 2020 03:38:18 +0000 Subject: [PATCH 01/14] update --- src/fluentd/build/fluentd.k8s.dockerfile | 2 + src/fluentd/build/fluentd.ubuntu | 47 +++++ src/fluentd/build/utils.md | 32 +++ src/fluentd/code/out_pgjson_multiconn.rb | 161 ++++++++++++++ src/fluentd/code/out_pgjson_multiconn_sync.rb | 195 +++++++++++++++++ src/fluentd/code/out_pgjson_original.rb | 133 ++++++++++++ src/fluentd/code/out_pgjson_single.rb | 198 ++++++++++++++++++ src/fluentd/code/out_pgjson_threadlocal.rb | 175 ++++++++++++++++ src/fluentd/err | 0 src/fluentd/run.py | 17 ++ src/fluentd/run.sh | 5 + .../src/fluent-plugin-pgjson/example.conf | 35 ++-- .../lib/fluent/plugin/out_pgjson.rb | 125 +++++------ .../src/fluent-plugin-pgjson/test.conf | 19 ++ 14 files changed, 1057 insertions(+), 87 deletions(-) create mode 100644 src/fluentd/build/fluentd.ubuntu create mode 100644 src/fluentd/build/utils.md create mode 100644 src/fluentd/code/out_pgjson_multiconn.rb create mode 100644 src/fluentd/code/out_pgjson_multiconn_sync.rb create mode 100644 src/fluentd/code/out_pgjson_original.rb create mode 100644 src/fluentd/code/out_pgjson_single.rb create mode 100644 src/fluentd/code/out_pgjson_threadlocal.rb create mode 100644 src/fluentd/err create mode 100644 src/fluentd/run.py create mode 100644 src/fluentd/run.sh create mode 100644 src/fluentd/src/fluent-plugin-pgjson/test.conf diff --git a/src/fluentd/build/fluentd.k8s.dockerfile b/src/fluentd/build/fluentd.k8s.dockerfile index ab2a422aaf..d6ddf98940 100644 --- a/src/fluentd/build/fluentd.k8s.dockerfile +++ b/src/fluentd/build/fluentd.k8s.dockerfile @@ -17,6 +17,8 @@ FROM fluent/fluentd:v1.7-1 +# affine + USER root RUN apk add --no-cache --update --virtual .build-deps \ sudo build-base ruby-dev make gcc libc-dev postgresql-dev git \ diff --git a/src/fluentd/build/fluentd.ubuntu b/src/fluentd/build/fluentd.ubuntu new file mode 100644 index 0000000000..476c00e273 --- /dev/null +++ b/src/fluentd/build/fluentd.ubuntu @@ -0,0 +1,47 @@ +# Copyright (c) Microsoft Corporation +# All rights reserved. +# +# MIT License +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated +# documentation files (the "Software"), to deal in the Software without restriction, including without limitation +# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and +# to permit persons to whom the Software is furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING +# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +# docker build -f fluentd.ubuntu.dockerfile -t gusui/fluentd:original . --no-cache +# docker build -f fluentd.ubuntu.dockerfile -t gusui/fluentd:singlethread . --no-cache +# docker build -f fluentd.ubuntu.dockerfile -t gusui/fluentd:multiconn . --no-cache + +FROM fluent/fluentd:v1.7-1 + +USER root +RUN apt-get install postgresql postgresql-client libpq5 libpq-dev \ +&& /usr/sbin/td-agent-gem install fluent-plugin-concat \ +&& sudo /usr/sbin/td-agent-gem install rake bundler \ +&& sudo /usr/sbin/td-agent-gem install pg + +# Build fluent-plugin-pgjson from scratch +# Original fluent-plugin-pgjson is from https://github.com/fluent-plugins-nursery/fluent-plugin-pgjson +# Original plugin cannot retry connecting when database connection is lost, +# and is not thread-safe. These two problems are fixed by modifying codes. +COPY src/fluent-plugin-pgjson /fluent-plugin-pgjson +RUN cd /fluent-plugin-pgjson && \ + git init && \ + git add . && \ + rake build && \ + sudo /usr/sbin/td-agent-gem install --local ./pkg/fluent-plugin-pgjson-1.0.0.gem \ + rm -rf /fluent-plugin-pgjson + +# cleanup +RUN sudo gem sources --clear-all \ + && apk del .build-deps \ + && rm -rf /tmp/* /var/tmp/* /usr/lib/ruby/gems/*/cache/*.gem + +COPY build/fluent.conf /fluentd/etc/ diff --git a/src/fluentd/build/utils.md b/src/fluentd/build/utils.md new file mode 100644 index 0000000000..4826290b65 --- /dev/null +++ b/src/fluentd/build/utils.md @@ -0,0 +1,32 @@ + +# td-agent +sudo /etc/init.d/td-agent restart + +/var/log/td-agent/td-agent.log +sudo vim /etc/td-agent/td-agent.conf + +sudo /usr/sbin/td-agent-gem install pg --version "=1.1.4" +path: /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluent-plugin-pgjson + +# postgres +select count(*) from test_pg; +delete from test_pg; + +# docker +cd src/fluentd +docker build -f build/fluentd.k8s.dockerfile -t gusui/fluentd:multiconn . --no-cache +docker build -f build/fluentd.k8s.dockerfile -t gusui/fluentd:threadlocal . --no-cache +docker push gusui/fluentd:original +docker run -p 8888:8888 -v $(pwd)/tmp:/fluentd/etc -e FLUENTD_CONF=fluentd.conf -d gusui/fluentd:original +docker run -p 8888:8888 -v $(pwd)/tmp:/fluentd/etc -e FLUENTD_CONF=fluentd.conf -d gusui/fluentd:multiconn +docker run -p 8888:8888 -v $(pwd)/tmp:/fluentd/etc -e FLUENTD_CONF=fluentd.conf -d gusui/fluentd:threadlocal + +bash run.sh + +docker ps + +docker logs CONTAINERID +docker logs --tails 10 CONTAINERID + +docker stop $(docker ps -a -q) +docker rm $(docker ps -a -q) \ No newline at end of file diff --git a/src/fluentd/code/out_pgjson_multiconn.rb b/src/fluentd/code/out_pgjson_multiconn.rb new file mode 100644 index 0000000000..d3d800764a --- /dev/null +++ b/src/fluentd/code/out_pgjson_multiconn.rb @@ -0,0 +1,161 @@ +# Copyright (c) 2012 OKUNO Akihiro +# Portions Copyright (c) Microsoft Corporation +# +# Apache License, Version 2.0 + +require "fluent/plugin/output" +require "pg" +require "yajl" +require "json" + +module Fluent::Plugin + class PgJsonOutput < Fluent::Plugin::Output + Fluent::Plugin.register_output("pgjson", self) + + helpers :compat_parameters + + DEFAULT_BUFFER_TYPE = "memory" + + desc "The hostname of PostgreSQL server" + config_param :host, :string, default: "localhost" + desc "The port of PostgreSQL server" + config_param :port, :integer, default: 5432 + desc "Set the sslmode to enable Eavesdropping protection/MITM protection" + config_param :sslmode, :enum, list: %i[disable allow prefer require verify-ca verify-full], default: :prefer + desc "The database name to connect" + config_param :database, :string + desc "The table name to insert records" + config_param :table, :string + desc "The user name to connect database" + config_param :user, :string, default: nil + desc "The password to connect database" + config_param :password, :string, default: nil, secret: true + desc "The column name for the time" + config_param :time_col, :string, default: "time" + desc "The column name for the tag" + config_param :tag_col, :string, default: "tag" + desc "The column name for the record" + config_param :record_col, :string, default: "record" + desc "If true, insert records formatted as msgpack" + config_param :msgpack, :bool, default: false + desc "JSON encoder (yajl/json)" + config_param :encoder, :enum, list: [:yajl, :json], default: :yajl + + config_param :time_format, :string, default: "%F %T.%N %z" + + config_section :buffer do + config_set_default :@type, DEFAULT_BUFFER_TYPE + config_set_default :chunk_keys, ["tag"] + end + + def initialize + super + @last_reset_ts = 0 + end + + def configure(conf) + compat_parameters_convert(conf, :buffer) + super + unless @chunk_key_tag + raise Fluent::ConfigError, "'tag' in chunk_keys is required." + end + @encoder = case @encoder + when :yajl + Yajl + when :json + JSON + end + end + + def init_connection + # This function is used to create a connection. + begin + log.info "[pgjson] [init_connection] Connecting to PostgreSQL server #{@host}:#{@port}, database #{@database}..." + conn = PG::Connection.new(dbname: @database, host: @host, port: @port, sslmode: @sslmode, user: @user, password: @password) + rescue PG::Error + log.info "[pgjson] [init_connection] Failed to initialize a connection." + if ! conn.nil? + conn.close() + conn = nil + end + rescue => err + log.info "#{err}" + end + conn + end + + def timestamp + Time.now.getutc.to_i + end + + def shutdown + # begin + # @thread_lock.lock() + # if ! @conn.nil? + # @conn.close() + # @conn = nil + # end + # rescue => err + # log.info "[pgjson] [shutdown] #{err.class}, #{err.message}" + # ensure + # @thread_lock.unlock() + # end + super + end + + def formatted_to_msgpack_binary + true + end + + def multi_workers_ready? + true + end + + def format(tag, time, record) + [Time.at(time).strftime(@time_format), record].to_msgpack + end + + def write(chunk) + log.info "[pgjson] in write, chunk id #{dump_unique_id_hex chunk.unique_id}" + conn = init_connection + if ! conn.nil? + begin + conn.exec("COPY #{@table} (#{@tag_col}, #{@time_col}, #{@record_col}) FROM STDIN WITH DELIMITER E'\\x01'") + tag = chunk.metadata.tag + chunk.msgpack_each do |time, record| + conn.put_copy_data "#{tag}\x01#{time}\x01#{record_value(record, conn)}\n" + end + rescue PG::ConnectionBad, PG::UnableToSend => err + # connection error + conn = init_connection # try to reset broken connection, and wait for next retry + log.info "%s while copy data: %s" % [ err.class.name, err.message ] + retry + rescue PG::Error => err + log.info "[pgjson] [write] Error while writing, error is #{err.class}" + errmsg = "%s while copy data: %s" % [ err.class.name, err.message ] + conn.put_copy_end( errmsg ) + conn.get_result + raise errmsg + else + conn.put_copy_end + res = conn.get_result + raise res.result_error_message if res.result_status != PG::PGRES_COMMAND_OK + conn.close() + log.info "[pgjson] write successfully, chunk id #{dump_unique_id_hex chunk.unique_id}" + end + else + raise "Cannot connect to db host." + end + end + + def record_value(record, conn) + if @msgpack + "\\#{conn.escape_bytea(record.to_msgpack)}" + else + json = @encoder.dump(record) + json.gsub!(/\\/){ '\\\\' } + json + end + end + end +end diff --git a/src/fluentd/code/out_pgjson_multiconn_sync.rb b/src/fluentd/code/out_pgjson_multiconn_sync.rb new file mode 100644 index 0000000000..0bbf2d0ebc --- /dev/null +++ b/src/fluentd/code/out_pgjson_multiconn_sync.rb @@ -0,0 +1,195 @@ +# Copyright (c) 2012 OKUNO Akihiro +# Portions Copyright (c) Microsoft Corporation +# +# Apache License, Version 2.0 + +require "fluent/plugin/output" +require "pg" +require "yajl" +require "json" + +module Fluent::Plugin + class PgJsonOutput < Fluent::Plugin::Output + Fluent::Plugin.register_output("pgjson", self) + + helpers :compat_parameters + + DEFAULT_BUFFER_TYPE = "memory" + + desc "The hostname of PostgreSQL server" + config_param :host, :string, default: "localhost" + desc "The port of PostgreSQL server" + config_param :port, :integer, default: 5432 + desc "Set the sslmode to enable Eavesdropping protection/MITM protection" + config_param :sslmode, :enum, list: %i[disable allow prefer require verify-ca verify-full], default: :prefer + desc "The database name to connect" + config_param :database, :string + desc "The table name to insert records" + config_param :table, :string + desc "The user name to connect database" + config_param :user, :string, default: nil + desc "The password to connect database" + config_param :password, :string, default: nil, secret: true + desc "The column name for the time" + config_param :time_col, :string, default: "time" + desc "The column name for the tag" + config_param :tag_col, :string, default: "tag" + desc "The column name for the record" + config_param :record_col, :string, default: "record" + desc "If true, insert records formatted as msgpack" + config_param :msgpack, :bool, default: false + desc "JSON encoder (yajl/json)" + config_param :encoder, :enum, list: [:yajl, :json], default: :yajl + + config_param :time_format, :string, default: "%F %T.%N %z" + + config_param :reset_connection_interval, :integer, default: 5 + + config_section :buffer do + config_set_default :@type, DEFAULT_BUFFER_TYPE + config_set_default :chunk_keys, ["tag"] + end + + def initialize + super + # if @conn == nil: The connection has not been established yet. + # if @conn != nil: The connection has been established, but it could be a broken connection. + # @conn = nil + # @thread_lock = Mutex.new + @last_reset_ts = 0 + end + + def configure(conf) + compat_parameters_convert(conf, :buffer) + super + unless @chunk_key_tag + raise Fluent::ConfigError, "'tag' in chunk_keys is required." + end + @encoder = case @encoder + when :yajl + Yajl + when :json + JSON + end + end + + def init_connection + # This function is used to init the connection (first connecting). + # If @conn == nil, try to establish a connection, and set @conn. + # If @conn != nil, skip this function. + # @conn could be a nil or non-nil value after this call. + begin + log.debug "[pgjson] [init_connection] Connecting to PostgreSQL server #{@host}:#{@port}, database #{@database}..." + conn = PG::Connection.new(dbname: @database, host: @host, port: @port, sslmode: @sslmode, user: @user, password: @password) + rescue PG::Error + log.debug "[pgjson] [init_connection] Failed to initialize a connection." + if ! conn.nil? + conn.close() + conn = nil + end + rescue => err + log.debug "#{err}" + end + conn + end + + def timestamp + Time.now.getutc.to_i + end + + def reset_connection + # This function try to fix the broken connection to database. + # if @conn == nil, call init_connection + # if @conn != nil, call @conn.reset + # This function must be protected by thread_lock to ensure thread safety. + begin + if timestamp - @last_reset_ts > @reset_connection_interval + if @conn.nil? + log.debug "[pgjson] [reset_connection] Call init_connection." + init_connection + else + log.debug "[pgjson] [reset_connection] Reset Connection." + @conn.reset + end + else + log.debug "[pgjson] [reset_connection] Skip reset." + end + rescue => err + log.debug "[pgjson] [reset_connection] #{err.class}, #{err.message}" + ensure + @last_reset_ts = timestamp + end + end + + def shutdown + # begin + # @thread_lock.lock() + # if ! @conn.nil? + # @conn.close() + # @conn = nil + # end + # rescue => err + # log.debug "[pgjson] [shutdown] #{err.class}, #{err.message}" + # ensure + # @thread_lock.unlock() + # end + super + end + + def formatted_to_msgpack_binary + true + end + + def multi_workers_ready? + true + end + + def format(tag, time, record) + [Time.at(time).strftime(@time_format), record].to_msgpack + end + + def write(chunk) + log.debug "[pgjson] in write, chunk id #{dump_unique_id_hex chunk.unique_id}" + # @thread_lock.lock() + conn = init_connection + if ! conn.nil? + begin + conn.sync_exec("COPY #{@table} (#{@tag_col}, #{@time_col}, #{@record_col}) FROM STDIN WITH DELIMITER E'\\x01'") + tag = chunk.metadata.tag + chunk.msgpack_each do |time, record| + conn.put_copy_data "#{tag}\x01#{time}\x01#{record_value(record, conn)}\n" + end + rescue PG::ConnectionBad, PG::UnableToSend => err + # connection error + conn = init_connection # try to reset broken connection, and wait for next retry + log.debug "%s while copy data: %s" % [ err.class.name, err.message ] + retry + rescue PG::Error => err + log.debug "[pgjson] [write] Error while writing, error is #{err.class}" + errmsg = "%s while copy data: %s" % [ err.class.name, err.message ] + conn.put_copy_end( errmsg ) + conn.get_result + raise errmsg + else + conn.put_copy_end + res = conn.get_result + raise res.result_error_message if res.result_status != PG::PGRES_COMMAND_OK + conn.close() + log.debug "[pgjson] write successfully, chunk id #{dump_unique_id_hex chunk.unique_id}" + end + else + raise "Cannot connect to db host." + end + end + + def record_value(record, conn) + if @msgpack + "\\#{conn.escape_bytea(record.to_msgpack)}" + else + json = @encoder.dump(record) + json.gsub!(/\\/){ '\\\\' } + json + end + end + end +end diff --git a/src/fluentd/code/out_pgjson_original.rb b/src/fluentd/code/out_pgjson_original.rb new file mode 100644 index 0000000000..ef4f7ec4ec --- /dev/null +++ b/src/fluentd/code/out_pgjson_original.rb @@ -0,0 +1,133 @@ +require "fluent/plugin/output" +require "pg" +require "yajl" +require "json" + +module Fluent::Plugin + class PgJsonOutput < Fluent::Plugin::Output + Fluent::Plugin.register_output("pgjson", self) + + helpers :compat_parameters + + DEFAULT_BUFFER_TYPE = "memory" + + desc "The hostname of PostgreSQL server" + config_param :host, :string, default: "localhost" + desc "The port of PostgreSQL server" + config_param :port, :integer, default: 5432 + desc "Set the sslmode to enable Eavesdropping protection/MITM protection" + config_param :sslmode, :enum, list: %i[disable allow prefer require verify-ca verify-full], default: :prefer + desc "The database name to connect" + config_param :database, :string + desc "The table name to insert records" + config_param :table, :string + desc "The user name to connect database" + config_param :user, :string, default: nil + desc "The password to connect database" + config_param :password, :string, default: nil, secret: true + desc "The column name for the time" + config_param :time_col, :string, default: "time" + desc "The column name for the tag" + config_param :tag_col, :string, default: "tag" + desc "The column name for the record" + config_param :record_col, :string, default: "record" + desc "If true, insert records formatted as msgpack" + config_param :msgpack, :bool, default: false + desc "JSON encoder (yajl/json)" + config_param :encoder, :enum, list: [:yajl, :json], default: :yajl + + config_param :time_format, :string, default: "%F %T.%N %z" + + config_section :buffer do + config_set_default :@type, DEFAULT_BUFFER_TYPE + config_set_default :chunk_keys, ["tag"] + end + + def initialize + super + @conn = nil + end + + def configure(conf) + compat_parameters_convert(conf, :buffer) + super + unless @chunk_key_tag + raise Fluent::ConfigError, "'tag' in chunk_keys is required." + end + @encoder = case @encoder + when :yajl + Yajl + when :json + JSON + end + end + + def shutdown + if ! @conn.nil? and ! @conn.finished? + @conn.close() + end + + super + end + + def formatted_to_msgpack_binary + true + end + + def multi_workers_ready? + true + end + + def format(tag, time, record) + [Time.at(time).strftime(@time_format), record].to_msgpack + end + + def write(chunk) + init_connection + @conn.exec("COPY #{@table} (#{@tag_col}, #{@time_col}, #{@record_col}) FROM STDIN WITH DELIMITER E'\\x01'") + begin + tag = chunk.metadata.tag + chunk.msgpack_each do |time, record| + @conn.put_copy_data "#{tag}\x01#{time}\x01#{record_value(record)}\n" + end + rescue => err + errmsg = "%s while copy data: %s" % [ err.class.name, err.message ] + @conn.put_copy_end( errmsg ) + @conn.get_result + raise + else + @conn.put_copy_end + res = @conn.get_result + raise res.result_error_message if res.result_status!=PG::PGRES_COMMAND_OK + end + end + + private + + def init_connection + if @conn.nil? + log.debug "connecting to PostgreSQL server #{@host}:#{@port}, database #{@database}..." + + begin + @conn = PG::Connection.new(dbname: @database, host: @host, port: @port, sslmode: @sslmode, user: @user, password: @password) + rescue + if ! @conn.nil? + @conn.close() + @conn = nil + end + raise "failed to initialize connection: #$!" + end + end + end + + def record_value(record) + if @msgpack + "\\#{@conn.escape_bytea(record.to_msgpack)}" + else + json = @encoder.dump(record) + json.gsub!(/\\/){ '\\\\' } + json + end + end + end +end diff --git a/src/fluentd/code/out_pgjson_single.rb b/src/fluentd/code/out_pgjson_single.rb new file mode 100644 index 0000000000..75fdafa118 --- /dev/null +++ b/src/fluentd/code/out_pgjson_single.rb @@ -0,0 +1,198 @@ +# Copyright (c) 2012 OKUNO Akihiro +# Portions Copyright (c) Microsoft Corporation +# +# Apache License, Version 2.0 + +require "fluent/plugin/output" +require "pg" +require "yajl" +require "json" + +module Fluent::Plugin + class PgJsonOutput < Fluent::Plugin::Output + Fluent::Plugin.register_output("pgjson", self) + + helpers :compat_parameters + + DEFAULT_BUFFER_TYPE = "memory" + + desc "The hostname of PostgreSQL server" + config_param :host, :string, default: "localhost" + desc "The port of PostgreSQL server" + config_param :port, :integer, default: 5432 + desc "Set the sslmode to enable Eavesdropping protection/MITM protection" + config_param :sslmode, :enum, list: %i[disable allow prefer require verify-ca verify-full], default: :prefer + desc "The database name to connect" + config_param :database, :string + desc "The table name to insert records" + config_param :table, :string + desc "The user name to connect database" + config_param :user, :string, default: nil + desc "The password to connect database" + config_param :password, :string, default: nil, secret: true + desc "The column name for the time" + config_param :time_col, :string, default: "time" + desc "The column name for the tag" + config_param :tag_col, :string, default: "tag" + desc "The column name for the record" + config_param :record_col, :string, default: "record" + desc "If true, insert records formatted as msgpack" + config_param :msgpack, :bool, default: false + desc "JSON encoder (yajl/json)" + config_param :encoder, :enum, list: [:yajl, :json], default: :yajl + + config_param :time_format, :string, default: "%F %T.%N %z" + + config_param :reset_connection_interval, :integer, default: 5 + + config_section :buffer do + config_set_default :@type, DEFAULT_BUFFER_TYPE + config_set_default :chunk_keys, ["tag"] + end + + def initialize + super + # if @conn == nil: The connection has not been established yet. + # if @conn != nil: The connection has been established, but it could be a broken connection. + @conn = nil + @thread_lock = Mutex.new + @last_reset_ts = 0 + end + + def configure(conf) + compat_parameters_convert(conf, :buffer) + super + unless @chunk_key_tag + raise Fluent::ConfigError, "'tag' in chunk_keys is required." + end + @encoder = case @encoder + when :yajl + Yajl + when :json + JSON + end + end + + def init_connection + # This function is used to init the connection (first connecting). + # If @conn == nil, try to establish a connection, and set @conn. + # If @conn != nil, skip this function. + # @conn could be a nil or non-nil value after this call. + if @conn.nil? + begin + log.debug "[pgjson] [init_connection] Connecting to PostgreSQL server #{@host}:#{@port}, database #{@database}..." + @conn = PG::Connection.new(dbname: @database, host: @host, port: @port, sslmode: @sslmode, user: @user, password: @password) + rescue PG::Error + log.debug "[pgjson] [init_connection] Failed to initialize a connection." + if ! @conn.nil? + @conn.close() + @conn = nil + end + rescue => err + log.debug "#{err}" + end + end + end + + def timestamp + Time.now.getutc.to_i + end + + def reset_connection + # This function try to fix the broken connection to database. + # if @conn == nil, call init_connection + # if @conn != nil, call @conn.reset + # This function must be protected by thread_lock to ensure thread safety. + begin + if timestamp - @last_reset_ts > @reset_connection_interval + if @conn.nil? + log.debug "[pgjson] [reset_connection] Call init_connection." + init_connection + else + log.debug "[pgjson] [reset_connection] Reset Connection." + @conn.reset + end + else + log.debug "[pgjson] [reset_connection] Skip reset." + end + rescue => err + log.debug "[pgjson] [reset_connection] #{err.class}, #{err.message}" + ensure + @last_reset_ts = timestamp + end + end + + def shutdown + begin + @thread_lock.lock() + if ! @conn.nil? + @conn.close() + @conn = nil + end + rescue => err + log.debug "[pgjson] [shutdown] #{err.class}, #{err.message}" + ensure + @thread_lock.unlock() + end + super + end + + def formatted_to_msgpack_binary + true + end + + def multi_workers_ready? + true + end + + def format(tag, time, record) + [Time.at(time).strftime(@time_format), record].to_msgpack + end + + def write(chunk) + log.debug "[pgjson] in write, chunk id #{dump_unique_id_hex chunk.unique_id}" + @thread_lock.lock() + init_connection + if ! @conn.nil? + begin + @conn.exec("COPY #{@table} (#{@tag_col}, #{@time_col}, #{@record_col}) FROM STDIN WITH DELIMITER E'\\x01'") + tag = chunk.metadata.tag + chunk.msgpack_each do |time, record| + @conn.put_copy_data "#{tag}\x01#{time}\x01#{record_value(record)}\n" + end + rescue PG::ConnectionBad, PG::UnableToSend => err + # connection error + reset_connection # try to reset broken connection, and wait for next retry + errmsg = "%s while copy data: %s" % [ err.class.name, err.message ] + raise errmsg + rescue PG::Error => err + log.debug "[pgjson] [write] Error while writing, error is #{err.class}" + errmsg = "%s while copy data: %s" % [ err.class.name, err.message ] + @conn.put_copy_end( errmsg ) + @conn.get_result + raise errmsg + else + @conn.put_copy_end + res = @conn.get_result + raise res.result_error_message if res.result_status != PG::PGRES_COMMAND_OK + log.debug "[pgjson] write successfully, chunk id #{dump_unique_id_hex chunk.unique_id}" + ensure + @thread_lock.unlock() + end + else + @thread_lock.unlock() + raise "Cannot connect to db host." + end + end + + def record_value(record) + if @msgpack + "\\#{@conn.escape_bytea(record.to_msgpack)}" + else + json = @encoder.dump(record) + json.gsub!(/\\/){ '\\\\' } + json + end + end + end +end diff --git a/src/fluentd/code/out_pgjson_threadlocal.rb b/src/fluentd/code/out_pgjson_threadlocal.rb new file mode 100644 index 0000000000..d9805bc797 --- /dev/null +++ b/src/fluentd/code/out_pgjson_threadlocal.rb @@ -0,0 +1,175 @@ +# Copyright (c) 2012 OKUNO Akihiro +# Portions Copyright (c) Microsoft Corporation +# +# Apache License, Version 2.0 + +require "fluent/plugin/output" +require "pg" +require "yajl" +require "json" + +module Fluent::Plugin + class PgJsonOutput < Fluent::Plugin::Output + Fluent::Plugin.register_output("pgjson", self) + + helpers :compat_parameters + + DEFAULT_BUFFER_TYPE = "memory" + + desc "The hostname of PostgreSQL server" + config_param :host, :string, default: "localhost" + desc "The port of PostgreSQL server" + config_param :port, :integer, default: 5432 + desc "Set the sslmode to enable Eavesdropping protection/MITM protection" + config_param :sslmode, :enum, list: %i[disable allow prefer require verify-ca verify-full], default: :prefer + desc "The database name to connect" + config_param :database, :string + desc "The table name to insert records" + config_param :table, :string + desc "The user name to connect database" + config_param :user, :string, default: nil + desc "The password to connect database" + config_param :password, :string, default: nil, secret: true + desc "The column name for the time" + config_param :time_col, :string, default: "time" + desc "The column name for the tag" + config_param :tag_col, :string, default: "tag" + desc "The column name for the record" + config_param :record_col, :string, default: "record" + desc "If true, insert records formatted as msgpack" + config_param :msgpack, :bool, default: false + desc "JSON encoder (yajl/json)" + config_param :encoder, :enum, list: [:yajl, :json], default: :yajl + + config_param :time_format, :string, default: "%F %T.%N %z" + + config_section :buffer do + config_set_default :@type, DEFAULT_BUFFER_TYPE + config_set_default :chunk_keys, ["tag"] + end + + def initialize + super + @last_reset_ts = 0 + end + + def configure(conf) + compat_parameters_convert(conf, :buffer) + super + unless @chunk_key_tag + raise Fluent::ConfigError, "'tag' in chunk_keys is required." + end + @encoder = case @encoder + when :yajl + Yajl + when :json + JSON + end + end + + def init_connection + # This function is used to create a connection. + begin + log.info "[pgjson] [init_connection] Connecting to PostgreSQL server #{@host}:#{@port}, database #{@database}..." + conn = PG::Connection.new(dbname: @database, host: @host, port: @port, sslmode: @sslmode, user: @user, password: @password) + rescue PG::Error + log.info "[pgjson] [init_connection] Failed to initialize a connection." + if ! conn.nil? + conn.close() + conn = nil + end + rescue => err + log.info "#{err}" + end + thread = Thread.current + thread[:conn] = conn + end + + def reset_connection + # This function try to fix the broken connection to database. + # if conn == nil, call init_connection + # if conn != nil, call conn.reset + thread = Thread.current + begin + if timestamp - @last_reset_ts > @reset_connection_interval + if thread[:conn].nil? + log.debug "[pgjson] [reset_connection] Call init_connection." + init_connection + else + log.debug "[pgjson] [reset_connection] Reset Connection." + thread[:conn].reset + end + else + log.debug "[pgjson] [reset_connection] Skip reset." + end + rescue => err + log.debug "[pgjson] [reset_connection] #{err.class}, #{err.message}" + ensure + @last_reset_ts = timestamp + end + conn + end + + def timestamp + Time.now.getutc.to_i + end + + def formatted_to_msgpack_binary + true + end + + def multi_workers_ready? + true + end + + def format(tag, time, record) + [Time.at(time).strftime(@time_format), record].to_msgpack + end + + def write(chunk) + log.info "[pgjson] in write, chunk id #{dump_unique_id_hex chunk.unique_id}" + thread = Thread.current + if ! thread.key?(:conn) + init_connection + if ! thread[:conn].nil? + begin + thread[:conn].exec("COPY #{@table} (#{@tag_col}, #{@time_col}, #{@record_col}) FROM STDIN WITH DELIMITER E'\\x01'") + tag = chunk.metadata.tag + chunk.msgpack_each do |time, record| + thread[:conn].put_copy_data "#{tag}\x01#{time}\x01#{record_value(record)}\n" + end + rescue PG::ConnectionBad, PG::UnableToSend => err + # connection error + reset_connection # try to reset broken connection, and wait for next retry + log.info "%s while copy data: %s" % [ err.class.name, err.message ] + retry + rescue PG::Error => err + log.info "[pgjson] [write] Error while writing, error is #{err.class}" + errmsg = "%s while copy data: %s" % [ err.class.name, err.message ] + thread[:conn].put_copy_end( errmsg ) + thread[:conn].get_result + # thread[:conn] = nil + raise errmsg + else + thread[:conn].put_copy_end + res = thread[:conn].get_result + raise res.result_error_message if res.result_status != PG::PGRES_COMMAND_OK + log.info "[pgjson] write successfully, chunk id #{dump_unique_id_hex chunk.unique_id}" + end + else + raise "Cannot connect to db host." + end + end + + def record_value(record) + thread = Thread.current + if @msgpack + "\\#{thread[:conn].escape_bytea(record.to_msgpack)}" + else + json = @encoder.dump(record) + json.gsub!(/\\/){ '\\\\' } + json + end + end + end +end diff --git a/src/fluentd/err b/src/fluentd/err new file mode 100644 index 0000000000..e69de29bb2 diff --git a/src/fluentd/run.py b/src/fluentd/run.py new file mode 100644 index 0000000000..198534d027 --- /dev/null +++ b/src/fluentd/run.py @@ -0,0 +1,17 @@ +import _thread +import time + +# Define a function for the thread +def print_time( threadName, delay): + count = 0 + while count < 5: + # time.sleep(delay) + count += 1 + print ("%s: %s" % ( threadName, time.ctime(time.time()) )) + +# Create two threads as follows +try: + _thread.start_new_thread( print_time, ("Thread-1", 2, ) ) + _thread.start_new_thread( print_time, ("Thread-2", 4, ) ) +except: + print ("Error: unable to start thread") diff --git a/src/fluentd/run.sh b/src/fluentd/run.sh new file mode 100644 index 0000000000..e66fa99a1f --- /dev/null +++ b/src/fluentd/run.sh @@ -0,0 +1,5 @@ +for VAR in $(seq 1 10000) +do + curl -i -X POST -d "json={\"index\":$VAR}" http://localhost:8888/testpg.cycle +done + diff --git a/src/fluentd/src/fluent-plugin-pgjson/example.conf b/src/fluentd/src/fluent-plugin-pgjson/example.conf index e9f94c5b46..60be14ad3e 100644 --- a/src/fluentd/src/fluent-plugin-pgjson/example.conf +++ b/src/fluentd/src/fluent-plugin-pgjson/example.conf @@ -1,17 +1,26 @@ - type forward + @type http + port 8888 + bind 0.0.0.0 - - type pgjson - host localhost - port 5432 - sslmode require - database fluentd - table fluentd - user postgres - password postgres - time_col time - tag_col tag - record_col record + + @type stdout +# +# type forward +# + +# +# type pgjson +# host localhost +# port 5432 +# sslmode require +# database fluentd +# table fluentd +# user postgres +# password postgres +# time_col time +# tag_col tag +# record_col record +# diff --git a/src/fluentd/src/fluent-plugin-pgjson/lib/fluent/plugin/out_pgjson.rb b/src/fluentd/src/fluent-plugin-pgjson/lib/fluent/plugin/out_pgjson.rb index 75fdafa118..4f06fffea0 100644 --- a/src/fluentd/src/fluent-plugin-pgjson/lib/fluent/plugin/out_pgjson.rb +++ b/src/fluentd/src/fluent-plugin-pgjson/lib/fluent/plugin/out_pgjson.rb @@ -52,10 +52,6 @@ class PgJsonOutput < Fluent::Plugin::Output def initialize super - # if @conn == nil: The connection has not been established yet. - # if @conn != nil: The connection has been established, but it could be a broken connection. - @conn = nil - @thread_lock = Mutex.new @last_reset_ts = 0 end @@ -74,67 +70,48 @@ def configure(conf) end def init_connection - # This function is used to init the connection (first connecting). - # If @conn == nil, try to establish a connection, and set @conn. - # If @conn != nil, skip this function. - # @conn could be a nil or non-nil value after this call. - if @conn.nil? - begin - log.debug "[pgjson] [init_connection] Connecting to PostgreSQL server #{@host}:#{@port}, database #{@database}..." - @conn = PG::Connection.new(dbname: @database, host: @host, port: @port, sslmode: @sslmode, user: @user, password: @password) - rescue PG::Error - log.debug "[pgjson] [init_connection] Failed to initialize a connection." - if ! @conn.nil? - @conn.close() - @conn = nil - end - rescue => err - log.debug "#{err}" - end - end - end - - def timestamp - Time.now.getutc.to_i - end - - def reset_connection - # This function try to fix the broken connection to database. - # if @conn == nil, call init_connection - # if @conn != nil, call @conn.reset - # This function must be protected by thread_lock to ensure thread safety. + # This function is used to create a connection. + thread = Thread.current begin - if timestamp - @last_reset_ts > @reset_connection_interval - if @conn.nil? - log.debug "[pgjson] [reset_connection] Call init_connection." - init_connection - else - log.debug "[pgjson] [reset_connection] Reset Connection." - @conn.reset - end - else - log.debug "[pgjson] [reset_connection] Skip reset." + log.debug "[pgjson] [init_connection] Connecting to PostgreSQL server #{@host}:#{@port}, database #{@database}..." + thread[:conn] = PG::Connection.new(dbname: @database, host: @host, port: @port, sslmode: @sslmode, user: @user, password: @password) + rescue PG::Error + log.debug "[pgjson] [init_connection] Failed to initialize a connection." + if ! thread[:conn].nil? + thread[:conn].close() + thread[:conn] = nil end rescue => err - log.debug "[pgjson] [reset_connection] #{err.class}, #{err.message}" - ensure - @last_reset_ts = timestamp + log.debug "#{err}" end end - def shutdown - begin - @thread_lock.lock() - if ! @conn.nil? - @conn.close() - @conn = nil - end - rescue => err - log.debug "[pgjson] [shutdown] #{err.class}, #{err.message}" - ensure - @thread_lock.unlock() + def reset_connection + # This function try to fix the broken connection to database. + # if conn == nil, call init_connection + # if conn != nil, call conn.reset + thread = Thread.current + begin + if timestamp - @last_reset_ts > @reset_connection_interval + if thread[:conn].nil? + log.debug "[pgjson] [reset_connection] Call init_connection." + init_connection + else + log.debug "[pgjson] [reset_connection] Reset Connection." + thread[:conn].reset + end + else + log.debug "[pgjson] [reset_connection] Skip reset." + end + rescue => err + log.debug "[pgjson] [reset_connection] #{err.class}, #{err.message}" + ensure + @last_reset_ts = timestamp end - super + end + + def timestamp + Time.now.getutc.to_i end def formatted_to_msgpack_binary @@ -151,43 +128,43 @@ def format(tag, time, record) def write(chunk) log.debug "[pgjson] in write, chunk id #{dump_unique_id_hex chunk.unique_id}" - @thread_lock.lock() - init_connection - if ! @conn.nil? + thread = Thread.current + if ! thread.key?(:conn) + init_connection + end + if ! thread[:conn].nil? begin - @conn.exec("COPY #{@table} (#{@tag_col}, #{@time_col}, #{@record_col}) FROM STDIN WITH DELIMITER E'\\x01'") + thread[:conn].exec("COPY #{@table} (#{@tag_col}, #{@time_col}, #{@record_col}) FROM STDIN WITH DELIMITER E'\\x01'") tag = chunk.metadata.tag chunk.msgpack_each do |time, record| - @conn.put_copy_data "#{tag}\x01#{time}\x01#{record_value(record)}\n" + thread[:conn].put_copy_data "#{tag}\x01#{time}\x01#{record_value(record)}\n" end rescue PG::ConnectionBad, PG::UnableToSend => err # connection error - reset_connection # try to reset broken connection, and wait for next retry - errmsg = "%s while copy data: %s" % [ err.class.name, err.message ] - raise errmsg + reset_connection # try to reset broken connection, and wait for next retry + log.debug "%s while copy data: %s" % [ err.class.name, err.message ] + retry rescue PG::Error => err log.debug "[pgjson] [write] Error while writing, error is #{err.class}" errmsg = "%s while copy data: %s" % [ err.class.name, err.message ] - @conn.put_copy_end( errmsg ) - @conn.get_result + thread[:conn].put_copy_end( errmsg ) + thread[:conn].get_result raise errmsg else - @conn.put_copy_end - res = @conn.get_result + thread[:conn].put_copy_end + res = thread[:conn].get_result raise res.result_error_message if res.result_status != PG::PGRES_COMMAND_OK log.debug "[pgjson] write successfully, chunk id #{dump_unique_id_hex chunk.unique_id}" - ensure - @thread_lock.unlock() end else - @thread_lock.unlock() raise "Cannot connect to db host." end end def record_value(record) + thread = Thread.current if @msgpack - "\\#{@conn.escape_bytea(record.to_msgpack)}" + "\\#{thread[:conn].escape_bytea(record.to_msgpack)}" else json = @encoder.dump(record) json.gsub!(/\\/){ '\\\\' } diff --git a/src/fluentd/src/fluent-plugin-pgjson/test.conf b/src/fluentd/src/fluent-plugin-pgjson/test.conf new file mode 100644 index 0000000000..a4cf67a3c0 --- /dev/null +++ b/src/fluentd/src/fluent-plugin-pgjson/test.conf @@ -0,0 +1,19 @@ + + @type http + port 8888 + bind 0.0.0.0 + + + + @type pgjson + host 10.151.40.32 + port 5432 + sslmode require + database openpai + table user_tbl + user root + password rootpass + time_col time + tag_col tag + record_col record + From 5fcb8c07de36eff59819bf9965a11a955f983893 Mon Sep 17 00:00:00 2001 From: suiguoxin Date: Tue, 7 Jul 2020 06:51:58 +0000 Subject: [PATCH 02/14] update --- src/fluentd/build/utils.md | 9 +++------ src/fluentd/run.sh | 4 ++-- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/src/fluentd/build/utils.md b/src/fluentd/build/utils.md index 4826290b65..4ca9d22c20 100644 --- a/src/fluentd/build/utils.md +++ b/src/fluentd/build/utils.md @@ -9,18 +9,16 @@ sudo /usr/sbin/td-agent-gem install pg --version "=1.1.4" path: /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluent-plugin-pgjson # postgres +psql -h 10.151.40.32 -W -U root -p 5432 openpai select count(*) from test_pg; delete from test_pg; # docker cd src/fluentd -docker build -f build/fluentd.k8s.dockerfile -t gusui/fluentd:multiconn . --no-cache docker build -f build/fluentd.k8s.dockerfile -t gusui/fluentd:threadlocal . --no-cache -docker push gusui/fluentd:original -docker run -p 8888:8888 -v $(pwd)/tmp:/fluentd/etc -e FLUENTD_CONF=fluentd.conf -d gusui/fluentd:original -docker run -p 8888:8888 -v $(pwd)/tmp:/fluentd/etc -e FLUENTD_CONF=fluentd.conf -d gusui/fluentd:multiconn docker run -p 8888:8888 -v $(pwd)/tmp:/fluentd/etc -e FLUENTD_CONF=fluentd.conf -d gusui/fluentd:threadlocal + bash run.sh docker ps @@ -28,5 +26,4 @@ docker ps docker logs CONTAINERID docker logs --tails 10 CONTAINERID -docker stop $(docker ps -a -q) -docker rm $(docker ps -a -q) \ No newline at end of file +docker stop $(docker ps -a -q) && docker rm $(docker ps -a -q) \ No newline at end of file diff --git a/src/fluentd/run.sh b/src/fluentd/run.sh index e66fa99a1f..39d521482e 100644 --- a/src/fluentd/run.sh +++ b/src/fluentd/run.sh @@ -1,5 +1,5 @@ -for VAR in $(seq 1 10000) +for VAR in $(seq 1 10) do - curl -i -X POST -d "json={\"index\":$VAR}" http://localhost:8888/testpg.cycle + curl -i -X POST -d "json={\"index\":$VAR, \"name\":\"piggy\"}" http://localhost:8888/testpg.cycle done From 8e209f74d4e53c592dd417119e5396ff31c60b02 Mon Sep 17 00:00:00 2001 From: suiguoxin Date: Mon, 13 Jul 2020 02:36:16 +0000 Subject: [PATCH 03/14] change pgjson for new schema --- src/fluentd/build/fluentd.k8s.dockerfile | 5 +- src/fluentd/build/fluentd.ubuntu | 47 ----- src/fluentd/build/utils.md | 38 +++- ...ut_pgjson_threadlocal.rb => out_pgjson.rb} | 38 ++-- src/fluentd/code/out_pgjson_multiconn.rb | 161 -------------- src/fluentd/code/out_pgjson_multiconn_sync.rb | 195 ----------------- src/fluentd/code/out_pgjson_original.rb | 133 ------------ src/fluentd/code/out_pgjson_single.rb | 198 ------------------ .../deploy/fluentd-config.yaml.template | 2 +- src/fluentd/err | 0 src/fluentd/run.sh | 3 +- .../src/fluent-plugin-pgjson/example.conf | 35 ++-- .../lib/fluent/plugin/out_pgjson.rb | 23 +- 13 files changed, 85 insertions(+), 793 deletions(-) delete mode 100644 src/fluentd/build/fluentd.ubuntu rename src/fluentd/code/{out_pgjson_threadlocal.rb => out_pgjson.rb} (81%) delete mode 100644 src/fluentd/code/out_pgjson_multiconn.rb delete mode 100644 src/fluentd/code/out_pgjson_multiconn_sync.rb delete mode 100644 src/fluentd/code/out_pgjson_original.rb delete mode 100644 src/fluentd/code/out_pgjson_single.rb delete mode 100644 src/fluentd/err diff --git a/src/fluentd/build/fluentd.k8s.dockerfile b/src/fluentd/build/fluentd.k8s.dockerfile index d6ddf98940..8649db6dea 100644 --- a/src/fluentd/build/fluentd.k8s.dockerfile +++ b/src/fluentd/build/fluentd.k8s.dockerfile @@ -17,14 +17,13 @@ FROM fluent/fluentd:v1.7-1 -# affine - USER root RUN apk add --no-cache --update --virtual .build-deps \ sudo build-base ruby-dev make gcc libc-dev postgresql-dev git \ && apk add --no-cache --update libpq \ && sudo gem install fluent-plugin-concat \ - && sudo gem install rake bundler pg + && sudo gem install rake bundler pg \ + && sudo apk add ruby-bigdecimal # Build fluent-plugin-pgjson from scratch # Original fluent-plugin-pgjson is from https://github.com/fluent-plugins-nursery/fluent-plugin-pgjson diff --git a/src/fluentd/build/fluentd.ubuntu b/src/fluentd/build/fluentd.ubuntu deleted file mode 100644 index 476c00e273..0000000000 --- a/src/fluentd/build/fluentd.ubuntu +++ /dev/null @@ -1,47 +0,0 @@ -# Copyright (c) Microsoft Corporation -# All rights reserved. -# -# MIT License -# -# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated -# documentation files (the "Software"), to deal in the Software without restriction, including without limitation -# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and -# to permit persons to whom the Software is furnished to do so, subject to the following conditions: -# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING -# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND -# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, -# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - -# docker build -f fluentd.ubuntu.dockerfile -t gusui/fluentd:original . --no-cache -# docker build -f fluentd.ubuntu.dockerfile -t gusui/fluentd:singlethread . --no-cache -# docker build -f fluentd.ubuntu.dockerfile -t gusui/fluentd:multiconn . --no-cache - -FROM fluent/fluentd:v1.7-1 - -USER root -RUN apt-get install postgresql postgresql-client libpq5 libpq-dev \ -&& /usr/sbin/td-agent-gem install fluent-plugin-concat \ -&& sudo /usr/sbin/td-agent-gem install rake bundler \ -&& sudo /usr/sbin/td-agent-gem install pg - -# Build fluent-plugin-pgjson from scratch -# Original fluent-plugin-pgjson is from https://github.com/fluent-plugins-nursery/fluent-plugin-pgjson -# Original plugin cannot retry connecting when database connection is lost, -# and is not thread-safe. These two problems are fixed by modifying codes. -COPY src/fluent-plugin-pgjson /fluent-plugin-pgjson -RUN cd /fluent-plugin-pgjson && \ - git init && \ - git add . && \ - rake build && \ - sudo /usr/sbin/td-agent-gem install --local ./pkg/fluent-plugin-pgjson-1.0.0.gem \ - rm -rf /fluent-plugin-pgjson - -# cleanup -RUN sudo gem sources --clear-all \ - && apk del .build-deps \ - && rm -rf /tmp/* /var/tmp/* /usr/lib/ruby/gems/*/cache/*.gem - -COPY build/fluent.conf /fluentd/etc/ diff --git a/src/fluentd/build/utils.md b/src/fluentd/build/utils.md index 4ca9d22c20..a7ba141cc2 100644 --- a/src/fluentd/build/utils.md +++ b/src/fluentd/build/utils.md @@ -15,8 +15,8 @@ delete from test_pg; # docker cd src/fluentd -docker build -f build/fluentd.k8s.dockerfile -t gusui/fluentd:threadlocal . --no-cache -docker run -p 8888:8888 -v $(pwd)/tmp:/fluentd/etc -e FLUENTD_CONF=fluentd.conf -d gusui/fluentd:threadlocal +docker build -f build/fluentd.k8s.dockerfile -t gusui/fluentd:latest . --no-cache +docker run -p 8888:8888 -v $(pwd)/tmp:/fluentd/etc -e FLUENTD_CONF=fluentd.conf -d gusui/fluentd:latest bash run.sh @@ -24,6 +24,36 @@ bash run.sh docker ps docker logs CONTAINERID -docker logs --tails 10 CONTAINERID +docker logs --tail 10 CONTAINERID + +docker stop $(docker ps -a -q) && docker rm $(docker ps -a -q) + + +# log +kubectl get configmap fluentd-config -o yaml +kubectl get po +kubectl logs frameworkcontroller-sts-0 | grep Snapshot | grep kind\":\"Framework + + + +# Database +psql -h 10.151.40.32 -W -U root -p 5432 openpai +rootpass + +CREATE TABLE IF NOT EXISTS fc_objectsnapshots ( tag Text, time Timestamptz, record JsonB ); +CREATE TABLE IF NOT EXISTS framework_history (insertedAt Timestamptz, uid SERIAL, frameworkName VARCHAR(64), attemptIndex INTEGER, historyType VARCHAR(16), snapshot JsonB); + +CREATE TABLE IF NOT EXISTS pods (insertedAt Timestamptz, updatedAt Timestamptz, uid SERIAL, frameworkName VARCHAR(64), attemptIndex INTEGER, taskroleName VARCHAR(256), taskroleIndex INTEGER, snapshot JsonB); + +# fluentd service +./paictl.py config pull -o /cluster-configuration + +## build and push fluentd +./build/pai_build.py build -c /cluster-configuration/ -s fluentd +./build/pai_build.py push -c /cluster-configuration/ -i fluentd + +## restart fluentd service +./paictl.py service stop -n fluentd +./paictl.py service start -n fluentd + -docker stop $(docker ps -a -q) && docker rm $(docker ps -a -q) \ No newline at end of file diff --git a/src/fluentd/code/out_pgjson_threadlocal.rb b/src/fluentd/code/out_pgjson.rb similarity index 81% rename from src/fluentd/code/out_pgjson_threadlocal.rb rename to src/fluentd/code/out_pgjson.rb index d9805bc797..fd3e32d2b9 100644 --- a/src/fluentd/code/out_pgjson_threadlocal.rb +++ b/src/fluentd/code/out_pgjson.rb @@ -43,6 +43,8 @@ class PgJsonOutput < Fluent::Plugin::Output config_param :time_format, :string, default: "%F %T.%N %z" + config_param :reset_connection_interval, :integer, default: 5 + config_section :buffer do config_set_default :@type, DEFAULT_BUFFER_TYPE config_set_default :chunk_keys, ["tag"] @@ -69,27 +71,26 @@ def configure(conf) def init_connection # This function is used to create a connection. + thread = Thread.current begin - log.info "[pgjson] [init_connection] Connecting to PostgreSQL server #{@host}:#{@port}, database #{@database}..." - conn = PG::Connection.new(dbname: @database, host: @host, port: @port, sslmode: @sslmode, user: @user, password: @password) + log.debug "[pgjson] [init_connection] Connecting to PostgreSQL server #{@host}:#{@port}, database #{@database}..." + thread[:conn] = PG::Connection.new(dbname: @database, host: @host, port: @port, sslmode: @sslmode, user: @user, password: @password) rescue PG::Error - log.info "[pgjson] [init_connection] Failed to initialize a connection." - if ! conn.nil? - conn.close() - conn = nil + log.debug "[pgjson] [init_connection] Failed to initialize a connection." + if ! thread[:conn].nil? + thread[:conn].close() + thread[:conn] = nil end rescue => err - log.info "#{err}" + log.debug "#{err}" end - thread = Thread.current - thread[:conn] = conn end def reset_connection # This function try to fix the broken connection to database. # if conn == nil, call init_connection # if conn != nil, call conn.reset - thread = Thread.current + thread = Thread.current begin if timestamp - @last_reset_ts > @reset_connection_interval if thread[:conn].nil? @@ -107,12 +108,11 @@ def reset_connection ensure @last_reset_ts = timestamp end - conn end def timestamp Time.now.getutc.to_i - end + end def formatted_to_msgpack_binary true @@ -127,34 +127,34 @@ def format(tag, time, record) end def write(chunk) - log.info "[pgjson] in write, chunk id #{dump_unique_id_hex chunk.unique_id}" + log.debug "[pgjson] in write, chunk id #{dump_unique_id_hex chunk.unique_id}" thread = Thread.current if ! thread.key?(:conn) init_connection + end if ! thread[:conn].nil? begin - thread[:conn].exec("COPY #{@table} (#{@tag_col}, #{@time_col}, #{@record_col}) FROM STDIN WITH DELIMITER E'\\x01'") + thread[:conn].exec("COPY #{@table} (#{@tag_col}, #{@time_col}, #{@name_col}, #{@index_col}, #{@record_col}) FROM STDIN WITH DELIMITER E'\\x01'") tag = chunk.metadata.tag chunk.msgpack_each do |time, record| - thread[:conn].put_copy_data "#{tag}\x01#{time}\x01#{record_value(record)}\n" + thread[:conn].put_copy_data "#{tag}\x01#{time}\x01#{record.name}\x01#{record.index}\x01#{record_value(record)}\n" end rescue PG::ConnectionBad, PG::UnableToSend => err # connection error reset_connection # try to reset broken connection, and wait for next retry - log.info "%s while copy data: %s" % [ err.class.name, err.message ] + log.debug "%s while copy data: %s" % [ err.class.name, err.message ] retry rescue PG::Error => err - log.info "[pgjson] [write] Error while writing, error is #{err.class}" + log.debug "[pgjson] [write] Error while writing, error is #{err.class}" errmsg = "%s while copy data: %s" % [ err.class.name, err.message ] thread[:conn].put_copy_end( errmsg ) thread[:conn].get_result - # thread[:conn] = nil raise errmsg else thread[:conn].put_copy_end res = thread[:conn].get_result raise res.result_error_message if res.result_status != PG::PGRES_COMMAND_OK - log.info "[pgjson] write successfully, chunk id #{dump_unique_id_hex chunk.unique_id}" + log.debug "[pgjson] write successfully, chunk id #{dump_unique_id_hex chunk.unique_id}" end else raise "Cannot connect to db host." diff --git a/src/fluentd/code/out_pgjson_multiconn.rb b/src/fluentd/code/out_pgjson_multiconn.rb deleted file mode 100644 index d3d800764a..0000000000 --- a/src/fluentd/code/out_pgjson_multiconn.rb +++ /dev/null @@ -1,161 +0,0 @@ -# Copyright (c) 2012 OKUNO Akihiro -# Portions Copyright (c) Microsoft Corporation -# -# Apache License, Version 2.0 - -require "fluent/plugin/output" -require "pg" -require "yajl" -require "json" - -module Fluent::Plugin - class PgJsonOutput < Fluent::Plugin::Output - Fluent::Plugin.register_output("pgjson", self) - - helpers :compat_parameters - - DEFAULT_BUFFER_TYPE = "memory" - - desc "The hostname of PostgreSQL server" - config_param :host, :string, default: "localhost" - desc "The port of PostgreSQL server" - config_param :port, :integer, default: 5432 - desc "Set the sslmode to enable Eavesdropping protection/MITM protection" - config_param :sslmode, :enum, list: %i[disable allow prefer require verify-ca verify-full], default: :prefer - desc "The database name to connect" - config_param :database, :string - desc "The table name to insert records" - config_param :table, :string - desc "The user name to connect database" - config_param :user, :string, default: nil - desc "The password to connect database" - config_param :password, :string, default: nil, secret: true - desc "The column name for the time" - config_param :time_col, :string, default: "time" - desc "The column name for the tag" - config_param :tag_col, :string, default: "tag" - desc "The column name for the record" - config_param :record_col, :string, default: "record" - desc "If true, insert records formatted as msgpack" - config_param :msgpack, :bool, default: false - desc "JSON encoder (yajl/json)" - config_param :encoder, :enum, list: [:yajl, :json], default: :yajl - - config_param :time_format, :string, default: "%F %T.%N %z" - - config_section :buffer do - config_set_default :@type, DEFAULT_BUFFER_TYPE - config_set_default :chunk_keys, ["tag"] - end - - def initialize - super - @last_reset_ts = 0 - end - - def configure(conf) - compat_parameters_convert(conf, :buffer) - super - unless @chunk_key_tag - raise Fluent::ConfigError, "'tag' in chunk_keys is required." - end - @encoder = case @encoder - when :yajl - Yajl - when :json - JSON - end - end - - def init_connection - # This function is used to create a connection. - begin - log.info "[pgjson] [init_connection] Connecting to PostgreSQL server #{@host}:#{@port}, database #{@database}..." - conn = PG::Connection.new(dbname: @database, host: @host, port: @port, sslmode: @sslmode, user: @user, password: @password) - rescue PG::Error - log.info "[pgjson] [init_connection] Failed to initialize a connection." - if ! conn.nil? - conn.close() - conn = nil - end - rescue => err - log.info "#{err}" - end - conn - end - - def timestamp - Time.now.getutc.to_i - end - - def shutdown - # begin - # @thread_lock.lock() - # if ! @conn.nil? - # @conn.close() - # @conn = nil - # end - # rescue => err - # log.info "[pgjson] [shutdown] #{err.class}, #{err.message}" - # ensure - # @thread_lock.unlock() - # end - super - end - - def formatted_to_msgpack_binary - true - end - - def multi_workers_ready? - true - end - - def format(tag, time, record) - [Time.at(time).strftime(@time_format), record].to_msgpack - end - - def write(chunk) - log.info "[pgjson] in write, chunk id #{dump_unique_id_hex chunk.unique_id}" - conn = init_connection - if ! conn.nil? - begin - conn.exec("COPY #{@table} (#{@tag_col}, #{@time_col}, #{@record_col}) FROM STDIN WITH DELIMITER E'\\x01'") - tag = chunk.metadata.tag - chunk.msgpack_each do |time, record| - conn.put_copy_data "#{tag}\x01#{time}\x01#{record_value(record, conn)}\n" - end - rescue PG::ConnectionBad, PG::UnableToSend => err - # connection error - conn = init_connection # try to reset broken connection, and wait for next retry - log.info "%s while copy data: %s" % [ err.class.name, err.message ] - retry - rescue PG::Error => err - log.info "[pgjson] [write] Error while writing, error is #{err.class}" - errmsg = "%s while copy data: %s" % [ err.class.name, err.message ] - conn.put_copy_end( errmsg ) - conn.get_result - raise errmsg - else - conn.put_copy_end - res = conn.get_result - raise res.result_error_message if res.result_status != PG::PGRES_COMMAND_OK - conn.close() - log.info "[pgjson] write successfully, chunk id #{dump_unique_id_hex chunk.unique_id}" - end - else - raise "Cannot connect to db host." - end - end - - def record_value(record, conn) - if @msgpack - "\\#{conn.escape_bytea(record.to_msgpack)}" - else - json = @encoder.dump(record) - json.gsub!(/\\/){ '\\\\' } - json - end - end - end -end diff --git a/src/fluentd/code/out_pgjson_multiconn_sync.rb b/src/fluentd/code/out_pgjson_multiconn_sync.rb deleted file mode 100644 index 0bbf2d0ebc..0000000000 --- a/src/fluentd/code/out_pgjson_multiconn_sync.rb +++ /dev/null @@ -1,195 +0,0 @@ -# Copyright (c) 2012 OKUNO Akihiro -# Portions Copyright (c) Microsoft Corporation -# -# Apache License, Version 2.0 - -require "fluent/plugin/output" -require "pg" -require "yajl" -require "json" - -module Fluent::Plugin - class PgJsonOutput < Fluent::Plugin::Output - Fluent::Plugin.register_output("pgjson", self) - - helpers :compat_parameters - - DEFAULT_BUFFER_TYPE = "memory" - - desc "The hostname of PostgreSQL server" - config_param :host, :string, default: "localhost" - desc "The port of PostgreSQL server" - config_param :port, :integer, default: 5432 - desc "Set the sslmode to enable Eavesdropping protection/MITM protection" - config_param :sslmode, :enum, list: %i[disable allow prefer require verify-ca verify-full], default: :prefer - desc "The database name to connect" - config_param :database, :string - desc "The table name to insert records" - config_param :table, :string - desc "The user name to connect database" - config_param :user, :string, default: nil - desc "The password to connect database" - config_param :password, :string, default: nil, secret: true - desc "The column name for the time" - config_param :time_col, :string, default: "time" - desc "The column name for the tag" - config_param :tag_col, :string, default: "tag" - desc "The column name for the record" - config_param :record_col, :string, default: "record" - desc "If true, insert records formatted as msgpack" - config_param :msgpack, :bool, default: false - desc "JSON encoder (yajl/json)" - config_param :encoder, :enum, list: [:yajl, :json], default: :yajl - - config_param :time_format, :string, default: "%F %T.%N %z" - - config_param :reset_connection_interval, :integer, default: 5 - - config_section :buffer do - config_set_default :@type, DEFAULT_BUFFER_TYPE - config_set_default :chunk_keys, ["tag"] - end - - def initialize - super - # if @conn == nil: The connection has not been established yet. - # if @conn != nil: The connection has been established, but it could be a broken connection. - # @conn = nil - # @thread_lock = Mutex.new - @last_reset_ts = 0 - end - - def configure(conf) - compat_parameters_convert(conf, :buffer) - super - unless @chunk_key_tag - raise Fluent::ConfigError, "'tag' in chunk_keys is required." - end - @encoder = case @encoder - when :yajl - Yajl - when :json - JSON - end - end - - def init_connection - # This function is used to init the connection (first connecting). - # If @conn == nil, try to establish a connection, and set @conn. - # If @conn != nil, skip this function. - # @conn could be a nil or non-nil value after this call. - begin - log.debug "[pgjson] [init_connection] Connecting to PostgreSQL server #{@host}:#{@port}, database #{@database}..." - conn = PG::Connection.new(dbname: @database, host: @host, port: @port, sslmode: @sslmode, user: @user, password: @password) - rescue PG::Error - log.debug "[pgjson] [init_connection] Failed to initialize a connection." - if ! conn.nil? - conn.close() - conn = nil - end - rescue => err - log.debug "#{err}" - end - conn - end - - def timestamp - Time.now.getutc.to_i - end - - def reset_connection - # This function try to fix the broken connection to database. - # if @conn == nil, call init_connection - # if @conn != nil, call @conn.reset - # This function must be protected by thread_lock to ensure thread safety. - begin - if timestamp - @last_reset_ts > @reset_connection_interval - if @conn.nil? - log.debug "[pgjson] [reset_connection] Call init_connection." - init_connection - else - log.debug "[pgjson] [reset_connection] Reset Connection." - @conn.reset - end - else - log.debug "[pgjson] [reset_connection] Skip reset." - end - rescue => err - log.debug "[pgjson] [reset_connection] #{err.class}, #{err.message}" - ensure - @last_reset_ts = timestamp - end - end - - def shutdown - # begin - # @thread_lock.lock() - # if ! @conn.nil? - # @conn.close() - # @conn = nil - # end - # rescue => err - # log.debug "[pgjson] [shutdown] #{err.class}, #{err.message}" - # ensure - # @thread_lock.unlock() - # end - super - end - - def formatted_to_msgpack_binary - true - end - - def multi_workers_ready? - true - end - - def format(tag, time, record) - [Time.at(time).strftime(@time_format), record].to_msgpack - end - - def write(chunk) - log.debug "[pgjson] in write, chunk id #{dump_unique_id_hex chunk.unique_id}" - # @thread_lock.lock() - conn = init_connection - if ! conn.nil? - begin - conn.sync_exec("COPY #{@table} (#{@tag_col}, #{@time_col}, #{@record_col}) FROM STDIN WITH DELIMITER E'\\x01'") - tag = chunk.metadata.tag - chunk.msgpack_each do |time, record| - conn.put_copy_data "#{tag}\x01#{time}\x01#{record_value(record, conn)}\n" - end - rescue PG::ConnectionBad, PG::UnableToSend => err - # connection error - conn = init_connection # try to reset broken connection, and wait for next retry - log.debug "%s while copy data: %s" % [ err.class.name, err.message ] - retry - rescue PG::Error => err - log.debug "[pgjson] [write] Error while writing, error is #{err.class}" - errmsg = "%s while copy data: %s" % [ err.class.name, err.message ] - conn.put_copy_end( errmsg ) - conn.get_result - raise errmsg - else - conn.put_copy_end - res = conn.get_result - raise res.result_error_message if res.result_status != PG::PGRES_COMMAND_OK - conn.close() - log.debug "[pgjson] write successfully, chunk id #{dump_unique_id_hex chunk.unique_id}" - end - else - raise "Cannot connect to db host." - end - end - - def record_value(record, conn) - if @msgpack - "\\#{conn.escape_bytea(record.to_msgpack)}" - else - json = @encoder.dump(record) - json.gsub!(/\\/){ '\\\\' } - json - end - end - end -end diff --git a/src/fluentd/code/out_pgjson_original.rb b/src/fluentd/code/out_pgjson_original.rb deleted file mode 100644 index ef4f7ec4ec..0000000000 --- a/src/fluentd/code/out_pgjson_original.rb +++ /dev/null @@ -1,133 +0,0 @@ -require "fluent/plugin/output" -require "pg" -require "yajl" -require "json" - -module Fluent::Plugin - class PgJsonOutput < Fluent::Plugin::Output - Fluent::Plugin.register_output("pgjson", self) - - helpers :compat_parameters - - DEFAULT_BUFFER_TYPE = "memory" - - desc "The hostname of PostgreSQL server" - config_param :host, :string, default: "localhost" - desc "The port of PostgreSQL server" - config_param :port, :integer, default: 5432 - desc "Set the sslmode to enable Eavesdropping protection/MITM protection" - config_param :sslmode, :enum, list: %i[disable allow prefer require verify-ca verify-full], default: :prefer - desc "The database name to connect" - config_param :database, :string - desc "The table name to insert records" - config_param :table, :string - desc "The user name to connect database" - config_param :user, :string, default: nil - desc "The password to connect database" - config_param :password, :string, default: nil, secret: true - desc "The column name for the time" - config_param :time_col, :string, default: "time" - desc "The column name for the tag" - config_param :tag_col, :string, default: "tag" - desc "The column name for the record" - config_param :record_col, :string, default: "record" - desc "If true, insert records formatted as msgpack" - config_param :msgpack, :bool, default: false - desc "JSON encoder (yajl/json)" - config_param :encoder, :enum, list: [:yajl, :json], default: :yajl - - config_param :time_format, :string, default: "%F %T.%N %z" - - config_section :buffer do - config_set_default :@type, DEFAULT_BUFFER_TYPE - config_set_default :chunk_keys, ["tag"] - end - - def initialize - super - @conn = nil - end - - def configure(conf) - compat_parameters_convert(conf, :buffer) - super - unless @chunk_key_tag - raise Fluent::ConfigError, "'tag' in chunk_keys is required." - end - @encoder = case @encoder - when :yajl - Yajl - when :json - JSON - end - end - - def shutdown - if ! @conn.nil? and ! @conn.finished? - @conn.close() - end - - super - end - - def formatted_to_msgpack_binary - true - end - - def multi_workers_ready? - true - end - - def format(tag, time, record) - [Time.at(time).strftime(@time_format), record].to_msgpack - end - - def write(chunk) - init_connection - @conn.exec("COPY #{@table} (#{@tag_col}, #{@time_col}, #{@record_col}) FROM STDIN WITH DELIMITER E'\\x01'") - begin - tag = chunk.metadata.tag - chunk.msgpack_each do |time, record| - @conn.put_copy_data "#{tag}\x01#{time}\x01#{record_value(record)}\n" - end - rescue => err - errmsg = "%s while copy data: %s" % [ err.class.name, err.message ] - @conn.put_copy_end( errmsg ) - @conn.get_result - raise - else - @conn.put_copy_end - res = @conn.get_result - raise res.result_error_message if res.result_status!=PG::PGRES_COMMAND_OK - end - end - - private - - def init_connection - if @conn.nil? - log.debug "connecting to PostgreSQL server #{@host}:#{@port}, database #{@database}..." - - begin - @conn = PG::Connection.new(dbname: @database, host: @host, port: @port, sslmode: @sslmode, user: @user, password: @password) - rescue - if ! @conn.nil? - @conn.close() - @conn = nil - end - raise "failed to initialize connection: #$!" - end - end - end - - def record_value(record) - if @msgpack - "\\#{@conn.escape_bytea(record.to_msgpack)}" - else - json = @encoder.dump(record) - json.gsub!(/\\/){ '\\\\' } - json - end - end - end -end diff --git a/src/fluentd/code/out_pgjson_single.rb b/src/fluentd/code/out_pgjson_single.rb deleted file mode 100644 index 75fdafa118..0000000000 --- a/src/fluentd/code/out_pgjson_single.rb +++ /dev/null @@ -1,198 +0,0 @@ -# Copyright (c) 2012 OKUNO Akihiro -# Portions Copyright (c) Microsoft Corporation -# -# Apache License, Version 2.0 - -require "fluent/plugin/output" -require "pg" -require "yajl" -require "json" - -module Fluent::Plugin - class PgJsonOutput < Fluent::Plugin::Output - Fluent::Plugin.register_output("pgjson", self) - - helpers :compat_parameters - - DEFAULT_BUFFER_TYPE = "memory" - - desc "The hostname of PostgreSQL server" - config_param :host, :string, default: "localhost" - desc "The port of PostgreSQL server" - config_param :port, :integer, default: 5432 - desc "Set the sslmode to enable Eavesdropping protection/MITM protection" - config_param :sslmode, :enum, list: %i[disable allow prefer require verify-ca verify-full], default: :prefer - desc "The database name to connect" - config_param :database, :string - desc "The table name to insert records" - config_param :table, :string - desc "The user name to connect database" - config_param :user, :string, default: nil - desc "The password to connect database" - config_param :password, :string, default: nil, secret: true - desc "The column name for the time" - config_param :time_col, :string, default: "time" - desc "The column name for the tag" - config_param :tag_col, :string, default: "tag" - desc "The column name for the record" - config_param :record_col, :string, default: "record" - desc "If true, insert records formatted as msgpack" - config_param :msgpack, :bool, default: false - desc "JSON encoder (yajl/json)" - config_param :encoder, :enum, list: [:yajl, :json], default: :yajl - - config_param :time_format, :string, default: "%F %T.%N %z" - - config_param :reset_connection_interval, :integer, default: 5 - - config_section :buffer do - config_set_default :@type, DEFAULT_BUFFER_TYPE - config_set_default :chunk_keys, ["tag"] - end - - def initialize - super - # if @conn == nil: The connection has not been established yet. - # if @conn != nil: The connection has been established, but it could be a broken connection. - @conn = nil - @thread_lock = Mutex.new - @last_reset_ts = 0 - end - - def configure(conf) - compat_parameters_convert(conf, :buffer) - super - unless @chunk_key_tag - raise Fluent::ConfigError, "'tag' in chunk_keys is required." - end - @encoder = case @encoder - when :yajl - Yajl - when :json - JSON - end - end - - def init_connection - # This function is used to init the connection (first connecting). - # If @conn == nil, try to establish a connection, and set @conn. - # If @conn != nil, skip this function. - # @conn could be a nil or non-nil value after this call. - if @conn.nil? - begin - log.debug "[pgjson] [init_connection] Connecting to PostgreSQL server #{@host}:#{@port}, database #{@database}..." - @conn = PG::Connection.new(dbname: @database, host: @host, port: @port, sslmode: @sslmode, user: @user, password: @password) - rescue PG::Error - log.debug "[pgjson] [init_connection] Failed to initialize a connection." - if ! @conn.nil? - @conn.close() - @conn = nil - end - rescue => err - log.debug "#{err}" - end - end - end - - def timestamp - Time.now.getutc.to_i - end - - def reset_connection - # This function try to fix the broken connection to database. - # if @conn == nil, call init_connection - # if @conn != nil, call @conn.reset - # This function must be protected by thread_lock to ensure thread safety. - begin - if timestamp - @last_reset_ts > @reset_connection_interval - if @conn.nil? - log.debug "[pgjson] [reset_connection] Call init_connection." - init_connection - else - log.debug "[pgjson] [reset_connection] Reset Connection." - @conn.reset - end - else - log.debug "[pgjson] [reset_connection] Skip reset." - end - rescue => err - log.debug "[pgjson] [reset_connection] #{err.class}, #{err.message}" - ensure - @last_reset_ts = timestamp - end - end - - def shutdown - begin - @thread_lock.lock() - if ! @conn.nil? - @conn.close() - @conn = nil - end - rescue => err - log.debug "[pgjson] [shutdown] #{err.class}, #{err.message}" - ensure - @thread_lock.unlock() - end - super - end - - def formatted_to_msgpack_binary - true - end - - def multi_workers_ready? - true - end - - def format(tag, time, record) - [Time.at(time).strftime(@time_format), record].to_msgpack - end - - def write(chunk) - log.debug "[pgjson] in write, chunk id #{dump_unique_id_hex chunk.unique_id}" - @thread_lock.lock() - init_connection - if ! @conn.nil? - begin - @conn.exec("COPY #{@table} (#{@tag_col}, #{@time_col}, #{@record_col}) FROM STDIN WITH DELIMITER E'\\x01'") - tag = chunk.metadata.tag - chunk.msgpack_each do |time, record| - @conn.put_copy_data "#{tag}\x01#{time}\x01#{record_value(record)}\n" - end - rescue PG::ConnectionBad, PG::UnableToSend => err - # connection error - reset_connection # try to reset broken connection, and wait for next retry - errmsg = "%s while copy data: %s" % [ err.class.name, err.message ] - raise errmsg - rescue PG::Error => err - log.debug "[pgjson] [write] Error while writing, error is #{err.class}" - errmsg = "%s while copy data: %s" % [ err.class.name, err.message ] - @conn.put_copy_end( errmsg ) - @conn.get_result - raise errmsg - else - @conn.put_copy_end - res = @conn.get_result - raise res.result_error_message if res.result_status != PG::PGRES_COMMAND_OK - log.debug "[pgjson] write successfully, chunk id #{dump_unique_id_hex chunk.unique_id}" - ensure - @thread_lock.unlock() - end - else - @thread_lock.unlock() - raise "Cannot connect to db host." - end - end - - def record_value(record) - if @msgpack - "\\#{@conn.escape_bytea(record.to_msgpack)}" - else - json = @encoder.dump(record) - json.gsub!(/\\/){ '\\\\' } - json - end - end - end -end diff --git a/src/fluentd/deploy/fluentd-config.yaml.template b/src/fluentd/deploy/fluentd-config.yaml.template index 198c8b0001..72b7637d53 100644 --- a/src/fluentd/deploy/fluentd-config.yaml.template +++ b/src/fluentd/deploy/fluentd-config.yaml.template @@ -118,7 +118,7 @@ data: user {{ cluster_cfg['postgresql']['user'] }} password {{ cluster_cfg['postgresql']['passwd'] }} database {{ cluster_cfg['postgresql']['db'] }} - table fc_objectsnapshots + table framework_history record_col record @type memory diff --git a/src/fluentd/err b/src/fluentd/err deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/src/fluentd/run.sh b/src/fluentd/run.sh index 39d521482e..a3d2eea674 100644 --- a/src/fluentd/run.sh +++ b/src/fluentd/run.sh @@ -1,5 +1,4 @@ for VAR in $(seq 1 10) do - curl -i -X POST -d "json={\"index\":$VAR, \"name\":\"piggy\"}" http://localhost:8888/testpg.cycle + curl -i -X POST -d "json={\"index\":$VAR, \"metadata\":{\"name\":\"piggy\"}}" http://localhost:8888/testpg.cycle done - diff --git a/src/fluentd/src/fluent-plugin-pgjson/example.conf b/src/fluentd/src/fluent-plugin-pgjson/example.conf index 60be14ad3e..e9f94c5b46 100644 --- a/src/fluentd/src/fluent-plugin-pgjson/example.conf +++ b/src/fluentd/src/fluent-plugin-pgjson/example.conf @@ -1,26 +1,17 @@ - @type http - port 8888 - bind 0.0.0.0 + type forward - - @type stdout + + type pgjson + host localhost + port 5432 + sslmode require + database fluentd + table fluentd + user postgres + password postgres + time_col time + tag_col tag + record_col record -# -# type forward -# - -# -# type pgjson -# host localhost -# port 5432 -# sslmode require -# database fluentd -# table fluentd -# user postgres -# password postgres -# time_col time -# tag_col tag -# record_col record -# diff --git a/src/fluentd/src/fluent-plugin-pgjson/lib/fluent/plugin/out_pgjson.rb b/src/fluentd/src/fluent-plugin-pgjson/lib/fluent/plugin/out_pgjson.rb index 4f06fffea0..b9d0ec890d 100644 --- a/src/fluentd/src/fluent-plugin-pgjson/lib/fluent/plugin/out_pgjson.rb +++ b/src/fluentd/src/fluent-plugin-pgjson/lib/fluent/plugin/out_pgjson.rb @@ -30,12 +30,16 @@ class PgJsonOutput < Fluent::Plugin::Output config_param :user, :string, default: nil desc "The password to connect database" config_param :password, :string, default: nil, secret: true - desc "The column name for the time" - config_param :time_col, :string, default: "time" - desc "The column name for the tag" - config_param :tag_col, :string, default: "tag" - desc "The column name for the record" - config_param :record_col, :string, default: "record" + desc "The column name for the insertedAt" + config_param :insertedAt_col, :string, default: "insertedAt" + desc "The column name for the frameworkName" + config_param :frameworkName_col, :string, default: "frameworkName" + desc "The column name for the attemptIndex" + config_param :attemptIndex_col, :string, default: "attemptIndex" + desc "The column name for the historyType" + config_param :historyType_col, :string, default: "historyType" + desc "The column name for the snapshot" + config_param :snapshot_col, :string, default: "snapshot" desc "If true, insert records formatted as msgpack" config_param :msgpack, :bool, default: false desc "JSON encoder (yajl/json)" @@ -134,10 +138,13 @@ def write(chunk) end if ! thread[:conn].nil? begin - thread[:conn].exec("COPY #{@table} (#{@tag_col}, #{@time_col}, #{@record_col}) FROM STDIN WITH DELIMITER E'\\x01'") + thread[:conn].exec("COPY #{@table} (#{@insertedAt_col}, #{@frameworkName_col}, #{@attemptIndex_col}, #{@historyType_col}, #{@snapshot_col}) FROM STDIN WITH DELIMITER E'\\x01'") tag = chunk.metadata.tag + # record is of type 'Hash' chunk.msgpack_each do |time, record| - thread[:conn].put_copy_data "#{tag}\x01#{time}\x01#{record_value(record)}\n" + attempt_index = 0 + log.debug "#{record}" + thread[:conn].put_copy_data "#{time}\x01#{record["objectSnapshot"]["metadata"]["name"]}\x01#{attempt_index}\x01#{"framework"}\x01#{record_value(record)}\n" end rescue PG::ConnectionBad, PG::UnableToSend => err # connection error From 1c8759d31aecc3d0e1e0d1dda642d43e6a4f2584 Mon Sep 17 00:00:00 2001 From: suiguoxin Date: Tue, 14 Jul 2020 17:52:40 +0800 Subject: [PATCH 04/14] change rest API to fit new schema --- .../lib/fluent/plugin/out_pgjson.rb | 3 +-- src/rest-server/src/models/v2/job-attempt.js | 17 ++++++++--------- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/src/fluentd/src/fluent-plugin-pgjson/lib/fluent/plugin/out_pgjson.rb b/src/fluentd/src/fluent-plugin-pgjson/lib/fluent/plugin/out_pgjson.rb index b9d0ec890d..539fa95db7 100644 --- a/src/fluentd/src/fluent-plugin-pgjson/lib/fluent/plugin/out_pgjson.rb +++ b/src/fluentd/src/fluent-plugin-pgjson/lib/fluent/plugin/out_pgjson.rb @@ -142,9 +142,8 @@ def write(chunk) tag = chunk.metadata.tag # record is of type 'Hash' chunk.msgpack_each do |time, record| - attempt_index = 0 log.debug "#{record}" - thread[:conn].put_copy_data "#{time}\x01#{record["objectSnapshot"]["metadata"]["name"]}\x01#{attempt_index}\x01#{"framework"}\x01#{record_value(record)}\n" + thread[:conn].put_copy_data "#{time}\x01#{record["objectSnapshot"]["metadata"]["name"]}\x01#{record["objectSnapshot"]["status"]["attemptStatus"]["id"]}\x01#{"retry"}\x01#{record_value(record)}\n" end rescue PG::ConnectionBad, PG::UnableToSend => err # connection error diff --git a/src/rest-server/src/models/v2/job-attempt.js b/src/rest-server/src/models/v2/job-attempt.js index 5c9ea86371..5f5522acf8 100644 --- a/src/rest-server/src/models/v2/job-attempt.js +++ b/src/rest-server/src/models/v2/job-attempt.js @@ -92,10 +92,9 @@ if (sequelize && launcherConfig.enabledJobHistory) { return {status: 404, data: null}; } - const sqlSentence = `SELECT (record->'objectSnapshot') as data FROM fc_objectsnapshots WHERE ` + - `record->'objectSnapshot'->'metadata'->'uid' ? '${uid}' and ` + - `record->'objectSnapshot'->'kind' ? 'Framework' ` + - `ORDER BY cast(record->'objectSnapshot'->'status'->'attemptStatus'->>'id' as INTEGER) ASC;`; + const sqlSentence = `SELECT (snapshot->'objectSnapshot') as data FROM framework_history WHERE ` + + `snapshot->'objectSnapshot'->'metadata'->'uid' ? '${uid}' ` + + `ORDER BY uid ASC;`; const pgResult = (await sequelize.query(sqlSentence))[0]; const jobRetries = await Promise.all( pgResult.map((row) => { @@ -146,11 +145,11 @@ if (sequelize && launcherConfig.enabledJobHistory) { if (isNil(uid)) { return {status: 404, data: null}; } - const sqlSentence = `SELECT (record->'objectSnapshot') as data FROM fc_objectsnapshots WHERE ` + - `record->'objectSnapshot'->'metadata'->'uid' ? '${uid}' and ` + - `record->'objectSnapshot'->'status'->'attemptStatus'->>'id' = '${jobAttemptIndex}' and ` + - `record->'objectSnapshot'->'kind' ? 'Framework' ` + - `ORDER BY cast(record->'objectSnapshot'->'status'->'attemptStatus'->>'id' as INTEGER) ASC;`; + const sqlSentence = `SELECT (snapshot->'objectSnapshot') as data FROM framework_history WHERE ` + + `snapshot->'objectSnapshot'->'metadata'->'uid' ? '${uid}' and ` + + `attemptIndex = '${jobAttemptIndex}'` + + `ORDER BY uid ASC;`; + const pgResult = (await sequelize.query(sqlSentence))[0]; if (pgResult.length === 0) { From 20ff7bc861b6330c59eccca218f3b6212a4a8797 Mon Sep 17 00:00:00 2001 From: suiguoxin Date: Wed, 15 Jul 2020 06:00:21 +0000 Subject: [PATCH 05/14] use db as ground truth update --- src/fluentd/build/utils.md | 59 ------ src/fluentd/code/out_pgjson.rb | 175 ------------------ src/fluentd/run.py | 17 -- src/fluentd/run.sh | 4 - .../lib/fluent/plugin/out_pgjson.rb | 6 +- .../src/fluent-plugin-pgjson/test.conf | 19 -- src/rest-server/src/models/v2/job-attempt.js | 108 ++--------- 7 files changed, 20 insertions(+), 368 deletions(-) delete mode 100644 src/fluentd/build/utils.md delete mode 100644 src/fluentd/code/out_pgjson.rb delete mode 100644 src/fluentd/run.py delete mode 100644 src/fluentd/run.sh delete mode 100644 src/fluentd/src/fluent-plugin-pgjson/test.conf diff --git a/src/fluentd/build/utils.md b/src/fluentd/build/utils.md deleted file mode 100644 index a7ba141cc2..0000000000 --- a/src/fluentd/build/utils.md +++ /dev/null @@ -1,59 +0,0 @@ - -# td-agent -sudo /etc/init.d/td-agent restart - -/var/log/td-agent/td-agent.log -sudo vim /etc/td-agent/td-agent.conf - -sudo /usr/sbin/td-agent-gem install pg --version "=1.1.4" -path: /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluent-plugin-pgjson - -# postgres -psql -h 10.151.40.32 -W -U root -p 5432 openpai -select count(*) from test_pg; -delete from test_pg; - -# docker -cd src/fluentd -docker build -f build/fluentd.k8s.dockerfile -t gusui/fluentd:latest . --no-cache -docker run -p 8888:8888 -v $(pwd)/tmp:/fluentd/etc -e FLUENTD_CONF=fluentd.conf -d gusui/fluentd:latest - - -bash run.sh - -docker ps - -docker logs CONTAINERID -docker logs --tail 10 CONTAINERID - -docker stop $(docker ps -a -q) && docker rm $(docker ps -a -q) - - -# log -kubectl get configmap fluentd-config -o yaml -kubectl get po -kubectl logs frameworkcontroller-sts-0 | grep Snapshot | grep kind\":\"Framework - - - -# Database -psql -h 10.151.40.32 -W -U root -p 5432 openpai -rootpass - -CREATE TABLE IF NOT EXISTS fc_objectsnapshots ( tag Text, time Timestamptz, record JsonB ); -CREATE TABLE IF NOT EXISTS framework_history (insertedAt Timestamptz, uid SERIAL, frameworkName VARCHAR(64), attemptIndex INTEGER, historyType VARCHAR(16), snapshot JsonB); - -CREATE TABLE IF NOT EXISTS pods (insertedAt Timestamptz, updatedAt Timestamptz, uid SERIAL, frameworkName VARCHAR(64), attemptIndex INTEGER, taskroleName VARCHAR(256), taskroleIndex INTEGER, snapshot JsonB); - -# fluentd service -./paictl.py config pull -o /cluster-configuration - -## build and push fluentd -./build/pai_build.py build -c /cluster-configuration/ -s fluentd -./build/pai_build.py push -c /cluster-configuration/ -i fluentd - -## restart fluentd service -./paictl.py service stop -n fluentd -./paictl.py service start -n fluentd - - diff --git a/src/fluentd/code/out_pgjson.rb b/src/fluentd/code/out_pgjson.rb deleted file mode 100644 index fd3e32d2b9..0000000000 --- a/src/fluentd/code/out_pgjson.rb +++ /dev/null @@ -1,175 +0,0 @@ -# Copyright (c) 2012 OKUNO Akihiro -# Portions Copyright (c) Microsoft Corporation -# -# Apache License, Version 2.0 - -require "fluent/plugin/output" -require "pg" -require "yajl" -require "json" - -module Fluent::Plugin - class PgJsonOutput < Fluent::Plugin::Output - Fluent::Plugin.register_output("pgjson", self) - - helpers :compat_parameters - - DEFAULT_BUFFER_TYPE = "memory" - - desc "The hostname of PostgreSQL server" - config_param :host, :string, default: "localhost" - desc "The port of PostgreSQL server" - config_param :port, :integer, default: 5432 - desc "Set the sslmode to enable Eavesdropping protection/MITM protection" - config_param :sslmode, :enum, list: %i[disable allow prefer require verify-ca verify-full], default: :prefer - desc "The database name to connect" - config_param :database, :string - desc "The table name to insert records" - config_param :table, :string - desc "The user name to connect database" - config_param :user, :string, default: nil - desc "The password to connect database" - config_param :password, :string, default: nil, secret: true - desc "The column name for the time" - config_param :time_col, :string, default: "time" - desc "The column name for the tag" - config_param :tag_col, :string, default: "tag" - desc "The column name for the record" - config_param :record_col, :string, default: "record" - desc "If true, insert records formatted as msgpack" - config_param :msgpack, :bool, default: false - desc "JSON encoder (yajl/json)" - config_param :encoder, :enum, list: [:yajl, :json], default: :yajl - - config_param :time_format, :string, default: "%F %T.%N %z" - - config_param :reset_connection_interval, :integer, default: 5 - - config_section :buffer do - config_set_default :@type, DEFAULT_BUFFER_TYPE - config_set_default :chunk_keys, ["tag"] - end - - def initialize - super - @last_reset_ts = 0 - end - - def configure(conf) - compat_parameters_convert(conf, :buffer) - super - unless @chunk_key_tag - raise Fluent::ConfigError, "'tag' in chunk_keys is required." - end - @encoder = case @encoder - when :yajl - Yajl - when :json - JSON - end - end - - def init_connection - # This function is used to create a connection. - thread = Thread.current - begin - log.debug "[pgjson] [init_connection] Connecting to PostgreSQL server #{@host}:#{@port}, database #{@database}..." - thread[:conn] = PG::Connection.new(dbname: @database, host: @host, port: @port, sslmode: @sslmode, user: @user, password: @password) - rescue PG::Error - log.debug "[pgjson] [init_connection] Failed to initialize a connection." - if ! thread[:conn].nil? - thread[:conn].close() - thread[:conn] = nil - end - rescue => err - log.debug "#{err}" - end - end - - def reset_connection - # This function try to fix the broken connection to database. - # if conn == nil, call init_connection - # if conn != nil, call conn.reset - thread = Thread.current - begin - if timestamp - @last_reset_ts > @reset_connection_interval - if thread[:conn].nil? - log.debug "[pgjson] [reset_connection] Call init_connection." - init_connection - else - log.debug "[pgjson] [reset_connection] Reset Connection." - thread[:conn].reset - end - else - log.debug "[pgjson] [reset_connection] Skip reset." - end - rescue => err - log.debug "[pgjson] [reset_connection] #{err.class}, #{err.message}" - ensure - @last_reset_ts = timestamp - end - end - - def timestamp - Time.now.getutc.to_i - end - - def formatted_to_msgpack_binary - true - end - - def multi_workers_ready? - true - end - - def format(tag, time, record) - [Time.at(time).strftime(@time_format), record].to_msgpack - end - - def write(chunk) - log.debug "[pgjson] in write, chunk id #{dump_unique_id_hex chunk.unique_id}" - thread = Thread.current - if ! thread.key?(:conn) - init_connection - end - if ! thread[:conn].nil? - begin - thread[:conn].exec("COPY #{@table} (#{@tag_col}, #{@time_col}, #{@name_col}, #{@index_col}, #{@record_col}) FROM STDIN WITH DELIMITER E'\\x01'") - tag = chunk.metadata.tag - chunk.msgpack_each do |time, record| - thread[:conn].put_copy_data "#{tag}\x01#{time}\x01#{record.name}\x01#{record.index}\x01#{record_value(record)}\n" - end - rescue PG::ConnectionBad, PG::UnableToSend => err - # connection error - reset_connection # try to reset broken connection, and wait for next retry - log.debug "%s while copy data: %s" % [ err.class.name, err.message ] - retry - rescue PG::Error => err - log.debug "[pgjson] [write] Error while writing, error is #{err.class}" - errmsg = "%s while copy data: %s" % [ err.class.name, err.message ] - thread[:conn].put_copy_end( errmsg ) - thread[:conn].get_result - raise errmsg - else - thread[:conn].put_copy_end - res = thread[:conn].get_result - raise res.result_error_message if res.result_status != PG::PGRES_COMMAND_OK - log.debug "[pgjson] write successfully, chunk id #{dump_unique_id_hex chunk.unique_id}" - end - else - raise "Cannot connect to db host." - end - end - - def record_value(record) - thread = Thread.current - if @msgpack - "\\#{thread[:conn].escape_bytea(record.to_msgpack)}" - else - json = @encoder.dump(record) - json.gsub!(/\\/){ '\\\\' } - json - end - end - end -end diff --git a/src/fluentd/run.py b/src/fluentd/run.py deleted file mode 100644 index 198534d027..0000000000 --- a/src/fluentd/run.py +++ /dev/null @@ -1,17 +0,0 @@ -import _thread -import time - -# Define a function for the thread -def print_time( threadName, delay): - count = 0 - while count < 5: - # time.sleep(delay) - count += 1 - print ("%s: %s" % ( threadName, time.ctime(time.time()) )) - -# Create two threads as follows -try: - _thread.start_new_thread( print_time, ("Thread-1", 2, ) ) - _thread.start_new_thread( print_time, ("Thread-2", 4, ) ) -except: - print ("Error: unable to start thread") diff --git a/src/fluentd/run.sh b/src/fluentd/run.sh deleted file mode 100644 index a3d2eea674..0000000000 --- a/src/fluentd/run.sh +++ /dev/null @@ -1,4 +0,0 @@ -for VAR in $(seq 1 10) -do - curl -i -X POST -d "json={\"index\":$VAR, \"metadata\":{\"name\":\"piggy\"}}" http://localhost:8888/testpg.cycle -done diff --git a/src/fluentd/src/fluent-plugin-pgjson/lib/fluent/plugin/out_pgjson.rb b/src/fluentd/src/fluent-plugin-pgjson/lib/fluent/plugin/out_pgjson.rb index 539fa95db7..b2b4e26828 100644 --- a/src/fluentd/src/fluent-plugin-pgjson/lib/fluent/plugin/out_pgjson.rb +++ b/src/fluentd/src/fluent-plugin-pgjson/lib/fluent/plugin/out_pgjson.rb @@ -139,10 +139,14 @@ def write(chunk) if ! thread[:conn].nil? begin thread[:conn].exec("COPY #{@table} (#{@insertedAt_col}, #{@frameworkName_col}, #{@attemptIndex_col}, #{@historyType_col}, #{@snapshot_col}) FROM STDIN WITH DELIMITER E'\\x01'") - tag = chunk.metadata.tag # record is of type 'Hash' chunk.msgpack_each do |time, record| log.debug "#{record}" + log.info "status class : #{record["objectSnapshot"]["status"].class}" + log.info "attempStatus class : #{record["objectSnapshot"]["status"]["attemptStatus"].class}" + log.info "name: #{record["objectSnapshot"]["metadata"]["name"]}" + log.info "id: #{record["objectSnapshot"]["status"]["attemptStatus"]["id"]}" + log.info "class 1: #{record_value(record).class}" thread[:conn].put_copy_data "#{time}\x01#{record["objectSnapshot"]["metadata"]["name"]}\x01#{record["objectSnapshot"]["status"]["attemptStatus"]["id"]}\x01#{"retry"}\x01#{record_value(record)}\n" end rescue PG::ConnectionBad, PG::UnableToSend => err diff --git a/src/fluentd/src/fluent-plugin-pgjson/test.conf b/src/fluentd/src/fluent-plugin-pgjson/test.conf deleted file mode 100644 index a4cf67a3c0..0000000000 --- a/src/fluentd/src/fluent-plugin-pgjson/test.conf +++ /dev/null @@ -1,19 +0,0 @@ - - @type http - port 8888 - bind 0.0.0.0 - - - - @type pgjson - host 10.151.40.32 - port 5432 - sslmode require - database openpai - table user_tbl - user root - password rootpass - time_col time - tag_col tag - record_col record - diff --git a/src/rest-server/src/models/v2/job-attempt.js b/src/rest-server/src/models/v2/job-attempt.js index 5f5522acf8..ee7df4b8b2 100644 --- a/src/rest-server/src/models/v2/job-attempt.js +++ b/src/rest-server/src/models/v2/job-attempt.js @@ -22,7 +22,6 @@ const {isNil} = require('lodash'); const {convertToJobAttempt} = require('@pai/utils/frameworkConverter'); const launcherConfig = require('@pai/config/launcher'); const createError = require('@pai/utils/error'); -const k8sModel = require('@pai/models/kubernetes/kubernetes'); const logger = require('@pai/config/logger'); const {sequelize} = require('@pai/utils/postgresUtil'); @@ -54,51 +53,14 @@ if (sequelize && launcherConfig.enabledJobHistory) { const list = async (frameworkName) => { let attemptData = []; - let uid; - - // get latest framework from k8s API - let response; - try { - response = await k8sModel.getClient().get( - launcherConfig.frameworkPath(encodeName(frameworkName)), - { - headers: launcherConfig.requestHeaders, - } - ); - } catch (error) { - logger.error(`error when getting framework from k8s api: ${error.message}`); - if (error.response != null) { - response = error.response; - } else { - throw error; - } - } - - if (response.status === 200) { - // get UID from k8s framework API - uid = response.data.metadata.uid; - attemptData.push({ - ...(await convertToJobAttempt(response.data)), - isLatest: true, - }); - } else if (response.status === 404) { - logger.warn(`could not get framework ${uid} from k8s: ${JSON.stringify(response)}`); - return {status: 404, data: null}; - } else { - throw createError(response.status, 'UnknownError', response.data.message); - } - - if (isNil(uid)) { - return {status: 404, data: null}; - } - - const sqlSentence = `SELECT (snapshot->'objectSnapshot') as data FROM framework_history WHERE ` + - `snapshot->'objectSnapshot'->'metadata'->'uid' ? '${uid}' ` + + + const sqlSentence = `SELECT snapshot as data FROM framework_history WHERE ` + + `frameworkName = '${encodeName(frameworkName)}' ` + `ORDER BY uid ASC;`; const pgResult = (await sequelize.query(sqlSentence))[0]; const jobRetries = await Promise.all( pgResult.map((row) => { - return convertToJobAttempt(row.data); + return convertToJobAttempt(JSON.parse(row.data)["objectSnapshot"]); }), ); attemptData.push( @@ -111,63 +73,23 @@ if (sequelize && launcherConfig.enabledJobHistory) { }; const get = async (frameworkName, jobAttemptIndex) => { - let uid; let attemptFramework; - let response; - try { - response = await k8sModel.getClient().get( - launcherConfig.frameworkPath(encodeName(frameworkName)), - { - headers: launcherConfig.requestHeaders, - } - ); - } catch (error) { - logger.error(`error when getting framework from k8s api: ${error.message}`); - if (error.response != null) { - response = error.response; - } else { - throw error; - } - } - - if (response.status === 200) { - // get uid from k8s framwork API - uid = response.data.metadata.uid; - attemptFramework = response.data; - } else if (response.status === 404) { - logger.warn(`could not get framework ${uid} from k8s: ${JSON.stringify(response)}`); - return {status: 404, data: null}; - } else { - throw createError(response.status, 'UnknownError', response.data.message); - } - if (jobAttemptIndex < attemptFramework.spec.retryPolicy.maxRetryCount) { - if (isNil(uid)) { - return {status: 404, data: null}; - } - const sqlSentence = `SELECT (snapshot->'objectSnapshot') as data FROM framework_history WHERE ` + - `snapshot->'objectSnapshot'->'metadata'->'uid' ? '${uid}' and ` + - `attemptIndex = '${jobAttemptIndex}'` + - `ORDER BY uid ASC;`; + const sqlSentence = `SELECT snapshot as data FROM framework_history WHERE ` + + `frameworkName = '${encodeName(frameworkName)}' and ` + + `attemptIndex = '${jobAttemptIndex}'` + + `ORDER BY uid ASC;`; - const pgResult = (await sequelize.query(sqlSentence))[0]; + const pgResult = (await sequelize.query(sqlSentence))[0]; - if (pgResult.length === 0) { - return {status: 404, data: null}; - } else { - attemptFramework = pgResult[0].data; - const attemptDetail = await convertToJobAttempt(attemptFramework); - return {status: 200, data: {...attemptDetail, isLatest: false}}; - } - } else if ( - jobAttemptIndex === attemptFramework.spec.retryPolicy.maxRetryCount - ) { - // get latest frameworks from k8s API - const attemptDetail = await convertToJobAttempt(attemptFramework); - return {status: 200, data: {...attemptDetail, isLatest: true}}; - } else { + if (pgResult.length === 0) { return {status: 404, data: null}; + } else { + attemptFramework = JSON.parse(pgResult[0].data)["objectSnapshot"]; + const attemptDetail = await convertToJobAttempt(attemptFramework); + return {status: 200, data: {...attemptDetail, isLatest: false}}; } + }; module.exports = { From 661e8fae7f6c43cdf31b0edc1537bad5738aab96 Mon Sep 17 00:00:00 2001 From: suiguoxin Date: Wed, 15 Jul 2020 19:46:07 +0800 Subject: [PATCH 06/14] write to pods table --- .../deploy/fluentd-config.yaml.template | 3 +- .../lib/fluent/plugin/out_pgjson.rb | 35 +++++++++++++------ 2 files changed, 25 insertions(+), 13 deletions(-) diff --git a/src/fluentd/deploy/fluentd-config.yaml.template b/src/fluentd/deploy/fluentd-config.yaml.template index 72b7637d53..a01a368896 100644 --- a/src/fluentd/deploy/fluentd-config.yaml.template +++ b/src/fluentd/deploy/fluentd-config.yaml.template @@ -113,13 +113,12 @@ data: @type copy @type pgjson + @log_level debug host {{ cluster_cfg['postgresql']['host'] }} port {{ cluster_cfg['postgresql']['port'] }} user {{ cluster_cfg['postgresql']['user'] }} password {{ cluster_cfg['postgresql']['passwd'] }} database {{ cluster_cfg['postgresql']['db'] }} - table framework_history - record_col record @type memory flush_mode immediate diff --git a/src/fluentd/src/fluent-plugin-pgjson/lib/fluent/plugin/out_pgjson.rb b/src/fluentd/src/fluent-plugin-pgjson/lib/fluent/plugin/out_pgjson.rb index b2b4e26828..61031ab53a 100644 --- a/src/fluentd/src/fluent-plugin-pgjson/lib/fluent/plugin/out_pgjson.rb +++ b/src/fluentd/src/fluent-plugin-pgjson/lib/fluent/plugin/out_pgjson.rb @@ -24,20 +24,26 @@ class PgJsonOutput < Fluent::Plugin::Output config_param :sslmode, :enum, list: %i[disable allow prefer require verify-ca verify-full], default: :prefer desc "The database name to connect" config_param :database, :string - desc "The table name to insert records" - config_param :table, :string desc "The user name to connect database" config_param :user, :string, default: nil desc "The password to connect database" config_param :password, :string, default: nil, secret: true desc "The column name for the insertedAt" config_param :insertedAt_col, :string, default: "insertedAt" + desc "The column name for the updatedAt" + config_param :updatedAt_col, :string, default: "updatedAt" + desc "The column name for the uid" + config_param :uid_col, :string, default: "uid" desc "The column name for the frameworkName" config_param :frameworkName_col, :string, default: "frameworkName" desc "The column name for the attemptIndex" config_param :attemptIndex_col, :string, default: "attemptIndex" desc "The column name for the historyType" config_param :historyType_col, :string, default: "historyType" + desc "The column name for the taskroleName" + config_param :taskroleName_col, :string, default: "taskroleName" + desc "The column name for the taskroleIndex" + config_param :taskroleIndex_col, :string, default: "taskroleIndex" desc "The column name for the snapshot" config_param :snapshot_col, :string, default: "snapshot" desc "If true, insert records formatted as msgpack" @@ -138,16 +144,23 @@ def write(chunk) end if ! thread[:conn].nil? begin - thread[:conn].exec("COPY #{@table} (#{@insertedAt_col}, #{@frameworkName_col}, #{@attemptIndex_col}, #{@historyType_col}, #{@snapshot_col}) FROM STDIN WITH DELIMITER E'\\x01'") - # record is of type 'Hash' chunk.msgpack_each do |time, record| - log.debug "#{record}" - log.info "status class : #{record["objectSnapshot"]["status"].class}" - log.info "attempStatus class : #{record["objectSnapshot"]["status"]["attemptStatus"].class}" - log.info "name: #{record["objectSnapshot"]["metadata"]["name"]}" - log.info "id: #{record["objectSnapshot"]["status"]["attemptStatus"]["id"]}" - log.info "class 1: #{record_value(record).class}" - thread[:conn].put_copy_data "#{time}\x01#{record["objectSnapshot"]["metadata"]["name"]}\x01#{record["objectSnapshot"]["status"]["attemptStatus"]["id"]}\x01#{"retry"}\x01#{record_value(record)}\n" + kind = record["objectSnapshot"]["kind"] + if kind == "Framework" + thread[:conn].exec("COPY framework_history (#{@insertedAt_col}, #{@frameworkName_col}, #{@attemptIndex_col}, #{@historyType_col}, #{@snapshot_col}) FROM STDIN WITH DELIMITER E'\\x01'") + frameworkName = record["objectSnapshot"]["metadata"]["name"] + attemptIndex = record["objectSnapshot"]["status"]["attemptStatus"]["id"] + historyType = "retry" + thread[:conn].put_copy_data "#{time}\x01#{frameworkName}\x01#{attemptIndex}\x01#{historyType}\x01#{record_value(record)}\n" + elsif kind == "Pod" + thread[:conn].exec("COPY pods (#{@insertedAt_col}, #{@updatedAt_col}, #{@uid_col}, #{@frameworkName_col}, #{@attemptIndex_col}, #{@taskroleName_col}, #{@taskroleIndex_col}, #{@snapshot_col}) FROM STDIN WITH DELIMITER E'\\x01'") + uid = record["objectSnapshot"]["metadata"]["uid"] + frameworkName = record["objectSnapshot"]["metadata"]["name"] + attemptIndex = record["objectSnapshot"]["metadata"]["annotations"]["FC_TASK_ATTEMPT_ID"] + taskroleName = record["objectSnapshot"]["metadata"]["annotations"]["FC_TASKROLE_NAME"] + taskroleIndex = record["objectSnapshot"]["metadata"]["annotations"]["FC_TASK_INDEX"] + thread[:conn].put_copy_data "#{time}\x01#{time}\x01#{uid}\x01#{frameworkName}\x01#{attemptIndex}\x01#{taskroleName}\x01#{taskroleIndex}\x01#{record_value(record)}\n" + end end rescue PG::ConnectionBad, PG::UnableToSend => err # connection error From 18a34a1ecb53d265b5eb7341477c525632149ad4 Mon Sep 17 00:00:00 2001 From: suiguoxin Date: Thu, 16 Jul 2020 14:00:33 +0800 Subject: [PATCH 07/14] code cleaning --- src/fluentd/deploy/fluentd-config.yaml.template | 1 - src/rest-server/src/models/v2/job-attempt.js | 9 +++------ 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/src/fluentd/deploy/fluentd-config.yaml.template b/src/fluentd/deploy/fluentd-config.yaml.template index a01a368896..515217202a 100644 --- a/src/fluentd/deploy/fluentd-config.yaml.template +++ b/src/fluentd/deploy/fluentd-config.yaml.template @@ -113,7 +113,6 @@ data: @type copy @type pgjson - @log_level debug host {{ cluster_cfg['postgresql']['host'] }} port {{ cluster_cfg['postgresql']['port'] }} user {{ cluster_cfg['postgresql']['user'] }} diff --git a/src/rest-server/src/models/v2/job-attempt.js b/src/rest-server/src/models/v2/job-attempt.js index ee7df4b8b2..0d8ab4213f 100644 --- a/src/rest-server/src/models/v2/job-attempt.js +++ b/src/rest-server/src/models/v2/job-attempt.js @@ -17,11 +17,9 @@ // module dependencies const crypto = require('crypto'); -const {isNil} = require('lodash'); const {convertToJobAttempt} = require('@pai/utils/frameworkConverter'); const launcherConfig = require('@pai/config/launcher'); -const createError = require('@pai/utils/error'); const logger = require('@pai/config/logger'); const {sequelize} = require('@pai/utils/postgresUtil'); @@ -53,14 +51,14 @@ if (sequelize && launcherConfig.enabledJobHistory) { const list = async (frameworkName) => { let attemptData = []; - + const sqlSentence = `SELECT snapshot as data FROM framework_history WHERE ` + `frameworkName = '${encodeName(frameworkName)}' ` + `ORDER BY uid ASC;`; const pgResult = (await sequelize.query(sqlSentence))[0]; const jobRetries = await Promise.all( pgResult.map((row) => { - return convertToJobAttempt(JSON.parse(row.data)["objectSnapshot"]); + return convertToJobAttempt(JSON.parse(row.data)['objectSnapshot']); }), ); attemptData.push( @@ -85,11 +83,10 @@ if (sequelize && launcherConfig.enabledJobHistory) { if (pgResult.length === 0) { return {status: 404, data: null}; } else { - attemptFramework = JSON.parse(pgResult[0].data)["objectSnapshot"]; + attemptFramework = JSON.parse(pgResult[0].data)['objectSnapshot']; const attemptDetail = await convertToJobAttempt(attemptFramework); return {status: 200, data: {...attemptDetail, isLatest: false}}; } - }; module.exports = { From 361dee4a334800f5a2ef0184d220c673c197b8b3 Mon Sep 17 00:00:00 2001 From: suiguoxin Date: Thu, 16 Jul 2020 14:24:58 +0800 Subject: [PATCH 08/14] verify column value --- .../lib/fluent/plugin/out_pgjson.rb | 13 +++++++++---- src/rest-server/src/models/v2/job-attempt.js | 4 ++-- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/src/fluentd/src/fluent-plugin-pgjson/lib/fluent/plugin/out_pgjson.rb b/src/fluentd/src/fluent-plugin-pgjson/lib/fluent/plugin/out_pgjson.rb index 61031ab53a..b60095e907 100644 --- a/src/fluentd/src/fluent-plugin-pgjson/lib/fluent/plugin/out_pgjson.rb +++ b/src/fluentd/src/fluent-plugin-pgjson/lib/fluent/plugin/out_pgjson.rb @@ -44,6 +44,8 @@ class PgJsonOutput < Fluent::Plugin::Output config_param :taskroleName_col, :string, default: "taskroleName" desc "The column name for the taskroleIndex" config_param :taskroleIndex_col, :string, default: "taskroleIndex" + desc "The column name for the taskAttemptIndex" + config_param :taskAttemptIndex_col, :string, default: "taskAttemptIndex" desc "The column name for the snapshot" config_param :snapshot_col, :string, default: "snapshot" desc "If true, insert records formatted as msgpack" @@ -151,15 +153,18 @@ def write(chunk) frameworkName = record["objectSnapshot"]["metadata"]["name"] attemptIndex = record["objectSnapshot"]["status"]["attemptStatus"]["id"] historyType = "retry" - thread[:conn].put_copy_data "#{time}\x01#{frameworkName}\x01#{attemptIndex}\x01#{historyType}\x01#{record_value(record)}\n" + snapshot = record_value(record["objectSnapshot"]) + thread[:conn].put_copy_data "#{time}\x01#{frameworkName}\x01#{attemptIndex}\x01#{historyType}\x01#{snapshot}\n" elsif kind == "Pod" - thread[:conn].exec("COPY pods (#{@insertedAt_col}, #{@updatedAt_col}, #{@uid_col}, #{@frameworkName_col}, #{@attemptIndex_col}, #{@taskroleName_col}, #{@taskroleIndex_col}, #{@snapshot_col}) FROM STDIN WITH DELIMITER E'\\x01'") + thread[:conn].exec("COPY pods (#{@insertedAt_col}, #{@updatedAt_col}, #{@uid_col}, #{@frameworkName_col}, #{@attemptIndex_col}, #{@taskroleName_col}, #{@taskroleIndex_col}, #{@taskAttemptIndex_col}, #{@snapshot_col}) FROM STDIN WITH DELIMITER E'\\x01'") uid = record["objectSnapshot"]["metadata"]["uid"] frameworkName = record["objectSnapshot"]["metadata"]["name"] - attemptIndex = record["objectSnapshot"]["metadata"]["annotations"]["FC_TASK_ATTEMPT_ID"] + attemptIndex = record["objectSnapshot"]["metadata"]["annotations"]["FC_FRAMEWORK_ATTEMPT_ID"] taskroleName = record["objectSnapshot"]["metadata"]["annotations"]["FC_TASKROLE_NAME"] taskroleIndex = record["objectSnapshot"]["metadata"]["annotations"]["FC_TASK_INDEX"] - thread[:conn].put_copy_data "#{time}\x01#{time}\x01#{uid}\x01#{frameworkName}\x01#{attemptIndex}\x01#{taskroleName}\x01#{taskroleIndex}\x01#{record_value(record)}\n" + taskAttemptIndex = record["objectSnapshot"]["metadata"]["annotations"]["FC_TASK_ATTEMPT_ID"] + snapshot = record_value(record["objectSnapshot"]) + thread[:conn].put_copy_data "#{time}\x01#{time}\x01#{uid}\x01#{frameworkName}\x01#{attemptIndex}\x01#{taskroleName}\x01#{taskroleIndex}\x01#{taskAttemptIndex}\x01#{snapshot}\n" end end rescue PG::ConnectionBad, PG::UnableToSend => err diff --git a/src/rest-server/src/models/v2/job-attempt.js b/src/rest-server/src/models/v2/job-attempt.js index 0d8ab4213f..a6ec389cec 100644 --- a/src/rest-server/src/models/v2/job-attempt.js +++ b/src/rest-server/src/models/v2/job-attempt.js @@ -58,7 +58,7 @@ if (sequelize && launcherConfig.enabledJobHistory) { const pgResult = (await sequelize.query(sqlSentence))[0]; const jobRetries = await Promise.all( pgResult.map((row) => { - return convertToJobAttempt(JSON.parse(row.data)['objectSnapshot']); + return convertToJobAttempt(JOSN.parse(row.data)); }), ); attemptData.push( @@ -83,7 +83,7 @@ if (sequelize && launcherConfig.enabledJobHistory) { if (pgResult.length === 0) { return {status: 404, data: null}; } else { - attemptFramework = JSON.parse(pgResult[0].data)['objectSnapshot']; + attemptFramework = JSON.parse(pgResult[0].data); const attemptDetail = await convertToJobAttempt(attemptFramework); return {status: 200, data: {...attemptDetail, isLatest: false}}; } From 738b86e47548b1100f6167f2ff86f3ff8121aaef Mon Sep 17 00:00:00 2001 From: suiguoxin Date: Thu, 16 Jul 2020 14:36:04 +0800 Subject: [PATCH 09/14] fix typo --- src/rest-server/src/models/v2/job-attempt.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/rest-server/src/models/v2/job-attempt.js b/src/rest-server/src/models/v2/job-attempt.js index a6ec389cec..ca28374b6c 100644 --- a/src/rest-server/src/models/v2/job-attempt.js +++ b/src/rest-server/src/models/v2/job-attempt.js @@ -58,7 +58,7 @@ if (sequelize && launcherConfig.enabledJobHistory) { const pgResult = (await sequelize.query(sqlSentence))[0]; const jobRetries = await Promise.all( pgResult.map((row) => { - return convertToJobAttempt(JOSN.parse(row.data)); + return convertToJobAttempt(JSON.parse(row.data)); }), ); attemptData.push( @@ -75,7 +75,7 @@ if (sequelize && launcherConfig.enabledJobHistory) { const sqlSentence = `SELECT snapshot as data FROM framework_history WHERE ` + `frameworkName = '${encodeName(frameworkName)}' and ` + - `attemptIndex = '${jobAttemptIndex}'` + + `attemptIndex = '${jobAttemptIndex}' ` + `ORDER BY uid ASC;`; const pgResult = (await sequelize.query(sqlSentence))[0]; From 7cf2467dd90d81dc1b52a1bdb434547af0f9ad57 Mon Sep 17 00:00:00 2001 From: suiguoxin Date: Thu, 16 Jul 2020 16:19:42 +0800 Subject: [PATCH 10/14] create framework_history and pods table --- src/postgresql/src/init_table.sql | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/src/postgresql/src/init_table.sql b/src/postgresql/src/init_table.sql index 65ae96b0a8..b9a7ad6235 100644 --- a/src/postgresql/src/init_table.sql +++ b/src/postgresql/src/init_table.sql @@ -1,8 +1,21 @@ /* This init script will be run every time when the database container is started. */ -CREATE TABLE IF NOT EXISTS fc_objectsnapshots ( - tag Text, - time Timestamptz, - record JsonB +CREATE TABLE IF NOT EXISTS framework_history ( + insertedAt Timestamptz, + uid SERIAL, + frameworkName VARCHAR(64), + attemptIndex INTEGER, + historyType VARCHAR(16), + snapshot TEXT +); +CREATE TABLE IF NOT EXISTS pods ( + insertedAt Timestamptz, + updatedAt Timestamptz, uid VARCHAR(36), + frameworkName VARCHAR(64), + attemptIndex INTEGER, + taskroleName VARCHAR(256), + taskroleIndex INTEGER, + taskAttemptIndex INTEGER, + snapshot TEXT ); CREATE INDEX IF NOT EXISTS uidindex ON fc_objectsnapshots USING gin ((record -> 'objectSnapshot' -> 'metadata' -> 'uid')); From b0c85e836ca36968420b6243f7df6aa8a6b7f75c Mon Sep 17 00:00:00 2001 From: suiguoxin Date: Fri, 17 Jul 2020 10:20:12 +0800 Subject: [PATCH 11/14] create table index --- src/postgresql/src/init_table.sql | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/postgresql/src/init_table.sql b/src/postgresql/src/init_table.sql index b9a7ad6235..5ca36e5968 100644 --- a/src/postgresql/src/init_table.sql +++ b/src/postgresql/src/init_table.sql @@ -10,7 +10,8 @@ CREATE TABLE IF NOT EXISTS framework_history ( ); CREATE TABLE IF NOT EXISTS pods ( insertedAt Timestamptz, - updatedAt Timestamptz, uid VARCHAR(36), + updatedAt Timestamptz, + uid VARCHAR(36), frameworkName VARCHAR(64), attemptIndex INTEGER, taskroleName VARCHAR(256), @@ -18,4 +19,5 @@ CREATE TABLE IF NOT EXISTS pods ( taskAttemptIndex INTEGER, snapshot TEXT ); -CREATE INDEX IF NOT EXISTS uidindex ON fc_objectsnapshots USING gin ((record -> 'objectSnapshot' -> 'metadata' -> 'uid')); +CREATE INDEX IF NOT EXISTS uidindex ON framework_history USING gin ((record -> 'uid')); +CREATE INDEX IF NOT EXISTS uidindex ON pods USING gin ((record -> 'uid')); From d1d34eae278891559469c93ee7096f10ce4aa3c6 Mon Sep 17 00:00:00 2001 From: suiguoxin Date: Tue, 21 Jul 2020 11:54:59 +0800 Subject: [PATCH 12/14] raise err when 404 --- .../lib/fluent/plugin/out_pgjson.rb | 1 + src/postgresql/src/init_table.sql | 2 -- src/rest-server/src/models/v2/job-attempt.js | 27 +++++++++++-------- 3 files changed, 17 insertions(+), 13 deletions(-) diff --git a/src/fluentd/src/fluent-plugin-pgjson/lib/fluent/plugin/out_pgjson.rb b/src/fluentd/src/fluent-plugin-pgjson/lib/fluent/plugin/out_pgjson.rb index b60095e907..c210980bdb 100644 --- a/src/fluentd/src/fluent-plugin-pgjson/lib/fluent/plugin/out_pgjson.rb +++ b/src/fluentd/src/fluent-plugin-pgjson/lib/fluent/plugin/out_pgjson.rb @@ -148,6 +148,7 @@ def write(chunk) begin chunk.msgpack_each do |time, record| kind = record["objectSnapshot"]["kind"] + log.debug "log type: #{kind}" if kind == "Framework" thread[:conn].exec("COPY framework_history (#{@insertedAt_col}, #{@frameworkName_col}, #{@attemptIndex_col}, #{@historyType_col}, #{@snapshot_col}) FROM STDIN WITH DELIMITER E'\\x01'") frameworkName = record["objectSnapshot"]["metadata"]["name"] diff --git a/src/postgresql/src/init_table.sql b/src/postgresql/src/init_table.sql index 5ca36e5968..e467c5af66 100644 --- a/src/postgresql/src/init_table.sql +++ b/src/postgresql/src/init_table.sql @@ -19,5 +19,3 @@ CREATE TABLE IF NOT EXISTS pods ( taskAttemptIndex INTEGER, snapshot TEXT ); -CREATE INDEX IF NOT EXISTS uidindex ON framework_history USING gin ((record -> 'uid')); -CREATE INDEX IF NOT EXISTS uidindex ON pods USING gin ((record -> 'uid')); diff --git a/src/rest-server/src/models/v2/job-attempt.js b/src/rest-server/src/models/v2/job-attempt.js index ca28374b6c..b990afd2da 100644 --- a/src/rest-server/src/models/v2/job-attempt.js +++ b/src/rest-server/src/models/v2/job-attempt.js @@ -22,6 +22,7 @@ const {convertToJobAttempt} = require('@pai/utils/frameworkConverter'); const launcherConfig = require('@pai/config/launcher'); const logger = require('@pai/config/logger'); const {sequelize} = require('@pai/utils/postgresUtil'); +const createError = require('@pai/utils/error'); const convertName = (name) => { // convert framework name to fit framework controller spec @@ -56,17 +57,21 @@ if (sequelize && launcherConfig.enabledJobHistory) { `frameworkName = '${encodeName(frameworkName)}' ` + `ORDER BY uid ASC;`; const pgResult = (await sequelize.query(sqlSentence))[0]; - const jobRetries = await Promise.all( - pgResult.map((row) => { - return convertToJobAttempt(JSON.parse(row.data)); - }), - ); - attemptData.push( - ...jobRetries.map((jobRetry) => { - return {...jobRetry, isLatest: false}; - }), - ); + if (pgResult.length === 0) { + throw createError('Not Found', 'NoJobError', `Job ${frameworkName} not found.`); + } else { + const jobRetries = await Promise.all( + pgResult.map((row) => { + return convertToJobAttempt(JSON.parse(row.data)); + }), + ); + attemptData.push( + ...jobRetries.map((jobRetry) => { + return {...jobRetry, isLatest: false}; + }), + ); + } return {status: 200, data: attemptData}; }; @@ -81,7 +86,7 @@ if (sequelize && launcherConfig.enabledJobHistory) { const pgResult = (await sequelize.query(sqlSentence))[0]; if (pgResult.length === 0) { - return {status: 404, data: null}; + throw createError('Not Found', 'NoJobError', `Job ${frameworkName} with attemptIndex ${jobAttemptIndex} not found.`); } else { attemptFramework = JSON.parse(pgResult[0].data); const attemptDetail = await convertToJobAttempt(attemptFramework); From bd673053079573c369a09d886470015665a0c994 Mon Sep 17 00:00:00 2001 From: suiguoxin Date: Tue, 21 Jul 2020 15:17:14 +0800 Subject: [PATCH 13/14] fix typo --- src/rest-server/src/models/v2/job-attempt.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rest-server/src/models/v2/job-attempt.js b/src/rest-server/src/models/v2/job-attempt.js index b990afd2da..fc64f317a6 100644 --- a/src/rest-server/src/models/v2/job-attempt.js +++ b/src/rest-server/src/models/v2/job-attempt.js @@ -71,8 +71,8 @@ if (sequelize && launcherConfig.enabledJobHistory) { return {...jobRetry, isLatest: false}; }), ); + return {status: 200, data: attemptData}; } - return {status: 200, data: attemptData}; }; const get = async (frameworkName, jobAttemptIndex) => { From 72508719ea7db0a78efe7d27ab838499981b7081 Mon Sep 17 00:00:00 2001 From: suiguoxin Date: Tue, 21 Jul 2020 17:11:39 +0800 Subject: [PATCH 14/14] consistent with older API --- .../lib/fluent/plugin/out_pgjson.rb | 1 + src/rest-server/src/models/v2/job-attempt.js | 128 ++++++++++++++---- 2 files changed, 103 insertions(+), 26 deletions(-) diff --git a/src/fluentd/src/fluent-plugin-pgjson/lib/fluent/plugin/out_pgjson.rb b/src/fluentd/src/fluent-plugin-pgjson/lib/fluent/plugin/out_pgjson.rb index c210980bdb..4c23e50c0c 100644 --- a/src/fluentd/src/fluent-plugin-pgjson/lib/fluent/plugin/out_pgjson.rb +++ b/src/fluentd/src/fluent-plugin-pgjson/lib/fluent/plugin/out_pgjson.rb @@ -56,6 +56,7 @@ class PgJsonOutput < Fluent::Plugin::Output config_param :time_format, :string, default: "%F %T.%N %z" config_param :reset_connection_interval, :integer, default: 5 + desc "Control the reset connection interval to be greater than 5 seconds globally" config_section :buffer do config_set_default :@type, DEFAULT_BUFFER_TYPE diff --git a/src/rest-server/src/models/v2/job-attempt.js b/src/rest-server/src/models/v2/job-attempt.js index fc64f317a6..b621fd2ef3 100644 --- a/src/rest-server/src/models/v2/job-attempt.js +++ b/src/rest-server/src/models/v2/job-attempt.js @@ -17,12 +17,14 @@ // module dependencies const crypto = require('crypto'); +const {isNil} = require('lodash'); const {convertToJobAttempt} = require('@pai/utils/frameworkConverter'); const launcherConfig = require('@pai/config/launcher'); +const createError = require('@pai/utils/error'); +const k8sModel = require('@pai/models/kubernetes/kubernetes'); const logger = require('@pai/config/logger'); const {sequelize} = require('@pai/utils/postgresUtil'); -const createError = require('@pai/utils/error'); const convertName = (name) => { // convert framework name to fit framework controller spec @@ -52,45 +54,119 @@ if (sequelize && launcherConfig.enabledJobHistory) { const list = async (frameworkName) => { let attemptData = []; + let uid; + + // get latest framework from k8s API + let response; + try { + response = await k8sModel.getClient().get( + launcherConfig.frameworkPath(encodeName(frameworkName)), + { + headers: launcherConfig.requestHeaders, + } + ); + } catch (error) { + logger.error(`error when getting framework from k8s api: ${error.message}`); + if (error.response != null) { + response = error.response; + } else { + throw error; + } + } + + if (response.status === 200) { + // get UID from k8s framework API + uid = response.data.metadata.uid; + attemptData.push({ + ...(await convertToJobAttempt(response.data)), + isLatest: true, + }); + } else if (response.status === 404) { + logger.warn(`could not get framework ${uid} from k8s: ${JSON.stringify(response)}`); + return {status: 404, data: null}; + } else { + throw createError(response.status, 'UnknownError', response.data.message); + } + + if (isNil(uid)) { + return {status: 404, data: null}; + } const sqlSentence = `SELECT snapshot as data FROM framework_history WHERE ` + `frameworkName = '${encodeName(frameworkName)}' ` + `ORDER BY uid ASC;`; const pgResult = (await sequelize.query(sqlSentence))[0]; - if (pgResult.length === 0) { - throw createError('Not Found', 'NoJobError', `Job ${frameworkName} not found.`); - } else { - const jobRetries = await Promise.all( - pgResult.map((row) => { - return convertToJobAttempt(JSON.parse(row.data)); - }), - ); - attemptData.push( - ...jobRetries.map((jobRetry) => { - return {...jobRetry, isLatest: false}; - }), - ); - return {status: 200, data: attemptData}; - } + const jobRetries = await Promise.all( + pgResult.map((row) => { + return convertToJobAttempt(JSON.parse(row.data)); + }), + ); + attemptData.push( + ...jobRetries.map((jobRetry) => { + return {...jobRetry, isLatest: false}; + }), + ); + return {status: 200, data: attemptData}; }; const get = async (frameworkName, jobAttemptIndex) => { + let uid; let attemptFramework; + let response; - const sqlSentence = `SELECT snapshot as data FROM framework_history WHERE ` + - `frameworkName = '${encodeName(frameworkName)}' and ` + - `attemptIndex = '${jobAttemptIndex}' ` + - `ORDER BY uid ASC;`; - - const pgResult = (await sequelize.query(sqlSentence))[0]; + try { + response = await k8sModel.getClient().get( + launcherConfig.frameworkPath(encodeName(frameworkName)), + { + headers: launcherConfig.requestHeaders, + } + ); + } catch (error) { + logger.error(`error when getting framework from k8s api: ${error.message}`); + if (error.response != null) { + response = error.response; + } else { + throw error; + } + } - if (pgResult.length === 0) { - throw createError('Not Found', 'NoJobError', `Job ${frameworkName} with attemptIndex ${jobAttemptIndex} not found.`); + if (response.status === 200) { + // get uid from k8s framwork API + uid = response.data.metadata.uid; + attemptFramework = response.data; + } else if (response.status === 404) { + logger.warn(`could not get framework ${uid} from k8s: ${JSON.stringify(response)}`); + return {status: 404, data: null}; } else { - attemptFramework = JSON.parse(pgResult[0].data); + throw createError(response.status, 'UnknownError', response.data.message); + } + + if (jobAttemptIndex < attemptFramework.spec.retryPolicy.maxRetryCount) { + if (isNil(uid)) { + return {status: 404, data: null}; + } + const sqlSentence = `SELECT snapshot as data FROM framework_history WHERE ` + + `frameworkName = '${encodeName(frameworkName)}' and ` + + `attemptIndex = '${jobAttemptIndex}' ` + + `ORDER BY uid ASC;`; + const pgResult = (await sequelize.query(sqlSentence))[0]; + + if (pgResult.length === 0) { + return {status: 404, data: null}; + } else { + attemptFramework = JSON.parse(pgResult[0].data); + const attemptDetail = await convertToJobAttempt(attemptFramework); + return {status: 200, data: {...attemptDetail, isLatest: false}}; + } + } else if ( + jobAttemptIndex === attemptFramework.spec.retryPolicy.maxRetryCount + ) { + // get latest frameworks from k8s API const attemptDetail = await convertToJobAttempt(attemptFramework); - return {status: 200, data: {...attemptDetail, isLatest: false}}; + return {status: 200, data: {...attemptDetail, isLatest: true}}; + } else { + return {status: 404, data: null}; } };