Class: Queight::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/queight/client.rb

Constant Summary collapse

# Error class (a StandardError subclass) raised by #publish when the
# channel's tx_commit call returns a falsy value.
PublishFailure =
Class.new(StandardError)

Instance Method Summary collapse

Constructor Details

#initialize(channel_pool) ⇒ Client

Returns a new instance of Client.



7
8
9
# File 'lib/queight/client.rb', line 7

# Builds a client around a channel pool.
#
# The pool is stored as-is; every other method on this client obtains its
# channels through it.
#
# @param channel_pool [Object] pool object that the client's methods call
#   for channels (with_channel, with_transactional_channel,
#   with_subscribe_channel, create_channel)
def initialize(channel_pool)
  @channel_pool = channel_pool
end

Instance Method Details

#bind(exchange, queue) ⇒ Object



79
80
81
82
83
# File 'lib/queight/client.rb', line 79

# Binds +queue+ through +exchange+ using a channel from #with_channel.
#
# @param exchange [Object] must respond to bind(channel, queue)
# @param queue [Object] passed through to exchange.bind unchanged
# @return [Object] whatever #with_channel returns for the block
def bind(exchange, queue)
  with_channel { |ch| exchange.bind(ch, queue) }
end

#declare(queue) ⇒ Object



29
30
31
32
33
# File 'lib/queight/client.rb', line 29

# Declares +queue+ on a channel obtained from #with_channel.
#
# @param queue [Object] must respond to declare(channel)
# @return [Object] whatever #with_channel returns for the block
def declare(queue)
  with_channel { |ch| queue.declare(ch) }
end

#delete_exchange(exchange) ⇒ Object



99
100
101
102
103
# File 'lib/queight/client.rb', line 99

# Deletes +exchange+ using a channel obtained from #with_channel.
#
# @param exchange [Object] must respond to delete(channel)
# @return [Object] whatever #with_channel returns for the block
def delete_exchange(exchange)
  with_channel { |ch| exchange.delete(ch) }
end

#delete_queue(queue) ⇒ Object



89
90
91
92
93
# File 'lib/queight/client.rb', line 89

# Deletes +queue+ using a channel obtained from #with_channel.
#
# @param queue [Object] must respond to delete(channel)
# @return [Object] whatever #with_channel returns for the block
def delete_queue(queue)
  with_channel { |ch| queue.delete(ch) }
end

#message_count(queue) ⇒ Object



85
86
87
# File 'lib/queight/client.rb', line 85

# Asks +queue+ for its message count over a channel from #with_channel.
#
# @param queue [Object] must respond to message_count(channel)
# @return [Object] whatever #with_channel returns for the block
def message_count(queue)
  with_channel do |ch|
    queue.message_count(ch)
  end
end

#publish(exchange, message, routing_key) ⇒ Object



35
36
37
38
39
40
41
# File 'lib/queight/client.rb', line 35

# Publishes +message+ via +exchange+ inside a channel transaction.
#
# On a channel from #with_transactional_channel this calls tx_select,
# publishes, then calls tx_commit.
#
# @param exchange [Object] must respond to publish(channel, message, routing_key)
# @param message [Object] passed through to exchange.publish unchanged
# @param routing_key [Object] passed through to exchange.publish unchanged
# @return [Object] whatever #with_transactional_channel returns for the block
# @raise [PublishFailure] when tx_commit returns a falsy value
def publish(exchange, message, routing_key)
  with_transactional_channel do |tx_channel|
    tx_channel.tx_select
    exchange.publish(tx_channel, message, routing_key)

    committed = tx_channel.tx_commit
    raise PublishFailure unless committed
  end
end

#publish_to_queue(message, queue, message_options = {}) ⇒ Object



50
51
52
53
# File 'lib/queight/client.rb', line 50

# Declares +queue+, then publishes +message+ to it through #publish.
#
# The exchange comes from Queight.default_exchange(message_options) and the
# routing key is queue.name.
#
# @param message [Object] passed through to #publish unchanged
# @param queue [Object] must respond to name; also handed to #declare
# @param message_options [Hash] forwarded to Queight.default_exchange
# @return [Object] the result of #publish
def publish_to_queue(message, queue, message_options = {})
  declare(queue)

  default_exchange = Queight.default_exchange(message_options)
  publish(default_exchange, message, queue.name)
end

#publish_to_queue_without_transaction(message, queue, options = {}) ⇒ Object Also known as: publish_to_queue!



55
56
57
58
59
60
61
62
# File 'lib/queight/client.rb', line 55

