Class: ReliableMsg::QueueManager
- Defined in:
- lib/ap4r/queue_manager_ext.rb,
lib/ap4r/queue_manager_ext_debug.rb
Instance Attribute Summary collapse
-
#config ⇒ Object
readonly
Returns the value of attribute config.
-
#mutex ⇒ Object
readonly
Returns the value of attribute mutex.
-
#store ⇒ Object
readonly
Returns the value of attribute store.
-
#transactions ⇒ Object
readonly
Returns the value of attribute transactions.
Instance Method Summary collapse
- #add_lifecycle_listener(listener, iv_name, attr_mode = 'reader') ⇒ Object
- #dispatch_loop(group, targets, index) ⇒ Object
-
#dispatch_via_http(message) ⇒ Object
Dispatch via a HTTP protocol Current implementation uses POST method, irrespectively options.
- #dispatch_via_soap(message) ⇒ Object
- #dispatch_via_xmlrpc(message) ⇒ Object
-
#eval_to_inspect(code, inspect_mode = :inspect) ⇒ Object
(also: #e2i)
Accepts ruby code as a string, evaluates it on
self
, and returns the result as a formatted string. -
#initialize(options = nil) ⇒ QueueManager
constructor
Hooks original initialize method to add lifecyle listeners.
- #initialize_original ⇒ Object
-
#stale_queue(multi_queue) ⇒ Object
Gets a queue name which has the most stale message in queues specified by
multi_queue
. - #start ⇒ Object
- #start_carriers ⇒ Object
- #start_dispatchers ⇒ Object
- #start_original ⇒ Object
- #stop ⇒ Object
- #stop_carriers ⇒ Object
- #stop_dispatchers ⇒ Object
- #stop_original ⇒ Object
Constructor Details
#initialize(options = nil) ⇒ QueueManager
Hooks original initialize method to add lifecyle listeners. – TODO: Make dispatchers and carriers lifecyle listeners, 2006/09/01 shino TODO: and separate them from QueueManager, 2006/09/01 shino
41 42 43 44 45 46 |
# File 'lib/ap4r/queue_manager_ext.rb', line 41 def initialize = nil #:notnew: initialize_original @global_lock ||= Mutex.new @lifecycle_listeners = [] RetentionHistory.new(self, @logger, @config) end |
Instance Attribute Details
#config ⇒ Object (readonly)
Returns the value of attribute config.
9 10 11 |
# File 'lib/ap4r/queue_manager_ext_debug.rb', line 9 def config @config end |
#mutex ⇒ Object (readonly)
Returns the value of attribute mutex.
9 10 11 |
# File 'lib/ap4r/queue_manager_ext_debug.rb', line 9 def mutex @mutex end |
#store ⇒ Object (readonly)
Returns the value of attribute store.
9 10 11 |
# File 'lib/ap4r/queue_manager_ext_debug.rb', line 9 def store @store end |
#transactions ⇒ Object (readonly)
Returns the value of attribute transactions.
9 10 11 |
# File 'lib/ap4r/queue_manager_ext_debug.rb', line 9 def transactions @transactions end |
Instance Method Details
#add_lifecycle_listener(listener, iv_name, attr_mode = 'reader') ⇒ Object
48 49 50 51 52 |
# File 'lib/ap4r/queue_manager_ext.rb', line 48 def add_lifecycle_listener listener, iv_name, attr_mode = 'reader' @lifecycle_listeners << listener instance_variable_set "@#{iv_name}".to_sym, listener self.class.class_eval("attr_#{attr_mode} :#{iv_name}") end |
#dispatch_loop(group, targets, index) ⇒ Object
102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/ap4r/queue_manager_ext.rb', line 102 def dispatch_loop(group, targets, index) group.add Thread.current mq = MultiQueue.new targets @logger.info{ "start dispatcher: targets= #{mq} (index #{index})" } until Thread.current[:dying] sleep 0.1 # @logger.debug{ "try dispatch #{mq} #{mq.name}" } # TODO: needs timeout?, 2006/10/16 shino begin mq.get{|m| @logger.debug{"dispatcher get message\n#{m.to_yaml}"} if m response = case m.headers[:dispatch_mode] when :HTTP dispatch_via_http(m) when :XMLRPC dispatch_via_xmlrpc(m) when :SOAP dispatch_via_soap(m) else raise "undefined dispatch mode #{m.headers[:mode]}" end @logger.debug{"dispatcher get response\n#{response.to_yaml}"} } rescue Exception => err @logger.warn("dispatch err #{err.inspect}") @logger.warn(err.backtrace.join("\n")) end end @logger.info{"end dispatcher #{mq} (index #{index})"} end |
#dispatch_via_http(message) ⇒ Object
Dispatch via a HTTP protocol Current implementation uses POST method, irrespectively options
Determination of “success” is two fold
-
status code should be 200, other codes (including 201-2xx) are treated as error
-
when status code is 200, body should include a string “true”
140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 |
# File 'lib/ap4r/queue_manager_ext.rb', line 140 def dispatch_via_http() #TODO: Add some request headers e.g. User-Agent and Accept, 2006/10/12 shino #TODO: Now support POST only, 2006/10/12 shino response = Net::HTTP.post_form(URI.parse([:target_url]), .object) @logger.debug{"response status [#{response.code} #{response.}]"} #TODO: make the difinition of success variable, 2006/10/13 shino unless response.kind_of?(Net::HTTPOK) = "HTTP Response FAILURE, status [#{response.code} #{response.}]" @logger.error() @logger.info{response.to_yaml} #TODO: must create AP4R specific Exception class, 2006/10/12 shino raise StandardError.new() end unless response.body =~ /true/ = "HTTP Response FAILURE, status [#{response.code} #{response.}], body [#{response.body}]" #TODO: Refactor error logging, 2006/10/13 shino @logger.error() @logger.info{response.to_yaml} #TODO: must create AP4R specific Exception class, 2006/10/12 shino raise StandardError.new() end response end |
#dispatch_via_soap(message) ⇒ Object
176 177 178 179 180 181 |
# File 'lib/ap4r/queue_manager_ext.rb', line 176 def dispatch_via_soap() @logger.debug{[:target_url]} driver = SOAP::WSDLDriverFactory.new([:target_url]).create_rpc_driver @logger.debug{driver} driver.send([:target_action], .object) end |
#dispatch_via_xmlrpc(message) ⇒ Object
168 169 170 171 172 173 174 |
# File 'lib/ap4r/queue_manager_ext.rb', line 168 def dispatch_via_xmlrpc() endpoint = [:target_url] client = XMLRPC::Client.new2(endpoint) success, response = client.call2([:target_action], .object) raise response unless success response end |
#eval_to_inspect(code, inspect_mode = :inspect) ⇒ Object Also known as: e2i
Accepts ruby code as a string, evaluates it on self
, and returns the result as a formatted string. Formats can be one of followings.
-
:inspect
: default value -
:yaml
-
:json
-
:xml
Apart from :inspect
, format can fail depending on the result object.
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/ap4r/queue_manager_ext_debug.rb', line 20 def eval_to_inspect code, inspect_mode = :inspect # TODO: too sloppy implementation result = Thread.new(code, inspect_mode){ |c, mode| $SAFE = 4 result = self.instance_eval(c) }.value case inspect_mode when :inspect result.inspect when :yaml result.to_yaml when :json result.to_json when :xml result.to_xml else result.inspect end end |
#initialize_original ⇒ Object
35 |
# File 'lib/ap4r/queue_manager_ext.rb', line 35 alias :initialize_original :initialize |
#stale_queue(multi_queue) ⇒ Object
Gets a queue name which has the most stale message in queues specified by multi_queue
.
29 30 31 |
# File 'lib/ap4r/queue_manager_ext.rb', line 29 def stale_queue multi_queue @store.stale_queue multi_queue end |
#start ⇒ Object
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/ap4r/queue_manager_ext.rb', line 54 def start begin @global_lock.synchronize do return if @@active == self @dispatch_targets = '' start_original start_dispatchers start_carriers @lifecycle_listeners.each {|l| l.start } end rescue Exception => err @logger.warn{"Error in starting queue-manager #{err}"} @logger.warn{err.backtrace.join("\n")} end end |
#start_carriers ⇒ Object
190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 |
# File 'lib/ap4r/queue_manager_ext.rb', line 190 def start_carriers conf = @config.carriers return unless conf @logger.info{ "ready to start carrires with config #{conf.to_yaml}" } @carriers = ThreadGroup.new conf.each { |remote| remote["threads"].times { |index| Thread.fork(@carriers, remote, index){|group, remote, index| # TODO: refactor structure, 2006/10/06 shino group.add Thread.current @logger.info{ "starting a carrier (index #{index}) for the queue manager #{remote['source_uri']}" } uri = remote['source_uri'] until Thread.current[:dying] begin sleep 0.1 #TODO check :dying flag here and break, 2006/09/01 shino #TODO cache DRbObject if necessary, 2006/09/01 shino remote_qm = DRb::DRbObject.new_with_uri(uri) queue_name = remote_qm.stale_queue @dispatch_targets next unless queue_name @logger.debug{ "stale queue name : #{queue_name}" } q = ReliableMsg::Queue.new queue_name, :drb_uri => uri q.get { |m| unless m @logger.debug{ "carrier strikes at the air (T_T)" } next end # @logger.debug{ "carrier gets a message\n#{m.to_yaml}" } # TODO: decide the better one, and delete another, 2006/09/01 shino # TODO: or switchable implementation in versions, 2006/10/16 shino #version 1: use thread fork so queue manager use a different tx # TODO probably should have a thread as an instance variable or in a thread local, 2006/09/01 shino #Thread.fork(m) {|m| # local_queue = ReliableMsg::Queue.new queue_name # local_queue.put m.object #}.join #version 2: store tx and set nil, and resotre tx after putting a message begin tx = Thread.current[ReliableMsg::Client::THREAD_CURRENT_TX] Thread.current[ReliableMsg::Client::THREAD_CURRENT_TX] = nil # @logger.debug{ "before tx: #{tx}" } ReliableMsg::Queue.new(queue_name).put(m.object) ensure Thread.current[ReliableMsg::Client::THREAD_CURRENT_TX] = tx # @logger.debug{ "after tx: #{Thread.current[ReliableMsg::Client::THREAD_CURRENT_TX]}" } end } rescue Exception => ex @logger.warn "error in remote-get/local-put #{ex}\n#{ex.backtrace.join("\n\t")}\n" end end @logger.info{"ends a carrier (index #{index}) for the queue manager #{remote['uri']}"} } } } @logger.info{"queue manager has forked all carriers"} end |
#start_dispatchers ⇒ Object
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 |
# File 'lib/ap4r/queue_manager_ext.rb', line 80 def start_dispatchers begin return unless @config.dispatchers @logger.info{ "about to start dispatchers with config #{@config.dispatchers.to_yaml}" } @disps = ThreadGroup.new @config.dispatchers.each{ |conf| conf["threads"].to_i.times { |index| Thread.fork(@disps, conf["targets"], index){|group, targets, index| dispatch_loop(group, targets, index) } } @dispatch_targets.concat(conf["targets"]).concat(';') @logger.info{ "dispatch targets are : #{@dispatch_targets}" } } @logger.info "queue manager has forked dispatchers" rescue Exception => err @logger.warn{"Error in starting dipatchers #{err}"} @logger.warn{err.backtrace.join("\n")} raise err end end |
#start_original ⇒ Object
33 |
# File 'lib/ap4r/queue_manager_ext.rb', line 33 alias :start_original :start |
#stop ⇒ Object
70 71 72 73 74 75 76 77 78 |
# File 'lib/ap4r/queue_manager_ext.rb', line 70 def stop @global_lock.synchronize do return unless @@active == self @lifecycle_listeners.each {|l| l.stop } stop_carriers stop_dispatchers stop_original end end |
#stop_carriers ⇒ Object
253 254 255 256 257 258 |
# File 'lib/ap4r/queue_manager_ext.rb', line 253 def stop_carriers @logger.info{"stop_carriers #{@carriers}"} return unless @carriers @carriers.list.each{|d| d[:dying] = true} @carriers.list.each{|d| d.join } end |
#stop_dispatchers ⇒ Object
183 184 185 186 187 188 |
# File 'lib/ap4r/queue_manager_ext.rb', line 183 def stop_dispatchers @logger.info{"stop_dispatchers #{@disps}"} return unless @disps @disps.list.each {|d| d[:dying] = true} @disps.list.each {|d| d.join } end |
#stop_original ⇒ Object
34 |
# File 'lib/ap4r/queue_manager_ext.rb', line 34 alias :stop_original :stop |