Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions google/cloud/storage/internal/async/connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -315,8 +315,6 @@ AsyncConnectionImpl::AppendableObjectUploadImpl(AppendableUploadParams p) {
auto current = internal::MakeImmutableOptions(std::move(p.options));
auto request = p.request;
std::int64_t persisted_size = 0;
std::shared_ptr<storage::internal::HashFunction> hash_function =
CreateHashFunction(*current);
auto retry =
std::shared_ptr<storage::AsyncRetryPolicy>(retry_policy(*current));
auto backoff =
Expand Down Expand Up @@ -404,17 +402,28 @@ AsyncConnectionImpl::AppendableObjectUploadImpl(AppendableUploadParams p) {
auto pending = factory(std::move(request));
return pending.then(
[current, request = std::move(p.request), persisted_size,
hash = std::move(hash_function), fa = std::move(factory)](auto f) mutable
fa = std::move(factory)](auto f) mutable
-> StatusOr<std::unique_ptr<storage::AsyncWriterConnection>> {
auto rpc = f.get();
if (!rpc) return std::move(rpc).status();

std::shared_ptr<storage::internal::HashFunction> hash;
std::unique_ptr<AsyncWriterConnectionImpl> impl;

if (rpc->first_response.has_resource()) {
auto const& resource = rpc->first_response.resource();
if (current->get<storage::EnableCrc32cValidationOption>() &&
resource.has_checksums() && resource.checksums().has_crc32c()) {
hash = std::make_shared<storage::internal::Crc32cHashFunction>(
resource.checksums().crc32c(), resource.size());
} else {
hash = CreateHashFunction(*current);
}
impl = std::make_unique<AsyncWriterConnectionImpl>(
current, request, std::move(rpc->stream), hash,
rpc->first_response.resource(), false);
current, request, std::move(rpc->stream), hash, resource, false);
} else {
persisted_size = rpc->first_response.persisted_size();
hash = CreateHashFunction(*current);
impl = std::make_unique<AsyncWriterConnectionImpl>(
current, request, std::move(rpc->stream), hash, persisted_size,
false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include "google/cloud/storage/async/options.h"
#include "google/cloud/storage/async/retry_policy.h"
#include "google/cloud/storage/async/writer_connection.h"
#include "google/cloud/storage/internal/async/connection_impl.h"
#include "google/cloud/storage/internal/async/default_options.h"
#include "google/cloud/storage/testing/canonical_errors.h"
#include "google/cloud/storage/internal/crc32c.h"
#include "google/cloud/storage/testing/mock_storage_stub.h"
#include "google/cloud/common_options.h"
#include "google/cloud/grpc_options.h"
Expand Down Expand Up @@ -627,6 +629,139 @@ TEST_F(AsyncConnectionImplAppendableTest, AppendableUploadRedirectNoHandle) {
next.first.set_value(true);
}

TEST_F(AsyncConnectionImplAppendableTest,
StartAppendableObjectUploadWithChecksum) {
auto constexpr kRequestText = R"pb(
write_object_spec {
resource {
bucket: "projects/_/buckets/test-bucket"
name: "test-object"
content_type: "text/plain"
}
}
)pb";
AsyncSequencer<bool> sequencer;
auto mock = std::make_shared<storage::testing::MockStorageStub>();

google::storage::v2::Object initial_resource;
initial_resource.set_bucket("projects/_/buckets/test-bucket");
initial_resource.set_name("test-object");
initial_resource.set_size(1024);
initial_resource.mutable_checksums()->set_crc32c(12345); // Some dummy CRC

auto stream = std::make_unique<MockAsyncBidiWriteObjectStream>();
EXPECT_CALL(*stream, Start).WillOnce([&] {
return sequencer.PushBack("Start");
});

EXPECT_CALL(*stream, Read)
.WillOnce([&, initial_resource] {
return sequencer.PushBack("Read(Takeover)")
.then([initial_resource](auto) {
auto response = google::storage::v2::BidiWriteObjectResponse{};
*response.mutable_resource() = initial_resource;
return absl::make_optional(std::move(response));
});
})
.WillOnce([&, initial_resource] {
return sequencer.PushBack("Read(FinalObject)")
.then([initial_resource](auto) {
auto response = google::storage::v2::BidiWriteObjectResponse{};
*response.mutable_resource() = initial_resource;
response.mutable_resource()->set_size(
initial_resource.size() + 9); // "some data" size is 9
return absl::make_optional(std::move(response));
});
});

EXPECT_CALL(*stream, Cancel).Times(1);
EXPECT_CALL(*stream, Finish).WillOnce([&] {
return sequencer.PushBack("Finish").then([](auto) { return Status{}; });
});

EXPECT_CALL(*stream, Write)
.WillOnce([&](google::storage::v2::BidiWriteObjectRequest const& request,
grpc::WriteOptions wopt) {
EXPECT_TRUE(request.state_lookup());
EXPECT_FALSE(wopt.is_last_message());
return sequencer.PushBack("Write(StateLookup)");
})
.WillOnce(
[&](google::storage::v2::BidiWriteObjectRequest const& /*request*/,
grpc::WriteOptions wopt) {
EXPECT_FALSE(wopt.is_last_message());
return sequencer.PushBack("Write(data)");
})
.WillOnce([&](google::storage::v2::BidiWriteObjectRequest const& request,
grpc::WriteOptions wopt) {
EXPECT_TRUE(request.finish_write());
EXPECT_TRUE(wopt.is_last_message());
// Here we expect full checksums to be set because we had the resource
// in takeover.
EXPECT_TRUE(request.has_object_checksums());
Comment thread
v-pratap marked this conversation as resolved.
auto expected_crc =
google::cloud::storage_internal::ExtendCrc32c(12345, "some data");
EXPECT_EQ(request.object_checksums().crc32c(), expected_crc);
return sequencer.PushBack("Write(Finalize)");
});

EXPECT_CALL(*mock, AsyncBidiWriteObject).WillOnce([&] {
return std::unique_ptr<AsyncBidiWriteObjectStream>(std::move(stream));
});

internal::AutomaticallyCreatedBackgroundThreads pool(1);
// Enable CRC32C validation in options
auto options = TestOptions().set<storage::EnableCrc32cValidationOption>(true);
auto connection = MakeTestConnection(pool.cq(), mock, options);

auto request = google::storage::v2::BidiWriteObjectRequest{};
ASSERT_TRUE(TextFormat::ParseFromString(kRequestText, &request));
request.mutable_write_object_spec()->set_appendable(true);

auto pending = connection->StartAppendableObjectUpload(
{std::move(request), connection->options()});

auto next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "Start");
next.first.set_value(true);

next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "Write(StateLookup)");
next.first.set_value(true);

next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "Read(Takeover)");
next.first.set_value(true);

auto r = pending.get();
ASSERT_STATUS_OK(r);
auto writer = *std::move(r);

// Write some data.
auto w1 = writer->Write(storage::WritePayload("some data"));
next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "Write(data)");
next.first.set_value(true);
EXPECT_STATUS_OK(w1.get());

