# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Specification of a training cluster."""
import collections
import contextlib
import heapq
import re
import lingvo.compat as tf
from lingvo.core import hyperparams
from lingvo.core import nested_map
from lingvo.core import thread_local_utils
import numpy as np
# Helper class to record the current infeed host we are working on.
InfeedContext = collections.namedtuple(
    'InfeedContext', ['infeed_host_index', 'num_infeed_hosts'])

# Thread-local stack of InfeedContext records; the innermost scope pushed via
# InfeedContextScope() is the active one returned by GetInfeedContext().
_INFEED_CONTEXT_STACK = thread_local_utils.ThreadLocalStack()
@contextlib.contextmanager
def InfeedContextScope(infeed_host_index, num_infeed_hosts):
  """Context manager that marks the current infeed host being processed.

  Pushes an InfeedContext onto the thread-local stack for the duration of
  the scope; GetInfeedContext() returns the innermost pushed context.

  Args:
    infeed_host_index: Index of the infeed host this scope works on.
    num_infeed_hosts: Total number of infeed hosts.

  Yields:
    None. The context is popped on exit, even if the body raises.
  """
  _INFEED_CONTEXT_STACK.stack.append(
      InfeedContext(infeed_host_index, num_infeed_hosts))
  try:
    yield
  finally:
    _INFEED_CONTEXT_STACK.stack.pop()
def GetInfeedContext():
  """Returns the innermost InfeedContext.

  Falls back to a default single-host context (host 0 of 1) when no
  InfeedContextScope is active on this thread.
  """
  return (_INFEED_CONTEXT_STACK.stack[-1] if _INFEED_CONTEXT_STACK.stack else
          InfeedContext(infeed_host_index=0, num_infeed_hosts=1))
def MakeDeviceString(job_name, replica_id, task_id, device_name, device_id):
  """Builds a TensorFlow device string.

  Args:
    job_name: Job spec string, e.g., '/job:trainer'.
    replica_id: Integer replica index.
    task_id: Integer task index within the replica.
    device_name: Device type, e.g., 'CPU', 'GPU', 'TPU'.
    device_id: Integer device index.

  Returns:
    A string like '/job:trainer/replica:0/task:1/device:GPU:2'.
  """
  return '%s/replica:%d/task:%d/device:%s:%d' % (job_name, replica_id, task_id,
                                                 device_name, device_id)
