Module: Wakame::AMQPClient::ClassMethods

Defined in:
lib/wakame/amqp_client.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#defered_setup_callsObject (readonly)

Returns the value of attribute defered_setup_calls.



20
21
22
# File 'lib/wakame/amqp_client.rb', line 20

def defered_setup_calls
  @defered_setup_calls
end

Instance Method Details

#add_subscriber(*args) ⇒ Object



85
86
87
# File 'lib/wakame/amqp_client.rb', line 85

def add_subscriber(*args)
  self.instance.add_subscriber(*args)
end

#amqObject



77
78
79
# File 'lib/wakame/amqp_client.rb', line 77

def amq
  Thread.current[:mq]
end

#define_exchange(name, type = :fanout) ⇒ Object



89
90
91
92
93
94
95
96
97
98
99
# File 'lib/wakame/amqp_client.rb', line 89

def define_exchange(name, type=:fanout)
  def_ex = proc { |inst|
    inst.amq.__send__(type, name)
  }

  (@defered_setup_calls ||= []) << def_ex
  
  #if [email protected]? && @instance.connected?
  #  def_ex.call(@instance)
  #end
end

#define_queue(name, exchange_name, opts = {}) ⇒ Object



101
102
103
104
105
106
107
108
109
110
111
# File 'lib/wakame/amqp_client.rb', line 101

def define_queue(name, exchange_name, opts={})
  def_q = proc { |inst|
    inst.define_queue(name, exchange_name, opts)
  }

  (@defered_setup_calls ||= []) << def_q

  #if [email protected]? && @instance.connected?
  #  def_q.call(@instance)
  #end
end

#instanceObject



23
24
25
# File 'lib/wakame/amqp_client.rb', line 23

def instance
  @instance
end

#publish_to(*args) ⇒ Object



81
82
83
# File 'lib/wakame/amqp_client.rb', line 81

def publish_to(*args)
  self.instance.publish_to(*args)
end

#start(*opts) ⇒ Object



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/wakame/amqp_client.rb', line 27

def start(*opts)
  new_instance = proc {
    if @instance.nil?
      @instance = new(*opts)
      @instance.connect(*opts) do
        @instance.init
      end
    end
    @instance
  }
  
  if EM.reactor_running?
    return new_instance.call
  else
    EM.run new_instance
  end
end

#stop(&blk) ⇒ Object



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/wakame/amqp_client.rb', line 46

def stop(&blk)
  #EM.add_timer(1){
  EM.next_tick {
    end_proc = proc {
      EventDispatcher.reset

      unless blk.nil?
        blk.call
      end
      EM.stop
    }

    catch(:nop) {
    if @instance.nil?
      end_proc.call
      throw :nop
    end
      
    begin
      unless @instance.amqp_client.nil?
        @instance.close { end_proc.call }
      else
        end_proc.call
      end
    ensure
      @instance = nil
    end
    }
  }
end