From e49c805ecc6c22dc1866e3ae74a24d8a48369d5f Mon Sep 17 00:00:00 2001 From: fsx950223 Date: Thu, 15 Jul 2021 09:09:39 +0000 Subject: [PATCH 01/24] add gradient accumulator --- tensorflow_addons/optimizers/__init__.py | 1 + .../optimizers/gradient_accumulator.py | 179 ++++++++++++++++++ .../tests/gradient_accumulator_test.py | 134 +++++++++++++ 3 files changed, 314 insertions(+) create mode 100644 tensorflow_addons/optimizers/gradient_accumulator.py create mode 100644 tensorflow_addons/optimizers/tests/gradient_accumulator_test.py diff --git a/tensorflow_addons/optimizers/__init__.py b/tensorflow_addons/optimizers/__init__.py index b8bc0109da..3cf79856c5 100644 --- a/tensorflow_addons/optimizers/__init__.py +++ b/tensorflow_addons/optimizers/__init__.py @@ -32,6 +32,7 @@ from tensorflow_addons.optimizers.lamb import LAMB from tensorflow_addons.optimizers.lazy_adam import LazyAdam from tensorflow_addons.optimizers.lookahead import Lookahead +from tensorflow_addons.optimizers.gradient_accumulator import GradientAccumulator from tensorflow_addons.optimizers.moving_average import MovingAverage from tensorflow_addons.optimizers.novograd import NovoGrad from tensorflow_addons.optimizers.proximal_adagrad import ProximalAdagrad diff --git a/tensorflow_addons/optimizers/gradient_accumulator.py b/tensorflow_addons/optimizers/gradient_accumulator.py new file mode 100644 index 0000000000..432aa48639 --- /dev/null +++ b/tensorflow_addons/optimizers/gradient_accumulator.py @@ -0,0 +1,179 @@ +# Copyright 2021 The TensorFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +import tensorflow as tf +from tensorflow_addons.utils import types +from typeguard import typechecked + + +@tf.keras.utils.register_keras_serializable(package="Addons") +class GradientAccumulator(tf.keras.optimizers.Optimizer): + """Optimizer wrapper for gradient accumulation.""" + + @typechecked + def __init__( + self, + optimizer: types.Optimizer, + accum_steps: types.TensorLike = 4, + name: str = "GradientAccumulator", + **kwargs, + ): + r"""Construct a new GradientAccumulator optimizer. + + Args: + optimizer: str or `tf.keras.optimizers.Optimizer` that will be + used to compute and apply gradients. + accum_steps: int > 0. Update gradient in every accumulation steps. + name: Optional name for the operations created when applying + gradients. Defaults to "GradientAccumulator". + **kwargs: keyword arguments. Allowed to be {`clipnorm`, + `clipvalue`, `lr`, `decay`}. `clipnorm` is clip gradients by + norm; `clipvalue` is clip gradients by value, `decay` is + included for backward compatibility to allow time inverse + decay of learning rate. `lr` is included for backward + compatibility, recommended to use `learning_rate` instead. + """ + super().__init__(name, **kwargs) + self._optimizer = tf.keras.optimizers.get(optimizer) + self._gradients = [] + self._accum_steps = accum_steps + + def _create_slots(self, var_list): + self._optimizer._create_slots(var_list=var_list) + for var in var_list: + self.add_slot(var, "ga") + + self._gradients = [self.get_slot(var, "ga") for var in var_list] + + @property + def gradients(self): + """The accumulated gradients on the current replica.""" + if not self._gradients: + raise ValueError( + "The accumulator should be called first to initialize the gradients" + ) + return list( + gradient.read_value() if gradient is not None else gradient + for gradient in self._gradients + ) + + def apply_gradients(self, grads_and_vars, name=None, **kwargs): + self._optimizer._iterations = self.iterations + return super().apply_gradients(grads_and_vars, name, **kwargs) + + def _resource_apply_dense(self, grad, var, apply_state=None): + accum_gradient = self.get_slot(var, "ga") + if accum_gradient is not None and grad is not None: + accum_gradient.assign_add( + grad, use_locking=self._use_locking, read_value=False + ) + + def _apply(): + if "apply_state" in self._optimizer._dense_apply_args: + train_op = self._optimizer._resource_apply_dense( + accum_gradient.read_value(), var, apply_state=apply_state + ) + else: + train_op = self._optimizer._resource_apply_dense( + accum_gradient.read_value(), var + ) + reset_op = accum_gradient.assign( + tf.zeros_like(accum_gradient), + use_locking=self._use_locking, + read_value=False, + ) + return tf.group(train_op, reset_op) + + apply_op = tf.cond( + (self.iterations + 1) % self._accum_steps == 0, _apply, lambda: tf.no_op() + ) + return apply_op + + def _resource_apply_sparse(self, grad: types.TensorLike, var, indices, apply_state): + accum_gradient = self.get_slot(var, "ga") + if accum_gradient is not None and grad is not None: + self._resource_scatter_add(accum_gradient, indices, grad) + + def _apply(): + if "apply_state" in self._optimizer._sparse_apply_args: + train_op = self._optimizer._resource_apply_sparse( + accum_gradient.sparse_read(indices), + var, + indices, + apply_state=apply_state, + ) + else: + train_op = self._optimizer._resource_apply_sparse( + accum_gradient.sparse_read(indices), var, indices + ) + reset_op = accum_gradient.assign( + tf.zeros_like(accum_gradient), + use_locking=self._use_locking, + read_value=False, + ) + return tf.group(train_op, reset_op) + + apply_op = tf.cond( + (self.iterations + 1) % self._accum_steps == 0, _apply, lambda: tf.no_op() + ) + return apply_op + + def reset(self): + """Resets the accumulated gradients on the current replica.""" + assign_ops = [] + if not self._gradients: + return assign_ops + + for gradient in self._gradients: + if gradient is not None: + assign_ops.append( + gradient.assign( + tf.zeros_like(gradient), + use_locking=self._use_locking, + read_value=False, + ) + ) + + return tf.group(assign_ops) + + @property + def lr(self): + return self._optimizer._get_hyper("learning_rate") + + @lr.setter + def lr(self, lr): + self._optimizer._set_hyper("learning_rate", lr) # + + @property + def learning_rate(self): + return self._optimizer._get_hyper("learning_rate") + + @learning_rate.setter + def learning_rate(self, learning_rate): + self._optimizer._set_hyper("learning_rate", learning_rate) + + def get_config(self): + config = { + "accum_steps": self._accum_steps, + "optimizer": tf.keras.optimizers.serialize(self._optimizer), + } + base_config = super().get_config() + return {**base_config, **config} + + @classmethod + def from_config(cls, config, custom_objects=None): + optimizer = tf.keras.optimizers.deserialize( + config.pop("optimizer"), custom_objects=custom_objects + ) + return cls(optimizer, **config) diff --git a/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py b/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py new file mode 100644 index 0000000000..a4596880cb --- /dev/null +++ b/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py @@ -0,0 +1,134 @@ +# Copyright 2021 The TensorFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""Tests for GradientAccumulator optimizers.""" + +import numpy as np +import pytest +import tensorflow as tf + +from tensorflow_addons.optimizers import GradientAccumulator + + +@pytest.mark.usefixtures("maybe_run_functions_eagerly") +def test_run(): + var0 = tf.Variable([1.0, 2.0]) + var1 = tf.Variable([3.0, 4.0]) + accum_steps = 4 + + grads0 = tf.constant([0.1, 0.1]) + grads1 = tf.constant([0.01, 0.01]) + + grads_and_vars = list(zip([grads0, grads1], [var0, var1])) + + opt = GradientAccumulator(tf.keras.optimizers.SGD(lr=1.0), accum_steps) + + for _ in range(accum_steps): + opt.apply_gradients(grads_and_vars) + + np.testing.assert_allclose(var0.read_value(), [0.6, 1.6]) + np.testing.assert_allclose(var1.read_value(), [2.96, 3.96]) + + +@pytest.mark.usefixtures("maybe_run_functions_eagerly") +def test_opt_failure(): + base_opt = None + with pytest.raises(TypeError): + GradientAccumulator(base_opt, 0.5) + + +@pytest.mark.usefixtures("maybe_run_functions_eagerly") +def test_model_weights_not_update(): + grad = tf.Variable([[0.1]]) + model = tf.keras.Sequential( + [ + tf.keras.layers.Dense( + 1, + kernel_initializer=tf.keras.initializers.Constant([[1.0]]), + use_bias=False, + ) + ] + ) + model.build(input_shape=[1, 1]) + + opt = GradientAccumulator(tf.keras.optimizers.SGD(lr=2.0), accum_steps=2) + _ = opt.apply_gradients(list(zip([grad], model.variables))) + np.testing.assert_allclose(model.variables[0].read_value(), [[1.0]]) + + +@pytest.mark.usefixtures("maybe_run_functions_eagerly") +def test_optimizer_string(): + _ = GradientAccumulator("adam") + + +def test_config(): + sgd_opt = tf.keras.optimizers.SGD(lr=2.0, nesterov=True, momentum=0.3, decay=0.1) + accum_steps = 4 + opt = GradientAccumulator(sgd_opt, accum_steps=accum_steps) + config = opt.get_config() + + assert config["accum_steps"] == accum_steps + + new_opt = GradientAccumulator.from_config(config) + old_sgd_config = opt._optimizer.get_config() + new_sgd_config = new_opt._optimizer.get_config() + + for k1, k2 in zip(old_sgd_config, new_sgd_config): + assert old_sgd_config[k1] == new_sgd_config[k2] + + +@pytest.mark.usefixtures("maybe_run_functions_eagerly") +def test_fit_simple_linear_model(): + seed = 0x2019 + np.random.seed(seed) + tf.random.set_seed(seed) + num_examples = 5000 + x = np.random.standard_normal((num_examples, 3)) + w = np.random.standard_normal((3, 1)) + y = np.dot(x, w) + np.random.standard_normal((num_examples, 1)) * 1e-4 + + model = tf.keras.models.Sequential() + model.add(tf.keras.layers.Dense(input_shape=(3,), units=1)) + + opt = GradientAccumulator("sgd") + model.compile(opt, loss="mse") + + model.fit(x, y, epochs=5) + + x = np.random.standard_normal((100, 3)) + y = np.dot(x, w) + + predicted = model.predict(x) + + max_abs_diff = np.max(np.abs(predicted - y)) + assert max_abs_diff < 5e-3 + + +def test_serialization(): + sgd_opt = tf.keras.optimizers.SGD(lr=2.0, nesterov=True, momentum=0.3, decay=0.1) + optimizer = GradientAccumulator(sgd_opt) + config = tf.keras.optimizers.serialize(optimizer) + new_optimizer = tf.keras.optimizers.deserialize(config) + assert new_optimizer.get_config() == optimizer.get_config() + + +@pytest.mark.usefixtures("run_with_mixed_precision_policy") +def test_model_mixed_precision(): + x = np.random.standard_normal((10000, 3)) + w = np.random.standard_normal((3, 1)) + y = np.dot(x, w) + np.random.standard_normal((10000, 1)) * 1e-4 + model = tf.keras.Sequential() + model.add(tf.keras.layers.Dense(input_shape=(3,), units=1)) + model.compile(GradientAccumulator("sgd"), loss="mse") + model.fit(x, y, epochs=3) From 2c0fbae8d73ba82a3fd0562aa4b18ef47c5269ef Mon Sep 17 00:00:00 2001 From: fsx950223 Date: Thu, 15 Jul 2021 09:33:03 +0000 Subject: [PATCH 02/24] add exceptions --- tensorflow_addons/optimizers/tests/standard_test.py | 1 + tools/testing/source_code_test.py | 1 + 2 files changed, 2 insertions(+) diff --git a/tensorflow_addons/optimizers/tests/standard_test.py b/tensorflow_addons/optimizers/tests/standard_test.py index f1d284ad68..3366c4f9a4 100644 --- a/tensorflow_addons/optimizers/tests/standard_test.py +++ b/tensorflow_addons/optimizers/tests/standard_test.py @@ -29,6 +29,7 @@ "ConditionalGradient", # is wrapper "Lookahead", # is wrapper "MovingAverage", # is wrapper + "GradientAccumulator", # is wrapper ] diff --git a/tools/testing/source_code_test.py b/tools/testing/source_code_test.py index c54bf73ea2..299e612078 100644 --- a/tools/testing/source_code_test.py +++ b/tools/testing/source_code_test.py @@ -124,6 +124,7 @@ def test_no_tf_cond(): "tensorflow_addons/metrics/cohens_kappa.py", "tensorflow_addons/seq2seq/sampler.py", "tensorflow_addons/seq2seq/beam_search_decoder.py", + "tensorflow_addons/optimizers/gradient_accumulator.py", ] for file_path, line_idx, line in get_lines_of_source_code(allowlist): From 11e536dc5559dae3d383ef037ceff6e85fb4f88a Mon Sep 17 00:00:00 2001 From: fsx950223 Date: Thu, 15 Jul 2021 12:25:46 +0000 Subject: [PATCH 03/24] fix multi gpus bug --- tensorflow_addons/optimizers/gradient_accumulator.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tensorflow_addons/optimizers/gradient_accumulator.py b/tensorflow_addons/optimizers/gradient_accumulator.py index 432aa48639..45ab758071 100644 --- a/tensorflow_addons/optimizers/gradient_accumulator.py +++ b/tensorflow_addons/optimizers/gradient_accumulator.py @@ -96,7 +96,7 @@ def _apply(): return tf.group(train_op, reset_op) apply_op = tf.cond( - (self.iterations + 1) % self._accum_steps == 0, _apply, lambda: tf.no_op() + self.iterations % self._accum_steps == 0, _apply, lambda: tf.no_op() ) return apply_op @@ -125,7 +125,7 @@ def _apply(): return tf.group(train_op, reset_op) apply_op = tf.cond( - (self.iterations + 1) % self._accum_steps == 0, _apply, lambda: tf.no_op() + self.iterations % self._accum_steps == 0, _apply, lambda: tf.no_op() ) return apply_op From 1a4c0d495c388c9e6d62884276c9043fa2e17783 Mon Sep 17 00:00:00 2001 From: fsx950223 Date: Thu, 15 Jul 2021 12:32:26 +0000 Subject: [PATCH 04/24] fix test bugs --- .../optimizers/tests/gradient_accumulator_test.py | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py b/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py index a4596880cb..a541ebd05c 100644 --- a/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py +++ b/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py @@ -34,18 +34,11 @@ def test_run(): opt = GradientAccumulator(tf.keras.optimizers.SGD(lr=1.0), accum_steps) - for _ in range(accum_steps): + for _ in range(accum_steps + 1): opt.apply_gradients(grads_and_vars) - np.testing.assert_allclose(var0.read_value(), [0.6, 1.6]) - np.testing.assert_allclose(var1.read_value(), [2.96, 3.96]) - - -@pytest.mark.usefixtures("maybe_run_functions_eagerly") -def test_opt_failure(): - base_opt = None - with pytest.raises(TypeError): - GradientAccumulator(base_opt, 0.5) + np.testing.assert_allclose(var0.read_value(), [0.5, 1.5]) + np.testing.assert_allclose(var1.read_value(), [2.95, 3.95]) @pytest.mark.usefixtures("maybe_run_functions_eagerly") @@ -64,7 +57,7 @@ def test_model_weights_not_update(): opt = GradientAccumulator(tf.keras.optimizers.SGD(lr=2.0), accum_steps=2) _ = opt.apply_gradients(list(zip([grad], model.variables))) - np.testing.assert_allclose(model.variables[0].read_value(), [[1.0]]) + np.testing.assert_allclose(model.variables[0].read_value(), [[0.8]]) @pytest.mark.usefixtures("maybe_run_functions_eagerly") From eabed95fde398a3c64fb4e2ce6a93747a2824dc7 Mon Sep 17 00:00:00 2001 From: fsx950223 Date: Thu, 15 Jul 2021 13:37:56 +0000 Subject: [PATCH 05/24] fix sparse optimizer --- .../optimizers/gradient_accumulator.py | 4 +-- .../tests/gradient_accumulator_test.py | 34 +++++++++++++++---- 2 files changed, 30 insertions(+), 8 deletions(-) diff --git a/tensorflow_addons/optimizers/gradient_accumulator.py b/tensorflow_addons/optimizers/gradient_accumulator.py index 45ab758071..19096195a2 100644 --- a/tensorflow_addons/optimizers/gradient_accumulator.py +++ b/tensorflow_addons/optimizers/gradient_accumulator.py @@ -108,14 +108,14 @@ def _resource_apply_sparse(self, grad: types.TensorLike, var, indices, apply_sta def _apply(): if "apply_state" in self._optimizer._sparse_apply_args: train_op = self._optimizer._resource_apply_sparse( - accum_gradient.sparse_read(indices), + accum_gradient.read_value(), var, indices, apply_state=apply_state, ) else: train_op = self._optimizer._resource_apply_sparse( - accum_gradient.sparse_read(indices), var, indices + accum_gradient.read_value(), var, indices ) reset_op = accum_gradient.assign( tf.zeros_like(accum_gradient), diff --git a/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py b/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py index a541ebd05c..34d33eae74 100644 --- a/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py +++ b/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py @@ -17,6 +17,7 @@ import numpy as np import pytest import tensorflow as tf +from tensorflow_addons.utils import test_utils from tensorflow_addons.optimizers import GradientAccumulator @@ -42,7 +43,26 @@ def test_run(): @pytest.mark.usefixtures("maybe_run_functions_eagerly") -def test_model_weights_not_update(): +def test_sparse(): + var0 = tf.Variable([1.0, 2.0]) + var1 = tf.Variable([3.0, 4.0]) + + grads0 = tf.constant([0.1, 0.1]) + grads1 = tf.constant([0.01, 0.01]) + grads0_np_indices = tf.constant([0, 1], dtype=tf.int32) + grads0 = tf.IndexedSlices(grads0, grads0_np_indices, tf.constant([2])) + grads1_np_indices = tf.constant([0, 1], dtype=tf.int32) + grads1 = tf.IndexedSlices(grads1, grads1_np_indices, tf.constant([2])) + + grads_and_vars = list(zip([grads0, grads1], [var0, var1])) + opt = GradientAccumulator(tf.keras.optimizers.SGD(lr=1.0, momentum=0.1)) + opt.apply_gradients(grads_and_vars) + np.testing.assert_allclose(var0.read_value(), [0.9, 1.9]) + np.testing.assert_allclose(var1.read_value(), [2.99, 3.99]) + + +@pytest.mark.usefixtures("maybe_run_functions_eagerly") +def test_dense(): grad = tf.Variable([[0.1]]) model = tf.keras.Sequential( [ @@ -82,6 +102,7 @@ def test_config(): @pytest.mark.usefixtures("maybe_run_functions_eagerly") +@pytest.mark.needs_gpu def test_fit_simple_linear_model(): seed = 0x2019 np.random.seed(seed) @@ -90,12 +111,13 @@ def test_fit_simple_linear_model(): x = np.random.standard_normal((num_examples, 3)) w = np.random.standard_normal((3, 1)) y = np.dot(x, w) + np.random.standard_normal((num_examples, 1)) * 1e-4 + strategy = tf.distribute.MirroredStrategy(test_utils.gpus_for_testing()) + with strategy.scope(): + model = tf.keras.models.Sequential() + model.add(tf.keras.layers.Dense(input_shape=(3,), units=1)) - model = tf.keras.models.Sequential() - model.add(tf.keras.layers.Dense(input_shape=(3,), units=1)) - - opt = GradientAccumulator("sgd") - model.compile(opt, loss="mse") + opt = GradientAccumulator("sgd") + model.compile(opt, loss="mse") model.fit(x, y, epochs=5) From a6ff7c014f97dd51266d81094499aca0a40722eb Mon Sep 17 00:00:00 2001 From: fsx950223 Date: Thu, 15 Jul 2021 14:06:43 +0000 Subject: [PATCH 06/24] remove read_value --- tensorflow_addons/optimizers/gradient_accumulator.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/tensorflow_addons/optimizers/gradient_accumulator.py b/tensorflow_addons/optimizers/gradient_accumulator.py index 19096195a2..24bcabe018 100644 --- a/tensorflow_addons/optimizers/gradient_accumulator.py +++ b/tensorflow_addons/optimizers/gradient_accumulator.py @@ -82,12 +82,10 @@ def _resource_apply_dense(self, grad, var, apply_state=None): def _apply(): if "apply_state" in self._optimizer._dense_apply_args: train_op = self._optimizer._resource_apply_dense( - accum_gradient.read_value(), var, apply_state=apply_state + accum_gradient, var, apply_state=apply_state ) else: - train_op = self._optimizer._resource_apply_dense( - accum_gradient.read_value(), var - ) + train_op = self._optimizer._resource_apply_dense(accum_gradient, var) reset_op = accum_gradient.assign( tf.zeros_like(accum_gradient), use_locking=self._use_locking, @@ -108,14 +106,14 @@ def _resource_apply_sparse(self, grad: types.TensorLike, var, indices, apply_sta def _apply(): if "apply_state" in self._optimizer._sparse_apply_args: train_op = self._optimizer._resource_apply_sparse( - accum_gradient.read_value(), + accum_gradient, var, indices, apply_state=apply_state, ) else: train_op = self._optimizer._resource_apply_sparse( - accum_gradient.read_value(), var, indices + accum_gradient, var, indices ) reset_op = accum_gradient.assign( tf.zeros_like(accum_gradient), From 24ae8a904effe7c65a8ac5ec5f0a2a9e019db472 Mon Sep 17 00:00:00 2001 From: fsx950223 Date: Thu, 15 Jul 2021 15:30:06 +0000 Subject: [PATCH 07/24] fix sparse test --- .../tests/gradient_accumulator_test.py | 24 +++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py b/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py index 34d33eae74..aff641fcb7 100644 --- a/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py +++ b/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py @@ -44,21 +44,25 @@ def test_run(): @pytest.mark.usefixtures("maybe_run_functions_eagerly") def test_sparse(): - var0 = tf.Variable([1.0, 2.0]) - var1 = tf.Variable([3.0, 4.0]) + var0 = tf.Variable([[1.0, 2.0, 0.0]]) + var1 = tf.Variable([[3.0, 4.0, 0.0]]) - grads0 = tf.constant([0.1, 0.1]) - grads1 = tf.constant([0.01, 0.01]) - grads0_np_indices = tf.constant([0, 1], dtype=tf.int32) - grads0 = tf.IndexedSlices(grads0, grads0_np_indices, tf.constant([2])) - grads1_np_indices = tf.constant([0, 1], dtype=tf.int32) - grads1 = tf.IndexedSlices(grads1, grads1_np_indices, tf.constant([2])) + grads0 = tf.IndexedSlices( + tf.constant([[0.1, 0.1, 0.0]]), + tf.constant([0]), + tf.constant([1, 3]), + ) + grads1 = tf.IndexedSlices( + tf.constant([[0.01, 0.01, 0.0]]), + tf.constant([0]), + tf.constant([1, 3]), + ) grads_and_vars = list(zip([grads0, grads1], [var0, var1])) opt = GradientAccumulator(tf.keras.optimizers.SGD(lr=1.0, momentum=0.1)) opt.apply_gradients(grads_and_vars) - np.testing.assert_allclose(var0.read_value(), [0.9, 1.9]) - np.testing.assert_allclose(var1.read_value(), [2.99, 3.99]) + np.testing.assert_allclose(var0.read_value(), [[0.9, 1.9, 0.0]]) + np.testing.assert_allclose(var1.read_value(), [[2.99, 3.99, 0.0]]) @pytest.mark.usefixtures("maybe_run_functions_eagerly") From 2760fadfbd33526932d126559878dedf98899824 Mon Sep 17 00:00:00 2001 From: fsx950223 Date: Thu, 15 Jul 2021 16:15:19 +0000 Subject: [PATCH 08/24] fix sparse bug --- tensorflow_addons/optimizers/gradient_accumulator.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/tensorflow_addons/optimizers/gradient_accumulator.py b/tensorflow_addons/optimizers/gradient_accumulator.py index 24bcabe018..bdb3104846 100644 --- a/tensorflow_addons/optimizers/gradient_accumulator.py +++ b/tensorflow_addons/optimizers/gradient_accumulator.py @@ -105,16 +105,13 @@ def _resource_apply_sparse(self, grad: types.TensorLike, var, indices, apply_sta def _apply(): if "apply_state" in self._optimizer._sparse_apply_args: - train_op = self._optimizer._resource_apply_sparse( + train_op = self._optimizer._resource_apply_dense( accum_gradient, var, - indices, apply_state=apply_state, ) else: - train_op = self._optimizer._resource_apply_sparse( - accum_gradient, var, indices - ) + train_op = self._optimizer._resource_apply_dense(accum_gradient, var) reset_op = accum_gradient.assign( tf.zeros_like(accum_gradient), use_locking=self._use_locking, From 4ba7a559702f4850a7880a9dba39b032e1a7e9b5 Mon Sep 17 00:00:00 2001 From: fsx950223 Date: Thu, 15 Jul 2021 16:22:09 +0000 Subject: [PATCH 09/24] refactor --- .../optimizers/gradient_accumulator.py | 24 ++++--------------- 1 file changed, 5 insertions(+), 19 deletions(-) diff --git a/tensorflow_addons/optimizers/gradient_accumulator.py b/tensorflow_addons/optimizers/gradient_accumulator.py index bdb3104846..203b7d4d00 100644 --- a/tensorflow_addons/optimizers/gradient_accumulator.py +++ b/tensorflow_addons/optimizers/gradient_accumulator.py @@ -79,32 +79,18 @@ def _resource_apply_dense(self, grad, var, apply_state=None): grad, use_locking=self._use_locking, read_value=False ) - def _apply(): - if "apply_state" in self._optimizer._dense_apply_args: - train_op = self._optimizer._resource_apply_dense( - accum_gradient, var, apply_state=apply_state - ) - else: - train_op = self._optimizer._resource_apply_dense(accum_gradient, var) - reset_op = accum_gradient.assign( - tf.zeros_like(accum_gradient), - use_locking=self._use_locking, - read_value=False, - ) - return tf.group(train_op, reset_op) - - apply_op = tf.cond( - self.iterations % self._accum_steps == 0, _apply, lambda: tf.no_op() - ) - return apply_op + return self._apply_grad(accum_gradient, var, apply_state) def _resource_apply_sparse(self, grad: types.TensorLike, var, indices, apply_state): accum_gradient = self.get_slot(var, "ga") if accum_gradient is not None and grad is not None: self._resource_scatter_add(accum_gradient, indices, grad) + return self._apply_grad(accum_gradient, var, apply_state) + + def _apply_grad(self, accum_gradient, var, apply_state): def _apply(): - if "apply_state" in self._optimizer._sparse_apply_args: + if "apply_state" in self._optimizer._dense_apply_args: train_op = self._optimizer._resource_apply_dense( accum_gradient, var, From dc50184d03b262ee0afd1e327037b312bb8e65e5 Mon Sep 17 00:00:00 2001 From: fsx950223 Date: Fri, 16 Jul 2021 00:31:19 +0000 Subject: [PATCH 10/24] add sparse multi gpu test --- .../tests/gradient_accumulator_test.py | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py b/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py index aff641fcb7..7e46716a55 100644 --- a/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py +++ b/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py @@ -65,6 +65,32 @@ def test_sparse(): np.testing.assert_allclose(var1.read_value(), [[2.99, 3.99, 0.0]]) +@pytest.mark.usefixtures("maybe_run_functions_eagerly") +@pytest.mark.needs_gpu +def test_sparse_multi_gpus(): + strategy = tf.distribute.MirroredStrategy(test_utils.gpus_for_testing()) + with strategy.scope(): + var0 = tf.Variable([[1.0, 2.0, 0.0]]) + var1 = tf.Variable([[3.0, 4.0, 0.0]]) + + grads0 = tf.IndexedSlices( + tf.constant([[0.1, 0.1, 0.0]]), + tf.constant([0]), + tf.constant([1, 3]), + ) + grads1 = tf.IndexedSlices( + tf.constant([[0.01, 0.01, 0.0]]), + tf.constant([0]), + tf.constant([1, 3]), + ) + + grads_and_vars = list(zip([grads0, grads1], [var0, var1])) + opt = GradientAccumulator(tf.keras.optimizers.SGD(lr=1.0, momentum=0.1)) + strategy.run(opt.apply_gradients, [grads_and_vars]) + np.testing.assert_allclose(var0.read_value(), [[0.9, 1.9, 0.0]]) + np.testing.assert_allclose(var1.read_value(), [[2.99, 3.99, 0.0]]) + + @pytest.mark.usefixtures("maybe_run_functions_eagerly") def test_dense(): grad = tf.Variable([[0.1]]) From 8cd65ad6880d7da20a875d8a6619faa89735fc51 Mon Sep 17 00:00:00 2001 From: fsx950223 Date: Sun, 18 Jul 2021 03:22:45 +0000 Subject: [PATCH 11/24] fix rnn bug --- .../optimizers/gradient_accumulator.py | 50 +++++++++++-------- .../tests/gradient_accumulator_test.py | 15 +++--- 2 files changed, 38 insertions(+), 27 deletions(-) diff --git a/tensorflow_addons/optimizers/gradient_accumulator.py b/tensorflow_addons/optimizers/gradient_accumulator.py index 203b7d4d00..68bd5dc07c 100644 --- a/tensorflow_addons/optimizers/gradient_accumulator.py +++ b/tensorflow_addons/optimizers/gradient_accumulator.py @@ -69,8 +69,15 @@ def gradients(self): ) def apply_gradients(self, grads_and_vars, name=None, **kwargs): - self._optimizer._iterations = self.iterations - return super().apply_gradients(grads_and_vars, name, **kwargs) + train_op = super().apply_gradients(grads_and_vars, name, **kwargs) + with tf.control_dependencies([train_op]): + assign_op = self._optimizer.iterations.assign_add( + tf.cast( + tf.where(self.iterations % self._accum_steps == 0, 1, 0), tf.int64 + ), + read_value=False, + ) + return assign_op def _resource_apply_dense(self, grad, var, apply_state=None): accum_gradient = self.get_slot(var, "ga") @@ -89,26 +96,29 @@ def _resource_apply_sparse(self, grad: types.TensorLike, var, indices, apply_sta return self._apply_grad(accum_gradient, var, apply_state) def _apply_grad(self, accum_gradient, var, apply_state): - def _apply(): - if "apply_state" in self._optimizer._dense_apply_args: - train_op = self._optimizer._resource_apply_dense( - accum_gradient, - var, - apply_state=apply_state, - ) - else: - train_op = self._optimizer._resource_apply_dense(accum_gradient, var) - reset_op = accum_gradient.assign( - tf.zeros_like(accum_gradient), - use_locking=self._use_locking, - read_value=False, + grad = tf.where( + (self.iterations + 1) % self._accum_steps == 0, + accum_gradient, + tf.zeros_like(var), + ) + if "apply_state" in self._optimizer._dense_apply_args: + train_op = self._optimizer._resource_apply_dense( + grad, + var, + apply_state=apply_state, ) - return tf.group(train_op, reset_op) - - apply_op = tf.cond( - self.iterations % self._accum_steps == 0, _apply, lambda: tf.no_op() + else: + train_op = self._optimizer._resource_apply_dense(grad, var) + reset_val = tf.where( + grad == accum_gradient, tf.zeros_like(accum_gradient), accum_gradient ) - return apply_op + reset_op = accum_gradient.assign( + reset_val, + use_locking=self._use_locking, + read_value=False, + ) + + return tf.group(train_op, reset_op) def reset(self): """Resets the accumulated gradients on the current replica.""" diff --git a/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py b/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py index 7e46716a55..fada07a22a 100644 --- a/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py +++ b/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py @@ -38,8 +38,8 @@ def test_run(): for _ in range(accum_steps + 1): opt.apply_gradients(grads_and_vars) - np.testing.assert_allclose(var0.read_value(), [0.5, 1.5]) - np.testing.assert_allclose(var1.read_value(), [2.95, 3.95]) + np.testing.assert_allclose(var0.read_value(), [0.6, 1.6]) + np.testing.assert_allclose(var1.read_value(), [2.96, 3.96]) @pytest.mark.usefixtures("maybe_run_functions_eagerly") @@ -61,8 +61,8 @@ def test_sparse(): grads_and_vars = list(zip([grads0, grads1], [var0, var1])) opt = GradientAccumulator(tf.keras.optimizers.SGD(lr=1.0, momentum=0.1)) opt.apply_gradients(grads_and_vars) - np.testing.assert_allclose(var0.read_value(), [[0.9, 1.9, 0.0]]) - np.testing.assert_allclose(var1.read_value(), [[2.99, 3.99, 0.0]]) + np.testing.assert_allclose(var0.read_value(), [[1.0, 2.0, 0.0]]) + np.testing.assert_allclose(var1.read_value(), [[3.0, 4.0, 0.0]]) @pytest.mark.usefixtures("maybe_run_functions_eagerly") @@ -87,8 +87,8 @@ def test_sparse_multi_gpus(): grads_and_vars = list(zip([grads0, grads1], [var0, var1])) opt = GradientAccumulator(tf.keras.optimizers.SGD(lr=1.0, momentum=0.1)) strategy.run(opt.apply_gradients, [grads_and_vars]) - np.testing.assert_allclose(var0.read_value(), [[0.9, 1.9, 0.0]]) - np.testing.assert_allclose(var1.read_value(), [[2.99, 3.99, 0.0]]) + np.testing.assert_allclose(var0.read_value(), [[1.0, 2.0, 0.0]]) + np.testing.assert_allclose(var1.read_value(), [[3.0, 4.0, 0.0]]) @pytest.mark.usefixtures("maybe_run_functions_eagerly") @@ -107,7 +107,7 @@ def test_dense(): opt = GradientAccumulator(tf.keras.optimizers.SGD(lr=2.0), accum_steps=2) _ = opt.apply_gradients(list(zip([grad], model.variables))) - np.testing.assert_allclose(model.variables[0].read_value(), [[0.8]]) + np.testing.assert_allclose(model.variables[0].read_value(), [[1.0]]) @pytest.mark.usefixtures("maybe_run_functions_eagerly") @@ -168,6 +168,7 @@ def test_serialization(): assert new_optimizer.get_config() == optimizer.get_config() +@pytest.mark.usefixtures("maybe_run_functions_eagerly") @pytest.mark.usefixtures("run_with_mixed_precision_policy") def test_model_mixed_precision(): x = np.random.standard_normal((10000, 3)) From 7d40946e53ef4f3b9f99819b85023e28c95a070e Mon Sep 17 00:00:00 2001 From: fsx950223 Date: Mon, 19 Jul 2021 00:35:02 +0000 Subject: [PATCH 12/24] fix step bugs --- .../optimizers/gradient_accumulator.py | 69 +++++++++++++++---- 1 file changed, 57 insertions(+), 12 deletions(-) diff --git a/tensorflow_addons/optimizers/gradient_accumulator.py b/tensorflow_addons/optimizers/gradient_accumulator.py index 68bd5dc07c..dbf96589cb 100644 --- a/tensorflow_addons/optimizers/gradient_accumulator.py +++ b/tensorflow_addons/optimizers/gradient_accumulator.py @@ -24,7 +24,7 @@ class GradientAccumulator(tf.keras.optimizers.Optimizer): @typechecked def __init__( self, - optimizer: types.Optimizer, + inner_optimizer: types.Optimizer, accum_steps: types.TensorLike = 4, name: str = "GradientAccumulator", **kwargs, @@ -32,7 +32,7 @@ def __init__( r"""Construct a new GradientAccumulator optimizer. Args: - optimizer: str or `tf.keras.optimizers.Optimizer` that will be + inner_optimizer: str or `tf.keras.optimizers.Optimizer` that will be used to compute and apply gradients. accum_steps: int > 0. Update gradient in every accumulation steps. name: Optional name for the operations created when applying @@ -44,10 +44,12 @@ def __init__( decay of learning rate. `lr` is included for backward compatibility, recommended to use `learning_rate` instead. """ - super().__init__(name, **kwargs) - self._optimizer = tf.keras.optimizers.get(optimizer) + self._optimizer = tf.keras.optimizers.get(inner_optimizer) self._gradients = [] self._accum_steps = accum_steps + self._step = None + self._iteraions = self._optimizer.iterations + super().__init__(name, **kwargs) def _create_slots(self, var_list): self._optimizer._create_slots(var_list=var_list) @@ -56,6 +58,32 @@ def _create_slots(self, var_list): self._gradients = [self.get_slot(var, "ga") for var in var_list] + @property + def step(self): + """Variable. The number of training steps this Optimizer has run.""" + if self._step is None: + with self._distribution_strategy_scope(): + self._step = self.add_weight( + "iter", + shape=[], + initializer="ones", + dtype=tf.int64, + trainable=False, + aggregation=tf.VariableAggregation.ONLY_FIRST_REPLICA, + ) + self._weights.append(self._step) + return self._step + + @step.setter + def step(self, variable): + if self._step is not None: + raise RuntimeError( + "Cannot set `step` to a new Variable after " + "the Optimizer weights have been created" + ) + self._step = variable + self._weights.append(self._step) + @property def gradients(self): """The accumulated gradients on the current replica.""" @@ -71,13 +99,17 @@ def gradients(self): def apply_gradients(self, grads_and_vars, name=None, **kwargs): train_op = super().apply_gradients(grads_and_vars, name, **kwargs) with tf.control_dependencies([train_op]): - assign_op = self._optimizer.iterations.assign_add( - tf.cast( - tf.where(self.iterations % self._accum_steps == 0, 1, 0), tf.int64 - ), - read_value=False, - ) - return assign_op + with tf.control_dependencies( + [ + self._optimizer.iterations.assign_add( + tf.cast( + tf.where(self.step % self._accum_steps == 0, 1, 0), tf.int64 + ), + read_value=False, + ) + ] + ): + return self.step.assign_add(1, read_value=False) def _resource_apply_dense(self, grad, var, apply_state=None): accum_gradient = self.get_slot(var, "ga") @@ -97,7 +129,7 @@ def _resource_apply_sparse(self, grad: types.TensorLike, var, indices, apply_sta def _apply_grad(self, accum_gradient, var, apply_state): grad = tf.where( - (self.iterations + 1) % self._accum_steps == 0, + self.step % self._accum_steps == 0, accum_gradient, tf.zeros_like(var), ) @@ -138,6 +170,19 @@ def reset(self): return tf.group(assign_ops) + @property + def inner_optimizer(self): + """The optimizer that this LossScaleOptimizer is wrapping.""" + return self._optimizer + + @property + def iterations(self): + return self._optimizer.iterations + + @iterations.setter + def iterations(self, variable): + self._optimizer.iterations = variable + @property def lr(self): return self._optimizer._get_hyper("learning_rate") From 6949bd3c6cfa36e650207a77be62d2aaab495600 Mon Sep 17 00:00:00 2001 From: fsx950223 Date: Mon, 19 Jul 2021 01:13:11 +0000 Subject: [PATCH 13/24] fix _iterations --- tensorflow_addons/optimizers/gradient_accumulator.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tensorflow_addons/optimizers/gradient_accumulator.py b/tensorflow_addons/optimizers/gradient_accumulator.py index dbf96589cb..32009aa4ae 100644 --- a/tensorflow_addons/optimizers/gradient_accumulator.py +++ b/tensorflow_addons/optimizers/gradient_accumulator.py @@ -44,12 +44,12 @@ def __init__( decay of learning rate. `lr` is included for backward compatibility, recommended to use `learning_rate` instead. """ + super().__init__(name, **kwargs) self._optimizer = tf.keras.optimizers.get(inner_optimizer) self._gradients = [] self._accum_steps = accum_steps self._step = None - self._iteraions = self._optimizer.iterations - super().__init__(name, **kwargs) + self._iterations = self._optimizer.iterations def _create_slots(self, var_list): self._optimizer._create_slots(var_list=var_list) From 9e423e57b0c239ca0314e64980b9ba487c919c2c Mon Sep 17 00:00:00 2001 From: fsx950223 Date: Mon, 19 Jul 2021 06:28:20 +0000 Subject: [PATCH 14/24] use gradient transformer --- .../optimizers/gradient_accumulator.py | 98 ++++++++++++------- 1 file changed, 60 insertions(+), 38 deletions(-) diff --git a/tensorflow_addons/optimizers/gradient_accumulator.py b/tensorflow_addons/optimizers/gradient_accumulator.py index 32009aa4ae..0be15e8b87 100644 --- a/tensorflow_addons/optimizers/gradient_accumulator.py +++ b/tensorflow_addons/optimizers/gradient_accumulator.py @@ -46,17 +46,60 @@ def __init__( """ super().__init__(name, **kwargs) self._optimizer = tf.keras.optimizers.get(inner_optimizer) - self._gradients = [] - self._accum_steps = accum_steps self._step = None + self._gradients = {} + self._accum_steps = accum_steps + + def _accum_grad(grads_and_vars): + with tf.init_scope(): + if not self._gradients: + for grad, var in grads_and_vars: + if tf.distribute.has_strategy(): + for v in var.values: + self._gradients[v.ref()] = tf.Variable( + tf.zeros_like(v), trainable=False + ) + else: + self._gradients[var.ref()] = tf.Variable( + tf.zeros_like(var), trainable=False + ) + new_grads_and_vars = [] + for grad, var in grads_and_vars: + if tf.distribute.has_strategy(): + replica_id = tf.get_static_value( + tf.distribute.get_replica_context().replica_id_in_sync_group + ) + handle = self._gradients[var.values[replica_id].ref()] + else: + handle = self._gradients[var.ref()] + + if isinstance(grad, tf.IndexedSlices): + handle.scatter_add(grad) + fake_grad = tf.IndexedSlices( + tf.zeros_like(grad.values), grad.indices, grad.dense_shape + ) + else: + handle.assign_add(grad) + fake_grad = tf.zeros_like(var) + + def _get_grad(): + new_grad = handle.read_value() + handle.assign(tf.zeros_like(handle), use_locking=self._use_locking) + return new_grad + + new_grad = tf.cond( + (self.iterations + 1) % self._accum_steps == 0, + _get_grad, + lambda: fake_grad, + ) + new_grads_and_vars.append((new_grad, var)) + return new_grads_and_vars + + self.gradient_transformers.append(_accum_grad) self._iterations = self._optimizer.iterations def _create_slots(self, var_list): self._optimizer._create_slots(var_list=var_list) - for var in var_list: - self.add_slot(var, "ga") - - self._gradients = [self.get_slot(var, "ga") for var in var_list] @property def step(self): @@ -93,7 +136,7 @@ def gradients(self): ) return list( gradient.read_value() if gradient is not None else gradient - for gradient in self._gradients + for _, gradient in self._gradients ) def apply_gradients(self, grads_and_vars, name=None, **kwargs): @@ -112,27 +155,6 @@ def apply_gradients(self, grads_and_vars, name=None, **kwargs): return self.step.assign_add(1, read_value=False) def _resource_apply_dense(self, grad, var, apply_state=None): - accum_gradient = self.get_slot(var, "ga") - if accum_gradient is not None and grad is not None: - accum_gradient.assign_add( - grad, use_locking=self._use_locking, read_value=False - ) - - return self._apply_grad(accum_gradient, var, apply_state) - - def _resource_apply_sparse(self, grad: types.TensorLike, var, indices, apply_state): - accum_gradient = self.get_slot(var, "ga") - if accum_gradient is not None and grad is not None: - self._resource_scatter_add(accum_gradient, indices, grad) - - return self._apply_grad(accum_gradient, var, apply_state) - - def _apply_grad(self, accum_gradient, var, apply_state): - grad = tf.where( - self.step % self._accum_steps == 0, - accum_gradient, - tf.zeros_like(var), - ) if "apply_state" in self._optimizer._dense_apply_args: train_op = self._optimizer._resource_apply_dense( grad, @@ -141,16 +163,16 @@ def _apply_grad(self, accum_gradient, var, apply_state): ) else: train_op = self._optimizer._resource_apply_dense(grad, var) - reset_val = tf.where( - grad == accum_gradient, tf.zeros_like(accum_gradient), accum_gradient - ) - reset_op = accum_gradient.assign( - reset_val, - use_locking=self._use_locking, - read_value=False, - ) + return train_op - return tf.group(train_op, reset_op) + def _resource_apply_sparse(self, grad: types.TensorLike, var, indices, apply_state): + if "apply_state" in self._optimizer._sparse_apply_args: + train_op = self._optimizer._resource_apply_sparse( + grad, var, indices, apply_state=apply_state + ) + else: + train_op = self._optimizer._resource_apply_sparse(grad, var, indices) + return train_op def reset(self): """Resets the accumulated gradients on the current replica.""" @@ -158,7 +180,7 @@ def reset(self): if not self._gradients: return assign_ops - for gradient in self._gradients: + for _, gradient in self._gradients: if gradient is not None: assign_ops.append( gradient.assign( From 7f3b2e921cbe48e447f3feba470a7ac08198cfd3 Mon Sep 17 00:00:00 2001 From: fsx950223 Date: Mon, 19 Jul 2021 06:34:51 +0000 Subject: [PATCH 15/24] fix bug --- tensorflow_addons/optimizers/gradient_accumulator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tensorflow_addons/optimizers/gradient_accumulator.py b/tensorflow_addons/optimizers/gradient_accumulator.py index 0be15e8b87..2847d0e2f4 100644 --- a/tensorflow_addons/optimizers/gradient_accumulator.py +++ b/tensorflow_addons/optimizers/gradient_accumulator.py @@ -88,7 +88,7 @@ def _get_grad(): return new_grad new_grad = tf.cond( - (self.iterations + 1) % self._accum_steps == 0, + (self.step + 1) % self._accum_steps == 0, _get_grad, lambda: fake_grad, ) From 99dcde51d1046cebea8c28f2848e2426b96e761e Mon Sep 17 00:00:00 2001 From: fsx950223 Date: Mon, 19 Jul 2021 06:40:49 +0000 Subject: [PATCH 16/24] fix step bug --- tensorflow_addons/optimizers/gradient_accumulator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tensorflow_addons/optimizers/gradient_accumulator.py b/tensorflow_addons/optimizers/gradient_accumulator.py index 2847d0e2f4..2b2b79938e 100644 --- a/tensorflow_addons/optimizers/gradient_accumulator.py +++ b/tensorflow_addons/optimizers/gradient_accumulator.py @@ -88,7 +88,7 @@ def _get_grad(): return new_grad new_grad = tf.cond( - (self.step + 1) % self._accum_steps == 0, + self.step % self._accum_steps == 0, _get_grad, lambda: fake_grad, ) From a1845810c514a387fc6e0621ca6cee5e1bcddd60 Mon Sep 17 00:00:00 2001 From: fsx950223 Date: Mon, 19 Jul 2021 08:25:41 +0000 Subject: [PATCH 17/24] simpify code --- .../optimizers/gradient_accumulator.py | 24 ++----------------- 1 file changed, 2 insertions(+), 22 deletions(-) diff --git a/tensorflow_addons/optimizers/gradient_accumulator.py b/tensorflow_addons/optimizers/gradient_accumulator.py index 2b2b79938e..dd422e182f 100644 --- a/tensorflow_addons/optimizers/gradient_accumulator.py +++ b/tensorflow_addons/optimizers/gradient_accumulator.py @@ -95,7 +95,7 @@ def _get_grad(): new_grads_and_vars.append((new_grad, var)) return new_grads_and_vars - self.gradient_transformers.append(_accum_grad) + self._optimizer.gradient_transformers.append(_accum_grad) self._iterations = self._optimizer.iterations def _create_slots(self, var_list): @@ -140,7 +140,7 @@ def gradients(self): ) def apply_gradients(self, grads_and_vars, name=None, **kwargs): - train_op = super().apply_gradients(grads_and_vars, name, **kwargs) + train_op = self._optimizer.apply_gradients(grads_and_vars, name, **kwargs) with tf.control_dependencies([train_op]): with tf.control_dependencies( [ @@ -154,26 +154,6 @@ def apply_gradients(self, grads_and_vars, name=None, **kwargs): ): return self.step.assign_add(1, read_value=False) - def _resource_apply_dense(self, grad, var, apply_state=None): - if "apply_state" in self._optimizer._dense_apply_args: - train_op = self._optimizer._resource_apply_dense( - grad, - var, - apply_state=apply_state, - ) - else: - train_op = self._optimizer._resource_apply_dense(grad, var) - return train_op - - def _resource_apply_sparse(self, grad: types.TensorLike, var, indices, apply_state): - if "apply_state" in self._optimizer._sparse_apply_args: - train_op = self._optimizer._resource_apply_sparse( - grad, var, indices, apply_state=apply_state - ) - else: - train_op = self._optimizer._resource_apply_sparse(grad, var, indices) - return train_op - def reset(self): """Resets the accumulated gradients on the current replica.""" assign_ops = [] From d0718f851e65cbc06961d6407cf22d1faa86bd29 Mon Sep 17 00:00:00 2001 From: fsx950223 Date: Mon, 19 Jul 2021 10:39:00 +0000 Subject: [PATCH 18/24] optimize --- .../optimizers/gradient_accumulator.py | 49 ++++++++++++++----- .../tests/gradient_accumulator_test.py | 15 +++--- 2 files changed, 46 insertions(+), 18 deletions(-) diff --git a/tensorflow_addons/optimizers/gradient_accumulator.py b/tensorflow_addons/optimizers/gradient_accumulator.py index dd422e182f..87b02492ff 100644 --- a/tensorflow_addons/optimizers/gradient_accumulator.py +++ b/tensorflow_addons/optimizers/gradient_accumulator.py @@ -78,21 +78,48 @@ def _accum_grad(grads_and_vars): fake_grad = tf.IndexedSlices( tf.zeros_like(grad.values), grad.indices, grad.dense_shape ) + + def _get_grad(): + new_grad = handle.read_value() + indices = tf.nest.flatten( + tf.where( + tf.reduce_sum( + new_grad, axis=list(range(len(new_grad.shape))[1:]) + ) + != 0 + )[0] + ) + values = tf.gather(new_grad, indices) + dense_shape = new_grad.shape + new_grad = tf.IndexedSlices(values, indices, dense_shape) + handle.assign( + tf.zeros_like(handle), use_locking=self._use_locking + ) + return new_grad + + new_grad = tf.cond( + self.step % self._accum_steps == 0, + _get_grad, + lambda: fake_grad, + ) + new_grads_and_vars.append((new_grad, var)) else: handle.assign_add(grad) fake_grad = tf.zeros_like(var) - def _get_grad(): - new_grad = handle.read_value() - handle.assign(tf.zeros_like(handle), use_locking=self._use_locking) - return new_grad - - new_grad = tf.cond( - self.step % self._accum_steps == 0, - _get_grad, - lambda: fake_grad, - ) - new_grads_and_vars.append((new_grad, var)) + def _get_grad(): + new_grad = handle.read_value() + handle.assign( + tf.zeros_like(handle), use_locking=self._use_locking + ) + return new_grad + + new_grad = tf.cond( + self.step % self._accum_steps == 0, + _get_grad, + lambda: fake_grad, + ) + new_grads_and_vars.append((new_grad, var)) return new_grads_and_vars self._optimizer.gradient_transformers.append(_accum_grad) diff --git a/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py b/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py index fada07a22a..7fe4171edd 100644 --- a/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py +++ b/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py @@ -44,12 +44,12 @@ def test_run(): @pytest.mark.usefixtures("maybe_run_functions_eagerly") def test_sparse(): - var0 = tf.Variable([[1.0, 2.0, 0.0]]) + var0 = tf.Variable([[1.0, 2.0, 0.0], [1.0, 2.0, 0.0]]) var1 = tf.Variable([[3.0, 4.0, 0.0]]) grads0 = tf.IndexedSlices( tf.constant([[0.1, 0.1, 0.0]]), - tf.constant([0]), + tf.constant([1]), tf.constant([1, 3]), ) grads1 = tf.IndexedSlices( @@ -59,10 +59,11 @@ def test_sparse(): ) grads_and_vars = list(zip([grads0, grads1], [var0, var1])) - opt = GradientAccumulator(tf.keras.optimizers.SGD(lr=1.0, momentum=0.1)) - opt.apply_gradients(grads_and_vars) - np.testing.assert_allclose(var0.read_value(), [[1.0, 2.0, 0.0]]) - np.testing.assert_allclose(var1.read_value(), [[3.0, 4.0, 0.0]]) + opt = GradientAccumulator(tf.keras.optimizers.SGD(lr=1.0)) + for _ in range(8): + opt.apply_gradients(grads_and_vars) + np.testing.assert_allclose(var0.read_value(), [[1.0, 2.0, 0.0], [0.2, 1.2, 0.0]]) + np.testing.assert_allclose(var1.read_value(), [[2.92, 3.92, 0.0]]) @pytest.mark.usefixtures("maybe_run_functions_eagerly") @@ -85,7 +86,7 @@ def test_sparse_multi_gpus(): ) grads_and_vars = list(zip([grads0, grads1], [var0, var1])) - opt = GradientAccumulator(tf.keras.optimizers.SGD(lr=1.0, momentum=0.1)) + opt = GradientAccumulator(tf.keras.optimizers.SGD(lr=1.0)) strategy.run(opt.apply_gradients, [grads_and_vars]) np.testing.assert_allclose(var0.read_value(), [[1.0, 2.0, 0.0]]) np.testing.assert_allclose(var1.read_value(), [[3.0, 4.0, 0.0]]) From 2af54758a832ccb74c386916a8b9fd7db0bbf2a0 Mon Sep 17 00:00:00 2001 From: fsx950223 Date: Mon, 19 Jul 2021 10:41:04 +0000 Subject: [PATCH 19/24] fix bug --- tensorflow_addons/optimizers/gradient_accumulator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tensorflow_addons/optimizers/gradient_accumulator.py b/tensorflow_addons/optimizers/gradient_accumulator.py index 87b02492ff..b298ccf35a 100644 --- a/tensorflow_addons/optimizers/gradient_accumulator.py +++ b/tensorflow_addons/optimizers/gradient_accumulator.py @@ -87,7 +87,7 @@ def _get_grad(): new_grad, axis=list(range(len(new_grad.shape))[1:]) ) != 0 - )[0] + ) ) values = tf.gather(new_grad, indices) dense_shape = new_grad.shape From 42fccea6ffd77317d2ce072015438d60c141fa94 Mon Sep 17 00:00:00 2001 From: fsx950223 Date: Mon, 19 Jul 2021 11:54:36 +0000 Subject: [PATCH 20/24] fix bug --- .../optimizers/gradient_accumulator.py | 23 +++++++++++-------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/tensorflow_addons/optimizers/gradient_accumulator.py b/tensorflow_addons/optimizers/gradient_accumulator.py index b298ccf35a..c392239039 100644 --- a/tensorflow_addons/optimizers/gradient_accumulator.py +++ b/tensorflow_addons/optimizers/gradient_accumulator.py @@ -75,33 +75,36 @@ def _accum_grad(grads_and_vars): if isinstance(grad, tf.IndexedSlices): handle.scatter_add(grad) - fake_grad = tf.IndexedSlices( - tf.zeros_like(grad.values), grad.indices, grad.dense_shape - ) def _get_grad(): new_grad = handle.read_value() - indices = tf.nest.flatten( + indices = tf.squeeze( tf.where( tf.reduce_sum( new_grad, axis=list(range(len(new_grad.shape))[1:]) ) != 0 - ) + ), + axis=-1, ) + values = tf.gather(new_grad, indices) - dense_shape = new_grad.shape - new_grad = tf.IndexedSlices(values, indices, dense_shape) + dense_shape = tf.constant(new_grad.shape.as_list()) handle.assign( tf.zeros_like(handle), use_locking=self._use_locking ) - return new_grad + return values, tf.cast(indices, tf.int32), dense_shape - new_grad = tf.cond( + values, indices, dense_shape = tf.cond( self.step % self._accum_steps == 0, _get_grad, - lambda: fake_grad, + lambda: ( + tf.zeros_like(grad.values), + grad.indices, + grad.dense_shape, + ), ) + new_grad = tf.IndexedSlices(values, indices, dense_shape) new_grads_and_vars.append((new_grad, var)) else: handle.assign_add(grad) From 93794ec1ddf53c132290d53f7c408c0e896d2bf4 Mon Sep 17 00:00:00 2001 From: fsx950223 Date: Mon, 19 Jul 2021 12:12:05 +0000 Subject: [PATCH 21/24] simpify code --- tensorflow_addons/optimizers/gradient_accumulator.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tensorflow_addons/optimizers/gradient_accumulator.py b/tensorflow_addons/optimizers/gradient_accumulator.py index c392239039..0c302e4c0b 100644 --- a/tensorflow_addons/optimizers/gradient_accumulator.py +++ b/tensorflow_addons/optimizers/gradient_accumulator.py @@ -175,9 +175,7 @@ def apply_gradients(self, grads_and_vars, name=None, **kwargs): with tf.control_dependencies( [ self._optimizer.iterations.assign_add( - tf.cast( - tf.where(self.step % self._accum_steps == 0, 1, 0), tf.int64 - ), + tf.cast(self.step % self._accum_steps == 0, tf.int64), read_value=False, ) ] From e62cc95b5fd23e1a700b846e0384ce5b18750eb5 Mon Sep 17 00:00:00 2001 From: fsx950223 Date: Mon, 19 Jul 2021 12:37:15 +0000 Subject: [PATCH 22/24] add mean reduction --- tensorflow_addons/optimizers/gradient_accumulator.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/tensorflow_addons/optimizers/gradient_accumulator.py b/tensorflow_addons/optimizers/gradient_accumulator.py index 0c302e4c0b..4bcbf8ccc7 100644 --- a/tensorflow_addons/optimizers/gradient_accumulator.py +++ b/tensorflow_addons/optimizers/gradient_accumulator.py @@ -26,6 +26,7 @@ def __init__( self, inner_optimizer: types.Optimizer, accum_steps: types.TensorLike = 4, + reduction: str = "SUM", name: str = "GradientAccumulator", **kwargs, ): @@ -35,6 +36,7 @@ def __init__( inner_optimizer: str or `tf.keras.optimizers.Optimizer` that will be used to compute and apply gradients. accum_steps: int > 0. Update gradient in every accumulation steps. + reduction: str, Reduction method ['SUM', 'MEAN'] name: Optional name for the operations created when applying gradients. Defaults to "GradientAccumulator". **kwargs: keyword arguments. Allowed to be {`clipnorm`, @@ -49,6 +51,7 @@ def __init__( self._step = None self._gradients = {} self._accum_steps = accum_steps + self._reduction = reduction def _accum_grad(grads_and_vars): with tf.init_scope(): @@ -78,6 +81,8 @@ def _accum_grad(grads_and_vars): def _get_grad(): new_grad = handle.read_value() + if self._reduction == "MEAN": + new_grad /= tf.cast(self._accum_steps, new_grad.dtype) indices = tf.squeeze( tf.where( tf.reduce_sum( @@ -108,10 +113,11 @@ def _get_grad(): new_grads_and_vars.append((new_grad, var)) else: handle.assign_add(grad) - fake_grad = tf.zeros_like(var) def _get_grad(): new_grad = handle.read_value() + if self._reduction == "MEAN": + new_grad /= tf.cast(self._accum_steps, new_grad.dtype) handle.assign( tf.zeros_like(handle), use_locking=self._use_locking ) @@ -120,7 +126,7 @@ def _get_grad(): new_grad = tf.cond( self.step % self._accum_steps == 0, _get_grad, - lambda: fake_grad, + lambda: tf.zeros_like(grad), ) new_grads_and_vars.append((new_grad, var)) return new_grads_and_vars From 64b70b48bbdd652fe5426230e319d6151d93d74a Mon Sep 17 00:00:00 2001 From: fsx950223 Date: Tue, 20 Jul 2021 01:24:52 +0000 Subject: [PATCH 23/24] decrease memory usage --- .../optimizers/gradient_accumulator.py | 20 ++++--------------- 1 file changed, 4 insertions(+), 16 deletions(-) diff --git a/tensorflow_addons/optimizers/gradient_accumulator.py b/tensorflow_addons/optimizers/gradient_accumulator.py index 4bcbf8ccc7..268e019454 100644 --- a/tensorflow_addons/optimizers/gradient_accumulator.py +++ b/tensorflow_addons/optimizers/gradient_accumulator.py @@ -57,24 +57,12 @@ def _accum_grad(grads_and_vars): with tf.init_scope(): if not self._gradients: for grad, var in grads_and_vars: - if tf.distribute.has_strategy(): - for v in var.values: - self._gradients[v.ref()] = tf.Variable( - tf.zeros_like(v), trainable=False - ) - else: - self._gradients[var.ref()] = tf.Variable( - tf.zeros_like(var), trainable=False - ) + self._gradients[var.ref()] = tf.Variable( + tf.zeros_like(var), trainable=False + ) new_grads_and_vars = [] for grad, var in grads_and_vars: - if tf.distribute.has_strategy(): - replica_id = tf.get_static_value( - tf.distribute.get_replica_context().replica_id_in_sync_group - ) - handle = self._gradients[var.values[replica_id].ref()] - else: - handle = self._gradients[var.ref()] + handle = self._gradients[var.ref()] if isinstance(grad, tf.IndexedSlices): handle.scatter_add(grad) From 67c1e8ea19e82c3f2a5706674dd81f15ab5002a2 Mon Sep 17 00:00:00 2001 From: fsx950223 Date: Wed, 21 Jul 2021 08:10:30 +0000 Subject: [PATCH 24/24] fix iterations --- .../optimizers/gradient_accumulator.py | 100 ++++++++---------- .../tests/gradient_accumulator_test.py | 50 +++------ 2 files changed, 60 insertions(+), 90 deletions(-) diff --git a/tensorflow_addons/optimizers/gradient_accumulator.py b/tensorflow_addons/optimizers/gradient_accumulator.py index 268e019454..57051f8e9e 100644 --- a/tensorflow_addons/optimizers/gradient_accumulator.py +++ b/tensorflow_addons/optimizers/gradient_accumulator.py @@ -49,20 +49,13 @@ def __init__( super().__init__(name, **kwargs) self._optimizer = tf.keras.optimizers.get(inner_optimizer) self._step = None - self._gradients = {} self._accum_steps = accum_steps self._reduction = reduction def _accum_grad(grads_and_vars): - with tf.init_scope(): - if not self._gradients: - for grad, var in grads_and_vars: - self._gradients[var.ref()] = tf.Variable( - tf.zeros_like(var), trainable=False - ) new_grads_and_vars = [] for grad, var in grads_and_vars: - handle = self._gradients[var.ref()] + handle = self.get_slot(var, "ga") if isinstance(grad, tf.IndexedSlices): handle.scatter_add(grad) @@ -84,9 +77,11 @@ def _get_grad(): values = tf.gather(new_grad, indices) dense_shape = tf.constant(new_grad.shape.as_list()) handle.assign( - tf.zeros_like(handle), use_locking=self._use_locking + tf.zeros_like(handle), + use_locking=self._use_locking, + read_value=False, ) - return values, tf.cast(indices, tf.int32), dense_shape + return values, tf.cast(indices, grad.indices.dtype), dense_shape values, indices, dense_shape = tf.cond( self.step % self._accum_steps == 0, @@ -100,14 +95,18 @@ def _get_grad(): new_grad = tf.IndexedSlices(values, indices, dense_shape) new_grads_and_vars.append((new_grad, var)) else: - handle.assign_add(grad) + handle.assign_add( + grad, use_locking=self._use_locking, read_value=False + ) def _get_grad(): new_grad = handle.read_value() if self._reduction == "MEAN": new_grad /= tf.cast(self._accum_steps, new_grad.dtype) handle.assign( - tf.zeros_like(handle), use_locking=self._use_locking + tf.zeros_like(handle), + use_locking=self._use_locking, + read_value=False, ) return new_grad @@ -119,11 +118,39 @@ def _get_grad(): new_grads_and_vars.append((new_grad, var)) return new_grads_and_vars - self._optimizer.gradient_transformers.append(_accum_grad) + self.gradient_transformers.append(_accum_grad) self._iterations = self._optimizer.iterations def _create_slots(self, var_list): self._optimizer._create_slots(var_list=var_list) + for var in var_list: + self.add_slot(var, "ga") + + def _resource_apply_dense(self, grad, handle, apply_state): + if "apply_state" in self._optimizer._dense_apply_args: + return self.inner_optimizer._resource_apply_dense(grad, handle, apply_state) + else: + return self.inner_optimizer._resource_apply_dense(grad, handle) + + def _resource_apply_sparse(self, grad, handle, indices, apply_state): + if "apply_state" in self._optimizer._sparse_apply_args: + return self.inner_optimizer._resource_apply_sparse( + grad, handle, indices, apply_state=apply_state + ) + else: + return self.inner_optimizer._resource_apply_sparse(grad, handle, indices) + + def _resource_apply_sparse_duplicate_indices( + self, grad, handle, indices, apply_state=None + ): + if "apply_state" in self._optimizer._sparse_apply_args: + return self.inner_optimizer._resource_apply_sparse_duplicate_indices( + grad, handle, indices, apply_state=apply_state + ) + else: + return self.inner_optimizer._resource_apply_sparse_duplicate_indices( + grad, handle, indices + ) @property def step(self): @@ -133,7 +160,6 @@ def step(self): self._step = self.add_weight( "iter", shape=[], - initializer="ones", dtype=tf.int64, trainable=False, aggregation=tf.VariableAggregation.ONLY_FIRST_REPLICA, @@ -151,49 +177,15 @@ def step(self, variable): self._step = variable self._weights.append(self._step) - @property - def gradients(self): - """The accumulated gradients on the current replica.""" - if not self._gradients: - raise ValueError( - "The accumulator should be called first to initialize the gradients" - ) - return list( - gradient.read_value() if gradient is not None else gradient - for _, gradient in self._gradients - ) - def apply_gradients(self, grads_and_vars, name=None, **kwargs): - train_op = self._optimizer.apply_gradients(grads_and_vars, name, **kwargs) - with tf.control_dependencies([train_op]): - with tf.control_dependencies( - [ - self._optimizer.iterations.assign_add( - tf.cast(self.step % self._accum_steps == 0, tf.int64), - read_value=False, - ) - ] - ): - return self.step.assign_add(1, read_value=False) - - def reset(self): - """Resets the accumulated gradients on the current replica.""" - assign_ops = [] - if not self._gradients: - return assign_ops - - for _, gradient in self._gradients: - if gradient is not None: - assign_ops.append( - gradient.assign( - tf.zeros_like(gradient), - use_locking=self._use_locking, - read_value=False, - ) + with tf.control_dependencies([self.step.assign_add(1, read_value=False)]): + train_op = super().apply_gradients(grads_and_vars, name, **kwargs) + with tf.control_dependencies([train_op]): + return self.iterations.assign_sub( + tf.cast(self.step % self._accum_steps != 0, tf.int64), + read_value=False, ) - return tf.group(assign_ops) - @property def inner_optimizer(self): """The optimizer that this LossScaleOptimizer is wrapping.""" diff --git a/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py b/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py index 7fe4171edd..18d8d890f1 100644 --- a/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py +++ b/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py @@ -17,12 +17,12 @@ import numpy as np import pytest import tensorflow as tf -from tensorflow_addons.utils import test_utils from tensorflow_addons.optimizers import GradientAccumulator @pytest.mark.usefixtures("maybe_run_functions_eagerly") +@pytest.mark.with_device(["cpu", "gpu", tf.distribute.MirroredStrategy]) def test_run(): var0 = tf.Variable([1.0, 2.0]) var1 = tf.Variable([3.0, 4.0]) @@ -35,14 +35,18 @@ def test_run(): opt = GradientAccumulator(tf.keras.optimizers.SGD(lr=1.0), accum_steps) + strategy = tf.distribute.get_strategy() for _ in range(accum_steps + 1): - opt.apply_gradients(grads_and_vars) + strategy.run(opt.apply_gradients, [grads_and_vars]) np.testing.assert_allclose(var0.read_value(), [0.6, 1.6]) np.testing.assert_allclose(var1.read_value(), [2.96, 3.96]) + np.testing.assert_allclose(opt.iterations.read_value(), 1) + np.testing.assert_allclose(opt.step.read_value(), accum_steps + 1) @pytest.mark.usefixtures("maybe_run_functions_eagerly") +@pytest.mark.with_device(["cpu", "gpu", tf.distribute.MirroredStrategy]) def test_sparse(): var0 = tf.Variable([[1.0, 2.0, 0.0], [1.0, 2.0, 0.0]]) var1 = tf.Variable([[3.0, 4.0, 0.0]]) @@ -60,38 +64,13 @@ def test_sparse(): grads_and_vars = list(zip([grads0, grads1], [var0, var1])) opt = GradientAccumulator(tf.keras.optimizers.SGD(lr=1.0)) + strategy = tf.distribute.get_strategy() for _ in range(8): - opt.apply_gradients(grads_and_vars) + strategy.run(opt.apply_gradients, [grads_and_vars]) np.testing.assert_allclose(var0.read_value(), [[1.0, 2.0, 0.0], [0.2, 1.2, 0.0]]) np.testing.assert_allclose(var1.read_value(), [[2.92, 3.92, 0.0]]) -@pytest.mark.usefixtures("maybe_run_functions_eagerly") -@pytest.mark.needs_gpu -def test_sparse_multi_gpus(): - strategy = tf.distribute.MirroredStrategy(test_utils.gpus_for_testing()) - with strategy.scope(): - var0 = tf.Variable([[1.0, 2.0, 0.0]]) - var1 = tf.Variable([[3.0, 4.0, 0.0]]) - - grads0 = tf.IndexedSlices( - tf.constant([[0.1, 0.1, 0.0]]), - tf.constant([0]), - tf.constant([1, 3]), - ) - grads1 = tf.IndexedSlices( - tf.constant([[0.01, 0.01, 0.0]]), - tf.constant([0]), - tf.constant([1, 3]), - ) - - grads_and_vars = list(zip([grads0, grads1], [var0, var1])) - opt = GradientAccumulator(tf.keras.optimizers.SGD(lr=1.0)) - strategy.run(opt.apply_gradients, [grads_and_vars]) - np.testing.assert_allclose(var0.read_value(), [[1.0, 2.0, 0.0]]) - np.testing.assert_allclose(var1.read_value(), [[3.0, 4.0, 0.0]]) - - @pytest.mark.usefixtures("maybe_run_functions_eagerly") def test_dense(): grad = tf.Variable([[0.1]]) @@ -133,7 +112,7 @@ def test_config(): @pytest.mark.usefixtures("maybe_run_functions_eagerly") -@pytest.mark.needs_gpu +@pytest.mark.with_device([tf.distribute.MirroredStrategy]) def test_fit_simple_linear_model(): seed = 0x2019 np.random.seed(seed) @@ -142,13 +121,12 @@ def test_fit_simple_linear_model(): x = np.random.standard_normal((num_examples, 3)) w = np.random.standard_normal((3, 1)) y = np.dot(x, w) + np.random.standard_normal((num_examples, 1)) * 1e-4 - strategy = tf.distribute.MirroredStrategy(test_utils.gpus_for_testing()) - with strategy.scope(): - model = tf.keras.models.Sequential() - model.add(tf.keras.layers.Dense(input_shape=(3,), units=1)) - opt = GradientAccumulator("sgd") - model.compile(opt, loss="mse") + model = tf.keras.models.Sequential() + model.add(tf.keras.layers.Dense(input_shape=(3,), units=1)) + + opt = GradientAccumulator("sgd") + model.compile(opt, loss="mse") model.fit(x, y, epochs=5)