fluke.comm

This module contains the classes for the communication between the clients and the server.

Message

This class represents a message that can be exchanged between clients and the server.

Channel

A bi-directional communication channel.

ChannelObserver

Channel observer interface for the Observer pattern.

class fluke.comm.Message

payload

Get the payload of the message.

msg_type

Get the type of the message.

sender

Get the sender of the message.

get_size

Get the size of the message.

clone

Clone the message.

class fluke.comm.Message(payload: Any, msg_type: str = 'model', sender: Any | None = None)

This class represents a message that can be exchanged between clients and the server. This type is immutable. The message contains a payload, a type and a sender. The payload can be of any type. The type is a string that describes the content of the message. The sender is the object that sends the message.

Example

Let us consider a simple example where a client wants to prepare a message with the content “Hello” to be sent to the server. The client can create the message as follows:

message = Message(payload="Hello", msg_type="greeting", sender=client)
# Then through a channel (of type Channel) the message can be sent to the server
# channel.send(message, server)

See also

Channel

property msg_type: str

Get the type of the message.

Returns:

The type of the message.

Return type:

str

property payload: Any

Get the payload of the message.

Returns:

The payload of the message.

Return type:

Any

property sender: Any | None

Get the sender of the message.

Returns:

The sender of the message.

Return type:

Optional[Any]

clone() -> Message

Clone the message. The cloned message contains a deep copy of the payload while keeping the same message type and the same reference to the sender.

Returns:

The cloned message.

Return type:

Message
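The copy semantics described above can be illustrated with a small stand-in class. ToyMessage below is hypothetical: it only mimics the documented behaviour (deep copy of the payload, same message type, same sender reference) and is not the fluke implementation.

```python
import copy
from dataclasses import dataclass
from typing import Any, Optional


@dataclass(frozen=True)
class ToyMessage:
    """Hypothetical stand-in for Message, for illustration only."""
    payload: Any
    msg_type: str = "model"
    sender: Optional[Any] = None

    def clone(self) -> "ToyMessage":
        # Deep-copy the payload; keep the type and the same sender reference.
        return ToyMessage(copy.deepcopy(self.payload), self.msg_type, self.sender)


sender = object()
original = ToyMessage({"weights": [1.0, 2.0]}, "model", sender)
cloned = original.clone()

cloned.payload["weights"].append(3.0)    # does not affect the original payload
print(original.payload)                  # {'weights': [1.0, 2.0]}
print(cloned.sender is original.sender)  # True
```

Because the payload is copied, a receiver can modify what it reads without affecting the sender's data, while the sender can still be identified by reference.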

get_size() -> int

Get the size of the message. The message size is the size of the payload calculated in terms of “floating point” numbers. For example, a message containing a tensor of size (10, 10) has a size of 100. A message containing a string of length 10 has a size of 10. A message containing an ACK (i.e., with no payload) has a size of 1. In case of unknown types, a warning is raised and the size is set to 0.

Returns:

The size of the message, in number of “floating point” values.

Return type:

int

Example

import torch
from fluke.comm import Message

# `client` is any object acting as the sender
message = Message("Hello", "greeting", client)
print(message.get_size())  # 5

message = Message(torch.randn(10, 10), "tensor", client)
print(message.get_size())  # 100

message = Message(None, "ack", client)
print(message.get_size())  # 1
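The counting rule can also be sketched without fluke or torch. The function toy_size below is hypothetical and handles only a few plain Python types; nested lists stand in for tensors, and how the library treats other payload types is not covered here.

```python
import warnings
from typing import Any


def toy_size(payload: Any) -> int:
    """Hypothetical sizing rule mirroring the description above."""
    if payload is None:                      # ACK-style message with no payload
        return 1
    if isinstance(payload, (int, float)):    # a single number counts as one value
        return 1
    if isinstance(payload, str):             # one unit per character
        return len(payload)
    if isinstance(payload, (list, tuple)):   # sum over the nested elements
        return sum(toy_size(item) for item in payload)
    warnings.warn(f"Unknown payload type: {type(payload).__name__}")
    return 0


grid = [[0.0] * 10 for _ in range(10)]  # plays the role of a (10, 10) tensor
print(toy_size("Hello"))  # 5
print(toy_size(grid))     # 100
print(toy_size(None))     # 1
```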

class fluke.comm.Channel

buffer

Get the buffer of the channel.

__getitem__

Get the messages of the given receiver.

send

Send a copy of the message to a receiver.

receive

Receive (i.e., read) a message from a sender.

broadcast

Send a copy of the message to a list of receivers.

clear

Clear the message box of the given receiver.

class fluke.comm.Channel

Bases: ObserverSubject

A bi-directional communication channel. It is used to send and receive messages between the parties. The Channel class implements the Observer pattern. It notifies the observers when a message is received. Clients and server are supposed to use a channel to communicate with each other.

Example

Let us consider a simple example where a client sends a message to the server.

channel = Channel()
channel.send(Message("Hello", "greeting", client), server)
msg: Message = channel.receive(server, client, "greeting")
print(msg.payload)  # Hello

__getitem__(mbox: Any) -> list[Message]

Get the messages of the given receiver.

Parameters:

mbox (Any) – The receiver.

Returns:

The list of messages sent to the receiver.

Return type:

list[Message]

broadcast(message: Message, to: list[Any]) -> None

Send a copy of the message to a list of receivers.

Parameters:
  • message (Message) – The message to be sent.

  • to (list[Any]) – The list of receivers.
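As a rough illustration of why each receiver gets its own copy, the sketch below uses a plain dictionary as the buffer and dictionaries as messages. The function toy_broadcast is hypothetical, and the use of a deep copy is an assumption based on the behaviour documented for Message.clone.

```python
import copy
from typing import Any


def toy_broadcast(buffer: dict, message: Any, to: list) -> None:
    """Hypothetical broadcast: every receiver gets its own copy."""
    for mbox in to:
        buffer.setdefault(mbox, []).append(copy.deepcopy(message))


buffer: dict = {}
message = {"payload": [1.0, 2.0], "msg_type": "model"}
toy_broadcast(buffer, message, ["client_0", "client_1"])

buffer["client_0"][0]["payload"].append(3.0)  # mutate one receiver's copy
print(buffer["client_1"][0]["payload"])       # [1.0, 2.0]
print(message["payload"])                     # [1.0, 2.0]
```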

property buffer: dict[Any, list[Message]]

Get the buffer of the channel. The buffer stores the unread messages in a dictionary. The keys are the recipients and each value is the list of messages sent to that recipient.

Returns:

The buffer of the channel.

Return type:

dict[Any, list[Message]]

clear(mbox: Any) -> None

Clear the message box of the given receiver.

Caution

Any unread message will be lost after calling this method. Lost messages are not considered part of the communication protocol, so they are not accounted for in the communication cost.

Parameters:

mbox (Any) – The receiver.

receive(mbox: Any, sender: Any | None = None, msg_type: str | None = None) -> Message

Receive (i.e., read) a message from a sender. The message is removed from the message box of the receiver. If both sender and msg_type are None, the first message in the message box is returned. If sender is None, the first message with the given msg_type is returned. If msg_type is None, the first message from the given sender is returned.

Parameters:
  • mbox (Any) – The receiver.

  • sender (Any | None) – The sender. If None, messages from any sender match.

  • msg_type (str | None) – The type of the message. If None, messages of any type match.

Returns:

The received message.

Return type:

Message

Raises:

ValueError – If no message with the given sender and message type is found in the message box of the receiver.
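The selection rules can be sketched with a plain list as the message box and dictionaries as messages. The function toy_receive is hypothetical, not the library code; matching on both fields when both sender and msg_type are given is an assumption consistent with the ValueError description.

```python
from typing import Any, Optional


def toy_receive(mailbox: list, sender: Optional[Any] = None,
                msg_type: Optional[str] = None) -> dict:
    """Hypothetical lookup: pop the first message matching the given filters."""
    for index, msg in enumerate(mailbox):
        sender_ok = sender is None or msg["sender"] == sender
        type_ok = msg_type is None or msg["msg_type"] == msg_type
        if sender_ok and type_ok:
            return mailbox.pop(index)  # reading removes the message
    raise ValueError(f"No message from {sender!r} with type {msg_type!r}")


mailbox = [
    {"payload": "Hello", "msg_type": "greeting", "sender": "server"},
    {"payload": [0.1, 0.2], "msg_type": "model", "sender": "server"},
]

print(toy_receive(mailbox, msg_type="model")["payload"])  # [0.1, 0.2]
print(toy_receive(mailbox)["payload"])                    # Hello
print(len(mailbox))                                       # 0
```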

Example

Receiving a message from the server with message type greeting:

channel = Channel()
message = channel.receive(client, server, "greeting")

send(message: Message, mbox: Any) -> None

Send a copy of the message to a receiver. Every sent message should have a corresponding received message: the receiver should call the receive method of the channel to get the message.

Parameters:
  • message (Message) – The message to be sent.

  • mbox (Any) – The receiver.

Example

Sending a string message from the server to a client:

channel = Channel()
channel.send(Message("Hello", "greeting", server), client)

interface fluke.comm.ChannelObserver

class fluke.comm.ChannelObserver

Channel observer interface for the Observer pattern. This interface is used to observe the communication channel during the federated learning process.

message_received(message: Message) -> None

This method is called when a message is received, i.e., when a message is read from the message box of the receiver.

Parameters:

message (Message) – The message received.
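To show when the notification fires, here is a minimal, self-contained sketch of the Observer pattern. ToyChannel, its attach method and CountingObserver are hypothetical names introduced for illustration; only the message_received callback name comes from the interface above.

```python
from typing import Any


class ToyChannel:
    """Hypothetical channel that notifies observers when a message is read."""

    def __init__(self) -> None:
        self._buffer: dict = {}
        self._observers: list = []

    def attach(self, observer: Any) -> None:
        self._observers.append(observer)

    def send(self, message: Any, mbox: Any) -> None:
        self._buffer.setdefault(mbox, []).append(message)

    def receive(self, mbox: Any) -> Any:
        message = self._buffer[mbox].pop(0)
        for observer in self._observers:
            observer.message_received(message)  # notify on read, not on send
        return message


class CountingObserver:
    """Counts the messages read from the channel."""

    def __init__(self) -> None:
        self.count = 0

    def message_received(self, message: Any) -> None:
        self.count += 1


channel = ToyChannel()
observer = CountingObserver()
channel.attach(observer)

channel.send("Hello", "server")
print(observer.count)  # 0: nothing has been read yet
channel.receive("server")
print(observer.count)  # 1
```

An observer of this kind could, for instance, accumulate message sizes to track communication cost, since it sees only messages that were actually read.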