Simple message bus: Python and Tk

This is the Python and Tk implementation of the message bus program. The principles of the message bus are described in the main message bus article. Do read that article first.

This example uses Tk for the GUI, but not for handling events on the sockets.

The sockets are handled by coroutines, added in Python 3.5. It does not blend perfectly with Tk. Some less-than-ideal code was needed:

  • Tk’s update() routine is called at regular intervals to keep the GUI running.
  • When the GUI is about to close, because the window close button (created by the window manager) has been clicked, a delayed )five milliseconds ) call to loop.close() is used, so that the event loop is drained.

Surely there are better solutions. One obvious would be to skip coroutines, and go down a more traditional route.

Tk is used by several implementations: Perl, Python and Tcl.

The source code

#!/usr/bin/python
import asyncio
import tkinter as tk
# import sys;

#---------------------------------------------------------------------------------------- Client
class Client:
    instance = None;

    def __init__(self, loop):
        self.reader = None;
        self.writer = None;
        loop.create_task(self.client());
        Client.instance = self;

    async def client(self):
        self.reader, self.writer = await asyncio.open_connection( 'localhost', 4711);
        print('Connected to server');
        while True:
            try:
                response = await self.reader.readline();
                if self.reader.at_eof():
                    break;
                msg = response.decode().strip();
                print("Client got [{:s}]".format(msg));
                Gui.instance.on_incoming_message(msg);
            except asyncio.CancelledError:
                print("client: Got CancelledError")
                break;
        self.writer.close();
        self.reader.close();
        print('client: Disconnected from server');

    def notify_server(self, colour):
        msg = "{:d}\n".format(colour);
        self.writer.write(msg.encode());

#---------------------------------------------------------------------------------------- Gui
class Gui(tk.Tk):
    RED      = 1001;
    GREEN    = 1002;
    BLUE     = 1003;
    instance = None;

    def btn(self, text, value):
        return tk.Button(self,
                         text     = text,
                         width    =  9,
                         height   =  1,
                         command  = (lambda: self.on_clicked(value)));

    def __init__(self, loop, interval=1/120):
        super().__init__();
        self.protocol("WM_DELETE_WINDOW", self.close);
        loop.create_task(self.update_gui(interval));

        self.red   = self.btn("Red",   Gui.RED);
        self.green = self.btn("Green", Gui.GREEN);
        self.blue  = self.btn("Blue",  Gui.BLUE);

        self.red  .pack(side = tk.LEFT);
        self.green.pack(side = tk.LEFT);
        self.blue .pack(side = tk.LEFT);

        self.wm_title("Python and Tk");
        self.current_colour = Gui.BLUE;
        Gui.instance = self;

    def on_clicked(self, value):
        print("New colour is {:d}".format(value));
        Client.instance.notify_server(value);

    def update_btn(self, button, cond, colour_spec):
        c = colour_spec if cond else 'white';
        button.config(background = c, activebackground = c);

    def on_incoming_message(self, msg):
        self.current_colour = int(msg);
        self.update_btn(self.red,   1001 == self.current_colour, 'red' );
        self.update_btn(self.green, 1002 == self.current_colour, '#7F7');
        self.update_btn(self.blue,  1003 == self.current_colour, '#77F');

    async def update_gui(self, interval):
        while True:
            try:
                self.update();
                await asyncio.sleep(interval);
            except asyncio.CancelledError:
                print("update_gui: Got CancelledError")
                break;
        print("update_gui: exiting")

    def close(self):
        print("Close: Cancelling all tasks");
        for task in asyncio.Task.all_tasks():
            task.cancel();
        print("Draining and stopping the event loop");
        loop = asyncio.get_event_loop();
        loop.call_later(0.001, loop.stop);

#---------------------------------------------------------------------------------------- Worker
class Worker(asyncio.Protocol):
    servers = set();            #--- static members

    def connection_made(self, transport):
        self.transport = transport
        self.peer = transport.get_extra_info('peername')
        Worker.servers.add(self);
        print('Connected from client', self.peer);
        msg = "{:d}\n".format(Gui.instance.current_colour);
        self.transport.write(msg.encode());

    def connection_lost(self, exc):
        print('Disconnected from client', self.peer);

    def data_received(self, data):
        Worker.current = data.decode(); #--- Remember
        for s in Worker.servers:
            s.transport.write(data);

#---------------------------------------------------------------------------------------- main
def main():
    loop = asyncio.get_event_loop()

    try:
        bus    = loop.create_server(Worker, 'localhost', 4711);
        server = loop.run_until_complete(bus);
    except:
        None;

    app    = Gui   (loop);
    client = Client(loop);

    print("run_forever")
    try:
        loop.run_forever();
    except KeyboardInterrupt:
        print("Keyboard interrupt");

    print("loop.close()")
    loop.close()

main();

# Local Variables:
# coding:         us-ascii-unix
# End:

You can reach me by email at “lars dash 7 dot sdu dot se” or by telephone +46 705 189090

View source for the content of this page.