fluke.comm
¶
This module contains the classes for the communication between the clients and the server.
This class represents a message that can be exchanged between clients and the server. |
|
A bi-directional communication channel. |
|
Channel observer interface for the Observer pattern. |
class fluke.comm.Message
Get the payload of the message. |
|
Get the type of the message. |
|
Get the sender of the message. |
|
Get the size of the message. |
|
Clone the message. |
- class fluke.comm.Message(payload: Any, msg_type: str = 'model', sender: Any | None = None)[source]¶
This class represents a message that can be exchanged between clients and the server. This type is immutable. The message contains a payload, a type and a sender. The payload can be of any type. The type is a string that describes the content of the message. The sender is the object that sends the message.
Example
Let us consider a simple example where a client wants to send preparea message to be sent to the server with the content “Hello”. The client can create a message as follows:
1message = Message(payload="Hello", msg_type="greeting", sender=client) 2# Then through a channel (of type Channel) the message can be sent to the server 3# channel.send(message, server)
See also
- property payload: Any¶
Get the payload of the message.
- Returns:
The payload of the message.
- Return type:
Any
- property sender: Any | None¶
Get the sender of the message.
- Returns:
The sender of the message.
- Return type:
Optional[Any]
- clone() Message [source]¶
Clone the message. The cloned message containes a deepcopy of the payload while keeping the same message type and the same reference to the sender.
- Returns:
The cloned message.
- Return type:
- get_size() int [source]¶
Get the size of the message. The message size is the size of the payload calculated in terms of “floating point” numbers. For example, a message containing a tensor of size (10, 10) has a size of 100. A message containing a string of length 10 has a size of 10. A message containing an ACK (i.e., with no payload) has a size of 1. In case of unknown types, a warning is raised and the size is set to 0.
- Returns:
The size of the message in bytes.
- Return type:
Example
1message = Message("Hello", "greeting", client) 2print(message.get_size()) # 5 3 4message = Message(torch.randn(10, 10), "tensor", client) 5print(message.get_size()) # 100 6 7message = Message(None, "ack", client) 8print(message.get_size()) # 1
class fluke.comm.Channel
Get the buffer of the channel. |
|
Get the messages of the given receiver. |
|
Send a copy of the message to a receiver. |
|
Receive (i.e., read) a message from a sender. |
|
Send a copy of the message to a list of receivers. |
|
Clear the message box of the given receiver. |
- class fluke.comm.Channel[source]¶
Bases:
ObserverSubject
A bi-directional communication channel. It is used to send and receive messages between the parties. The Channel class implements the Observer pattern. It notifies the observers when a message is received. Clients and server are supposed to use a channel to communicate with each other.
Example
Let us consider a simple example where a
client
sends a message to theserver
.1channel = Channel() 2channel.send(Message("Hello", "greeting", server), client) 3mag: Message = channel.receive(server, client, "greeting") 4print(msg.payload) # Hello
- broadcast(message: Message, to: list[Any]) None [source]¶
Send a copy of the message to a list of receivers.
- property buffer: dict[Any, list[Message]]¶
Get the buffer of the channel. The buffer stores the unread messages in a dictionary. The keys are the recipients and the values are the list of messages sent to the recipient.
- clear(mbox: Any) None [source]¶
Clear the message box of the given receiver.
Caution
Any unread message will be lost after calling this method. Lost messages are not considered in the communication protocol thus they are not accounted for in the communication cost.
- Parameters:
mbox (Any) – The receiver.
- receive(mbox: Any, sender: Any | None = None, msg_type: str | None = None) Message [source]¶
Receive (i.e., read) a message from a sender. The message is removed from the message box of the receiver. If both
sender
andmsg_type
are None, the first message in the message box is returned. Ifsender
is None, the first message with the givenmsg_type
is returned. Ifmsg_type
is None, the first message from the givensender
is returned.- Parameters:
mbox (Any) – The receiver.
sender (Any) – The sender.
msg_type (str) – The type of the message.
- Returns:
The received message.
- Return type:
- Raises:
ValueError – message not found in the message box of the receiver with the given sender and message type.
Example
Receiving a message from the
server
with message typegreeting
:1channel = Channel() 2message = channel.receive(client, server, "greeting")
- send(message: Message, mbox: Any) None [source]¶
Send a copy of the message to a receiver. To any sent message should correspond a received message. The receiver should call the receive method of the channel to get the message.
- Parameters:
message (Message) – The message to be sent.
mbox (Any) – The receiver.
Example
Sending a string message from the
server
to aclient
:1channel = Channel() 2channel.send(Message("Hello", "greeting", server), client)
interface fluke.comm.ChannelObserver