We will solve a variant of Project Euler problem #6 using Piped. Our variant differs from the original problem only in its last sentence, where the first one hundred natural numbers are replaced by a list of input numbers:
The sum of the squares of the first ten natural numbers is
$1^2 + 2^2 + \dots + 10^2 = 385$.
The square of the sum of the first ten natural numbers is
$(1 + 2 + \dots + 10)^2 = 55^2 = 3025$.
Hence the difference between the sum of the squares of the first ten natural numbers and the square of the sum is $3025 - 385 = 2640$.
Find the difference between the sum of the squares of the list of input numbers and the square of the sum.
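As a quick sanity check of the numbers above, here is a plain-Python sketch (not part of the Piped configuration, and the function names are our own) that computes the difference for an arbitrary list of numbers, as our variant requires. It also compares the result with the well-known closed forms for the first n natural numbers used by the original problem.

```python
def sum_square_difference(numbers):
    """Return (sum of squares, square of sum, difference) for any list of numbers."""
    numbers = list(numbers)
    sum_of_squares = sum(n ** 2 for n in numbers)
    square_of_sum = sum(numbers) ** 2
    return sum_of_squares, square_of_sum, square_of_sum - sum_of_squares


def closed_form(n):
    """Closed forms for the first n natural numbers (the original problem)."""
    sum_of_squares = n * (n + 1) * (2 * n + 1) // 6
    square_of_sum = (n * (n + 1) // 2) ** 2
    return sum_of_squares, square_of_sum, square_of_sum - sum_of_squares


print(sum_square_difference(range(1, 11)))  # (385, 3025, 2640)
print(sum_square_difference([1, 2, 3]))     # (14, 36, 22)
```

The closed forms only apply to the consecutive numbers 1..n; for an arbitrary list of input numbers we have to compute both sums directly, which is what the Piped pipelines below do.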
First, we’ll solve it in a single process. Then we’ll show how to distribute the calculations to worker nodes and write the responses back to the client. The single-process version is configured in rpc-server.yaml:
web:
    site:
        routing:
            __config__:
                processor: pipeline.web.compute

pipelines:
    web:
        compute:
            # parse the request arguments
            - eval-lambda:
                input_path: request
                output_path: numbers
                lambda: "request: [int(a) for a in request.args['n']]"

            # calculate the sum of the squares
            - eval-lambda:
                input_path: numbers
                output_path: sum
                lambda: "numbers: sum([number**2 for number in numbers])"

            # calculate the square of the sum
            - eval-lambda:
                input_path: numbers
                output_path: square
                lambda: "numbers: sum(numbers)**2"

            - run-pipeline:
                pipeline: .write-response

        write-response:
            # create a json-encoded response and write it to the client
            - eval-lambda:
                output_path: content
                lambda: "baton: dict(sum=baton['sum'], square=baton['square'], diff=baton['square']-baton['sum'])"
            - encode-json:
                input_path: content
            - write-web-response
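To make the data flow explicit, the following plain-Python sketch mimics what the compute and write-response pipelines do to the baton. The `eval_lambda` helper is hypothetical: it only approximates how eval-lambda is used above (read the value at input_path, or the whole baton if none is given, and store the result at output_path) and is not Piped's implementation. The `request_args` key is likewise our own stand-in for `request.args`.

```python
import json


def eval_lambda(baton, fn, output_path, input_path=None):
    """Hypothetical stand-in for the eval-lambda processor as configured above."""
    value = baton if input_path is None else baton[input_path]
    baton[output_path] = fn(value)
    return baton


def compute(args):
    # 'args' plays the role of request.args: each parameter maps to a list of strings
    baton = {'request_args': args}
    # web.compute: parse the arguments, then compute both sums
    eval_lambda(baton, lambda a: [int(x) for x in a['n']], 'numbers', 'request_args')
    eval_lambda(baton, lambda numbers: sum([number ** 2 for number in numbers]), 'sum', 'numbers')
    eval_lambda(baton, lambda numbers: sum(numbers) ** 2, 'square', 'numbers')
    # web.write-response: build the content and json-encode it
    eval_lambda(baton, lambda b: dict(sum=b['sum'], square=b['square'],
                                      diff=b['square'] - b['sum']), 'content')
    return json.dumps(baton['content'], sort_keys=True)


print(compute({'n': ['1', '2', '3']}))  # {"diff": 22, "square": 36, "sum": 14}
```

Each processor only reads and writes named keys on a shared baton, which is why the three eval-lambda steps can be reordered or split across processes later without changing their lambdas.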
Run this configuration:
$ piped -nc rpc-server.yaml
2011-05-30 21:03:38+0200 [-] Log opened.
2011-05-30 21:03:38+0200 [-] twistd 11.0.0 (/Users/username/.virtualenvs/Piped/bin/python2.7 2.7.1) starting up.
2011-05-30 21:03:38+0200 [-] reactor class: twisted.internet.selectreactor.SelectReactor.
2011-05-30 21:03:38+0200 [-] twisted.web.server.Site starting on 8080
2011-05-30 21:03:38+0200 [-] Starting factory <twisted.web.server.Site instance at 0x102b72908>
By opening http://localhost:8080/?n=1&n=2&n=3, we can verify that the sum, square and difference are as expected: for the numbers 1, 2 and 3 they should be 14, 36 and 22.
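The query string repeats the n parameter once per number. If you would rather not type such URLs by hand, this small sketch uses Python 3's standard `urllib.parse.urlencode` to build the URL together with the response we expect. The helper names and the default base URL are our own.

```python
from urllib.parse import urlencode


def build_url(numbers, base='http://localhost:8080/'):
    # doseq=True expands the list into one n=... pair per element
    return base + '?' + urlencode({'n': numbers}, doseq=True)


def expected_response(numbers):
    sum_of_squares = sum(n ** 2 for n in numbers)
    square_of_sum = sum(numbers) ** 2
    return dict(sum=sum_of_squares, square=square_of_sum, diff=square_of_sum - sum_of_squares)


print(build_url([1, 2, 3]))          # http://localhost:8080/?n=1&n=2&n=3
print(expected_response([1, 2, 3]))  # {'sum': 14, 'square': 36, 'diff': 22}
```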
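Testing a service manually after every configuration change is both time-consuming and tedious, so we automate it with a system-test configuration, test-rpc.yaml. It includes rpc-server.yaml, runs the site on port 8088, runs the test processors on startup and shuts down when they have finished: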
includes:
    - rpc-server.yaml

plugins:
    bundles:
        my_bundle:
            - rpc_tutorial

web:
    site:
        port: 8088

system-events:
    startup:
        system-test: pipeline.system-testing.start

pipelines:
    system-testing:
        start:
            chained_consumers:
                - create-statustest-reporter
                - run-pipeline:
                    pipeline: .run
                - wait-for-statustest-reporter:
                    done: True
                - shutdown
            chained_error_consumers:
                - print-failure-traceback
                - shutdown

        # add any test-processors to this test-pipeline
        run:
            - test-rpc
The testing pipeline uses a processor named test-rpc, which we create in the rpc_tutorial package:
$ mkdir rpc_tutorial
$ touch rpc_tutorial/__init__.py
Create rpc_tutorial/test_rpc.py with the following contents:
import time
import json

from zope import interface
from twisted.web import client
from twisted.internet import defer

from piped.plugins.status_testing import statustest, processors
from piped import processing


client.HTTPClientFactory.noisy = False


class TestRPCProcessor(processors.StatusTestProcessor):
    interface.classProvides(processing.IProcessor)
    name = 'test-rpc'

    class TestRPC(statustest.StatusTestCase):

        @defer.inlineCallbacks
        def get_result(self, *numbers):
            numbers = [str(num) for num in numbers]
            result_json = yield client.getPage('http://localhost:8088/?n=%s' % '&n='.join(numbers))
            defer.returnValue(json.loads(result_json))

        @defer.inlineCallbacks
        def statustest_single_number(self):
            result = yield self.get_result(42)
            self.assertEquals(result, dict(sum=42**2, square=42**2, diff=0))

        @defer.inlineCallbacks
        def statustest_ten_numbers(self):
            result = yield self.get_result(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
            self.assertEquals(result, dict(sum=385, square=3025, diff=2640))
We can now test the service by running the test-rpc.yaml configuration:
$ PYTHONPATH=. piped -nc test-rpc.yaml
2011-05-30 21:49:20+0200 [-] Log opened.
2011-05-30 21:49:20+0200 [-] twistd 11.0.0 (/Users/username/.virtualenvs/Piped/bin/python2.7 2.7.1) starting up.
2011-05-30 21:49:20+0200 [-] reactor class: twisted.internet.selectreactor.SelectReactor.
2011-05-30 21:49:20+0200 [-] twisted.web.server.Site starting on 8088
2011-05-30 21:49:20+0200 [-] Starting factory <twisted.web.server.Site instance at 0x102e16e60>
2011-05-30 21:49:21+0200 [status_test] rpc_tutorial.test_rpc
2011-05-30 21:49:21+0200 [status_test] TestRPC
2011-05-30 21:49:21+0200 [HTTPChannel,0,127.0.0.1] statustest_single_number ... 127.0.0.1 - - [30/May/2011:19:49:20 +0000] "GET /?n=42 HTTP/1.0" 200 40 "-" "Twisted PageGetter"
2011-05-30 21:49:21+0200 [status_test] [OK]
2011-05-30 21:49:21+0200 [HTTPChannel,1,127.0.0.1] statustest_ten_numbers ... 127.0.0.1 - - [30/May/2011:19:49:20 +0000] "GET /?n=1&n=2&n=3&n=4&n=5&n=6&n=7&n=8&n=9&n=10 HTTP/1.0" 200 42 "-" "Twisted PageGetter"
2011-05-30 21:49:21+0200 [status_test] [OK]
2011-05-30 21:49:21+0200 [status_test]
2011-05-30 21:49:21+0200 [status_test] -------------------------------------------------------------------------------
2011-05-30 21:49:21+0200 [status_test] Ran 2 tests in 0.010s
2011-05-30 21:49:21+0200 [status_test]
2011-05-30 21:49:21+0200 [status_test] PASSED (successes=2)
2011-05-30 21:49:21+0200 [-] (TCP Port 8088 Closed)
2011-05-30 21:49:21+0200 [-] Stopping factory <twisted.web.server.Site instance at 0x102e16e60>
2011-05-30 21:49:21+0200 [-] Main loop terminated.
2011-05-30 21:49:21+0200 [-] Server Shut Down.
To distribute the calculations, we first have to decide on a communication protocol. We use ZeroMQ through its Python bindings, pyzmq. Installing them varies by platform:
On Windows, download the appropriate ZeroMQ binary from http://www.lfd.uci.edu/~gohlke/pythonlibs/#pyzmq.
On OS X, the easiest way to install ZeroMQ is by using Homebrew:
$ brew install zeromq
$ easy_install pyzmq
On other platforms, follow the installation instructions for ZeroMQ on http://www.zeromq.org/intro:get-the-software, then easy_install pyzmq:
$ easy_install pyzmq
We’re going to use four types of queues: PUSH, PULL, SUB and PUB.
The server PUSHes requests to the workers, which PULL them; the workers PUBlish their responses back to the server, which SUBscribes to them.
ZeroMQ sends strings, not structured data, so we have to serialize the messages both to and from the workers.
After getting a response from a worker, we need to check whether the request can be finished. This means we have to keep the request in memory from the time the client makes it until we’re ready to respond.
We start by implementing a worker that is going to perform both calculations. This worker requires four queues: two input queues and two output queues.
The worker will receive json-encoded messages from the input queues and write a json-encoded response to the corresponding output queue.
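Stripped of Piped and ZeroMQ, a worker is a simple loop: read a json-encoded message from an input queue, add its result, and write the json-encoded baton to the matching output queue. The sketch below models that loop with Python's standard `queue.Queue` standing in for the PULL and PUB sockets. It illustrates the message flow only; it does not use pyzmq or Piped, and the `None` shutdown sentinel is our own convention.

```python
import json
import queue
import threading


def run_worker(inbox, outbox, key, compute):
    """Process messages until a None sentinel arrives."""
    while True:
        message = inbox.get()
        if message is None:
            break
        baton = json.loads(message)              # like decode-json
        baton[key] = compute(baton['numbers'])   # like eval-lambda
        outbox.put(json.dumps(baton))            # like encode-json + send-zmq-message


# a square worker running in its own thread
square_in, square_out = queue.Queue(), queue.Queue()
worker = threading.Thread(
    target=run_worker,
    args=(square_in, square_out, 'square', lambda numbers: sum(numbers) ** 2))
worker.start()

square_in.put(json.dumps(dict(id=1, numbers=[1, 2, 3])))
print(json.loads(square_out.get()))  # {'id': 1, 'numbers': [1, 2, 3], 'square': 36}
square_in.put(None)
worker.join()
```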
We implement a square client in rpc-client-square.yaml:
includes:
    - queues.yaml

zmq:
    queues:
        worker_square_in:
            processor: pipeline.zmq.square

pipelines:
    zmq:
        square:
            - decode-json
            - eval-lambda:
                input_path: numbers
                output_path: square
                lambda: "numbers: sum(numbers)**2"
            - encode-json
            - send-zmq-message:
                queue_name: worker_square_out
... a sum client in rpc-client-sum.yaml:
includes:
    - queues.yaml

zmq:
    queues:
        worker_sum_in:
            processor: pipeline.zmq.sum

pipelines:
    zmq:
        sum:
            - decode-json
            - eval-lambda:
                input_path: numbers
                output_path: sum
                lambda: "numbers: sum([number**2 for number in numbers])"
            - encode-json
            - send-zmq-message:
                queue_name: worker_sum_out
... and for convenience, we create a configuration that combines both these workers into a single worker in rpc-client.yaml:
includes:
    - rpc-client-sum.yaml
    - rpc-client-square.yaml
A reader with a keen eye will notice that we’ve moved the zmq queue definitions to their own file, which makes it easy to share them between configurations. A queue is not created until it is required, so it is safe for multiple processes to include the same zmq queue definitions. The contents of queues.yaml should be as follows:
zmq:
    queues:
        # queues used by a square worker
        worker_square_in:
            type: PULL
            connects:
                - tcp://0.0.0.0:6000
        worker_square_out:
            type: PUB
            connects:
                - tcp://0.0.0.0:6001

        # queues used by a sum worker
        worker_sum_in:
            type: PULL
            connects:
                - tcp://0.0.0.0:5000
        worker_sum_out:
            type: PUB
            connects:
                - tcp://0.0.0.0:5001
The zmq.sum and zmq.square pipelines expect to receive a json-encoded baton of the form:
{
    id: id,
    numbers: [a, b, c]
}
... and will respond with a json-encoded baton:
{
    id: id,
    numbers: [a, b, c],
    sum/square: ...,
}
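In plain Python, the two worker pipelines amount to the following transformations of that message. This is an illustrative sketch; the function names are our own, and in the tutorial the work is done by the decode-json, eval-lambda and encode-json processors.

```python
import json


def handle_sum(message):
    # decode, add the sum of the squares, re-encode (mirrors the zmq.sum pipeline)
    baton = json.loads(message)
    baton['sum'] = sum([number ** 2 for number in baton['numbers']])
    return json.dumps(baton)


def handle_square(message):
    # decode, add the square of the sum, re-encode (mirrors the zmq.square pipeline)
    baton = json.loads(message)
    baton['square'] = sum(baton['numbers']) ** 2
    return json.dumps(baton)


request = json.dumps(dict(id=123, numbers=[1, 2, 3]))
print(handle_sum(request))     # {"id": 123, "numbers": [1, 2, 3], "sum": 14}
print(handle_square(request))  # {"id": 123, "numbers": [1, 2, 3], "square": 36}
```

Both workers echo the id back unchanged, which is what lets the server match each partial response to the request it belongs to.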
First we create the test-client test processor in rpc_tutorial/test_client.py:
import json

from zope import interface
from twisted.internet import defer

from piped.plugins.status_testing import statustest, processors
from piped import processing


class TestClientProcessor(processors.StatusTestProcessor):
    interface.classProvides(processing.IProcessor)
    name = 'test-client'

    class TestClient(statustest.StatusTestCase):

        def setUp(self, sum_pipeline, square_pipeline):
            self.sum_pipeline = sum_pipeline
            self.square_pipeline = square_pipeline

        @defer.inlineCallbacks
        def statustest_simple_sum(self):
            input = json.dumps(dict(numbers=[1, 2, 3]))
            results = yield self.sum_pipeline(input)
            output = json.loads(results[0])
            self.assertEquals(output['sum'], 14)

        @defer.inlineCallbacks
        def statustest_simple_square(self):
            input = json.dumps(dict(numbers=[1, 2, 3]))
            results = yield self.square_pipeline(input)
            output = json.loads(results[0])
            self.assertEquals(output['square'], 36)

    def configure(self, runtime_environment):
        self.dependencies = runtime_environment.create_dependency_map(self,
            sum_pipeline = dict(provider='pipeline.zmq.sum'),
            square_pipeline = dict(provider='pipeline.zmq.square')
        )

    def get_namespace(self, baton):
        return self.dependencies
Add an entry for rpc-client.yaml in the list of includes in test-rpc.yaml and add the test-client test processor to the testing pipeline:
includes:
    - rpc-server.yaml
    - rpc-client.yaml

...

run:
    - test-client
    - test-rpc
Running the test configuration should now print something like this:
$ PYTHONPATH=. piped -nc test-rpc.yaml
2011-05-31 13:28:47+0200 [-] Log opened.
2011-05-31 13:28:47+0200 [-] twistd 11.0.0 (/Users/username/.virtualenvs/Piped/bin/python2.7 2.7.1) starting up.
2011-05-31 13:28:47+0200 [-] reactor class: twisted.internet.selectreactor.SelectReactor.
2011-05-31 13:28:47+0200 [-] /Users/username/pydev/Piped/piped/conf.py:25: piped.exceptions.ConfigurationWarning:
WARNING: configuration file loaded multiple times: "queues.yaml"
2011-05-31 13:28:47+0200 [-] twisted.web.server.Site starting on 8080
2011-05-31 13:28:47+0200 [-] Starting factory <twisted.web.server.Site instance at 0x102989bd8>
2011-05-31 13:28:48+0200 [status_test] rpc_tutorial.test_rpc
2011-05-31 13:28:48+0200 [status_test] TestRPC
2011-05-31 13:28:48+0200 [-] statustest_single_number ... 127.0.0.1 - - [31/May/2011:11:28:47 +0000] "GET /?n=42 HTTP/1.0" 200 40 "-" "Twisted PageGetter"
2011-05-31 13:28:48+0200 [status_test] [OK]
2011-05-31 13:28:48+0200 [-] statustest_ten_numbers ... 127.0.0.1 - - [31/May/2011:11:28:47 +0000] "GET /?n=1&n=2&n=3&n=4&n=5&n=6&n=7&n=8&n=9&n=10 HTTP/1.0" 200 42 "-" "Twisted PageGetter"
2011-05-31 13:28:48+0200 [status_test] [OK]
2011-05-31 13:28:48+0200 [status_test] rpc_tutorial.test_client
2011-05-31 13:28:48+0200 [status_test] TestClient
2011-05-31 13:28:48+0200 [status_test] statustest_simple_square ... [OK]
2011-05-31 13:28:48+0200 [status_test] statustest_simple_sum ... [OK]
2011-05-31 13:28:48+0200 [status_test]
2011-05-31 13:28:48+0200 [status_test] -------------------------------------------------------------------------------
2011-05-31 13:28:48+0200 [status_test] Ran 4 tests in 0.087s
2011-05-31 13:28:48+0200 [status_test]
2011-05-31 13:28:48+0200 [status_test] PASSED (successes=4)
2011-05-31 13:28:48+0200 [-] (TCP Port 8080 Closed)
2011-05-31 13:28:48+0200 [-] Stopping factory <twisted.web.server.Site instance at 0x102989bd8>
2011-05-31 13:28:48+0200 [-] Main loop terminated.
2011-05-31 13:28:48+0200 [-] Server Shut Down.
In this section, we will extend our rpc-server to use workers to perform its computations.
The workers operate independently of our server, so the server has to collect the partial responses from the workers before writing the finished response to the client. The new rpc-server.yaml looks like this:
includes:
    - queues.yaml

zmq:
    queues:
        server_sum_in:
            processor: pipeline.zmq.partial_response
        server_square_in:
            processor: pipeline.zmq.partial_response

web:
    site:
        port: 8080
        debug:
            allow:
                - localhost
        routing:
            __config__:
                processor: pipeline.web.compute

# We use a shared context (that starts out as an empty dictionary) to
# store requests that haven't been completed yet.
contexts:
    pending: {}

pipelines:
    web:
        compute:
            - fetch-context:
                context: pending

            - exec-code:
                code: |
                    request = baton['request']
                    request_id = id(request)

                    # store the baton by the request id in the pending context
                    baton['pending'][request_id] = baton

                    # return a baton that can be json-encoded and sent to the workers
                    return dict(id=request_id, numbers=[int(a) for a in request.args['n']])

            - encode-json

            # send the encoded baton to both the sum and the square output queue.
            - send-zmq-message:
                queue_name: server_sum_out
                retries: 10
            - send-zmq-message:
                queue_name: server_square_out
                retries: 10

        write-response:
            - eval-lambda:
                output_path: content
                lambda: "baton: dict(sum=baton['sum'], square=baton['square'], diff=baton['square']-baton['sum'])"
            - encode-json:
                input_path: content
            - write-web-response

    zmq:
        partial_response:
            # the workers send their answer as a json-encoded baton
            - decode-json

            - fetch-context:
                context: pending

            # there's nothing we can do if we can't find a pending baton with the given id:
            - stop:
                decider: "baton: baton.get('id') not in baton['pending']"

            # retrieve our pending baton
            - eval-lambda:
                output_path: pending_baton
                lambda: "baton: baton['pending'][baton['id']]"

            # copy the sum/square to the pending baton if sum/square exists:
            - remap:
                mapping:
                    sum: pending_baton.sum
                    square: pending_baton.square

            # if we don't have a complete response for the client yet, we can't continue
            - stop:
                input_path: pending_baton
                decider: "pending_baton: 'sum' not in pending_baton or 'square' not in pending_baton"

            # remove the request from the pending requests
            - eval-lambda:
                output_path: null
                lambda: "baton: baton['pending'].pop(baton['id'])"

            - run-pipeline:
                input_path: pending_baton
                pipeline: web.write-response
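The partial_response pipeline is essentially a merge keyed on the request id. The sketch below reproduces its logic in plain Python, with an ordinary dictionary standing in for the pending context; the `on_partial_response` function and the `write_response` callback are hypothetical names used only for illustration.

```python
import json

pending = {}  # request id -> pending baton, playing the role of the 'pending' context


def on_partial_response(message, write_response):
    partial = json.loads(message)
    pending_baton = pending.get(partial.get('id'))
    if pending_baton is None:
        return  # unknown or already answered request: nothing we can do
    # copy whichever partial result this worker produced (like the remap processor)
    for key in ('sum', 'square'):
        if key in partial:
            pending_baton[key] = partial[key]
    if 'sum' not in pending_baton or 'square' not in pending_baton:
        return  # still waiting for the other worker
    del pending[partial['id']]
    write_response(pending_baton)


responses = []
pending[1] = {'numbers': [1, 2, 3]}
on_partial_response(json.dumps({'id': 1, 'sum': 14}), responses.append)
print(responses)  # [] (the square is still missing)
on_partial_response(json.dumps({'id': 1, 'square': 36}), responses.append)
print(responses)  # [{'numbers': [1, 2, 3], 'sum': 14, 'square': 36}]
```

Because the order in which the two workers answer does not matter, the same function handles messages from both the sum and the square queue, just as both server queues share the partial_response pipeline.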
We can verify that this implementation works the same way as the previous one by running our system tests.
If a server has no workers, or if messages to or from the workers are lost, a client may wait for a response indefinitely. To avoid this, we need to be able to remove requests that have been pending for too long and answer them anyway.
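This is a three-step process: record when each request was added, regularly check the pending context for batons that are too old, and reap those batons by writing a fallback response to the client.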
The first step can be done by adding a processor to the web.compute pipeline before the exec-code processor:
- eval-lambda:
    namespace:
        time: time.time
    output_path: added
    lambda: "_: time()"
To regularly check for old batons in the pending context, we create a tick interval that generates a baton every second and processes it in the reap-pending-batons pipeline. We create rpc-server-reaper.yaml with the following contents:
ticks:
    interval:
        reaper:
            # create a baton every second and process it in the reap-pending-batons pipeline:
            interval: 1
            processor: pipeline.reap-pending-batons

pipelines:
    reap-pending-batons:
        - fetch-context:
            context: pending

        - exec-code:
            input_path: pending
            output_path: reap_batons
            namespace:
                time: time.time
            code: |
                # give requests two seconds to complete:
                cutoff = time() - 2

                # create a list of batons that have exceeded the timeout:
                reap_batons = list()
                # iterate over a copy, since we remove items from the dictionary as we go
                for baton_id, pending_baton in list(input.items()):
                    if pending_baton['added'] < cutoff:
                        # we found a baton that was too old, so we remove it from the pending dictionary and add it
                        # to the list of batons we're going to reap
                        input.pop(baton_id)
                        reap_batons.append(pending_baton)

                return reap_batons

        # process every baton returned by the above processor in the reap-baton pipeline:
        - for-each:
            input_path: reap_batons
            pipeline: reap-baton
            fail_on_error: true

    reap-baton:
        # we reap a baton by writing a simple response to the client.
        - write-web-response:
            fallback_content: '{"success": false}'
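The reaping logic in the exec-code processor is easy to check in isolation. Here is a plain-Python version with the current time passed in as a parameter, so no real waiting is needed; the function name and the `now` parameter are our own additions, not part of Piped.

```python
def reap_pending(pending, now, timeout=2):
    """Remove and return the batons added more than `timeout` seconds before `now`."""
    cutoff = now - timeout
    reaped = []
    # iterate over a snapshot, since we remove entries from `pending` as we go
    for baton_id, pending_baton in list(pending.items()):
        if pending_baton['added'] < cutoff:
            pending.pop(baton_id)
            reaped.append(pending_baton)
    return reaped


pending = {1: {'added': 100.0}, 2: {'added': 101.5}}
print(reap_pending(pending, now=103.0))  # [{'added': 100.0}]
print(pending)                           # {2: {'added': 101.5}}
```

With a one-second tick and a two-second timeout, a request is answered with the fallback response roughly two to three seconds after it arrived, which matches the timing checked by the test below.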
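Insert the eval-lambda processor that records when a request was added into the web.compute pipeline, just before the exec-code processor, and append rpc-server-reaper.yaml to the list of includes in rpc-server.yaml.
To verify that requests time out when the workers don’t respond, add the following test processor to rpc_tutorial/test_rpc.py: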
class TestRPCWithoutWorkersProcessor(processors.StatusTestProcessor):
    interface.classProvides(processing.IProcessor)
    name = 'test-rpc-without-workers'

    class TestRPCWithoutWorkers(statustest.StatusTestCase):

        def setUp(self, poller):
            self.poller = poller
            # we temporarily stop the zmq poller to prevent the workers from receiving any messages
            self.poller.stopService()

        def tearDown(self):
            self.poller.startService()

        @defer.inlineCallbacks
        def statustest_single_number(self):
            started = time.time()
            result = yield client.getPage('http://localhost:8080/?n=42')
            finished = time.time()

            # the server should have given the workers two seconds to produce both partial responses:
            self.assertTrue((finished - started) > 2)

            # and our reap-baton pipeline should have provided us with a fallback response.
            self.assertEquals(json.loads(result), dict(success=False))

    def configure(self, runtime_environment):
        self.dependencies = runtime_environment.create_dependency_map(self,
            poller = dict(provider='zmq.poller')
        )

    def get_namespace(self, baton):
        return dict(poller=self.dependencies.poller)
Add the test processor to the testing pipeline:
run:
    - test-rpc
    - test-client
    - test-rpc-without-workers
Running the tests now should result in output resembling this:
$ PYTHONPATH=. piped -nc test-rpc.yaml
2011-05-31 15:52:35+0200 [-] Log opened.
2011-05-31 15:52:35+0200 [-] twistd 11.0.0 (/Users/username/.virtualenvs/Piped/bin/python2.7 2.7.1) starting up.
2011-05-31 15:52:35+0200 [-] reactor class: twisted.internet.selectreactor.SelectReactor.
2011-05-31 15:52:35+0200 [-] /Users/username/pydev/Piped/piped/conf.py:25: piped.exceptions.ConfigurationWarning:
WARNING: configuration file loaded multiple times: "queues.yaml"
2011-05-31 15:52:36+0200 [-] twisted.web.server.Site starting on 8080
2011-05-31 15:52:36+0200 [-] Starting factory <twisted.web.server.Site instance at 0x102e237a0>
2011-05-31 15:52:36+0200 [status_test] rpc_tutorial.test_rpc
2011-05-31 15:52:36+0200 [status_test] TestRPC
2011-05-31 15:52:36+0200 [-] statustest_single_number ... 127.0.0.1 - - [31/May/2011:13:52:36 +0000] "GET /?n=42 HTTP/1.0" 200 40 "-" "Twisted PageGetter"
2011-05-31 15:52:36+0200 [status_test] [OK]
2011-05-31 15:52:36+0200 [-] statustest_ten_numbers ... 127.0.0.1 - - [31/May/2011:13:52:36 +0000] "GET /?n=1&n=2&n=3&n=4&n=5&n=6&n=7&n=8&n=9&n=10 HTTP/1.0" 200 42 "-" "Twisted PageGetter"
2011-05-31 15:52:36+0200 [status_test] [OK]
2011-05-31 15:52:36+0200 [status_test] rpc_tutorial.test_client
2011-05-31 15:52:36+0200 [status_test] TestClient
2011-05-31 15:52:36+0200 [status_test] statustest_simple_square ... [OK]
2011-05-31 15:52:36+0200 [status_test] statustest_simple_sum ... [OK]
2011-05-31 15:52:36+0200 [status_test] rpc_tutorial.test_rpc
2011-05-31 15:52:36+0200 [status_test] TestRPCWithoutWorkers
2011-05-31 15:52:39+0200 [-] statustest_single_number ... 127.0.0.1 - - [31/May/2011:13:52:39 +0000] "GET /?n=42 HTTP/1.0" 200 18 "-" "Twisted PageGetter"
2011-05-31 15:52:39+0200 [status_test] [OK]
2011-05-31 15:52:39+0200 [status_test]
2011-05-31 15:52:39+0200 [status_test] -------------------------------------------------------------------------------
2011-05-31 15:52:39+0200 [status_test] Ran 5 tests in 3.000s
2011-05-31 15:52:39+0200 [status_test]
2011-05-31 15:52:39+0200 [status_test] PASSED (successes=5)
2011-05-31 15:52:39+0200 [-] (TCP Port 8080 Closed)
2011-05-31 15:52:39+0200 [-] Stopping factory <twisted.web.server.Site instance at 0x102e237a0>
2011-05-31 15:52:39+0200 [-] Main loop terminated.
2011-05-31 15:52:39+0200 [-] Server Shut Down.