Class: Klomp

Inherits:
Object
  • Object
show all
Defined in:
lib/klomp.rb,
lib/klomp/frames.rb,
lib/klomp/sentinel.rb,
lib/klomp/connection.rb,
lib/klomp/subscription.rb

Defined Under Namespace

Modules: Frames
Classes: Connection, Error, FrameError, Sentinel, Subscription

Constant Summary collapse

VERSION =
'1.0.8'
FRAME_SEP =

The null character is the frame separator.

"\x00"

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(servers, options = {}) ⇒ Klomp

Returns a new instance of Klomp.

Raises:

  • (ArgumentError)


8
9
10
11
12
# File 'lib/klomp.rb', line 8

# Builds a client that holds one Connection per given server.
#
# @param servers [Object, Array] a single server or a (possibly nested)
#   list of servers; the list is flattened before use
# @param options [Hash] handed unchanged to every Connection.new call
# @raise [ArgumentError] when the flattened server list is empty
def initialize(servers, options = {})
  # Wrapping then flattening accepts both a lone server and nested arrays.
  server_list = [servers].flatten
  if server_list.empty?
    raise ArgumentError, "no servers given"
  end
  @connections = server_list.map do |server|
    Connection.new(server, options)
  end
end

Instance Attribute Details

#connections ⇒ Object (readonly)

Returns the value of attribute connections.



6
7
8
# File 'lib/klomp.rb', line 6

# Reader for the connection list kept in @connections.
#
# @return [Object] the current value of @connections, returned as-is
#   (not copied)
def connections
  @connections
end

Instance Method Details

#connected? ⇒ Boolean

Returns:

  • (Boolean)


41
42
43
# File 'lib/klomp.rb', line 41

# Reports whether at least one connection is currently connected.
#
# Always answers with a real Boolean, never the matching connection
# object, so the result is safe to compare, log, or serialize.
#
# @return [Boolean] true if any connection's #connected? is truthy,
#   false otherwise (including when there are no connections)
def connected?
  connections.any? { |conn| conn.connected? }
end

#disconnect ⇒ Object



45
46
47
48
49
# File 'lib/klomp.rb', line 45

# Disconnects every connection, then empties the connection list.
#
# The list is only reset after all #disconnect calls have returned; if
# one of them raises, @connections is left untouched.
#
# @return [Array] the value returned by each connection's #disconnect,
#   in connection order
def disconnect
  results = connections.map { |conn| conn.disconnect }
  @connections = []
  results
end

#publish(queue, body, headers = {}) ⇒ Object



14
15
16
17
18
19
20
21
22
23
24
# File 'lib/klomp.rb', line 14

# Publishes a message through one randomly chosen connection, failing
# over to the remaining connections when an attempt raises.
#
# Each connection is tried at most once per call. The instance's own
# connection list is never modified; only a working copy shrinks.
#
# @param queue [Object] destination, passed through to Connection#publish
# @param body [Object] message body, passed through unchanged
# @param headers [Hash] extra headers, passed through unchanged
# @return [Object] whatever the successful connection's #publish returns
# @raise [StandardError] the error from the last attempt once every
#   candidate has failed
def publish(queue, body, headers = {})
  candidates = connections.dup
  # A plain `while` is used on purpose: Kernel#loop would swallow a
  # re-raised StopIteration instead of letting it reach the caller.
  while true
    chosen = nil
    begin
      chosen = candidates[rand(candidates.size)]
      return chosen.publish(queue, body, headers)
    rescue
      candidates.delete(chosen)
      # Nothing left to try: surface the most recent failure.
      raise if candidates.empty?
    end
  end
end

#subscribe(queue, subscriber = nil, headers = {}, &block) ⇒ Object



27
28
29
# File 'lib/klomp.rb', line 27

# Subscribes to a queue on every connection.
#
# @param queue [Object] queue to subscribe to, passed to each connection
# @param subscriber [Object, nil] optional subscriber, passed through
# @param headers [Hash] extra headers, passed through unchanged
# @param block [Proc] optional block, forwarded to each connection
# @return [Array] each connection's #subscribe result, in connection order
def subscribe(queue, subscriber = nil, headers = {}, &block)
  connections.map do |connection|
    connection.subscribe(queue, subscriber, headers, &block)
  end
end

#unsubscribe(queue, headers = {}) ⇒ Object



31
32
33
34
35
36
37
38
39
# File 'lib/klomp.rb', line 31

# Unsubscribes on every connection, on a best-effort basis.
#
# When +queue+ is an Array it must hold exactly one entry per connection;
# entry N is passed to connection N. Any other value is passed to every
# connection as-is. A connection whose #unsubscribe raises a
# StandardError contributes nil to the result instead of aborting.
#
# @param queue [Object, Array] one argument for all connections, or one
#   argument per connection
# @param headers [Hash] extra headers, passed through unchanged
# @return [Array] each connection's #unsubscribe result (nil on failure)
# @raise [ArgumentError] when an Array's size differs from the number of
#   connections
def unsubscribe(queue, headers = {})
  case queue
  when Array
    unless connections.size == queue.size
      raise ArgumentError, "wrong size array for #{connections.size} (#{queue.size})"
    end
    connections.zip(queue).map do |conn, arg|
      begin
        conn.unsubscribe(arg, headers)
      rescue
        nil # deliberate: one failing connection must not stop the rest
      end
    end
  else
    connections.map do |conn|
      begin
        conn.unsubscribe(queue, headers)
      rescue
        nil # deliberate: one failing connection must not stop the rest
      end
    end
  end
end