// Finalize the upload.
auto w2 = writer->Finalize({});
next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "Write(Finalize)");
next.first.set_value(true);
next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "Read(FinalObject)");
next.first.set_value(true);

auto response = w2.get();
ASSERT_STATUS_OK(response);

writer.reset();
next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "Finish");
next.first.set_value(true);
}

} // namespace
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace storage_internal
Expand Down
31 changes: 27 additions & 4 deletions google/cloud/storage/internal/async/writer_connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,14 @@ AsyncWriterConnectionImpl::Finalize(storage::WritePayload payload) {

auto p = WritePayloadImpl::GetImpl(payload);
auto size = p.size();
auto action = request_.has_append_object_spec() ||
request_.write_object_spec().appendable()
? PartialUpload::kFinalize
: PartialUpload::kFinalizeWithChecksum;
auto action = PartialUpload::kFinalizeWithChecksum;
if (request_.has_append_object_spec() ||
request_.write_object_spec().appendable()) {
if (!absl::holds_alternative<google::storage::v2::Object>(
persisted_state_)) {
action = PartialUpload::kFinalize;
}
}
auto coro = PartialUpload::Call(impl_, hash_function_, std::move(write),
std::move(p), std::move(action));
return coro->Start().then([coro, size, this](auto f) mutable {
Expand Down Expand Up @@ -256,7 +260,26 @@ future<StatusOr<std::int64_t>> AsyncWriterConnectionImpl::OnQuery(
latest_write_handle_ = response->write_handle();
}
if (response->has_persisted_size()) {
absl::optional<google::storage::v2::Object> old_obj;
if (absl::holds_alternative<google::storage::v2::Object>(
persisted_state_)) {
old_obj = absl::get<google::storage::v2::Object>(persisted_state_);
}

persisted_state_ = response->persisted_size();

if (response->has_persisted_data_checksums()) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible that: if a connection retry is triggered during the session, and the server's reported persisted_size causes a resend that overlaps with (but is not exactly aligned to) the client's previous chunk boundaries, the request fails?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the request will fail if we try to send overlapping data.
I think the server strictly requires the next write to start exactly at the persisted_size it has saved.

Copy link
Copy Markdown
Contributor

@kalragauri kalragauri May 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC, in writer_connection_resumed.cc, when a resume occurs, the client removes the already-persisted bytes from its local buffer and updates its internal buffer_offset_ but it shares the same hash_function_ instance across retry attempts:

options_, initial_request_, std::move(res->stream), hash_function_,

If the server's persisted_size is not aligned with the client's original chunk boundaries, the client will resend a chunk starting from the new persisted_offset. If this chunk is large enough to extend past the old minimum_offset_ of the shared hash function, the Update call in partial_upload.cc will fail:

return InvalidArgumentError("mismatched offset", GCP_ERROR_INFO());

This is an edge case but if the fix is straightforward, pls include it in this PR.

auto const& checksums = response->persisted_data_checksums();
if (checksums.has_crc32c()) {
google::storage::v2::Object obj;
obj.set_size(response->persisted_size());
*obj.mutable_checksums() = checksums;
if (old_obj) {
obj.set_generation(old_obj->generation());
}
persisted_state_ = obj;
}
}
return make_ready_future(make_status_or(response->persisted_size()));
}
if (response->has_resource()) {
Expand Down
3 changes: 3 additions & 0 deletions google/cloud/storage/internal/hash_function_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ class MD5HashFunction : public HashFunction {
class Crc32cHashFunction : public HashFunction {
public:
Crc32cHashFunction() = default;
explicit Crc32cHashFunction(std::uint32_t initial_crc,
std::int64_t initial_offset)
: current_(initial_crc), minimum_offset_(initial_offset) {}

Crc32cHashFunction(Crc32cHashFunction const&) = delete;
Crc32cHashFunction& operator=(Crc32cHashFunction const&) = delete;
Expand Down
Loading