Available processors

Utility processors

call-dependency

class piped.processors.util_processors.DependencyCaller(dependency, method='__call__', arguments=<object object at 0x106009410>, unpack_arguments=False, output_path=None, *a, **kw)

Bases: piped.processors.base.Processor

Calls a method on a dependency.

This processor may be useful if you want to call a method on a provided dependency.

Parameters:
  • dependency – The dependency to use.
  • method – The name of the method to call.
  • arguments – The arguments to call the method with. Defaults to no arguments.
  • unpack_arguments – Whether to unpack the arguments when calling the method.
  • output_path – Where to store the output in the baton.
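The dispatch these parameters describe can be sketched in plain Python. This is an illustration rather than the piped implementation: call_dependency, the _NO_ARGUMENTS sentinel and the Greeter class are hypothetical names, and treating a dict as keyword arguments when unpacking is an assumption.

```python
_NO_ARGUMENTS = object()  # hypothetical sentinel meaning "call without arguments"


def call_dependency(dependency, method="__call__", arguments=_NO_ARGUMENTS,
                    unpack_arguments=False):
    """Call `method` on `dependency` and return the result."""
    func = getattr(dependency, method)
    if arguments is _NO_ARGUMENTS:
        return func()
    if unpack_arguments:
        # Assumption: dicts unpack as keyword arguments, sequences as positional ones.
        if isinstance(arguments, dict):
            return func(**arguments)
        return func(*arguments)
    return func(arguments)


class Greeter:
    """Hypothetical dependency used only for this example."""

    def greet(self, name="world", punctuation="!"):
        return "hello " + name + punctuation


greeter = Greeter()
no_args = call_dependency(greeter, "greet")
single = call_dependency(greeter, "greet", "piped")
unpacked = call_dependency(greeter, "greet", ["piped", "?"], unpack_arguments=True)
```

A unique object() sentinel as the default lets None be passed as a real argument, which is presumably why the signature above shows an object instance rather than None.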

callback-deferred

class piped.processors.util_processors.CallbackDeferred(deferred_path='deferred', result=Ellipsis, result_path='result', **kw)

Bases: piped.processors.base.Processor

Calls back a deferred.

Parameters:
  • deferred_path – The path to the deferred inside the baton.
  • result – The result used to callback the deferred. This takes precedence over result_path.
  • result_path – A path to the result in the baton.

clean-baton

class piped.processors.util_processors.BatonCleaner(keep=None, remove=None, **kw)

Bases: piped.processors.base.Processor

Filters the baton by removing unwanted attributes.

Expects at least one of the two keyword arguments:

Parameters:
  • keep – List of attributes to keep. Any attributes not in this list will be removed.
  • remove – List of attributes to remove. Any attribute in this list will be removed.
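A minimal sketch of the keep/remove filtering, assuming the baton is a plain dict. The function name clean_baton is hypothetical, it returns a filtered copy instead of modifying the baton in place, and applying keep before remove when both are given is an assumption.

```python
def clean_baton(baton, keep=None, remove=None):
    """Return a filtered copy of a dict-like baton."""
    if keep is None and remove is None:
        raise ValueError("expected at least one of 'keep' and 'remove'")
    result = dict(baton)
    if keep is not None:
        # Drop every attribute that is not explicitly kept.
        result = {key: value for key, value in result.items() if key in keep}
    if remove is not None:
        # Drop every attribute that is explicitly removed.
        result = {key: value for key, value in result.items() if key not in remove}
    return result


baton = dict(request="req", user="alice", password="secret")
kept = clean_baton(baton, keep=["request", "user"])
removed = clean_baton(baton, remove=["password"])
```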

collect-batons

class piped.processors.util_processors.BatonCollector(list=None, deepcopy=False, describer=None, **kw)

Bases: piped.processors.base.Processor

Appends batons that pass through it to the list it is instantiated with. Useful to e.g. inspect how a baton appears at various stages of the processing, or as a sink.

Parameters:
  • deepcopy – Whether to deepcopy the batons as they pass through. If enabled, this will show the batons as they were when they passed through; if not, subsequent processors may have modified them.

decode-string

class piped.processors.util_processors.StringDecoder(encoding, **kw)

Bases: piped.processors.base.InputOutputProcessor

format-string

class piped.processors.util_processors.StringFormatter(format=None, format_path=None, unpack=True, **kw)

Bases: piped.processors.base.InputOutputProcessor

Formats a string.

See the format string syntax in the Python documentation.

Parameters:
  • format – An inline format string.
  • format_path – Path to a format string within the baton.
  • unpack – Whether to unpack input lists, tuples and dicts when formatting. This enables using a variable number of arguments and keyword arguments to str.format() depending on the input value.
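The effect of unpack can be illustrated with str.format() directly. This is a sketch of the described behaviour, not the processor's code; format_value is a hypothetical helper.

```python
def format_value(format_string, value, unpack=True):
    """Format `value` with `format_string`, optionally unpacking containers."""
    if unpack and isinstance(value, (list, tuple)):
        return format_string.format(*value)   # positional arguments
    if unpack and isinstance(value, dict):
        return format_string.format(**value)  # keyword arguments
    return format_string.format(value)        # single positional argument


from_list = format_value("{0} + {1}", [1, 2])
from_dict = format_value("{name} is {age}", dict(name="Ada", age=36))
not_unpacked = format_value("items: {0}", [1, 2], unpack=False)
```

With unpack disabled the whole list is passed as one argument, so the last call renders the list's repr.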

encode-string

class piped.processors.util_processors.StringEncoder(encoding, **kw)

Bases: piped.processors.base.InputOutputProcessor

eval-lambda

class piped.processors.util_processors.LambdaProcessor(namespace=None, dependencies=None, **kw)

Bases: piped.processors.base.InputOutputProcessor

Given a path to a value in the baton, apply the provided lambda function.

Parameters:
  • lambda – A string that defines a lambda function when eval()-ed. Note that the lambda-keyword should not be provided in the string.
  • dependencies – A dict of local dependency names to their resource configurations. Providers that are strings are converted to tuples as required by the dependency manager. See found.processing.dependency.DependencyManager.create_dependency_map
  • namespace – A dict that defines the namespace the lambda runs in. Values in this dict will be passed to reflect.namedAny before being made available to the lambda function.
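Because the lambda keyword is left out of the configured string, the definition is completed before it is evaluated. A rough sketch follows, with build_lambda as a hypothetical helper and importlib.import_module standing in for reflect.namedAny (so only module names are resolved here):

```python
import importlib


def build_lambda(definition, namespace=None):
    """Turn e.g. "input: input.upper()" into a callable."""
    # Resolve each namespace value from a dotted module name to the module itself.
    resolved = {name: importlib.import_module(target)
                for name, target in (namespace or {}).items()}
    # The definition omits the keyword, so it is prepended here.
    return eval("lambda " + definition, resolved)


shout = build_lambda("input: input.upper()")
area = build_lambda("r: math.pi * r ** 2", namespace={"math": "math"})
```

The same "arguments: expression" form appears as the default of other options in this reference, such as decider='input: bool(input)'.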

exec-code

class piped.processors.util_processors.ExecProcessor(code, inline_callbacks=False, use_file=True, namespace=None, dependencies=None, **kw)

Bases: piped.processors.base.InputOutputProcessor

Given a path to a value in the baton, execute the provided code.

Parameters:
  • code – A string that defines the code to run. The code will be wrapped in a function with the following signature: def compiled_process(self, input, baton):.
  • inline_callbacks – Whether to wrap compiled_process in twisted.internet.defer.inlineCallbacks()
  • use_file – If True, write the code contents to a temporary file so that the code is shown in any tracebacks.
  • dependencies – A dict of local dependency names to their resource configurations. Providers that are strings are converted to tuples as required by the dependency manager. See found.processing.dependency.DependencyManager.create_dependency_map()
  • namespace – A dict that defines the namespace the code runs in. Values in this dict will be passed to reflect.namedAny before being made available to the code as part of the globals.
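The wrapping described for code can be sketched as follows. This is an assumption-laden illustration: compile_process is a hypothetical helper, and it ignores inline_callbacks, use_file and dependencies entirely.

```python
import textwrap


def compile_process(code, namespace=None):
    """Wrap `code` in compiled_process(self, input, baton) and return the function."""
    body = textwrap.indent(textwrap.dedent(code), "    ")
    source = "def compiled_process(self, input, baton):\n" + body
    scope = dict(namespace or {})  # becomes the globals of the compiled function
    exec(source, scope)
    return scope["compiled_process"]


process = compile_process("""
    doubled = input * 2
    return doubled
""")
result = process(None, 21, {})
```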

flatten-list-of-dictionaries

class piped.processors.util_processors.FlattenDictionaryList(key_path, uniquify=False, sort=True, **kw)

Bases: piped.processors.base.InputOutputProcessor

Reduce a list of dictionaries to a list of values, given a key which occurs in the dictionaries.

For example, if we have a list:

l = [dict(author='J. Doe', email='f@o.o'), dict(author='J. Smith', id=42)]

then processing that list given key_path='author' will result in:

result = ['J. Doe', 'J. Smith']
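The reduction above can be written as a short standalone function. This sketch uses a plain dictionary key instead of a key path, and skipping dictionaries that lack the key is an assumption; flatten_dictionary_list is a hypothetical name.

```python
def flatten_dictionary_list(dicts, key, uniquify=False, sort=True):
    """Collect the values stored under `key` in a list of dictionaries."""
    values = [d[key] for d in dicts if key in d]
    if uniquify:
        # dict.fromkeys keeps the first occurrence of each value, in order.
        values = list(dict.fromkeys(values))
    if sort:
        values = sorted(values)
    return values


entries = [dict(author="J. Doe", email="f@o.o"), dict(author="J. Smith", id=42)]
result = flatten_dictionary_list(entries, "author")
unique = flatten_dictionary_list(entries + entries, "author", uniquify=True)
```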

flatten-nested-lists

class piped.processors.util_processors.NestedListFlattener(paths, path_prefix='', **kw)

Bases: piped.processors.base.Processor

Flatten nested lists into a single list.

For example:

>>> baton = dict(data=['One', ['Two', 'Three'], ['Four']])
>>> NestedListFlattener(paths=['data']).process(baton)
{'data': ['One', 'Two', 'Three', 'Four']}

group-by-value

class piped.processors.util_processors.DictGrouper(key_path, input_path, output_path=None, fallback=None, **kw)

Bases: piped.processors.base.Processor

increment-counter

class piped.processors.util_processors.CounterIncrementer(counter_path, increment=1, **kw)

Bases: piped.processors.base.Processor

Increases the counter found at counter_path with increment.

lambda-decider

class piped.processors.util_processors.LambdaConditional(input_path='', namespace=None, **kw)

Bases: piped.processors.base.Processor

log

class piped.processors.util_processors.Logger(message=None, message_path=None, level='info', **kw)

Bases: piped.processors.base.Processor

Logs a message with the configured log-level.

The message is either configured at message, or looked up in the baton at message_path.

The message is logged with the configured level.

See also

piped.log

merge-with-dict

class piped.processors.util_processors.MergeWithDictProcessor(dict=None, merge_args={}, **kw)

Bases: piped.processors.base.Processor

Processor that merges the baton with the provided dictionary.

Expects a kw-argument “dict”, which is the dictionary to merge.

passthrough

class piped.processors.util_processors.Passthrough(node_name=None)

Bases: piped.processors.base.Processor

prefix-string

class piped.processors.util_processors.StringPrefixer(prefix, **kw)

Bases: piped.processors.util_processors.StringFormatter

Prefixes the string at input_path with the prefix.

pretty-print

class piped.processors.util_processors.PrettyPrint(path='', formatter='baton: baton', namespace=None, *a, **kw)

Bases: piped.processors.base.Processor

Prettyprints the baton before passing it on.

No changes are made to the baton.

raise-exception

class piped.processors.util_processors.RaiseException(type='exceptions.Exception', args=None, kwargs=None, **kw)

Bases: piped.processors.base.Processor

Raise an exception of the specified type.

The exception is instantiated with the optional args and kwargs.

remap

class piped.processors.util_processors.RemapProcessor(extend=False, copy=False, deep_copy=False, **kw)

Bases: piped.processors.base.MappingProcessor

Remaps a dictionary.

Expects to be instantiated with a dictionary that copies values at one path to another one.

For example, given the mapping {'b.c': 'a'} and the baton dict(b=dict(c='d')), the output will be dict(a='d', b=dict(c='d')).
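The example can be reproduced with a small path-based copy. This is only a sketch of the idea: get_path, set_path and remap are hypothetical helpers, paths are assumed to be dot-separated dictionary keys, and the extend, copy and deep_copy options are not modelled.

```python
def get_path(baton, path):
    """Look up a dotted path such as 'b.c' in nested dictionaries."""
    value = baton
    for part in path.split("."):
        value = value[part]
    return value


def set_path(baton, path, value):
    """Store `value` at a dotted path, creating intermediate dictionaries."""
    *parents, last = path.split(".")
    target = baton
    for part in parents:
        target = target.setdefault(part, {})
    target[last] = value


def remap(baton, mapping):
    """Copy the value at each source path to its destination path."""
    for source, destination in mapping.items():
        set_path(baton, destination, get_path(baton, source))
    return baton


output = remap(dict(b=dict(c="d")), {"b.c": "a"})
```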

set-value

class piped.processors.util_processors.ValueSetter(path, value, **kw)

Bases: piped.processors.base.Processor

Sets the value at path.

set-values

class piped.processors.util_processors.MappingSetter(mapping, path_prefix='', **kw)

Bases: piped.processors.base.Processor

Takes a path-to-value-mapping and sets values at the specified paths.

A path_prefix can be specified, if all the paths in the mapping share a common prefix.

shutdown

..autoclass:: Shutdown

stop

class piped.processors.util_processors.Stopper(input_path='', decider='input: bool(input)', namespace=None, **kw)

Bases: piped.processors.base.Processor

Stops processing when the configured lambda returns true.

An optional input_path can be specified, as well as a namespace. These are explained in detail in TODO: Some reference.

trap-failure

class piped.processors.util_processors.TrapFailure(error_types, output_path=None, *a, **kw)

Bases: piped.processors.base.Processor

Traps failures of the specified types.

If the encountered exception is not one of the expected exception types, this processor will raise the original exception, preserving the traceback.

Parameters:
  • error_types – A single or a list of fully qualified exception class names that should be trapped.
  • output_path – If one of the expected error types is trapped, this value will be set to the matching error type.

wait

class piped.processors.util_processors.Waiter(delay, **kw)

Bases: piped.processors.base.Processor

wrap-coroutine

class piped.processors.util_processors.CoroutineWrapper(coroutine, **kw)

Bases: piped.processors.base.Processor

Passes batons to the wrapped coroutine.

Context processors

See also

contexts

fetch-context

class piped.processors.context_processors.ContextFetcher(context, output_path=Ellipsis, **kw)

Bases: piped.processors.base.Processor

Fetches a shared context.

If output_path is None, it defaults to the same as the context name.

fetch-persisted-context

class piped.processors.context_processors.PersistedContextFetcher(context, output_path=Ellipsis, **kw)

Bases: piped.processors.context_processors.ContextFetcher

Datetime processors

format-date

class piped.processors.datetime_processors.DateFormatter(format_string, **kw)

Bases: piped.processors.base.InputOutputProcessor

Formats a date according to a format.

parse-date

class piped.processors.datetime_processors.DateTimeParser(format_string, as_date=False, **kw)

Bases: piped.processors.base.InputOutputProcessor

Parses a timestamp according to format_string.

If as_date is true, then a date-object is returned, instead of a datetime.
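A minimal sketch of the as_date switch, assuming format_string uses the directives of datetime.strptime(); parse_timestamp is a hypothetical helper.

```python
from datetime import datetime


def parse_timestamp(value, format_string, as_date=False):
    """Parse `value` and return a datetime, or a date if `as_date` is true."""
    parsed = datetime.strptime(value, format_string)
    return parsed.date() if as_date else parsed


pattern = "%Y-%m-%d %H:%M:%S"
as_datetime = parse_timestamp("2011-03-04 05:06:07", pattern)
as_date = parse_timestamp("2011-03-04 05:06:07", pattern, as_date=True)
```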

File processors

append-to-file

class piped.processors.file_processors.FileAppender(file_path, input_path='', format=u'%s\n', formatter='input: input', namespace=None, encode='utf8', **kw)

Bases: piped.processors.base.Processor

Append something to a file.

The file defined by file_path is opened in append-mode.

For every baton processed, the input found at input_path is passed through a formatter, which is defined by a lambda, in the same way eval-lambda is. The default formatter simply passes the input through to the format-string.

The output of the formatter is then combined with format, which is a format string.

Note that the output of the formatter need not be a string. It can e.g. be a dictionary, which is then converted to a string via the format-string.

The result of combining the format with the output from the formatter is assumed to be a unicode-string, which is then encoded to a UTF8-bytestring before appending to the file.

Note: The processor does not deal with flushing the buffers to disk.
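The formatter, format and encode steps can be sketched as below. The helper append_to_file is hypothetical, the "%s\n" default is an assumption about the intended format string, and unlike the processor this sketch closes the file after each write, which also flushes it.

```python
import os
import tempfile


def append_to_file(file_path, value, format="%s\n",
                   formatter=lambda input: input, encode="utf8"):
    """Format `value` and append the encoded result to `file_path`."""
    text = format % formatter(value)  # the formatter output need not be a string
    with open(file_path, "ab") as handle:
        handle.write(text.encode(encode))


with tempfile.TemporaryDirectory() as directory:
    path = os.path.join(directory, "events.log")
    append_to_file(path, "started")
    # A dict works when the format string uses named placeholders.
    append_to_file(path, dict(user="alice"), format="%(user)s logged in\n")
    with open(path, "rb") as handle:
        contents = handle.read()
```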

append-to-log

class piped.processors.file_processors.LogAppender(file_path, **kw)

Bases: piped.processors.file_processors.FileAppender

Append something to a log.

JSON processors

decode-json

class piped.processors.json_processors.JsonDecoder(decoder='json.JSONDecoder', **kw)

Bases: piped.processors.base.InputOutputProcessor

Decodes JSON.

The input may either be a string or a file-like object.

Parameters:
  • decoder – A fully qualified name of the json.JSONDecoder.

encode-json

class piped.processors.json_processors.JsonEncoder(encoder='piped.util.PipedJSONEncoder', indent=None, **kw)

Bases: piped.processors.base.InputOutputProcessor

Encodes JSON.

Parameters:
  • encoder – A fully qualified name of the json.JSONEncoder.
  • indent – If indent is a non-negative integer, then JSON array elements and object members will be pretty-printed with that indent level. An indent level of 0 will only insert newlines. None is the most compact representation.
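The three indent behaviours can be seen with the standard library's json.dumps(), which documents the same semantics. The sample data is made up for illustration.

```python
import json

data = {"name": "piped", "tags": ["a", "b"]}

compact = json.dumps(data, indent=None)   # single line
newlines_only = json.dumps(data, indent=0)  # one item per line, no indentation
indented = json.dumps(data, indent=2)     # one item per line, two spaces per level
```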

Perspective broker processors

pb-call-remote

class piped.processors.spread_processors.CallRemote(client, name, args_path=None, kwargs_path=None, output_path='result', **kw)

Bases: piped.processors.base.Processor

Calls a remote function using PB.

Pipeline processors

diagram-dependencies

class piped.processors.pipeline_processors.DependencyDiagrammer(output_path='dot', **kw)

Bases: piped.processors.base.Processor

Makes a dot-representation of the dependency graph.

diagram-pipelines

class piped.processors.pipeline_processors.PipelineDiagrammer(output_path='dot', **kw)

Bases: piped.processors.base.Processor

Makes a dot-representation of the pipelines of every processor graph-evaluator.

for-each

class piped.processors.pipeline_processors.ForEach(pipeline, chunk_size=None, namespace=None, result_processor='results: results[-1]', parallel=False, done_on_first=False, fail_on_error=False, **kw)

Bases: piped.processors.base.InputOutputProcessor

ForEach is an In/Out-processor that invokes a pipeline for every item in its input.

If the input is a dict, the processor will iterate over the values and the output will also be a dict where the values are the results from the processing.

Parameters:
  • pipeline – The pipeline to process the items in.
  • chunk_size – If specified, the input iterable is chunked. E.g. if chunk_size=2 and the input iterable is [1, 2, 3, 4, 5], then the pipeline is invoked three times, with the batons being [1, 2], [3, 4] and [5].
  • result_processor – A lambda-definition. The lambda is invoked once for each item in the input with the results from the target pipeline.
  • namespace – A dict specifying a namespace for the result_processor.
  • input_path – Path to the input in the baton. The input is assumed to be an iterable.
  • parallel – If true, then the iterable is exhausted, and the pipeline is invoked with all items in parallel. Then we wait until all of them have completed.
  • done_on_first – If true, then the result of the pipeline that completes first is returned. The results/failures of the other pipelines are dropped, unless they all fail. In that case, an exceptions.AllPipelinesFailedError is raised.
  • fail_on_error – If true, then any failure in the processing will cause the result to be that failure. If it is false, which is the default, then the errors are represented as failure-instances in the output.
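The chunking described for chunk_size can be sketched with itertools. The generator name chunked is hypothetical, and how the processor itself splits the input is not shown in this reference.

```python
from itertools import islice


def chunked(iterable, chunk_size):
    """Yield lists of at most `chunk_size` items from `iterable`."""
    iterator = iter(iterable)
    while True:
        chunk = list(islice(iterator, chunk_size))
        if not chunk:
            return
        yield chunk


batons = list(chunked([1, 2, 3, 4, 5], 2))
```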

run-pipeline

class piped.processors.pipeline_processors.PipelineRunner(pipeline=Ellipsis, pipeline_path=Ellipsis, only_last_result=True, trace_path=None, *a, **kw)

Bases: piped.processors.base.InputOutputProcessor

Processes a baton in another pipeline.

If a pipeline_path is specified, the processor creates a temporary dependency during processing and takes care of removing this dependency afterwards. If this processor is the first consumer of the pipeline, exceptions that would otherwise be raised during startup may be raised during this processor's processing.

Note that the pipeline is only requested as a dependency, and not deconstructed after use. This means that unused resources may remain in process afterwards.

Parameters:
  • pipeline – The name of the pipeline to process the baton in. A name may either be absolute or relative. Relative names start with a ., and each dot means that the named pipeline is one level higher up. To reference a sibling pipeline, start with a single dot.
  • pipeline_path – The path to the pipeline name in the baton. The pipeline name in the baton is resolved in the same way as the pipeline argument. Using both a pipeline and pipeline_path argument to this processor results in a configuration error.
  • only_last_result – Whether to use only the last result. Since a pipeline may have several sinks, the results from processing will always be a list, and this option discards any output from any baton except the last.
  • trace_path – Path to store tracing results to. Cannot be an empty string. Defaults to None, which means no tracing.

scatter-gather

class piped.processors.pipeline_processors.ScatterGatherer(mapping=None, copy=True, input_path_prefix='', output_path_prefix='', **kw)

Bases: piped.processors.base.Processor

Process processors

parse-iostat-output

class piped.processors.process_processors.IOStatParser(format=None, **kw)

Bases: piped.processors.base.InputOutputProcessor

Parses output from iostat.

Format is a list of 2-tuples where the first element is the name of the device and the second element is a list of measurement names that are used to parse the input into a dict.

Example format:

format:
  - - disk0
    - - KB/t
      - tps
      - MB/s
  # alternative yaml layouts with equivalent structure:
  - [cpu, [user, system, idle]]
  - - load
    - [1m, 5m, 15m]

The above format can be used to parse the following output:

'   24.00   2  0.05   3  3 94  0.61 0.69 0.79'

into:

{'cpu': {'idle': 94.0, 'system': 3.0, 'user': 3.0},
 'disk0': {'KB/t': 24.00, 'MB/s': 0.05, 'tps': 2.0},
 'load': {'15m': 0.79, '1m': 0.61, '5m': 0.69}}
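The mapping from format to output can be reproduced with a few lines of Python. This is a sketch of the parsing idea under the assumption that values are whitespace-separated and consumed in format order; parse_iostat is a hypothetical function, not the processor's code.

```python
def parse_iostat(line, format):
    """Assign whitespace-separated numbers to devices and measurement names."""
    values = iter(line.split())
    parsed = {}
    for device, measurement_names in format:
        parsed[device] = {}
        for name in measurement_names:
            parsed[device][name] = float(next(values))
    return parsed


iostat_format = [
    ("disk0", ["KB/t", "tps", "MB/s"]),
    ("cpu", ["user", "system", "idle"]),
    ("load", ["1m", "5m", "15m"]),
]
parsed = parse_iostat("   24.00   2  0.05   3  3 94  0.61 0.69 0.79", iostat_format)
```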

render-dot

class piped.processors.process_processors.RenderDot(type='png', **kw)

Bases: piped.processors.base.InputOutputProcessor

Renders a dot graph.

SMTP processors

create-email-message

class piped.processors.smtp_processors.CreateEmailMessage(output_path='message', payload_path=None, headers=None, **kw)

Bases: piped.processors.base.Processor

Creates an instance of email.message.Message.

Parameters:
  • output_path – The path in the baton where the email.message.Message should be stored.
  • payload_path – The path to the payload in the baton.
  • headers

    A dict of initial headers to set in the email message.

    Example:

    - create-email-message:
        ...
        headers:
            Subject: my-subject
            From: Sender Name <sender@example.com>
            To: Recipient Name <recipient@example.com>
    

replace-email-headers

class piped.processors.smtp_processors.SetMessageHeaders(headers, message_path='message', **kw)

Bases: piped.processors.base.Processor

Set or replace message headers of an email.message.Message.

If the header already exists in the message, it will be replaced, otherwise it will be added.

Parameters:
  • message_path – The path to the email.message.Message in the baton.
  • headers – A dict of headers and their values which should be set
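With the standard library's email.message.Message, assigning to a header appends a new one instead of replacing it, so set-or-replace needs an explicit delete first. A small sketch, where set_headers is a hypothetical helper:

```python
from email.message import Message


def set_headers(message, headers):
    """Set each header, replacing any existing occurrences."""
    for name, value in headers.items():
        del message[name]      # removes all occurrences; no error if absent
        message[name] = value  # adds the header back with the new value
    return message


message = Message()
message["Subject"] = "old subject"
set_headers(message, {
    "Subject": "new subject",
    "To": "Recipient Name <recipient@example.com>",
})
```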

send-email

class piped.processors.smtp_processors.SendEmail(message_path='message', configuration=None, **kw)

Bases: piped.processors.base.Processor

Send an email.

Parameters:
  • message_path – The path to the message in the baton.
  • from – The email address of the sender.
  • from_path – The path to the sender's email address in the baton. Only one of from and from_path may be used.
  • to – The email address of the recipient.
  • to_path – The path to the recipient's email address in the baton. Only one of to and to_path may be used.
  • configuration

    The configuration argument may contain arguments accepted by send_mail(), except from_addr, to_addr and file, which are provided by this processor:

    static smtp_provider.send_mail(from_addr, to_addr, file, host='localhost', port=25, timeout=60, retries=5, use_ssl=False, require_transport_security=None, require_authentication=None, username=None, password=None, helo_fallback=False)

    A utility function used to send email.

    Parameters:
    • from_addr – Sender email address.
    • to_addr – Recipient's email address.
    • file – The full message, including headers. Any file-like or any object with a __str__ can be used.
    • host – Which SMTP server to use.
    • port – Which port to connect to.
    • use_ssl – Whether the server speaks SSL.
    • require_transport_security

      Whether to require transport security (TLS) before sending the email.

      Defaults to True if a username is specified.

      Defaults to False if using SSL due to Twisted #3989.

    • require_authentication – Whether to login before sending an email. Defaults to True if a username is specified, False otherwise.
    • username – The username used when logging in.
    • password – The password used when logging in.
    • timeout – Number of seconds to wait before timing out a connection. If None, perform no timeout checking.
    • retries – Number of retries if we cannot connect to the server.
    • helo_fallback – Whether to fall back to HELO if EHLO fails.

Tick processors

Processors that deal with tick intervals.

start-tick-interval

class piped.processors.tick_processors.StartInterval(interval, **kw)

Bases: piped.processors.tick_processors.IntervalProcessor

Starts a tick interval.

Parameters:
  • interval – The name of the interval. See TickProvider.

stop-tick-interval

class piped.processors.tick_processors.StopInterval(interval, **kw)

Bases: piped.processors.tick_processors.IntervalProcessor

Stops a tick interval.

Parameters:
  • interval – The name of the interval. See TickProvider.

Trace processors

diagram-trace

class piped.processors.trace_processors.DiagramTrace(input_path='trace', output_path='dot', show_unused_processors=True, **kw)

Bases: piped.processors.base.InputOutputProcessor

Create a dot-graph of trace results.

Parameters:
  • input_path – The path to the trace in the baton.
  • output_path – The path to save the dot graph to.
  • show_unused_processors – Whether to show unused processors in the dot graph. Defaults to true.

render-trace

class piped.processors.trace_processors.RenderTrace(request_path='request', svg_path='dot', input_path='trace', output_path='content', skip_if_nonexistent=False, template=None, json_encoder=None, **kw)

Bases: piped.processors.base.InputOutputProcessor

Creates an HTML page containing the results of a rendered trace.

Parameters:
  • request_path – Path to the web request in the baton.
  • svg_path – Path to the rendered svg.
  • input_path – Path to the trace in the baton.
  • output_path – Path to write the html to in the baton.
  • skip_if_nonexistent – Whether to skip this processor if there is no trace. Defaults to False, as we will render the necessary HTML form even if there is no trace.
  • template – Name of the html template to use.
  • json_encoder – Fully qualified name of the json encoder to use.

Web processors

determine-ip

class piped.processors.web_processors.IPDeterminer(output_path='ip', proxied=False, proxy_header='x-forwarded-for', fail_if_not_proxied=False, **kw)

Bases: piped.processors.web_processors.HttpRequestProcessor

Determine the IP of the HTTP-client.

If proxied is true, then the proxy_header, which defaults to “x-forwarded-for”, is used to get the IP.

If an IP is not found at the proxy header, the client-IP is returned — unless fail_if_not_proxied is true, in which case a PipedError is raised.
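The decision logic can be sketched with plain values in place of a request object. Everything here is illustrative: determine_ip is a hypothetical function, NotProxiedError stands in for the PipedError mentioned above, headers is assumed to be a dict with lower-case names, and a header holding several addresses is returned unchanged.

```python
class NotProxiedError(Exception):
    """Hypothetical stand-in for the PipedError raised by the processor."""


def determine_ip(client_ip, headers, proxied=False,
                 proxy_header="x-forwarded-for", fail_if_not_proxied=False):
    """Return the proxy-provided IP if available, else the client IP."""
    if proxied:
        forwarded = headers.get(proxy_header)
        if forwarded:
            return forwarded
        if fail_if_not_proxied:
            raise NotProxiedError("no IP found in header %r" % proxy_header)
    return client_ip


direct = determine_ip("192.0.2.10", {})
via_proxy = determine_ip("192.0.2.10", {"x-forwarded-for": "198.51.100.7"}, proxied=True)
fallback = determine_ip("192.0.2.10", {}, proxied=True)
```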

extract-web-request-arguments

class piped.processors.web_processors.ExtractRequestArguments(request_path='request', *a, **kw)

Bases: piped.processors.base.MappingProcessor

Extract arguments from a twisted.web.server.Request-like object.

The input paths in the mapping are looked up in the request arguments and copied to the specified output paths.

The mapping supports the following additional keywords:

only_first
Only returns the first request argument by that name. Defaults to True.
load_json
Causes the value to be loaded as json before being copied into the baton. Defaults to False.

Consider the following example configuration:

mapping:
    - foo
    - bar:
        only_first: false
    - baz:
        load_json: true
    - zip:
        output_path: zap

Using the above configuration to extract the request arguments of a request to http://.../?foo=1&foo=2&bar=3&bar=4&baz={"test":[5,6,7]}&zip=8 results in the following baton:

request: <Request object>
foo: '1'
bar: ['3', '4']
baz:
    test: [5, 6, 7]
zap: '8'

Note that the integers in the request are not parsed. For more advanced input validation, see the validate-with-formencode processor.

Parameters:
  • request_path – Path to the request object in the baton.
  • skip_if_nonexistent – Whether to skip mapping entries that are not found in the request.
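The example configuration can be traced with the standard library's query-string parser. This sketch is not the processor's code: extract_arguments is a hypothetical helper, the request object is replaced by parsed query arguments, and applying load_json before only_first is an assumption.

```python
import json
from urllib.parse import parse_qs


def extract_arguments(arguments, mapping):
    """Copy request arguments into a new baton according to `mapping`."""
    baton = {}
    for entry in mapping:
        if isinstance(entry, str):
            name, options = entry, {}
        else:
            (name, options), = entry.items()  # single-key dict: {name: options}
        values = arguments[name]
        if options.get("load_json", False):
            values = [json.loads(value) for value in values]
        if options.get("only_first", True):
            values = values[0]
        baton[options.get("output_path", name)] = values
    return baton


query = 'foo=1&foo=2&bar=3&bar=4&baz={"test":[5,6,7]}&zip=8'
mapping = [
    "foo",
    {"bar": {"only_first": False}},
    {"baz": {"load_json": True}},
    {"zip": {"output_path": "zap"}},
]
baton = extract_arguments(parse_qs(query), mapping)
```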

set-http-expires

class piped.processors.web_processors.SetExpireHeader(timedelta, **kw)

Bases: piped.processors.web_processors.HttpRequestProcessor

Set cache headers to indicate that the response should be cached for timedelta seconds.

Parameters:
  • timedelta – A dictionary with the keys days, hours, minutes and seconds. The resulting timedelta is the sum of these.
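Summing such a dictionary maps directly onto datetime.timedelta, whose constructor accepts the same keys. A brief sketch, with to_seconds as a hypothetical helper; which cache headers the processor derives from the result is not covered here.

```python
from datetime import timedelta


def to_seconds(spec):
    """Convert a dict with days/hours/minutes/seconds keys to whole seconds."""
    return int(timedelta(**spec).total_seconds())


max_age = to_seconds(dict(days=1, hours=2, minutes=30, seconds=15))
```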

set-http-headers

class piped.processors.web_processors.SetHttpHeaders(headers, **kw)

Bases: piped.processors.web_processors.HttpRequestProcessor

Adds headers as response headers.

web-client-get-page

class piped.processors.web_processors.ClientGetPage(base_url=None, url='url', method='GET', headers=None, agent=None, timeout=0, cookies=None, follow_redirect=True, redirect_limit=20, after_found_get=False, output_path='page', postdata=None, *a, **kw)

Bases: piped.processors.base.Processor

A simple web client agent for simple HTTP requests.

If any of the following arguments resolve to a callable, it is called without any arguments and the return value is used.

Parameters:
  • base_url – A string that is prepended to the given url.
  • url – If url is a list, it is flattened to a string by joining with '/'.
  • method – The HTTP method to use in the request.
  • headers – Dict of headers.
  • agent – Client agent string.
  • timeout – Set a max
  • cookies – Dict of cookies
  • follow_redirect – Whether to follow redirects.
  • redirect_limit – The maximum number of HTTP redirects that can occur before it is assumed that the redirection is endless
  • after_found_get – Deviate from the HTTP 1.1 RFC by handling redirects the same way as most web browsers; if the request method is POST and a 302 status is encountered, the redirect is followed with a GET method
  • postdata – Data to post. If it is a buffer (has a callable .read()), postdata.read() is called and the result is used.
  • output_path – Path to use for the page.

write-web-response

class piped.processors.web_processors.ResponseWriter(content_path='content', content_type=None, response_code=None, encoding='utf8', finish=True, fallback_content=Ellipsis, **kw)

Bases: piped.processors.web_processors.HttpRequestProcessor

A processor that writes the response to a twisted.web.server.Request

Parameters:
  • response_code (int or str) – Either an integer response code or a string. If a string is supplied, it is converted to an integer by looking up the response codes defined in twisted.web.http during initialization.

XML processors

remove-markup

class piped.processors.xml_processors.MarkupRemover(mapping, input_path_prefix='', output_path_prefix='', skip_if_nonexistent=True, input_fallback=None, *a, **kw)

Bases: piped.processors.base.MappingProcessor

Removes all markup from the text at the provided paths.

Parameters:
  • mapping

    The mapping represents a list of input and output paths and any additional configuration the mapping entry requires.

    The mapping may be a list of map entries, or a dict that maps input paths to map entries.

    A map entry specifies the input_path, the output_path and any additional configuration options for the map entry. If no output_path is specified, it defaults to being the same as input_path. The input_path should be omitted if the map entry is part of an input_path -> map entry dict.

    A map entry may be a string, in which case it is rewritten to a dict(input_path=map_entry).

    Additional configuration options are passed to the validate_additional_kwargs() function during initialization. Subclasses are strongly encouraged to override that function and perform any necessary validation in it. The options are then passed as keyword arguments to process_mapping() along with the input.

    Example mappings:

    foo: foo
    bar:
        an_option: test
    baz:
        output_path: zip
    

    This is equivalent to:

    - foo
    - bar:
        an_option: test
    - baz:
        output_path: zip
    

    And also:

    - foo
    - bar:
        an_option: test
    - baz: zip
    
  • input_path_prefix – Prefix used on all input paths.
  • output_path_prefix – Prefix used on all output paths. If an output_path for a map entry is None, however, the prefix is not used.
  • skip_if_nonexistent – Whether to skip the processor for a single map entry if the input is not found in the baton.
  • input_fallback – The value to use if the input is not found and skip_if_nonexistent is false.
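One way to read the three equivalent example mappings is as inputs to a normalisation step that produces the same list of map entries. The sketch below is an interpretation, not the library's code: normalize_mapping is a hypothetical function, and treating a string value such as baz: zip as an output_path is inferred from the examples.

```python
def normalize_mapping(mapping):
    """Normalise a dict or list mapping to a list of map-entry dicts."""
    if isinstance(mapping, dict):
        items = [{input_path: entry} for input_path, entry in mapping.items()]
    else:
        items = list(mapping)

    entries = []
    for item in items:
        if isinstance(item, str):
            entry = {"input_path": item}
        elif "input_path" in item:
            entry = dict(item)  # already a full map entry
        else:
            (input_path, config), = item.items()
            if isinstance(config, str):
                # Assumption: a bare string value names the output path.
                config = {"output_path": config}
            entry = dict(config or {}, input_path=input_path)
        entry.setdefault("output_path", entry["input_path"])
        entries.append(entry)
    return entries


as_dict = {"foo": "foo", "bar": {"an_option": "test"}, "baz": {"output_path": "zip"}}
as_list = ["foo", {"bar": {"an_option": "test"}}, {"baz": {"output_path": "zip"}}]
as_short_list = ["foo", {"bar": {"an_option": "test"}}, {"baz": "zip"}]

normalized = [normalize_mapping(m) for m in (as_dict, as_list, as_short_list)]
```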

Contrib processors

Status testing processors

create-statustest-reporter

class piped_status_testing.processors.ReporterCreator(reporter='piped_status_testing.statustest.ProcessorReporter', processor=None, arguments=None, output_path='reporter', **kw)

Bases: piped.processors.base.Processor

Create a reporter for statustests.

A reporter is responsible for handling the results of test runs. The default reporter supports passing test results to a separate processor, gathering the test results for inspection after the test suites have been run and optionally printing the results to the console.

Parameters:
  • reporter – The fully qualified name of the reporter class to instantiate. This should be a subclass of PipelineReporter.
  • processor – The name of a processor that will be used. The resulting dependency object is passed to the reporter as the first argument. If set to None the dependency will also be None, and no processor will be used.
  • arguments – A dict that contains additional arguments that are passed to the reporter during instantiation. Arguments with the suffix _path are fetched from the baton at the path given by the value and the suffix is stripped from the key.
  • output_path – A path to where the reporter should be stored in the baton.
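The _path suffix convention for arguments can be sketched as follows. resolve_arguments is a hypothetical helper, and the baton lookup is simplified to a top-level dictionary key rather than a full path.

```python
def resolve_arguments(arguments, baton):
    """Replace `<name>_path` arguments with the baton value they point to."""
    suffix = "_path"
    resolved = {}
    for key, value in (arguments or {}).items():
        if key.endswith(suffix):
            # Strip the suffix from the key and fetch the value from the baton.
            resolved[key[:-len(suffix)]] = baton[value]
        else:
            resolved[key] = value
    return resolved


baton = dict(stream="<stream object>")
resolved = resolve_arguments(dict(verbose=True, stream_path="stream"), baton)
```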

wait-for-statustest-reporter

class piped_status_testing.processors.WaitForReporterProcessing(reporter_path='reporter', done=False, **kw)

Bases: piped.processors.base.Processor

Wait for the processor that processes reporter results to finish processing.

Since the reporter processing may be asynchronous, this processor may be used to wait until all the currently queued reporter processing is completed.

Parameters:
  • reporter_path – The path to the reporter in the baton.
  • done – Whether to call reporter.done() after waiting for the reporter processing.

Validation processors

validate-with-formencode

class piped_validation.processors.FormEncodeValidator(schema, input_path, output_path, **kw)

Bases: piped.processors.base.Processor

ZMQ processors

parse-as-mongrel-request

class piped_zmq.mongrel2_processors.MongrelRequestToBatonParser(node_name=None)

Bases: piped.processors.base.Processor

send-zmq-message

class piped_zmq.processors.MessageSender(queue_name, message_path='', retries=None, retry_wait=0.1, *a, **kw)

Bases: piped.processors.base.Processor

send-mongrel-reply

class piped_zmq.mongrel2_processors.MongrelReplySender(queue_name, response_path='http_response', close=True, *a, **kw)

Bases: piped.processors.base.Processor

ZooKeeper processors

create-zookeeper-node

class piped_zookeeper.processors.CreateZooKeeperNode(path, data=None, flags=None, create_intermediary_nodes=False, output_path='create', **kw)

Bases: piped_zookeeper.processors.ZooKeeperProcessor

Create a ZooKeeper node.

Parameters:
  • path – Path to the node.
  • data – The data to set.
  • flags – A flag or a list of flags that will be used when creating the node. For example: [EPHEMERAL, SEQUENCE], which will create an ephemeral, sequential node.
  • create_intermediary_nodes – Whether to create intermediary nodes if they don't exist.
  • output_path – Path to store the output metadata to in the baton.

get-zookeeper-children

class piped_zookeeper.processors.GetZooKeeperChildren(path, cached=True, output_path='children', *a, **kw)

Bases: piped_zookeeper.processors.ZooKeeperProcessor

Get a list of children for a ZooKeeper node.

Parameters:
  • path – Path to the node.
  • cached – Whether to cache the result.
  • output_path – Path to store the list of children to in the baton.

get-zookeeper-data

class piped_zookeeper.processors.GetZooKeeperData(path, cached=True, output_path='data', metadata_output_path=None, *a, **kw)

Bases: piped_zookeeper.processors.ZooKeeperProcessor

Get the data of a ZooKeeper node.

Parameters:
  • path – Path to the node.
  • cached – Whether to cache the result.
  • output_path – Path to store the node contents to in the baton.
  • metadata_output_path – Path to store the node metadata to in the baton.

set-zookeeper-data

class piped_zookeeper.processors.SetZooKeeperData(path, data, create_intermediary_nodes=False, output_path='set', **kw)

Bases: piped_zookeeper.processors.ZooKeeperProcessor

Set the contents of a ZooKeeper node.

Parameters:
  • path – Path to the node.
  • data – The data to set.
  • create_intermediary_nodes – Whether to create intermediary nodes if they don’t exist.
  • output_path – Path to store the output metadata to in the baton.

zookeeper-node-exists

class piped_zookeeper.processors.ZooKeeperNodeExists(path, cached=True, output_path='exists', metadata_output_path=None, *a, **kw)

Bases: piped_zookeeper.processors.ZooKeeperProcessor

Check whether a given ZooKeeper node exists.

Parameters:
  • path – Path to the node.
  • cached – Whether to cache the result.
  • output_path – Path to store whether the node existed or not in the baton.
  • metadata_output_path – If the node exists, a path to store the metadata to.
