Providers are classes that add functionality to a Piped process.
Bases: object
Provides shared data as resources.
Example configuration:
contexts:
    my_context:
        foo: bar
    another_context: this is a string
The above contexts would be made available as context.my_context and context.another_context.
Bases: object
Provides shared data as resources, which is persisted when the process is stopped.
Example configuration:
persisted_contexts:
    some_context:
        kind: json
        file: some_context.json
        initial_data: {}
The above contexts would be made available as persisted_context.some_context. When the service starts, it reads some_context.json, if it exists, and its contents are provided. If the file does not exist or the contents cannot be parsed, the data in initial_data is provided instead.
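The fallback rule described above can be sketched in plain Python. This is a minimal illustration and not Piped's implementation: `load_persisted_context` is a hypothetical helper, and only the json kind is handled.

```python
import json
import os


def load_persisted_context(path, initial_data):
    """Return the parsed contents of `path`, or `initial_data` as a fallback.

    Hypothetical helper: mirrors the documented behaviour for `kind: json`.
    """
    if not os.path.exists(path):
        # No file yet (for example on the first run): use the initial data.
        return initial_data
    try:
        with open(path) as f:
            return json.load(f)
    except ValueError:
        # The file exists but does not contain valid JSON.
        return initial_data
```

Calling `load_persisted_context('some_context.json', {})` then yields either the stored data or the empty dict from the example configuration.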
Bases: object, twisted.application.service.MultiService
Provides batons from a twisted.spread.pb server.
For details on how perspective broker works, see the Perspective Broker section of the Twisted Documentation <http://twistedmatrix.com/documents/current/core/howto/pb-intro.html>.
Example configuration:
pb:
    servers:
        named_server:
            listen: tcp:8789
            processor: processor_name
            wait_for_processor: False # this is the default
            # if the processor does not callback the deferred, the following default is used
            # if the default is not provided, the deferred will be errbacked.
            default_callback: None
            # if a checker is specified, access to the processor will be restricted,
            # and the baton will contain the avatar_id.
            checker:
                name: twisted.cred.checkers.InMemoryUsernamePasswordDatabaseDontUse
                arguments:
                    username: password
Given the above configuration, a pb.PBServerFactory will be created to listen on the specified port. When methods are called on the server, a baton will be produced. If wait_for_processor is True, incoming batons are buffered until the processor becomes available; otherwise, the client receives an errback immediately.
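The effect of wait_for_processor can be pictured with a small synchronous stand-in. This is only a sketch under stated assumptions: `ProcessorGate` is a hypothetical class, and a raised exception takes the place of the errback that a real Deferred-based server would deliver.

```python
class ProcessorGate(object):
    """Synchronous stand-in for the buffering rule described above."""

    def __init__(self, wait_for_processor=False):
        self.wait_for_processor = wait_for_processor
        self.processor = None
        self.pending = []

    def handle(self, baton):
        if self.processor is not None:
            return self.processor(baton)
        if not self.wait_for_processor:
            # Corresponds to the client receiving an errback immediately.
            raise RuntimeError('processor is not available')
        # Buffer the baton until processor_ready() is called.
        self.pending.append(baton)

    def processor_ready(self, processor):
        """Register the processor and flush every buffered baton to it."""
        self.processor = processor
        pending, self.pending = self.pending, []
        for baton in pending:
            processor(baton)
```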
The baton contains the following keys:
Bases: object, twisted.application.service.MultiService
Provides twisted.spread.pb clients.
For details on how perspective broker works, see the Perspective Broker section of the Twisted Documentation <http://twistedmatrix.com/documents/current/core/howto/pb-intro.html>.
Example configuration:
pb:
    clients:
        named_client:
            endpoint: tcp:host=localhost:port=8789
            # if the remote server requires a login, specify the username and password:
            username: a_username
            password: a_password
Given the above configuration, two resources will be provided:
- ‘pb.client.named_client’, which is a PipedPBClientFactory.
- ‘pb.client.named_client.root_object’, the root object (a twisted.spread.pb.RemoteReference instance).
Bases: object
Provides pipelines as resources.
Pipelines are provided as resources by the PipelineProvider. The pipelines are expected to be defined under the pipelines configuration key:
pipelines:
    my_pipeline:
        # pipeline definition
    another:
        nested_pipeline:
            # pipeline definition
The above pipelines would be made available as pipeline.my_pipeline and pipeline.another.nested_pipeline.
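The way nested configuration keys turn into dotted resource names can be sketched as follows. The helper `iter_resource_names` is hypothetical, and the rule it uses to tell a namespace from a pipeline definition (dict versus anything else) is an assumption made for this illustration only.

```python
def iter_resource_names(config, prefix='pipeline'):
    """Yield dotted resource names for a nested mapping of definitions.

    Assumption for this sketch: a nested dict is a namespace, and any
    other value (here, a list of processors) is a pipeline definition.
    """
    for name, value in config.items():
        path = '%s.%s' % (prefix, name)
        if isinstance(value, dict):
            # Recurse into the namespace, extending the dotted path.
            for nested in iter_resource_names(value, prefix=path):
                yield nested
        else:
            yield path


pipelines = {
    'my_pipeline': [],
    'another': {'nested_pipeline': []},
}
names = sorted(iter_resource_names(pipelines))
```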
For the topic page about how to create pipelines, see Working with pipelines.
The following is a processor stub that processes a baton in a pipeline for every baton it processes:
from twisted.internet import defer

def configure(self, runtime_environment):
    # add a dependency to the provided pipeline resource
    dm = runtime_environment.dependency_manager
    self.pipeline_dependency = dm.add_dependency(self, dict(provider='pipeline.my_pipeline'))

@defer.inlineCallbacks
def process(self, baton):
    # get the pipeline from the dependency stored in configure()
    pipeline = self.pipeline_dependency.get_resource()
    # create the baton we're going to process:
    baton_to_process = dict(foo='bar')
    # wait until the pipeline has finished processing the baton_to_process:
    yield pipeline(baton_to_process)
    # return the baton unchanged
    defer.returnValue(baton)
For the topic page about dependencies, see Using dependencies.
Bases: object, twisted.application.service.MultiService
Provides an SMTP interface.
This provider uses strports to specify listening ports. For more information, see strports in the Twisted documentation.
Example configuration:
smtp:
    my_server:
        # a strport or a list of strports to listen to
        listen: tcp:10025
        # the processor that will handle the messages
        processor: processor_name
        # to enable STARTTLS, specify the following:
        tls:
            private_key: examples/server.key
            certificate: examples/server.crt
        # The following header is added to the email when it is received.
        # if this returns null, no header is added.
        # According to RFC 821, the Received header should be in the following form:
        # Received: FROM domain BY domain [VIA link] [WITH protocol] [ID id] [FOR path];
        received_header: 'protocol, helo, origin, recipients: "Received: BY piped.localhost WITH SMTP"'
        # The namespace used by the validators
        namespace:
            path: os.path
        # Validate the to and from addresses before the mail is accepted:
        validate_to: 'proto, user: True if str(user.dest) == "accepted@sender.com" else False'
        # user is a twisted.mail.smtp.User instance
        validate_from: 'proto, helo, origin: True if str(origin) == "accepted@recipient.com" else False'
        # helo is a tuple of (helo_string, client_ip)
        # origin is a twisted.mail.smtp.Address instance
        # create a checker for authenticating the sender. if a checker is
        # set, AUTH will be required before any mail is accepted. the server
        # will not accept plaintext auth over an unencrypted connection (use
        # either ssl or STARTTLS).
        checker:
            name: twisted.cred.checkers.InMemoryUsernamePasswordDatabaseDontUse
            arguments:
                username: password
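The validators and the received_header option in the configuration above are written as "arguments: expression" strings. One way such a string could be turned into a callable is sketched below. `make_lambda` and `FakeUser` are hypothetical names used for illustration, and Piped's own parsing may differ.

```python
def make_lambda(definition, namespace=None):
    """Build a callable from an 'arg1, arg2: expression' string.

    Hypothetical helper for illustration; names in `namespace` are
    visible to the expression, like the `namespace` option above.
    """
    # Split only on the first colon: the expression may contain more.
    args, expression = definition.split(':', 1)
    source = 'lambda %s: %s' % (args.strip(), expression.strip())
    return eval(source, dict(namespace or {}))


class FakeUser(object):
    """Stand-in for twisted.mail.smtp.User; only `dest` is modelled."""

    def __init__(self, dest):
        self.dest = dest


validate_to = make_lambda(
    'proto, user: True if str(user.dest) == "accepted@sender.com" else False'
)
accepted = validate_to(None, FakeUser('accepted@sender.com'))
rejected = validate_to(None, FakeUser('someone@else.com'))
```

Because the expressions are evaluated as Python, configuration files that contain them should be treated as trusted code.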
The processor will be invoked with a dict containing the following keys:
Bases: object
Provides batons that are sent to processors when system events are triggered.
Example configuration:
system-events:
    startup:
        logical_name: processor_name
    shutdown:
        logical_name: another_processor_name
Bases: object, twisted.application.service.MultiService
Provides tick-batons that are sent to processors at regular intervals.
Example configuration:
ticks:
    interval:
        any_name:
            interval: 120
            processor: processor_name
            auto_start: true # if true, starts the interval when the application starts.
The above will create a TickInterval that generates a baton every 120 seconds or every time the previous tick baton finishes processing, whichever takes longer.
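The timing rule can be expressed as a small calculation. The function below is a hypothetical sketch; it assumes the interval is measured from the moment the previous tick baton was created, which the text above does not state explicitly.

```python
def delay_until_next_tick(interval, processing_time):
    """Seconds to wait after a tick baton finishes before the next tick.

    Sketch of the documented rule: ticks are `interval` seconds apart
    unless processing took longer, in which case the next tick follows
    immediately.
    """
    return max(0.0, interval - processing_time)
```

With the example configuration, a baton that takes 30 seconds to process leaves a 90 second wait, while one that takes 150 seconds leaves none.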
See also
The web provider enables running multiple production-quality web servers inside the Piped process. These servers use pipelines to respond to incoming requests.
Bases: object, twisted.application.service.MultiService
Provides HTTP interfaces.
Example configuration:
web:
    my_site: # logical name of the web service
        enabled: true/false
        ... # the rest of the keys/values are used by the WebSite
Bases: object, twisted.application.service.MultiService
A website in Piped.
Example site configuration:
my_site:
    port: 8080
    listen: ssl:1234 # overrides the "port" above, see twisted.application.strports
    log_exceptions: DEBUG # a piped.log debug level, or null. (default: null)
    debug: # configure debugging of the processors
        reap_interval: 60 # seconds
        max_inactive_time: 300 # seconds
        allow: # list of hostnames or ip addresses that are allowed to debug
            - localhost
    routing:
        # mapping that is used to look up resources based on the traversed url.
        # this is given to WebResource
Bases: twisted.web.resource.Resource
A routed web resource.
Example routing configuration:
my_site:
    routing:
        __config__:
            processor: processor_name # name of processor to run. the processor receives a baton containing the request
        nested:
            __config__:
                debug:
                    allow: [] # disables debugging of this processor, overriding the site-wide configuration
                processor: another_processor_name
        sparse:
            __config__:
                no_resource_processor: sparse_processor
        js:
            __config__:
                static: ~/js
            foo:
                __config__:
                    processor: foo_processor
                    static:
                        path: ~/js/foo
                        namespace:
                            now: datetime.datetime.utcnow
                            delta: datetime.timedelta
                        preprocessors:
                            expires: "request: request.setHeader('expires', (now()+delta(seconds=8600)).strftime('%a, %d %b %Y %H:%M:%S UTC'))"
                            cache-control: "request: request.setHeader('cache-control', 'public,max-age=8600')"
The __config__ may contain the following keys:
The processor key makes a processor available at this resource. Accessing this resource directly causes the request object to be passed into the specified processor. The baton has the form:
baton = dict(request=request_object)
The processor is expected to call .finish() on the request when the processing is complete. If the processing raises an Exception, the request will be closed automatically and debugging will become available if debugging is enabled and the client is allowed to debug.
The no_resource_processor key works like the processor key, but is used when no other resource is found for the request, either at this path or at any of its children. If this processor handles child paths that have no explicit resources, the request instance will have a non-empty postpath instance variable, which is a list of child path elements relative to this resource's location in the routing.
This can be used to create a sparse web site routing.
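How a sparse routing might resolve a URL can be sketched without Twisted. The function below is a hypothetical stand-in for the real resource traversal: it walks a routing mapping shaped like the configuration above and falls back to the nearest no_resource_processor, returning the remaining path elements as the postpath. Letting an ancestor's fallback apply to deeper missing paths is an assumption of this sketch.

```python
def resolve(routing, path):
    """Return (processor_name, postpath) for a URL path, or (None, []).

    Sketch only: walks the routing mapping segment by segment and
    remembers the deepest `no_resource_processor` seen on the way.
    """
    segments = [s for s in path.split('/') if s]
    node = routing
    fallback = (None, [])
    for index, segment in enumerate(segments):
        config = node.get('__config__', {})
        if 'no_resource_processor' in config:
            # Everything from here down becomes the postpath.
            fallback = (config['no_resource_processor'], segments[index:])
        if segment not in node or segment == '__config__':
            return fallback
        node = node[segment]
    config = node.get('__config__', {})
    if 'processor' in config:
        return config['processor'], []
    if 'no_resource_processor' in config:
        return config['no_resource_processor'], []
    return fallback


routing = {
    '__config__': {'processor': 'processor_name'},
    'sparse': {'__config__': {'no_resource_processor': 'sparse_processor'}},
}
```

Here `resolve(routing, '/sparse/a/b')` selects sparse_processor with the postpath `['a', 'b']`, even though no explicit resources exist for those child paths.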
The static key makes static resources such as files and directories available under this resource. This option may be a mapping of the form:
path: some_path # required
namespace: # optional, namespace used for the preprocessors
    time: time
preprocessors: # a dict of name -> preprocessor definition
    logical_name: "request: request.setHeader('serviced-at', time.time())"
Creates a virtually concatenated file:
file_paths:
    - file_a.js
    - file_b.js
content_type: text/javascript
Accessing the following resources with the above configuration gives:
The baton that is processed in the pipeline contains the request object, which is a twisted.web.server.Request instance.
Example pipeline that returns the client's host name:
pipelines:
    web:
        show-host:
            - eval-lambda:
                input_path: request
                output_path: host
                lambda: "request: request.getHost()"
            - write-web-response:
                body_path: host
The first processor fetches the host name from the request, and the second processor writes the response to the client.
When the pipeline processing results in an Exception, the WebResource takes care of setting a proper response code and closing the request.
If debugging is turned off or if the client is not allowed to debug, a short html page saying Processing Failed is returned.
If debugging is turned on and the client is allowed to debug, the Exception traceback is registered with a WebDebugger, which enables the client to evaluate python expressions at every frame in the traceback.
Warning
Enabling debugging allows clients to execute arbitrary code within the Piped process.
Debugging can be turned on by using the debug key on the web site:
web:
    my_site:
        ...
        debug:
            allow: # list of hostnames/ip addresses
                - localhost
        ...
Warning
Keep in mind that this uses the hostname / IP address as seen by the web server inside the Piped process, which may not be correct if, for example, a proxy is used to access the web server.
Bases: object, twisted.application.service.MultiService
Provides AMQP connections.
Example:
amqp:
    connections:
        connection_name:
            servers:
                - tcp:host=localhost:port=5672
            max_idle_time: 10 # reconnect if idle more than 10 seconds
            parameters:
                heartbeat: 3 # heartbeat every 3 seconds
The above configuration will result in a connected AMQProtocol being provided as amqp.connection.connection_name.
See AMQPConnection for more details on the configuration options.
Bases: object, twisted.application.service.MultiService
AMQP connection wrapper.
Bases: pika.adapters.twisted_connection.TwistedProtocolConnection
The AMQP protocol used by Piped.
Utility method that provides easy access to shared channels.
| Parameters: | channel_name – A logical name used to identify the channel. |
|---|---|
| Returns: | A deferred that callbacks with the channel or errbacks with a failure that describes the reason the channel is unavailable. |
Bases: object, twisted.application.service.MultiService
Consumes messages from AMQP queues.
Example configuration:
amqp:
connections:
connection_name:
basic_consumers:
consumer_name:
queue: name_of_queue_to_consume
qos: # see http://www.rabbitmq.com/amqp-0-9-1-quickref.html#basic.qos
prefetch_count: 200
See AMQPConsumer for more details about available consumer configuration options.
Bases: object, twisted.application.service.Service
Consumes messages from an AMQP queue and processes them with a processor.
Bases: object, twisted.application.service.MultiService
Provides AMQP-based RPC clients.
Configuration example:
amqp:
    rpc_clients:
        my_client:
            type: your_package.your_module.RPCClient
            consumer:
                queue:
                    queue: explicit_queue_name
                    exclusive: true
See RPCClientBase, which may serve as a useful base class.
Bases: object, twisted.application.service.Service
Base class for RPC client implementations.
Bases: object
Provides support for running Cyclone applications within Piped.
For more in-depth documentation about Cyclone see: Cyclone on GitHub, and the Tornado documentation.
Configuration example:
cyclone:
site_name:
listen: 8888
type: cyclone.web.Application # or a fully qualified name of a subclass
application:
handlers:
# a tuple: pattern, handler, kwargs (optional), name (optional)
- ['/pattern', name.of.RequestHandler]
# a dict
- pattern: /another/(?P<pattern>.*)
handler: name.of.RequestHandler
name: handler_name # optional
kwargs: # optional
foo: 123
debug: true # see the debugging section below.
debug_allow: # optional list of ip addresses that are allowed to see tracebacks and the interactive debugger.
- 127.0.0.1
debug_timeout: 60 # (default), time in seconds to keep debuggers resident in memory before garbage collecting them.
debug_template: debugger.html # name of the debugger template to use. if not set, uses the built-in template.
The listen parameter is a Twisted strport, which can be used to declare which interfaces the server should be listening on, SSL parameters and more. For more information, see strports in the Twisted documentation.
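The handlers list in the configuration above accepts both a sequence form and a dict form. A sketch of how the two notations could be brought into one shape is shown below. `normalize_handler` is a hypothetical helper and does not describe how Piped parses the list internally.

```python
def normalize_handler(spec):
    """Return a dict with pattern, handler, kwargs and name keys.

    Hypothetical helper: accepts either notation from the example
    configuration and fills in the optional values.
    """
    if isinstance(spec, dict):
        normalized = dict(kwargs={}, name=None)
        normalized.update(spec)
        return normalized
    # Sequence form: pattern, handler, then optional kwargs and name.
    padded = list(spec) + [None] * (4 - len(spec))
    pattern, handler, kwargs, name = padded
    return dict(pattern=pattern, handler=handler, kwargs=kwargs or {}, name=name)


short_form = normalize_handler(['/pattern', 'name.of.RequestHandler'])
dict_form = normalize_handler({
    'pattern': '/another/(?P<pattern>.*)',
    'handler': 'name.of.RequestHandler',
    'name': 'handler_name',
    'kwargs': {'foo': 123},
})
```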
Each handler may be one of the following:
- The fully qualified class name of a RequestHandler subclass. The class will be invoked with cls.configure(runtime_environment) once if the method is defined. This allows the class to perform any necessary setup / bootstrapping into the runtime_environment.
- A dict with a class key set: same as the above.
- A dict with the provider key set. The provider is assumed to provide a subclass of RequestHandler as the resource.

  If the provider starts with pipeline., the provided resource will not be used as a RequestHandler, but will be called with resource.__call__(baton). The baton contains the following keys:

  - handler: A RequestHandler that is handling the request.
  - args: A tuple of arguments from the url pattern
  - kwargs: A dict of keyword arguments from the url pattern

  The pipeline should take care of calling baton['handler'].finish() when the request is finished.
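The contract between the handler and a pipeline-backed route can be illustrated with stand-ins. In the sketch below, `FakeHandler` replaces a real cyclone RequestHandler and `greeting_pipeline` is a hypothetical callable that plays the role of the provided pipeline resource.

```python
class FakeHandler(object):
    """Stand-in for a cyclone RequestHandler; records what is written."""

    def __init__(self):
        self.body = []
        self.finished = False

    def write(self, chunk):
        self.body.append(chunk)

    def finish(self):
        self.finished = True


def greeting_pipeline(baton):
    """Hypothetical pipeline: greets the name captured by the url pattern."""
    handler = baton['handler']
    handler.write('Hello, %s!' % baton['kwargs'].get('name', 'world'))
    # The pipeline, not the caller, is responsible for finishing the request.
    handler.finish()
    return baton


handler = FakeHandler()
greeting_pipeline(dict(handler=handler, args=(), kwargs={'name': 'piped'}))
```

If the pipeline never calls finish(), the request in this model stays open, which is why the responsibility is spelled out above.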
If the application setting ui_modules is a string, it will be loaded to a python object via twisted.python.reflect.namedAny(). If it is a dict, all values are assumed to be strings that are fully qualified names of cyclone.web.UIModule classes.
Application settings that end with _path and are twisted.python.filepath.FilePath instances will be converted to absolute paths. This enables the use of the !path-constructor in the configuration files.
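The _path conversion can be approximated without Twisted. The sketch below is a simplification under stated assumptions: it uses plain strings and os.path in place of twisted.python.filepath.FilePath instances, and `absolutize_path_settings` is a hypothetical name.

```python
import os


def absolutize_path_settings(settings):
    """Return a copy of `settings` with every *_path value made absolute.

    Simplification: the real settings hold twisted FilePath instances;
    plain strings and os.path are used here to stay dependency-free.
    """
    result = dict(settings)
    for key, value in settings.items():
        if key.endswith('_path') and isinstance(value, str):
            result[key] = os.path.abspath(value)
    return result
```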
Global dependencies can be added to the application settings under the piped_dependencies key, which should be a dict. This dict will be converted to a DependencyMap before being given to the cyclone.web.Application instance as a setting.
In order to enable debugging tracebacks from the web server, configure the cyclone application as in the example configuration above and make sure your request handlers subclass piped_cyclone.handlers.DebuggableHandler.
Warning
Enabling debugging allows permitted clients to execute arbitrary code as the user running piped.
To make stack traces keep their frame information, use piped -D when launching piped.
Bases: cyclone.web.RequestHandler
A request handler that lets allowed users view full tracebacks and execute code interactively to inspect the traceback frames.
Subclasses must make sure that prepare() is called. If you want to require login in addition to the built-in ip-based checks, subclass this class and override post_debug() and get_debug():
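from cyclone import web
from piped_cyclone.handlers import DebuggableHandler

class AuthingDebuggableHandler(DebuggableHandler):
    # subclass DebuggableHandler so that super() finds the debug methods
    @web.authenticated
    def post_debug(self, *a, **kw):
        super(AuthingDebuggableHandler, self).post_debug(*a, **kw)

    @web.authenticated
    def get_debug(self, *a, **kw):
        super(AuthingDebuggableHandler, self).get_debug(*a, **kw)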
Returns an HTML page that serves as a front end to the debugger.
Executes an expression within a debugger frame.
Overrides this request handler's methods if the user tries to access a debugger.
Bases: object
A simple proxy that uses a dependency to process web requests.
Assumes the provided resource acts like a cyclone.web.RequestHandler.
Bases: piped_cyclone.handlers.DebuggableHandler
Request handler for requests that will be served through pipelines.
Bases: object, twisted.application.service.MultiService
Embeds manholes in Piped services.
Configuration example:
manholes:
    my_manhole:
        enabled: true # defaults to true
        port: 10022 # defaults to 10022
        keys:
            public_key_file: path # or public_key: str
            private_key_file: path # or private_key: str
        checkers: # multiple checkers are allowed
            inmemory:
                checker: twisted.cred.checkers.InMemoryUsernamePasswordDatabaseDontUse
                arguments:
                    username: password
Bases: object
I provide raw ZMQ-sockets with the paths “zmq.socket.queue_name”.
I expect sockets to be configured like the following:
zmq:
    queues:
        queue_name:
            type: PULL/PUSH/etc.
            sockopts:
                - key: HWM
                  value: 100
            connects:
                - protocol://interface:port
                - protocol://interface:port
            binds:
                - protocol://interface:port
                - protocol://interface:port
The type must be a valid ZeroMQ socket type.
alias of Context
Bases: object
I create ZMQProcessorFeeders for zmq queues with processors.
Example configuration:
zmq:
    poll_timeout: 100 # poll timeout in milliseconds
    queues:
        queue_name:
            processor: processor_name
Bases: object, twisted.application.service.MultiService
Zookeeper support for Piped services.
Configuration example:
zookeeper:
    install_log_stream: true # default. handles the zookeeper log stream with piped.log
    clients:
        my_client:
            servers: localhost:2181
            events:
                starting: my_processor
Available keys for events are: ‘starting’, ‘stopping’, ‘connected’, ‘reconnecting’, ‘reconnected’, ‘expired’