Class: ActiveConcurrency::Schedulers::Topic

Inherits:
Object
  • Object
show all
Defined in:
lib/active_concurrency/schedulers/topic.rb

Instance Method Summary collapse

Constructor Details

#initialize(pool, **options) ⇒ Topic

Returns a new instance of Topic.



7
8
9
10
11
12
13
14
15
16
17
18
19
# File 'lib/active_concurrency/schedulers/topic.rb', line 7

def initialize(pool, **options)
  topics = options[:topics]
  mutexes = topics.each_with_object({}) do |t, h|
    h[t] = Mutex.new
  end

  topics = topics.cycle
  @pool = pool.each_with_object({}) do |w, h|
    topic = topics.next
    w.mutex = mutexes[topic]
    h.key?(topic) ? (h[topic] << w) : (h[topic] = [w])
  end
end

Instance Method Details

#schedule(*args, &block) ⇒ Object



21
22
23
24
25
# File 'lib/active_concurrency/schedulers/topic.rb', line 21

def schedule(*args, &block)
  topic = args.pop
  worker = @pool[topic].min_by(&:size)
  worker.schedule(*args, &block)
end