Accelerators¶
Accelerators connect a Lightning Trainer to arbitrary hardware (CPUs, GPUs, TPUs, etc.). Accelerators also manage distributed training modes (such as DP, DDP, or HPC cluster launch).
Accelerators can also be configured to run on arbitrary clusters using Plugins, or to link up to arbitrary computational strategies such as 16-bit precision via AMP and Apex.
Implement a custom accelerator¶
To link up arbitrary hardware, implement your own Accelerator subclass:
from typing import Any, Optional, Union

import torch
from torch.distributed import ReduceOp

from pytorch_lightning.accelerators.accelerator import Accelerator


class MyAccelerator(Accelerator):

    def __init__(self, trainer, cluster_environment=None):
        super().__init__(trainer, cluster_environment)
        self.nickname = 'my_accelerator'

    def setup(self):
        # find local rank, etc.; custom setup for this hardware
        ...

    def train(self):
        # implement what happens during training
        ...

    def training_step(self):
        # implement how to do a training_step on this accelerator
        ...

    def validation_step(self):
        # implement how to do a validation_step on this accelerator
        ...

    def test_step(self):
        # implement how to do a test_step on this accelerator
        ...

    def backward(self, closure_loss, optimizer, opt_idx, *args, **kwargs):
        # implement how to do a backward pass with this accelerator
        ...

    def barrier(self, name: Optional[str] = None):
        # implement this accelerator's barrier
        ...

    def broadcast(self, obj, src=0):
        # implement this accelerator's broadcast function
        ...

    def sync_tensor(self,
                    tensor: torch.Tensor,
                    group: Optional[Any] = None,
                    reduce_op: Optional[Union[ReduceOp, str]] = None) -> torch.Tensor:
        # implement how to sync tensors when reducing metrics across accelerators
        ...
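For hardware that runs in a single process, most of these hooks collapse to no-ops. A minimal sketch, assuming no inter-process communication is needed (the class name is hypothetical):

from pytorch_lightning.accelerators.accelerator import Accelerator


class SingleProcessAccelerator(Accelerator):
    # hypothetical accelerator where everything runs in the local process

    def barrier(self, name=None):
        pass  # nothing to synchronize: there are no other processes

    def broadcast(self, obj, src=0):
        return obj  # single process: the object is already "everywhere"

    def sync_tensor(self, tensor, group=None, reduce_op=None):
        return tensor  # single process: the tensor is already reduced

    def backward(self, closure_loss, optimizer, opt_idx, *args, **kwargs):
        closure_loss.backward(*args, **kwargs)  # plain autograd backward
        return closure_loss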
Examples¶
The following examples illustrate customizing accelerators.
Example 1: Arbitrary HPC cluster¶
To link any accelerator with an arbitrary cluster (SLURM, Condor, etc.), pass in a cluster plugin, which will then be passed into the accelerator.
First, implement your own ClusterEnvironment. Here is the torch elastic implementation.
import os

from pytorch_lightning import _logger as log
from pytorch_lightning.cluster_environments.cluster_environment import ClusterEnvironment
from pytorch_lightning.utilities import rank_zero_warn


class TorchElasticEnvironment(ClusterEnvironment):

    def __init__(self):
        super().__init__()

    def master_address(self):
        if "MASTER_ADDR" not in os.environ:
            rank_zero_warn(
                "MASTER_ADDR environment variable is not defined. Set as localhost"
            )
            os.environ["MASTER_ADDR"] = "127.0.0.1"
        log.debug(f"MASTER_ADDR: {os.environ['MASTER_ADDR']}")
        return os.environ.get('MASTER_ADDR')

    def master_port(self):
        if "MASTER_PORT" not in os.environ:
            rank_zero_warn(
                "MASTER_PORT environment variable is not defined. Set as 12910"
            )
            os.environ["MASTER_PORT"] = "12910"
        log.debug(f"MASTER_PORT: {os.environ['MASTER_PORT']}")
        return os.environ.get('MASTER_PORT')

    def world_size(self):
        return os.environ.get('WORLD_SIZE')

    def local_rank(self):
        return int(os.environ['LOCAL_RANK'])
Now pass it into the Trainer, which will use Torch Elastic with your accelerator of choice.
cluster = TorchElasticEnvironment()
accelerator = MyAccelerator()
trainer = Trainer(plugins=[cluster], accelerator=accelerator)
In this example, MyAccelerator can target arbitrary hardware (such as IPUs or TPUs) and link it to an arbitrary compute cluster.
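The same pattern works for other schedulers. As a hedged sketch, a minimal SLURM environment could read SLURM's standard environment variables (SLURM_NTASKS and SLURM_LOCALID are set by SLURM; the MASTER_ADDR export is assumed to be done by your launch script, and the class name is hypothetical):

import os

from pytorch_lightning.cluster_environments.cluster_environment import ClusterEnvironment


class MySLURMEnvironment(ClusterEnvironment):
    # hypothetical minimal SLURM environment

    def master_address(self):
        # assumes the launch script exported MASTER_ADDR (e.g. the first node in SLURM_NODELIST)
        return os.environ['MASTER_ADDR']

    def master_port(self):
        return os.environ.get('MASTER_PORT', '12910')

    def world_size(self):
        return int(os.environ['SLURM_NTASKS'])

    def local_rank(self):
        return int(os.environ['SLURM_LOCALID'])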
Available Accelerators¶
CPU Accelerator¶
class pytorch_lightning.accelerators.cpu_accelerator.CPUAccelerator(trainer, cluster_environment=None)
Bases: pytorch_lightning.accelerators.accelerator.Accelerator
Runs training on CPU.
Example:
# default
trainer = Trainer(accelerator=CPUAccelerator())
sync_tensor(tensor, group=None, reduce_op=None)
Reduces a tensor from several distributed processes to one aggregated tensor.
Return type: torch.Tensor
Returns: the reduced value.
DDP Accelerator¶
class pytorch_lightning.accelerators.ddp_accelerator.DDPAccelerator(trainer=None, cluster_environment=None, ddp_plugin=None)
Bases: pytorch_lightning.accelerators.accelerator.Accelerator
Runs training using the DDP strategy on a single machine (launched manually, not via a cluster).
Example:
# default
trainer = Trainer(accelerator=DDPAccelerator())
all_gather(tensor, group=None, sync_grads=False)
Gathers a tensor from several distributed processes.
configure_sync_batchnorm(model)
Adds global batchnorm for a model spread across multiple GPUs and nodes.
Override to synchronize batchnorm between specific process groups instead of the whole world, or to use a different sync_bn implementation such as Apex's.
Parameters: model (LightningModule) – pointer to the current LightningModule.
Returns: the LightningModule with batchnorm layers synchronized between process groups.
ddp_train(process_idx, model)
Entry point for DDP.
get_reference_model(model)
Override to return the underlying LightningModule when the accelerator has wrapped the model, so that its attributes and methods can be accessed directly.
Example:
ref_model = accelerator.get_reference_model(model)
ref_model.training_step(...)
Parameters: model – the accelerator model.
Returns: the reference LightningModule.
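For example, configure_sync_batchnorm can be overridden to restrict batchnorm synchronization to a subgroup of ranks. A sketch using torch.nn.SyncBatchNorm (the two-rank subgroup is only illustrative, and the process group must already be initialized):

import torch

from pytorch_lightning.accelerators.ddp_accelerator import DDPAccelerator


class MyDDPAccelerator(DDPAccelerator):

    def configure_sync_batchnorm(self, model):
        # sync batchnorm statistics only within an illustrative subgroup of ranks
        node_group = torch.distributed.new_group(ranks=[0, 1])
        return torch.nn.SyncBatchNorm.convert_sync_batchnorm(model, process_group=node_group)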
DDP2 Accelerator¶
class pytorch_lightning.accelerators.ddp2_accelerator.DDP2Accelerator(trainer, cluster_environment=None, ddp_plugin=None)
Bases: pytorch_lightning.accelerators.accelerator.Accelerator
Runs training using the DDP2 strategy on a cluster.
Example:
# default
trainer = Trainer(accelerator=DDP2Accelerator())
all_gather(tensor, group=None, sync_grads=False)
Gathers a tensor from several distributed processes.
configure_sync_batchnorm(model)
Adds global batchnorm for a model spread across multiple GPUs and nodes.
Override to synchronize batchnorm between specific process groups instead of the whole world, or to use a different sync_bn implementation such as Apex's.
Parameters: model (LightningModule) – pointer to the current LightningModule.
Returns: the LightningModule with batchnorm layers synchronized between process groups.
ddp_train(process_idx, mp_queue, model)
Entry point for DDP.
get_reference_model(model)
Override to return the underlying LightningModule when the accelerator has wrapped the model, so that its attributes and methods can be accessed directly.
Example:
ref_model = accelerator.get_reference_model(model)
ref_model.training_step(...)
Parameters: model – the accelerator model.
Returns: the reference LightningModule.
sync_tensor(tensor, group=None, reduce_op=None)
Reduces a tensor from several distributed processes to one aggregated tensor.
Return type: torch.Tensor
Returns: the reduced value.
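In the distributed accelerators, sync_tensor is typically backed by a collective reduce. A minimal sketch with torch.distributed, assuming the default process group is already initialized and defaulting to a sum (string reduce ops are not handled in this sketch):

import torch.distributed as dist


def sync_tensor(self, tensor, group=None, reduce_op=None):
    # reduce the tensor in place across all processes, defaulting to a sum
    op = reduce_op if isinstance(reduce_op, dist.ReduceOp) else dist.ReduceOp.SUM
    if group is None:
        group = dist.group.WORLD
    dist.all_reduce(tensor, op=op, group=group)
    return tensor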
DDP CPU HPC Accelerator¶
class pytorch_lightning.accelerators.ddp_cpu_hpc_accelerator.DDPCPUHPCAccelerator(trainer, cluster_environment=None, ddp_plugin=None)
Bases: pytorch_lightning.accelerators.ddp_hpc_accelerator.DDPHPCAccelerator
Runs training using the DDP strategy (with CPUs) on a cluster.
Example:
# default
trainer = Trainer(accelerator=DDPCPUHPCAccelerator())
DDP CPU Spawn Accelerator¶
class pytorch_lightning.accelerators.ddp_cpu_spawn_accelerator.DDPCPUSpawnAccelerator(trainer, nprocs, cluster_environment=None, ddp_plugin=None)
Bases: pytorch_lightning.accelerators.accelerator.Accelerator
Runs training using DDP (on a single machine, or manually on multiple machines) via mp.spawn.
Example:
# default
trainer = Trainer(accelerator=DDPCPUSpawnAccelerator())
all_gather(tensor, group=None, sync_grads=False)
Gathers a tensor from several distributed processes.
configure_sync_batchnorm(model)
Adds global batchnorm for a model spread across multiple GPUs and nodes.
Override to synchronize batchnorm between specific process groups instead of the whole world, or to use a different sync_bn implementation such as Apex's.
Parameters: model (LightningModule) – pointer to the current LightningModule.
Returns: the LightningModule with batchnorm layers synchronized between process groups.
ddp_train(process_idx, mp_queue, model)
Entry point for DDP.
get_reference_model(model)
Override to return the underlying LightningModule when the accelerator has wrapped the model, so that its attributes and methods can be accessed directly.
Example:
ref_model = accelerator.get_reference_model(model)
ref_model.training_step(...)
Parameters: model – the accelerator model.
Returns: the reference LightningModule.
sync_tensor(tensor, group=None, reduce_op=None)
Reduces a tensor from several distributed processes to one aggregated tensor.
Return type: torch.Tensor
Returns: the reduced value.
DDP HPC Accelerator¶
class pytorch_lightning.accelerators.ddp_hpc_accelerator.DDPHPCAccelerator(trainer, cluster_environment=None, ddp_plugin=None)
Bases: pytorch_lightning.accelerators.accelerator.Accelerator
Runs training using DDP on an HPC cluster.
Example:
# default
trainer = Trainer(accelerator=DDPHPCAccelerator())
all_gather(tensor, group=None, sync_grads=False)
Gathers a tensor from several distributed processes.
configure_sync_batchnorm(model)
Adds global batchnorm for a model spread across multiple GPUs and nodes.
Override to synchronize batchnorm between specific process groups instead of the whole world, or to use a different sync_bn implementation such as Apex's.
Parameters: model (LightningModule) – pointer to the current LightningModule.
Returns: the LightningModule with batchnorm layers synchronized between process groups.
ddp_train(process_idx, model)
Entry point for DDP.
get_reference_model(model)
Override to return the underlying LightningModule when the accelerator has wrapped the model, so that its attributes and methods can be accessed directly.
Example:
ref_model = accelerator.get_reference_model(model)
ref_model.training_step(...)
Parameters: model – the accelerator model.
Returns: the reference LightningModule.
sync_tensor(tensor, group=None, reduce_op=None)
Reduces a tensor from several distributed processes to one aggregated tensor.
Return type: torch.Tensor
Returns: the reduced value.
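get_reference_model exists because the DDP-family accelerators wrap the LightningModule in DistributedDataParallel. A hedged sketch of the unwrapping (the real implementation may differ):

from torch.nn.parallel import DistributedDataParallel


def get_reference_model(self, model):
    # if the module was wrapped by DDP, the LightningModule lives at .module
    if isinstance(model, DistributedDataParallel):
        return model.module
    return model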
DDP Spawn Accelerator¶
class pytorch_lightning.accelerators.ddp_spawn_accelerator.DDPSpawnAccelerator(trainer, nprocs, cluster_environment=None, ddp_plugin=None)
Bases: pytorch_lightning.accelerators.accelerator.Accelerator
Runs training using DDP via mp.spawn, launched manually (not via a cluster).
Example:
# default
trainer = Trainer(accelerator=DDPSpawnAccelerator())
all_gather(tensor, group=None, sync_grads=False)
Gathers a tensor from several distributed processes.
configure_sync_batchnorm(model)
Adds global batchnorm for a model spread across multiple GPUs and nodes.
Override to synchronize batchnorm between specific process groups instead of the whole world, or to use a different sync_bn implementation such as Apex's.
Parameters: model (LightningModule) – pointer to the current LightningModule.
Returns: the LightningModule with batchnorm layers synchronized between process groups.
ddp_train(process_idx, mp_queue, model, is_master=False, proc_offset=0)
Entry point for DDP.
get_reference_model(model)
Override to return the underlying LightningModule when the accelerator has wrapped the model, so that its attributes and methods can be accessed directly.
Example:
ref_model = accelerator.get_reference_model(model)
ref_model.training_step(...)
Parameters: model – the accelerator model.
Returns: the reference LightningModule.
sync_tensor(tensor, group=None, reduce_op=None)
Reduces a tensor from several distributed processes to one aggregated tensor.
Return type: torch.Tensor
Returns: the reduced value.
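all_gather collects a copy of a tensor from every process. A minimal sketch with torch.distributed, ignoring the sync_grads path (which would require a custom autograd function):

import torch
import torch.distributed as dist


def all_gather_sketch(tensor):
    # gather one copy of `tensor` per process and stack them along a new dimension
    gathered = [torch.zeros_like(tensor) for _ in range(dist.get_world_size())]
    dist.all_gather(gathered, tensor)
    return torch.stack(gathered)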
GPU Accelerator¶
class pytorch_lightning.accelerators.gpu_accelerator.GPUAccelerator(trainer, cluster_environment=None)
Bases: pytorch_lightning.accelerators.accelerator.Accelerator
Runs training using a single GPU.
Example:
# default
trainer = Trainer(accelerator=GPUAccelerator())
sync_tensor(tensor, group=None, reduce_op=None)
Reduces a tensor from several distributed processes to one aggregated tensor.
Return type: torch.Tensor
Returns: the reduced value.
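In everyday use you rarely construct the accelerator yourself; the Trainer flags select it for you. A usage sketch, assuming a machine with at least one GPU:

from pytorch_lightning import Trainer

# selects single-GPU training, which uses the GPU accelerator under the hood
trainer = Trainer(gpus=1)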
Horovod Accelerator¶
class pytorch_lightning.accelerators.horovod_accelerator.HorovodAccelerator(trainer, cluster_environment=None)
Bases: pytorch_lightning.accelerators.accelerator.Accelerator
Runs training using Horovod.
Example:
# default
trainer = Trainer(accelerator=HorovodAccelerator())
sync_tensor(tensor, group=None, reduce_op=None)
Reduces a tensor from several distributed processes to one aggregated tensor.
Return type: torch.Tensor
Returns: the reduced value.
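Horovod jobs are usually launched with horovodrun, with one process per device. A hedged usage sketch, assuming Horovod is installed and the string flag is supported by your Lightning version:

# launched e.g. with: horovodrun -np 4 python train.py
from pytorch_lightning import Trainer

trainer = Trainer(accelerator='horovod')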
TPU Accelerator¶
class pytorch_lightning.accelerators.tpu_accelerator.TPUAccelerator(trainer, cluster_environment=None)
Bases: pytorch_lightning.accelerators.accelerator.Accelerator
Runs training using TPUs (Colab, single machine, or pod).
Example:
# default
trainer = Trainer(accelerator=TPUAccelerator())
load_spawn_weights(original_model)
Loads the temporary weights saved by the spawned process: to recover the trained model from the ddp process, we load the saved weights.
on_save(checkpoint)
Moves XLA tensors to CPU before saving, as recommended in the XLA guide: https://github.com/pytorch/xla/blob/master/API_GUIDE.md#saving-and-loading-xla-tensors
save_spawn_weights(model)
Dumps a temporary checkpoint after ddp ends, to get the weights out of the process.
sync_tensor(tensor, group=None, reduce_op=None)
Reduces a tensor from several distributed processes to one aggregated tensor.
Return type: torch.Tensor
Returns: the reduced value.
to_device(batch)
Transfers the data to the TPU.
Parameters: batch – the data to move to the TPU device.
Returns: the tensor on the TPU device.
See also: move_data_to_device()
tpu_train_in_process(tpu_core_idx, model, trainer=None, mp_queue=None)
Entry point inside each individual spawned process.
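on_save exists because XLA tensors must be brought to the CPU before serialization. A hedged sketch using the move_data_to_device() utility referenced above (the exact implementation may differ):

import torch

from pytorch_lightning.utilities.apply_func import move_data_to_device


def on_save(self, checkpoint):
    # move every XLA tensor in the checkpoint dict to CPU before it is written to disk
    return move_data_to_device(checkpoint, torch.device('cpu'))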