Skip to content

Commit 454f83e

Browse files
committed
Merge commit for internal changes
2 parents d238a56 + e7a670f commit 454f83e

36 files changed

+1384
-68
lines changed

WORKSPACE

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ local_repository(
44
)
55

66
load('//tensorflow/tensorflow:workspace.bzl', 'tf_workspace')
7-
tf_workspace("tensorflow/")
7+
tf_workspace("tensorflow/", "@tf")
88

99
# ===== gRPC dependencies =====
1010

tensorflow

Submodule tensorflow updated 541 files

tensorflow_serving/core/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,7 @@ cc_test(
309309
":eager_load_policy",
310310
":servable_state",
311311
"//tensorflow_serving/core/test_util:dynamic_manager_test_util",
312+
"//tensorflow_serving/core/test_util:mock_loader",
312313
"//tensorflow_serving/core/test_util:test_main",
313314
"//tensorflow_serving/util:any_ptr",
314315
"//tensorflow_serving/util:event_bus",

tensorflow_serving/core/dynamic_manager.cc

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -356,7 +356,10 @@ void DynamicManager::SetAspiredVersions(
356356
loader = version.ConsumeDataOrDie();
357357
}
358358
std::shared_ptr<LoaderHarness> harness =
359-
std::make_shared<LoaderHarness>(version.id(), std::move(loader));
359+
std::make_shared<LoaderHarness>(
360+
version.id(), std::move(loader),
361+
LoaderHarness::Options{options_.max_num_load_tries,
362+
options_.load_retry_interval_micros});
360363
if (!version.status().ok()) {
361364
LOG(ERROR) << "Version error: " << version.status().ToString();
362365
harness->Error(version.status());

tensorflow_serving/core/dynamic_manager.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,13 @@ class DynamicManager final : public Manager,
7979
// EventBus to publish servable state changes. This is optional, if unset,
8080
// we don't publish.
8181
EventBus<ServableState>* servable_event_bus = nullptr;
82+
83+
// Maximum number of times we try to load a servable before we give up.
84+
int max_num_load_tries = 5;
85+
86+
// The interval, in microseconds, between each servable load retry.
87+
// Default: 1min.
88+
int64 load_retry_interval_micros = 1 * 60 * 1000 * 1000;
8289
};
8390

8491
explicit DynamicManager(Options options);

tensorflow_serving/core/dynamic_manager_test.cc

Lines changed: 60 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,25 +28,25 @@ limitations under the License.
2828
#include "tensorflow_serving/core/eager_load_policy.h"
2929
#include "tensorflow_serving/core/servable_state.h"
3030
#include "tensorflow_serving/core/test_util/dynamic_manager_test_util.h"
31+
#include "tensorflow_serving/core/test_util/mock_loader.h"
3132
#include "tensorflow_serving/util/any_ptr.h"
3233
#include "tensorflow_serving/util/event_bus.h"
3334

3435
namespace tensorflow {
3536
namespace serving {
3637

3738
using ::testing::Eq;
39+
using ::testing::Invoke;
40+
using ::testing::NiceMock;
41+
using ::testing::Return;
3842
using ::testing::UnorderedElementsAreArray;
3943

4044
namespace {
4145

4246
class FakeLoader : public Loader {
4347
public:
44-
explicit FakeLoader(int64 servable) : FakeLoader(servable, false) {}
45-
46-
FakeLoader(int64 servable, const bool errors_on_load)
47-
: servable_(servable), errors_on_load_(errors_on_load) {
48-
++num_fake_loaders_;
49-
}
48+
explicit FakeLoader(int64 servable, const bool errors_on_load = false)
49+
: servable_(servable), errors_on_load_(errors_on_load) {}
5050
~FakeLoader() override { --num_fake_loaders_; }
5151

5252
Status Load() override {
@@ -83,13 +83,15 @@ class DynamicManagerTest : public ::testing::Test {
8383
LOG(INFO) << "Published state: " << state.DebugString();
8484
last_published_servable_state_ = state;
8585
});
86-
DynamicManager::Options options;
8786
// The state manager thread won't be run automatically.
88-
options.manage_state_interval_micros = -1;
89-
options.env = Env::Default();
90-
options.version_policy.reset(new EagerLoadPolicy());
91-
options.servable_event_bus = servable_event_bus_.get();
92-
manager_.reset(new DynamicManager(std::move(options)));
87+
dynamic_manager_options_.manage_state_interval_micros = -1;
88+
dynamic_manager_options_.env = Env::Default();
89+
dynamic_manager_options_.version_policy.reset(new EagerLoadPolicy());
90+
dynamic_manager_options_.servable_event_bus = servable_event_bus_.get();
91+
dynamic_manager_options_.max_num_load_tries = 2;
92+
dynamic_manager_options_.load_retry_interval_micros = 0;
93+
// dynamic_manager_options_.load_retry_interval_micros = 0;
94+
manager_.reset(new DynamicManager(std::move(dynamic_manager_options_)));
9395
}
9496

9597
// Creates an aspired-versions entry with 'id' and a FakeLoader whose servable
@@ -138,6 +140,7 @@ class DynamicManagerTest : public ::testing::Test {
138140
std::unique_ptr<EventBus<ServableState>::Subscription>
139141
servable_state_subscription_;
140142
ServableState last_published_servable_state_;
143+
DynamicManager::Options dynamic_manager_options_;
141144
std::unique_ptr<DynamicManager> manager_;
142145
};
143146

@@ -578,6 +581,51 @@ TEST_F(DynamicManagerTest, NoEventBus) {
578581
std::move(aspired_versions));
579582
}
580583

584+
TEST_F(DynamicManagerTest, RetryOnLoadErrorFinallySucceeds) {
585+
std::vector<ServableData<std::unique_ptr<Loader>>> aspired_versions;
586+
587+
test_util::MockLoader* loader = new NiceMock<test_util::MockLoader>;
588+
// Prevents it being changed without our knowledge.
589+
CHECK_EQ(dynamic_manager_options_.max_num_load_tries, 2);
590+
// We succeed on the last load, before the manager gives up.
591+
EXPECT_CALL(*loader, Load())
592+
.WillOnce(Return(errors::Internal("Error on load.")))
593+
.WillOnce(Return(Status::OK()));
594+
595+
const ServableId id = {kServableName, 7};
596+
aspired_versions.push_back({id, std::unique_ptr<Loader>(loader)});
597+
manager_->GetAspiredVersionsCallback()(kServableName,
598+
std::move(aspired_versions));
599+
600+
RunManageState();
601+
602+
const ServableState available_state = {
603+
{kServableName, 7},
604+
true,
605+
ServableState::ManagerState::kAvailable,
606+
Status::OK()};
607+
EXPECT_THAT(last_published_servable_state_,
608+
EqualsServableState(available_state));
609+
}
610+
611+
TEST_F(DynamicManagerTest, RetryOnLoadErrorFinallyFails) {
612+
std::vector<ServableData<std::unique_ptr<Loader>>> aspired_versions;
613+
const ServableId id = {kServableName, 7};
614+
// We always fail.
615+
std::unique_ptr<Loader> loader(new FakeLoader(7, true /* errors_on_load */));
616+
aspired_versions.push_back({id, std::move(loader)});
617+
manager_->GetAspiredVersionsCallback()(kServableName,
618+
std::move(aspired_versions));
619+
620+
RunManageState();
621+
622+
const ServableState error_state = {{kServableName, 7},
623+
true,
624+
ServableState::ManagerState::kEnd,
625+
errors::Internal("Error on load.")};
626+
EXPECT_THAT(last_published_servable_state_, EqualsServableState(error_state));
627+
}
628+
581629
} // namespace
582630
} // namespace serving
583631
} // namespace tensorflow

