Module: Shifty::DSL
- Defined in:
- lib/shifty/dsl.rb
Defined Under Namespace
Classes: BatchContext
Instance Method Summary collapse
- #batch_worker(options = {}, &block) ⇒ Object
- #filter_worker(options = {}, &block) ⇒ Object
- #handoff(something) ⇒ Object
- #relay_worker(options = {}, &block) ⇒ Object
- #side_worker(options = {}, &block) ⇒ Object
- #source_worker(argument = nil, &block) ⇒ Object
- #splitter_worker(options = {}, &block) ⇒ Object
-
#trailing_worker(trail_length = 2) ⇒ Object
don’t like that this is a second exception to accepting options..
Instance Method Details
#batch_worker(options = {}, &block) ⇒ Object
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/shifty/dsl.rb', line 68 def batch_worker( = {}, &block) [:tags] ||= [] [:tags] << :batch [:gathering] ||= 1 ensure_regular_arity(block) if block batch_full = block || proc { |_, batch| batch.size >= [:gathering] } [:context] = BatchContext.new({batch_full: batch_full}) Worker.new() do |value, supply, context| if value context.collection = [value] until context.batch_complete?( context.collection.last, context.collection ) context.collection << supply.shift end context.collection.compact end end end |
#filter_worker(options = {}, &block) ⇒ Object
48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/shifty/dsl.rb', line 48 def filter_worker( = {}, &block) [:tags] ||= [] [:tags] << :filter ensure_callable!(block) Worker.new() do |value, supply| while value && !block.call(value) value = supply.shift end value end end |
#handoff(something) ⇒ Object
132 133 134 |
# File 'lib/shifty/dsl.rb', line 132 def handoff(something) Fiber.yield something end |
#relay_worker(options = {}, &block) ⇒ Object
22 23 24 25 26 27 28 29 30 |
# File 'lib/shifty/dsl.rb', line 22 def relay_worker( = {}, &block) [:tags] ||= [] [:tags] << :relay ensure_regular_arity(block) Worker.new() do |value| value && block.call(value) end end |
#side_worker(options = {}, &block) ⇒ Object
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/shifty/dsl.rb', line 32 def side_worker( = {}, &block) [:tags] ||= [] [:tags] << :side_effect mode = [:mode] || :normal ensure_regular_arity(block) Worker.new() do |value| value.tap do |v| used_value = (mode == :hardened) ? Marshal.load(Marshal.dump(v)) : v v && block.call(used_value) end end end |
#source_worker(argument = nil, &block) ⇒ Object
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
# File 'lib/shifty/dsl.rb', line 5 def source_worker(argument = nil, &block) ensure_correct_arity_for!(argument, block) series = series_from(argument) callable = setup_callable_for(block, series) return Worker.new(&callable) if series.nil? Worker.new(tags: [:source]) do series.each(&callable) loop do handoff nil end end end |
#splitter_worker(options = {}, &block) ⇒ Object
93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 |
# File 'lib/shifty/dsl.rb', line 93 def splitter_worker( = {}, &block) [:tags] ||= [] [:tags] << :splitter ensure_regular_arity(block) Worker.new() do |value| if value.nil? value else parts = [block.call(value)].flatten while parts.size > 1 handoff parts.shift end parts.shift end end end |
#trailing_worker(trail_length = 2) ⇒ Object
don’t like that this is a second exception to accepting options..
112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 |
# File 'lib/shifty/dsl.rb', line 112 def trailing_worker(trail_length = 2) = {tags: [:trailing]} trail = [] Worker.new() do |value, supply| if value trail.unshift value if trail.size >= trail_length trail.pop end while trail.size < trail_length trail.unshift supply.shift end trail else value # hint: it's nil! end end end |