DWF
A distributed workflow runner that follows the Gush interface and uses Sidekiq and Redis. This project exists for DSL research purposes.
Installation
1. Add dwf to your Gemfile
gem 'dwf', '~> 0.1.12'
2. Execute flow example
Declare jobs
require 'dwf'

class FirstItem < Dwf::Item
  def perform
    puts "#{self.class.name}: running"
    puts "#{self.class.name}: finish"
  end
end

class SecondItem < Dwf::Item
  def perform
    puts "#{self.class.name}: running"
    output('Send to ThirdItem')
    puts "#{self.class.name} finish"
  end
end

class ThirdItem < Dwf::Item
  def perform
    puts "#{self.class.name}: running"
    puts "#{self.class.name}: finish"
  end
end

class FourthItem < Dwf::Item
  def perform
    puts "#{self.class.name}: running"
    puts "payloads from incoming: #{payloads.inspect}"
    puts "#{self.class.name}: finish"
  end
end

FifthItem = Class.new(FirstItem)
Declare flow
require 'dwf'

class TestWf < Dwf::Workflow
  def configure
    run FirstItem
    run SecondItem, after: FirstItem
    run ThirdItem, after: FirstItem
    run FourthItem, after: [ThirdItem, SecondItem]
    run FifthItem, after: FourthItem
  end
end
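The `run ..., after: ...` declarations describe a dependency graph. The following is a minimal plain-Ruby sketch of that idea, not dwf's implementation: `FlowSketch` is a hypothetical class that records each declaration and uses Ruby's stdlib TSort to produce an order in which every job comes after the jobs it waits for. Job names are symbols here so the sketch runs without dwf, Sidekiq, or Redis.

```ruby
require 'tsort'

# Hypothetical stand-in for a workflow definition (not part of dwf).
# It records `run` calls as a dependency graph and orders them with TSort.
class FlowSketch
  include TSort

  def initialize
    @deps = {} # job name => names of the jobs it must wait for
  end

  # Mirrors the shape of `run Job, after: [...]` from the flow above.
  def run(job, after: [])
    @deps[job] = Array(after)
    self
  end

  # Jobs in an order that respects every `after:` constraint.
  def order
    tsort
  end

  # TSort hook: visit every declared job.
  def tsort_each_node(&block)
    @deps.each_key(&block)
  end

  # TSort hook: a job's "children" are the jobs it depends on.
  def tsort_each_child(job, &block)
    @deps.fetch(job).each(&block)
  end
end

flow = FlowSketch.new
flow.run(:FirstItem)
flow.run(:SecondItem, after: :FirstItem)
flow.run(:ThirdItem, after: :FirstItem)
flow.run(:FourthItem, after: %i[ThirdItem SecondItem])
flow.run(:FifthItem, after: :FourthItem)

puts flow.order.inspect
```

In a real run, SecondItem and ThirdItem have no dependency on each other, so a distributed runner may execute them in parallel; the sketch only shows one valid sequential order.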
Start background worker process
bundle exec sidekiq -q dwf
Execute flow
wf = TestWf.create
wf.callback_type = Dwf::Workflow::SK_BATCH
wf.start!
Note
dwf supports two callback types: Dwf::Workflow::BUILD_IN and Dwf::Workflow::SK_BATCH
- Dwf::Workflow::BUILD_IN is the built-in callback
- Dwf::Workflow::SK_BATCH is the Sidekiq batch callback, which requires sidekiq-pro
By default, dwf uses the Dwf::Workflow::BUILD_IN callback.
Output
FirstItem: running
FirstItem: finish
SecondItem: running
SecondItem finish
ThirdItem: running
ThirdItem: finish
FourthItem: running
FourthItem: finish
FifthItem: running
FifthItem: finish
Configure Redis and default queue
dwf uses Redis as its key-value storage through redis-rb, so you can pass Redis configuration via redis_opts
Dwf.config do |config|
  SENTINELS = [
    { host: "127.0.0.1", port: 26380 },
    { host: "127.0.0.1", port: 26381 }
  ]
  config.redis_opts = { host: 'mymaster', sentinels: SENTINELS, role: :master }
  config.namespace = 'dwf'
end
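For a single Redis instance without Sentinel, the options hash can be much smaller. The sketch below assumes the connection string lives in an environment variable named REDIS_URL; `redis_opts_from_env` is a hypothetical helper, not part of dwf, and it only builds a hash using the `url` option that redis-rb accepts.

```ruby
# Hypothetical helper (not part of dwf): build a redis-rb options hash
# from the environment, falling back to a local Redis instance.
# The REDIS_URL variable name and the default URL are assumptions.
def redis_opts_from_env(env = ENV)
  { url: env.fetch('REDIS_URL', 'redis://127.0.0.1:6379/0') }
end

puts redis_opts_from_env.inspect

# Assumed usage, following the config block above:
#   Dwf.config do |config|
#     config.redis_opts = redis_opts_from_env
#     config.namespace = 'dwf'
#   end
```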
Advanced features
Pipelining
You can pass a job's result to the next nodes
class SendOutput < Dwf::Item
  def perform
    output('it works')
  end
end
The output method sends data from the job to all outgoing jobs
class ReceiveOutput < Dwf::Item
  def perform
    message = payloads.first[:output] # it works
  end
end
payloads is an array containing the outputs from incoming jobs
[
  {
    id: "SendOutput|1849a3f9-5fce-401e-a73a-91fc1048356",
    class: "SendOutput",
    output: 'it works'
  }
]
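When a job has several incoming jobs, it can help to pick outputs by the class that produced them. The following is a small sketch under the assumption that payloads has the shape shown above (hashes with :id, :class, and :output keys); `outputs_from` is a hypothetical helper, not part of dwf, and the sample ids are made up for illustration.

```ruby
# Hypothetical helper (not part of dwf): collect the :output values that
# came from a given upstream job class.
def outputs_from(payloads, job_class)
  payloads
    .select { |payload| payload[:class] == job_class.to_s }
    .map { |payload| payload[:output] }
end

# Sample data in the shape shown above; the ids are invented.
sample_payloads = [
  { id: 'SendOutput|example-id-1', class: 'SendOutput', output: 'it works' },
  { id: 'OtherItem|example-id-2', class: 'OtherItem', output: 42 }
]

puts outputs_from(sample_payloads, 'SendOutput').inspect
```

Inside a real item, the same helper could be called as outputs_from(payloads, SendOutput), since the class is converted to its name with to_s.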
Sub workflow
You may want to reuse a workflow inside another workflow.
As an example, let's write a workflow that contains another workflow: SubWorkflow should execute after SecondItem, and ThirdItem should execute after SubWorkflow.
Setup
class FirstItem < Dwf::Item
  def perform
    puts "Main flow: #{self.class.name} running"
    puts "Main flow: #{self.class.name} finish"
  end
end

SecondItem = Class.new(FirstItem)
ThirdItem = Class.new(FirstItem)

class FirstSubItem < Dwf::Item
  def perform
    puts "Sub flow: #{self.class.name} running"
    puts "Sub flow: #{self.class.name} finish"
  end
end

SecondSubItem = Class.new(FirstSubItem)

class SubWorkflow < Dwf::Workflow
  def configure
    run FirstSubItem
    run SecondSubItem, after: FirstSubItem
  end
end

class TestWf < Dwf::Workflow
  def configure
    run FirstItem
    run SecondItem, after: FirstItem
    run SubWorkflow, after: SecondItem
    run ThirdItem, after: SubWorkflow
  end
end

wf = TestWf.create
wf.start!
Result
Main flow: FirstItem running
Main flow: FirstItem finish
Main flow: SecondItem running
Main flow: SecondItem finish
Sub flow: FirstSubItem running
Sub flow: FirstSubItem finish
Sub flow: SecondSubItem running
Sub flow: SecondSubItem finish
Main flow: ThirdItem running
Main flow: ThirdItem finish
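The ordering in the result can be pictured as a nested flow being expanded in place. This is a plain-Ruby sketch of that ordering only, not how dwf schedules sub workflows: the model (a flow as an ordered list whose steps are either job names or nested lists) and the `execution_order` helper are assumptions made for illustration.

```ruby
# Hypothetical model (not part of dwf): a flow is an ordered list of steps.
# A step is either a job name (String) or a nested flow (Array) that runs
# as a unit before the next step starts.
def execution_order(flow)
  flow.flat_map do |step|
    step.is_a?(Array) ? execution_order(step) : [step]
  end
end

sub_workflow = %w[FirstSubItem SecondSubItem]
main_flow = ['FirstItem', 'SecondItem', sub_workflow, 'ThirdItem']

puts execution_order(main_flow)
```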
Todo
- [x] Make it work
- [x] Support passing params
- [x] Support built-in callback
- [x] Add github workflow
- [x] Redis configurable
- [x] Pipelining
- [x] Test
- [x] Sub workflow
- [ ] Support Resque
- [ ] Pluggable key-value store
- [ ] Research https://github.com/moneta-rb/moneta