Class: ReliableMsg::QueueManager

Inherits:
Object
  • Object
show all
Defined in:
lib/ap4r/queue_manager_ext.rb,
lib/ap4r/queue_manager_ext_debug.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = nil) ⇒ QueueManager

Hooks the original initialize method to add lifecycle listeners. – TODO: Make dispatchers and carriers lifecycle listeners, 2006/09/01 shino



93
94
95
96
97
98
# File 'lib/ap4r/queue_manager_ext.rb', line 93

# Wraps the original constructor (kept as initialize_original). After it has
# run, this prepares the empty lifecycle-listener list and the lock used by
# start/stop (only created if not already set), then instantiates a
# RetentionHistory for this manager with the manager's logger and config.
def initialize(options = nil) #:notnew:
  initialize_original(options)
  @lifecycle_listeners = []
  @global_lock ||= Mutex.new
  RetentionHistory.new(self, @logger, @config)
end

Instance Attribute Details

#carriersObject (readonly)

Returns the value of attribute carriers.



9
10
11
# File 'lib/ap4r/queue_manager_ext_debug.rb', line 9

# Read-only accessor: returns whatever is currently stored in @carriers.
def carriers; @carriers; end

#configObject (readonly)

Returns the value of attribute config.



9
10
11
# File 'lib/ap4r/queue_manager_ext_debug.rb', line 9

# Read-only accessor: returns whatever is currently stored in @config.
def config; @config; end

#dispatchersObject (readonly)

Returns the value of attribute dispatchers.



9
10
11
# File 'lib/ap4r/queue_manager_ext_debug.rb', line 9

# Read-only accessor: returns whatever is currently stored in @dispatchers.
def dispatchers; @dispatchers; end

#mutexObject (readonly)

Returns the value of attribute mutex.



9
10
11
# File 'lib/ap4r/queue_manager_ext_debug.rb', line 9

# Read-only accessor: returns whatever is currently stored in @mutex.
def mutex; @mutex; end

#storeObject (readonly)

Returns the value of attribute store.



9
10
11
# File 'lib/ap4r/queue_manager_ext_debug.rb', line 9

# Read-only accessor: returns whatever is currently stored in @store.
def store; @store; end

#transactionsObject (readonly)

Returns the value of attribute transactions.



9
10
11
# File 'lib/ap4r/queue_manager_ext_debug.rb', line 9

# Read-only accessor: returns whatever is currently stored in @transactions.
def transactions; @transactions; end

Instance Method Details

#add_lifecycle_listener(listener, iv_name, attr_mode = 'reader') ⇒ Object



100
101
102
103
104
# File 'lib/ap4r/queue_manager_ext.rb', line 100

# Registers +listener+ so that start/stop will invoke it, and exposes it on
# the manager under the name +iv_name+.
#
# listener::  object appended to @lifecycle_listeners
# iv_name::   name (without "@") of the instance variable that will hold the
#             listener; an attribute method of the same name is generated
# attr_mode:: suffix of the attr_* macro to use ('reader' by default)
#
# Note that the attribute is defined on the class, not on this instance only.
def add_lifecycle_listener(listener, iv_name, attr_mode = 'reader')
  @lifecycle_listeners << listener
  ivar = "@#{iv_name}".to_sym
  instance_variable_set(ivar, listener)
  self.class.class_eval("attr_#{attr_mode} :#{iv_name}")
end

#eval_to_inspect(code, inspect_mode = :inspect) ⇒ Object Also known as: e2i

Accepts Ruby code as a string, evaluates it on self, and returns the result as a formatted string. The format can be one of the following:

  • :inspect : default value

  • :yaml

  • :json

  • :xml

Apart from :inspect, formatting can fail depending on the result object.



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/ap4r/queue_manager_ext_debug.rb', line 20

# Evaluates +code+ (a string of Ruby source) against this object in a
# separate thread and returns the outcome rendered as a string.
#
# inspect_mode selects the rendering:
#   :yaml -> to_yaml, :json -> to_json, :xml -> to_xml,
#   anything else (including the default :inspect) -> inspect.
# Renderings other than inspect only work when the result responds to the
# corresponding to_* method.
#
# NOTE(review): this runs arbitrary code via instance_eval. The only guard is
# the $SAFE assignment below, whose effect depends on the Ruby version in
# use -- confirm it still offers protection before exposing this to
# untrusted callers.
def eval_to_inspect(code, inspect_mode = :inspect)
  # TODO: too sloppy implementation
  evaluated = Thread.new(code) { |source|
    $SAFE = 4
    instance_eval(source)
  }.value

  case inspect_mode
  when :yaml then evaluated.to_yaml
  when :json then evaluated.to_json
  when :xml  then evaluated.to_xml
  else            evaluated.inspect
  end
