distributed

Utilities that can be used with distributed training.

Functions

  • all_gather_ddp_if_available – Function to gather a tensor from several distributed processes.

  • distributed_available – Return type: bool

  • gather_all_tensors – Function to gather all tensors from several DDP processes into a list that is broadcast to all processes.

  • get_default_process_group_backend_for_device – Return type: str

  • init_dist_connection – Utility function to initialize a distributed connection by setting environment variables and initializing the distributed process group.

  • rank_zero_debug – Return type: Any

  • rank_zero_info – Return type: Any

  • register_ddp_comm_hook – Function to register a communication hook for a DDP model (see https://pytorch.org/docs/master/ddp_comm_hooks.html).

  • sync_ddp – Function to reduce tensors from several DDP processes to one main process.

  • sync_ddp_if_available – Function to reduce a tensor across worker processes during distributed training.

  • tpu_distributed – Return type: bool

Classes

  • AllGatherGrad

  • ReduceOp

  • group

class pytorch_lightning.utilities.distributed.AllGatherGrad(*args, **kwargs)[source]

Bases: torch.autograd.Function

pytorch_lightning.utilities.distributed.all_gather_ddp_if_available(tensor, group=None, sync_grads=False)[source]

Function to gather a tensor from several distributed processes.

Parameters
  • tensor (Tensor) – tensor of shape (batch, …)

  • group (Optional[ProcessGroup]) – the process group to gather results from. Defaults to all processes (world)

  • sync_grads (bool) – flag that allows users to synchronize gradients for the all_gather operation

Return type

Tensor

Returns

A tensor of shape (world_size, batch, …)

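A minimal usage sketch (not part of the official docstring), assuming it is called from within a training process; the tensor shape is illustrative only. Outside a distributed context the input tensor is returned unchanged.

>>> import torch
>>> from pytorch_lightning.utilities.distributed import all_gather_ddp_if_available
>>>
>>> # Per-process tensor of shape (batch, ...); values are illustrative only.
>>> local_logits = torch.randn(8, 10)
>>> # Gather from every process; with world_size N the result has shape (N, 8, 10).
>>> gathered = all_gather_ddp_if_available(local_logits, sync_grads=False)
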
pytorch_lightning.utilities.distributed.gather_all_tensors(result, group=None)[source]

Function to gather all tensors from several DDP processes into a list that is broadcast to all processes.

Parameters
  • result (Tensor) – the value to sync

  • group (Optional[Any]) – the process group to gather results from. Defaults to all processes (world)

Returns

gathered_result – a list with size equal to the process group, where gathered_result[i] corresponds to the result tensor from process i

Return type

List[Tensor]

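A minimal usage sketch (not from the official docs), assuming an already initialized process group; the tensor values are illustrative only.

>>> import torch
>>> from pytorch_lightning.utilities.distributed import gather_all_tensors
>>>
>>> # Each rank contributes its own tensor; all tensors must share the same shape.
>>> local_result = torch.tensor([float(torch.distributed.get_rank())])
>>> gathered = gather_all_tensors(local_result)
>>> # `gathered` is a list of length world_size; gathered[i] comes from rank i.
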
pytorch_lightning.utilities.distributed.init_dist_connection(cluster_environment, torch_distributed_backend, global_rank=None, world_size=None, **kwargs)[source]

Utility function to initialize a distributed connection by setting environment variables and initializing the distributed process group.

Parameters
  • cluster_environment (ClusterEnvironment) – ClusterEnvironment instance

  • torch_distributed_backend (str) – the torch.distributed backend to use (e.g. nccl or gloo)

  • global_rank (Optional[int]) – rank of the current process

  • world_size (Optional[int]) – number of processes in the group

  • kwargs (Any) – additional keyword arguments forwarded to torch.distributed.init_process_group

Raises

RuntimeError – If torch.distributed is not available

Return type

None

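A minimal single-process sketch (not from the official docs), assuming the default LightningEnvironment and the CPU-friendly gloo backend; the exact import path of the environment class may differ between versions.

>>> from pytorch_lightning.plugins.environments import LightningEnvironment
>>> from pytorch_lightning.utilities.distributed import init_dist_connection
>>>
>>> # Sets the connection env variables from the cluster environment and
>>> # initializes a single-process group.
>>> env = LightningEnvironment()
>>> init_dist_connection(env, torch_distributed_backend="gloo", global_rank=0, world_size=1)
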
pytorch_lightning.utilities.distributed.register_ddp_comm_hook(model, ddp_comm_state=None, ddp_comm_hook=None, ddp_comm_wrapper=None)[source]

Function to register a communication hook for a DDP model. See https://pytorch.org/docs/master/ddp_comm_hooks.html.

Parameters
  • model (DistributedDataParallel) – DDP model

  • ddp_comm_state (Optional[object]) – state is passed to the hook and can be used to maintain and update any state information that users would like to maintain as part of the training process. Examples: error feedback in gradient compression, peers to communicate with next in GossipGrad etc.

  • ddp_comm_hook (Optional[Callable]) –

    hook(state: object, bucket: dist._GradBucket) -> torch.futures.Future

    This callable is invoked once the bucket is ready. The hook can perform whatever processing is needed and return a Future indicating completion of any async work (for example, an allreduce). If the hook performs no communication, it can simply return a completed Future. The Future should hold the new value of the grad bucket's tensors. Once a bucket is ready, the c10d reducer calls this hook, takes the tensors returned by the Future, and copies the gradients to the individual parameters.

  • ddp_comm_wrapper (Optional[Callable]) – a communication hook wrapper (for example, FP16 compression) that can be combined with ddp_comm_hook

Warning

The DDP communication hook requires PyTorch 1.8.0 or later.

Warning

The DDP communication wrapper requires PyTorch 1.9.0 or later. The post-localSGD hook also requires PyTorch 1.9.0 or later.

Examples

>>> import torch
>>> from pytorch_lightning.utilities.distributed import register_ddp_comm_hook
>>> from torch.distributed.algorithms.ddp_comm_hooks import (
...     default_hooks as default,
...     powerSGD_hook as powerSGD,
...     post_localSGD_hook as post_localSGD,
... )
>>>
>>> # fp16_compress_hook for compress gradients
>>> ddp_model = ...
>>> register_ddp_comm_hook( 
...     model=ddp_model,
...     ddp_comm_hook=default.fp16_compress_hook,
... )
>>>
>>> # powerSGD_hook
>>> ddp_model = ...
>>> register_ddp_comm_hook( 
...     model=ddp_model,
...     ddp_comm_state=powerSGD.PowerSGDState(
...         process_group=None,
...         matrix_approximation_rank=1,
...         start_powerSGD_iter=5000,
...     ),
...     ddp_comm_hook=powerSGD.powerSGD_hook,
... )
>>>
>>> # post_localSGD_hook
>>> subgroup, _ = torch.distributed.new_subgroups() 
>>> ddp_model = ...
>>> register_ddp_comm_hook( 
...     model=ddp_model,
...     ddp_comm_state=post_localSGD.PostLocalSGDState(
...         process_group=None,
...         subgroup=subgroup,
...         start_localSGD_iter=1_000,
...     ),
...     ddp_comm_hook=post_localSGD.post_localSGD_hook,
... )
>>>
>>> # fp16_compress_wrapper combined with other communication hook
>>> ddp_model = ...
>>> register_ddp_comm_hook( 
...     model=ddp_model,
...     ddp_comm_state=powerSGD.PowerSGDState(
...         process_group=None,
...         matrix_approximation_rank=1,
...         start_powerSGD_iter=5000,
...     ),
...     ddp_comm_hook=powerSGD.powerSGD_hook,
...     ddp_comm_wrapper=default.fp16_compress_wrapper,
... )

Return type

None

pytorch_lightning.utilities.distributed.sync_ddp(result, group=None, reduce_op=None)[source]

Function to reduce tensors from several DDP processes to one main process.

Parameters
  • result (Tensor) – the value to sync and reduce (typically tensor or number)

  • group (Optional[Any]) – the process group to gather results from. Defaults to all processes (world)

  • reduce_op (Union[ReduceOp, str, None]) – the reduction operation. Defaults to sum. Can also be the string ‘avg’ or ‘mean’ to compute the mean during reduction.

Return type

Tensor

Returns

reduced value

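A minimal sketch (not from the official docs) that averages a per-process scalar; torch.distributed is assumed to be already initialized, since this function calls the collective directly.

>>> import torch
>>> from pytorch_lightning.utilities.distributed import sync_ddp
>>>
>>> # Per-process metric value; reduce_op="mean" averages it across all processes.
>>> local_loss = torch.tensor(0.25)
>>> mean_loss = sync_ddp(local_loss, reduce_op="mean")
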
pytorch_lightning.utilities.distributed.sync_ddp_if_available(result, group=None, reduce_op=None)[source]

Function to reduce a tensor across worker processes during distributed training.

Parameters
  • result (Tensor) – the value to sync and reduce (typically tensor or number)

  • group (Optional[Any]) – the process group to gather results from. Defaults to all processes (world)

  • reduce_op (Union[ReduceOp, str, None]) – the reduction operation. Defaults to sum. Can also be the string ‘avg’ or ‘mean’ to compute the mean during reduction.

Return type

Tensor

Returns

reduced value
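
A minimal sketch (not from the official docs); this variant checks whether distributed training is active and, if not, returns the input unchanged, so it is also safe to call in a single-process run.

>>> import torch
>>> from pytorch_lightning.utilities.distributed import sync_ddp_if_available
>>>
>>> # Per-process count; summed across workers when distributed, a no-op otherwise.
>>> local_correct = torch.tensor(42.0)
>>> total_correct = sync_ddp_if_available(local_correct)  # defaults to sum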