Available providers

Providers are classes that add functionality to a Piped process.

contexts

class piped.providers.context_provider.ContextProvider

Bases: object

Provides shared data as resources.

Example configuration:

contexts:
    my_context:
        foo: bar
    another_context: this is a string

The above contexts would be made available as context.my_context and context.another_context.

class piped.providers.context_provider.PersistedContextProvider

Bases: object

Provides shared data as resources, which is persisted when the process is stopped.

Example configuration:

persisted_contexts:
    some_context:
        kind: json
        file: some_context.json
        initial_data: {}

The above context would be made available as persisted_context.some_context. When the service starts, it reads some_context.json if the file exists and provides its contents. If the file does not exist or its contents cannot be parsed, the data in initial_data is provided instead.
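The fallback behaviour described above can be sketched in plain Python. This is only an illustration under assumptions, not Piped's implementation: the helper name load_persisted_context is hypothetical, and only the json kind is considered.

```python
import json
import os
import tempfile


def load_persisted_context(path, initial_data):
    """Return the parsed contents of path, or initial_data as a fallback.

    Hypothetical helper that mirrors the documented behaviour for kind: json.
    """
    if not os.path.exists(path):
        return initial_data
    try:
        with open(path) as f:
            return json.load(f)
    except ValueError:
        # json raises a ValueError subclass when the contents cannot be parsed
        return initial_data


workdir = tempfile.mkdtemp()
missing = os.path.join(workdir, 'missing.json')
broken = os.path.join(workdir, 'broken.json')
valid = os.path.join(workdir, 'valid.json')

with open(broken, 'w') as f:
    f.write('{not json')
with open(valid, 'w') as f:
    f.write('{"counter": 3}')

from_missing = load_persisted_context(missing, {})
from_broken = load_persisted_context(broken, {})
from_valid = load_persisted_context(valid, {})
```

Only the last call returns data read from disk; the first two fall back to the supplied initial data.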

Shared state

Contexts can be used to implement shared state. In the above examples, my_context is a dictionary, and any changes to that dictionary will be immediately visible to all other users of the resource.
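A small sketch of what this sharing means in practice. The Consumer class is a hypothetical stand-in for anything that receives the context resource, and a plain dict stands in for my_context:

```python
# A plain dict standing in for the my_context resource (foo: bar).
my_context = {'foo': 'bar'}


class Consumer(object):
    """Hypothetical user of the context resource; keeps a reference, not a copy."""

    def __init__(self, context):
        self.context = context


writer = Consumer(my_context)
reader = Consumer(my_context)

# The writer mutates the shared dictionary in place ...
writer.context['foo'] = 'baz'
writer.context['hits'] = 1

# ... and the reader observes the change without being notified.
seen_by_reader = dict(reader.context)
```

The sharing relies on in-place mutation. Rebinding the attribute to a new dictionary (writer.context = {...}) would not be visible to other users.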

perspective broker

class piped.providers.spread_provider.PBServerProvider

Bases: object, twisted.application.service.MultiService

Provides batons from a twisted.spread.pb server.

For details on how perspective broker works, see the Perspective Broker section of the Twisted Documentation <http://twistedmatrix.com/documents/current/core/howto/pb-intro.html>.

Example configuration:

pb:
    servers:
        named_server:
            listen: tcp:8789
            processor: processor_name
            wait_for_processor: False # this is the default

            # if the processor does not callback the deferred, the following default is used
            # if the default is not provided, the deferred will be errbacked.
            default_callback: None

            # if a checker is specified, access to the processor will be restricted,
            # and the baton will contain the avatar_id.
            checker:
                name: twisted.cred.checkers.InMemoryUsernamePasswordDatabaseDontUse
                arguments:
                    username: password

Given the above configuration, a pb.PBServerFactory will be created to listen on the specified port. When methods are called on the server, a baton will be produced. If wait_for_processor is True, incoming batons are buffered until the processor becomes available; otherwise the client receives an errback immediately.

The baton contains the following keys:

message
The name of the function being called
args
list of arguments
kwargs
dict of keyword arguments
avatar_id
The username of the authenticated user. This key is only set if a checker was configured.
deferred
A twisted.internet.defer.Deferred object that the processor should callback or errback in order to produce a response to the client.

class piped.providers.spread_provider.PBClientProvider

Bases: object, twisted.application.service.MultiService

Provides twisted.spread.pb clients.

For details on how perspective broker works, see the Perspective Broker section of the Twisted Documentation <http://twistedmatrix.com/documents/current/core/howto/pb-intro.html>.

Example configuration:

pb:
    clients:
        named_client:
            endpoint: tcp:host=localhost:port=8789

            # if the remote server requires a login, specify the username and password:
            username: a_username
            password: a_password

Given the above configuration, two resources will be provided:

  • ‘pb.client.named_client’, which is a PipedPBClientFactory

  • ‘pb.client.named_client.root_object’, the root object (a twisted.spread.pb.RemoteReference instance)

pipelines

class piped.providers.pipeline_provider.PipelineProvider

Bases: object

Provides pipelines as resources.

Pipelines are provided as resources by the PipelineProvider. The pipelines are expected to be defined under the pipelines configuration key:

pipelines:
    my_pipeline:
        # pipeline definition
    another:
        nested_pipeline:
            # pipeline definition

The above pipelines would be made available as pipeline.my_pipeline and pipeline.another.nested_pipeline.

For the topic page about how to create pipelines, see Working with pipelines.

Using a pipeline from a processor

The following is a processor stub that processes a new baton in a pipeline for every baton it receives:

from twisted.internet import defer

def configure(self, runtime_environment):
    # add a dependency to the provided pipeline resource
    dm = runtime_environment.dependency_manager
    self.pipeline_dependency = dm.add_dependency(self, dict(provider='pipeline.my_pipeline'))

@defer.inlineCallbacks
def process(self, baton):
    # get the pipeline from the pipeline_dependency stored in configure()
    pipeline = self.pipeline_dependency.get_resource()

    # create the baton we're going to process:
    baton_to_process = dict(foo='bar')

    # wait until the pipeline has finished processing the baton_to_process:
    yield pipeline(baton_to_process)

    # return the baton unchanged
    defer.returnValue(baton)

For the topic page about dependencies, see Using dependencies.

smtp

class piped.providers.smtp_provider.SMTPProvider

Bases: object, twisted.application.service.MultiService

Provides an SMTP interface.

This provider uses strports to specify listening ports. For more information, see strports in the Twisted documentation.

Example configuration:

smtp:
    my_server:
        # a strport or a list of strports to listen to
        listen: tcp:10025

        # the processor that will handle the messages
        processor: processor_name

        # to enable STARTTLS, specify the following:
        tls:
            private_key: examples/server.key
            certificate: examples/server.crt

        # The following header is added to the email when it is received.
        # if this returns null, no header is added.
        # According to RFC 821, the Received header should be in the following form:
        #   Received: FROM domain BY domain [VIA link] [WITH protocol] [ID id] [FOR path];

        received_header: 'protocol, helo, origin, recipients: "Received: BY piped.localhost WITH SMTP"'

        # The namespace used by the validators
        namespace:
            path: os.path

        # Validate the to and from addresses before the mail is accepted:
        validate_to: 'proto, user: True if str(user.dest) == "accepted@sender.com" else False'
            # user is a twisted.mail.smtp.User instance

        validate_from: 'proto, helo, origin: True if str(origin) == "accepted@recipient.com" else False'
            # helo is a tuple of (helo_string, client_ip)
            # origin is a twisted.mail.smtp.Address instance

        # create a checker for authenticating the sender. if a checker is
        # set, AUTH will be required before any mail is accepted. the server
        # will not accept plaintext auth over an unencrypted connection (use
        # either ssl or STARTTLS).
        checker:
            name: twisted.cred.checkers.InMemoryUsernamePasswordDatabaseDontUse
            arguments:
                username: password

The processor will be invoked with a dict containing the following keys:

message
An email.message.Message instance.
from_addr
The sender's email address.
to_addr
The recipient's email address.
avatar_id
The username of the authenticated user. This key is only set if a checker was configured.
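As an illustration of how a processor might read these keys, the sketch below builds a baton by hand and summarises it. The function describe_mail is hypothetical, and the addresses are assumed to be plain strings:

```python
from email.message import Message


def describe_mail(baton):
    """Hypothetical processor body: summarise an incoming mail baton."""
    message = baton['message']
    summary = '%s -> %s: %s' % (baton['from_addr'], baton['to_addr'], message['Subject'])
    # avatar_id is only present when a checker is configured
    if 'avatar_id' in baton:
        summary += ' (authenticated as %s)' % baton['avatar_id']
    return summary


message = Message()
message['Subject'] = 'Hello'
message.set_payload('Hi there')

# Assumption: from_addr and to_addr are plain strings in this sketch.
baton = dict(message=message, from_addr='sender@example.com', to_addr='recipient@example.com')
without_auth = describe_mail(baton)

baton['avatar_id'] = 'username'
with_auth = describe_mail(baton)
```

Checking for avatar_id with the in operator keeps the same code usable whether or not a checker is configured.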

system-events

class piped.providers.system_events_provider.SystemEventsProvider

Bases: object

Provides batons that are sent to processors when system events are triggered.

Example configuration:

system-events:
    startup:
        logical_name: processor_name
    shutdown:
        logical_name: another_processor_name

ticks

class piped.providers.tick_provider.TickProvider

Bases: object, twisted.application.service.MultiService

Provides tick-batons that are sent to processors at regular intervals.

Example configuration:

ticks:
    interval:
        any_name:
            interval: 120
            processor: processor_name
            auto_start: true # if true, starts the interval when the application starts.

The above will create a TickInterval that generates a baton every 120 seconds or every time the previous tick baton finished processing, whichever takes the longest.
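The "whichever takes the longest" rule can be expressed as a single max() call. The helper below is a hypothetical sketch of that timing rule only, with times given in seconds, and is not taken from the TickInterval implementation:

```python
def next_tick_time(previous_start, previous_finished, interval):
    """Return the earliest time at which the next tick baton may be produced.

    Hypothetical helper: the next tick waits for both the interval and the
    previous baton, whichever takes the longest.
    """
    return max(previous_start + interval, previous_finished)


# Processing finished quickly (after 5 seconds): the interval decides.
fast = next_tick_time(previous_start=0, previous_finished=5, interval=120)

# Processing took 150 seconds: the next tick follows as soon as it finishes.
slow = next_tick_time(previous_start=0, previous_finished=150, interval=120)
```

With a 120 second interval, a slow processor therefore delays ticks instead of letting them pile up.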

web

The web provider enables running multiple production quality web servers inside the piped process that use pipelines in order to respond to incoming requests.

Configuration

class piped.providers.web_provider.WebResourceProvider

Bases: object, twisted.application.service.MultiService

Provides HTTP interfaces.

Example configuration:

web:
    my_site: # logical name of the web service
        enabled: true/false
        ... # the rest of the keys/values are used by the WebSite class

class piped.providers.web_provider.WebSite(site_name, site_configuration)

Bases: object, twisted.application.service.MultiService

A website in Piped.

Example site configuration:

my_site:
    port: 8080
    listen: ssl:1234 # overrides the "port" above, see twisted.application.strports
    log_exceptions: DEBUG # a piped.log debug level, or null. (default: null)
    debug: # configure debugging of the processors
        reap_interval: 60 # seconds
        max_inactive_time: 300 # seconds
        allow: # list of hostnames or ip addresses that are allowed to debug
            - localhost
    routing:
        # mapping that is used to look up resources based on the traversed url.
        # this is given to the WebResource class

class piped.providers.web_provider.WebResource(site, routing)

Bases: twisted.web.resource.Resource

A routed web resource.

Example routing configuration:

my_site:
    routing:
        __config__:
            processor: processor_name # name of processor to run. the processor receives a baton containing the request
        nested:
            __config__:
                debug:
                    allow: [] # disables debugging of this processor, overriding the site-wide configuration
                processor: another_processor_name
        sparse:
            __config__:
                no_resource_processor: sparse_processor
        js:
            __config__:
                static: ~/js
            foo:
                __config__:
                    processor: foo_processor
                    static:
                        path: ~/js/foo
                        namespace:
                            now: datetime.datetime.utcnow
                            delta: datetime.timedelta
                        preprocessors:
                            expires: "request: request.setHeader('expires', (now()+delta(seconds=8600)).strftime('%a, %d %b %Y %H:%M:%S UTC'))"
                            cache-control: "request: request.setHeader('cache-control', 'public,max-age=8600')"

The __config__ may contain the following keys:

processor

Makes a processor available at this resource. Accessing this resource directly causes the request object to be passed into the specified processor. The baton is of the form:

baton = dict(request=request_object)

The processor is expected to call .finish() on the request when the processing is complete. If the processing raises an Exception, the request will be closed automatically and debugging will become available if debugging is enabled and the client is allowed to debug.

no_resource_processor

Works similarly to the processor configuration key, but is used when no other resource is found for the request, either for this path or for any of its children. If this processor handles child paths without explicit resources, the request instance will have a non-empty postpath instance variable: a list of child path elements relative to this resource's location in the routing.

This can be used to create a sparse web site routing.
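The lookup can be pictured with a simplified resolver over a routing mapping. This is a hypothetical sketch, not the WebResource implementation: the function resolve is invented for illustration, and static resources and debugging are ignored.

```python
def resolve(routing, path):
    """Return (processor_name, postpath) for a url path.

    Simplified, hypothetical lookup over a routing mapping.
    """
    segments = [segment for segment in path.split('/') if segment]
    node = routing
    fallback = None  # (processor, postpath) of the nearest no_resource_processor

    for index in range(len(segments) + 1):
        config = node.get('__config__', {})
        if 'no_resource_processor' in config:
            # remember the remaining path elements relative to this resource
            fallback = (config['no_resource_processor'], segments[index:])
        if index == len(segments):
            # every segment matched an explicit resource
            if 'processor' in config:
                return config['processor'], []
            break
        child = node.get(segments[index])
        if child is None:
            break
        node = child

    return fallback if fallback is not None else (None, segments)


routing = {
    '__config__': {'processor': 'processor_name'},
    'sparse': {'__config__': {'no_resource_processor': 'sparse_processor'}},
}

root = resolve(routing, '/')
sparse_child = resolve(routing, '/sparse/a/b')
sparse_itself = resolve(routing, '/sparse')
```

Here /sparse/a/b has no explicit resource, so the sparse processor handles it and sees the two remaining path elements as its postpath.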

static

Makes static resources such as files and directories available under this resource. This option may be a mapping of the form:

path: some_path # required
namespace: # optional, namespace used for the preprocessors
    time: time
preprocessors: # a dict of name -> preprocessor definition
    logical_name: "request: request.setHeader('serviced-at', time.time())"

concatenated

Creates a virtually concatenated file:

file_paths:
    - file_a.js
    - file_b.js
content_type: text/javascript

debug
Overrides the site-wide debug option for this processor. Only applies if a processor is specified.

Accessing the following resources with the above configuration gives:

Responding to requests

The baton that is processed in the pipeline contains the request object, which is a twisted.web.server.Request instance.

Example pipeline that returns the client's host name:

pipelines:
    web:
        show-host:
            - eval-lambda:
                input_path: request
                output_path: host
                lambda: "request: request.getHost()"
            - write-web-response:
                body_path: host

The first processor fetches the host name from the request, and the second processor writes the response to the client.
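Strings such as "request: request.getHost()" read like a lambda without the leading keyword. The sketch below shows one way such a definition could be turned into a callable. It rests on assumptions: make_lambda and FakeRequest are hypothetical names, and this is not necessarily how Piped compiles these strings.

```python
import time


def make_lambda(definition, namespace=None):
    """Compile an 'arguments: expression' string into a callable.

    Hypothetical helper; assumes the definition is a lambda without the
    leading keyword. It calls eval(), so only use it with trusted configuration.
    """
    return eval('lambda ' + definition, dict(namespace or {}))


class FakeRequest(object):
    """Minimal stand-in for a request; only getHost() is implemented."""

    def getHost(self):
        return 'localhost'


baton = dict(request=FakeRequest())

# Like eval-lambda: read the input path, apply the lambda, store at the output path.
get_host = make_lambda('request: request.getHost()')
baton['host'] = get_host(baton['request'])

# Names supplied through a namespace mapping are visible inside the expression.
is_recent = make_lambda('timestamp: time.time() - timestamp < 60', {'time': time})
recent = is_recent(time.time())
```

Because the definition is evaluated as code, configuration files containing such strings should be treated as trusted input.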

Debugging

When the pipeline processing results in an Exception, the WebResource takes care of setting a proper response code and closing the request.

If debugging is turned off or if the client is not allowed to debug, a short html page saying Processing Failed is returned.

If debugging is turned on and the client is allowed to debug, the Exception traceback is registered with a WebDebugger, which enables the client to evaluate python expressions at every frame in the traceback.

Warning

Enabling debugging allows clients to execute arbitrary code within the Piped process.

Debugging can be turned on by using the debug key on the web site:

web:
    my_site:
        ...
        debug:
            allow: # list of hostnames/ip addresses
                - localhost
        ...

Warning

Keep in mind that this uses the hostname / ip address as seen by the web server inside the Piped process, which may not be correct if, for example, a proxy is used to access the web server.

Better tracebacks

More detailed tracebacks with live objects are available if piped -D is used to start the process.

Contrib providers

amqp

Connecting

class piped_amqp.providers.AMQPConnectionProvider

Bases: object, twisted.application.service.MultiService

Provides AMQP connections.

Example:

amqp:
    connections:
        connection_name:
            servers:
                - tcp:host=localhost:port=5672
            max_idle_time: 10 # reconnect if idle more than 10 seconds
            parameters:
                heartbeat: 3 # heartbeat every 3 seconds

The above configuration will result in a connected AMQProtocol being provided as amqp.connection.connection_name.

See AMQPConnection for more details on the configuration options.

class piped_amqp.providers.AMQPConnection(name, servers, max_idle_time=None, reconnect_interval=1, parameters=None)

Bases: object, twisted.application.service.MultiService

AMQP connection wrapper.

Parameters:
  • name – Logical name of the connection.
  • servers

    List of endpoints to connect to. The endpoints are tried in a round-robin fashion. See twisted.internet.endpoints.clientFromString for details on the format.

    Example: tcp:host=localhost:port=5672

  • max_idle_time – The time (in seconds) the connection can be idle (not sending or receiving data) before it is forcibly closed.
  • reconnect_interval – How long to sleep before reconnecting.
  • parameters – Used to create the connection parameters. See the pika documentation.

class piped_amqp.providers.AMQProtocol(parameters)

Bases: pika.adapters.twisted_connection.TwistedProtocolConnection

The AMQP protocol used by Piped.

get_named_channel(*args, **kwargs)

Utility method that provides easy access to shared channels.

Parameters: channel_name – A logical name used to identify the channel.
Returns: A deferred that callbacks with the channel or errbacks with a failure that describes the reason the channel is unavailable.

Consuming

class piped_amqp.providers.AMQPConsumerProvider

Bases: object, twisted.application.service.MultiService

Consumes messages from AMQP queues.

Example configuration:

amqp:
    connections:
        connection_name:
            basic_consumers:
                consumer_name:
                    queue: name_of_queue_to_consume
                    qos: # see http://www.rabbitmq.com/amqp-0-9-1-quickref.html#basic.qos
                        prefetch_count: 200

See AMQPConsumer for more details about available consumer configuration options.

class piped_amqp.providers.AMQPConsumer(name, processor, connection, queue=None, qos=None, no_ack=False, exclusive=False, ack_after_failed_processing=False, ack_after_successful_processing=True, nack_after_failed_processing=True, channel_reopen_interval=1, log_processor_exceptions='warn')

Bases: object, twisted.application.service.Service

Consumes messages from an AMQP queue and processes them with a processor.

Parameters:
  • name – Logical name of this consumer.
  • processor – The processor used to process the messages.
  • connection – Name of the connection.
  • queue – Either a name (string) or a queue declaration (dict). See queue_declare.
  • qos – Used to specify the QOS on the channel.
  • no_ack – Informs the broker that we do not intend to ack consumed messages. If no_ack is true, ack_after/nack_after settings are ignored.
  • ack_after_successful_processing – If true, acks messages if processing finishes without errbacking.
  • nack_after_failed_processing – If true, rejects messages if processing errbacks.
  • ack_after_failed_processing – If true, acks messages even if processing errbacks.
  • exclusive – Request exclusive consumer access, meaning only this consumer can access the queue.
  • channel_reopen_interval – Time (in seconds) to wait before reopening the consuming channel if it closes.
  • log_processor_exceptions – Log level for exceptions raised by our processor. Set to None to disable.
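How the acknowledgement options interact can be sketched as a small decision function. The helper settle is hypothetical, and one point is an assumption rather than documented behaviour: when both failure options are true, acking is given precedence over rejecting.

```python
def settle(processing_succeeded, no_ack=False,
           ack_after_successful_processing=True,
           ack_after_failed_processing=False,
           nack_after_failed_processing=True):
    """Return 'ack', 'nack' or None for a processed message.

    Hypothetical helper. Assumption: when both failure options are true,
    acking takes precedence over rejecting.
    """
    if no_ack:
        # the broker expects no acknowledgement, so the other options are ignored
        return None
    if processing_succeeded:
        return 'ack' if ack_after_successful_processing else None
    if ack_after_failed_processing:
        return 'ack'
    return 'nack' if nack_after_failed_processing else None


defaults_on_success = settle(True)
defaults_on_failure = settle(False)
ack_even_on_failure = settle(False, ack_after_failed_processing=True)
ignored_with_no_ack = settle(False, no_ack=True)
```

With the default values, a message is acked after successful processing and rejected after failed processing.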

RPC helpers

class piped_amqp.rpc.RPCClientProvider

Bases: object, twisted.application.service.MultiService

Provides AMQP-based RPC clients.

Configuration example:

amqp:
    rpc_clients:
        my_client:
            type: your_package.your_module.RPCClient
            consumer:
                queue:
                    queue: explicit_queue_name
                    exclusive: true

See RPCClientBase, which may serve as a useful base class.

class piped_amqp.rpc.RPCClientBase(name, connection, reopen_consumer_channel_interval=1, consumer=None)

Bases: object, twisted.application.service.Service

Base class for RPC client implementations.

Parameters:
  • name – Logical name of this rpc client.
  • connection – Name of the AMQP connection.
  • consumer

    Configuration options for the consuming:

    no_ack
    If true, the server does not require the consumer to ack the responses. Defaults to True.
    exclusive
    Request exclusive consumer access, meaning only this consumer can access the queue. Defaults to True.
    queue
    Name of the response queue (string) or a queue declaration (dict). Defaults to the empty string, which creates an anonymous queue. See queue_declare.
  • reopen_consumer_channel_interval – The number of seconds to wait before reopening the consumer channel if it is closed.

cyclone

class piped_cyclone.providers.CycloneProvider

Bases: object

Provides support for running Cyclone applications within Piped.

For more in-depth documentation about Cyclone see: Cyclone on GitHub, and the Tornado documentation.

Configuration example:

cyclone:
    site_name:
        listen: 8888
        type: cyclone.web.Application # or a fully qualified name of a subclass
        application:
            handlers:
                # a tuple: pattern, handler, kwargs (optional), name (optional)
                - ['/pattern', name.of.RequestHandler]
                # a dict
                - pattern: /another/(?P<pattern>.*)
                  handler: name.of.RequestHandler
                  name: handler_name # optional
                  kwargs: # optional
                    foo: 123

            debug: true # see the debugging section below.
            debug_allow: # optional list of ip addresses that are allowed to see tracebacks and the interactive debugger.
                - 127.0.0.1
            debug_timeout: 60 # (default), time in seconds to keep debuggers resident in memory before garbage collecting them.
            debug_template: debugger.html  # name of the debugger template to use. if not set, uses the built-in template.

The listen parameter is a Twisted strport, which can be used to declare which interfaces the server should be listening on, SSL parameters and more. For more information, see strports in the Twisted documentation.

Each handler may be one of the following:

  • Fully qualified class name of a RequestHandler subclass. The class will be invoked with cls.configure(runtime_environment) once if the method is defined. This allows the class to perform any necessary setup / bootstrapping into the runtime_environment.

  • A dict with a class key set: Same as the above.

  • A dict with the provider key set. The provider is assumed to provide a subclass of RequestHandler as the resource.

    If the provider starts with pipeline., the provided resource will not be used as a RequestHandler, but will be called with resource.__call__(baton). The baton contains the following keys:

    • handler: A RequestHandler that is handling the request.
    • args: A tuple of arguments from the url pattern
    • kwargs: A dict of keyword arguments from the url pattern

    The pipeline should take care of calling baton['handler'].finish() when the request is finished.
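To make the shape of this baton concrete, the sketch below matches a url pattern with the re module and builds the dict by hand. The helper build_baton is hypothetical. It assumes, mirroring Tornado, that named groups become kwargs and that unnamed groups become args only when no named groups exist.

```python
import re


def build_baton(handler, pattern, path):
    """Return the baton for a matching path, or None if the pattern does not match.

    Hypothetical helper. Assumption (mirroring Tornado): named groups become
    kwargs, and unnamed groups become args only when no named groups exist.
    """
    match = re.match(pattern + '$', path)
    if match is None:
        return None
    if match.groupdict():
        args, kwargs = (), match.groupdict()
    else:
        args, kwargs = match.groups(), {}
    return dict(handler=handler, args=args, kwargs=kwargs)


handler = object()  # stand-in for the RequestHandler handling the request

named = build_baton(handler, r'/another/(?P<pattern>.*)', '/another/some/path')
unnamed = build_baton(handler, r'/items/(\d+)', '/items/42')
no_match = build_baton(handler, r'/items/(\d+)', '/other')
```

A pipeline receiving the first baton would find the captured value under kwargs, keyed by the group name from the pattern.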

If the application setting ui_modules is a string, it will be loaded to a python object via twisted.python.reflect.namedAny(). If it is a dict, all values are assumed to be strings that are fully qualified names of cyclone.web.UIModule classes.

Application settings that end with _path and are twisted.python.filepath.FilePath instances will be converted to absolute paths. This enables the use of the !path-constructor in the configuration files.

Global dependencies can be added to the application settings under the piped_dependencies key, which should be a dict. This dict will be converted to a DependencyMap before being given to the cyclone.web.Application instance as a setting.

In order to enable debugging tracebacks from the web server, configure the cyclone application as in the example configuration above and make sure your request handlers subclass piped_cyclone.handlers.DebuggableHandler.

Warning

Enabling debugging lets allowed clients execute arbitrary code as the user running piped.

To make stack traces keep their frame information, use piped -D when launching piped.

class piped_cyclone.handlers.DebuggableHandler(application, request, transforms=None, **kwargs)

Bases: cyclone.web.RequestHandler

A request handler that lets allowed users view full tracebacks and execute code interactively to inspect the traceback frames.

Subclasses must make sure that prepare() is called. If you want to require login in addition to the built-in ip-based checks, subclass this class and override post_debug() and get_debug():

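from cyclone import web
from piped_cyclone.handlers import DebuggableHandler

class AuthingDebuggableHandler(DebuggableHandler):
    # require login before the debugger handles a POST
    @web.authenticated
    def post_debug(self, *a, **kw):
        return super(AuthingDebuggableHandler, self).post_debug(*a, **kw)

    # require login before the debugger front-end is served
    @web.authenticated
    def get_debug(self, *a, **kw):
        return super(AuthingDebuggableHandler, self).get_debug(*a, **kw)

get_debug(*a, **kw)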

Returns a HTML-page that serves as a front-end to the debugger.

post_debug(*a, **kw)

Executes an expression within a debugger frame.

prepare()

Overrides this request handler's methods if the user tries to access a debugger.

class piped_cyclone.handlers.PipedRequestHandlerProxy(dependency)

Bases: object

A simple proxy that uses a dependency to process web requests.

Assumes the provided resource acts like a cyclone.web.RequestHandler.

class piped_cyclone.handlers.PipelineRequestHandler(*args, **kwargs)

Bases: piped_cyclone.handlers.DebuggableHandler

Request handler for requests that will be served through pipelines.

database

manholes

class piped_manhole.providers.ManholeProvider

Bases: object, twisted.application.service.MultiService

Embeds manholes in Piped services.

Configuration example:

manholes:
    my_manhole:
        enabled: true # defaults to true
        port: 10022 # defaults to 10022
        keys:
            public_key_file: path # or public_key: str
            private_key_file: path # or private_key: str
        checkers: # multiple checkers are allowed
            inmemory:
                checker: twisted.cred.checkers.InMemoryUsernamePasswordDatabaseDontUse
                arguments:
                    username: password

zmq

class piped_zmq.providers.ZMQSocketProvider

Bases: object

I provide raw ZMQ-sockets with the paths “zmq.socket.queue_name”.

I expect sockets to be configured like the following:

zmq:
    queues:
        queue_name:
            type: PULL/PUSH/etc.
            sockopts:
                - key: HWM
                  value: 100
            connects:
                - protocol://interface:port
                - protocol://interface:port
            binds:
                - protocol://interface:port
                - protocol://interface:port

The type must be a valid ZeroMQ socket type.

context_factory

alias of Context

class piped_zmq.providers.ZMQProcessorFeederProvider

Bases: object

I create ZMQProcessorFeeders for zmq queues with processors.

Example configuration:

zmq:
    poll_timeout: 100 # poll timeout in milliseconds
    queues:
        queue_name:
            processor: processor_name

zookeeper

class piped_zookeeper.providers.ZookeeperClientProvider

Bases: object, twisted.application.service.MultiService

Zookeeper support for Piped services.

Configuration example:

zookeeper:
    install_log_stream: true # default. handles the zookeeper log stream with piped.log
    clients:
        my_client:
            servers: localhost:2181
            events:
                starting: my_processor

Available keys for events are: ‘starting’, ‘stopping’, ‘connected’, ‘reconnecting’, ‘reconnected’, ‘expired’