Accelerators

Accelerators connect a Lightning Trainer to arbitrary hardware (CPUs, GPUs, TPUs, etc.). Accelerators also manage distributed training strategies (such as DP, DDP, or running on an HPC cluster).

Accelerators can also be configured to run on arbitrary clusters via Plugins, or to link up with computational strategies such as 16-bit precision via AMP or Apex.
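
In practice, the same LightningModule can be moved between hardware and distributed strategies purely through Trainer arguments. A quick sketch using flags from this Lightning version:

from pytorch_lightning import Trainer

# CPU, single process (default)
trainer = Trainer()

# 2 GPUs with the DDP accelerator and 16-bit precision via native AMP
trainer = Trainer(gpus=2, accelerator='ddp', precision=16)

# 8 TPU cores
trainer = Trainer(tpu_cores=8)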


Implement a custom accelerator

To link up arbitrary hardware, implement your own Accelerator subclass

from typing import Any, Optional, Union

import torch
from torch.distributed import ReduceOp

from pytorch_lightning.accelerators.accelerator import Accelerator


class MyAccelerator(Accelerator):
    def __init__(self, trainer, cluster_environment=None):
        super().__init__(trainer, cluster_environment)
        self.nickname = 'my_accelerator'

    def setup(self):
        # find local rank, etc, custom things to implement
        ...

    def train(self):
        # implement what happens during training
        ...

    def training_step(self):
        # implement how to do a training_step on this accelerator
        ...

    def validation_step(self):
        # implement how to do a validation_step on this accelerator
        ...

    def test_step(self):
        # implement how to do a test_step on this accelerator
        ...

    def backward(self, closure_loss, optimizer, opt_idx, *args, **kwargs):
        # implement how to do a backward pass with this accelerator
        ...

    def barrier(self, name: Optional[str] = None):
        # implement this accelerator's barrier
        ...

    def broadcast(self, obj, src=0):
        # implement this accelerator's broadcast function
        ...

    def sync_tensor(self,
                    tensor: torch.Tensor,
                    group: Optional[Any] = None,
                    reduce_op: Optional[Union[ReduceOp, str]] = None) -> torch.Tensor:
        # implement how to sync tensors when reducing metrics across accelerators
        ...
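
For reference, a concrete sync_tensor for a distributed CPU/GPU setup might wrap torch.distributed.all_reduce and could serve as the body of MyAccelerator.sync_tensor. This is a minimal sketch that assumes a default process group has already been initialized and only supports sum and mean reductions; it is not the exact implementation Lightning ships.

import torch.distributed as dist


def sync_tensor(tensor, group=None, reduce_op=None):
    # single-process runs: nothing to reduce
    if not (dist.is_available() and dist.is_initialized()):
        return tensor

    take_mean = reduce_op in ('avg', 'mean')

    # sum the tensor across all processes (in the given group, or world-wide)
    if group is None:
        dist.all_reduce(tensor, op=dist.ReduceOp.SUM)
        world_size = dist.get_world_size()
    else:
        dist.all_reduce(tensor, op=dist.ReduceOp.SUM, group=group)
        world_size = dist.get_world_size(group)

    # convert the sum into a mean if 'avg'/'mean' was requested
    if take_mean:
        tensor = tensor / world_size

    return tensor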

Examples

The following examples illustrate customizing accelerators.

Example 1: Arbitrary HPC cluster

To link any accelerator with an arbitrary cluster (SLURM, Condor, etc.), pass a cluster plugin into the Trainer; it will be handed to whichever accelerator you use.

First, implement your own ClusterEnvironment. Here is the torch elastic implementation.

import os
from pytorch_lightning import _logger as log
from pytorch_lightning.utilities import rank_zero_warn
from pytorch_lightning.cluster_environments.cluster_environment import ClusterEnvironment

class TorchElasticEnvironment(ClusterEnvironment):

    def __init__(self):
        super().__init__()

    def master_address(self):
        if "MASTER_ADDR" not in os.environ:
            rank_zero_warn(
                "MASTER_ADDR environment variable is not defined. Set as localhost"
            )
            os.environ["MASTER_ADDR"] = "127.0.0.1"
        log.debug(f"MASTER_ADDR: {os.environ['MASTER_ADDR']}")
        master_address = os.environ.get('MASTER_ADDR')
        return master_address

    def master_port(self):
        if "MASTER_PORT" not in os.environ:
            rank_zero_warn(
                "MASTER_PORT environment variable is not defined. Set as 12910"
            )
            os.environ["MASTER_PORT"] = "12910"
        log.debug(f"MASTER_PORT: {os.environ['MASTER_PORT']}")

        port = os.environ.get('MASTER_PORT')
        return port

    def world_size(self):
        return os.environ.get('WORLD_SIZE')

    def local_rank(self):
        return int(os.environ['LOCAL_RANK'])

Now, pass it into the Trainer, which will use Torch Elastic with your accelerator of choice.

cluster = TorchElasticEnvironment()
accelerator = MyAccelerator()
trainer = Trainer(plugins=[cluster], accelerator=accelerator)

In this example, MyAccelerator can target arbitrary hardware (such as IPUs or TPUs) and link it to an arbitrary compute cluster.
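
If your scheduler exposes different environment variables, the same pattern applies: implement a ClusterEnvironment that reads them. The MY_SCHED_* variable names below are purely hypothetical placeholders; substitute whatever your cluster actually sets.

import os

from pytorch_lightning.cluster_environments.cluster_environment import ClusterEnvironment


class MySchedulerEnvironment(ClusterEnvironment):

    def master_address(self):
        return os.environ.get('MY_SCHED_MASTER_ADDR', '127.0.0.1')

    def master_port(self):
        return os.environ.get('MY_SCHED_MASTER_PORT', '12910')

    def world_size(self):
        return os.environ.get('MY_SCHED_WORLD_SIZE')

    def local_rank(self):
        return int(os.environ.get('MY_SCHED_LOCAL_RANK', 0))


trainer = Trainer(plugins=[MySchedulerEnvironment()], accelerator=MyAccelerator())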


Available Accelerators

CPU Accelerator

class pytorch_lightning.accelerators.cpu_accelerator.CPUAccelerator(trainer, cluster_environment=None)[source]

Bases: pytorch_lightning.accelerators.accelerator.Accelerator

Runs training on CPU

Example:

# default
trainer = Trainer(accelerator=CPUAccelerator())

sync_tensor(tensor, group=None, reduce_op=None)[source]

Function to reduce a tensor from several distributed processes to one aggregated tensor.

Parameters
  • tensor (Tensor) – the tensor to sync and reduce

  • group (Optional[Any]) – the process group to gather results from. Defaults to all processes (world)

  • reduce_op (Union[ReduceOp, str, None]) – the reduction operation. Defaults to sum. Can also be a string of ‘avg’, ‘mean’ to calculate the mean during reduction.

Return type

Tensor

Returns

reduced value
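
As a usage sketch, a per-process metric can be reduced through the attached accelerator. This assumes a trainer has already been constructed and exposes its accelerator as trainer.accelerator_backend, as in this Lightning version; on a single CPU process the call simply returns the input tensor unchanged.

import torch

# reduce a per-process scalar to its mean across all processes
loss = torch.tensor(0.25)
reduced_loss = trainer.accelerator_backend.sync_tensor(loss, reduce_op='mean')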

DDP Accelerator

class pytorch_lightning.accelerators.ddp_accelerator.DDPAccelerator(trainer, cluster_environment=None, ddp_plugin=None)[source]

Bases: pytorch_lightning.accelerators.accelerator.Accelerator

Runs training using DDP strategy on a single machine (manually, not via cluster start)

Example:

# default
trainer = Trainer(accelerator=DDPAccelerator())

configure_sync_batchnorm(model)[source]

Add global batchnorm for a model spread across multiple GPUs and nodes.

Override to synchronize batchnorm between specific process groups instead of the whole world or use a different sync_bn like apex’s version.

Parameters

model (LightningModule) – pointer to current LightningModule.

Return type

LightningModule

Returns

LightningModule with batchnorm layers synchronized between process groups
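
For instance, to synchronize batchnorm statistics across only a subset of ranks, you could subclass DDPAccelerator and override this hook. The sketch below uses torch.nn.SyncBatchNorm.convert_sync_batchnorm with an explicit process group; the choice of ranks is illustrative, not prescriptive.

import torch
import torch.distributed as dist

from pytorch_lightning.accelerators.ddp_accelerator import DDPAccelerator


class SubsetSyncBNAccelerator(DDPAccelerator):
    def configure_sync_batchnorm(self, model):
        # synchronize batchnorm statistics only across ranks 0 and 1
        # (the default process group must already be initialized here)
        group = dist.new_group(ranks=[0, 1])
        return torch.nn.SyncBatchNorm.convert_sync_batchnorm(model, process_group=group)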

ddp_train(process_idx, model)[source]

Entry point for ddp

Parameters
  • process_idx – current process rank

  • model – pointer to current LightningModule

Returns

Dict with evaluation results

sync_tensor(tensor, group=None, reduce_op=None)[source]

Return type

Tensor

DDP2 Accelerator

class pytorch_lightning.accelerators.ddp2_accelerator.DDP2Accelerator(trainer, cluster_environment=None, ddp_plugin=None)[source]

Bases: pytorch_lightning.accelerators.accelerator.Accelerator

Runs training using DDP2 strategy on a cluster

Example:

# default
trainer = Trainer(accelerator=DDP2Accelerator())

configure_sync_batchnorm(model)[source]

Add global batchnorm for a model spread across multiple GPUs and nodes.

Override to synchronize batchnorm between specific process groups instead of the whole world or use a different sync_bn like apex’s version.

Parameters

model (LightningModule) – pointer to current LightningModule.

Return type

LightningModule

Returns

LightningModule with batchnorm layers synchronized between process groups

ddp_train(process_idx, mp_queue, model)[source]

Entry point for ddp

Parameters
  • process_idx – current process rank

  • mp_queue – multiprocessing queue

  • model – pointer to current LightningModule

Returns

Dict with evaluation results

sync_tensor(tensor, group=None, reduce_op=None)[source]

Function to reduce a tensor from several distributed processes to one aggregated tensor.

Parameters
  • tensor (Tensor) – the tensor to sync and reduce

  • group (Optional[Any]) – the process group to gather results from. Defaults to all processes (world)

  • reduce_op (Union[ReduceOp, str, None]) – the reduction operation. Defaults to sum. Can also be a string of ‘avg’, ‘mean’ to calculate the mean during reduction.

Return type

Tensor

Returns

reduced value

DDP CPU HPC Accelerator

class pytorch_lightning.accelerators.ddp_cpu_hpc_accelerator.DDPCPUHPCAccelerator(trainer, cluster_environment=None, ddp_plugin=None)[source]

Bases: pytorch_lightning.accelerators.ddp_hpc_accelerator.DDPHPCAccelerator

Runs training using DDP (with CPUs) strategy on a cluster

Example:

# default
trainer = Trainer(accelerator=DDPCPUHPCAccelerator())

DDP CPU Spawn Accelerator

class pytorch_lightning.accelerators.ddp_cpu_spawn_accelerator.DDPCPUSpawnAccelerator(trainer, nprocs, cluster_environment=None, ddp_plugin=None)[source]

Bases: pytorch_lightning.accelerators.accelerator.Accelerator

Runs training using DDP (on a single machine or manually on multiple machines), using mp.spawn

Example:

# default
trainer = Trainer(accelerator=DDPCPUSpawnAccelerator())

configure_sync_batchnorm(model)[source]

Add global batchnorm for a model spread across multiple GPUs and nodes.

Override to synchronize batchnorm between specific process groups instead of the whole world or use a different sync_bn like apex’s version.

Parameters

model (LightningModule) – pointer to current LightningModule.

Return type

LightningModule

Returns

LightningModule with batchnorm layers synchronized between process groups

ddp_train(process_idx, mp_queue, model)[source]

Entry point for ddp

Parameters
  • process_idx – current process rank

  • mp_queue – multiprocessing queue

  • model – pointer to current LightningModule

sync_tensor(tensor, group=None, reduce_op=None)[source]

Function to reduce a tensor from several distributed processes to one aggregated tensor.

Parameters
  • tensor (Tensor) – the tensor to sync and reduce

  • group (Optional[Any]) – the process group to gather results from. Defaults to all processes (world)

  • reduce_op (Union[ReduceOp, str, None]) – the reduction operation. Defaults to sum. Can also be a string of ‘avg’, ‘mean’ to calculate the mean during reduction.

Return type

Tensor

Returns

reduced value

DDP HPC Accelerator

class pytorch_lightning.accelerators.ddp_hpc_accelerator.DDPHPCAccelerator(trainer, cluster_environment=None, ddp_plugin=None)[source]

Bases: pytorch_lightning.accelerators.accelerator.Accelerator

Runs training using DDP on an HPC cluster

Example:

# default
trainer = Trainer(accelerator=DDPHPCAccelerator())

configure_sync_batchnorm(model)[source]

Add global batchnorm for a model spread across multiple GPUs and nodes.

Override to synchronize batchnorm between specific process groups instead of the whole world or use a different sync_bn like apex’s version.

Parameters

model (LightningModule) – pointer to current LightningModule.

Return type

LightningModule

Returns

LightningModule with batchnorm layers synchronized between process groups

ddp_train(process_idx, model)[source]

Entry point for ddp

Parameters
  • process_idx – current process rank

  • model – pointer to current LightningModule

Returns

Dict with evaluation results

sync_tensor(tensor, group=None, reduce_op=None)[source]

Function to reduce a tensor from several distributed processes to one aggregated tensor.

Parameters
  • tensor (Tensor) – the tensor to sync and reduce

  • group (Optional[Any]) – the process group to gather results from. Defaults to all processes (world)

  • reduce_op (Union[ReduceOp, str, None]) – the reduction operation. Defaults to sum. Can also be a string of ‘avg’, ‘mean’ to calculate the mean during reduction.

Return type

Tensor

Returns

reduced value

DDP Spawn Accelerator

class pytorch_lightning.accelerators.ddp_spawn_accelerator.DDPSpawnAccelerator(trainer, nprocs, cluster_environment=None, ddp_plugin=None)[source]

Bases: pytorch_lightning.accelerators.accelerator.Accelerator

Runs training using DDP with mp.spawn via manual launch (not cluster launch)

Example:

# default
trainer = Trainer(accelerator=DDPSpawnAccelerator())

configure_sync_batchnorm(model)[source]

Add global batchnorm for a model spread across multiple GPUs and nodes.

Override to synchronize batchnorm between specific process groups instead of the whole world or use a different sync_bn like apex’s version.

Parameters

model (LightningModule) – pointer to current LightningModule.

Return type

LightningModule

Returns

LightningModule with batchnorm layers synchronized between process groups

ddp_train(process_idx, mp_queue, model, is_master=False, proc_offset=0)[source]

Entry point for ddp

Parameters
  • process_idx – current process rank

  • mp_queue – multiprocessing queue

  • model – pointer to current LightningModule

sync_tensor(tensor, group=None, reduce_op=None)[source]

Function to reduce a tensor from several distributed processes to one aggregated tensor.

Parameters
  • tensor (Tensor) – the tensor to sync and reduce

  • group (Optional[Any]) – the process group to gather results from. Defaults to all processes (world)

  • reduce_op (Union[ReduceOp, str, None]) – the reduction operation. Defaults to sum. Can also be a string of ‘avg’, ‘mean’ to calculate the mean during reduction.

Return type

Tensor

Returns

reduced value

GPU Accelerator

class pytorch_lightning.accelerators.gpu_accelerator.GPUAccelerator(trainer, cluster_environment=None)[source]

Bases: pytorch_lightning.accelerators.accelerator.Accelerator

Runs training using a single GPU

Example:

# default
trainer = Trainer(accelerator=GPUAccelerator())

sync_tensor(tensor, group=None, reduce_op=None)[source]

Function to reduce a tensor from several distributed processes to one aggregated tensor.

Parameters
  • tensor (Tensor) – the tensor to sync and reduce

  • group (Optional[Any]) – the process group to gather results from. Defaults to all processes (world)

  • reduce_op (Union[ReduceOp, str, None]) – the reduction operation. Defaults to sum. Can also be a string of ‘avg’, ‘mean’ to calculate the mean during reduction.

Return type

Tensor

Returns

reduced value

Horovod Accelerator

class pytorch_lightning.accelerators.horovod_accelerator.HorovodAccelerator(trainer, cluster_environment=None)[source]

Bases: pytorch_lightning.accelerators.accelerator.Accelerator

Runs training using Horovod

Example:

# default
trainer = Trainer(accelerator=HorovodAccelerator())

sync_tensor(tensor, group=None, reduce_op=None)[source]

Function to reduce a tensor from several distributed processes to one aggregated tensor.

Parameters
  • tensor (Tensor) – the tensor to sync and reduce

  • group (Optional[Any]) – the process group to gather results from. Defaults to all processes (world)

  • reduce_op (Union[ReduceOp, str, None]) – the reduction operation. Defaults to sum. Can also be a string of ‘avg’, ‘mean’ to calculate the mean during reduction.

Return type

Tensor

Returns

reduced value

TPU Accelerator

class pytorch_lightning.accelerators.tpu_accelerator.TPUAccelerator(trainer, cluster_environment=None)[source]

Bases: pytorch_lightning.accelerators.accelerator.Accelerator

Runs training using TPUs (Colab, single machine, or pod)

Example:

# default
trainer = Trainer(accelerator=TPUAccelerator())

load_spawn_weights(original_model)[source]

Load the temporary weights saved by the spawned process. To recover the trained model from the ddp process, we load the saved weights.

save_spawn_weights(model)[source]

Dump a temporary checkpoint after ddp ends to get weights out of the process

sync_tensor(tensor, group=None, reduce_op=None)[source]

Function to reduce a tensor from several distributed processes to one aggregated tensor.

Parameters
  • tensor (Tensor) – the tensor to sync and reduce

  • group (Optional[Any]) – the process group to gather results from. Defaults to all processes (world)

  • reduce_op (Union[ReduceOp, str, None]) – the reduction operation. Defaults to sum. Can also be a string of ‘avg’, ‘mean’ to calculate the mean during reduction.

Return type

Tensor

Returns

reduced value

to_device(batch)[source]

Transfers the data to the TPU.

Parameters

batch – A tensor or collection of tensors.

Returns

the tensor on the TPU device.

See also

  • move_data_to_device()
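
to_device builds on Lightning's generic move_data_to_device utility, which also works on nested collections of tensors. A small illustration using a CPU device so it runs anywhere (the import path follows this Lightning version):

import torch
from pytorch_lightning.utilities import move_data_to_device

# moves tensors and arbitrary collections of tensors to the target device
batch = {'x': torch.rand(4, 3), 'y': torch.randint(0, 2, (4,))}
batch = move_data_to_device(batch, torch.device('cpu'))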

tpu_train_in_process(tpu_core_idx, model, trainer=None, mp_queue=None)[source]

This runs inside each individual TPU process.