Skip to content

Commit d13c782

Browse files
authored
Merge pull request #8 from ykoyano/feature/add-asynchronous-method
add asynchronous_method and related option
2 parents cbfb729 + f509e12 commit d13c782

File tree

2 files changed

+66
-18
lines changed

2 files changed

+66
-18
lines changed

embulk-input-bigquery.gemspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,5 +20,5 @@ Gem::Specification.new do |spec|
2020

2121
spec.add_development_dependency "bundler", "~> 1.3"
2222
spec.add_development_dependency "rake"
23-
spec.add_dependency "google-cloud-bigquery", '~> 0.23'
23+
spec.add_dependency "google-cloud-bigquery", '~> 0.26.0'
2424
end

lib/embulk/input/bigquery.rb

Lines changed: 65 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -27,16 +27,34 @@ def self.transaction(config, &control)
2727
sql: sql,
2828
columns: config[:columns],
2929
params: params,
30+
synchronous_method: config[:synchronous_method],
31+
asynchronous_method: config[:asynchronous_method],
32+
dataset: config[:dataset],
33+
table: config[:table],
3034
option: {
31-
max: config[:max],
3235
cache: config[:cache],
33-
timeout: config[:timeout],
34-
dryrun: config[:dryrun],
3536
standard_sql: config[:standard_sql],
36-
legacy_sql: config[:legacy_sql]
37+
legacy_sql: config[:legacy_sql],
3738
}
3839
}
3940

41+
if task[:synchronous_method] || !task[:asynchronous_method]
42+
task[:option].merge!(
43+
{
44+
max: config[:max],
45+
timeout: config[:timeout],
46+
dryrun: config[:dryrun],
47+
}
48+
)
49+
else
50+
task[:option].merge!(
51+
{
52+
large_results: config[:legacy_sql],
53+
write: config[:write],
54+
}
55+
)
56+
end
57+
4058
columns = []
4159
config[:columns].each_with_index do |c, i|
4260
columns << Column.new(i, c['name'], c['type'].to_sym)
@@ -52,26 +70,56 @@ def run
5270
params = @task[:params]
5371
@task[:columns] = values_to_sym(@task[:columns], 'name')
5472
option = keys_to_sym(@task[:option])
55-
rows = bq.query(@task[:sql], **option)
56-
57-
@task[:columns] = values_to_sym(@task[:columns], 'name')
58-
59-
rows.each do |row|
60-
columns = []
61-
@task[:columns].each do |c|
62-
val = row[c['name']]
63-
if c['eval']
64-
val = eval(c['eval'], binding)
73+
if @task[:synchronous_method] || @task[:asynchronous_method].nil?
74+
run_synchronous_query(bq, option)
75+
else
76+
if @task[:dataset]
77+
dataset = bq.dataset(@task[:dataset])
78+
option[:table] = dataset.table(@task[:table])
79+
if option[:table].nil?
80+
option[:table] = dataset.create_table(@task[:table])
6581
end
66-
columns << val
6782
end
68-
69-
@page_builder.add(columns)
83+
run_asynchronous_query(bq, option)
7084
end
7185
@page_builder.finish
7286
return {}
7387
end
7488

89+
# Runs the task's SQL through the synchronous query API and feeds every
# returned row into the page builder.
#
# bq     - client object responding to `query(sql, **options)`.
# option - Hash with symbol keys, splatted into `query` as keyword arguments.
#
# Each row is converted with `extract_record` before being added to
# `@page_builder`. Returns whatever `each` on the query result returns.
def run_synchronous_query(bq, option)
  bq.query(@task[:sql], **option).each do |row|
    @page_builder.add(extract_record(row))
  end
end
96+
97+
# Runs the task's SQL as a query job, waits for it to finish, and feeds every
# row of every result page into the page builder.
#
# bq     - client object responding to `query_job(sql, **options)`.
# option - Hash with symbol keys, splatted into `query_job` as keyword arguments.
#
# Raises RuntimeError when the finished job reports failure. Previously a
# failed job made this method return `{}` silently, so the caller went on to
# finish the page builder with zero rows and the failure was invisible.
#
# Returns nil after all pages have been consumed.
def run_asynchronous_query(bq, option)
  job = bq.query_job(@task[:sql], **option)
  job.wait_until_done!

  # Surface the failure instead of pretending the query produced no rows.
  raise "BigQuery query job failed: #{job.error.inspect}" if job.failed?

  # Walk the result pages; `next` is expected to yield nil after the last page.
  results = job.query_results
  while results
    results.each do |row|
      @page_builder.add(extract_record(row))
    end
    results = results.next
  end
end
110+
111+
# Builds one output record (an Array of values) from a query result row,
# following the column order in `@task[:columns]`.
#
# For each column definition the value is looked up in `row` by the column's
# 'name'. When the definition carries an 'eval' string, that string is
# evaluated as Ruby and its result replaces the looked-up value.
#
# NOTE: the eval runs with this scope's binding, so the expression can see the
# locals `row`, `c`, `val` and `columns`. Those names are part of the
# user-facing contract of 'eval' expressions; do not rename them.
# NOTE(review): `eval` executes arbitrary code taken from the task
# configuration - only safe when the configuration is trusted.
def extract_record(row)
  @task[:columns].each_with_object([]) do |c, columns|
    val = row[c['name']]
    val = eval(c['eval'], binding) if c['eval']
    columns.push(val)
  end
end
122+
75123
def values_to_sym(hashs, key)
76124
hashs.map do |h|
77125
h[key] = h[key].to_sym

0 commit comments

Comments
 (0)