Paths Architecture

Rough Draft March 10, 2000
emrek@cs.stanford.edu

Outline

This is a quick sketch of what I think are the main points of my paths architecture. If you're new to paths, you may also want to read an Overview of Paths. This document is a rough draft. All comments are greatly appreciated. Thanks!


Path Creation and Instantiation

Vocabulary:

  1. Partial Path: an unconnected (or partially connected) path (two adjacent operators in a path are considered unconnected if there is a type mismatch between their inputs and outputs).
  2. Logical Path: A completely connected path.
  3. Physical Path: A logical path whose operators have been assigned to hosts for execution.
  4. Path Instance: A running path. Once a path is running, information about it (its member operators, etc.) is not explicitly maintained, and must be discovered through introspection.
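
To make the partial/logical distinction concrete, here is a minimal Java sketch of the connectivity check. The OperatorSpec class, its fields, and the use of plain strings as type names are assumptions made up for illustration; they are not part of the architecture as described.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical operator description: a name plus the ADU types it consumes and produces.
final class OperatorSpec {
    final String name;
    final String inputType;
    final String outputType;

    OperatorSpec(String name, String inputType, String outputType) {
        this.name = name;
        this.inputType = inputType;
        this.outputType = outputType;
    }
}

class PathVocabularySketch {
    // A path is "logical" (completely connected) when every operator's output
    // type matches the input type of the operator that follows it.
    static boolean isLogicalPath(List<OperatorSpec> path) {
        for (int i = 0; i + 1 < path.size(); i++) {
            if (!path.get(i).outputType.equals(path.get(i + 1).inputType)) {
                return false; // type mismatch: these two neighbors are unconnected
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Partial path: the source produces GIFs but the sink wants JPEGs.
        List<OperatorSpec> partial = Arrays.asList(
            new OperatorSpec("source", "none", "image/gif"),
            new OperatorSpec("sink", "image/jpeg", "none"));
        // Logical path: a converter bridges the mismatch.
        List<OperatorSpec> logical = Arrays.asList(
            new OperatorSpec("source", "none", "image/gif"),
            new OperatorSpec("gif2jpeg", "image/gif", "image/jpeg"),
            new OperatorSpec("sink", "image/jpeg", "none"));
        System.out.println("partial is logical? " + isLogicalPath(partial));
        System.out.println("logical is logical? " + isLogicalPath(logical));
    }
}
```

In this picture, turning the first list into the second is the kind of job a path finder would do.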

The process of creating a path is divided into four stages:

  1. The Path Finder: Given some request for a path, the path finder will generate a logical path. A request can take the form of a partial path, or a query in a higher-level language.
  2. The Path Placer: Given a logical path, a path placer will assign each operator to run on a specific host, generating a physical path. A path placer may choose to assign operators to run only on a single machine, within a cluster, or across the wide area.
  3. Path Dispatcher: splits and dispatches the physical path to the appropriate hosts. Each host will receive instructions for implementing the part of the path assigned to it.
  4. Path Instantiator: the path instantiator will instantiate operators and connectors and begin data flow.

Because the path creation process is itself a path, new stages can be added and existing stages can be replaced easily. Examples of possibly useful stages include: adding caching operators around operators in a logical path, parallelizing operators within a path, adding supervisor and logging operators, and adding a higher-level query parser before (or instead of) the Path Finder.
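
Here is a rough Java sketch of how the stages above could compose, and how an extra stage could be slotted in. It's only an illustration under simplifying assumptions: paths are plain lists of (unique) operator names, the finder is a stub, the placer uses round-robin, and the instantiator is left out. None of these names or policies come from the architecture itself.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

class PathCreationSketch {
    // Path Finder (stub): assume the request already names a connected operator chain.
    static final Function<List<String>, List<String>> FINDER =
        request -> new ArrayList<>(request);

    // Optional extra stage: follow every operator with a logging operator.
    static final Function<List<String>, List<String>> ADD_LOGGING = logical -> {
        List<String> out = new ArrayList<>();
        for (String op : logical) {
            out.add(op);
            out.add("log(" + op + ")");
        }
        return out;
    };

    // Path Placer: maps each operator to a host. Round-robin is an assumed policy.
    static Function<List<String>, Map<String, String>> placer(List<String> hosts) {
        return logical -> {
            Map<String, String> physical = new LinkedHashMap<>();
            for (int i = 0; i < logical.size(); i++) {
                physical.put(logical.get(i), hosts.get(i % hosts.size()));
            }
            return physical;
        };
    }

    // Path Dispatcher: splits the physical path into one instruction list per host.
    static final Function<Map<String, String>, Map<String, List<String>>> DISPATCHER =
        physical -> {
            Map<String, List<String>> perHost = new LinkedHashMap<>();
            for (Map.Entry<String, String> e : physical.entrySet()) {
                perHost.computeIfAbsent(e.getValue(), h -> new ArrayList<>()).add(e.getKey());
            }
            return perHost;
        };

    // The creation process is itself a chain of stages, so adding one is just another link.
    static Map<String, List<String>> create(List<String> request, List<String> hosts,
                                            boolean withLogging) {
        Function<List<String>, List<String>> toLogical =
            withLogging ? FINDER.andThen(ADD_LOGGING) : FINDER;
        return toLogical.andThen(placer(hosts)).andThen(DISPATCHER).apply(request);
    }

    public static void main(String[] args) {
        List<String> request = Arrays.asList("source", "transformer", "sink");
        System.out.println(create(request, Arrays.asList("hostA", "hostB"), true));
    }
}
```

A cluster-wide or wide-area placer would replace only the placer(...) stage; the other stages stay untouched.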

Figure 1: An outline of the "Path Creation" path. Note: queues between operators are not shown in this figure for clarity.


Path Execution


Figure 2: A simple single-machine path with source, transformer, and sink operators. The arrows indicate the flow of data between operators and queues.


Figure 3: ADU Queues and a Global Schedule

Here's a quick example of the operation of a path (see figure 2):
  1. The source operator generates an ADU, and enqueues it in its output queue.
    
      {
        ADU adu = generateADU();
        cfg.getQueue( "myoutput" ).enqueue( adu );
      }
    
    
  2. When the ADU is added to the queue, the queue also adds the ADU to the global schedule.
  3. The main thread in the host takes the ADU off of the global schedule. This ADU has a reference to the queue it's in. The main thread looks up the set of workers listening to this queue and picks one. In this example, there's only a single worker, the transformer, listening to this queue.
  4. The transformer runs in the main thread's context. The operators all run in this single thread-context. No blocking calls are allowed, though operators are allowed to launch worker threads. There is no requirement that an operator generate an output ADU immediately or even at all; nor do the operator's output ADUs have to have a one-to-one relationship with input ADUs.
    
      /* the transformer implements a receive function... */
      public void receive( ADU adu ) {
        ADU out = doSomething( adu );
        cfg.getQueue( "myoutput" ).enqueue( out );
      }
    
  5. When the transformer enqueues its output ADU, it goes through the same process as the ADU produced by the source operator.
  6. The sink operator acts just like the transformer operator, except that it never generates any output ADUs (surprise!). A sink operator generally displays or stores this ADU itself or communicates this ADU to some external entity for display or storage.
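
The steps above can be put together in a small, single-threaded Java sketch. The class names (PathHost, ADUQueue, Worker), the queue names, and the "pick the first listener" policy are assumptions for illustration only; the real host may pick workers and manage queues differently.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ADU {
    final String payload;
    ADUQueue queue; // the queue this ADU currently sits in (set on enqueue)
    ADU(String payload) { this.payload = payload; }
}

interface Worker {
    void receive(ADU adu); // must not block; runs in the main thread's context
}

class ADUQueue {
    final Deque<ADU> pending = new ArrayDeque<>();
    final List<Worker> listeners = new ArrayList<>();
    private final Deque<ADU> globalSchedule;

    ADUQueue(Deque<ADU> globalSchedule) { this.globalSchedule = globalSchedule; }

    // Step 2: enqueueing also adds the ADU to the global schedule.
    void enqueue(ADU adu) {
        adu.queue = this;
        pending.addLast(adu);
        globalSchedule.addLast(adu);
    }
}

class PathHost {
    final Deque<ADU> globalSchedule = new ArrayDeque<>();
    private final Map<String, ADUQueue> queues = new HashMap<>();

    ADUQueue getQueue(String name) {
        return queues.computeIfAbsent(name, n -> new ADUQueue(globalSchedule));
    }

    // Step 3: the main thread takes ADUs off the schedule and hands each one
    // to a worker listening on the ADU's queue.
    void runUntilIdle() {
        ADU adu;
        while ((adu = globalSchedule.pollFirst()) != null) {
            ADUQueue q = adu.queue;
            q.pending.remove(adu);
            if (!q.listeners.isEmpty()) {
                q.listeners.get(0).receive(adu); // assumed policy: first listener
            }
        }
    }
}

class PathExecutionSketch {
    static List<String> run(String... inputs) {
        PathHost host = new PathHost();
        List<String> stored = new ArrayList<>();
        // Transformer: upper-cases each ADU and enqueues the result (steps 4 and 5).
        host.getQueue("source-out").listeners.add(
            adu -> host.getQueue("transformer-out").enqueue(new ADU(adu.payload.toUpperCase())));
        // Sink: stores ADUs and never produces output (step 6).
        host.getQueue("transformer-out").listeners.add(adu -> stored.add(adu.payload));
        // Source: generates ADUs into its output queue (step 1).
        for (String in : inputs) {
            host.getQueue("source-out").enqueue(new ADU(in));
        }
        host.runUntilIdle();
        return stored;
    }

    public static void main(String[] args) {
        System.out.println(run("a", "b")); // prints [A, B]
    }
}
```

Because every operator runs from the one loop in runUntilIdle, a receive call that blocked would stall the whole host, which is why blocking calls are ruled out.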

Clusters


A basic path host is a single node, with no networking capability built in. The clustering and load-balancing functionality is built on top of the path architecture as a set of paths run by default.

The first bit of functionality that needs to be added to make a path host cluster-aware is simple communication between machines in a cluster. What we see in figure 4 is a path that implements a simple listener and announcer. First, the ADUListener starts listening on some TCP port for ADUs. When it receives an ADU, it forwards it on to a named local queue. Of course, the existence of this ADUListener has to be known by the other machines in the cluster. So, every second, the Timer operator sends a signal to the ADUListener, which sends the host:port announcement to the AnnounceHost operator. The AnnounceHost operator broadcasts this message to a well-known multicast group/port.

Figure 4: Listening for messages from the cluster, and broadcasting our existence.

An AnnouncementListener operator receives these multicast announcements and forwards the list of hosts to interested operators (for example, the cluster-placer operator discussed in the Path Creation section would receive a list of these hosts).
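
As a sketch of the bookkeeping an AnnouncementListener might do, the Java class below keeps a table of announced hosts. The HostTableSketch name, the timestamp arguments, and especially the rule that a host is dropped after some period of silence are assumptions; the text above only says that announcements are received and the host list is forwarded. The multicast receive itself is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class HostTableSketch {
    // "host:port" -> time (ms) of the most recent announcement; sorted for stable output.
    private final Map<String, Long> lastSeenMillis = new TreeMap<>();
    private final long timeoutMillis;

    HostTableSketch(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Called once per received announcement. Returns false for malformed messages.
    boolean onAnnouncement(String hostPort, long nowMillis) {
        int colon = hostPort.lastIndexOf(':');
        if (colon <= 0 || colon == hostPort.length() - 1) {
            return false; // missing host or missing port
        }
        try {
            Integer.parseInt(hostPort.substring(colon + 1));
        } catch (NumberFormatException e) {
            return false; // port is not a number
        }
        lastSeenMillis.put(hostPort, nowMillis);
        return true;
    }

    // The list that would be forwarded to interested operators (e.g. a cluster placer).
    // Assumed rule: a host is considered gone if it has been silent for too long.
    List<String> liveHosts(long nowMillis) {
        List<String> live = new ArrayList<>();
        for (Map.Entry<String, Long> e : lastSeenMillis.entrySet()) {
            if (nowMillis - e.getValue() <= timeoutMillis) {
                live.add(e.getKey());
            }
        }
        return live;
    }

    public static void main(String[] args) {
        HostTableSketch table = new HostTableSketch(3000);
        table.onAnnouncement("nodeA:9000", 0);
        table.onAnnouncement("nodeB:9000", 2000);
        System.out.println(table.liveHosts(4000));
    }
}
```

With announcements arriving every second, a timeout of a few seconds tolerates an occasional lost multicast packet without declaring the host dead.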

This system will be extended to support load balancing by adding a "load-measurer" operator, and announcing the load on a machine along with the host:port pair.
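
One way a placer could use the announced load is simply to pick the least-loaded host. The Java sketch below assumes load is reported as a single number per host where lower means less busy; the class name, the method names, and that representation are all hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

class LoadAwarePlacerSketch {
    // "host:port" -> most recently announced load (assumed: lower is less busy).
    private final Map<String, Double> loadByHost = new TreeMap<>();

    // Called for each announcement that carries a load figure.
    void onAnnouncement(String hostPort, double load) {
        loadByHost.put(hostPort, load);
    }

    // Returns the host with the smallest announced load, or null if none are known.
    String leastLoadedHost() {
        String best = null;
        double bestLoad = Double.POSITIVE_INFINITY;
        for (Map.Entry<String, Double> e : loadByHost.entrySet()) {
            if (e.getValue() < bestLoad) {
                best = e.getKey();
                bestLoad = e.getValue();
            }
        }
        return best;
    }

    public static void main(String[] args) {
        LoadAwarePlacerSketch placer = new LoadAwarePlacerSketch();
        placer.onAnnouncement("nodeA:9000", 0.9);
        placer.onAnnouncement("nodeB:9000", 0.2);
        System.out.println(placer.leastLoadedHost());
    }
}
```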


Fault-tolerance


Here's a quick proposal for adding fault-tolerance to a path. It's not meant to be the perfect way to do fault-tolerance. Rather, it should illustrate how this sort of functionality can easily be implemented in this architecture.

The basic idea is to have "Supervisor" operators listening to status messages from operators. If these status messages fail to arrive or indicate some failure has occurred, the supervisor will have enough information to restart the failed portion of the path. Note that a Supervisor only has to know about the operators it's directly supervising and their immediate neighbors.

To avoid clogging the network with a large number of status messages, the supervisor can be co-located with the operators it's supervising. If the whole machine crashes, the Supervisor will need to be restarted as well. Since the supervisor is itself an operator, it can be supervised just like any other operator. For this case, we need to supervise from another machine.
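
Here's a minimal Java sketch of the supervisor's core logic: track the last status message per operator, and restart any operator that reports a failure or goes quiet. The SupervisorSketch name, the fixed timeout, the explicit timestamps, and the way a "restart" is just recorded in a list are assumptions for illustration; an actual restart would go back through path creation for the failed portion.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class SupervisorSketch {
    // operator name -> time (ms) of its last healthy status message
    private final Map<String, Long> lastStatusMillis = new TreeMap<>();
    private final long timeoutMillis;
    // Stand-in for real restarts: the names of operators we decided to restart, in order.
    final List<String> restarted = new ArrayList<>();

    SupervisorSketch(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Start supervising an operator; it is assumed healthy at this moment.
    void supervise(String operator, long nowMillis) {
        lastStatusMillis.put(operator, nowMillis);
    }

    // Handle a status message. Messages from unsupervised operators are ignored.
    void onStatus(String operator, boolean ok, long nowMillis) {
        if (!lastStatusMillis.containsKey(operator)) {
            return;
        }
        if (ok) {
            lastStatusMillis.put(operator, nowMillis);
        } else {
            restart(operator, nowMillis); // the operator itself reported a failure
        }
    }

    // Called periodically (e.g. driven by a Timer operator): restart silent operators.
    void check(long nowMillis) {
        for (String op : new ArrayList<>(lastStatusMillis.keySet())) {
            if (nowMillis - lastStatusMillis.get(op) > timeoutMillis) {
                restart(op, nowMillis);
            }
        }
    }

    private void restart(String operator, long nowMillis) {
        restarted.add(operator);                   // placeholder for re-instantiating it
        lastStatusMillis.put(operator, nowMillis); // give the new instance a grace period
    }

    public static void main(String[] args) {
        SupervisorSketch supervisor = new SupervisorSketch(2000);
        supervisor.supervise("transformer", 0);
        supervisor.supervise("sink", 0);
        supervisor.onStatus("sink", true, 1500);
        supervisor.check(2500); // transformer has been silent for too long
        System.out.println(supervisor.restarted);
    }
}
```

Since the supervisor is just another operator, a second instance of the same logic on another machine could supervise it in turn.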

Some fine details: