Skip to content

[BEAM-12356] Cache and shutdown BigQuery services#14949

Merged
reuvenlax merged 3 commits into
apache:masterfrom
reuvenlax:fix_overcreating_bq_client
Jun 9, 2021
Merged

[BEAM-12356] Cache and shutdown BigQuery services#14949
reuvenlax merged 3 commits into
apache:masterfrom
reuvenlax:fix_overcreating_bq_client

Conversation

@reuvenlax

Copy link
Copy Markdown
Contributor

No description provided.

@reuvenlax reuvenlax requested a review from chamikaramj June 4, 2021 20:50

@chamikaramj chamikaramj left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Thanks.

@reuvenlax reuvenlax force-pushed the fix_overcreating_bq_client branch from 7bb6951 to 244a0cf Compare June 5, 2021 03:27
@reuvenlax

Copy link
Copy Markdown
Contributor Author

Run Java PreCommit

@scwhittle

Copy link
Copy Markdown
Contributor

I patched this cl and am still seeing some of the following, though much less than without this change. Is this expected to fix all of these logs?

From
at org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite.getDatasetService(BatchedStreamingWrite.java:395)
in the stack it seems like I am using the patch as expected.

"~* Channel ManagedChannelImpl{logId=381, target=bigquerystorage.googleapis.com:443} was not shutdown properly!!! *~
Make sure to call shutdown()/shutdownNow() and wait until awaitTermination() returns true."

exception:
java.lang.RuntimeException: ManagedChannel allocation site
at io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference.(ManagedChannelOrphanWrapper.java:93)
at io.grpc.internal.ManagedChannelOrphanWrapper.(ManagedChannelOrphanWrapper.java:53)
at io.grpc.internal.ManagedChannelOrphanWrapper.(ManagedChannelOrphanWrapper.java:44)
at io.grpc.internal.ManagedChannelImplBuilder.build(ManagedChannelImplBuilder.java:615)
at io.grpc.internal.AbstractManagedChannelImplBuilder.build(AbstractManagedChannelImplBuilder.java:261)
at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createSingleChannel(InstantiatingGrpcChannelProvider.java:327)
at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.access$1700(InstantiatingGrpcChannelProvider.java:74)
at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider$1.createSingleChannel(InstantiatingGrpcChannelProvider.java:220)
at com.google.api.gax.grpc.ChannelPool.create(ChannelPool.java:72)
at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createChannel(InstantiatingGrpcChannelProvider.java:227)
at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.getTransportChannel(InstantiatingGrpcChannelProvider.java:210)
at com.google.api.gax.rpc.ClientContext.create(ClientContext.java:169)
at com.google.cloud.bigquery.storage.v1beta2.stub.GrpcBigQueryWriteStub.create(GrpcBigQueryWriteStub.java:138)
at com.google.cloud.bigquery.storage.v1beta2.stub.BigQueryWriteStubSettings.createStub(BigQueryWriteStubSettings.java:145)
at com.google.cloud.bigquery.storage.v1beta2.BigQueryWriteClient.(BigQueryWriteClient.java:128)
at com.google.cloud.bigquery.storage.v1beta2.BigQueryWriteClient.create(BigQueryWriteClient.java:109)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.newBigQueryWriteClient(BigQueryServicesImpl.java:1255)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.access$800(BigQueryServicesImpl.java:134)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.(BigQueryServicesImpl.java:523)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.(BigQueryServicesImpl.java:451)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.getDatasetService(BigQueryServicesImpl.java:168)
at org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite.getDatasetService(BatchedStreamingWrite.java:395)
at org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite.flushRows(BatchedStreamingWrite.java:375)
at org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite.access$800(BatchedStreamingWrite.java:72)
at org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite$InsertBatchedElements.processElement(BatchedStreamingWrite.java:343)
at org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite$InsertBatchedElements$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForWindowObservingParDo(FnApiDoFnRunner.java:763)
at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:266)
at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:218)
at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1680)
at org.apache.beam.fn.harness.FnApiDoFnRunner.access$2500(FnApiDoFnRunner.java:139)
at org.apache.beam.fn.harness.FnApiDoFnRunner$WindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2081)
at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2363)
at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)
at org.apache.beam.sdk.transforms.GroupIntoBatches$GroupIntoBatchesDoFn.flushBatch(GroupIntoBatches.java:479)
at org.apache.beam.sdk.transforms.GroupIntoBatches$GroupIntoBatchesDoFn.processElement(GroupIntoBatches.java:414)
at org.apache.beam.sdk.transforms.GroupIntoBatches$GroupIntoBatchesDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForWindowObservingParDo(FnApiDoFnRunner.java:763)
at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:266)
at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:218)
at org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:220)
at org.apache.beam.sdk.fn.data.DecodingFnDataReceiver.accept(DecodingFnDataReceiver.java:43)
at org.apache.beam.sdk.fn.data.DecodingFnDataReceiver.accept(DecodingFnDataReceiver.java:25)
at org.apache.beam.fn.harness.data.QueueingBeamFnDataClient$ConsumerAndData.accept(QueueingBeamFnDataClient.java:315)
at org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:218)
at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:326)
at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:140)
at org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:110)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

@reuvenlax

Copy link
Copy Markdown
Contributor Author

@scwhittle can you try again?

@scwhittle

Copy link
Copy Markdown
Contributor

Still seems to be happening, I also added logging to see if the 60 seconds was just not enough and that didn't seem to be the case.

@reuvenlax

Copy link
Copy Markdown
Contributor Author

@scwhittle I'm going to merge this PR, as it seems to remove most of the logs. We can investigate the rest separately; Beam will still create many individual DoFns, so it's possible that we should be caching things at a higher level.

@reuvenlax reuvenlax merged commit 9832f78 into apache:master Jun 9, 2021
@aaltay

aaltay commented Jun 10, 2021

Copy link
Copy Markdown
Member

Are we planning to cherry pick this into Beam 2.31.0?

/cc @apilloud

apilloud pushed a commit to apilloud/beam that referenced this pull request Jun 14, 2021
baeminbo added a commit to baeminbo/java-bigtable-hbase that referenced this pull request Dec 12, 2022
* Reader.close(): fixes googleapis#2658 where original fix googleapis#2782 was rolled back by googleapis#2871 and googleapis#2873. As we don’t use “session” here anymore, I believe we should close “connection” here.
* AbstractCloudBigtableTableDoFn.tearDown(): If an exception happened in ProcessElement, FinishBundle is not executed. We should clean up resources in TearDown. You can see the similar code with BigQueryIO apache/beam#14949.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants