[PIPELINE] Questions about pipeline


[PIPELINE] Questions about pipeline

Aquator
Hi,

I am playing with pipeline, and I have some questions regarding usage.

What is the suggested method of avoiding a "traffic jam" in the pipe? I mean, when a stage that produces results quickly is followed by a long-running stage, I will run out of stack space with a large amount of input data.

Currently, my solution uses the context raise/registerListener methods. The slow stage notifies its "feeder" stage that new data can be processed. Are there any better ideas for this problem?

My other issue is about branches. Is there a way to attach separate branches together? For example, a stage needs input from two different branches. Is there any solution for applying a synchronized data flow? (Let's say I have two branches, one produces A's and the other produces B's. I want a stage that is fed by those two branches and produces a sequence ABABAB...) Is there an implementation for such behaviour?

Finally, I am interested in the various StageDrivers. I'd like some more detailed information
than the API provides: usage advice and samples to help choose the best StageDriver for the particular stages.

Thanks in advance for your time,
Istvan Cseh

______________________________________________________________________
Cheap flights for everyone!
Flight tickets at the best daily price, optionally with travel (BBP) and cancellation insurance.
repulojegy.budavartours.hu


---------------------------------------------------------------------
To unsubscribe, e-mail: [hidden email]
For additional commands, e-mail: [hidden email]


Re: [PIPELINE] Questions about pipeline

Ken Tanaka
Hello Istvan,

I recommend limiting the input queue size that is used by the stage
driver. The DedicatedThreadStageDriverFactory uses a blocking queue. If
you are configuring your pipeline with a Digester configuration file,
the XML would look like this (look for the word "capacity" near the end):

<?xml version="1.0" encoding="UTF-8"?>

<!--
    Document   : conf_examplepipeline.xml
    Description:
        Configuration file for data loading pipeline
-->

<pipeline>
   
    <driverFactory
        className="org.apache.commons.pipeline.driver.DedicatedThreadStageDriverFactory"
        id="df1" faultToleranceLevel="ALL"/>
   
    <!-- Find the data file(s), surveyData.YYYY-MM-DD.
         ** FAST STAGE **
         Input:  starting directory
         Output: Data File object
    -->
    <!-- ================================================================== -->
    <stage className="org.apache.commons.pipeline.stage.FileFinderStage"
           filePattern="surveyData.*"
           driverFactoryId="df1" />
   
    <feed>
        <value>/data/prod/ingestNow</value>
    </feed>
   


    <!-- Unpack the data file. Each input file becomes a bunch of data bean objects.
         ** FAST STAGE **
         Input:  Data File object
         Output: Data point object
    -->
    <!-- ================================================================== -->
    <stage className="gov.noaa.eds.SurveyReaderStage"
           driverFactoryId="df1" />

    <!-- Write the data beans to the database
         ** SLOW STAGE - Set ArrayBlockingQueueFactory capacity to 10 **
       
         The queueFactory must be a blocking type that implements the
         java.util.concurrent.BlockingQueue interface.
       
         Input:  Data point object
         Output: Data point object
    -->
    <!-- ================================================================== -->
    <stage className="gov.noaa.eds.WriteSurveyToDatabaseStage"
           driverFactoryId="df1" >
        <property propName="queueFactory"
                  className="org.apache.commons.pipeline.util.BlockingQueueFactory$ArrayBlockingQueueFactory"
                  capacity="10" fair="false"/>
    </stage>

</pipeline>

Alternatively, you could set up the driver factory with the same
property so that all stages have the same input queue size:

    <driverFactory
        className="org.apache.commons.pipeline.driver.DedicatedThreadStageDriverFactory"
        id="df1" faultToleranceLevel="ALL">
        <property propName="queueFactory"
                  className="org.apache.commons.pipeline.util.BlockingQueueFactory$ArrayBlockingQueueFactory"
                  capacity="10" fair="false"/>
    </driverFactory>
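For illustration, the throttling behaviour the capacity setting relies on can be demonstrated with the plain java.util.concurrent classes the queue factory builds on. This is a standalone sketch, not Commons Pipeline code: put() blocks the fast producer whenever the slow consumer falls ten items behind, so nothing piles up.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BackpressureDemo {
    // A fast producer feeding a bounded queue of capacity 10, as in the
    // ArrayBlockingQueueFactory configuration above.
    static int run() {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 100; i++) {
                    queue.put(i);      // blocks while the queue is full
                }
                queue.put(-1);         // end-of-stream marker
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        int consumed = 0;
        try {
            while (true) {
                int item = queue.take();   // the slow "database write" side
                if (item == -1) break;
                consumed++;
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println("consumed=" + run());
    }
}
```

The same effect happens inside the stage driver: the fast upstream stage simply stalls until the slow stage drains its input queue.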

Aquator wrote:

> My other issue is about branches. Is there a way to attach separate branches together? For example, a stage needs input from two different branches. Is there any solution for applying a synchronized data flow? (Let's say I have two branches, one produces A's and the other produces B's. I want a stage that is fed by those two branches and produces a sequence ABABAB...) Is there an implementation for such behaviour?
>
Normally each branch should carry just one type of data; otherwise all
the stages on a branch of pipeline processing end up having the same
conditional logic at the beginning to sort out what to do with the
incoming objects. If you do need to combine information back into
another branch, you should do it by raising events, which can carry
along the data transport beans you wish to combine. The receiving
stage needs to register a listener to bring those in. I have some
documentation on this, but it's not publicly available yet. If you want,
I can email you the HTML directly.
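The raise/register-a-listener pattern described here can be sketched in plain Java. MiniContext and its method names are stand-ins for illustration only, not the actual Commons Pipeline StageContext API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Stand-in for the pipeline context: stages register listeners, and any
// stage can raise an event that carries a data bean to all listeners.
class MiniContext {
    private final List<Consumer<Object>> listeners = new ArrayList<>();
    void registerListener(Consumer<Object> l) { listeners.add(l); }
    void raise(Object event) { listeners.forEach(l -> l.accept(event)); }
}

class MergingStage {
    final List<Object> sideInput = new ArrayList<>();
    final List<String> output = new ArrayList<>();

    MergingStage(MiniContext ctx) {
        // Bring in beans carried by events raised on the other branch.
        ctx.registerListener(sideInput::add);
    }

    void process(Object mainInput) {
        // Combine the normal-flow input with whatever the events delivered.
        for (Object o : sideInput) output.add(mainInput + "-" + o);
    }
}

public class EventMergeDemo {
    public static List<String> run() {
        MiniContext ctx = new MiniContext();
        MergingStage stage = new MergingStage(ctx);
        ctx.raise("B1");        // the other branch raises events with beans
        ctx.raise("B2");
        stage.process("A1");    // normal flow from the stage's own branch
        return stage.output;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```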





Re: [PIPELINE] Questions about pipeline

Ken Tanaka
I created some documentation for some Pipeline concepts. The following
links are to a self-signed server, so you will have to allow a security
exception to view them.

https://www.ngdc.noaa.gov/wiki/index.php?title=PipelineBasics

https://www.ngdc.noaa.gov/wiki/index.php?title=QnD_Pipeline_Events

-Ken

Aquator wrote:
> Hi,
>
> I am playing with pipeline, and I have some questions regarding usage.
> ...
> Finally, I am interested in the various StageDrivers. I'd like some more detailed information
> than the API provides: usage advice and samples to help choose the best StageDriver for the particular stages.
>
> Thanks in advance for your time,
> Istvan Cseh



Re: [PIPELINE] Questions about pipeline

Ken Tanaka
The Pipeline Basics tutorial has now been incorporated into the project
page. Thanks to some help and cleanup from Rahul Akolkar, the submitted
documentation was installed quickly. See

http://commons.apache.org/sandbox/pipeline/pipeline_basics.html

-Ken

Aquator wrote:

> ...



Re: [PIPELINE] Questions about pipeline

tdudgeon
Ken Tanaka wrote:
> The Pipeline Basics tutorial has now been incorporated into the project
> page. Thanks to some help and cleanup from Rahul Akolkar the
> documentation submitted was installed quickly. See
>
> http://commons.apache.org/sandbox/pipeline/pipeline_basics.html
>
> -Ken
>

That documentation is really useful. Thanks!

Could I follow up on one of the earlier questions in this thread, about
branching and merging?


From those docs it looks to me like the way data is sent to a branch is
a bit strange. There appears to be a FileReaderStage class that has a
Java bean property called htmlPipelineKey:
<stage className="com.demo.pipeline.stages.FileReaderStage"
driverFactoryId="df1" htmlPipelineKey="sales2html"/>

and later a branch pipeline is defined that is named with the same key:
<pipeline key="sales2html">

This seems pretty inflexible to me. Any branches have to be hardcoded
into the stage definition. I was expecting a situation where multiple
stages could be the recipients of the output of any stage, and these
could be "wired up" dynamically, e.g. something like this:


         |--stage2
         |
stage1---+--stage3
         |
         |--stage4

so that all you needed to do was define a stage5 as one more
downstream stage for stage1, and it would transparently receive the data.

Is this possible, or does the branching have to be hard-coded into the
stage definition?


Similarly for merging. To follow up the previous question, let's say I
had a stageA that outputs some A's and a stageB that outputs some B's
(let's assume both A's and B's are simple numbers). Now I want a stageC
that takes all A's and all B's and generates some output with them
(let's assume the output is A * B, so that every combination of A * B
is output). So this would look like this:

stageA--+
        |
        |----stageC
        |
stageB--+

Is it possible to do this, so that stageA and stageB are both writing to
stageC, but stageC can distinguish the two different streams of data?


Many thanks.

Tim







Re: [PIPELINE] Questions about pipeline

Ken Tanaka


Tim Dudgeon wrote:

> Ken Tanaka wrote:
>> The Pipeline Basics tutorial has now been incorporated into the
>> project page. Thanks to some help and cleanup from Rahul Akolkar the
>> documentation submitted was installed quickly. See
>>
>> http://commons.apache.org/sandbox/pipeline/pipeline_basics.html
>>
>> -Ken
>>
>
> That documentation is really useful. Thanks!
>
Wow, someone is actually looking at this. I'll work on cleaning up the
documentation some. I hope people realize that some of the color-coded
examples got some inadvertent newlines added--but this isn't relevant to
your questions.

> Could I follow up one of the earlier questions in this thread on
> branching and merging.
>
>
> From those docs it looks to me like the way data is sent to a branch
> is a bit strange. There appears to be a FileReaderStage class that has
> a Java bean property called htmlPipelineKey:
> <stage className="com.demo.pipeline.stages.FileReaderStage"
> driverFactoryId="df1" htmlPipelineKey="sales2html"/>
>
> and later a branch pipeline is defined that is named with the same key:
> <pipeline key="sales2html">
>
> This seems pretty inflexible to me. Any branches have to be hardcoded
> into the stage definition. I was expecting a situation where multiple
> stages could be the recipients of the output of any stage, and these
> can be "wired up" dynamically. e.g. something like this:
>
>
>          |--stage2
>          |
> stage1---+--stage3
>          |
>          |--stage4
>
> so that all you needed to do was to define a stage5 as one more
> downstream stage for stage 1 and it would transparently receive the data.
>
> Is this possible, or does the branching have to be hard-coded into the
> stage definition?
I wouldn't call the way branches are specified "hard coding", since the
XML file here is a configuration file. For our current use, branches are
pretty rare, so the pipeline framework deals best with simple cases that
are fairly linear. Also, if stage1 is a branching stage, then that stage
was written with branching in mind, and "htmlPipelineKey" is a
hard-coded property name in the stage source code, so the stage can
direct output when it passes data out to the framework. To simplify
matters, all your branching stages could follow a convention of using
"branchKey" (or some other generic name); then you wouldn't have to
remember which variable holds the branch name for which stage.

A stage could be written to take an arbitrary number of branch names,
and thus send output down multiple branches, although it can get
complicated configuring rules on what goes where if the same thing isn't
going to all the branches. So rather than making stage1 a branching
stage, it could be followed by a "stageMulti", which would send copies of
its input to a number of outputs:

                  |-----stage2
                  |
stage1----stageMulti----stage3
                  |
                  |-----stage4

stageMulti could then be used to add branching to any stage it follows.
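A self-contained sketch of such a stageMulti. The emit() here just records into per-branch lists instead of calling the real BaseStage.emit(String branch, Object obj), and the branch key names are invented for illustration:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StageMultiDemo {
    private final Map<String, List<Object>> branches = new HashMap<>();
    private final List<String> branchKeys;

    StageMultiDemo(List<String> branchKeys) {
        this.branchKeys = branchKeys;
        for (String k : branchKeys) branches.put(k, new ArrayList<>());
    }

    // Stand-in for BaseStage.emit(String branch, Object obj): records the
    // object under the branch key instead of handing it to the framework.
    void emit(String branchKey, Object obj) {
        branches.get(branchKey).add(obj);
    }

    // The whole job of stageMulti: copy each input to every configured branch.
    void process(Object obj) {
        for (String key : branchKeys) emit(key, obj);
    }

    public static Map<String, List<Object>> run() {
        StageMultiDemo multi =
            new StageMultiDemo(Arrays.asList("toStage2", "toStage3", "toStage4"));
        multi.process("record-1");
        multi.process("record-2");
        return multi.branches;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```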

I can imagine making configuration files a little simpler with regard
to setting up branching, but the more intelligent configuration-file
reader needed to handle that hasn't been written.

>
>
> Similarly for merging. To follow up the previous question, let's say I
> had a stageA that outputs some A's and a stageB that outputs some B's
> (let's assume both A's and B's are simple numbers). Now I want a
> stageC that takes all A's and all B's and generates some output with
> them (let's assume the output is A * B, so that every combination of
> A * B is output). So this would look like this:
>
> stageA--+
>         |
>         |----stageC
>         |
> stageB--+
>
> Is it possible to do this, so that stageA and stageB are both writing
> to stageC, but stageC can distinguish the two different streams of
> data?
>
>
First off, the current design expects all pipelines to start with one
stage, to accept feed values out of the config file (or to place command
line arguments into the first stage's queue, if the main pipeline
application has been written to do that). So maybe you have a stageInit
which takes a single number like "3":

feed "3" --> stageInit----stageA
                |
                ----------stageB

stageInit can then pass "3" on to stageA and stageB, possibly causing
stageA to create 3 two-digit numbers and stageB to create 3 three-digit numbers.

For merging, stageC will accept normal input from a stage as well as
watch for events carrying additional data. stageC may well have to
accumulate input and then produce output as events are received. Stages
normally accept one input, which is either a feed or the output of the
stage immediately preceding them. Input from elsewhere or from more than
one source is currently handled as events raised by the source and
received by a "notify" method in the receiving stage.

feed "3" --> stageInit----stageA-------------stageC --> 10*111, 10*222, 10*333,
                |      3          10, 20, 30    :       20*111, 20*222, 20*333,
                ----------stageB................:       30*111, ...
                       3          111, 222, 333

---- normal data flow
.... event-passed data
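Under the assumption that stageC accumulates both sides before emitting, the A*B merge in the diagram could look like this self-contained sketch (class and method names are invented; the event side stands in for the "notify" path):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of stageC above: normal-flow input from stageA is accumulated,
// event-delivered input from stageB is accumulated, and every A*B
// combination is produced once both sides have arrived.
public class CrossProductStage {
    private final List<Integer> as = new ArrayList<>();
    private final List<Integer> bs = new ArrayList<>();

    void processA(int a) { as.add(a); }   // normal data flow
    void notifyB(int b)  { bs.add(b); }   // event-passed data

    List<Integer> emitProducts() {
        List<Integer> out = new ArrayList<>();
        for (int a : as)
            for (int b : bs)
                out.add(a * b);           // every combination, in order
        return out;
    }

    public static List<Integer> run() {
        CrossProductStage c = new CrossProductStage();
        for (int a : new int[]{10, 20, 30})    c.processA(a);
        for (int b : new int[]{111, 222, 333}) c.notifyB(b);
        return c.emitProducts();  // 10*111, 10*222, ..., 30*333
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Note the accumulation: both lists are held in memory until the products are emitted, which is exactly the out-of-memory risk mentioned below.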

Like branching, merging is rare for our uses. Also, beware of running out
of memory if you are doing any accumulation of data to merge input from
more than one stage.

-Ken


Re: [PIPELINE] Questions about pipeline

tdudgeon
Hi Ken,

Thanks for the rapid response.
First, let me explain some background.
I am looking for Java-based pipelining solutions to incorporate into an
existing application. The use of pipelining is well established in the
sector, with applications like Pipeline Pilot and Knime, and many
of the common needs have been well established over several years by
these applications.

Key issues that my initial investigation of Jakarta Pipeline seems to
identify are:

1. Branching is very common. This typically takes two forms:
1.1. Splitting data. A stage could (for instance) have two output ports,
"pass" and "fail". Data is processed by the stage and sent to whichever
port is appropriate. Different stages would be attached to each port,
resulting in the pipeline being branched by this pass/fail decision.
1.2. Attaching multiple stages to a particular output port.
The stage just sends its output onwards. It has no interest in what
happens once the data is sent, and is not concerned whether zero, one or
100 stages receive the output. This is the stage1,2,3,4 scenario I
outlined previously.

2. Merging is also common (though less common than branching).
By analogy with branching, I would see this conceptually as a stage
having multiple input ports (A and B in the merging example).


Taken together, I can see a generalisation here using named ports (input
and output), which is similar, but not identical, to your current concept
of branches.

So you have:
BaseStage.emit(String branch, Object obj);
whereas I would conceptually see this as:
emit(String port, Object obj);
and you have:
Stage.process(Object obj);
whereas I would conceptually see this as:
Stage.process(String port, Object obj);

And when a pipeline is being assembled, a downstream stage is attached to
a particular port of a stage, not to the stage itself. It then just
receives data sent to that particular port, but not the other ports.
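A minimal sketch of this named-port idea, with hypothetical wire()/emit() helpers rather than anything in the existing API. It also demonstrates the pass/fail split from scenario 1.1 above, routed purely by port:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

public class PortedPipelineDemo {
    // Downstream consumers attached per named output port. wire() and the
    // port names are hypothetical; this is not the current Pipeline API.
    private final Map<String, List<BiConsumer<String, Object>>> wiring = new HashMap<>();

    void wire(String outPort, String inPort, BiConsumer<String, Object> downstream) {
        wiring.computeIfAbsent(outPort, k -> new ArrayList<>())
              .add((ignored, obj) -> downstream.accept(inPort, obj));
    }

    void emit(String outPort, Object obj) {
        // Only stages attached to this particular port receive the object.
        for (BiConsumer<String, Object> c :
                wiring.getOrDefault(outPort, Collections.emptyList())) {
            c.accept(outPort, obj);
        }
    }

    public static Map<String, List<Object>> run() {
        PortedPipelineDemo splitter = new PortedPipelineDemo();
        Map<String, List<Object>> received = new HashMap<>();
        received.put("passes", new ArrayList<>());
        received.put("fails", new ArrayList<>());

        // Two downstream "stages", each attached to one output port.
        splitter.wire("pass", "input", (port, obj) -> received.get("passes").add(obj));
        splitter.wire("fail", "input", (port, obj) -> received.get("fails").add(obj));

        // Route each datum by port: no branch keys inside the stage logic.
        for (int n : new int[]{3, 8, 5, 12}) {
            splitter.emit(n > 5 ? "pass" : "fail", n);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```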

I'd love to hear how compatible the current system is with this way of
seeing things. Are we just talking about a new type of Stage
implementation, or a more fundamental incompatibility at the API level?


Many thanks.

Tim



Ken Tanaka wrote:

> ...



Re: [PIPELINE] Questions about pipeline

Ken Tanaka
Hi Tim,

Tim Dudgeon wrote:
> Hi Ken,
>
> Thanks for the rapid response.
> First, let me explain some background here.
> I am looking for Java-based pipelining solutions to incorporate into
> an existing application. The use of pipelining is well established in
> the sector, with applications like Pipeline Pilot and Knime, and many
> of the common needs have been well established over several years by
> these applications.
Have you also looked at Pentaho?

>
> Key issues that my initial investigations of Jakarta Pipeline seem to
> identify are:
>
> 1. Branching is very common. This typically takes two forms:
> 1.1. Splitting data. A stage could (for instance) have two output
> ports, "pass" and "fail". Data is processed by the stage and sent to
> whichever port is appropriate. Different stages would be attached to
> each port, resulting in the pipeline being branched by this pass/fail
> decision.
> 1.2. Attaching multiple stages to a particular output port.
> The stage just sends its output onwards. It has no interest in what
> happens once the data is sent, and is not concerned whether zero, one
> or 100 stages receive the output. This is the stage1,2,3,4 scenario I
> outlined previously.
>
> 2. Merging is also common (though less common than branching).
> By analogy with branching, I would see this conceptually as a stage
> having multiple input ports (A and B in the merging example).
>
>
At present, the structure for storing stages is a linked list, and
branches are implemented as additional pipelines accessed by name
through a HashMap. To handle branching and merging generally, a directed
acyclic graph (DAG) would serve better, but that would require the
pipeline code to be rewritten at this level. Arguments could also be
made for allowing cycles, as in directed graphs, but that would be
harder to debug, and with a GUI it might be a step toward a visual
programming language--so I don't think this should be pursued yet unless
there are volunteers...

>
> Taken together, I can see a generalisation here using named ports
> (input and output), which is similar, but not identical, to your
> current concept of branches.
>
> So you have:
> BaseStage.emit(String branch, Object obj);
> whereas I would conceptually see this as:
> emit(String port, Object obj);
> and you have:
> Stage.process(Object obj);
> whereas I would conceptually see this as:
> Stage.process(String port, Object obj);
>
> And when a pipeline is being assembled, a downstream stage is attached
> to a particular port of a stage, not to the stage itself. It then just
> receives data sent to that particular port, but not the other ports.
I could see that this would work, but it would need either modifying a
number of stages already written, or maybe creating a compatibility
stage driver that takes older-style stages, so that the input object
comes from a configured port name (usually "input") and the output
goes to configured output ports named "output" and whatever the
previous branch name(s) were, if any. Stages that used to look for
events for input should be rewritten to read multiple inputs
(Stage.process(String port, Object obj), as you suggested). Events
would then be reserved for truly out-of-band signals between stages
rather than carrying data for processing.
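One way such a compatibility layer might look, as a rough sketch with hypothetical interface names (nothing here is the existing Pipeline API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical ported-stage interface from the discussion above.
interface PortedStage {
    void process(String port, Object obj);
}

// Adapter presenting an old-style stage (process(Object)) as a ported
// stage, forwarding only the configured input port to it.
class LegacyStageAdapter implements PortedStage {
    private final Consumer<Object> legacyProcess; // the old process(Object)
    private final String inputPort;               // usually "input"

    LegacyStageAdapter(Consumer<Object> legacyProcess, String inputPort) {
        this.legacyProcess = legacyProcess;
        this.inputPort = inputPort;
    }

    @Override
    public void process(String port, Object obj) {
        if (inputPort.equals(port)) {
            legacyProcess.accept(obj); // forward the configured port only
        }
        // objects arriving on other ports are simply ignored
    }
}

public class LegacyAdapterDemo {
    public static List<Object> run() {
        List<Object> seen = new ArrayList<>();
        PortedStage stage = new LegacyStageAdapter(seen::add, "input");
        stage.process("input", "a"); // reaches the legacy stage
        stage.process("side", "b");  // dropped: not the configured port
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```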
>
> I'd love to hear how compatible the current system is with this way of
> seeing things. Are we just talking about a new type of Stage
> implementation, or a more fundamental incompatibility at the API level.
>
I think you have some good ideas. This changes the Stage
implementation, which affects on the order of 60 stages for us that
override the process method, unless the compatibility stage driver works
out. The top-level pipeline would also be restructured. The amount of
work required puts this out of the near term for me to work on, but
there may be other developers/contributors to take this on.

-Ken



Re: [PIPELINE] Questions about pipeline

tdudgeon
Ken Tanaka wrote:

> Hi Tim,
>
> Tim Dudgeon wrote:
>> ...
> Have you also looked at Pentaho?
I took a look, but it doesn't seem to be what I'm after.

>>
>> ...
>>
> At present, the structure for storing stages is a linked list, and
> branches are implemented as additional pipelines accessed by a name
> through a HashMap. To generally handle branching and merging, a directed
> acyclic graph (DAG) would better serve, but that would require the
> pipeline code to be rewritten at this level. Arguments could also be
> made for allowing cycles, as in directed graphs, but that would be
> harder to debug, and with a GUI might be a step toward a visual
> programming language--so I don't think this should be pursued yet unless
> there are volunteers...
>

I agree, a DAG would be better, but cycles could be needed too, so a
full directed graph (DG) would be better still.
But yes, ideally I'd want a visual designer too.


>>
>> Taken together I can see a generalisation here using named ports
>> (input and output), which is similar, but not identical, to your
>> current concept of branches.
>>
>> So you have:
>> BaseStage.emit(String branch, Object obj);
>> whereas I would conceptually see this as:
>> emit(String port, Object obj);
>> and you have:
>> Stage.process(Object obj);
>> whereas I would conceptually see this as:
>> Stage.process(String port, Object obj);
>>
>> And when a pipeline is being assembled a downstream stage is attached
>> to a particular port of a stage, not the stage itself. It then just
>> receives data sent to that particular port, but not the other ports.
> I could see that this would work, but would need either modifying a
> number of stages already written, or maybe creating a compatibility
> stage driver that takes older style stages so that the input object
>> comes from a configured port name, usually "input", and sends the
>> output to configured output ports named "output" and whatever the
> previous branch name(s) were, if any. Stages that used to look for
> events for input should be rewritten to read multiple inputs (
> Stage.process(String port, Object obj) as you suggested). Events would
> then be reserved for truly out-of-band signals between stages rather
> than carrying data for processing.

Agreed, I think this would be good. I think existing stages could be
made compatible by having a default input and output port, and using
those if no specific port is specified.
A default in/out port would probably be necessary to allow simple
auto-wiring.
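
To make the idea concrete, here is a rough sketch of what a port-aware stage with a legacy adapter could look like. This is purely illustrative -- none of these names (PortAwareStage, LegacyStageAdapter, DEFAULT_IN) exist in commons-pipeline; it just shows how a default input port lets an old single-input stage run unchanged:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch only -- NOT the current commons-pipeline API.
// A port-aware stage signature, with a default port name so old
// single-input stages keep working unchanged.
interface PortAwareStage {
    String DEFAULT_IN = "input";

    void process(String port, Object obj);
}

// Adapter: routes the default input port to the old-style process(Object)
// entry point and ignores ports the legacy stage knows nothing about.
abstract class LegacyStageAdapter implements PortAwareStage {
    @Override
    public void process(String port, Object obj) {
        if (DEFAULT_IN.equals(port)) {
            processLegacy(obj); // old-style single-input entry point
        }
    }

    protected abstract void processLegacy(Object obj);
}

// Example legacy-style stage that just collects what it was fed.
class CollectingStage extends LegacyStageAdapter {
    final List<Object> received = new ArrayList<>();

    @Override
    protected void processLegacy(Object obj) { received.add(obj); }
}
```

So auto-wiring would connect an unnamed upstream output to DEFAULT_IN, and only port-aware stages would ever see the other port names.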

>>
>> I'd love to hear how compatible the current system is with this way of
>> seeing things. Are we just talking about a new type of Stage
>> implementation, or a more fundamental incompatibility at the API level.
>>
> I think you have some good ideas. This is changing the Stage
> implementation, which affects on the order of 60 stages for us that
> override the process method, unless the compatibility stage driver works
> out. The top level pipeline would also be restructured. The amount of
> work required puts this out of the near term for me to work on it, but
> there may be other developers/contributors to take this on.

I need to investigate more fully here, and consider the other options.
But potentially this is certainly of interest.

So, to prototype this, would it be enough to create a new Stage
implementation with new emit( ... ) and process( ... ) methods?


Thanks

Tim




>
> -Ken



Re: [PIPELINE] Questions about pipeline

Ken Tanaka


Tim Dudgeon wrote:
> Ken Tanaka wrote:
>> Hi Tim,
...

>>>
>> At present, the structure for storing stages is a linked list, and
>> branches are implemented as additional pipelines accessed by a name
>> through a HashMap. To generally handle branching and merging, a
>> directed acyclic graph (DAG) would better serve, but that would
>> require the pipeline code to be rewritten at this level. Arguments
>> could also be made for allowing cycles, as in directed graphs, but
>> that would be harder to debug, and with a GUI might be a step toward
>> a visual programming language--so I don't think this should be
>> pursued yet unless there are volunteers...
>>
>
> I agree, DAG would be better, but cycles could be needeed too, so DG
> would be better too.
> But, yes, I am ideally wanting visual designer too.
>
I'd like a visual designer too at some point, but that's a ways off into
the future.

>
> ...
>
> I need to investigate more fully here, and consider the other options.
> But potentially this is certainly of interest.
>
> So is all that's necessary to prototype this to create a new Stage
> implementation, with new emit( ... ) and process( ... ) methods?
I'm thinking it's more involved than that. To deal well with an
arbitrary number of downstream stages rather than just one means
changing the digester rules
<http://commons.apache.org/sandbox/pipeline/xref/org/apache/commons/pipeline/config/PipelineRuleSet.html>
on specifying what follows. Normally a stage is connected to the
preceding stage if it is listed in that order in the configuration file.
This should remain the default behavior, but if stage2 and stage3 both
follow stage1, then some notation is needed for which stage is the
predecessor.

stage1----stage2
    |
    |-----stage3

might be set up as conf_pipe.xml:
<pipeline>
   ...
   <stage className="com.demo.pipeline.stages.Stage1"
driverFactoryId="df1" stageId="stage1"/>
   <stage className="com.demo.pipeline.stages.Stage2"
driverFactoryId="df1"/>
   <stage className="com.demo.pipeline.stages.Stage3"
driverFactoryId="df1" follows="stage1"/>
</pipeline>

I propose the 'follows="stage1"' attribute to connect stage3 to stage1
instead of stage2 immediately preceding. This seems cleaner than setting
up a branch and matching up branch key names between the branching stage
and the secondary pipeline(s). Can you think of a cleaner way to
configure this?

The Pipeline.java class will need to be modified to build and maintain a
DAG structure rather than a linked list. The incoming data are managed
by a queue in the stage driver, which would change to a group of queues,
allowing multiple inputs (ports). I'm assuming there is an open source
directed acyclic graph library out there that can replace the linked list.
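
As a rough sketch of the "group of queues" idea -- purely illustrative, using plain java.util.concurrent rather than any existing commons-pipeline class (PortQueues and its methods are made-up names):

```java
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical: one bounded queue per named input port, instead of the
// single input queue a stage driver holds today.
class PortQueues {
    private final Map<String, BlockingQueue<Object>> queues = new ConcurrentHashMap<>();
    private final int capacity;

    PortQueues(int capacity) { this.capacity = capacity; }

    // Called by upstream feeders; blocks when the port's queue is full,
    // which also gives the back-pressure asked about at the start of
    // this thread.
    void feed(String port, Object obj) throws InterruptedException {
        queues.computeIfAbsent(port, p -> new ArrayBlockingQueue<>(capacity)).put(obj);
    }

    // Called by the stage driver's worker thread for a given port.
    Object take(String port) throws InterruptedException {
        return queues.computeIfAbsent(port, p -> new ArrayBlockingQueue<>(capacity)).take();
    }
}
```

The bounded queues double as flow control: a fast producer simply blocks in feed() until the slow consumer drains its port.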

-Ken


Re: [PIPELINE] Questions about pipeline

tdudgeon
See comments below.

Tim

Ken Tanaka wrote:

>
> ...
>
>> So is all that's necessary to prototype this to create a new Stage
>> implementation, with new emit( ... ) and process( ... ) methods?
> I'm thinking it's more involved than that. To really deal well with the
> arbitrary number of downstream stages rather than just one means
> changing the digester rules
> <http://commons.apache.org/sandbox/pipeline/xref/org/apache/commons/pipeline/config/PipelineRuleSet.html>
> on specifying what follows. Normally a stage is connected to the
> preceding stage if it is listed in that order in the configuration file.
> This should be a default behavior, but if  stage2 and stage3 both follow
> stage1 then some notation of which is the previous stage is needed.
>
> stage1----stage2
>    |
>    |-----stage3
>
> might be set up as conf_pipe.xml:
> <pipeline>
>   ...
>   <stage className="com.demo.pipeline.stages.Stage1"
> driverFactoryId="df1" stageId="stage1"/>
>   <stage className="com.demo.pipeline.stages.Stage2"
> driverFactoryId="df1"/>
>   <stage className="com.demo.pipeline.stages.Stage3"
> driverFactoryId="df1" follows="stage1"/>
> </pipeline>
>
> I propose the 'follows="stage1"' attribute to connect stage3 to stage1
> instead of stage2 immediately preceding. This seems cleaner than setting
> up a branch and matching up branch key names between the branching stage
> and the secondary pipeline(s). Can you think of a cleaner way to
> configure this?

I think we're in danger of looking at this the wrong way. The XML should
reflect the underlying data model, not drive it. But to stick with this
paradigm, I would think it best to explicitly define the connections in
the model definition. Maybe something more like this:

<pipeline>
   ...
   <stage className="com.demo.pipeline.stages.Stage1"
        driverFactoryId="df1" stageId="stage1">
   </stage>
   <stage className="com.demo.pipeline.stages.Stage2"
        driverFactoryId="df1">
        <input stageId="stage1" outputPort="pass"/>
   </stage>
   <stage className="com.demo.pipeline.stages.Stage3"
        driverFactoryId="df1">
        <input stageId="stage1" outputPort="pass"/>
   </stage>
   <stage className="com.demo.pipeline.stages.Stage4"
        driverFactoryId="df1">
        <input stageId="stage1" outputPort="fail" inputPort="aPort"/>
   </stage>
</pipeline>

I think this would allow more flexibility, as:
1. a stage could define multiple inputs if it needed to.
2. each connection is explicitly defined and could have extra attributes
added in future (e.g. a disable attribute to disable execution of that
part of the pipeline).
3. The concept of input can probably be generalised to include the
"feed", allowing multiple feeds to be used (as discussed earlier in this
thread), e.g. stage1 would also have an input that would be the feed.


>
> The Pipeline.java class will need to be modified to build and maintain a
> DAG structure rather than a linked list. The incoming data are managed
> by a queue in the stage driver, which would change to a group of queues,
> allowing multiple inputs (ports). I'm assuming there is an open source
> directed acyclic graph library out there that can replace the linked list.

If defined as I propose, I'm not sure a specific graph library is
necessary. The model just comprises a set of stages that know how they
are connected, i.e. the connections are already implicit in the model.
But this probably needs more thought.
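
For instance (all names here are illustrative, not an existing API), each stage could simply record its own inbound connections as parsed from the <input> elements, and the "graph" falls out of that:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model sketch: the pipeline is just a set of stages, each
// of which records where its inputs come from. No graph library needed.
class Connection {
    final String fromStageId; // upstream stage
    final String outputPort;  // port on the upstream stage
    final String inputPort;   // port on this stage

    Connection(String fromStageId, String outputPort, String inputPort) {
        this.fromStageId = fromStageId;
        this.outputPort = outputPort;
        this.inputPort = inputPort;
    }
}

class StageModel {
    final String stageId;
    final List<Connection> inputs = new ArrayList<>();

    StageModel(String stageId) { this.stageId = stageId; }

    // One call per <input stageId=".." outputPort=".." inputPort=".."/>
    void addInput(String fromStageId, String outputPort, String inputPort) {
        inputs.add(new Connection(fromStageId, outputPort, inputPort));
    }
}
```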



>
> -Ken



Re: [PIPELINE] Questions about pipeline

Ken Tanaka


Tim Dudgeon wrote:

> See comments below.
>
> Tim
>
> ...
> I think we're in danger of looking at this the wrong way. The XML
> should reflect the underlying data model, not drive it. But to stick
> with this paradigm I would think it might be best to explicity define
> the connections in the model definition. Maybe something more like this:
>
> <pipeline>
>   ...
>   <stage className="com.demo.pipeline.stages.Stage1"    
>     driverFactoryId="df1" stageId="stage1">
>   </stage>
>   <stage className="com.demo.pipeline.stages.Stage2"
>     driverFactoryId="df1">
>     <input stageId="stage1" outputPort="pass"/>
>   </stage>
Just to clarify, for Stage2, when you specify '<input stageId="stage1"
outputPort="pass"/>', 'outputPort="pass"' refers to an output port of
stage1 named "pass", and is not specifying that the stage2 output
port is named "pass", right? So Stage1 has two output ports, named
"pass" and "fail", and this would be documented somewhere so you knew
what to connect to when you wrote the configuration XML?
>   <stage className="com.demo.pipeline.stages.Stage3"
>     driverFactoryId="df1">
>     <input stageId="stage1" outputPort="pass"/>
>   </stage>
>   <stage className="com.demo.pipeline.stages.Stage4"
>     driverFactoryId="df1">
>     <input stageId="stage1" outputPort="fail" inputPort="aPort"/>
>   </stage>
So here Stage4 has an input port named "aPort" and it is loaded from the
stage1 output port named "fail"?
> </pipeline>
>
> I think this would allow more flexibility, as:
> 1. a stage could define multiple inputs if it needed to.
If I understand you correctly, suppose there is a stage5 that has input
ports "aPort" and "bPort" that we would like to receive data from stage2
and stage3 ("pass" output port from both). Then it would be specified as
follows:

  <stage className="com.demo.pipeline.stages.Stage5"
    driverFactoryId="df1">
    <input stageId="stage2" outputPort="pass" inputPort="aPort"/>
    <input stageId="stage3" outputPort="pass" inputPort="bPort"/>
  </stage>

I also assume that Stage2 and Stage3 are given stageIds of "stage2" and
"stage3" respectively.

[stage1]------------>[stage2]------------>[stage5]
     |   pass->(in)           pass->aPort   ^
     |                                      |
     +-------------->[stage3]---------------+
     |   pass->(in)           pass->bPort
     |
     +-------------->[stage4]
         fail->aPort
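
A merging stage like stage5 might then look roughly like this (hypothetical process(port, obj) signature from earlier in the thread; PairingStage and the port names are made up). It buffers values per port and only emits once both ports have supplied one, which is also how the ABAB merge asked about at the start of this thread would work:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical two-input merge stage: buffers values per input port and
// emits a pair only when both ports have supplied a value.
class PairingStage {
    private final Deque<Object> aBuf = new ArrayDeque<>();
    private final Deque<Object> bBuf = new ArrayDeque<>();

    // Stands in for emit(); a real stage would pass pairs downstream.
    final List<Object[]> emitted = new ArrayList<>();

    void process(String port, Object obj) {
        if ("aPort".equals(port)) {
            aBuf.add(obj);
        } else {
            bBuf.add(obj);
        }
        // Drain complete pairs; unmatched values wait for a partner.
        while (!aBuf.isEmpty() && !bBuf.isEmpty()) {
            emitted.add(new Object[] { aBuf.remove(), bBuf.remove() });
        }
    }
}
```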

> 2. each connection is explicity defined and could have extra
> attributes added in future (e.g. a disable attribute to disable
> execution of that part of the pipeline.
> 3. The concept of input can probably be generalised to include the
> "feed", allowing multiple feeds to be used (as discussed earlier in
> this thread). e.g. stage1 would also have an input that would be the
> feed.
>
Do you envision a stage with two inputs (aPort and bPort) waiting until
there are inputs on both before its stageDriver invokes the process
method? If stage5 needs two inputs, and stage2 provides 3 values and
stage3 provides 2 values, there are just 2 complete pairs of values. The
third value from stage2 could wait indefinitely for a matching input
from stage3. Currently stages run until their queue is empty, but with
multiple inputs that could be imbalanced, it might be better to set the
quit condition to any one queue is empty and all upstream stages claim
to be complete. Any non-empty queues on exit can trigger a warning.
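
That quit condition could be checked along these lines (illustrative sketch only; ShutdownCheck is not an existing class, and "upstreamComplete" stands in for however the drivers would report completion):

```java
import java.util.Map;
import java.util.Queue;

// Hypothetical shutdown test for a multi-input stage: quit when every
// upstream stage claims to be complete AND at least one input queue has
// run dry, then warn about anything stranded in the other queues.
class ShutdownCheck {
    static boolean shouldQuit(Map<String, Queue<Object>> queues, boolean upstreamComplete) {
        if (!upstreamComplete) {
            return false; // more input may still arrive
        }
        return queues.values().stream().anyMatch(Queue::isEmpty);
    }

    // Count of values left unmatched at shutdown, for the warning.
    static long strandedValues(Map<String, Queue<Object>> queues) {
        return queues.values().stream().mapToLong(Queue::size).sum();
    }
}
```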

>
>>
>> The Pipeline.java class will need to be modified to build and
>> maintain a DAG structure rather than a linked list. The incoming data
>> are managed by a queue in the stage driver, which would change to a
>> group of queues, allowing multiple inputs (ports). I'm assuming there
>> is an open source directed acyclic graph library out there that can
>> replace the linked list.
>
> If defined as I propose I'm not sure a specific graph library is
> necessary. The model just comprises a set of stages that know how they
> are connected. e.g. the connections are already implicit in the model.
> But this probably needs more thought.
>
Currently the linked list of stages is used for lazy initialization, to
find the next stage's feeder the first time it is used. To allow general
connections, the downstream feeder link could become an array of
subsequent stageDrivers, with the connections set up as the pipeline is
built. In that case a DAG library would not be needed, and we could
keep the linked list as is.
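
So the single next-stage feeder reference might just generalize to a list per output port, along these lines (again, Feeder/OutputPort are illustrative names, not the current API):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical: instead of one downstream feeder, each output port keeps
// a list of feeders, wired up while the pipeline is being built.
interface Feeder {
    void feed(Object obj);
}

class OutputPort {
    private final List<Feeder> downstream = new ArrayList<>();

    // Called during pipeline construction for each configured connection.
    void connect(Feeder f) { downstream.add(f); }

    // Fan out one emitted value to every connected consumer.
    void emit(Object obj) {
        for (Feeder f : downstream) {
            f.feed(obj);
        }
    }
}
```

This also covers the earlier point about attaching zero, one or many stages to a port: emit() simply iterates whatever was connected.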


-Ken


Re: [PIPELINE] Questions about pipeline

tdudgeon
Ken Tanaka wrote:

>
>
> ...
> Just to clarify, for Stage2, when you specify '<input stageId="stage1"
> outputPort="pass"/>', 'outputPort="pass"' refers to an output port of
> stage1 and named "pass" and is not specifying that the stage2 output
> port is named "pass", right? So Stage1 has two output ports, named
> "pass" and "fail", and this would be documented somewhere so you knew
> what to connect to when you wrote the configuration XML?

Yes, that's the idea. Maybe different attribute names would help here.


>>   <stage className="com.demo.pipeline.stages.Stage3"
>>     driverFactoryId="df1">
>>     <input stageId="stage1" outputPort="pass"/>
>>   </stage>
>>   <stage className="com.demo.pipeline.stages.Stage4"
>>     driverFactoryId="df1">
>>     <input stageId="stage1" outputPort="fail" inputPort="aPort"/>
>>   </stage>
> So here Stage4 has an input port named "aPort" and it is loaded from the
> stage1 output port named "fail"?


Correct again.

>> </pipeline>
>>
>> I think this would allow more flexibility, as:
>> 1. a stage could define multiple inputs if it needed to.
> If I understand you correctly, suppose there is a stage5 that has input
> ports "aPort" and "bPort" that we would like to receive data from stage2
> and stage3 ("pass" output port from both). Then it would be specified as
> follows:
>
>  <stage className="com.demo.pipeline.stages.Stage5"
>    driverFactoryId="df1">
>    <input stageId="stage2" outputPort="pass" inputPort="aPort"/>
>    <input stageId="stage3" outputPort="pass" inputPort="bPort"/>
>  </stage>
>

Correct again.

> I also assume that Stage2 and Stage3 are given stageIds of "stage2" and
> "stage3" respectively.
>
> [stage1]------------>[stage2]------------>[stage5]
>     |   pass->(in)           pass->aPort   ^
>     |                                      |
>     +-------------->[stage3]---------------+
>     |   pass->(in)           pass->bPort
>     |
>     +-------------->[stage4]
>         fail->aPort
>

That's the idea. It's obviously a bit of an extreme case, as it uses
branching and merging, but it covers what I think is needed.


>> 2. each connection is explicitly defined and could have extra
>> attributes added in future (e.g. a disable attribute to disable
>> execution of that part of the pipeline).
>> 3. The concept of input can probably be generalised to include the
>> "feed", allowing multiple feeds to be used (as discussed earlier in
>> this thread). e.g. stage1 would also have an input that would be the
>> feed.
>>
> Do you envision a stage with two inputs (aPort and bPort) waiting until
> there are inputs on both before its stageDriver invokes the process
> method? If stage5 needs two inputs, and stage2 provides 3 values and
> stage3 provides 2 values, there are just 2 complete pairs of values. The
> third value from stage2 could wait indefinitely for a matching input
> from stage3. Currently stages run until their queue is empty, but with
> multiple inputs that could be imbalanced, it might be better to set the
> quit condition to any one queue is empty and all upstream stages claim
> to be complete. Any non-empty queues on exit can trigger a warning.
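The quit condition proposed above — stop when any one input queue is empty and its upstream feeder reports completion — could be sketched roughly as follows. All names here are hypothetical and this is not the current StageDriver API, just an illustration of the drain rule:

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

public class MultiInputDrainSketch {
    // The stage should keep processing while every input port either
    // holds data or may still receive more from its feeder. Once a port
    // is empty AND its upstream stage is complete, it can never be
    // satisfied again, so processing must stop.
    static boolean shouldContinue(List<Queue<Object>> inputs,
                                  List<Boolean> upstreamDone) {
        for (int i = 0; i < inputs.size(); i++) {
            if (inputs.get(i).isEmpty() && upstreamDone.get(i)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Ken's example: stage2 delivers 3 values, stage3 delivers 2.
        Queue<Object> a = new ArrayDeque<>(List.of("a1", "a2", "a3"));
        Queue<Object> b = new ArrayDeque<>(List.of("b1", "b2"));
        List<Boolean> done = List.of(true, true); // both feeders finished

        int pairs = 0;
        while (shouldContinue(List.of(a, b), done)
                && !a.isEmpty() && !b.isEmpty()) {
            a.remove();
            b.remove();
            pairs++;
        }
        // Two complete pairs are processed; the third value from 'a' is
        // left over and would trigger the suggested non-empty-queue warning.
        System.out.println(pairs + " leftover=" + a.size()); // prints "2 leftover=1"
    }
}
```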

Yes, there are multiple scenarios here. e.g.
1. pairwise processing.
a1 + b1
a2 + b2
a3 + b3
...

2. combinatorial processing
a1 + b1
a1 + b2
a1 + b3
a2 + b1
a2 + b2
....

(maybe others too)


Clearly scenario 1's inputs have to be matched, and if they are not,
some exit rules would be required.
Also, as you have mentioned before there are potential memory issues
with combinatorial processing of large sets of data, but I think
solutions can be found for this.
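Concretely, the two scenarios are a zip versus a cross product. A minimal plain-Java sketch (not pipeline API) of the two merge modes:

```java
import java.util.ArrayList;
import java.util.List;

public class MergeModes {
    // Scenario 1: pairwise -- a1+b1, a2+b2, ...; stops at the shorter input.
    static List<String> pairwise(List<String> as, List<String> bs) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < Math.min(as.size(), bs.size()); i++) {
            out.add(as.get(i) + "+" + bs.get(i));
        }
        return out;
    }

    // Scenario 2: combinatorial -- every a combined with every b.
    static List<String> combinatorial(List<String> as, List<String> bs) {
        List<String> out = new ArrayList<>();
        for (String a : as) {
            for (String b : bs) {
                out.add(a + "+" + b);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> as = List.of("a1", "a2", "a3");
        List<String> bs = List.of("b1", "b2");
        System.out.println(pairwise(as, bs));      // prints [a1+b1, a2+b2]
        System.out.println(combinatorial(as, bs).size()); // prints 6
    }
}
```

The cross product's size is |A|x|B|, which is the source of the memory concern with large data sets mentioned above.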


Tim


>>
>>>
>>> The Pipeline.java class will need to be modified to build and
>>> maintain a DAG structure rather than a linked list. The incoming data
>>> are managed by a queue in the stage driver, which would change to a
>>> group of queues, allowing multiple inputs (ports). I'm assuming there
>>> is an open source directed acyclic graph library out there that can
>>> replace the linked list.
>>
>> If defined as I propose, I'm not sure a specific graph library is
>> necessary. The model just comprises a set of stages that know how they
>> are connected, i.e. the connections are already implicit in the model.
>> But this probably needs more thought.
>>
> Currently the linked list of stages is used for lazy initialization, to
> find the next stage feeder the first time it is used. To allow general
> connections, the downstream feeder link could become an array of
> subsequent stageDrivers, with the connections set up as the pipeline is
> built. In that case, then a DAG library would not be needed, and we
> could keep the linked list as is.
>
>
> -Ken
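Ken's suggestion above — replacing the single lazily-resolved downstream feeder with a collection of downstream drivers wired up while the pipeline is built — might look something like this. All class and method names are invented for illustration; the real StageDriver API differs:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical driver: instead of lazily finding one "next stage"
// feeder on first use, each driver holds a list of downstream drivers,
// connected explicitly at pipeline-build time.
class DriverSketch {
    final String stageId;
    final List<DriverSketch> downstream = new ArrayList<>();
    final List<Object> received = new ArrayList<>();

    DriverSketch(String stageId) { this.stageId = stageId; }

    // Wiring happens while the pipeline is assembled, not lazily.
    void connect(DriverSketch next) { downstream.add(next); }

    // Emitting fans the object out to every connected downstream driver.
    void emit(Object obj) {
        for (DriverSketch d : downstream) {
            d.received.add(obj);
        }
    }
}

public class FanOutDemo {
    public static void main(String[] args) {
        DriverSketch stage1 = new DriverSketch("stage1");
        DriverSketch stage2 = new DriverSketch("stage2");
        DriverSketch stage3 = new DriverSketch("stage3");
        stage1.connect(stage2); // one upstream stage feeding
        stage1.connect(stage3); // two downstream branches

        stage1.emit("datum");
        System.out.println(stage2.received.size() + " "
                + stage3.received.size()); // prints "1 1"
    }
}
```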


---------------------------------------------------------------------
To unsubscribe, e-mail: [hidden email]
For additional commands, e-mail: [hidden email]


Re: [PIPELINE] Questions about pipeline

Ken Tanaka
Thanks for confirming the examples. I figure that fleshing this out now
should speed development at a future time.

Tim Dudgeon wrote:
> Ken Tanaka wrote:
>>
>>
>> Tim Dudgeon wrote:
>>> See comments below.
>>>
>>> Tim
...

>
>
>>> 2. each connection is explicitly defined and could have extra
>>> attributes added in future (e.g. a disable attribute to disable
>>> execution of that part of the pipeline).
>>> 3. The concept of input can probably be generalised to include the
>>> "feed", allowing multiple feeds to be used (as discussed earlier in
>>> this thread). e.g. stage1 would also have an input that would be the
>>> feed.
>>>
>> Do you envision a stage with two inputs (aPort and bPort) waiting
>> until there are inputs on both before its stageDriver invokes the
>> process method? If stage5 needs two inputs, and stage2 provides 3
>> values and stage3 provides 2 values, there are just 2 complete pairs
>> of values. The third value from stage2 could wait indefinitely for a
>> matching input from stage3. Currently stages run until their queue is
>> empty, but with multiple inputs that could be imbalanced, it might be
>> better to set the quit condition to any one queue is empty and all
>> upstream stages claim to be complete. Any non-empty queues on exit
>> can trigger a warning.
>
> Yes, there are multiple scenarios here. e.g.
> 1. pairwise processing.
> a1 + b1
> a2 + b2
> a3 + b3
> ...
>
> 2. combinatorial processing
> a1 + b1
> a1 + b2
> a1 + b3
> a2 + b1
> a2 + b2
> ....
>
> (maybe others too)
>
>
> Clearly scenario 1's inputs have to be matched, and if they are not,
> some exit rules would be required.
> Also, as you have mentioned before there are potential memory issues
> with combinatorial processing of large sets of data, but I think
> solutions can be found for this.
>
In general I prefer to stick to scenario 1. Any deviation can be handled
by making a stage behave differently rather than putting this capability
into the framework. A stage can be written to do nothing but accept
pairwise arguments and emit combinatorial output; it can then be
inserted before any stage to which you want to add combinatorial input
processing.
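The pattern Ken describes — keep the framework strictly pairwise, and get combinatorial behavior by inserting a dedicated expansion stage upstream — could be sketched like this. This is a plain illustration, not the commons-pipeline Stage interface:

```java
import java.util.ArrayList;
import java.util.List;

public class CrossProductStageSketch {
    // A stage that does nothing but accept its two inputs and emit the
    // cross product as pairs. Inserting it before an ordinary pairwise
    // stage gives that stage combinatorial input without the framework
    // itself supporting scenario 2.
    static List<String[]> process(List<String> as, List<String> bs) {
        List<String[]> emitted = new ArrayList<>();
        for (String a : as) {
            for (String b : bs) {
                emitted.add(new String[] { a, b });
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String[]> out = process(List.of("a1", "a2"),
                                     List.of("b1", "b2", "b3"));
        // 2 x 3 inputs expand to 6 emitted pairs for the next stage.
        System.out.println(out.size()); // prints 6
    }
}
```

A downstream stage then consumes these pairs exactly as it would consume matched scenario-1 input, which is what keeps the framework contract simple.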

Ken
