fluke.utils

This module contains utility functions and classes used in fluke.

Submodules

log

This submodule provides logging utilities.

model

This submodule provides utilities for PyTorch model manipulation.

Classes included in fluke.utils

ClientObserver

Client observer interface.

Configuration

Fluke configuration class.

OptimizerConfigurator

This class is used to configure the optimizer and the learning rate scheduler.

ServerObserver

Server observer interface.

Functions included in fluke.utils

bytes2human

Convert bytes to human-readable format.

cache_obj

Move the object from RAM to the disk cache to free up memory.

clear_cuda_cache

Clear the CUDA cache.

get_class_from_str

Get a class from its name.

get_class_from_qualified_name

Get a class from its fully qualified name.

get_full_classname

Get the fully qualified name of a class.

get_loss

Get a loss function from its name.

get_model

Get a model from its name.

get_optimizer

Get an optimizer from its name.

get_scheduler

Get a learning rate scheduler from its name.

flatten_dict

Flatten a nested dictionary.

import_module_from_str

Import a module from its name.

memory_usage

Get the memory usage of the current process.

plot_distribution

Plot the distribution of classes for each client.

retrieve_obj

Load an object from the cache (disk).

Classes

interface fluke.utils.ClientObserver

class fluke.utils.ClientObserver[source]

Bases: object

Client observer interface. This interface is used to observe the client during the federated learning process. For example, it can be used to log the performance of the local model, as it is done by the fluke.utils.log.Log class.

client_evaluation(round: int, client_id: int, phase: Literal['pre-fit', 'post-fit'], evals: dict[str, float], **kwargs: dict[str, Any])[source]

This method is called when the client evaluates the local model. The evaluation can be done before (‘pre-fit’) and/or after (‘post-fit’) the local training process. The ‘pre-fit’ evaluation is usually the evaluation of the global model on the local test set, while the ‘post-fit’ evaluation is the evaluation of the just-updated local model on the local test set.

Parameters:
  • round (int) – The round number.

  • client_id (int) – The client ID.

  • phase (Literal['pre-fit', 'post-fit']) – Whether the evaluation is done before or after the local training process.

  • evals (dict[str, float]) – The evaluation results.

  • **kwargs (dict) – Additional keyword arguments.

end_fit(round: int, client_id: int, model: Module, loss: float, **kwargs: dict[str, Any])[source]

This method is called when the client ends the local training process.

Parameters:
  • round (int) – The round number.

  • client_id (int) – The client ID.

  • model (Module) – The local model after the local training.

  • loss (float) – The average loss incurred by the local model during training.

  • **kwargs (dict) – Additional keyword arguments.

start_fit(round: int, client_id: int, model: Module, **kwargs: dict[str, Any])[source]

This method is called when the client starts the local training process.

Parameters:
  • round (int) – The round number.

  • client_id (int) – The client ID.

  • model (Module) – The local model before training.

  • **kwargs (dict) – Additional keyword arguments.
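
Example

A minimal sketch of a custom client observer; how the observer is attached to the clients depends on the rest of the framework and is not shown here.

from fluke.utils import ClientObserver

class PrintClientObserver(ClientObserver):

    def start_fit(self, round, client_id, model, **kwargs):
        print(f"[round {round}] client {client_id}: local training started")

    def end_fit(self, round, client_id, model, loss, **kwargs):
        print(f"[round {round}] client {client_id}: training ended, loss={loss:.4f}")

    def client_evaluation(self, round, client_id, phase, evals, **kwargs):
        print(f"[round {round}] client {client_id} ({phase}): {evals}")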

class fluke.utils.Configuration

class fluke.utils.Configuration(config_exp_path: str = None, config_alg_path: str = None, force_validation: bool = True)[source]

Fluke configuration class. This class is used to store the configuration of an experiment. The configuration must adhere to a specific structure. The configuration is validated when the class is instantiated.

Parameters:
  • config_exp_path (str) – The path to the experiment configuration file.

  • config_alg_path (str) – The path to the algorithm configuration file.

  • force_validation (bool, optional) – Whether to force the validation of the configuration. Defaults to True.

Raises:

ValueError – If the configuration is not valid.

property client: DDict

Get quick access to the client’s hyperparameters.

Returns:

The client’s hyperparameters.

Return type:

DDict

classmethod from_ddict(ddict: DDict) Configuration[source]

Create a configuration from a fluke.DDict.

Parameters:

ddict (DDict) – The dictionary.

Returns:

The configuration.

Return type:

Configuration

property model: DDict

Get quick access to the model hyperparameters.

Returns:

The model hyperparameters.

Return type:

DDict

property server: DDict

Get quick access to the server’s hyperparameters.

Returns:

The server’s hyperparameters.

Return type:

DDict

classmethod sweep(config_exp_path: str, config_alg_path: str) Generator[Configuration, None, None][source]

Generate configurations from a sweep. The sweep is defined by the experiment configuration file, and the method yields one configuration for each combination of hyperparameters.

Parameters:
  • config_exp_path (str) – The path to the experiment configuration file.

  • config_alg_path (str) – The path to the algorithm configuration file.

Yields:

Configuration – A configuration.

to_dict() dict[source]

Convert the configuration to a dictionary.

Returns:

The dictionary.

Return type:

dict
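
Example

A minimal sketch, assuming two YAML configuration files with the expected structure exist at the given (hypothetical) paths.

from fluke.utils import Configuration

cfg = Configuration("exp.yaml", "alg.yaml")  # validated on instantiation
print(cfg.client)     # quick access to the client's hyperparameters
print(cfg.server)     # quick access to the server's hyperparameters
print(cfg.to_dict())  # plain-dict view of the whole configuration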

class fluke.utils.OptimizerConfigurator

class fluke.utils.OptimizerConfigurator(optimizer_cfg: DDict | dict, scheduler_cfg: DDict | dict = None)[source]

This class is used to configure the optimizer and the learning rate scheduler.

optimizer

The optimizer class.

Type:

type[Optimizer]

scheduler

The learning rate scheduler class.

Type:

type[LRScheduler]

optimizer_cfg

The optimizer keyword arguments.

Type:

DDict

scheduler_cfg

The scheduler keyword arguments.

Type:

DDict

__call__(model: Module, filter_fun: callable | None = None, **override_kwargs)[source]

Creates the optimizer and the scheduler.

Parameters:
  • model (Module) – The model whose parameters will be optimized.

  • filter_fun (callable, optional) – A function of the model that returns the set of parameters the optimizer will consider. Defaults to None.

  • override_kwargs (dict) – The optimizer’s keyword arguments to override the default ones.

Returns:

The optimizer and the scheduler.

Return type:

tuple[Optimizer, LRScheduler]
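
Example

A minimal sketch, assuming the optimizer and the scheduler are identified inside the configuration dictionaries by a name entry next to their keyword arguments (the exact keys are an assumption; check your configuration files for the actual structure).

import torch
from fluke.utils import OptimizerConfigurator

model = torch.nn.Linear(10, 2)
opt_cfg = OptimizerConfigurator(
    optimizer_cfg={"name": "SGD", "lr": 0.1, "momentum": 0.9},        # assumed keys
    scheduler_cfg={"name": "StepLR", "step_size": 10, "gamma": 0.5},  # assumed keys
)
optimizer, scheduler = opt_cfg(model)  # instantiate both for this model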

interface fluke.utils.ServerObserver

class fluke.utils.ServerObserver[source]

Bases: object

Server observer interface. This interface is used to observe the server during the federated learning process. For example, it can be used to log the performance of the global model and the communication costs, as it is done by the Log class.

end_round(round: int) None[source]

This method is called when a round ends.

Parameters:

round (int) – The round number.

finished(round: int) None[source]

This method is called when the federated learning process has ended.

Parameters:

round (int) – The last round number.

interrupted() None[source]

This method is called when the federated learning process has been interrupted.

selected_clients(round: int, clients: Iterable) None[source]

This method is called when the clients have been selected for the current round.

Parameters:
  • round (int) – The round number.

  • clients (Iterable) – The clients selected for the current round.

server_evaluation(round: int, type: Literal['global', 'locals'], evals: dict[str, float] | dict[int, dict[str, float]], **kwargs: dict[str, Any]) None[source]

This method is called when the server evaluates the global or the local models on its test set.

Parameters:
  • round (int) – The round number.

  • type (Literal['global', 'locals']) – The type of evaluation. If ‘global’, the evaluation is done on the global model. If ‘locals’, the evaluation is done on the local models of the clients on the test set of the server.

  • evals (dict[str, float] | dict[int, dict[str, float]]) – The evaluation metrics. In case of ‘global’ evaluation, it is a dictionary with the evaluation metrics. In case of ‘locals’ evaluation, it is a dictionary of dictionaries where the keys are the client IDs and the values are the evaluation metrics.

start_round(round: int, global_model: Any) None[source]

This method is called when a new round starts.

Parameters:
  • round (int) – The round number.

  • global_model (Any) – The current global model.
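
Example

A minimal sketch of a custom server observer; attaching it to the server is framework-specific and not shown here.

from fluke.utils import ServerObserver

class PrintServerObserver(ServerObserver):

    def start_round(self, round, global_model):
        print(f"Round {round} started")

    def selected_clients(self, round, clients):
        print(f"Round {round}: {len(list(clients))} clients selected")

    def server_evaluation(self, round, type, evals, **kwargs):
        print(f"Round {round} [{type}] evaluation: {evals}")

    def end_round(self, round):
        print(f"Round {round} ended")

    def finished(self, round):
        print(f"Federation finished at round {round}")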

Functions

fluke.utils.bytes2human(n: int) str[source]

Convert bytes to human-readable format.

See:

https://psutil.readthedocs.io/en/latest/#psutil.Process.memory_info

Parameters:

n (int) – The number of bytes.

Returns:

The number of bytes in human-readable format.

Return type:

str

Example

>>> bytes2human(10000)
'9.8K'
>>> bytes2human(100001221)
'95.4M'
>>> bytes2human(1024 ** 3)
'1.0G'

fluke.utils.cache_obj(obj: Any, key: str, party: Server | Client | None = None) FlukeCache._ObjectRef | None[source]

Move the object from RAM to the disk cache to free up memory. If the object is None, it returns None without caching it.

Warning

This method assumes the cache is already initialized and opened. If it is not, it will raise an exception.

Parameters:
  • obj (Any) – The object to cache.

  • key (str) – The key associated with the object.

  • party (Client | Server, optional) – The party that is caching the object. Defaults to None.

Returns:

The object reference identifier. If the object is None, it returns None.

Return type:

FlukeCache._ObjectRef

fluke.utils.clear_cuda_cache(ipc: bool = False)[source]

Clear the CUDA cache. This function should be used to free the GPU memory after the training process has ended. It is usually used after the local training of the clients.

Parameters:

ipc (bool, optional) – Whether to force collecting GPU memory after it has been released by CUDA IPC.
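
Example

For instance, after a client's local training has finished:

>>> from fluke.utils import clear_cuda_cache
>>> clear_cuda_cache()          # release cached GPU memory
>>> clear_cuda_cache(ipc=True)  # also collect memory released by CUDA IPC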

fluke.utils.get_class_from_str(module_name: str, class_name: str) Any[source]

Get a class from its name. This function dynamically imports a class given its name and the name of the module where it is defined.

Parameters:
  • module_name (str) – The name of the module where the class is defined.

  • class_name (str) – The name of the class.

Returns:

The class.

Return type:

Any
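
Example

A sketch using a class from torch.nn; any importable module/class pair works the same way.

>>> from fluke.utils import get_class_from_str
>>> get_class_from_str("torch.nn", "Linear")
<class 'torch.nn.modules.linear.Linear'>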

fluke.utils.get_class_from_qualified_name(qualname: str) Any[source]

Get a class from its fully qualified name.

Parameters:

qualname (str) – The fully qualified name of the class.

Returns:

The class.

Return type:

Any
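
Example

A sketch, assuming the last dot separates the module from the class name.

>>> from fluke.utils import get_class_from_qualified_name
>>> get_class_from_qualified_name("torch.nn.Linear")
<class 'torch.nn.modules.linear.Linear'>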

fluke.utils.get_full_classname(classtype: type) str[source]

Get the fully qualified name of a class.

Parameters:

classtype (type) – The class.

Returns:

The fully qualified name of the class.

Return type:

str

Example

Let A be a class defined in the module fluke.utils

# This is the content of the file fluke/utils.py
class A:
    pass

get_full_classname(A)  # 'fluke.utils.A'

If the class is defined in the __main__ module, then:

if __name__ == "__main__":
    class B:
        pass

    get_full_classname(B)  # '__main__.B'

fluke.utils.get_loss(lname: str) Module[source]

Get a loss function from its name. The supported loss functions are the ones defined in the torch.nn module.

Parameters:

lname (str) – The name of the loss function.

Returns:

The loss function.

Return type:

Module
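
Example

The name must match a loss class defined in torch.nn, for example:

>>> from fluke.utils import get_loss
>>> get_loss("CrossEntropyLoss")
CrossEntropyLoss()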

fluke.utils.get_model(mname: str, **kwargs: dict[str, Any]) Module[source]

Get a model from its name. This function dynamically imports a torch model given its name and the module where it is defined. If mname is not a fully qualified name, the model is assumed to be defined in the fluke.nets module.

Parameters:
  • mname (str) – The name of the model.

  • **kwargs – The keyword arguments to pass to the model’s constructor.

Returns:

The model.

Return type:

Module
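
Example

A sketch; the short model name used here is illustrative and must correspond to a network actually defined in fluke.nets, while the fully qualified name is a hypothetical placeholder.

>>> from fluke.utils import get_model
>>> net = get_model("MNIST_2NN")              # assumed to be defined in fluke.nets
>>> net = get_model("my_package.nets.MyNet")  # hypothetical fully qualified name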

fluke.utils.get_optimizer(oname: str) type[Optimizer][source]

Get an optimizer from its name. This function dynamically imports an optimizer; the supported optimizers are the ones defined in the torch.optim module.

Parameters:

oname (str) – The name of the optimizer.

Returns:

The optimizer class.

Return type:

type[Optimizer]
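
Example

>>> import torch
>>> from fluke.utils import get_optimizer
>>> opt_class = get_optimizer("SGD")  # returns torch.optim.SGD
>>> optimizer = opt_class(torch.nn.Linear(4, 2).parameters(), lr=0.01)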

fluke.utils.get_scheduler(sname: str) type[LRScheduler][source]

Get a learning rate scheduler from its name. This function dynamically imports a learning rate scheduler; the supported schedulers are the ones defined in the torch.optim.lr_scheduler module.

Parameters:

sname (str) – The name of the scheduler.

Returns:

The learning rate scheduler class.

Return type:

type[LRScheduler]
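
Example

>>> import torch
>>> from fluke.utils import get_optimizer, get_scheduler
>>> optimizer = get_optimizer("SGD")(torch.nn.Linear(4, 2).parameters(), lr=0.01)
>>> scheduler = get_scheduler("StepLR")(optimizer, step_size=10, gamma=0.5)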

fluke.utils.flatten_dict(nested_dict: dict, sep: str = '.') dict[source]

Flatten a nested dictionary. The flattened dictionary has keys obtained by concatenating the nested keys, joined by a separator.

Parameters:
  • nested_dict (dict) – The nested dictionary.

  • sep (str, optional) – Separator. Defaults to ‘.’.

Returns:

Flattened dictionary.

Return type:

dict

Example

d = {'a': 1, 'b': {'c': 2, 'd': {'e': 3}}}
flatten_dict(d)
# Output: {'a': 1, 'b.c': 2, 'b.d.e': 3}

fluke.utils.import_module_from_str(name: str) Any[source]

Import a module from its name.

Parameters:

name (str) – The name of the module.

Returns:

The module.

Return type:

Any
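
Example

>>> from fluke.utils import import_module_from_str
>>> nn = import_module_from_str("torch.nn")
>>> nn.Linear(4, 2)
Linear(in_features=4, out_features=2, bias=True)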

fluke.utils.memory_usage() tuple[int, int, int][source]

Get the memory usage of the current process.

Returns:

The resident set size (RSS), the virtual memory size (VMS), and the current CUDA reserved memory. If the device is not a CUDA device, the reserved memory is 0.

Return type:

tuple[int, int, int]
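
Example

A sketch pairing memory_usage with bytes2human for readable output.

>>> from fluke.utils import bytes2human, memory_usage
>>> rss, vms, cuda_reserved = memory_usage()
>>> print(bytes2human(rss), bytes2human(vms), bytes2human(cuda_reserved))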

fluke.utils.plot_distribution(clients: list[Client], train: bool = True, type: str = 'ball') None[source]

Plot the distribution of classes for each client. The plot can be a scatter plot, a heatmap, or a bar plot. The scatter plot (type='ball') shows filled circles whose size is proportional to the number of examples of a class. The heatmap (type='mat') shows a matrix whose rows are the classes and whose columns are the clients, with a color intensity proportional to the number of examples of a class. The bar plot (type='bar') shows a stacked bar plot where the height of each bar is proportional to the number of examples of a class.

Warning

If the number of clients is greater than 30, the type is automatically switched to 'bar' for better visualization.

Parameters:
  • clients (list[Client]) – The list of clients.

  • train (bool, optional) – Whether to plot the distribution on the training set. If False, the distribution is plotted on the test set. Defaults to True.

  • type (str, optional) – The type of plot. It can be 'ball', 'mat', or 'bar'. Defaults to 'ball'.
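
Example

A sketch, assuming clients is a list of Client objects already built by a federated algorithm (how the list is obtained depends on the rest of the framework).

from fluke.utils import plot_distribution

plot_distribution(clients, train=True, type="mat")  # heatmap of the training-set label distribution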

fluke.utils.retrieve_obj(key: str, party: Client | Server | None = None, pop: bool = True) Any[source]

Load an object from the cache (disk). If the object is not found in the cache, it returns None.

Warning

This method assumes the cache is already initialized and opened. If it is not, it will raise an exception.

Parameters:
  • key (str) – The specific key to add to the default filename.

  • party (Client | Server, optional) – The party for which to load the object. Defaults to None.

  • pop (bool, optional) – Whether to remove the object from the cache after loading it. Defaults to True.

Returns:

The object retrieved from the cache.

Return type:

Any
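
Example

A minimal sketch pairing retrieve_obj with cache_obj, assuming the fluke cache has already been initialized and opened (see the warnings above); the key name is arbitrary.

import torch
from fluke.utils import cache_obj, retrieve_obj

big_tensor = torch.zeros(10_000, 10_000)
ref = cache_obj(big_tensor, key="round3_update")  # move the tensor to the disk cache
del big_tensor                                    # the in-RAM copy can now be freed
big_tensor = retrieve_obj("round3_update")        # load it back (and pop it from the cache)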