Creating a distributed service

The problem

We will solve a variant of Project Euler problem #6 using Piped. Our variant differs from the original problem only in the last sentence:

The sum of the squares of the first ten natural numbers is,

$$1^2 + 2^2 + \dots + 10^2 = 385$$

The square of the sum of the first ten natural numbers is,

$$(1 + 2 + \dots + 10)^2 = 55^2 = 3025$$

Hence the difference between the sum of the squares of the first ten natural numbers and the square of the sum is 3025 - 385 = 2640.

Find the difference between the sum of the squares of the list of input numbers and the square of the sum.
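As a reference point, the computation itself fits in a few lines of plain Python. This sketch is not part of Piped; it is only meant as a way to check the service's answers by hand:

```python
def sum_square_difference(numbers):
    """Return (sum of squares, square of sum, difference) for a list of ints."""
    sum_of_squares = sum(n ** 2 for n in numbers)
    square_of_sum = sum(numbers) ** 2
    return sum_of_squares, square_of_sum, square_of_sum - sum_of_squares


print(sum_square_difference(range(1, 11)))  # (385, 3025, 2640)
```

Note that the service below names these values sum (the sum of the squares), square (the square of the sum) and diff.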

First, we’ll solve the problem in a single process. Then we’ll show how to distribute the calculations to worker nodes and write the responses back to the client.

Solving using a single process

Create rpc-server.yaml with the following contents:

web:
    site:
        routing:
            __config__:
                processor: pipeline.web.compute

pipelines:
    web:
        compute:
            # parse the request arguments
            - eval-lambda:
                input_path: request
                output_path: numbers
                lambda: "request: [int(a) for a in request.args['n']]"

            # calculate the sum of the squares
            - eval-lambda:
                input_path: numbers
                output_path: sum
                lambda: "numbers: sum([number**2 for number in numbers])"

            # calculate the square of the sum
            - eval-lambda:
                input_path: numbers
                output_path: square
                lambda: "numbers: sum(numbers)**2"

            - run-pipeline:
                pipeline: .write-response

        write-response:
            # create a json-encoded response and write it to the client
            - eval-lambda:
                output_path: content
                lambda: "baton: dict(sum=baton['sum'], square=baton['square'], diff=baton['square']-baton['sum'])"

            - encode-json:
                input_path: content

            - write-web-response

Run this configuration:

$ piped -nc rpc-server.yaml
2011-05-30 21:03:38+0200 [-] Log opened.
2011-05-30 21:03:38+0200 [-] twistd 11.0.0 (/Users/username/.virtualenvs/Piped/bin/python2.7 2.7.1) starting up.
2011-05-30 21:03:38+0200 [-] reactor class: twisted.internet.selectreactor.SelectReactor.
2011-05-30 21:03:38+0200 [-] twisted.web.server.Site starting on 8080
2011-05-30 21:03:38+0200 [-] Starting factory <twisted.web.server.Site instance at 0x102b72908>

By opening http://localhost:8080/?n=1&n=2&n=3, we can verify that the sum, square and difference are as expected.
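To see what the three eval-lambda processors compute for that URL, here is a sketch in plain Python 3 that parses the query string the same way (every n argument becomes one list element) and builds the same dictionary as the write-response pipeline. It uses only the standard library and does not contact the running server; expected_response is a hypothetical helper name:

```python
import json
from urllib.parse import urlsplit, parse_qs


def expected_response(url):
    # parse_qs collects repeated keys into a list, much like request.args['n']
    args = parse_qs(urlsplit(url).query)
    numbers = [int(a) for a in args['n']]
    total = sum(number ** 2 for number in numbers)  # "sum" in the response
    square = sum(numbers) ** 2                      # "square" in the response
    return dict(sum=total, square=square, diff=square - total)


print(json.dumps(expected_response('http://localhost:8080/?n=1&n=2&n=3'), sort_keys=True))
# {"diff": 22, "square": 36, "sum": 14}
```

The server may emit the keys in a different order, since sort_keys is only used here to make the output predictable.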

Testing the service

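Testing a service manually after every configuration change is both time-consuming and burdensome. Instead, we create a test configuration, test-rpc.yaml, that includes the server configuration, serves it on port 8088, loads our test processors from the rpc_tutorial package, and runs the tests when Piped starts up: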

includes:
    - rpc-server.yaml

plugins:
    bundles:
        my_bundle:
            - rpc_tutorial

web:
    site:
        port: 8088

system-events:
    startup:
        system-test: pipeline.system-testing.start

pipelines:
    system-testing:
        start:
            chained_consumers:
                - create-statustest-reporter
                - run-pipeline:
                    pipeline: .run
                - wait-for-statustest-reporter:
                    done: True
                - shutdown

            chained_error_consumers:
                - print-failure-traceback
                - shutdown

        # add any test-processors to this test-pipeline
        run:
            - test-rpc

The testing pipeline uses a processor named test-rpc, which we create in the rpc_tutorial package:

$ mkdir rpc_tutorial
$ touch rpc_tutorial/__init__.py

Create rpc_tutorial/test_rpc.py with the following contents:

import time
import json

from zope import interface
from twisted.web import client
from twisted.internet import defer

from piped.plugins.status_testing import statustest, processors
from piped import processing


client.HTTPClientFactory.noisy = False


class TestRPCProcessor(processors.StatusTestProcessor):
    interface.classProvides(processing.IProcessor)
    name = 'test-rpc'

    class TestRPC(statustest.StatusTestCase):

        @defer.inlineCallbacks
        def get_result(self, *numbers):
            numbers = [str(num) for num in numbers]
            result_json = yield client.getPage('http://localhost:8088/?n=%s'%'&n='.join(numbers))
            defer.returnValue(json.loads(result_json))

        @defer.inlineCallbacks
        def statustest_single_number(self):
            result = yield self.get_result(42)

            self.assertEquals(result, dict(sum=42**2, square=42**2, diff=0))

        @defer.inlineCallbacks
        def statustest_ten_numbers(self):
            result = yield self.get_result(1,2,3,4,5,6,7,8,9,10)

            self.assertEquals(result, dict(sum=385, square=3025, diff=2640))

We can now test the service by running the test-rpc.yaml configuration:

$ PYTHONPATH=. piped -nc test-rpc.yaml
2011-05-30 21:49:20+0200 [-] Log opened.
2011-05-30 21:49:20+0200 [-] twistd 11.0.0 (/Users/username/.virtualenvs/Piped/bin/python2.7 2.7.1) starting up.
2011-05-30 21:49:20+0200 [-] reactor class: twisted.internet.selectreactor.SelectReactor.
2011-05-30 21:49:20+0200 [-] twisted.web.server.Site starting on 8088
2011-05-30 21:49:20+0200 [-] Starting factory <twisted.web.server.Site instance at 0x102e16e60>
2011-05-30 21:49:21+0200 [status_test] rpc_tutorial.test_rpc
2011-05-30 21:49:21+0200 [status_test]   TestRPC
2011-05-30 21:49:21+0200 [HTTPChannel,0,127.0.0.1]     statustest_single_number ... 127.0.0.1 - - [30/May/2011:19:49:20 +0000] "GET /?n=42 HTTP/1.0" 200 40 "-" "Twisted PageGetter"
2011-05-30 21:49:21+0200 [status_test]                                           [OK]
2011-05-30 21:49:21+0200 [HTTPChannel,1,127.0.0.1]     statustest_ten_numbers ... 127.0.0.1 - - [30/May/2011:19:49:20 +0000] "GET /?n=1&n=2&n=3&n=4&n=5&n=6&n=7&n=8&n=9&n=10 HTTP/1.0" 200 42 "-" "Twisted PageGetter"
2011-05-30 21:49:21+0200 [status_test]                                             [OK]
2011-05-30 21:49:21+0200 [status_test]
2011-05-30 21:49:21+0200 [status_test] -------------------------------------------------------------------------------
2011-05-30 21:49:21+0200 [status_test] Ran 2 tests in 0.010s
2011-05-30 21:49:21+0200 [status_test]
2011-05-30 21:49:21+0200 [status_test] PASSED (successes=2)
2011-05-30 21:49:21+0200 [-] (TCP Port 8088 Closed)
2011-05-30 21:49:21+0200 [-] Stopping factory <twisted.web.server.Site instance at 0x102e16e60>
2011-05-30 21:49:21+0200 [-] Main loop terminated.
2011-05-30 21:49:21+0200 [-] Server Shut Down.

Distributing

To distribute the work, the server and the workers need a way to communicate. In this tutorial, we use ZeroMQ message queues.

Getting ZeroMQ

Installing ZeroMQ and its Python bindings, pyzmq, varies by platform:

Windows

Download the appropriate ZeroMQ (pyzmq) binary from http://www.lfd.uci.edu/~gohlke/pythonlibs/#pyzmq.

OS X

The easiest way to install ZeroMQ on OS X is by using Homebrew:

$ brew install zeromq
$ easy_install pyzmq

Linux

Follow the installation instructions for ZeroMQ at http://www.zeromq.org/intro:get-the-software, then install pyzmq:

$ easy_install pyzmq

Quick primer on message queues

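We’re going to use four types of queues (ZeroMQ socket types): PUSH, PULL, PUB and SUB.

The server PUSHes work to the workers, which PULL it from their input queues. PUSH distributes messages round-robin among the connected workers, so adding workers spreads the load. The workers then PUBlish their results, and the server SUBscribes to them.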

ZeroMQ transports messages as opaque byte strings, not structured data, so we have to serialize the messages both to and from the workers. We use JSON for this.
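A minimal sketch of that serialization step with Python's json module: the sender encodes the message, and the receiver decodes it on arrival. The message layout (an id plus a list of numbers) matches the one the workers expect later in this tutorial, and the id value is purely illustrative. ZeroMQ itself only ever sees the encoded bytes:

```python
import json

# the structured message we want to hand to a worker (illustrative id)
message = {'id': 4350221696, 'numbers': [1, 2, 3]}

# ZeroMQ frames carry bytes, so encode before sending...
wire_data = json.dumps(message).encode('utf-8')

# ...and decode on the receiving side to get the structure back
received = json.loads(wire_data.decode('utf-8'))

assert received == message
print(type(wire_data).__name__, received['numbers'])  # bytes [1, 2, 3]
```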

After getting a response from a worker, we need to check whether the request can be finished. This means we have to keep the request in memory from the time the client makes it until we’re ready to respond.

The workflow

Below is a diagram that illustrates the workflow of our distributed service.

digraph distributing_workflow {
pack = false;

client [shape=box];

client -> receive_request;
write_response -> client [constraint=false];

subgraph cluster_worker {
    label = "Worker";
    worker_sum_in -> sum -> worker_sum_out
    worker_square_in -> square -> worker_square_out
}

subgraph cluster_server {
    label = "Server";

    receive_request -> compute

    compute -> server_sum_out
    compute -> server_square_out

    server_sum_in -> receive_partial
    server_square_in -> receive_partial

    receive_partial -> write_response

    { rank = same; receive_request; write_response; }

}

server_sum_out [shape=invhouse];
worker_sum_out [shape=invhouse];
server_square_out [shape=invhouse];
worker_square_out [shape=invhouse];

server_sum_in [shape=invhouse];
worker_sum_in [shape=invhouse];
server_square_in [shape=invhouse];
worker_square_in [shape=invhouse];

server_sum_out -> worker_sum_in [tailport=s, headport=n, weight=1];
server_square_out -> worker_square_in [tailport=s, headport=n];

worker_sum_out -> server_sum_in [tailport=s, headport=n];
worker_square_out -> server_square_in [tailport=s, headport=n];
}

Implementing workers

We start by implementing a worker that performs both calculations. This worker requires four queues: two input queues and two output queues.

The worker will receive json-encoded messages from the input queues and write a json-encoded response to the corresponding output queue.

We implement a square client in rpc-client-square.yaml:

includes:
    - queues.yaml

zmq:
    queues:
        worker_square_in:
            processor: pipeline.zmq.square
    
pipelines:
    zmq:
        square:
            - decode-json
            - eval-lambda:
                input_path: numbers
                output_path: square
                lambda: "numbers: sum(numbers)**2"
            - encode-json
            - send-zmq-message:
                queue_name: worker_square_out

... a sum client in rpc-client-sum.yaml:

includes:
    - queues.yaml

zmq:
    queues:
        worker_sum_in:
            processor: pipeline.zmq.sum
    
pipelines:
    zmq:
        sum:
            - decode-json
            - eval-lambda:
                input_path: numbers
                output_path: sum
                lambda: "numbers: sum([number**2 for number in numbers])"
            - encode-json
            - send-zmq-message:
                queue_name: worker_sum_out

... and, for convenience, we create a configuration that combines both of these workers into a single worker in rpc-client.yaml:

includes:
    - rpc-client-sum.yaml
    - rpc-client-square.yaml

A reader with a keen eye will notice that we’ve moved the zmq queue definitions into their own file, queues.yaml, so that the server and the workers can share them. A defined queue is not created until it is actually required, so it is safe for multiple processes to include the same zmq queue definitions. The contents of queues.yaml should be as follows:

zmq:
    queues:
        # queues used by a square worker
        worker_square_in:
            type: PULL
            connects:
                - tcp://0.0.0.0:6000

        worker_square_out:
            type: PUB
            connects:
                - tcp://0.0.0.0:6001

        # queues used by a sum worker
        worker_sum_in:
            type: PULL
            connects:
                - tcp://0.0.0.0:5000

        worker_sum_out:
            type: PUB
            connects:
                - tcp://0.0.0.0:5001

The zmq.sum and zmq.square pipelines expect to receive a JSON-encoded baton of the form:

{
    id: id,
    numbers: [a, b, c]
}

... and will respond with a json-encoded baton:

{
    id: id,
    numbers: [a, b, c],
    sum/square: ...,
}
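To make this message contract concrete, here is a plain-Python sketch of what each worker pipeline does to a message: decode the JSON baton, add one result key, and encode it again. The function names are hypothetical; in the tutorial this work is done by the decode-json, eval-lambda and encode-json processors. The id is passed through unchanged, which is what lets the server match the response to its pending request:

```python
import json


def square_worker(message):
    # mirrors zmq.square: add the square of the sum
    baton = json.loads(message)
    baton['square'] = sum(baton['numbers']) ** 2
    return json.dumps(baton)


def sum_worker(message):
    # mirrors zmq.sum: add the sum of the squares
    baton = json.loads(message)
    baton['sum'] = sum(number ** 2 for number in baton['numbers'])
    return json.dumps(baton)


request = json.dumps({'id': 1, 'numbers': [1, 2, 3]})
print(json.loads(sum_worker(request)))     # {'id': 1, 'numbers': [1, 2, 3], 'sum': 14}
print(json.loads(square_worker(request)))  # {'id': 1, 'numbers': [1, 2, 3], 'square': 36}
```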

Testing the worker

First we create the test-client test processor in rpc_tutorial/test_client.py:

import json

from zope import interface
from twisted.internet import defer

from piped.plugins.status_testing import statustest, processors
from piped import processing


class TestClientProcessor(processors.StatusTestProcessor):
    interface.classProvides(processing.IProcessor)
    name = 'test-client'

    class TestClient(statustest.StatusTestCase):

        def setUp(self, sum_pipeline, square_pipeline):
            self.sum_pipeline = sum_pipeline
            self.square_pipeline = square_pipeline

        @defer.inlineCallbacks
        def statustest_simple_sum(self):
            input = json.dumps(dict(numbers=[1,2,3]))
            results = yield self.sum_pipeline(input)
            output = json.loads(results[0])
            self.assertEquals(output['sum'], 14)

        @defer.inlineCallbacks
        def statustest_simple_square(self):
            input = json.dumps(dict(numbers=[1,2,3]))
            results = yield self.square_pipeline(input)
            output = json.loads(results[0])
            self.assertEquals(output['square'], 36)

    def configure(self, runtime_environment):
        self.dependencies = runtime_environment.create_dependency_map(self,
            sum_pipeline = dict(provider='pipeline.zmq.sum'),
            square_pipeline = dict(provider='pipeline.zmq.square')
        )

    def get_namespace(self, baton):
        return self.dependencies

Add an entry for rpc-client.yaml in the list of includes in test-rpc.yaml and add the test-client test processor to the testing pipeline:

includes:
    - rpc-server.yaml
    - rpc-client.yaml

...

        run:
            - test-client
            - test-rpc

Running the test configuration should now print something like this:

$ PYTHONPATH=. piped -nc test-rpc.yaml
2011-05-31 13:28:47+0200 [-] Log opened.
2011-05-31 13:28:47+0200 [-] twistd 11.0.0 (/Users/username/.virtualenvs/Piped/bin/python2.7 2.7.1) starting up.
2011-05-31 13:28:47+0200 [-] reactor class: twisted.internet.selectreactor.SelectReactor.
2011-05-31 13:28:47+0200 [-] /Users/username/pydev/Piped/piped/conf.py:25: piped.exceptions.ConfigurationWarning:
    WARNING: configuration file loaded multiple times: "queues.yaml"
2011-05-31 13:28:47+0200 [-] twisted.web.server.Site starting on 8080
2011-05-31 13:28:47+0200 [-] Starting factory <twisted.web.server.Site instance at 0x102989bd8>
2011-05-31 13:28:48+0200 [status_test] rpc_tutorial.test_rpc
2011-05-31 13:28:48+0200 [status_test]   TestRPC
2011-05-31 13:28:48+0200 [-]     statustest_single_number ... 127.0.0.1 - - [31/May/2011:11:28:47 +0000] "GET /?n=42 HTTP/1.0" 200 40 "-" "Twisted PageGetter"
2011-05-31 13:28:48+0200 [status_test]                                           [OK]
2011-05-31 13:28:48+0200 [-]     statustest_ten_numbers ... 127.0.0.1 - - [31/May/2011:11:28:47 +0000] "GET /?n=1&n=2&n=3&n=4&n=5&n=6&n=7&n=8&n=9&n=10 HTTP/1.0" 200 42 "-" "Twisted PageGetter"
2011-05-31 13:28:48+0200 [status_test]                                             [OK]
2011-05-31 13:28:48+0200 [status_test] rpc_tutorial.test_client
2011-05-31 13:28:48+0200 [status_test]   TestClient
2011-05-31 13:28:48+0200 [status_test]     statustest_simple_square ...                                           [OK]
2011-05-31 13:28:48+0200 [status_test]     statustest_simple_sum ...                                              [OK]
2011-05-31 13:28:48+0200 [status_test]
2011-05-31 13:28:48+0200 [status_test] -------------------------------------------------------------------------------
2011-05-31 13:28:48+0200 [status_test] Ran 4 tests in 0.087s
2011-05-31 13:28:48+0200 [status_test]
2011-05-31 13:28:48+0200 [status_test] PASSED (successes=4)
2011-05-31 13:28:48+0200 [-] (TCP Port 8080 Closed)
2011-05-31 13:28:48+0200 [-] Stopping factory <twisted.web.server.Site instance at 0x102989bd8>
2011-05-31 13:28:48+0200 [-] Main loop terminated.
2011-05-31 13:28:48+0200 [-] Server Shut Down.

Making the server use workers

In this section, we will extend rpc-server.yaml so that the server uses workers to perform its computations.

The workers operate independently of the server, so the server has to collect the partial responses from the workers before it can write the finished response to the client. Update rpc-server.yaml to the following:

includes:
    - queues.yaml

zmq:
    queues:
        server_sum_in:
            processor: pipeline.zmq.partial_response
        server_square_in:
            processor: pipeline.zmq.partial_response

web:
    site:
        port: 8080
        debug:
            allow:
                - localhost
        routing:
            __config__:
                processor: pipeline.web.compute

# We use a shared context (that starts out as an empty dictionary) to
# store requests that haven't been completed yet.
contexts:
    pending: {}
    
pipelines:
    web:
        compute:
            - fetch-context:
                context: pending
            
            - exec-code:
                code: |
                    request = baton['request']
                    request_id = id(request)

                    # store the baton by the request id in the pending context
                    baton['pending'][request_id] = baton

                    # return a baton that can be json-encoded and sent to the workers
                    return dict(id=request_id, numbers=[int(a) for a in request.args['n']])

            - encode-json

            # send the encoded baton to both the sum and the square output queue.
            - send-zmq-message:
                queue_name: server_sum_out
                retries: 10
            - send-zmq-message:
                queue_name: server_square_out
                retries: 10
                
        write-response:
            - eval-lambda:
                output_path: content
                lambda: "baton: dict(sum=baton['sum'], square=baton['square'], diff=baton['square']-baton['sum'])"
            
            - encode-json:
                input_path: content 
            
            - write-web-response

    zmq:
        partial_response:
            # the workers send their answer as a json-encoded baton
            - decode-json
            
            - fetch-context:
                context: pending

            # there's nothing we can do if we can't find a pending baton with the given id:
            - stop:
                decider: "baton: baton.get('id') not in baton['pending']"

            # retrieve our pending baton
            - eval-lambda:
                output_path: pending_baton
                lambda: "baton: baton['pending'][baton['id']]"

            # copy the sum/square to the pending baton if sum/square exists:
            - remap:
                mapping:
                    sum: pending_baton.sum
                    square: pending_baton.square

            # if we don't have a complete response to the client, we can't continue
            - stop:
                input_path: pending_baton
                decider: "pending_baton: 'sum' not in pending_baton or 'square' not in pending_baton"

            # remove the request from the pending requests
            - eval-lambda:
                output_path: null
                lambda: "baton: baton['pending'].pop(baton['id'])"
           
            - run-pipeline:
                input_path: pending_baton
                pipeline: web.write-response
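For readers who find the processor list hard to follow, here is a plain-Python reading of the zmq.partial_response pipeline, one step per processor. It is a sketch rather than Piped code: partial_response is a hypothetical function, and pending stands in for the pending context. It relies on the request id surviving the JSON round trip as the same integer, which is what makes the dictionary lookup work:

```python
def partial_response(pending, message):
    """Merge one decoded worker message; return the finished baton or None."""
    # stop: nothing we can do if the id is unknown (e.g. already answered)
    if message.get('id') not in pending:
        return None
    pending_baton = pending[message['id']]

    # remap: copy sum/square to the pending baton if this message has them
    for key in ('sum', 'square'):
        if key in message:
            pending_baton[key] = message[key]

    # stop: the response to the client is not complete yet
    if 'sum' not in pending_baton or 'square' not in pending_baton:
        return None

    # remove the request from the pending requests; the caller would now
    # run web.write-response with the returned baton
    return pending.pop(message['id'])


pending = {7: {'numbers': [1, 2, 3]}}
print(partial_response(pending, {'id': 7, 'numbers': [1, 2, 3], 'sum': 14}))  # None
done = partial_response(pending, {'id': 7, 'numbers': [1, 2, 3], 'square': 36})
print(done['square'] - done['sum'], pending)  # 22 {}
```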

We can verify that this implementation works the same way as the previous one by running our system tests.

Extending the server

Adding timeouts

If the server has no workers, or if messages to or from the workers are lost, a client may wait for a response indefinitely. To avoid this, we need to remove requests that have been pending for too long without being finished.

This is a three-step process:

  1. Add a timestamp to the pending baton when it’s added to the context.
  2. Regularly check the pending context for batons that have been pending for too long.
  3. If a pending baton has been pending for too long, remove it from the pending context and respond to the client.
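The three steps can be sketched in plain Python with an injectable clock, which makes the timeout logic testable without actually waiting. add_pending and reap are hypothetical helpers, not Piped processors; they mirror the timestamping eval-lambda and the reaper's exec-code in the following configuration. Collecting the expired ids before popping avoids mutating the dictionary while iterating over it:

```python
import time


def add_pending(pending, request_id, baton, now=time.time):
    # step 1: timestamp the baton when it is stored
    baton['added'] = now()
    pending[request_id] = baton


def reap(pending, timeout=2.0, now=time.time):
    # step 2: find batons that have been pending longer than the timeout
    cutoff = now() - timeout
    expired = [request_id for request_id, baton in pending.items()
               if baton['added'] < cutoff]
    # step 3: remove them and return them so the caller can answer the client
    return [pending.pop(request_id) for request_id in expired]


# a fake clock so the example runs instantly
clock = [100.0]


def fake_now():
    return clock[0]


pending = {}
add_pending(pending, 'a', {}, now=fake_now)
clock[0] = 101.0
add_pending(pending, 'b', {}, now=fake_now)
clock[0] = 102.5
print([b['added'] for b in reap(pending, now=fake_now)], sorted(pending))  # [100.0] ['b']
```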

The first step can be done by adding a processor to the web.compute pipeline before the exec-code processor:

            - eval-lambda:
                namespace:
                    time: time.time
                output_path: added
                lambda: "_: time()"
            

To regularly check for old batons in the pending context, we create a tick interval that generates a baton every second and processes it in the reap-pending-batons pipeline. Create rpc-server-reaper.yaml with the following contents:

ticks:
    interval:
        reaper:
            # create a baton every second and process it in the reap-pending-batons pipeline:
            interval: 1
            processor: pipeline.reap-pending-batons

pipelines:
    reap-pending-batons:
        - fetch-context:
            context: pending

        - exec-code:
            input_path: pending
            output_path: reap_batons
            namespace:
                time: time.time
            code: |
                # give requests two seconds to complete:
                cutoff = time() - 2

                # create a list of batons that have exceeded the timeout:
                reap_batons = list()

                # iterate over a copy so that popping entries below is safe
                for id, pending_baton in list(input.items()):
                    if pending_baton['added'] < cutoff:
                        # we found a baton that was too old, so we remove it from the pending dictionary and add it
                        # to the list of batons we're going to reap
                        input.pop(id)
                        reap_batons.append(pending_baton)

                return reap_batons

        # process every baton returned by the above processor in the reap-baton pipeline:
        - for-each:
            input_path: reap_batons
            pipeline: reap-baton
            fail_on_error: true

    reap-baton:
        # we reap a baton writing a simple response to the client.
        - write-web-response:
            fallback_content: '{"success": false}'

Insert the timestamp processor shown earlier into the web.compute pipeline, before the exec-code processor, and append rpc-server-reaper.yaml to the list of includes in rpc-server.yaml.

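Testing the timeouts

Add the following test processor to rpc_tutorial/test_rpc.py. It temporarily stops the zmq poller so that no messages are delivered to the workers, and verifies that the client receives the reaper's fallback response after the timeout: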

class TestRPCWithoutWorkersProcessor(processors.StatusTestProcessor):
    interface.classProvides(processing.IProcessor)
    name = 'test-rpc-without-workers'

    class TestRPCWithoutWorkers(statustest.StatusTestCase):
        
        def setUp(self, poller):
            self.poller = poller
            # we temporarily stop the zmq poller to prevent the workers from receiving any messages
            self.poller.stopService()
            
        def tearDown(self):
            self.poller.startService()

        @defer.inlineCallbacks
        def statustest_single_number(self):
            started = time.time()
            result = yield client.getPage('http://localhost:8080/?n=42')
            finished = time.time()
            
            # the server should have given the workers two seconds to produce both partial responses:
            self.assertTrue((finished-started) > 2)
            # and our reap-baton pipeline should have provided us with a fallback response.
            self.assertEquals(json.loads(result), dict(success=False))
            
            
    def configure(self, runtime_environment):
        self.dependencies = runtime_environment.create_dependency_map(self,
            poller = dict(provider='zmq.poller')
        )
        
    def get_namespace(self, baton):
        return dict(poller=self.dependencies.poller)

Add the test processor to the testing pipeline:

        run:
            - test-rpc
            - test-client
            - test-rpc-without-workers

Running the tests now should result in output resembling this:

$ PYTHONPATH=. piped -nc test-rpc.yaml
2011-05-31 15:52:35+0200 [-] Log opened.
2011-05-31 15:52:35+0200 [-] twistd 11.0.0 (/Users/username/.virtualenvs/Piped/bin/python2.7 2.7.1) starting up.
2011-05-31 15:52:35+0200 [-] reactor class: twisted.internet.selectreactor.SelectReactor.
2011-05-31 15:52:35+0200 [-] /Users/username/pydev/Piped/piped/conf.py:25: piped.exceptions.ConfigurationWarning:
    WARNING: configuration file loaded multiple times: "queues.yaml"
2011-05-31 15:52:36+0200 [-] twisted.web.server.Site starting on 8080
2011-05-31 15:52:36+0200 [-] Starting factory <twisted.web.server.Site instance at 0x102e237a0>
2011-05-31 15:52:36+0200 [status_test] rpc_tutorial.test_rpc
2011-05-31 15:52:36+0200 [status_test]   TestRPC
2011-05-31 15:52:36+0200 [-]     statustest_single_number ... 127.0.0.1 - - [31/May/2011:13:52:36 +0000] "GET /?n=42 HTTP/1.0" 200 40 "-" "Twisted PageGetter"
2011-05-31 15:52:36+0200 [status_test]                                           [OK]
2011-05-31 15:52:36+0200 [-]     statustest_ten_numbers ... 127.0.0.1 - - [31/May/2011:13:52:36 +0000] "GET /?n=1&n=2&n=3&n=4&n=5&n=6&n=7&n=8&n=9&n=10 HTTP/1.0" 200 42 "-" "Twisted PageGetter"
2011-05-31 15:52:36+0200 [status_test]                                             [OK]
2011-05-31 15:52:36+0200 [status_test] rpc_tutorial.test_client
2011-05-31 15:52:36+0200 [status_test]   TestClient
2011-05-31 15:52:36+0200 [status_test]     statustest_simple_square ...                                           [OK]
2011-05-31 15:52:36+0200 [status_test]     statustest_simple_sum ...                                              [OK]
2011-05-31 15:52:36+0200 [status_test] rpc_tutorial.test_rpc
2011-05-31 15:52:36+0200 [status_test]   TestRPCWithoutWorkers
2011-05-31 15:52:39+0200 [-]     statustest_single_number ... 127.0.0.1 - - [31/May/2011:13:52:39 +0000] "GET /?n=42 HTTP/1.0" 200 18 "-" "Twisted PageGetter"
2011-05-31 15:52:39+0200 [status_test]                                           [OK]
2011-05-31 15:52:39+0200 [status_test]
2011-05-31 15:52:39+0200 [status_test] -------------------------------------------------------------------------------
2011-05-31 15:52:39+0200 [status_test] Ran 5 tests in 3.000s
2011-05-31 15:52:39+0200 [status_test]
2011-05-31 15:52:39+0200 [status_test] PASSED (successes=5)
2011-05-31 15:52:39+0200 [-] (TCP Port 8080 Closed)
2011-05-31 15:52:39+0200 [-] Stopping factory <twisted.web.server.Site instance at 0x102e237a0>
2011-05-31 15:52:39+0200 [-] Main loop terminated.
2011-05-31 15:52:39+0200 [-] Server Shut Down.