# Declares +queue+, then publishes +message+ to it through
# #publish_without_transaction.
#
# The exchange comes from Queight.default_exchange(options) and the routing
# key is queue.name.
#
# @param message [Object] passed through to #publish_without_transaction
# @param queue [Object] must respond to name; also handed to #declare
# @param options [Hash] forwarded to Queight.default_exchange
# @return [Object] the result of #publish_without_transaction
def publish_to_queue_without_transaction(message, queue, options = {})
  declare(queue)

  default_exchange = Queight.default_exchange(options)
  publish_without_transaction(default_exchange, message, queue.name)
end

#publish_without_transaction(exchange, message, routing_key) ⇒ Object Also known as: publish!



43
44
45
46
47
# File 'lib/queight/client.rb', line 43

# Publishes +message+ via +exchange+ on a channel from #with_channel.
# Unlike #publish, no tx_select / tx_commit calls are made here.
#
# @param exchange [Object] must respond to publish(channel, message, routing_key)
# @param message [Object] passed through to exchange.publish unchanged
# @param routing_key [Object] passed through to exchange.publish unchanged
# @return [Object] whatever #with_channel returns for the block
def publish_without_transaction(exchange, message, routing_key)
  with_channel { |ch| exchange.publish(ch, message, routing_key) }
end

#purge(queue) ⇒ Object



95
96
97
# File 'lib/queight/client.rb', line 95

# Purges +queue+ using a channel obtained from #with_channel.
#
# @param queue [Object] must respond to purge(channel)
# @return [Object] whatever #with_channel returns for the block
def purge(queue)
  with_channel do |ch|
    queue.purge(ch)
  end
end

#subscribe(queue, prefetch = 1, &block) ⇒ Object



65
66
67
68
69
# File 'lib/queight/client.rb', line 65

# Subscribes +block+ to +queue+ on a channel from #with_subscribe_channel.
#
# @param queue [Object] must respond to subscribe(channel, &block)
# @param prefetch [Integer] forwarded to #with_subscribe_channel (default 1)
# @yield forwarded untouched to queue.subscribe
# @return [Object] whatever #with_subscribe_channel returns for the block
def subscribe(queue, prefetch = 1, &block)
  with_subscribe_channel(prefetch) { |ch| queue.subscribe(ch, &block) }
end

#subscribe_non_blocking(queue, prefetch = 1, &block) ⇒ Object



71
72
73
74
75
76
77
# File 'lib/queight/client.rb', line 71

# Subscribes +block+ to +queue+ with :block => false and returns a handle
# wrapping the channel and the consumer.
#
# The channel is created directly via the pool's create_channel rather than
# through one of the with_* helpers.
#
# NOTE(review): prefetch is passed to create_channel and then applied again
# with channel.prefetch; this may be redundant. Confirm against the pool.
#
# @param queue [Object] must respond to subscribe(channel, options, &block)
# @param prefetch [Integer] passed to create_channel and channel.prefetch
#   (default 1)
# @yield forwarded untouched to queue.subscribe
# @return [CancellableSubscriber] built from the channel and the value
#   returned by queue.subscribe
def subscribe_non_blocking(queue, prefetch = 1, &block)
  subscriber_channel = @channel_pool.create_channel(prefetch).tap do |created|
    created.prefetch(prefetch)
  end
  consumer = queue.subscribe(subscriber_channel, :block => false, &block)

  CancellableSubscriber.new(subscriber_channel, consumer)
end

#with_channel ⇒ Object



11
12
13
14
15
# File 'lib/queight/client.rb', line 11

# Yields a channel supplied by the pool's with_channel to the caller's block.
#
# @yieldparam channel [Object] the channel handed over by the pool
# @return [Object] whatever the pool's with_channel returns
def with_channel
  @channel_pool.with_channel { |ch| yield(ch) }
end

#with_subscribe_channel(prefetch) ⇒ Object



23
24
25
26
27
# File 'lib/queight/client.rb', line 23

# Yields a channel supplied by the pool's with_subscribe_channel to the
# caller's block.
#
# @param prefetch [Object] forwarded to the pool's with_subscribe_channel
# @yieldparam channel [Object] the channel handed over by the pool
# @return [Object] whatever the pool's with_subscribe_channel returns
def with_subscribe_channel(prefetch)
  @channel_pool.with_subscribe_channel(prefetch) { |ch| yield(ch) }
end

#with_transactional_channel ⇒ Object



17
18
19
20
21
# File 'lib/queight/client.rb', line 17

# Yields a channel supplied by the pool's with_transactional_channel to the
# caller's block.
#
# @yieldparam channel [Object] the channel handed over by the pool
# @return [Object] whatever the pool's with_transactional_channel returns
def with_transactional_channel
  @channel_pool.with_transactional_channel { |ch| yield(ch) }
end