_CLUSTER_STACK = thread_local_utils.ThreadLocalStack()
class _Cluster:
  """The whole training cluster from a single task's point of view."""

  @classmethod
  def _JobSpec(cls, replicas):
    """Construct a job spec param with the given number of replicas."""
    p = hyperparams.Params()
    # By default, we use /job:localhost so that most of tests can just
    # work out of the box. trainer.py will then set job names accordingly.
    p.Define('name', '/job:localhost',
             'TensorFlow job spec, e.g., /job:trainer, /job:ps')
    p.Define('replicas', replicas, 'The number of tasks of a job.')
    p.Define(
        'targets', '', 'The target network address(es) to which we can '
        'create tf sessions. E.g., a single ip:port, or a list of '
        'comma-separated grpc://ip:port, etc.')
    p.Define('cpus_per_replica', 1, 'The number of CPU devices to use per '
             'replica.')
    p.Define('gpus_per_replica', 0, 'The number of GPU devices to use per '
             'replica.')
    p.Define(
        'devices_per_split', 1, 'Devices of a replica are grouped into '
        'splits. Each split contains these many devices. One split is a '
        'group of devices on which the computation nodes of a graph is '
        'placed upon.E.g., one can place the forward lstm on device 0 of '
        'a split and place the backward lstm on device 1. etc.')
    p.Define('tpus_per_replica', 0,
             'The number of tpu cores to use per replica.')
    p.Define('num_tpu_hosts', 0, 'The number of tpu hosts.')
    p.Define('additional_worker_names', [], 'A list of additional job names')
    return p

  @classmethod
  def Params(cls):
    """Defaults parameters for a cluster."""
    p = hyperparams.InstantiableParams(cls)
    p.Define(
        'mode', 'async', 'A string noting the overall training method. '
        'Valid values: sync, async.')
    p.Define(
        'job', 'trainer', 'The role of this job in the training cluster. '
        'E.g., trainer_client, trainer, controller, etc.')
    p.Define('task', 0, 'This process is the task-th task in the job.')
    p.Define('logdir', '', 'The log directory.')

    # How the cluster is composed.
    #
    # A typical training cluster has a few jobs (controller, worker, ps, etc).
    # One can potentially place computation on any device of these jobs.
    # Here, we specify how each job is configured. E.g., number of GPUs each
    # task is equipped with, the number of replicas, etc.
    #
    # Note that trainer client may dispatch operations on just a
    # smaller subset of jobs. For example, the controller only places
    # computations onto the controller and ps devices; while evaler
    # only places computations on the evaler devices.
    #
    # cluster.job refers to the role of a client process performs. It
    # can be 'controller', 'trainer', 'trainer_client', 'evaler' and
    # 'decoder', etc. Often, a client can be the same process as one
    # of the compute devices (e.g., controller). Sometimes, they can
    # be a separate processes. E.g., trainer_client is a separate
    # standalone process. It places computations on the worker and
    # ps devices, while itself does not host any.
    p.Define('controller', cls._JobSpec(1), 'The controller job.')
    p.Define('train_summaries', cls._JobSpec(1), 'The train_summaries job.')
    p.Define('worker', cls._JobSpec(1), 'The worker job.')
    p.Define('ps', cls._JobSpec(1), 'The ps job.')
    p.Define('input', cls._JobSpec(0), 'The input job.')
    p.Define('evaler', cls._JobSpec(0), 'The evaler job.')
    p.Define('decoder', cls._JobSpec(0), 'The decoder job.')

    # A few 'global' knobs.
    p.Define('tf_data_service_address', '',
             'The address of the tf.data service.')
    p.Define(
        'add_summary', None, 'Whether to add summaries. If None, '
        'decides based on the job type.')
    p.Define('do_eval', None, 'Whether to do eval.')
    p.Define('in_unit_test', None, 'Whether this is running in a unit test.')
    p.Define('split_id', 0, 'Split id for the model.')
    p.Define(
        'require_sequential_input_order', None,
        'Whether the input needs to be in sequential order. '
        'This is used for eval mode jobs and unit tests.')
    p.Define(
        'xla_device', None, 'If set to non-None, '
        'this value is used instead of FLAGS.xla_device '
        'for running multiple runners in the same process, '
        'eg: Controller and TrainerTpu')
    p.Define(
        'enable_asserts', None, 'If set to non-None, '
        'this value is used instead of FLAGS.enable_asserts. '
        'If False, we disable all asserts ops and return tf.no_op() instead.')
    p.Define(
        'enable_check_numerics', None, 'If set to non-None, '
        'this value is used instead of FLAGS.enable_check_numerics. '
        'If False, we bypass calls to CheckNumerics.')
    p.Define('reporting_job', 'evaler',
             'Name of job that reports trial results.')
    return p

  def InitDevices(self, sess):
    """Records the device names visible through `sess` for ListDevices()."""
    self._session_devices = [d.name for d in sess.list_devices()]
    tf.logging.info('InitDevices %s' % sorted(self._session_devices))

  def InitDevicesEager(self):
    """Records the logical devices of the eager context for ListDevices()."""
    assert tf.executing_eagerly()
    self._session_devices = [d.name for d in tf.config.list_logical_devices()]
    tf.logging.info('InitDevices %s' % sorted(self._session_devices))

  def ListDevices(self, job_spec):
    """Lists devices in the job.

    Args:
      job_spec: A param object specifying a job in a training cluster.

    Returns:
      Returns a 2D np string array. ret[i, j] is the i-th replica's j-th
      devices.

    Raises:
      RuntimeError: the cluster configuration does not match actual devices.
    """
    replicas = job_spec.replicas
    if job_spec.gpus_per_replica:
      device_type = 'GPU'
      devices_per_replica = job_spec.gpus_per_replica
    else:
      device_type = 'CPU'
      devices_per_replica = job_spec.num_tpu_hosts or job_spec.cpus_per_replica
    # `np.object` (a deprecated alias of the builtin) was removed in
    # numpy 1.24; use the builtin `object` dtype directly.
    ret = np.empty((replicas, devices_per_replica), object)
    if self._session_devices:
      devices = [
          d for d in self._session_devices
          if f'{job_spec.name}/' in d and f'/device:{device_type}:' in d
      ]
    else:
      devices = []
    if devices:
      # Sort devices deterministically by (replica, task, device id) before
      # assigning them to replica slots. Compile the pattern once instead of
      # once per device.
      device_re = re.compile('/job:.*/replica:(.*)/task:(.*)/device:.*:(.*)')

      def DeviceKey(device):
        res = device_re.search(device)
        replica = int(res.group(1))
        task = int(res.group(2))
        device_id = int(res.group(3))
        return replica, task, device_id

      devices.sort(key=DeviceKey)
      if len(devices) != replicas * devices_per_replica:
        raise RuntimeError(
            'Actual list of devices does not match cluster configuration: '
            f'Expected {devices_per_replica} {device_type} devices in '
            f'{replicas} replicas but got {devices} for {job_spec}.')
      it = iter(devices)
      for i in range(replicas):
        for j in range(devices_per_replica):
          ret[i, j] = next(it)
    else:
      # Devices not initialized from session, eg. exporting inference graph or
      # remote sessions not linked to this session, or InitDevices was not
      # called. In such case we build device strings manually, but there's no
      # guarantee that those strings will be correct.
      if self._session_devices is not None:
        tf.logging.info(
            f'ListDevices: No devices found in the session for {job_spec.name}.'
        )
      else:
        tf.logging.info('ListDevices: InitDevices was not called.')
      for i in range(replicas):
        for j in range(devices_per_replica):
          if self.job == 'evaler' or self.job == 'decoder':
            # For multi-task models, evaler and decoder actually have multiple
            # replicas if we specify --model_task_names=['task_a','task_b',
            # ...].
            ret[i, j] = MakeDeviceString(job_spec.name, i, 0, device_type, j)
          else:
            ret[i, j] = MakeDeviceString(job_spec.name, 0, i, device_type, j)
    tf.logging.info(f'ListDevices({job_spec.name}): {ret}')
    return ret

  def __enter__(self):
    # Entering makes this cluster the current one (returned by Top()).
    _CLUSTER_STACK.stack.append(self)
    return self

  def __exit__(self, type_arg, value_arg, traceback_arg):
    assert _CLUSTER_STACK.stack
    assert _CLUSTER_STACK.stack[-1] is self
    _CLUSTER_STACK.stack.pop()

  @staticmethod
  def Top():
    """Returns the innermost cluster entered on this thread, or None."""
    return _CLUSTER_STACK.stack[-1] if _CLUSTER_STACK.stack else None

  def ExportMetrics(self, *args, **kwargs):
    """Export metrics externally."""
    pass

  def __init__(self, params):
    self._params = params.Copy()
    # Populated by InitDevices / InitDevicesEager; None means "not called".
    self._session_devices = None
    self._CheckInvariants()

  def _CheckInvariants(self):
    """A set of invariants about the setup of the cluster.

    NOTE. Two job specs can be identical. E.g., if p.worker.name is
    the same as p.ps.name, that means ps is colocated with worker.
    """
    p = self.params
    assert p.ps.replicas >= 0
    assert p.ps.gpus_per_replica >= 0
    if p.mode == 'async' and p.job == 'controller':
      # There is only 1 controller.
      assert p.controller.replicas == 1
      assert p.task == 0
    elif p.mode == 'async' and p.job == 'trainer':
      assert p.worker.replicas >= 1
      assert p.worker.gpus_per_replica >= 0
      assert p.worker.devices_per_split >= 1
      # In async mode, trainers colocate with workers.
      assert 0 <= p.task < p.worker.replicas
      if p.ps.replicas == 0:
        # There is no ps. We are doing single-replica training.
        assert p.worker.replicas == 1
    elif p.mode == 'async' and p.job == 'evaler':
      assert 0 <= p.task < p.evaler.replicas
    elif p.mode == 'async' and p.job == 'decoder':
      assert 0 <= p.task < p.decoder.replicas
    elif p.mode == 'sync' and p.job == 'controller':
      # There is only 1 controller.
      assert p.controller.replicas == 1
      assert p.task == 0
    elif p.mode == 'sync' and p.job == 'trainer_client':
      assert p.worker.replicas >= 1
      assert p.worker.gpus_per_replica >= 0
      assert p.worker.devices_per_split >= 1
    elif p.mode == 'sync' and p.job == 'evaler':
      assert 0 <= p.task < p.evaler.replicas
    elif p.mode == 'sync' and p.job == 'decoder':
      assert 0 <= p.task < p.decoder.replicas
    elif p.mode == 'sync' and p.job == 'executor_tpu':
      assert p.worker.replicas >= 1
    elif p.job == 'train_summaries':
      # There is only 1 train_summaries.
      assert p.train_summaries.replicas == 1
      assert p.task == 0
    else:
      assert False, (p.mode, p.job)

  @property
  def params(self):
    return self._params

  @property
  def mode(self):
    return self.params.mode

  @property
  def job(self):
    return self.params.job

  @property
  def logdir(self):
    return self.params.logdir

  @property
  def task(self):
    return self.params.task

  @property
  def is_executor_tpu(self):
    return self.params.job == 'executor_tpu'

  @property
  def job_spec(self):
    """Returns the current job specs."""
    p = self.params
    if p.job == 'controller':
      return p.controller
    elif p.job in ('trainer', 'worker', 'trainer_client'):
      return p.worker
    elif p.job == 'train_summaries':
      return p.train_summaries
    elif p.job == 'evaler':
      return p.evaler
    elif p.job == 'decoder':
      return p.decoder
    elif p.job == 'executor_tpu':
      return p.worker

  @property
  def asynchronous(self):
    """Returns True if configured for asynchronous training."""
    return self.params.mode == 'async'

  @property
  def synchronous(self):
    """Returns True if configured for synchronous training."""
    return self.params.mode == 'sync'

  @property
  def num_replicas(self):
    return self.job_spec.replicas

  @property
  def tpus_per_replica(self):
    return self.job_spec.tpus_per_replica

  @property
  def num_tpu_hosts(self):
    return self.job_spec.num_tpu_hosts

  @property
  def num_devices_per_replica(self):
    return (self.job_spec.gpus_per_replica or self.job_spec.tpus_per_replica or
            self.job_spec.cpus_per_replica)

  @property
  def total_worker_devices(self):
    """Return the total number of discrete worker devices in the cluster."""
    worker_spec = self.params.worker
    # NOTE(review): this previously fell back to self.job_spec.cpus_per_replica
    # (the *current* job's spec) while using the worker spec for GPUs/TPUs;
    # use the worker spec consistently for all three device kinds.
    devices_per_replica = (
        worker_spec.gpus_per_replica or worker_spec.tpus_per_replica or
        worker_spec.cpus_per_replica)
    num_replicas = worker_spec.replicas
    return devices_per_replica * num_replicas

  @property
  def num_devices_per_split(self):
    """Return number of accelerators to use per split."""
    return self.job_spec.devices_per_split

  @property
  def num_splits_per_replica(self):
    # Note that a split must be within a replica.
    assert self.num_devices_per_replica % self.num_devices_per_split == 0
    return int(self.num_devices_per_replica / self.num_devices_per_split)

  @property
  def num_splits_per_client(self):
    """The number of splits visible by one trainer client."""
    if self.synchronous and self.job == 'trainer_client':
      # One client drives all the workers.
      return self.num_splits_per_replica * self.num_replicas
    elif self.synchronous and self.job == 'executor_tpu':
      # One client drives all the workers.
      return self.num_splits_per_replica * self.num_replicas
    else:
      # One client colocates with one worker and drives the worker only.
      return self.num_splits_per_replica

  @property
  def available_devices(self):
    """Returns all compute devices available in a 2D array.

    Returns:
      A 2D array (python list of python lists) of strings. ret[i, j]
      is the j-th visible device on i-th visible replica.
    """
    from lingvo.core import py_utils  # pylint: disable=g-import-not-at-top
    if self.job_spec.tpus_per_replica and not py_utils.IsEagerMode():
      # `np.object` was removed in numpy 1.24; use the builtin `object`.
      ret = np.empty((1, self.num_devices_per_split), object)
      for i in range(self.num_devices_per_split):
        ret[0, i] = tf.tpu.core(i)
      return ret

    if self.job == 'trainer' and self.asynchronous:
      # In async mode, each trainer task can only use its own devices.
      return self.ListDevices(self.job_spec)[self.task:(self.task + 1), :]

    if self.job == 'trainer_client' and self.synchronous:
      # In sync mode, trainer_client can use every device.
      return self.ListDevices(self.job_spec)

    if self.job == 'executor_tpu' and self.synchronous:
      # executor_tpu can use every device.
      return self.ListDevices(self.job_spec)

    if self.job in ('controller', 'train_summaries', 'evaler', 'decoder'):
      # Our current policy is that each controller/evaler/decoder task
      # only uses 1 replica.
      return self.ListDevices(self.job_spec)[self.task:(self.task + 1), :]

    assert False, (self.job, self.mode)

  @property
  def input_device(self):
    """Returns the tensorflow device name to place input op on."""
    p = self.params
    if self.synchronous and p.input.replicas > 0:
      # Uses a separate job for input processing.
      assert p.input.replicas == 1
      return self.ListDevices(p.input)[0, 0]
    else:
      return ''

  @property
  def all_worker_names(self):
    return [self.job_spec.name] + self.job_spec.additional_worker_names

  @property
  def input_targets(self):
    """Returns a list of network addresses of the input job."""
    p = self.params.input
    if not p.targets:
      return []
    targets = p.targets.split(',')
    assert p.replicas == len(targets), '{} vs. {}'.format(p.replicas, targets)
    return targets

  def WorkerDeviceInModelSplit(self, device_index):
    """Returns the device to use for 'device_index' for the current model split.

    Args:
      device_index: An int, the device index within 'model_split'.

    Returns:
      A string. The device to place ops onto.

    Raises:
      ValueError: if split_id of cluster is incorrectly set.
    """
    devices = self.available_devices.reshape([-1]).tolist()
    if not devices:
      return ''
    else:
      split_id = self.params.split_id
      if split_id < 0 or split_id >= self.num_splits_per_client:
        raise ValueError('split_id (%d) not in [0, %d)' %
                         (split_id, self.num_splits_per_client))
      devices_per_split = self.num_devices_per_split
      return devices[devices_per_split * split_id +
                     device_index % devices_per_split]

  def GetPlacer(self, strategy=None):
    """Returns a device function for placing ops within the cluster.

    Args:
      strategy: A string. Identifier for a placement strategy. By default, we
        use a least loaded policy to place variables.

    Returns:
      Returns a device function can be used in tf.device().

    Raises:
      ValueError: when strategy is not supported.
    """
    if self.job == 'evaler' or self.job == 'decoder':
      # Currently, we only support evaler/decoder uses 1 accelerator.
      return self.ListDevices(self.job_spec)[self.task, 0]
    elif strategy is None:
      return _LeastLoadedPlacer(self).DeviceFunction
    raise ValueError('Unsupported placement policy: ', strategy)

  @property
  def tf_data_service_address(self):
    return self.params.tf_data_service_address

  @property
  def add_summary(self):
    p = self.params
    if p.add_summary is None:
      # Default: only non-trainer jobs write summaries.
      return self.job in ['controller', 'train_summaries', 'evaler', 'decoder']
    else:
      return p.add_summary

  @property
  def do_eval(self):
    return self.params.do_eval

  @property
  def in_unit_test(self):
    return self.params.in_unit_test

  @property
  def require_sequential_input_order(self):
    if self.params.require_sequential_input_order is not None:
      return self.params.require_sequential_input_order
    # By default require sequential order when in eval mode.
    return self.params.do_eval

  @property
  def worker_cluster_def(self):
    """Returns a tf.train.ClusterDef representing the worker cluster."""
    p = self.params.worker
    if not p.targets:
      return None
    job = p.name.replace('/job:', '', 1)
    workers = [addr.replace('grpc://', '', 1) for addr in p.targets.split(',')]
    return tf.train.ClusterSpec({job: workers}).as_cluster_def()

  @property
  def reporting_job(self):
    return self.params.reporting_job
