Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion bigtable-dataflow-parent/bigtable-beam-import/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ limitations under the License.
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
Expand Down Expand Up @@ -217,6 +216,23 @@ limitations under the License.
<version>${hbase.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.truth</groupId>
<artifactId>truth</artifactId>
<version>1.0.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigtable-emulator</artifactId>
<version>0.124.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
<version>${jsr305.version}</version>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.cloud.bigtable.beam.sequencefiles.CreateTableHelper;
import com.google.cloud.bigtable.beam.sequencefiles.ExportJob;
import com.google.cloud.bigtable.beam.sequencefiles.ImportJob;
import com.google.cloud.bigtable.beam.validation.SyncTableJob;
import java.io.File;
import java.net.URISyntaxException;
import java.util.Arrays;
Expand Down Expand Up @@ -53,6 +54,9 @@ public static void main(String[] args) throws Exception {
case "create-table":
CreateTableHelper.main(subArgs);
break;
case "sync-table":
SyncTableJob.main(subArgs);
break;
default:
usage();
System.exit(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.bigtable.repackaged.com.google.cloud.bigtable.data.v2.models.Query;
import com.google.cloud.bigtable.beam.sequencefiles.ExportJob.ExportOptions;
import com.google.cloud.bigtable.beam.sequencefiles.ImportJob.ImportOptions;
import com.google.cloud.bigtable.beam.validation.SyncTableJob.SyncTableOptions;
import com.google.cloud.bigtable.hbase.BigtableOptionsFactory;
import com.google.cloud.bigtable.hbase.adapters.Adapters;
import com.google.cloud.bigtable.hbase.adapters.read.DefaultReadHooks;
Expand Down Expand Up @@ -72,6 +73,19 @@ public static CloudBigtableTableConfiguration BuildImportConfig(ImportOptions op
return builder.build();
}

/**
 * Builds the {@link CloudBigtableTableConfiguration} for the sync-table job from its runtime
 * options.
 *
 * <p>Project, instance and table ids are always applied; the app profile id is applied only when
 * the options carry a non-null value.
 *
 * @param opts options of the sync-table job
 * @return the table configuration built from {@code opts}
 */
public static CloudBigtableTableConfiguration BuildSyncTableConfig(SyncTableOptions opts) {
  CloudBigtableTableConfiguration.Builder configBuilder =
      new CloudBigtableTableConfiguration.Builder()
          .withProjectId(opts.getBigtableProject())
          .withInstanceId(opts.getBigtableInstanceId())
          .withTableId(opts.getBigtableTableId());
  if (opts.getBigtableAppProfileId() == null) {
    // No app profile requested: build with the mandatory ids only.
    return configBuilder.build();
  }
  configBuilder.withAppProfileId(opts.getBigtableAppProfileId());
  return configBuilder.build();
}

/** Provides a request that is constructed with some attributes. */
private static class RequestValueProvider
implements ValueProvider<ReadRowsRequest>, Serializable {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
/*
* Copyright 2021 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.beam.validation;

import static com.google.cloud.bigtable.beam.validation.SyncTableUtils.immutableBytesToString;

import com.google.cloud.bigtable.beam.validation.HadoopHashTableSource.RangeHash;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.values.KV;
import org.apache.hadoop.hbase.util.Bytes;

/**
* Buffers the RangeHashes generated by {@link HadoopHashTableSource}. This is an optimization that
* allows {@link ComputeAndValidateHashFromBigtableDoFn} to issue fewer ReadRow APIs with larger row
* ranges.
*
* <p>Hadoop HashTable output is sorted by row-key and contains a row-range and hash. Beam
* PCollections do not guarantee any ordering. To fetch a batch of ranges in one ReadRows operation,
* this source buffers them and outputs a {@code List<RangeHash>}, guaranteeing the sorted order of
* ranges.
*
* <p>Emits a batch of sorted RangeHashes keyed by the start key of the first range.
*/
class BufferedHadoopHashTableSource extends BoundedSource<KV<String, List<RangeHash>>> {
Comment thread
vermas2012 marked this conversation as resolved.

private static final long serialVersionUID = 39842743L;

// Number of RangeHashes per batch when the caller does not pass an explicit buffer size.
private static final int DEFAULT_BATCH_SIZE = 50;

// Coder for the emitted batches: a UTF-8 string key paired with a list of RangeHashes.
private static final Coder<KV<String, List<RangeHash>>> CODER =
    KvCoder.of(StringUtf8Coder.of(), ListCoder.of(RangeHashCoder.of()));

// Max number of RangeHashes to buffer.
private final int maxBufferSize;
// Wrapped source that produces the individual RangeHashes being batched.
private final HadoopHashTableSource hashTableSource;

/**
 * Creates a buffered source that batches up to {@code DEFAULT_BATCH_SIZE} RangeHashes.
 *
 * @param source the source whose RangeHashes are batched
 */
public BufferedHadoopHashTableSource(HadoopHashTableSource source) {
  this(source, DEFAULT_BATCH_SIZE);
}

/**
 * Creates a buffered source with an explicit batch size.
 *
 * @param hashTableSource the source whose RangeHashes are batched
 * @param maxBufferSize maximum number of RangeHashes per emitted batch; must be positive
 * @throws IllegalArgumentException if {@code maxBufferSize} is not positive
 */
public BufferedHadoopHashTableSource(HadoopHashTableSource hashTableSource, int maxBufferSize) {
  // With a size of 0 the reader would emit only the first RangeHash and then report end of
  // input, silently dropping the rest; a negative size fails later when the buffer is allocated.
  // Reject both up front.
  Preconditions.checkArgument(
      maxBufferSize > 0, "maxBufferSize must be positive, got %s", maxBufferSize);
  this.hashTableSource = hashTableSource;
  this.maxBufferSize = maxBufferSize;
}

/**
 * Splits this source along the same boundaries as the wrapped {@link HadoopHashTableSource}.
 *
 * <p>Each returned source wraps one split of the underlying source and keeps this source's
 * {@code maxBufferSize}.
 *
 * @param desiredBundleSizeBytes forwarded unchanged to the wrapped source's {@code split}
 * @param options forwarded unchanged to the wrapped source's {@code split}
 * @return one buffered source per split of the wrapped source
 * @throws IOException if splitting the wrapped source fails
 */
@Override
public List<? extends BoundedSource<KV<String, List<RangeHash>>>> split(
    long desiredBundleSizeBytes, PipelineOptions options) throws IOException {

  // NOTE(review): the cast assumes HadoopHashTableSource.split only returns
  // HadoopHashTableSource instances — confirm against that class.
  @SuppressWarnings("unchecked")
  List<HadoopHashTableSource> splitHashTableSources =
      (List<HadoopHashTableSource>) hashTableSource.split(desiredBundleSizeBytes, options);

  List<BufferedHadoopHashTableSource> splitSources =
      new ArrayList<>(splitHashTableSources.size());
  // Keep the splits same as HashTableSource.
  for (HadoopHashTableSource splitHashTableSource : splitHashTableSources) {
    // Propagate the configured buffer size; the single-argument constructor would reset every
    // split to DEFAULT_BATCH_SIZE.
    splitSources.add(new BufferedHadoopHashTableSource(splitHashTableSource, maxBufferSize));
  }
  return splitSources;
}

/** Returns the shared coder for the (string key, list of RangeHash) batches this source emits. */
@Override
public Coder<KV<String, List<RangeHash>>> getOutputCoder() {
return CODER;
}

/**
 * Returns the size estimate reported by the wrapped {@link HadoopHashTableSource}; buffering does
 * not add an estimate of its own.
 *
 * @param options forwarded unchanged to the wrapped source
 * @throws Exception if the wrapped source fails to produce an estimate
 */
@Override
public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
// HashTable data files don't expose a method to estimate size or lineCount.
// The estimate is therefore whatever the wrapped source reports.
return hashTableSource.getEstimatedSizeBytes(options);
}

/**
 * Creates a reader that wraps a reader of the underlying {@link HadoopHashTableSource} and emits
 * its RangeHashes in batches.
 *
 * @param options forwarded unchanged to the wrapped source's {@code createReader}
 * @throws IOException if the wrapped source fails to create its reader
 */
@Override
public BoundedReader<KV<String, List<RangeHash>>> createReader(PipelineOptions options)
throws IOException {
return new BufferedHashBasedReader(this, hashTableSource.createReader(options));
}

/**
 * Two buffered sources are equal when they have the same buffer size and wrap equal hash table
 * sources.
 */
@Override
public boolean equals(Object o) {
  if (o == this) {
    return true;
  }
  if (o instanceof BufferedHadoopHashTableSource) {
    BufferedHadoopHashTableSource other = (BufferedHadoopHashTableSource) o;
    // Cheap primitive comparison first, then the wrapped sources.
    if (maxBufferSize != other.maxBufferSize) {
      return false;
    }
    return Objects.equal(hashTableSource, other.hashTableSource);
  }
  return false;
}

/** Hashes the same two fields that {@code equals} compares: buffer size and wrapped source. */
@Override
public int hashCode() {
return Objects.hashCode(maxBufferSize, hashTableSource);
}

/**
 * Describes this source as the half-open row range {@code [start, stop)} of the wrapped source
 * followed by the buffer size.
 */
@Override
public String toString() {
  StringBuilder description = new StringBuilder("BufferedHadoopHashTableSource [");
  description.append(immutableBytesToString(hashTableSource.startRowInclusive));
  description.append(", ");
  description.append(immutableBytesToString(hashTableSource.stopRowExclusive));
  description.append("), maxBufferSize=");
  description.append(maxBufferSize);
  return description.toString();
}

private static class BufferedHashBasedReader extends BoundedReader<KV<String, List<RangeHash>>> {

// Reader of the wrapped source; supplies the individual RangeHashes.
private final BoundedReader<RangeHash> hashReader;
// Source that created this reader; also provides maxBufferSize.
private final BufferedHadoopHashTableSource source;

// Batch currently exposed through getCurrent(); replaced with a new list on every advance().
private List<RangeHash> buffer;

/**
 * Creates a reader that batches the output of {@code hashReader}.
 *
 * @param source the buffered source this reader belongs to
 * @param hashReader reader of the wrapped source to pull RangeHashes from
 */
public BufferedHashBasedReader(
BufferedHadoopHashTableSource source, BoundedReader<RangeHash> hashReader) {
this.source = source;
this.hashReader = hashReader;
// Pre-size the list to the maximum batch size.
this.buffer = new ArrayList<>(source.maxBufferSize);
}

/**
 * Starts the wrapped reader and fills the first batch.
 *
 * @return {@code true} if the wrapped reader produced at least one RangeHash, in which case the
 *     first batch is ready for {@code getCurrent()}; {@code false} if there is nothing to read
 * @throws IOException if the wrapped reader fails
 */
@Override
public boolean start() throws IOException {
  boolean hasFirstHash = hashReader.start();
  if (hasFirstHash) {
    // The wrapped reader is already positioned on its first element; take it, then keep
    // reading until the batch is full or the input is exhausted.
    buffer.add(hashReader.getCurrent());
    bufferRangeHashes();
  }
  // When true, the buffer holds at least the first RangeHash.
  return hasFirstHash;
}

// Reads from hashReader and buffers the RangeHashes.
// Returns true if any RangeHashes were read from hashReader.
private boolean bufferRangeHashes() throws IOException {
boolean readRangeHashes = false;
while (buffer.size() < source.maxBufferSize && hashReader.advance()) {
readRangeHashes = true;
buffer.add(hashReader.getCurrent());
}
return readRangeHashes;
Comment thread
igorbernstein2 marked this conversation as resolved.
}

/**
 * Moves to the next batch of RangeHashes.
 *
 * @return {@code true} if a new, non-empty batch was read; {@code false} once the wrapped reader
 *     is exhausted
 * @throws IOException if the wrapped reader fails
 */
@Override
public boolean advance() throws IOException {
  // getCurrent() hands out the buffer list itself, so begin the next batch in a new list
  // rather than clearing the one that was already emitted.
  buffer = new ArrayList<>(source.maxBufferSize);

  boolean hasNextBatch = bufferRangeHashes();
  return hasNextBatch;
}

/**
 * Returns the current batch, keyed by the start key of its first range.
 *
 * @return the key string paired with the buffered list of RangeHashes
 * @throws IllegalStateException if the buffer is empty, i.e. the last start/advance did not
 *     return {@code true}
 */
@Override
public KV<String, List<RangeHash>> getCurrent() {
  // A batch is only available after start()/advance() returned true.
  Preconditions.checkState(
      !buffer.isEmpty(), "getCurrent() should only be called when start/advance return true.");
  // The key is a String rather than an ImmutableBytesWritable because the WritableCoder is not
  // deterministic, and the emitted PCollection is grouped by this key, which requires a
  // deterministic coder. The price is a double encoding (bytes -> string form from
  // Bytes.toStringBinary -> UTF-8), but there are far fewer batches than rows fetched from
  // Bigtable, so it should not meaningfully affect job performance.
  RangeHash firstRange = buffer.get(0);
  String batchKey = Bytes.toStringBinary(firstRange.startInclusive.copyBytes());
  return KV.of(batchKey, buffer);
}

/**
 * Closes the wrapped reader; this reader holds no other resources.
 *
 * @throws IOException if closing the wrapped reader fails
 */
@Override
public void close() throws IOException {
hashReader.close();
}

/** Returns the {@link BufferedHadoopHashTableSource} this reader was created with. */
@Override
public BoundedSource<KV<String, List<RangeHash>>> getCurrentSource() {
return source;
}
}
}
Loading