distributed¶
Functions

all_gather_ddp_if_available – Function to gather a tensor from several distributed processes.

gather_all_tensors – Function to gather all tensors from several ddp processes onto a list that is broadcasted to all processes.

init_dist_connection – Utility function to initialize distributed connection by setting env variables and initializing the distributed process group.

sync_ddp – Function to reduce the tensors from several ddp processes to one main process.

sync_ddp_if_available – Function to reduce a tensor across worker processes during distributed training.

register_ddp_comm_hook – Function to register communication hook for DDP model https://pytorch.org/docs/master/ddp_comm_hooks.html.
Classes

AllGatherGrad
Utilities that can be used with distributed training.
- class pytorch_lightning.utilities.distributed.AllGatherGrad(*args, **kwargs)[source]¶
Bases: torch.autograd.Function
- pytorch_lightning.utilities.distributed.all_gather_ddp_if_available(tensor, group=None, sync_grads=False)[source]¶
Function to gather a tensor from several distributed processes.
- Parameters
tensor¶ (Tensor) – tensor of shape (batch, …) to gather
group¶ (Optional[Any]) – the process group to gather results from. Defaults to all processes (world)
sync_grads¶ (bool) – flag that allows users to synchronize gradients for the all_gather operation
- Return type
Tensor
- Returns
A tensor of shape (world_size, batch, …)
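As a minimal, illustrative sketch (the two-process gloo group, loopback address, and port below are assumptions for the example; in a Trainer run the strategy sets all of this up), each rank contributes a (batch, …) tensor and receives the stacked (world_size, batch, …) result:

import torch
import torch.distributed as dist
import torch.multiprocessing as mp

from pytorch_lightning.utilities.distributed import all_gather_ddp_if_available


def run(rank: int, world_size: int) -> None:
    # Illustrative process-group setup for the sketch only.
    dist.init_process_group(
        "gloo", init_method="tcp://127.0.0.1:29500", rank=rank, world_size=world_size
    )
    local = torch.arange(4.0) + rank                 # per-rank tensor of shape (4,)
    gathered = all_gather_ddp_if_available(local)    # stacked tensor of shape (world_size, 4)
    # sync_grads=True would route the gather through AllGatherGrad so gradients flow back.
    print(rank, gathered.shape)


if __name__ == "__main__":
    mp.spawn(run, args=(2,), nprocs=2)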
- pytorch_lightning.utilities.distributed.gather_all_tensors(result, group=None)[source]¶
Function to gather all tensors from several ddp processes onto a list that is broadcasted to all processes.
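For example (a hypothetical snippet that assumes a process group has already been initialized, e.g. as in the sketch above), each rank can merge per-process predictions into a single tensor:

import torch

from pytorch_lightning.utilities.distributed import gather_all_tensors

# Every rank receives the same list with one tensor per process.
local_preds = torch.randn(8, 10)                 # per-rank predictions
all_preds = gather_all_tensors(local_preds)      # list of length world_size
merged = torch.cat(all_preds, dim=0)             # shape (world_size * 8, 10)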
- pytorch_lightning.utilities.distributed.init_dist_connection(cluster_environment, torch_distributed_backend, global_rank=None, world_size=None, **kwargs)[source]¶
Utility function to initialize distributed connection by setting env variables and initializing the distributed process group.
- Parameters
cluster_environment¶ (ClusterEnvironment) – the cluster environment used to resolve the main address, port and ranks
torch_distributed_backend¶ (str) – the torch.distributed backend to use (e.g. nccl or gloo)
global_rank¶ (Optional[int]) – rank of the current process
world_size¶ (Optional[int]) – number of processes in the group
kwargs¶ – additional arguments forwarded to torch.distributed.init_process_group
- Raises
RuntimeError – If torch.distributed is not available
- Return type
None
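A minimal sketch of calling this utility directly, assuming the LightningEnvironment plugin and a one-process gloo group purely for illustration (inside the Trainer the DDP strategies take care of this call):

import os

from pytorch_lightning.plugins.environments import LightningEnvironment
from pytorch_lightning.utilities.distributed import init_dist_connection

# Illustrative rendezvous settings for a single-process group; real clusters
# get these from their cluster environment (SLURM, torchelastic, ...).
os.environ.setdefault("MASTER_ADDR", "127.0.0.1")
os.environ.setdefault("MASTER_PORT", "29500")

init_dist_connection(
    cluster_environment=LightningEnvironment(),
    torch_distributed_backend="gloo",   # "nccl" is the usual choice on GPU clusters
    global_rank=0,
    world_size=1,
)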
- pytorch_lightning.utilities.distributed.register_ddp_comm_hook(model, ddp_comm_state=None, ddp_comm_hook=None, ddp_comm_wrapper=None)[source]¶
Function to register communication hook for DDP model https://pytorch.org/docs/master/ddp_comm_hooks.html.
- Parameters
model¶ (DistributedDataParallel) – DDP model
ddp_comm_state¶ (Optional[object]) – state is passed to the hook and can be used to maintain and update any state information that users would like to maintain as part of the training process. Examples: error feedback in gradient compression, peers to communicate with next in GossipGrad, etc.
ddp_comm_hook¶ (Optional[Callable]) – hook(state: object, bucket: dist._GradBucket) -> torch.futures.Future
This callable function is called once the bucket is ready. The hook can perform whatever processing is needed and return a Future indicating completion of any async work (ex: allreduce). If the hook doesn't perform any communication, it can also just return a completed Future. The Future should hold the new value of grad bucket's tensors. Once a bucket is ready, the c10d reducer calls this hook, uses the tensors returned by the Future, and copies the grads to the individual parameters.
ddp_comm_wrapper¶ (Optional[Callable]) – communication hook wrapper to support a communication hook such as FP16 compression as a wrapper, which could be combined with ddp_comm_hook
Warning
The DDP communication hook requires PyTorch 1.8.0 or later.

Warning
The DDP communication wrapper requires PyTorch 1.9.0 or later.

Warning
The post-localSGD hook requires PyTorch 1.9.0 or later.
Examples
>>> from torch.distributed.algorithms.ddp_comm_hooks import (
...     default_hooks as default,
...     powerSGD_hook as powerSGD,
...     post_localSGD_hook as post_localSGD,
... )
>>>
>>> # fp16_compress_hook to compress gradients
>>> ddp_model = ...
>>> register_ddp_comm_hook(
...     model=ddp_model,
...     ddp_comm_hook=default.fp16_compress_hook,
... )
>>>
>>> # powerSGD_hook
>>> ddp_model = ...
>>> register_ddp_comm_hook(
...     model=ddp_model,
...     ddp_comm_state=powerSGD.PowerSGDState(
...         process_group=None,
...         matrix_approximation_rank=1,
...         start_powerSGD_iter=5000,
...     ),
...     ddp_comm_hook=powerSGD.powerSGD_hook,
... )
>>>
>>> # post_localSGD_hook
>>> subgroup, _ = torch.distributed.new_subgroups()
>>> ddp_model = ...
>>> register_ddp_comm_hook(
...     model=ddp_model,
...     ddp_comm_state=post_localSGD.PostLocalSGDState(
...         process_group=None,
...         subgroup=subgroup,
...         start_localSGD_iter=1_000,
...     ),
...     ddp_comm_hook=post_localSGD.post_localSGD_hook,
... )
>>>
>>> # fp16_compress_wrapper combined with another communication hook
>>> ddp_model = ...
>>> register_ddp_comm_hook(
...     model=ddp_model,
...     ddp_comm_state=powerSGD.PowerSGDState(
...         process_group=None,
...         matrix_approximation_rank=1,
...         start_powerSGD_iter=5000,
...     ),
...     ddp_comm_hook=powerSGD.powerSGD_hook,
...     ddp_comm_wrapper=default.fp16_compress_wrapper,
... )
- Return type
None
- pytorch_lightning.utilities.distributed.sync_ddp(result, group=None, reduce_op=None)[source]¶
Function to reduce the tensors from several ddp processes to one main process.
- Parameters
result¶ (Tensor) – the value to sync and reduce (typically a tensor or a number)
group¶ (Optional[Any]) – the process group to gather results from. Defaults to all processes (world)
reduce_op¶ (Union[ReduceOp, str, None]) – the reduction operation. Defaults to sum. Can also be the string 'avg' or 'mean' to calculate the mean during the reduction.
- Return type
Tensor
- Returns
reduced value
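For instance (a hypothetical snippet assuming an initialized process group), averaging a per-rank loss across all workers:

import torch
import torch.distributed as dist

from pytorch_lightning.utilities.distributed import sync_ddp

# Each rank contributes its local loss; "mean" averages it over all ranks.
local_loss = torch.tensor(0.25 * (dist.get_rank() + 1))
mean_loss = sync_ddp(local_loss, reduce_op="mean")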
- pytorch_lightning.utilities.distributed.sync_ddp_if_available(result, group=None, reduce_op=None)[source]¶
Function to reduce a tensor across worker processes during distributed training.
- Parameters
result¶ (Tensor) – the value to sync and reduce (typically a tensor or a number)
group¶ (Optional[Any]) – the process group to gather results from. Defaults to all processes (world)
reduce_op¶ (Union[ReduceOp, str, None]) – the reduction operation. Defaults to sum. Can also be the string 'avg' or 'mean' to calculate the mean during the reduction.
- Return type
Tensor
- Returns
reduced value
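As a small illustrative example, this variant is also safe to call in single-process runs: when no process group is initialized the value is returned unchanged, otherwise it is reduced exactly like sync_ddp above.

import torch

from pytorch_lightning.utilities.distributed import sync_ddp_if_available

samples_seen = torch.tensor(128.0)
# Default reduction is sum; pass reduce_op="mean" to average instead.
total_samples = sync_ddp_if_available(samples_seen)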