# Ops that must be placed on the 'ps' devices. VarPlacer.DeviceFunction routes
# any op whose type is in this list through _AssignVar().
_VAR_OPS = ['Variable', 'VariableV2', 'AutoReloadVariable', 'VarHandleOp']
class VarPlacer:
  """Placer which places variables across a set of devices.

  VarPlacer places non-variable ops on the worker device.
  """

  def __init__(self, cluster):
    self._cluster = cluster
    # Devices of the cluster's current job; row = replica, col = device.
    self._devices = cluster.ListDevices(cluster.job_spec)

  def _AssignVar(self, _):
    # Subclasses implement the actual variable placement policy.
    raise ValueError('Unimplemented')

  def DeviceFunction(self, op):
    """Choose a device for 'op'.

    Args:
      op: an Operation.

    Returns:
      The device to use for the Operation.
    """
    # Op has already assigned to a device explicitly. Don't change it.
    if op.device:
      return op.device

    # Place vars according our policy.
    if op.type in _VAR_OPS:
      return self._AssignVar(op)

    # The default policy is to place the op on the 1st device visible
    # to this task.
    assert self._devices is not None, ('Unexpected job: %s' % self._cluster.job)
    task = self._cluster.params.task
    assert 0 <= task and task < len(self._devices)
    return self._devices[task, 0]
