Class: Mercury::Fake

Inherits:
Object
  • Object
show all
Defined in:
lib/mercury/fake.rb,
lib/mercury/fake/queue.rb,
lib/mercury/fake/domain.rb,
lib/mercury/fake/metadata.rb,
lib/mercury/fake/subscriber.rb,
lib/mercury/fake/queued_message.rb

Defined Under Namespace

Classes: Domain, Metadata, Queue, QueuedMessage, Subscriber

Class Method Summary

Instance Method Summary

Constructor Details

#initialize(domain = :default, parallelism: 1) ⇒ Fake

Returns a new instance of Fake.



18
19
20
21
# File 'lib/mercury/fake.rb', line 18

# Builds a fake bound to one entry of the +Fake.domains+ registry.
#
# @param domain key looked up in +Fake.domains+ (defaults to +:default+)
# @param parallelism value stored in +@parallelism+ (defaults to 1); how it
#   is used is not visible from this method
def initialize(domain = :default, parallelism: 1)
  @parallelism = parallelism
  @domain = Fake.domains[domain]
end

Class Method Details

.domains ⇒ Object



23
24
25
# File 'lib/mercury/fake.rb', line 23

# Lazily-built registry of domains, memoized in +@domains+.
#
# Looking up a key that is not present creates a new +Domain+, stores it
# under that key, and returns it, so repeated lookups of the same key yield
# the same +Domain+ instance.
#
# @return [Hash] the memoized registry
def self.domains
  return @domains if @domains

  @domains = Hash.new do |registry, name|
    registry[name] = Domain.new
  end
end

Instance Method Details

#close(&k) ⇒ Object



27
28
29
30
# File 'lib/mercury/fake.rb', line 27

# Flags this instance as closed by setting +@closed+, then hands the
# continuation to +ret+.
#
# @param k optional block, captured as a Proc and passed to +ret+
#   (presumably invoked on completion -- confirm against +ret+)
# @return whatever +ret+ returns
def close(&k)
  @closed = true
  ret(k)
end

#delete_source(source_name, &k) ⇒ Object



54
55
56
57
58
# File 'lib/mercury/fake.rb', line 54

# Removes, in place, every entry of +queues+ whose queue reports
# +source_name+ as its +source+.
#
# Calls +assert_not_closed+ before touching anything.
#
# @param source_name value compared (via +==+) with each queue's +source+
# @param k optional block, captured as a Proc and passed to +ret+
# @return whatever +ret+ returns
def delete_source(source_name, &k)
  assert_not_closed
  queues.delete_if do |_name, queue|
    queue.source == source_name
  end
  ret(k)
end

#delete_work_queue(worker_group, &k) ⇒ Object



60
61
62
63
64
# File 'lib/mercury/fake.rb', line 60

# Removes, in place, every entry of +queues+ whose queue reports
# +worker_group+ as its +worker+.
#
# Calls +assert_not_closed+ before touching anything.
#
# @param worker_group value compared (via +==+) with each queue's +worker+
# @param k optional block, captured as a Proc and passed to +ret+
# @return whatever +ret+ returns
def delete_work_queue(worker_group, &k)
  assert_not_closed
  queues.delete_if do |_name, queue|
    queue.worker == worker_group
  end
  ret(k)
end

#publish(source_name, msg, tag: '', headers: {}, &k) ⇒ Object



32
33
34
35
36
# File 'lib/mercury/fake.rb', line 32

# Delivers +msg+ to every queue in +queues+ that reports (via +binds?+) that
# it is bound to +source_name+ for +tag+.
#
# All matching queues are collected first; each one is then handed its own
# +roundtrip(msg)+ result together with +tag+ and +headers+. Calls
# +assert_not_closed+ before doing anything.
#
# @param source_name passed to each queue's +binds?+
# @param msg message to publish; passed through +roundtrip+ once per queue
# @param tag passed to +binds?+ and +enqueue+ (defaults to the empty string)
# @param headers passed to +enqueue+ (defaults to an empty hash)
# @param k optional block, captured as a Proc and passed to +ret+
# @return whatever +ret+ returns
def publish(source_name, msg, tag: '', headers: {}, &k)
  assert_not_closed
  bound_queues = queues.values.select { |queue| queue.binds?(source_name, tag) }
  bound_queues.each do |queue|
    queue.enqueue(roundtrip(msg), tag, headers)
  end
  ret(k)
end

#queue_exists?(worker, &k) ⇒ Boolean

Returns:

  • (Boolean)


71
72
73
# File 'lib/mercury/fake.rb', line 71

# Reports whether any queue in +queues+ has +worker+ as its +worker+ value.
#
# The boolean is not returned directly; it is passed to +ret+ along with the
# continuation.
#
# @param worker value looked for among the queues' +worker+ values
# @param k optional block, captured as a Proc and passed to +ret+
# @return whatever +ret+ returns
def queue_exists?(worker, &k)
  worker_groups = queues.values.map(&:worker)
  ret(k, worker_groups.include?(worker))
end

#source_exists?(source, &k) ⇒ Boolean

Returns:

  • (Boolean)


66
67
68
69
# File 'lib/mercury/fake.rb', line 66

# Reports whether +source+ is known: either some queue in +queues+ has it as
# its +source+, or it is one of the hard-coded "amq."-prefixed names
# (amq.direct, amq.topic, amq.fanout, amq.headers, amq.match,
# amq.rabbitmq.log, amq.rabbitmq.trace).
#
# The boolean is not returned directly; it is passed to +ret+ along with the
# continuation.
#
# @param source name to look for
# @param k optional block, captured as a Proc and passed to +ret+
# @return whatever +ret+ returns
def source_exists?(source, &k)
  built_ins = %w(direct topic fanout headers match rabbitmq.log rabbitmq.trace).map { |suffix| "amq.#{suffix}" }
  declared = queues.values.map(&:source)
  known_sources = declared + built_ins
  ret(k, known_sources.include?(source))
end

#start_listener(source_name, handler, tag_filter: '#', &k) ⇒ Object



38
39
40
# File 'lib/mercury/fake.rb', line 38

# Starts a listener by delegating to +start_worker_or_listener+ without a
# worker group argument.
#
# @param source_name forwarded as the first argument
# @param handler forwarded as the second argument
# @param tag_filter forwarded as the third argument (defaults to '#')
# @param k optional block, forwarded as the block argument
# @return whatever +start_worker_or_listener+ returns
def start_listener(source_name, handler, tag_filter: '#', &k)
  start_worker_or_listener(
    source_name,
    handler,
    tag_filter,
    &k
  )
end

#start_worker(worker_group, source_name, handler, tag_filter: '#', &k) ⇒ Object



42
43
44
# File 'lib/mercury/fake.rb', line 42

# Starts a worker by delegating to +start_worker_or_listener+, passing
# +worker_group+ as the trailing positional argument.
#
# @param worker_group forwarded as the fourth argument
# @param source_name forwarded as the first argument
# @param handler forwarded as the second argument
# @param tag_filter forwarded as the third argument (defaults to '#')
# @param k optional block, forwarded as the block argument
# @return whatever +start_worker_or_listener+ returns
def start_worker(worker_group, source_name, handler, tag_filter: '#', &k)
  start_worker_or_listener(
    source_name,
    handler,
    tag_filter,
    worker_group,
    &k
  )
end