Skip to content

Commit 83ceb34

Browse files
authored
Merge pull request #221 from 0kate/add-missing-location-arguments
add missing location arguments for JobReference struct
2 parents 458bca5 + bc6c534 commit 83ceb34

File tree

1 file changed

+5
-3
lines changed

1 file changed

+5
-3
lines changed

lib/fluent/plugin/bigquery/writer.rb

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -144,8 +144,8 @@ def create_load_job(chunk_id, chunk_id_hex, project, dataset, table_id, upload_s
144144
configuration.merge!({job_reference: {project_id: project, job_id: job_id}}) if job_id
145145

146146
begin
147-
# Check table existence
148-
client.get_table(project, dataset, table_id)
147+
# Check table existence and use its location for the result when the load job is duplicated.
148+
table = client.get_table(project, dataset, table_id)
149149
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
150150
if e.status_code == 404 && /Not Found: Table/i =~ e.message
151151
raise Fluent::BigQuery::UnRetryableError.new("Table is not found") unless @options[:auto_create_table]
@@ -167,7 +167,9 @@ def create_load_job(chunk_id, chunk_id_hex, project, dataset, table_id, upload_s
167167
log.error "job.load API", project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: e.message
168168

169169
if job_id && e.status_code == 409 && e.message =~ /Job/ # duplicate load job
170-
return JobReference.new(chunk_id, chunk_id_hex, project, dataset, table_id, job_id)
170+
# If a load job is duplicated, the API response may not be available to create the result.
171+
# Therefore, we need to use the location of the table instead of the job's location to determine the result.
172+
return JobReference.new(chunk_id, chunk_id_hex, project, dataset, table_id, job_id, table.location)
171173
end
172174

173175
raise Fluent::BigQuery::Error.wrap(e)

0 commit comments

Comments
 (0)