end

#initialize_originalObject



88
# File 'lib/ap4r/queue_manager_ext.rb', line 88

# Keep the pre-existing constructor reachable under a second name so the
# replacement initialize can still invoke it.
alias initialize_original initialize

#no_active_message?Boolean

Checks queues are all “empty”.

“Empty” means no messages in transaction and all queues but $dlq are empty.

Returns:

  • (Boolean)


45
46
47
# File 'lib/ap4r/queue_manager_ext_debug.rb', line 45

# Tells whether the manager is idle: true only when @transactions is empty
# and every queue in @store.queues, other than "$dlq", holds no messages.
def no_active_message?
  return false unless @transactions.size.zero?

  @store.queues.all? do |(queue_name, messages)|
    queue_name == "$dlq" || messages.size.zero?
  end
end

#stale_queue(multi_queue) ⇒ Object

Gets the name of the queue that holds the most stale message. multi_queue specifies the target queue names to search.



82
83
84
# File 'lib/ap4r/queue_manager_ext.rb', line 82

# Delegates to the message store: passes +multi_queue+ to @store.stale_queue
# and returns its result unchanged.
def stale_queue(multi_queue)
  @store.stale_queue(multi_queue)
end

#startObject

Starts reliable-msg server and something around it.

Order is:

  1. Original reliable-msg server (message store and druby).

  2. Dispatchers

  3. Carriers (if any)

The order is reversed in stop.



113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# File 'lib/ap4r/queue_manager_ext.rb', line 113

# Starts the reliable-msg server and the AP4R components around it.
#
# Start-up order, executed while holding @global_lock:
# 1. the original reliable-msg server (start_original)
# 2. dispatchers
# 3. carriers
# 4. every registered lifecycle listener
#
# Does nothing when this manager is already the active one (@@active == self).
#
# Ordinary start-up failures (StandardError and ScriptError, e.g. a failed
# require) are logged at warn level and not re-raised. Process-level
# conditions such as Interrupt, SystemExit and NoMemoryError are deliberately
# NOT rescued, so a Ctrl-C or an exit request during start-up is no longer
# swallowed and reported as a mere warning.
def start
  @global_lock.synchronize do
    return if @@active == self
    start_original

    @dispatchers = ::Ap4r::Dispatchers.new(self, @config.dispatchers, @logger)
    @dispatchers.start

    # The carriers attribute reader returns @carriers, so store the object
    # there too. The historical (misspelled) @carriors is still assigned
    # because stop reads it.
    @carriers = @carriors = ::Ap4r::Carriers.new(self, @config.carriers, @logger, @dispatchers)
    @carriors.start

    @lifecycle_listeners.each {|l| l.start }
  end
rescue StandardError, ScriptError => err
  @logger.warn{"Error in starting queue-manager #{err}"}
  @logger.warn{err.backtrace.join("\n")}
end

#start_originalObject



86
# File 'lib/ap4r/queue_manager_ext.rb', line 86

# Keep the pre-existing start reachable under a second name so the
# replacement start can still invoke it.
alias start_original start

#stopObject

Stops reliable-msg server and something around it. See start also.



135
136
137
138
139
140
141
142
143
# File 'lib/ap4r/queue_manager_ext.rb', line 135

# Stops the reliable-msg server and the components around it, in the reverse
# of the start order: lifecycle listeners first, then carriers, then
# dispatchers, and finally the original server via stop_original.
#
# Runs while holding @global_lock and does nothing unless this manager is the
# active one (@@active == self).
def stop
  @global_lock.synchronize do
    if @@active == self
      @lifecycle_listeners.each do |listener|
        listener.stop
      end
      @carriors.stop # spelling matches the ivar that start assigns
      @dispatchers.stop
      stop_original
    end
  end
end

#stop_originalObject



87
# File 'lib/ap4r/queue_manager_ext.rb', line 87

# Keep the pre-existing stop reachable under a second name so the
# replacement stop can still invoke it.
alias stop_original stop