Class: Qwirk::Adapter::InMemory::Topic

Inherits:
Object
  • Object
show all
Defined in:
lib/qwirk/adapter/in_memory/topic.rb

Instance Method Summary collapse

Constructor Details

#initialize(name) ⇒ Topic

Returns a new instance of Topic.



6
7
8
9
10
# File 'lib/qwirk/adapter/in_memory/topic.rb', line 6

def initialize(name)
  @name              = name
  @worker_hash_mutex = Mutex.new
  @worker_hash       = {}
end

Instance Method Details

#get_worker_queue(worker_name, queue_max_size) ⇒ Object



12
13
14
15
16
17
18
# File 'lib/qwirk/adapter/in_memory/topic.rb', line 12

def get_worker_queue(worker_name, queue_max_size)
  @worker_hash_mutex.synchronize do
    queue = @worker_hash[worker_name] ||= Queue.new("#{@name}:#{worker_name}")
    queue.max_size = queue_max_size
    return queue
  end
end

#readObject



28
29
30
# File 'lib/qwirk/adapter/in_memory/topic.rb', line 28

def read
  raise "topic should not have been read for #{name}"
end

#stopObject



20
21
22
23
24
25
26
# File 'lib/qwirk/adapter/in_memory/topic.rb', line 20

def stop
  @worker_hash_mutex.synchronize do
    @worker_hash.each_value do |queue|
      queue.stop
    end
  end
end

#to_sObject



40
41
42
# File 'lib/qwirk/adapter/in_memory/topic.rb', line 40

def to_s
  "topic:#{@name}"
end

#write(obj) ⇒ Object



32
33
34
35
36
37
38
# File 'lib/qwirk/adapter/in_memory/topic.rb', line 32

def write(obj)
  @worker_hash_mutex.synchronize do
    @worker_hash.each_value do |queue|
      queue.write(obj)
    end
  end
end