Class: Yunhe::Core

Inherits:
Object
  • Object
show all
Defined in:
lib/yunhe/core.rb

Overview

The core class that manages the producers and consumers.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options) ⇒ Core

Initialize a new yunhe core with options: :producer_size => Size of the producer pool :consumer_size => Size of the consumer pool :queue_size => Size of the buffer queue



18
19
20
21
22
23
24
25
26
# File 'lib/yunhe/core.rb', line 18

def initialize(options)
  @producer_size = options[:producer_size]
  @consumer_size = options[:consumer_size]
  @queue_size = options[:queue_size]

  @producer_pool = ThreadGroup.new
  @consumer_pool = ThreadGroup.new
  @queue = SizedQueue.new(queue_size)
end

Instance Attribute Details

#consumer_poolObject (readonly)

Get the consumer pool



39
40
41
# File 'lib/yunhe/core.rb', line 39

def consumer_pool
  @consumer_pool
end

#consumer_sizeObject

Returns the value of attribute consumer_size.



11
12
13
# File 'lib/yunhe/core.rb', line 11

def consumer_size
  @consumer_size
end

#producer_poolObject (readonly)

Get the producer pool



32
33
34
# File 'lib/yunhe/core.rb', line 32

def producer_pool
  @producer_pool
end

#producer_sizeObject

Returns the value of attribute producer_size.



11
12
13
# File 'lib/yunhe/core.rb', line 11

def producer_size
  @producer_size
end

#queueObject (readonly)

Get the buffer queue



47
48
49
# File 'lib/yunhe/core.rb', line 47

def queue
  @queue
end

#queue_sizeObject

Returns the value of attribute queue_size.



11
12
13
# File 'lib/yunhe/core.rb', line 11

def queue_size
  @queue_size
end

Instance Method Details

#consume_with(&task) ⇒ Object

Set the consumer task &task: The task that process the resources. Should take the resource as a paramater:

do | resource |
  ...
end


78
79
80
81
82
83
84
85
86
87
# File 'lib/yunhe/core.rb', line 78

def consume_with (&task)
  each_consumer do | thread |
    thread.kill
  end

  consumer_size.times do
    thread = Consumer.new(queue, task)
    consumer_pool.add(thread)
  end
end

#produce_by(&task) ⇒ Object

Set the producer task &task: The task that produces a new resource. Should have the resource returned:

do 
  ...
  return resource
end


61
62
63
64
65
66
67
68
69
70
# File 'lib/yunhe/core.rb', line 61

def produce_by (&task)
  each_producer do | thread |
    thread.kill
  end

  producer_size.times do 
    thread = Producer.new(queue, task)
    producer_pool.add(thread)
  end
end

#terminateObject

Stop the yunhe core. Kill all producer/consumer threads, and clear the buffer queue.



90
91
92
93
94
95
96
97
98
99
100
# File 'lib/yunhe/core.rb', line 90

def terminate
  each_producer do | thread |
    thread.kill
  end

  each_consumer do | thread |
    thread.kill
  end

  queue.clear
end