Hi,
I am playing with Pipeline, and I have some questions regarding usage.

What is the suggested method of avoiding a "traffic jam" in the pipe? I mean, when a stage produces results fast and is followed by a long-running stage, I will run out of stack space with a large amount of input data.

Currently, my solution uses the context raise/registerListener methods: the slow stage notifies its "feeder" stage that new data can be processed. Are there any better ideas for this problem?

My other issue is about branches. Is there a way to attach separate branches together? For example, a stage needs input from two different branches. Is there any solution for a synchronized data flow? (Let's say I have two branches, one produces A's and the other produces B's. I want a stage that is fed by those two branches and produces the sequence ABABAB...) Is there an implementation for such behaviour?

Finally, I am interested in the various StageDrivers. I'd like some more detailed information than the API docs give, especially usage advice and samples, to help choose the best StageDriver for particular stages.

Thanks in advance for your time,
Istvan Cseh
Hello Istvan,
I recommend limiting the input queue size that is used by the stage driver. The DedicatedThreadStageDriverFactory uses a blocking queue. If you are configuring your pipeline with a Digester configuration file, the XML would look like this (look for the word "capacity" near the end):

<?xml version="1.0" encoding="UTF-8"?>
<!--
    Document   : conf_examplepipeline.xml
    Description: Configuration file for a data loading pipeline
-->
<pipeline>
  <driverFactory
      className="org.apache.commons.pipeline.driver.DedicatedThreadStageDriverFactory"
      id="df1" faultToleranceLevel="ALL"/>

  <!-- ================================================================ -->
  <!-- Find the data file(s), surveyData.YYYY-MM-DD.  ** FAST STAGE **
       Input:  starting directory
       Output: data file object -->
  <stage className="org.apache.commons.pipeline.stage.FileFinderStage"
         filePattern="surveyData.*" driverFactoryId="df1"/>
  <feed>
    <value>/data/prod/ingestNow</value>
  </feed>

  <!-- ================================================================ -->
  <!-- Unpack the data file. Each input file becomes a bunch of data
       bean objects.  ** FAST STAGE **
       Input:  data file object
       Output: data point object -->
  <stage className="gov.noaa.eds.SurveyReaderStage" driverFactoryId="df1"/>

  <!-- ================================================================ -->
  <!-- Write the data beans to the database.
       ** SLOW STAGE - set the ArrayBlockingQueueFactory capacity to 10 **
       The queueFactory must be a blocking type that implements the
       java.util.concurrent.BlockingQueue interface.
       Input:  data point object
       Output: data point object -->
  <stage className="gov.noaa.eds.WriteSurveyToDatabaseStage"
         driverFactoryId="df1">
    <property propName="queueFactory"
        className="org.apache.commons.pipeline.util.BlockingQueueFactory$ArrayBlockingQueueFactory"
        capacity="10" fair="false"/>
  </stage>
</pipeline>

Alternatively, you could set up the driver factory with the same property, so that all stages have the same input queue size:

<driverFactory
    className="org.apache.commons.pipeline.driver.DedicatedThreadStageDriverFactory"
    id="df1" faultToleranceLevel="ALL">
  <property propName="queueFactory"
      className="org.apache.commons.pipeline.util.BlockingQueueFactory$ArrayBlockingQueueFactory"
      capacity="10" fair="false"/>
</driverFactory>
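The mechanism behind this is just the bounded-queue backpressure from java.util.concurrent; the queueFactory above configures it for a stage. As a stand-alone illustration (plain JDK code, not the Pipeline API -- the class and numbers here are made up):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BackpressureDemo {
    public static void main(String[] args) throws InterruptedException {
        // Capacity 10, matching the queueFactory configuration above.
        final BlockingQueue<Integer> queue =
                new ArrayBlockingQueue<Integer>(10);

        // The fast "producer" stage: put() blocks once the queue holds
        // 10 items, so the fast stage runs at the slow stage's pace
        // instead of piling objects up in memory.
        Thread producer = new Thread(new Runnable() {
            public void run() {
                try {
                    for (int i = 0; i < 100; i++) {
                        queue.put(Integer.valueOf(i));
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        producer.start();

        // The slow "consumer" stage, e.g. a database write.
        for (int i = 0; i < 100; i++) {
            Integer item = queue.take();
            Thread.sleep(50); // simulate slow processing
            System.out.println("processed " + item);
        }
        producer.join();
    }
}

With capacity 10, the producer can never get more than 10 items ahead of the consumer.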
Aquator wrote:
> ...
> My other issue is about branches. Is there a way to attach separate
> branches together? For example, a stage needs input from two different
> branches. Is there any solution for a synchronized data flow? (Let's say
> I have two branches, one produces A's and the other produces B's. I want
> a stage that is fed by those two branches and produces the sequence
> ABABAB...) Is there an implementation for such behaviour?

Stages on a branch of a pipeline often end up having the same conditional logic at the beginning to sort out what to do with the incoming objects. If you do need to combine information back into another branch, you should do it by raising events, which can bring along with them the data transport beans you wish to combine. The receiving stage needs to register a listener to bring those in. I have some documentation on this, but it's not publicly available yet. If you want, I can email you the HTML directly.
In reply to this post by Aquator
I created some documentation for some Pipeline concepts. The following
links are to a self-signed server, so you will have to allow a security exception to view them.

https://www.ngdc.noaa.gov/wiki/index.php?title=PipelineBasics
https://www.ngdc.noaa.gov/wiki/index.php?title=QnD_Pipeline_Events

-Ken

Aquator wrote:
> ...
> Finally, I am interested in the various StageDrivers. I'd like some more
> detailed information than the API docs give, especially usage advice and
> samples, to help choose the best StageDriver for particular stages.
In reply to this post by Aquator
The Pipeline Basics tutorial has now been incorporated into the project
page. Thanks to some help and cleanup from Rahul Akolkar, the submitted documentation was installed quickly. See

http://commons.apache.org/sandbox/pipeline/pipeline_basics.html

-Ken
Ken Tanaka wrote:
> The Pipeline Basics tutorial has now been incorporated into the project
> page. ...
> http://commons.apache.org/sandbox/pipeline/pipeline_basics.html
>
> -Ken

That documentation is really useful. Thanks!

Could I follow up one of the earlier questions in this thread, on branching and merging?

From those docs it looks to me like the way data is sent to a branch is a bit strange. There appears to be a FileReaderStage class that has a JavaBean property called htmlPipelineKey:

<stage className="com.demo.pipeline.stages.FileReaderStage"
       driverFactoryId="df1" htmlPipelineKey="sales2html"/>

and later in the pipeline a branch is defined that names the pipeline according to that name:

<pipeline key="sales2html">

This seems pretty inflexible to me. Any branches have to be hardcoded into the stage definition. I was expecting a situation where multiple stages could be the recipients of the output of any stage, and these could be "wired up" dynamically, e.g. something like this:

          |--stage2
          |
stage1----+--stage3
          |
          |--stage4

so that all you needed to do was to define a stage5 as one more downstream stage for stage1, and it would transparently receive the data.

Is this possible, or does the branching have to be hard-coded into the stage definition?

Similarly for merging. To follow up the previous question, let's say I had a stageA that output some A's and a stageB that output some B's (let's assume both A's and B's are simple numbers). Now I want a stageC that takes all A's and all B's and generates some output with them (let's assume the output is A * B, so that every combination of A * B is output). So this would look like this:

stageA--+
        |
        |----stageC
        |
stageB--+

Is it possible to do this, so that stageA and stageB are both writing to stageC, but stageC can distinguish the 2 different streams of data?

Many thanks.

Tim
Tim Dudgeon wrote:
> Ken Tanaka wrote:
>> The Pipeline Basics tutorial has now been incorporated into the project
>> page. ...
>
> That documentation is really useful. Thanks!

Wow, someone is actually looking at this. I'll work on cleaning up the documentation some. I hope people realize that some of the color-coded examples got some inadvertent newlines added--but this isn't relevant to your questions.

> Could I follow up one of the earlier questions in this thread, on
> branching and merging?
>
> From those docs it looks to me like the way data is sent to a branch is
> a bit strange. ... This seems pretty inflexible to me. Any branches have
> to be hardcoded into the stage definition. ...
>
> Is this possible, or does the branching have to be hard-coded into the
> stage definition?

I wouldn't call the way branches are specified "hard coding", since the XML file here is a configuration file. For our current use, branches are pretty rare, so the pipeline framework deals best with simple cases that are fairly linear. Also, if stage1 is a branching stage, then that stage was written with branching in mind, and "htmlPipelineKey" is a hard-coded property name in the stage source code, so it can direct output when it passes data out to the framework. To simplify matters, all your branching stages could follow a convention of using "branchKey" (or some other generic name); then you wouldn't have to remember which variable holds the branch name for which stage.

A stage could be written to take an arbitrary number of branch names, and thus send output down multiple branches, although it can get complicated configuring rules on what goes where if the same thing isn't going to all the branches. So rather than making stage1 a branching stage, it could be followed by a "stageMulti", which would send copies of its input to a number of outputs:

          |-----stage2
          |
stage1----stageMulti----stage3
          |
          |-----stage4

stageMulti could then be used to add branching to any stage it follows.

I can imagine making configuration files a little simpler with regards to setting up branching, but the more intelligent configuration file reader to handle that hasn't been written.
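A rough, untested sketch of what such a stageMulti might look like -- assuming the BaseStage.emit(branchKey, object) call quoted later in this thread, and inventing a comma-separated bean property for the branch names:

import org.apache.commons.pipeline.StageException;
import org.apache.commons.pipeline.stage.BaseStage;

/**
 * Hypothetical fan-out stage: passes each incoming object to the main
 * downstream stage and to every configured branch pipeline.
 */
public class MultiBranchStage extends BaseStage {
    private String[] branchKeys = new String[0];

    // Bean property so the branch names can come from the configuration
    // file, e.g. branchKeys="branchA,branchB".
    public void setBranchKeys(String keys) {
        this.branchKeys = keys.split(",");
    }

    public void process(Object obj) throws StageException {
        // Send the object down the main pipeline...
        this.emit(obj);
        // ...and the same reference down each named branch. These are
        // not deep copies, so stages must treat shared objects as
        // read-only.
        for (int i = 0; i < branchKeys.length; i++) {
            this.emit(branchKeys[i], obj);
        }
    }
}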
> Similarly for merging. To follow up the previous question, let's say I
> had a stageA that output some A's and a stageB that output some B's ...
>
> Is it possible to do this, so that stageA and stageB are both writing to
> stageC, but stageC can distinguish the 2 different streams of data?

First off, the current design expects all pipelines to start with one stage, to accept feed values out of the config file (or place command-line arguments into the first stage queue, if the main pipeline application has been written to do that). So maybe you have a stageInit which takes a single number like "3":

feed "3" --> stageInit----stageA
                 |
                 ----------stageB

stageInit can then pass "3" on to stageA and stageB, possibly causing stageA to create 3 two-digit numbers and stageB to create 3 three-digit numbers.

For merging, stageC will accept normal input from a stage as well as watch for events carrying additional data. stageC may well have to accumulate input and then produce output as events are received. Stages normally accept one input, which is either a feed or the output of the stage immediately preceding them. Input from elsewhere, or from more than one source, is currently handled as events raised by the source and received by a "notify" method in the receiving stage.

feed "3" --> stageInit----stageA-------------stageC --> 10*111, 10*222, 10*333,
                 |        3  10, 20, 30        :        20*111, 20*222, 20*333,
                 |                             :        30*111, ...
                 ----------stageB..............:
                          3  111, 222, 333

---- normal data flow
.... event-passed data

Like branching, for our uses merging is rare. Also beware of running out of memory if you are doing any accumulation of data to merge input from more than one stage.
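A very rough sketch of what a merging stageC could look like for the A*B example, following that raise/registerListener/notify pattern. The event class here is invented, and the exact init/notify/registerListener signatures may differ from the real API:

import java.util.ArrayList;
import java.util.EventObject;
import java.util.List;
import org.apache.commons.pipeline.StageContext;
import org.apache.commons.pipeline.StageEventListener;
import org.apache.commons.pipeline.StageException;
import org.apache.commons.pipeline.stage.BaseStage;

public class ProductMergeStage extends BaseStage implements StageEventListener {
    /** Invented event type that stageB would use to publish its values. */
    public static class BValueEvent extends EventObject {
        private final Integer value;
        public BValueEvent(Object source, Integer value) {
            super(source);
            this.value = value;
        }
        public Integer getValue() { return value; }
    }

    // Accumulated B values -- note the memory warning above.
    private final List<Integer> bValues = new ArrayList<Integer>();

    public void init(StageContext context) {
        super.init(context);
        // Ask the context to deliver raised events to notify().
        context.registerListener(this);
    }

    /** Called when another stage raises an event. */
    public void notify(EventObject ev) {
        if (ev instanceof BValueEvent) {
            synchronized (bValues) {
                bValues.add(((BValueEvent) ev).getValue());
            }
        }
    }

    /** Normal input: the A values from the preceding stage. */
    public void process(Object obj) throws StageException {
        int a = ((Integer) obj).intValue();
        synchronized (bValues) {
            for (int i = 0; i < bValues.size(); i++) {
                int b = bValues.get(i).intValue();
                this.emit(Integer.valueOf(a * b));
            }
        }
    }
    // A real stage would also need to buffer A's that arrive before the
    // B events, and some end-of-data signal; omitted to keep this short.
}

-Ken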
Hi Ken,
Thanks for the rapid response.

First, let me explain some background here. I am looking for Java-based pipelining solutions to incorporate into an existing application. The use of pipelining is well established in the sector, with applications like Pipeline Pilot and Knime, and so many of the common needs have been well established over several years by these applications.

Key issues that my initial investigations of Jakarta Pipeline seem to identify are:

1. Branching is very common. This typically takes 2 forms:

1.1. Splitting data. A stage could (for instance) have 2 output ports, "pass" and "fail". Data is processed by the stage and sent to whichever port is appropriate. Different stages would be attached to each port, resulting in the pipeline being branched by this pass/fail decision.

1.2. Attaching multiple stages to a particular output port. The stage just sends its output onwards. It has no interest in what happens once the data is sent, and is not concerned whether zero, one or 100 stages receive the output. This is the stage1,2,3,4 scenario I outlined previously.

2. Merging is also common (though less common than branching). By analogy with branching, I would see this conceptually as a stage having multiple input ports (A and B in the merging example).

Taken together, I can see a generalisation here using named ports (input and output), which is similar, but not identical, to your current concept of branches.

So you have:
    BaseStage.emit(String branch, Object obj);
whereas I would conceptually see this as:
    emit(String port, Object obj);
and you have:
    Stage.process(Object obj);
whereas I would conceptually see this as:
    Stage.process(String port, Object obj);

And when a pipeline is being assembled, a downstream stage is attached to a particular port of a stage, not the stage itself. It then just receives data sent to that particular port, but not the other ports.

I'd love to hear how compatible the current system is with this way of seeing things. Are we just talking about a new type of Stage implementation, or a more fundamental incompatibility at the API level?
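To make that concrete, here is the kind of thing I mean. This is purely illustrative Java, not the existing API; BasePortStage and its emit(port, obj) helper are imagined by analogy with BaseStage:

// Illustrative only -- the port-based stage contract proposed above.
interface PortStage {
    /** Receive one object on the named input port. */
    void process(String port, Object obj) throws Exception;
}

// Imagined convenience base class. In a real framework, emit() would
// hand the object to whatever stages are wired to the named output port.
abstract class BasePortStage implements PortStage {
    protected void emit(String port, Object obj) {
        // delivery left to the framework in this sketch
    }
}

// A pass/fail splitter written against that contract.
class PassFailStage extends BasePortStage {
    public void process(String port, Object obj) {
        int value = ((Integer) obj).intValue();
        if (value >= 0) {
            emit("pass", obj); // only stages wired to "pass" receive this
        } else {
            emit("fail", obj); // only stages wired to "fail" receive this
        }
    }
}

Many thanks.

Tim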
Ken Tanaka wrote:
> ...
> Like branching, for our uses merging is rare. Also beware of running out
> of memory if you are doing any accumulation of data to merge input from
> more than one stage.
>
> -Ken
Hi Tim,
Tim Dudgeon wrote:
> Thanks for the rapid response.
>
> First, let me explain some background here. I am looking for Java-based
> pipelining solutions to incorporate into an existing application. The
> use of pipelining is well established in the sector, with applications
> like Pipeline Pilot and Knime, and so many of the common needs have been
> well established over several years by these applications.

Have you also looked at Pentaho?

> Key issues that my initial investigations of Jakarta Pipeline seem to
> identify are:
>
> 1. Branching is very common. ...
>
> 2. Merging is also common (though less common than branching). By
> analogy with branching, I would see this conceptually as a stage having
> multiple input ports (A and B in the merging example).

At present, the structure for storing stages is a linked list, and branches are implemented as additional pipelines accessed by name through a HashMap. To handle branching and merging in general, a directed acyclic graph (DAG) would serve better, but that would require the pipeline code to be rewritten at this level. Arguments could also be made for allowing cycles, as in directed graphs, but that would be harder to debug, and with a GUI might be a step toward a visual programming language--so I don't think this should be pursued yet unless there are volunteers...

> Taken together, I can see a generalisation here using named ports
> (input and output), which is similar, but not identical, to your current
> concept of branches.
>
> So you have:
>     BaseStage.emit(String branch, Object obj);
> whereas I would conceptually see this as:
>     emit(String port, Object obj);
> and you have:
>     Stage.process(Object obj);
> whereas I would conceptually see this as:
>     Stage.process(String port, Object obj);
>
> And when a pipeline is being assembled, a downstream stage is attached
> to a particular port of a stage, not the stage itself. It then just
> receives data sent to that particular port, but not the other ports.

I could see that this would work, but it would need either modifying a number of stages already written, or maybe creating a compatibility stage driver that takes older-style stages, so that the input object comes from a configured port name, usually "input", and the output goes to configured output ports named "output" and whatever the previous branch name(s) were, if any. Stages that used to look for events for input should be rewritten to read multiple inputs (Stage.process(String port, Object obj), as you suggested). Events would then be reserved for truly out-of-band signals between stages rather than carrying data for processing.

> I'd love to hear how compatible the current system is with this way of
> seeing things. Are we just talking about a new type of Stage
> implementation, or a more fundamental incompatibility at the API level?

I think you have some good ideas.
This is changing the Stage implementation, which affects on the order of 60 stages for us that override the process method, unless the compatibility stage driver works out. The top-level pipeline would also be restructured. The amount of work required puts this out of the near term for me to work on, but there may be other developers/contributors to take this on.
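Roughly, the compatibility wrapper I have in mind would be along these lines (a sketch only -- PortStage is the contract you sketched above, and the default port name is an assumption, not a design decision):

import org.apache.commons.pipeline.Stage;

// Sketch only: presents the proposed port-based contract while
// delegating to an existing, unmodified single-input stage.
public class LegacyStageAdapter implements PortStage {
    private final Stage legacy;

    public LegacyStageAdapter(Stage legacy) {
        this.legacy = legacy;
    }

    public void process(String port, Object obj) throws Exception {
        if ("input".equals(port)) {
            // Old stages understand exactly one input stream, so data
            // on the assumed default port goes straight through.
            legacy.process(obj);
        }
        // Data on any other port has no meaning for a legacy stage; a
        // real implementation would need to define a policy for that.
    }
}

-Ken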
Ken Tanaka wrote:
> Hi Tim,
>
> ...
>
> At present, the structure for storing stages is a linked list, and
> branches are implemented as additional pipelines accessed by name
> through a HashMap. To handle branching and merging in general, a
> directed acyclic graph (DAG) would serve better, but that would require
> the pipeline code to be rewritten at this level. Arguments could also be
> made for allowing cycles, as in directed graphs, but that would be
> harder to debug, and with a GUI might be a step toward a visual
> programming language--so I don't think this should be pursued yet unless
> there are volunteers...

I agree, a DAG would be better, but cycles could be needed too, so a DG (directed graph) would be better still. And yes, ideally I am wanting a visual designer too.

> I could see that this would work, but it would need either modifying a
> number of stages already written, or maybe creating a compatibility
> stage driver that takes older-style stages ... Events would then be
> reserved for truly out-of-band signals between stages rather than
> carrying data for processing.

Agreed, I think this would be good. I think existing stages could be made compatible by having a default input and output port, and using those if no specific port was specified. A default in/out port would probably be necessary to allow simple auto-wiring.

> I think you have some good ideas. This is changing the Stage
> implementation, which affects on the order of 60 stages for us that
> override the process method, unless the compatibility stage driver works
> out. The top-level pipeline would also be restructured. The amount of
> work required puts this out of the near term for me to work on, but
> there may be other developers/contributors to take this on.

I need to investigate more fully here and consider the other options, but potentially this is certainly of interest.

So is all that's necessary to prototype this to create a new Stage implementation, with new emit( ... ) and process( ... ) methods?

Thanks
Tim
Tim Dudgeon wrote:
> Ken Tanaka wrote:
>> ...
>
> I agree, a DAG would be better, but cycles could be needed too, so a DG
> (directed graph) would be better still. And yes, ideally I am wanting a
> visual designer too.

I'd like a visual designer too at some point, but that's a ways off into the future.

> Agreed, I think this would be good. I think existing stages could be
> made compatible by having a default input and output port, and using
> those if no specific port was specified. A default in/out port would
> probably be necessary to allow simple auto-wiring.
>
> I need to investigate more fully here and consider the other options,
> but potentially this is certainly of interest.
>
> So is all that's necessary to prototype this to create a new Stage
> implementation, with new emit( ... ) and process( ... ) methods?

I'm thinking it's more involved than that. To really deal well with an arbitrary number of downstream stages rather than just one means changing the digester rules
<http://commons.apache.org/sandbox/pipeline/xref/org/apache/commons/pipeline/config/PipelineRuleSet.html>
on specifying what follows what. Normally a stage is connected to the preceding stage if it is listed in that order in the configuration file. This should be the default behavior, but if stage2 and stage3 both follow stage1, then some notation of which is the previous stage is needed.

stage1----stage2
   |
   |-----stage3

might be set up as conf_pipe.xml:

<pipeline>
  ...
  <stage className="com.demo.pipeline.stages.Stage1"
         driverFactoryId="df1" stageId="stage1"/>
  <stage className="com.demo.pipeline.stages.Stage2"
         driverFactoryId="df1"/>
  <stage className="com.demo.pipeline.stages.Stage3"
         driverFactoryId="df1" follows="stage1"/>
</pipeline>

I propose the 'follows="stage1"' attribute to connect stage3 to stage1, instead of to stage2 immediately preceding. This seems cleaner than setting up a branch and matching up branch key names between the branching stage and the secondary pipeline(s). Can you think of a cleaner way to configure this?

The Pipeline.java class will need to be modified to build and maintain a DAG structure rather than a linked list. The incoming data are managed by a queue in the stage driver, which would change to a group of queues, allowing multiple inputs (ports). I'm assuming there is an open source directed acyclic graph library out there that can replace the linked list.

-Ken
See comments below.
Tim

Ken Tanaka wrote:
> ...
> I'm thinking it's more involved than that. To really deal well with an
> arbitrary number of downstream stages rather than just one means
> changing the digester rules on specifying what follows what. ...
>
> I propose the 'follows="stage1"' attribute to connect stage3 to stage1,
> instead of to stage2 immediately preceding. This seems cleaner than
> setting up a branch and matching up branch key names between the
> branching stage and the secondary pipeline(s). Can you think of a
> cleaner way to configure this?

I think we're in danger of looking at this the wrong way. The XML should reflect the underlying data model, not drive it. But to stick with this paradigm, I would think it might be best to explicitly define the connections in the model definition. Maybe something more like this:

<pipeline>
  ...
  <stage className="com.demo.pipeline.stages.Stage1"
         driverFactoryId="df1" stageId="stage1">
  </stage>
  <stage className="com.demo.pipeline.stages.Stage2"
         driverFactoryId="df1">
    <input stageId="stage1" outputPort="pass"/>
  </stage>
  <stage className="com.demo.pipeline.stages.Stage3"
         driverFactoryId="df1">
    <input stageId="stage1" outputPort="pass"/>
  </stage>
  <stage className="com.demo.pipeline.stages.Stage4"
         driverFactoryId="df1">
    <input stageId="stage1" outputPort="fail" inputPort="aPort"/>
  </stage>
</pipeline>

I think this would allow more flexibility, as:

1. A stage could define multiple inputs if it needed to.

2. Each connection is explicitly defined and could have extra attributes added in future (e.g. a disable attribute to disable execution of that part of the pipeline).

3. The concept of input can probably be generalised to include the "feed", allowing multiple feeds to be used (as discussed earlier in this thread), e.g. stage1 would also have an input that would be the feed.

> The Pipeline.java class will need to be modified to build and maintain a
> DAG structure rather than a linked list. The incoming data are managed
> by a queue in the stage driver, which would change to a group of queues,
> allowing multiple inputs (ports). I'm assuming there is an open source
> directed acyclic graph library out there that can replace the linked
> list.

If defined as I propose, I'm not sure a specific graph library is necessary. The model just comprises a set of stages that know how they are connected, e.g. the connections are already implicit in the model. But this probably needs more thought.
Tim Dudgeon wrote:
> See comments below.
>
> Tim
>
> Ken Tanaka wrote:
>> ...
>
> I think we're in danger of looking at this the wrong way. The XML should
> reflect the underlying data model, not drive it. But to stick with this
> paradigm, I would think it might be best to explicitly define the
> connections in the model definition. Maybe something more like this:
>
> <pipeline>
>   ...
>   <stage className="com.demo.pipeline.stages.Stage1"
>          driverFactoryId="df1" stageId="stage1">
>   </stage>
>   <stage className="com.demo.pipeline.stages.Stage2"
>          driverFactoryId="df1">
>     <input stageId="stage1" outputPort="pass"/>
>   </stage>

Just to clarify: for Stage2, when you specify '<input stageId="stage1" outputPort="pass"/>', 'outputPort="pass"' refers to an output port of stage1 named "pass", and is not specifying that the stage2 output port is named "pass", right? So Stage1 has two output ports, named "pass" and "fail", and this would be documented somewhere so you knew what to connect to when you wrote the configuration XML?

>   <stage className="com.demo.pipeline.stages.Stage3"
>          driverFactoryId="df1">
>     <input stageId="stage1" outputPort="pass"/>
>   </stage>
>   <stage className="com.demo.pipeline.stages.Stage4"
>          driverFactoryId="df1">
>     <input stageId="stage1" outputPort="fail" inputPort="aPort"/>
>   </stage>

So here Stage4 has an input port named "aPort" and it is loaded from the stage1 output port named "fail"?

> </pipeline>
>
> I think this would allow more flexibility, as:
>
> 1. A stage could define multiple inputs if it needed to.

If I understand you correctly, suppose there is a stage5 that has input ports "aPort" and "bPort" that we would like to receive data from stage2 and stage3 (the "pass" output port of both). Then it would be specified as follows:

<stage className="com.demo.pipeline.stages.Stage5"
       driverFactoryId="df1">
  <input stageId="stage2" outputPort="pass" inputPort="aPort"/>
  <input stageId="stage3" outputPort="pass" inputPort="bPort"/>
</stage>

I also assume that Stage2 and Stage3 are given stageIds of "stage2" and "stage3" respectively.

[stage1]------------>[stage2]------------->[stage5]
   |     pass->(in)           pass->aPort      ^
   |                                           |
   +-------------->[stage3]--------------------+
   |     pass->(in)           pass->bPort
   |
   +-------------->[stage4]
         fail->aPort

> 2. Each connection is explicitly defined and could have extra attributes
> added in future (e.g. a disable attribute to disable execution of that
> part of the pipeline).
>
> 3. The concept of input can probably be generalised to include the
> "feed", allowing multiple feeds to be used (as discussed earlier in this
> thread), e.g. stage1 would also have an input that would be the feed.

Do you envision a stage with two inputs (aPort and bPort) waiting until there are inputs on both before its stage driver invokes the process method? If stage5 needs two inputs, and stage2 provides 3 values while stage3 provides 2 values, there are just 2 complete pairs of values. The third value from stage2 could wait indefinitely for a matching input from stage3. Currently stages run until their queue is empty, but with multiple inputs that could be imbalanced, it might be better to set the quit condition to "any one queue is empty and all upstream stages claim to be complete". Any non-empty queues on exit could trigger a warning.

> If defined as I propose, I'm not sure a specific graph library is
> necessary. The model just comprises a set of stages that know how they
> are connected, e.g. the connections are already implicit in the model.
> But this probably needs more thought.

Currently the linked list of stages is used for lazy initialization, to find the next stage feeder the first time it is used. To allow general connections, the downstream feeder link could become an array of subsequent stage drivers, with the connections set up as the pipeline is built. In that case, a DAG library would not be needed, and we could keep the linked list as is.
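Sketching that restructuring (hypothetical code, not the current driver; the port-aware feeder interface is invented for illustration):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Sketch of a stage driver generalized to ports and multiple feeders. */
public class MultiPortDriverSketch {
    /** Invented feeder contract: (port, object) pairs handed downstream. */
    public interface PortFeeder {
        void feed(String port, Object obj);
    }

    // One bounded queue per named input port, replacing the single queue.
    private final Map<String, BlockingQueue<Object>> inputQueues =
            new HashMap<String, BlockingQueue<Object>>();

    // A list of downstream feeders, replacing the single "next stage" link.
    private final List<PortFeeder> downstreamFeeders =
            new ArrayList<PortFeeder>();

    public void addInputPort(String port, int capacity) {
        inputQueues.put(port, new ArrayBlockingQueue<Object>(capacity));
    }

    public void connectDownstream(PortFeeder feeder) {
        downstreamFeeders.add(feeder);
    }

    /** Called by upstream feeders; blocks while the port's queue is full. */
    public void accept(String port, Object obj) throws InterruptedException {
        inputQueues.get(port).put(obj);
    }

    /** Hand a result to every connected downstream stage. */
    protected void emitToAll(String outputPort, Object obj) {
        for (PortFeeder f : downstreamFeeders) {
            f.feed(outputPort, obj);
        }
    }
}

-Ken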
Ken Tanaka wrote:
> > > Tim Dudgeon wrote: >> See comments below. >> >> Tim >> >> Ken Tanaka wrote: >>> >>> >>> Tim Dudgeon wrote: >>>> Ken Tanaka wrote: >>>>> Hi Tim, >>> ... >>>>>> >>>>> At present, the structure for storing stages is a linked list, and >>>>> branches are implemented as additional pipelines accessed by a name >>>>> through a HashMap. To generally handle branching and merging, a >>>>> directed acyclic graph (DAG) would better serve, but that would >>>>> require the pipeline code to be rewritten at this level. Arguments >>>>> could also be made for allowing cycles, as in directed graphs, but >>>>> that would be harder to debug, and with a GUI might be a step >>>>> toward a visual programming language--so I don't think this should >>>>> be pursued yet unless there are volunteers... >>>>> >>>> >>>> I agree, DAG would be better, but cycles could be needeed too, so DG >>>> would be better too. >>>> But, yes, I am ideally wanting visual designer too. >>>> >>> I'd like a visual designer too at some point, but that's a ways off >>> into the future. >>>> >>>>>> >>>>>> Taken together I can see a generalisation here using named ports >>>>>> (input and outut), which is similar, but not identical, to your >>>>>> current concept of branches. >>>>>> >>>>>> So you have: >>>>>> BaseStage.emit(String branch, Object obj); >>>>>> whereas I would conceptually see this as: >>>>>> emit(String port, Object obj); >>>>>> and you have: >>>>>> Stage.process(Object obj); >>>>>> whereas I would would conceptually see this as: >>>>>> Stage.process(String port, Object obj); >>>>>> >>>>>> And when a pipeline is being assembled a downstream stage is >>>>>> attached to a particular port of a stage, not the stage itself. It >>>>>> then just recieves data sent to that particular port, but not the >>>>>> other ports. >>>>> I could see that this would work, but would need either modifying a >>>>> number of stages already written, or maybe creating a compatibility >>>>> stage driver that takes older style stages so that the input object >>>>> comes from a configured port name, usually "input" and a sends the >>>>> output to configured output ports named "output" and whatever the >>>>> previous branch name(s) were, if any. Stages that used to look for >>>>> events for input should be rewritten to read multiple inputs ( >>>>> Stage.process(String port, Object obj) as you suggested). Events >>>>> would then be reserved for truly out-of-band signals between stages >>>>> rather than carrying data for processing. >>>> >>>> Agreed, I think with would be good. I think existing stages could be >>>> made compatible by having a default input and output port, and to >>>> use those if not specific port was specified. >>>> A default in/out port would probably be necessary to allow simple >>>> auto-wiring. >>>> >>>>>> >>>>>> I'd love to hear how compatible the current system is with this >>>>>> way of seeing things. Are we just talking about a new type of >>>>>> Stage implementation, or a more fundamental incompatibility at the >>>>>> API level. >>>>>> >>>>> I think you have some good ideas. This is changing the Stage >>>>> implementation, which affects on the order of 60 stages for us that >>>>> override the process method, unless the compatibility stage driver >>>>> works out. The top level pipeline would also be restructured. The >>>>> amount of work required puts this out of the near term for me to >>>>> work on it, but there may be other developers/contributors to take >>>>> this on. 
>>>>
>>>> I need to investigate more fully here and consider the other options.
>>>> But this is certainly of interest.
>>>>
>>>> So is all that's necessary to prototype this to create a new Stage
>>>> implementation, with new emit( ... ) and process( ... ) methods?
>>> I'm thinking it's more involved than that. To really deal well with
>>> an arbitrary number of downstream stages rather than just one means
>>> changing the digester rules
>>> <http://commons.apache.org/sandbox/pipeline/xref/org/apache/commons/pipeline/config/PipelineRuleSet.html>
>>> for specifying what follows. Normally a stage is connected to the
>>> preceding stage if it is listed in that order in the configuration
>>> file. This should be a default behavior, but if stage2 and stage3
>>> both follow stage1, then some notation of which stage is the previous
>>> one is needed.
>>>
>>>   stage1----stage2
>>>      |
>>>      |-----stage3
>>>
>>> might be set up as conf_pipe.xml:
>>>
>>> <pipeline>
>>>   ...
>>>   <stage className="com.demo.pipeline.stages.Stage1"
>>>          driverFactoryId="df1" stageId="stage1"/>
>>>   <stage className="com.demo.pipeline.stages.Stage2"
>>>          driverFactoryId="df1"/>
>>>   <stage className="com.demo.pipeline.stages.Stage3"
>>>          driverFactoryId="df1" follows="stage1"/>
>>> </pipeline>
>>>
>>> I propose the 'follows="stage1"' attribute to connect stage3 to
>>> stage1 instead of to stage2, which immediately precedes it. This
>>> seems cleaner than setting up a branch and matching up branch key
>>> names between the branching stage and the secondary pipeline(s). Can
>>> you think of a cleaner way to configure this?
>>
>> I think we're in danger of looking at this the wrong way. The XML
>> should reflect the underlying data model, not drive it. But to stick
>> with this paradigm, I would think it might be best to explicitly
>> define the connections in the model definition. Maybe something more
>> like this:
>>
>> <pipeline>
>>   ...
>>   <stage className="com.demo.pipeline.stages.Stage1"
>>          driverFactoryId="df1" stageId="stage1">
>>   </stage>
>>   <stage className="com.demo.pipeline.stages.Stage2"
>>          driverFactoryId="df1">
>>     <input stageId="stage1" outputPort="pass"/>
>>   </stage>
> Just to clarify, for Stage2, when you specify '<input stageId="stage1"
> outputPort="pass"/>', 'outputPort="pass"' refers to an output port of
> stage1 named "pass" and is not specifying that the stage2 output port
> is named "pass", right? So Stage1 has two output ports, named "pass"
> and "fail", and this would be documented somewhere so you knew what to
> connect to when you wrote the configuration XML?

Yes. That's the idea. Maybe different names might help here.

>>   <stage className="com.demo.pipeline.stages.Stage3"
>>          driverFactoryId="df1">
>>     <input stageId="stage1" outputPort="pass"/>
>>   </stage>
>>   <stage className="com.demo.pipeline.stages.Stage4"
>>          driverFactoryId="df1">
>>     <input stageId="stage1" outputPort="fail" inputPort="aPort"/>
>>   </stage>
> So here Stage4 has an input port named "aPort" and it is loaded from
> the stage1 output port named "fail"?

Correct again.

>> </pipeline>
>>
>> I think this would allow more flexibility, as:
>> 1. a stage could define multiple inputs if it needed to.
> If I understand you correctly, suppose there is a stage5 with input
> ports "aPort" and "bPort" that we would like to feed from stage2 and
> stage3 (the "pass" output port of both).
> Then it would be specified as follows:
>
>   <stage className="com.demo.pipeline.stages.Stage5"
>          driverFactoryId="df1">
>     <input stageId="stage2" outputPort="pass" inputPort="aPort"/>
>     <input stageId="stage3" outputPort="pass" inputPort="bPort"/>
>   </stage>
>

Correct again.

> I also assume that Stage2 and Stage3 are given stageIds of "stage2" and
> "stage3" respectively.
>
>   [stage1]------------>[stage2]------------>[stage5]
>      |     pass->(in)        pass->aPort       ^
>      |                                         |
>      +-------------->[stage3]------------------+
>      |     pass->(in)        pass->bPort
>      |
>      +-------------->[stage4]
>            fail->aPort
>

That's the idea. It's obviously a bit of an extreme case, as it uses
branching and merging, but it covers what I think is needed.

>> 2. each connection is explicitly defined and could have extra
>> attributes added in future (e.g. a disable attribute to disable
>> execution of that part of the pipeline).
>> 3. The concept of input can probably be generalised to include the
>> "feed", allowing multiple feeds to be used (as discussed earlier in
>> this thread), e.g. stage1 would also have an input that would be the
>> feed.
>>
> Do you envision a stage with two inputs (aPort and bPort) waiting until
> there are inputs on both before its stageDriver invokes the process
> method? If stage5 needs two inputs, and stage2 provides 3 values and
> stage3 provides 2 values, there are just 2 complete pairs of values.
> The third value from stage2 could wait indefinitely for a matching
> input from stage3. Currently stages run until their queue is empty,
> but with multiple inputs that could be imbalanced, it might be better
> to set the quit condition to: any one queue is empty and all upstream
> stages claim to be complete. Any non-empty queues on exit can trigger
> a warning.

Yes, there are multiple scenarios here, e.g.:

1. pairwise processing
   a1 + b1
   a2 + b2
   a3 + b3
   ...

2. combinatorial processing
   a1 + b1
   a1 + b2
   a1 + b3
   a2 + b1
   a2 + b2
   ...

(maybe others too)

Clearly 1's inputs have to be matched, and if they are not, some exit
rules would be required.
Also, as you have mentioned before, there are potential memory issues
with combinatorial processing of large sets of data, but I think
solutions can be found for this.

Tim

>>
>>> The Pipeline.java class will need to be modified to build and
>>> maintain a DAG structure rather than a linked list. The incoming data
>>> are managed by a queue in the stage driver, which would change to a
>>> group of queues, allowing multiple inputs (ports). I'm assuming there
>>> is an open source directed acyclic graph library out there that can
>>> replace the linked list.
>>
>> If defined as I propose I'm not sure a specific graph library is
>> necessary. The model just comprises a set of stages that know how they
>> are connected, e.g. the connections are already implicit in the model.
>> But this probably needs more thought.
>>
> Currently the linked list of stages is used for lazy initialization, to
> find the next stage feeder the first time it is used. To allow general
> connections, the downstream feeder link could become an array of
> subsequent stageDrivers, with the connections set up as the pipeline is
> built. In that case, a DAG library would not be needed, and we could
> keep the linked list as is.
>
> -Ken
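As a rough illustration of the port-style API discussed in this thread, the following hypothetical interface shows process keyed by port name, plus a default-port adapter along the lines of the compatibility idea above. None of these names exist in the released Commons Pipeline code; the real Stage interface takes a single process(Object), and emit targets named branches.

    /** Hypothetical port-aware stage, per the emit/process proposal above. */
    public interface PortAwareStage {
        String DEFAULT_IN = "input";   // default ports let old-style stages
        String DEFAULT_OUT = "output"; // work without explicit wiring

        /** Called by the stage driver when an object arrives on a port. */
        void process(String port, Object obj) throws Exception;
    }

    /** Adapter letting an old single-input stage run under the port model. */
    abstract class CompatibilityStage implements PortAwareStage {
        @Override
        public void process(String port, Object obj) throws Exception {
            // Old-style stages only understand one input; route the default
            // port to the legacy method and ignore other port names.
            if (PortAwareStage.DEFAULT_IN.equals(port)) {
                processLegacy(obj);
            }
        }

        /** The pre-port process(Object) method existing stages override. */
        protected abstract void processLegacy(Object obj) throws Exception;

        /** Stand-in for routing output to a named port; a real driver would
         *  deliver this to whichever downstream input ports are wired to it. */
        protected void emit(String port, Object obj) {
            System.out.println(port + " <- " + obj);
        }
    }

With defaults like these, existing configurations keep working unchanged, and only stages that genuinely need multiple inputs or outputs have to name their ports.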
Thanks for confirming the examples. I figure that fleshing this out now
should speed development at a future time.

Tim Dudgeon wrote:
> Ken Tanaka wrote:
>>
>>
>> Tim Dudgeon wrote:
>>> See comments below.
>>>
>>> Tim
...
>
>>> 2. each connection is explicitly defined and could have extra
>>> attributes added in future (e.g. a disable attribute to disable
>>> execution of that part of the pipeline).
>>> 3. The concept of input can probably be generalised to include the
>>> "feed", allowing multiple feeds to be used (as discussed earlier in
>>> this thread), e.g. stage1 would also have an input that would be the
>>> feed.
>>>
>> Do you envision a stage with two inputs (aPort and bPort) waiting
>> until there are inputs on both before its stageDriver invokes the
>> process method? If stage5 needs two inputs, and stage2 provides 3
>> values and stage3 provides 2 values, there are just 2 complete pairs
>> of values. The third value from stage2 could wait indefinitely for a
>> matching input from stage3. Currently stages run until their queue is
>> empty, but with multiple inputs that could be imbalanced, it might be
>> better to set the quit condition to: any one queue is empty and all
>> upstream stages claim to be complete. Any non-empty queues on exit
>> can trigger a warning.
>
> Yes, there are multiple scenarios here, e.g.:
> 1. pairwise processing
>    a1 + b1
>    a2 + b2
>    a3 + b3
>    ...
>
> 2. combinatorial processing
>    a1 + b1
>    a1 + b2
>    a1 + b3
>    a2 + b1
>    a2 + b2
>    ...
>
> (maybe others too)
>
> Clearly 1's inputs have to be matched, and if they are not, some exit
> rules would be required.
> Also, as you have mentioned before, there are potential memory issues
> with combinatorial processing of large sets of data, but I think
> solutions can be found for this.
>
I think combinatorial processing could be handled by making a stage
behave differently rather than putting this capability into the
framework. A stage can be written to do nothing but accept pairwise
arguments and emit combinatorial output; it can then be inserted before
any stage to which you want to add combinatorial input processing.

Ken
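A rough sketch of such an adapter stage, using the hypothetical port-style signatures from this thread rather than the released API; the buffer-then-cross-product behavior is the point, and the port names are illustrative. The postprocess step is modeled on the stage lifecycle, running after all input has been consumed:

    import java.util.ArrayList;
    import java.util.List;

    /**
     * Buffers everything seen on two hypothetical input ports and, once
     * input is complete, emits the full cross product for a downstream
     * stage that wants combinatorial input.
     */
    public class CombinatorialAdapterStage {
        private final List<Object> as = new ArrayList<>();
        private final List<Object> bs = new ArrayList<>();

        /** Collect inputs by port name; "aPort"/"bPort" are illustrative. */
        public void process(String port, Object obj) {
            if ("aPort".equals(port)) {
                as.add(obj);
            } else if ("bPort".equals(port)) {
                bs.add(obj);
            }
        }

        /** After upstream stages finish, emit every (a, b) combination.
         *  Note the memory caveat from the thread: both sides are held
         *  in full until this point. */
        public void postprocess() {
            for (Object a : as) {
                for (Object b : bs) {
                    emit("output", new Object[] {a, b});
                }
            }
        }

        /** Stand-in for the driver delivering to downstream input ports. */
        protected void emit(String port, Object pair) {
            Object[] p = (Object[]) pair;
            System.out.println(port + " <- (" + p[0] + ", " + p[1] + ")");
        }
    }

Keeping this behavior in a dedicated stage, as suggested above, means the framework itself only ever needs the simpler pairwise delivery, and combinatorial expansion becomes an ordinary stage you insert where needed.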