Skip to content
This repository was archived by the owner on Jun 6, 2024. It is now read-only.
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fluentd fix
  • Loading branch information
hzy46 committed Aug 10, 2020
commit 2dce698440486e2aaaed391fbc60da9fd586ced5
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
require "pg"
require "yajl"
require "json"
require 'digest'

module Fluent::Plugin
class PgJsonOutput < Fluent::Plugin::Output
Expand Down Expand Up @@ -152,11 +153,12 @@ def write(chunk)
log.debug "log type: #{kind}"
if kind == "Framework"
thread[:conn].exec("COPY framework_history (\"#{@insertedAt_col}\", \"#{@updatedAt_col}\", \"#{@uid_col}\", \"#{@frameworkName_col}\", \"#{@attemptIndex_col}\", \"#{@historyType_col}\", \"#{@snapshot_col}\") FROM STDIN WITH DELIMITER E'\\x01'")
uid = (0...36).map { (65 + rand(26)).chr }.join
frameworkName = record["objectSnapshot"]["metadata"]["name"]
attemptIndex = record["objectSnapshot"]["status"]["attemptStatus"]["id"]
historyType = "retry"
snapshot = record_value(record["objectSnapshot"])
# use frameworkName + attemptIndex + historyType to generate a uid
uid = Digest::MD5.hexdigest "#{frameworkName}+#{attemptIndex}+#{historyType}"
thread[:conn].put_copy_data "#{time}\x01#{time}\x01#{uid}\x01#{frameworkName}\x01#{attemptIndex}\x01#{historyType}\x01#{snapshot}\n"
elsif kind == "Pod"
Copy link
Member

@yqwang-ms yqwang-ms Aug 11, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add comment that if duplicate, what is the behaiour here?
Crash, retry, ignore or log #Closed

Copy link
Contributor Author

@hzy46 hzy46 Aug 11, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will raise an error and log the error. #Closed

thread[:conn].exec("COPY pods (\"#{@insertedAt_col}\", \"#{@updatedAt_col}\", \"#{@uid_col}\", \"#{@frameworkName_col}\", \"#{@attemptIndex_col}\", \"#{@taskroleName_col}\", \"#{@taskroleIndex_col}\", \"#{@taskAttemptIndex_col}\", \"#{@snapshot_col}\") FROM STDIN WITH DELIMITER E'\\x01'")
Expand Down