tensorflow_serving/core/loader_harness.cc

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,19 @@ limitations under the License.
1717

1818
#include <algorithm>
1919

20+
#include "tensorflow/core/platform/env.h"
21+
2022
namespace tensorflow {
2123
namespace serving {
2224

2325
LoaderHarness::LoaderHarness(const ServableId& id,
2426
std::unique_ptr<Loader> loader)
25-
: id_(id), loader_(std::move(loader)) {
27+
: LoaderHarness(id, std::move(loader), Options()) {}
28+
29+
LoaderHarness::LoaderHarness(const ServableId& id,
30+
std::unique_ptr<Loader> loader,
31+
const Options& options)
32+
: id_(id), loader_(std::move(loader)), options_(options) {
2633
VLOG(1) << "New aspired servable version " << id_;
2734
}
2835

@@ -48,7 +55,26 @@ Status LoaderHarness::Load() {
4855
state_ = kLoading;
4956
VLOG(1) << "Loading servable version " << id_;
5057
}
51-
const Status status = loader_->Load();
58+
59+
const Status status = [&]() {
60+
Status load_status;
61+
int num_tries = 0;
62+
do {
63+
if (num_tries > 0) {
64+
if (options_.load_retry_interval_micros > 0) {
65+
Env::Default()->SleepForMicroseconds(
66+
options_.load_retry_interval_micros);
67+
}
68+
LOG(INFO) << "Retrying load on servable version: " << id_
69+
<< " retry: " << num_tries;
70+
}
71+
load_status = loader_->Load();
72+
++num_tries;
73+
} while (is_aspired() && !load_status.ok() &&
74+
num_tries < options_.max_num_load_tries);
75+
return load_status;
76+
}();
77+
5278
{
5379
mutex_lock l(mu_);
5480
DCHECK_EQ(kLoading, state_);

tensorflow_serving/core/loader_harness.h

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,17 @@ class LoaderHarness final {
7272
kError
7373
};
7474

75+
struct Options {
76+
// Maximum number of times we try to load a servable before we give up.
77+
int max_num_load_tries;
78+
79+
// The interval, in microseconds, between each servable load retry.
80+
int64 load_retry_interval_micros;
81+
};
82+
7583
LoaderHarness(const ServableId& id, std::unique_ptr<Loader> loader);
84+
LoaderHarness(const ServableId& id, std::unique_ptr<Loader> loader,
85+
const Options& options);
7686

7787
// Legal to destruct iff current state is kNew|kDisabled|kError.
7888
// Check-fails if violated.
@@ -91,9 +101,14 @@ class LoaderHarness final {
91101
// Returns the current overall state snapshot of the underlying Servable.
92102
ServableStateSnapshot loader_state_snapshot() const LOCKS_EXCLUDED(mu_);
93103

94-
// Transitions to kLoading, delegates to Servable::Load(), then
95-
// transitions either to kReady if Load() succeeds, or to kError if
96-
// Load() fails. This call may take a long time.
104+
// Transitions to kLoading, delegates to Servable::Load(), then transitions
105+
// either to kReady if Load() succeeds, or to kError if Load() fails. This
106+
// call may take a long time.
107+
//
108+
// We retry the Servable::Load() according to the options set during
109+
// construction of this class. We stop retrying and give up if 1. we have
110+
// reached max_num_load_tries or, 2. if is_aspired is set to false.
111+
//
97112
// Legal to call iff current state is kNew. Check-fails if violated.
98113
Status Load() LOCKS_EXCLUDED(mu_);
99114

@@ -140,6 +155,7 @@ class LoaderHarness final {
140155

141156
const ServableId id_;
142157
const std::unique_ptr<Loader> loader_;
158+
const Options options_;
143159
mutable mutex mu_;
144160
State state_ GUARDED_BY(mu_) = kNew;
145161
bool is_aspired_ GUARDED_BY(mu_) = true;

tensorflow_serving/core/loader_harness_test.cc

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,9 @@ limitations under the License.
2828

2929
using ::testing::HasSubstr;
3030
using ::testing::InvokeWithoutArgs;
31+
using ::testing::InSequence;
3132
using ::testing::IsNull;
33+
using ::testing::NiceMock;
3234
using ::testing::Return;
3335
using ::testing::ReturnRef;
3436
using ::testing::StrictMock;
@@ -160,6 +162,60 @@ TEST(ServableVersionHarnessTest, LoadError) {
160162
EXPECT_EQ(LoaderHarness::kError, harness.state());
161163
}
162164

165+
TEST(ServableVersionHarnessTest, RetryOnLoadErrorFinallySucceeds) {
166+
test_util::MockLoader* loader = new NiceMock<test_util::MockLoader>;
167+
LoaderHarness harness(
168+
ServableId{"test", 0}, std::unique_ptr<Loader>(loader),
169+
{2 /* max_num_load_tries */, 1 /* load_retry_interval_micros */});
170+
171+
EXPECT_CALL(*loader, Load())
172+
.WillOnce(InvokeWithoutArgs(
173+
[]() { return Status(error::UNKNOWN, "test load error"); }))
174+
.WillOnce(InvokeWithoutArgs([]() { return Status::OK(); }));
175+
const Status status = harness.Load();
176+
TF_EXPECT_OK(status);
177+
178+
QuiesceAndUnload(&harness);
179+
}
180+
181+
// Tests cancelling a load by setting is_aspired to false,
182+
TEST(ServableVersionHarnessTest, RetryOnLoadErrorCancelledLoad) {
183+
test_util::MockLoader* loader = new NiceMock<test_util::MockLoader>;
184+
LoaderHarness harness(ServableId{"test", 0}, std::unique_ptr<Loader>(loader),
185+
{10 /* max_num_load_tries */, -1});
186+
187+
Notification load_called;
188+
Notification load_should_return;
189+
EXPECT_CALL(*loader, Load())
190+
.WillOnce(InvokeWithoutArgs([&load_called, &load_should_return]() {
191+
load_called.Notify();
192+
load_should_return.WaitForNotification();
193+
return Status(error::UNKNOWN, "test load error");
194+
}))
195+
.WillRepeatedly(InvokeWithoutArgs([]() { return Status::OK(); }));
196+
std::unique_ptr<Thread> test_thread(
197+
Env::Default()->StartThread(ThreadOptions(), "test", [&harness]() {
198+
const Status status = harness.Load();
199+
EXPECT_THAT(status.error_message(), HasSubstr("test load error"));
200+
}));
201+
load_called.WaitForNotification();
202+
harness.set_is_aspired(false);
203+
load_should_return.Notify();
204+
}
205+
206+
TEST(ServableVersionHarnessTest, RetryOnLoadErrorFinallyFails) {
207+
test_util::MockLoader* loader = new NiceMock<test_util::MockLoader>;
208+
LoaderHarness harness(ServableId{"test", 0}, std::unique_ptr<Loader>(loader),
209+
{2 /* max_num_load_tries */, -1});
210+
211+
EXPECT_CALL(*loader, Load())
212+
.Times(2)
213+
.WillRepeatedly(InvokeWithoutArgs(
214+
[]() { return Status(error::UNKNOWN, "test load error"); }));
215+
const Status status = harness.Load();
216+
EXPECT_THAT(status.error_message(), HasSubstr("test load error"));
217+
}
218+
163219
TEST(LoaderHarnessTest, ExternallySignalledError) {
164220
LoaderHarness harness(ServableId{"test", 0}, nullptr);
165221
EXPECT_EQ(LoaderHarness::State::kNew, harness.state());

tensorflow_serving/core/test_util/BUILD

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,20 @@ cc_library(
8080
testonly = 1,
8181
srcs = ["dynamic_manager_test_util.cc"],
8282
hdrs = ["dynamic_manager_test_util.h"],
83-
visibility = ["//tensorflow_serving:internal"],
8483
deps = [
8584
"//tensorflow_serving/core:dynamic_manager",
8685
],
8786
)
87+
88+
cc_library(
89+
name = "availability_test_util",
90+
testonly = 1,
91+
srcs = ["availability_test_util.cc"],
92+
hdrs = ["availability_test_util.h"],
93+
visibility = ["//visibility:public"],
94+
deps = [
95+
"@tf//tensorflow/core:lib",
96+
"//external:gtest",
97+
"//tensorflow_serving/core:servable_state_monitor",
98+
],
99+
)

0 commit comments

Comments
 (0)