Class: Yunhe::Core
- Inherits: Object
- Inheritance chain: Object, Yunhe::Core
- Defined in: lib/yunhe/core.rb
Overview
The core class that manages the producers and consumers.
Instance Attribute Summary
- #consumer_pool ⇒ Object (readonly): Get the consumer pool.
- #consumer_size ⇒ Object: Returns the value of attribute consumer_size.
- #producer_pool ⇒ Object (readonly): Get the producer pool.
- #producer_size ⇒ Object: Returns the value of attribute producer_size.
- #queue ⇒ Object (readonly): Get the buffer queue.
- #queue_size ⇒ Object: Returns the value of attribute queue_size.
Instance Method Summary
- #consume_with(&task) ⇒ Object: Set the consumer task. &task: the task that processes the resources.
- #initialize(options) ⇒ Core (constructor): Initialize a new yunhe core with options :producer_size (size of the producer pool), :consumer_size (size of the consumer pool) and :queue_size (size of the buffer queue).
- #produce_by(&task) ⇒ Object: Set the producer task. &task: the task that produces a new resource.
- #terminate ⇒ Object: Stop the yunhe core.
Constructor Details
#initialize(options) ⇒ Core
Initialize a new yunhe core with options:
- :producer_size => Size of the producer pool
- :consumer_size => Size of the consumer pool
- :queue_size => Size of the buffer queue

    # File 'lib/yunhe/core.rb', line 18
    def initialize(options)
      @producer_size = options[:producer_size]
      @consumer_size = options[:consumer_size]
      @queue_size = options[:queue_size]
      @producer_pool = ThreadGroup.new
      @consumer_pool = ThreadGroup.new
      @queue = SizedQueue.new(queue_size)
    end
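The constructor relies on two standard-library building blocks: a SizedQueue as the bounded buffer and a ThreadGroup for each pool. The following is a minimal sketch of how those two classes behave on their own; it does not use Yunhe, and the names `buffer`, `pool` and `worker` are illustrative only.

```ruby
# Standard-library sketch; no Yunhe code is involved.
buffer = SizedQueue.new(2)   # the capacity plays the role of :queue_size
pool   = ThreadGroup.new     # plays the role of a producer or consumer pool

buffer.push(:a)
buffer.push(:b)

# A non-blocking push on a full SizedQueue raises ThreadError instead of waiting.
overflowed =
  begin
    buffer.push(:c, true)
    false
  rescue ThreadError
    true
  end

# A thread added to a ThreadGroup is listed by it while the thread is alive.
worker = Thread.new { sleep }
pool.add(worker)
tracked = pool.list.include?(worker)

worker.kill
worker.join
```

A blocking push (the default) would instead wait until a consumer frees a slot, which is what lets the queue size throttle producers.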
Instance Attribute Details
#consumer_pool ⇒ Object (readonly)
Get the consumer pool
    # File 'lib/yunhe/core.rb', line 39
    def consumer_pool
      @consumer_pool
    end
#consumer_size ⇒ Object
Returns the value of attribute consumer_size.
    # File 'lib/yunhe/core.rb', line 11
    def consumer_size
      @consumer_size
    end
#producer_pool ⇒ Object (readonly)
Get the producer pool
    # File 'lib/yunhe/core.rb', line 32
    def producer_pool
      @producer_pool
    end
#producer_size ⇒ Object
Returns the value of attribute producer_size.
    # File 'lib/yunhe/core.rb', line 11
    def producer_size
      @producer_size
    end
#queue ⇒ Object (readonly)
Get the buffer queue
    # File 'lib/yunhe/core.rb', line 47
    def queue
      @queue
    end
#queue_size ⇒ Object
Returns the value of attribute queue_size.
    # File 'lib/yunhe/core.rb', line 11
    def queue_size
      @queue_size
    end
Instance Method Details
#consume_with(&task) ⇒ Object
Set the consumer task. &task: the task that processes the resources. It should take the resource as a parameter:

    do |resource|
      ...
    end
    # File 'lib/yunhe/core.rb', line 78
    def consume_with(&task)
      each_consumer do |thread|
        thread.kill
      end
      consumer_size.times do
        thread = Consumer.new(queue, task)
        consumer_pool.add(thread)
      end
    end
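This page does not show the Consumer class, so the following is only a sketch of what a consumer thread might do, assuming it repeatedly pops a resource from the queue and passes it to the task. `SketchConsumer`, `processed` and `results` are hypothetical names used for illustration.

```ruby
# Hypothetical stand-in for Consumer, whose source is not shown on this page.
class SketchConsumer < Thread
  def initialize(queue, task)
    super() do
      loop do
        resource = queue.pop   # blocks while the queue is empty
        task.call(resource)
      end
    end
  end
end

queue     = SizedQueue.new(4)
processed = Queue.new          # thread-safe place to collect results
task      = proc { |resource| processed << resource * 2 }

consumers = ThreadGroup.new
2.times { consumers.add(SketchConsumer.new(queue, task)) }

[1, 2, 3].each { |n| queue.push(n) }
results = Array.new(3) { processed.pop }.sort

consumers.list.each(&:kill)
```

The results are sorted because the order in which two consumer threads finish is not deterministic.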
#produce_by(&task) ⇒ Object
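Set the producer task. &task: the task that produces a new resource. The block should evaluate to the resource, so leave it as the last expression; an explicit return inside a block tries to return from the enclosing method rather than from the block:

    do
      ...
      resource
    end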
    # File 'lib/yunhe/core.rb', line 61
    def produce_by(&task)
      each_producer do |thread|
        thread.kill
      end
      producer_size.times do
        thread = Producer.new(queue, task)
        producer_pool.add(thread)
      end
    end
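The Producer class is likewise not shown here. As a rough sketch, assuming a producer thread calls the task in a loop and pushes each result onto the queue, it could look like the following. `SketchProducer`, `counter`, `lock` and `produced` are hypothetical names.

```ruby
# Hypothetical stand-in for Producer, whose source is not shown on this page.
class SketchProducer < Thread
  def initialize(queue, task)
    super() do
      loop do
        queue.push(task.call)  # blocks while the queue is full
      end
    end
  end
end

queue   = SizedQueue.new(3)
counter = 0
lock    = Mutex.new
# The value of the block's last expression is the produced resource.
task    = proc { lock.synchronize { counter += 1 } }

producers = ThreadGroup.new
2.times { producers.add(SketchProducer.new(queue, task)) }

produced = Array.new(3) { queue.pop }
producers.list.each(&:kill)
```

Because the task is shared by every thread in the pool, any state it touches (here `counter`) needs its own synchronization.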
#terminate ⇒ Object
Stop the yunhe core. Kill all producer/consumer threads, and clear the buffer queue.
    # File 'lib/yunhe/core.rb', line 90
    def terminate
      each_producer do |thread|
        thread.kill
      end
      each_consumer do |thread|
        thread.kill
      end
      queue.clear
    end
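The same kill-then-clear sequence can be tried with plain threads. This sketch uses only the standard library. The `join` call is an addition that is not in the method above: it is included here on the assumption that a thread may still be running for a moment after `kill` returns, so that no worker can push again after the queue is cleared.

```ruby
queue = SizedQueue.new(2)
pool  = ThreadGroup.new

# Two workers that keep filling the queue, as stand-ins for producers.
2.times { pool.add(Thread.new { loop { queue.push(:item) } }) }
sleep 0.05 until queue.size == 2   # wait until the buffer is full

workers = pool.list
workers.each(&:kill)
workers.each(&:join)               # wait for each killed thread to exit
queue.clear

alive_after = workers.count(&:alive?)
```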