TMB: The implementation

The principles of the Text Message Bus are described in the main TMB article. Do read that article first.

A brief summary of components.

class MessageBusSocket A single object handling socket-level networking and the main event loop.
namespace MessageBus Handling some of the logic of the message bus.
class AgentSocket Socket-level networking for a connection to a client.
class Agent The logic for processing incoming messages from a client.
class Matcher Specifies a subscription by matching to category.
class MatcherUsingRegexp Specialization for regular expressions.
class MatcherUsingText Specialization for plain text.
class SubscriptionList A Matcher (to specify a subscription) and a list of subscribers.
class EventLoopSocket Base class of MessageBusSocket and AgentSocket.
common.{h,cc} Various common declarations and definitions.
main.cc A short main program.

class MessageBusSocket — A single object that listens for incoming TCP connection requests and creates instances of AgentSocket. This class knows nothing about the logic of the message bus. It only handles networking. It keeps a list (std::vector<EventLoopSocket *>) of all open sockets, including itself. The largest part is the method event_loop(), called once by the main program. When event_loop() returns, the main program terminates.

class AgentSocket — Reads from a socket connected to a client. Incoming data is split into lines that are passed to an Agent. An AgentSocket and an Agent always come in a pair. Again, this class knows nothing about the logic of the message bus. It only handles networking and splitting into lines.

class Agent — Derived from AgentSocket. There is one Agent for each client. All incoming messages are passed to an Agent. If the message is a broadcast message, then it is passed on to MessageBus, described below. Otherwise, the message is a command message and handled by the Agent itself.

namespace MessageBus — This namespace has a single function, broadcast(), which calls SubscriptionList::broadcast() to do the actual matching and broadcasting.

class Matcher — Matcher is the mechanism for matching subscriptions to broadcast messages. Incoming broadcast messages are matched to subscription lists by matching the broadcast message’s category against the list’s matcher. Matcher is both an abstract base class for specific matchers (MatcherUsingText and MatcherUsingRegexp) and a factory for producing matchers from subscribe commands.

class SubscriptionList — Holds a Matcher and a list of subscribers (class Agent). Also holds a set containing all SubscriptionList:s. Static member function broadcast(message) handles broadcasting to all subscribers of all SubscriptionList:s. When a SubscriptionList no longer has any subscribers, it is deleted.

class MatcherUsingRegexp — A concrete implementation of Matcher for regular expressions. Uses PCRE, the Perl-Compatible Regular Expressions library.

class MatcherUsingText — A concrete implementation of Matcher for matching plain text, using strstr et al.

class EventLoopSocket — Base class of MessageBusSocket and AgentSocket, for uniform handling in the event loop.

common.h — Various type definitions and function declarations. Forward declarations of all classes. Included first in all implementation (.cc) files.

Execution flow

Starting

Before main() is called:
    register MatcherUsingRegexp::factory() as a Matcher factory (explained later)
    register MatcherUsingText::factory() as a Matcher factory
In main():
    register handler for SIGINT. The handler sets a flag that will terminate the event loop.
    use signal(..., SIG_IGN) to ignore all ignorable signals
    create an instance of MessageBusSocket
       create the listening socket on port 4711 on all interfaces
       add the instance (of MessageBusSocket) to MessageBusSocket::sockets[]
    pass control to the event loop, MessageBusSocket::event_loop()
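
The signal handling part of the startup sequence could look roughly like the sketch below. It is a minimal illustration, not the TMB source: the flag is a plain file-level variable instead of a member of the global struct g, install_signal_handlers() is a made-up helper name, and only SIGPIPE is ignored here rather than every ignorable signal.

```cpp
#include <cassert>
#include <csignal>

// Flag polled by the event loop. sig_atomic_t is a type that can be
// written safely from inside a signal handler.
static volatile std::sig_atomic_t exiting = 0;

// Handler for SIGINT: only records that termination was requested.
static void exit_on_sigint(int) { exiting = 1; }

// Hypothetical helper. Ignoring SIGPIPE means that writing to a client
// that has gone away reports an error instead of killing the process.
static void install_signal_handlers() {
    std::signal(SIGPIPE, SIG_IGN);
    std::signal(SIGINT, exit_on_sigint);
}
```

The handler does nothing but set the flag, so the event loop decides when it is safe to stop.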

The event loop

sockets[] contains pointers to all open sockets. The first element, sockets[0], is a pointer to the single instance of MessageBusSocket. The remaining elements, if any, are pointers to AgentSocket:s.

while not exit-loop
   reclaim unused entries in sockets[]
   from sockets[] build an array of struct pollfd
   call poll(2)
   for each socket that has data available
      call EventLoopSocket::on_pollin()
      if on_pollin() returns true
         delete corresponding EventLoopSocket
         mark entry in sockets[] as unused
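
The polling step of the loop can be sketched as follows. This is an illustration under assumptions, not the TMB code: poll_readable() is a made-up helper that works on bare file descriptors instead of EventLoopSocket objects, and it simply returns the positions of the descriptors that need attention.

```cpp
#include <cassert>
#include <cstddef>
#include <poll.h>
#include <unistd.h>
#include <vector>

// Return the indices of the descriptors in `fds` that have data to read
// or have hung up, waiting at most `timeout_ms` milliseconds.
static std::vector<std::size_t> poll_readable(const std::vector<int> &fds,
                                              int timeout_ms) {
    // Build the array of struct pollfd that poll(2) expects.
    std::vector<pollfd> pfds;
    pfds.reserve(fds.size());
    for (int fd : fds) {
        pollfd p{};
        p.fd = fd;
        p.events = POLLIN;
        pfds.push_back(p);
    }

    std::vector<std::size_t> ready;
    int n = ::poll(pfds.data(), pfds.size(), timeout_ms);
    if (n <= 0) return ready;          // timeout or error: nothing ready

    for (std::size_t i = 0; i < pfds.size(); i++)
        if (pfds[i].revents & (POLLIN | POLLHUP))
            ready.push_back(i);
    return ready;
}
```

In the real loop each returned index would lead to a call on the corresponding entry in sockets[].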

Execution flow for incoming connections

the event loop calls MessageBusSocket::on_pollin()
   accept connection using accept(2)
   create a new Agent, and add it to sockets[]

Execution flow for incoming messages

the event loop calls AgentSocket::on_pollin()
   read available characters from the socket
   assemble characters into lines
   for each line call Agent::handle_message(line)
      if the message is a broadcast message
         call MessageBus::broadcast(message)
      else
         call this->handle_command(message)
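
Assembling lines from whatever the socket happens to deliver can be sketched like this. LineAssembler is an illustrative name, and accepting CRLF line endings is an assumption; the real AgentSocket keeps its buffer in the member variable line.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Accumulates raw chunks read from a socket and returns the complete
// lines found so far. A trailing partial line stays buffered until the
// rest of it arrives.
class LineAssembler {
  public:
    std::vector<std::string> feed(const std::string &chunk) {
        std::vector<std::string> lines;
        buffer += chunk;
        std::string::size_type eol;
        while ((eol = buffer.find('\n')) != std::string::npos) {
            std::string line = buffer.substr(0, eol);
            if (!line.empty() && line.back() == '\r')
                line.pop_back();               // tolerate CRLF
            lines.push_back(line);
            buffer.erase(0, eol + 1);          // drop the line and its '\n'
        }
        return lines;
    }

  private:
    std::string buffer;                        // unfinished line, if any
};
```

Each returned line would then be handed to Agent::handle_message().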

Execution flow for broadcast messages

A broadcast message gets its own sequence number, an integer that is incremented by one for each broadcast message. An Agent remembers the sequence number of its most recent broadcast message. This is used to avoid duplicates that would otherwise occur if multiple subscriptions match a broadcast message.

A subscriber is an Agent that is a member of a SubscriptionList. There is no separate subscriber class.

SubscriptionList manages the set of all SubscriptionList:s. Almost all the broadcast work is done in this class.

Agent::handle_message() has just called MessageBus::broadcast(message):

assign a sequence number to the message
call SubscriptionList::broadcast(message)
   for each subscription list
      if the message category matches the list's Matcher
         for each of the list's subscribers
            call the subscriber's Agent::write_broadcast_message
               compare the message's sequence number and the Agent's sequence number
               if they are different
                  send the message by calling AgentSocket::write_to_client(message)
                  update the Agent's sequence number
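
The duplicate suppression can be illustrated with a small stand-alone sketch. Subscriber stands in for Agent, a vector of strings stands in for the client socket, and the matching step is left out: the caller passes in the lists that are assumed to have matched. Starting the sequence at 1 is also an assumption, made so that no message id equals the initial value 0.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Stand-in for Agent: remembers the id of the last broadcast it wrote.
struct Subscriber {
    std::uint64_t latest_message_id = 0;
    std::vector<std::string> sent;     // stands in for the client socket

    void write_broadcast_message(std::uint64_t message_id,
                                 const std::string &msg) {
        if (message_id == latest_message_id) return;  // already delivered
        sent.push_back(msg);
        latest_message_id = message_id;
    }
};

static std::uint64_t next_message_id = 1;

// Deliver one message to every subscriber of every matching list.
static void broadcast(const std::vector<std::vector<Subscriber *>> &matching_lists,
                      const std::string &msg) {
    const std::uint64_t id = next_message_id++;
    for (const std::vector<Subscriber *> &list : matching_lists)
        for (Subscriber *s : list)
            s->write_broadcast_message(id, msg);
}
```

A subscriber that appears in two matching lists is visited twice, but the second visit finds the same id and writes nothing.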

Execution flow for command messages

Agent::handle_message() has just called Agent::handle_command(message):

split the line into words
examine the first word
call the appropriate handler, e.g. handle_subscribe(message)
   process the command
   reply by calling AgentSocket::write_to_client(message)
      call ::write(2)
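
Splitting a command line and dispatching on the first word might be done along these lines. The sketch is free-standing: the handler table, the single command shown and the reply texts are placeholders and not the actual TMB protocol, and the reply is returned as a string instead of being written to a socket.

```cpp
#include <cassert>
#include <map>
#include <sstream>
#include <string>
#include <vector>

typedef std::vector<std::string> StringVector;
typedef std::string (*Handler)(const StringVector &words);

// Split a line into whitespace-separated words.
static StringVector split_words(const std::string &line) {
    std::istringstream in(line);
    StringVector words;
    std::string word;
    while (in >> word) words.push_back(word);
    return words;
}

// Placeholder handler: echoes what it would subscribe to.
static std::string handle_subscribe(const StringVector &words) {
    if (words.size() < 2) return "error missing argument";
    return "ok subscribe " + words[1];
}

// Look at the first word, call the matching handler, return its reply.
static std::string handle_command(const std::string &line) {
    static const std::map<std::string, Handler> handlers = {
        {"subscribe", handle_subscribe},
    };
    const StringVector words = split_words(line);
    if (words.empty()) return "error empty command";
    std::map<std::string, Handler>::const_iterator it = handlers.find(words[0]);
    if (it == handlers.end()) return "error unknown command " + words[0];
    return it->second(words);
}
```

Adding a command then means adding one handler function and one table entry.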

Summary of source code

The summary below is mostly limited to header files, and only the most important parts of each file are shown.

Main program

static void exit_on_sigint(int sig) { ...     g.exiting = true; ... }

int main(int argc, char *argv[]) {
    for (int n = 1; n < 32; n++) ::signal(n, SIG_IGN); // Ignore most signals
    signal(SIGINT, exit_on_sigint);

    Matcher::add_factory(MatcherUsingText  ::factory);
    Matcher::add_factory(MatcherUsingRegexp::factory);

    MessageBusSocket mb(4711);
    mb.event_loop();
}

Header file common.h

extern struct Globals {          -- Container for all global variables
    bool exiting;                -- Set to true in SIGINT handler
} g;

typedef char        const cchar;
typedef std::string const cstring;
typedef ...               uint64;

inline cchar *c(cstring &s)      -- Get C string from C++ string

# define ENTER()                 -- Tracing and debugging macros
# define DEBUGG(  format, ...)
# define FATAL(   format, ...)
...

class Agent;                     -- Forward declarations of all classes.
...

bool  is_int      (cstring &s);  -- Miscellaneous utility routines
int   get_int     (cstring &s);
char *trim_line   (char    *line, unsigned len);
bool  is_prefix_of(cstring &shortname, char *longname, unsigned minimum = 1);
...

class EventLoopSocket

This base class is common to MessageBusSocket and AgentSocket. It provides a uniform interface for the event loop to work with. The class itself does very little: it holds the socket descriptor and closes it on destruction.

class EventLoopSocket {
  public:
    virtual void on_pollin() = 0;
    virtual bool on_hangup() = 0;    -- Caller deletes on true

  protected:
    EventLoopSocket(int sock) : _sock(sock) { ... }
    virtual ~EventLoopSocket() { ... ::close(_sock); ...}
    int _sock = -1;
};

class MessageBusSocket : public EventLoopSocket

A singleton that contains most of the networking code on the socket level, including the event loop. Listens for incoming connections, and creates new Agent:s. Keeps a std::vector of all open sockets, represented by classes derived from EventLoopSocket. The first element is the MessageBusSocket instance itself.

class MessageBusSocket : public EventLoopSocket {
  public:
    MessageBusSocket(int port) : EventLoopSocket(port) {...}

  public:
    void event_loop() { ... uses poll(2) ... }

  private:
    std::vector<EventLoopSocket *> sockets;

    virtual void on_pollin() override; -- Connection request
    virtual bool on_hangup() override; -- Can/should not happen
};

class AgentSocket : public EventLoopSocket

Base class for class Agent. on_pollin reads data from the socket, builds lines and passes them to Agent::handle_message.

on_hangup does nothing except return true, asking the caller (i.e. the main loop) to delete the object.

class AgentSocket : public EventLoopSocket {
  public:
    AgentSocket(int sock) : EventLoopSocket(sock) { ... }
    virtual void handle_message (string &msg) = 0; -- see class Agent
    void         write_to_client(string &msg);

  private:
    std::string line;

    virtual void on_pollin() override;
    virtual bool on_hangup() override;
};

class Agent : public AgentSocket

One instance for each active client.

handle_message() processes messages coming from AgentSocket::on_pollin(). Broadcast messages are sent to MessageBus::broadcast(), while command messages are handled internally.

write_broadcast_message() writes a message with id message_id to the client, but only if message_id is different from latest_message_id. This is how duplicate suppression is implemented.

class Agent : public AgentSocket {
  public:
    typedef std::vector<std::string> StringVector;

  public:
    Agent(int sock) : AgentSocket(sock) { ... }

    virtual void handle_message(string &msg);
    void write_broadcast_message(unsigned message_id, string &msg);

  private:
    void subscribe_to    (SubscriptionList *list, int id);
    void unsubscribe_from(SubscriptionList *list);

    std::set<...> subscriptions;

    uint64 latest_message_id = 0;
};

namespace MessageBus

namespace MessageBus {
    void broadcast(string &msg); -- calls static SubscriptionList::broadcast(msg)
}

class Matcher

class Matcher {
  public:
    typedef Matcher *(Factory)(string &type, string &spec);

    static  void      add_factory  (Factory factory);
    static  Matcher * create(string &type, string &spec);

    virtual bool matches(char *candidate) const = 0;

  protected:
    Matcher(string &type, string &spec);
    virtual ~Matcher();
    string type;
    string spec;
};

Matcher holds pointers to Matcher factories.

static struct {
    Matcher::Factory *factories[10];  -- More than enough
    unsigned n_factories;
} s;
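
How add_factory() and create() could use this table is sketched below. The sketch makes one assumption that the article does not state: a factory returns a null pointer when it does not recognise the requested type, so create() can simply try each factory in turn. MatchAnything is a made-up matcher that exists only to exercise the code.

```cpp
#include <cassert>
#include <string>

struct Matcher {
    typedef Matcher *(Factory)(const std::string &type, const std::string &spec);
    virtual ~Matcher() {}
    virtual bool matches(const char *candidate) const = 0;

    static void     add_factory(Factory *factory);
    static Matcher *create(const std::string &type, const std::string &spec);
};

// Plain aggregate without a constructor: it is zero-initialized before
// any code runs, so n_factories starts at 0.
static struct {
    Matcher::Factory *factories[10];
    unsigned n_factories;
} s;

void Matcher::add_factory(Factory *factory) {
    if (s.n_factories < 10) s.factories[s.n_factories++] = factory;
}

// Ask each registered factory in turn; the first non-null result wins.
Matcher *Matcher::create(const std::string &type, const std::string &spec) {
    for (unsigned i = 0; i < s.n_factories; i++)
        if (Matcher *m = s.factories[i](type, spec)) return m;
    return nullptr;
}

// Hypothetical matcher, used only for illustration.
struct MatchAnything : Matcher {
    bool matches(const char *) const override { return true; }
    static Matcher *factory(const std::string &type, const std::string &) {
        return type == "any" ? new MatchAnything : nullptr;
    }
};
```

With this convention Matcher never needs to know which concrete matchers exist.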

class MatcherUsingRegexp

class MatcherUsingRegexp : public Matcher {
  public:
    static Matcher *factory(string &type, string &spec);

  private:
    MatcherUsingRegexp(string &type,    -- Called by factory()
                       string &spec,
                          ...       );  -- Obscure PCRE-specific arguments

    virtual ~MatcherUsingRegexp();           -- Invoked through Matcher's virtual destructor

    virtual bool matches(char *candidate) const override;

    -- PCRE-specific member variables
};

This class has no header file. The class declaration lives inside the source file. New objects are created by the factory() function. The factory function is registered with Matcher.

But how does the factory method get registered if it is not visible outside the source file? Here is how:

static struct Register {
    Register() {
        Matcher::add_factory(MatcherUsingRegexp::factory);
    }
} register_factory;

The only purpose of the variable register_factory is to run its constructor. This happens during program initialization, before main().

Now, there is the problem of order of initialization. Across source files, the C++ standard leaves the order of dynamic initialization unspecified, but a few rules seem to apply to most combinations of compilers and linkers.

  • Static variables (those that are neither on the stack nor on the heap) that are zero-initialized, or initialized with a compile-time constant (static int foo=13;), are set up before everything else. But be aware that if the initializer has to run code (static int foo=f();) then that initialization may occur later.
  • Variables are initialized in the order in which their files are linked.

To make sure that adding factories works, the list of factories is implemented using simple C variables, as shown for Matcher.

To make doubly sure, Matcher is linked before any class that registers a factory.

If it should still fail on some system, then it may be necessary to add the factories from main().

Depending on platform, this technique may also work for dynamically loaded code, providing convenient and automatic initialization.
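
For comparison, here is a sketch of a different way to sidestep the ordering problem. It is not what TMB does: instead of plain C variables, the registry lives in a function-local static, which is constructed the first time the function is called. The names factories(), add_factory() and regexp_factory() are made up for the example, and everything sits in one file so that it can be compiled on its own.

```cpp
#include <cassert>
#include <string>
#include <vector>

struct Matcher;
typedef Matcher *(Factory)(const std::string &type, const std::string &spec);

// The registry is created on first use, even when the first call comes
// from a static initializer in another source file.
static std::vector<Factory *> &factories() {
    static std::vector<Factory *> list;
    return list;
}

static void add_factory(Factory *f) { factories().push_back(f); }

// Placeholder standing in for MatcherUsingRegexp::factory.
static Matcher *regexp_factory(const std::string &, const std::string &) {
    return nullptr;
}

// Same registration trick as in the article: the object exists only so
// that its constructor runs before main().
static struct Register {
    Register() { add_factory(regexp_factory); }
} register_factory;
```

By the time main() starts, the factory is already in the registry, whatever the link order.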

class MatcherUsingText

class MatcherUsingText : public Matcher {
  public:
    static Matcher *factory(string &type, string &spec);

  private:
    enum TextMatchType {
        TEXT_BEGINS_WITH = 3001,
        TEXT_ENDS_WITH,
        TEXT_CONTAINS,
        TEXT_IS
    };

    MatcherUsingText(string        &type,
                     string        &spec,
                     TextMatchType  match_type);

    virtual ~MatcherUsingText();

    virtual bool matches(char *candidate) const override;

    char          *const text;
    unsigned       const text_len;
    TextMatchType  const match_type;
};
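
The four kinds of text match can be implemented with the C string functions, roughly as in this sketch. text_matches() is a free function written for illustration; in the class above the same logic would live in matches(), with text and match_type taken from the member variables.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>

enum TextMatchType {
    TEXT_BEGINS_WITH = 3001,
    TEXT_ENDS_WITH,
    TEXT_CONTAINS,
    TEXT_IS
};

// Test `candidate` against `text` according to `match_type`.
static bool text_matches(const char *text, TextMatchType match_type,
                         const char *candidate) {
    const std::size_t text_len = std::strlen(text);
    const std::size_t cand_len = std::strlen(candidate);
    switch (match_type) {
    case TEXT_BEGINS_WITH:
        return std::strncmp(candidate, text, text_len) == 0;
    case TEXT_ENDS_WITH:
        // Compare the tail of the candidate, if it is long enough.
        return cand_len >= text_len &&
               std::strcmp(candidate + cand_len - text_len, text) == 0;
    case TEXT_CONTAINS:
        return std::strstr(candidate, text) != nullptr;
    case TEXT_IS:
        return std::strcmp(candidate, text) == 0;
    }
    return false;
}
```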

class SubscriptionList

class SubscriptionList {
  public:
    SubscriptionList(Matcher *f);
    ~SubscriptionList();
    void    add_subscriber(Agent *client);
    void remove_subscriber(Agent *client);
    bool empty            ();

    static SubscriptionList *get      (string &type, string &spec, string &options);
    static void              broadcast(uint64 message_id, char *category, string &msg);

  private:
    Matcher           *matcher;
    std::set<Agent *> subscribers;
};

SubscriptionList also holds a set (actually a std::map) of all subscription lists. It is not visible outside the implementation file.

static std::map<std::string, SubscriptionList *> all_lists;

The map key is a concatenation of the list’s matcher’s type and specification, like “begins-with---/upper” or “regexp---.”.
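
Looking up or creating a list by such a key could look like the sketch below. The key format follows the examples just given. SubscriptionList is reduced to a stand-in holding only type and spec, make_key() is a made-up helper, and get() omits the options argument. Deleting a list when it becomes empty is left out.

```cpp
#include <cassert>
#include <map>
#include <string>

struct SubscriptionList {              // reduced stand-in for the real class
    std::string type;
    std::string spec;
};

static std::map<std::string, SubscriptionList *> all_lists;

// Build the map key: matcher type, "---", matcher specification.
static std::string make_key(const std::string &type, const std::string &spec) {
    return type + "---" + spec;
}

// Return the list for (type, spec), creating it on the first request so
// that identical subscriptions share one list.
static SubscriptionList *get(const std::string &type, const std::string &spec) {
    const std::string key = make_key(type, spec);
    std::map<std::string, SubscriptionList *>::iterator it = all_lists.find(key);
    if (it != all_lists.end()) return it->second;
    SubscriptionList *list = new SubscriptionList{type, spec};
    all_lists[key] = list;
    return list;
}
```

Two clients that subscribe with the same type and specification end up in the same list, so the matcher runs once per message for both of them.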

You can reach me by email at “lars dash 7 dot sdu dot se” or by telephone +46 705 189090