class _LeastLoadedPlacer(VarPlacer):
  """Placer which places a variable on the least loaded var device.

  We use total byte sizes of variables placed on a device to indicate
  the device's load.
  """

  def __init__(self, cluster):
    super().__init__(cluster)
    # A min heap of (size, device)
    var_devices = cluster.ListDevices(cluster.params.ps)
    tf.logging.info('_LeastLoadedPlacer : %s', var_devices)
    self._var_space_pq = [(0, d) for d in var_devices.flatten().tolist()]

  def _AssignVar(self, var_op):
    """Pops the least-loaded ps device, charges it var_op's size, returns it."""
    size = var_op.get_attr('dtype').size
    shape = tf.TensorShape(var_op.get_attr('shape'))
    assert self._var_space_pq, ('No ps devices to use.')
    allocated, device = heapq.heappop(self._var_space_pq)
    if shape.num_elements() is None:
      # For vars whose shape aren't known statically, make a constant
      # estimate to avoid introducing more complexities.
      var_bytes = 10 * 1024**2 * size
    else:
      var_bytes = shape.num_elements() * size
    allocated += var_bytes
    heapq.heappush(self._var_space_pq, (allocated, device))
    tf.logging.info('Place variable %s on %s %d(+%d)', var_op.name, device,
                    allocated, var_bytes)
    return device
def ParseDeviceString(device_str):
  """Parse a device string and return a NestedMap.

  Args:
    device_str: a device string in the format of that may contain up to 4
      parts: job, replica, task, and device.

  Returns:
    parsed_device: a NestedMap that maps job, replica, task, and device to
      their corresponding value.
  """
  parsed_device = nested_map.NestedMap()
  for device_part in device_str.split('/'):
    # Use len(prefix) instead of magic slice offsets so each branch is
    # self-evidently consistent with its startswith() check.
    if device_part.startswith('job:'):
      parsed_device.job = device_part[len('job:'):]
    elif device_part.startswith('replica:'):
      parsed_device.replica = int(device_part[len('replica:'):])
    elif device_part.startswith('task:'):
      parsed_device.task = int(device_part[len('task:'):])
    elif device_part.startswith('device:'):
      # 'device:GPU:0' -> keep only the device type ('GPU').
      parsed_device.device = device_part[len('device:'):].split(':')[0]
  return parsed_device