Class: ReliableMsg::QueueManager

Inherits:
Object
  • Object
show all
Defined in:
lib/ap4r/queue_manager_ext.rb,
lib/ap4r/queue_manager_ext_debug.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = nil) ⇒ QueueManager

Hooks the original initialize method to add lifecycle listeners. – TODO: Make dispatchers and carriers lifecycle listeners, 2006/09/01 shino. TODO: and separate them from QueueManager, 2006/09/01 shino



41
42
43
44
45
46
# File 'lib/ap4r/queue_manager_ext.rb', line 41

# Runs the original constructor (reachable as +initialize_original+) and then
# prepares the state used by the lifecycle extensions.
#
# options:: handed through unchanged to +initialize_original+.
def initialize(options = nil) #:notnew:
  initialize_original(options)
  # Registry that add_lifecycle_listener appends to.
  @lifecycle_listeners = []
  # Lock taken by start and stop; an already assigned lock is kept.
  @global_lock ||= Mutex.new
  # The new instance is not stored here.
  # NOTE(review): presumably RetentionHistory registers itself with this
  # manager from its own constructor -- confirm.
  RetentionHistory.new(self, @logger, @config)
end

Instance Attribute Details

#configObject (readonly)

Returns the value of attribute config.



9
10
11
# File 'lib/ap4r/queue_manager_ext_debug.rb', line 9

# Read-only accessor: returns whatever is currently stored in @config.
def config
  @config
end

#mutexObject (readonly)

Returns the value of attribute mutex.



9
10
11
# File 'lib/ap4r/queue_manager_ext_debug.rb', line 9

# Read-only accessor: returns whatever is currently stored in @mutex.
def mutex
  @mutex
end

#storeObject (readonly)

Returns the value of attribute store.



9
10
11
# File 'lib/ap4r/queue_manager_ext_debug.rb', line 9

# Read-only accessor: returns whatever is currently stored in @store.
def store
  @store
end

#transactionsObject (readonly)

Returns the value of attribute transactions.



9
10
11
# File 'lib/ap4r/queue_manager_ext_debug.rb', line 9

# Read-only accessor: returns whatever is currently stored in @transactions.
def transactions
  @transactions
end

Instance Method Details

#add_lifecycle_listener(listener, iv_name, attr_mode = 'reader') ⇒ Object



48
49
50
51
52
# File 'lib/ap4r/queue_manager_ext.rb', line 48

# Registers +listener+ as a lifecycle listener and exposes it on the queue
# manager through an attribute named +iv_name+.
#
# listener::  object appended to @lifecycle_listeners.
# iv_name::   name (String or Symbol, without the leading "@") of the instance
#             variable that holds the listener and of the generated attribute.
# attr_mode:: attribute macro to generate: 'reader' (default), 'writer' or
#             'accessor'. Strings and symbols are both accepted.
#
# Raises ArgumentError when +attr_mode+ is not one of the three macros. The
# check runs before any state is touched, so a rejected call leaves the
# listener list unchanged.
#
# Returns whatever the attribute macro returns.
def add_lifecycle_listener listener, iv_name, attr_mode = 'reader'
  mode = attr_mode.to_s
  unless %w(reader writer accessor).include?(mode)
    raise ArgumentError, "unknown attr_mode #{attr_mode.inspect}"
  end

  instance_variable_set "@#{iv_name}", listener
  @lifecycle_listeners << listener
  # Call the macro directly instead of evaluating an interpolated string, so
  # neither argument can smuggle arbitrary code into the class body.
  self.class.send("attr_#{mode}", iv_name)
end

#dispatch_loop(group, targets, index) ⇒ Object



102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/ap4r/queue_manager_ext.rb', line 102

# Body of a single dispatcher thread.
#
# Adds the current thread to +group+, builds a MultiQueue over +targets+ and
# then polls it until the thread-local :dying flag is set (stop_dispatchers
# sets it). Every message is routed by its :dispatch_mode header to
# dispatch_via_http, dispatch_via_xmlrpc or dispatch_via_soap.
#
# group::   ThreadGroup the current thread is added to.
# targets:: passed as-is to MultiQueue.new.
# index::   only used in log messages.
#
# Errors raised while getting or dispatching a message are logged at warn
# level and the loop carries on.
def dispatch_loop(group, targets, index)
  group.add Thread.current
  mq = MultiQueue.new targets
  @logger.info{ "start dispatcher: targets= #{mq} (index #{index})" }
  until Thread.current[:dying]
    sleep 0.1
    # @logger.debug{ "try dispatch #{mq} #{mq.name}" }
    # TODO: needs timeout?, 2006/10/16 shino
    begin
      mq.get{|m|
        # Nothing to dispatch. NOTE(review): whether MultiQueue#get can yield
        # nil is not visible from here; the guard keeps that case from being
        # logged as a NoMethodError on every poll.
        next unless m
        @logger.debug{"dispatcher get message\n#{m.to_yaml}"}
        mode = m.headers[:dispatch_mode]
        response =
          case mode
          when :HTTP
            dispatch_via_http(m)
          when :XMLRPC
            dispatch_via_xmlrpc(m)
          when :SOAP
            dispatch_via_soap(m)
          else
            # Report the header that was actually inspected.
            raise "undefined dispatch mode #{mode}"
          end
        @logger.debug{"dispatcher get response\n#{response.to_yaml}"}
      }
    rescue Exception => err
      # Deliberately broad: one failed dispatch must not end the thread.
      @logger.warn("dispatch err #{err.inspect}")
      @logger.warn(err.backtrace.join("\n"))
    end
  end
  @logger.info{"end dispatcher #{mq} (index #{index})"}
end

#dispatch_via_http(message) ⇒ Object

Dispatches a message via the HTTP protocol. The current implementation always uses the POST method, irrespective of options.

Determination of “success” is twofold:

  • status code should be 200, other codes (including 201-2xx) are treated as error

  • when status code is 200, body should include a string “true”



140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
# File 'lib/ap4r/queue_manager_ext.rb', line 140

# Delivers +message+ by POSTing message.object as form data to the URL found
# under message[:target_url].
#
# The call counts as successful only when the response is a Net::HTTPOK and
# its body matches /true/. In that case the response object is returned.
# Otherwise the failure is logged (error level, followed by the response dump
# at info level) and a StandardError carrying the same text is raised.
def dispatch_via_http(message)
  #TODO: Add some request headers e.g. User-Agent and Accept, 2006/10/12 shino
  #TODO: Now support POST only, 2006/10/12 shino
  target = URI.parse(message[:target_url])
  response = Net::HTTP.post_form(target, message.object)
  status = "#{response.code} #{response.message}"
  @logger.debug{"response status [#{status}]"}

  #TODO: make the difinition of success variable, 2006/10/13 shino
  failure =
    if !response.kind_of?(Net::HTTPOK)
      "HTTP Response FAILURE, status [#{status}]"
    elsif !(response.body =~ /true/)
      "HTTP Response FAILURE, status [#{status}], body [#{response.body}]"
    end
  return response unless failure

  #TODO: Refactor error logging, 2006/10/13 shino
  @logger.error(failure)
  @logger.info{response.to_yaml}
  #TODO: must create AP4R specific Exception class, 2006/10/12 shino
  raise StandardError.new(failure)
end

#dispatch_via_soap(message) ⇒ Object



176
177
178
179
180
181
# File 'lib/ap4r/queue_manager_ext.rb', line 176

# Delivers +message+ over SOAP: creates an RPC driver from the WSDL located at
# message[:target_url] and invokes the method named by
# message[:target_action] on it with message.object as the only argument.
#
# Returns whatever that driver call returns.
def dispatch_via_soap(message)
  @logger.debug { message[:target_url] }
  factory = SOAP::WSDLDriverFactory.new(message[:target_url])
  rpc_driver = factory.create_rpc_driver
  @logger.debug { rpc_driver }
  rpc_driver.send(message[:target_action], message.object)
end

#dispatch_via_xmlrpc(message) ⇒ Object



168
169
170
171
172
173
174
# File 'lib/ap4r/queue_manager_ext.rb', line 168

# Delivers +message+ over XML-RPC: calls the remote method named by
# message[:target_action] at message[:target_url], passing message.object.
#
# Returns the second element of the call2 result when the first one is
# truthy; otherwise raises that second element.
def dispatch_via_xmlrpc(message)
  client = XMLRPC::Client.new2(message[:target_url])
  ok, result = client.call2(message[:target_action], message.object)
  return result if ok

  raise result
end

#eval_to_inspect(code, inspect_mode = :inspect) ⇒ Object Also known as: e2i

Accepts Ruby code as a string, evaluates it on self, and returns the result as a formatted string. The format can be one of the following.

  • :inspect : default value

  • :yaml

  • :json

  • :xml

Apart from :inspect, format can fail depending on the result object.



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/ap4r/queue_manager_ext_debug.rb', line 20

# Evaluates the Ruby source string +code+ against this object in a separate
# thread and returns the result rendered as a string.
#
# code::         Ruby source, run with instance_eval on self.
# inspect_mode:: :inspect (default), :yaml, :json or :xml; any other value is
#                rendered like :inspect.
#
# Apart from :inspect, a format can fail depending on the result object.
#
# WARNING: this executes arbitrary code supplied by the caller.
# NOTE(review): $SAFE levels were dropped from later Ruby releases -- confirm
# that the assignment below still restricts anything on the target runtime.
def eval_to_inspect code, inspect_mode = :inspect
  # TODO: too sloppy implementation
  worker = Thread.new(code) do |source|
    $SAFE = 4
    instance_eval(source)
  end
  value = worker.value

  case inspect_mode
  when :yaml then value.to_yaml
  when :json then value.to_json
  when :xml  then value.to_xml
  else value.inspect
  end
end

#initialize_originalObject



35
# File 'lib/ap4r/queue_manager_ext.rb', line 35

# Keeps the current +initialize+ reachable under the name
# +initialize_original+; the replacement constructor calls it first.
alias :initialize_original :initialize

#stale_queue(multi_queue) ⇒ Object

Gets a queue name which has the most stale message in queues specified by multi_queue.



29
30
31
# File 'lib/ap4r/queue_manager_ext.rb', line 29

# Asks the message store for the stale queue among those named by
# +multi_queue+ and returns the store's answer unchanged.
#
# multi_queue:: forwarded as-is to @store.stale_queue.
def stale_queue(multi_queue)
  @store.stale_queue(multi_queue)
end

#startObject



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/ap4r/queue_manager_ext.rb', line 54

# Starts the queue manager together with its dispatchers, carriers and
# registered lifecycle listeners, all under @global_lock.
#
# Does nothing when this instance is already the active manager. Any error
# raised during startup is logged at warn level (message, then backtrace)
# and is not propagated to the caller.
def start
  @global_lock.synchronize do
    unless @@active == self
      # Reset here; start_dispatchers appends to it and start_carriers reads it.
      @dispatch_targets = ''
      start_original
      start_dispatchers
      start_carriers
      @lifecycle_listeners.each do |listener|
        listener.start
      end
    end
  end
rescue Exception => failure
  @logger.warn { "Error in starting queue-manager #{failure}" }
  @logger.warn { failure.backtrace.join("\n") }
end

#start_carriersObject



190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
# File 'lib/ap4r/queue_manager_ext.rb', line 190

# Starts the carrier threads described by @config.carriers; returns
# immediately when no carriers are configured.
#
# Each configuration entry is read for "threads" (number of carrier threads
# to fork) and "source_uri" (DRb URI of the remote queue manager). Every
# carrier thread joins the @carriers ThreadGroup and loops until its
# thread-local :dying flag is set (stop_carriers sets it):
#
# 1. asks the remote queue manager for a stale queue among @dispatch_targets,
# 2. gets a message from that remote queue,
# 3. puts the message object into the local queue of the same name.
#
# Errors inside one loop iteration are logged at warn level and the loop
# carries on.
def start_carriers
  conf = @config.carriers
  return unless conf

  @logger.info{ "ready to start carrires with config #{conf.to_yaml}" }
  @carriers = ThreadGroup.new
  conf.each { |remote|
    remote["threads"].times { |index|
      Thread.fork(@carriers, remote, index){|group, remote, index|
        # TODO: refactor structure, 2006/10/06 shino
        group.add Thread.current
        @logger.info{ "starting a carrier (index #{index}) for the queue manager #{remote['source_uri']}" }
        uri = remote['source_uri']
        until Thread.current[:dying]
          begin
            sleep 0.1
            #TODO check :dying flag here and break, 2006/09/01 shino
            #TODO cache DRbObject if necessary, 2006/09/01 shino
            remote_qm = DRb::DRbObject.new_with_uri(uri)
            queue_name = remote_qm.stale_queue @dispatch_targets
            next unless queue_name

            @logger.debug{ "stale queue name : #{queue_name}" }
            q = ReliableMsg::Queue.new queue_name, :drb_uri => uri
            q.get { |m|
              unless m
                @logger.debug{ "carrier strikes at the air (T_T)" }
                next
              end
              # @logger.debug{ "carrier gets a message\n#{m.to_yaml}" }

              # TODO: decide the better one, and delete another, 2006/09/01 shino
              # TODO: or switchable implementation in versions, 2006/10/16 shino

              #version 1: use thread fork so queue manager use a different tx
              # TODO probably should have a thread as an instance variable or in a thread local, 2006/09/01 shino
              #Thread.fork(m) {|m|
              #  local_queue = ReliableMsg::Queue.new queue_name
              #  local_queue.put m.object
              #}.join

              #version 2: store tx and set nil, and restore tx after putting a message
              begin
                # Hide the current thread-local transaction entry while the
                # local put runs; it is put back in the ensure clause.
                tx = Thread.current[ReliableMsg::Client::THREAD_CURRENT_TX]
                Thread.current[ReliableMsg::Client::THREAD_CURRENT_TX] = nil
                # @logger.debug{ "before tx: #{tx}" }
                ReliableMsg::Queue.new(queue_name).put(m.object)
              ensure
                Thread.current[ReliableMsg::Client::THREAD_CURRENT_TX] = tx
                # @logger.debug{ "after tx: #{Thread.current[ReliableMsg::Client::THREAD_CURRENT_TX]}" }
              end
            }
          rescue Exception => ex
            # Deliberately broad: a failed transfer must not end the carrier.
            @logger.warn "error in remote-get/local-put #{ex}\n#{ex.backtrace.join("\n\t")}\n"
          end
        end
        # Same key as the start message above, so both lines name the source.
        @logger.info{"ends a carrier (index #{index}) for the queue manager #{remote['source_uri']}"}
      }
    }
  }
  @logger.info{"queue manager has forked all carriers"}
end

#start_dispatchersObject



80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/ap4r/queue_manager_ext.rb', line 80

# Forks the dispatcher threads described by @config.dispatchers; returns
# immediately when no dispatchers are configured.
#
# For every configuration entry, "threads" (converted with to_i) threads are
# forked into the @disps ThreadGroup, each running dispatch_loop on the
# entry's "targets". The targets string, followed by ';', is appended to
# @dispatch_targets.
#
# Any error is logged at warn level (message, then backtrace) and re-raised.
def start_dispatchers
  return unless @config.dispatchers

  @logger.info { "about to start dispatchers with config #{@config.dispatchers.to_yaml}" }
  @disps = ThreadGroup.new
  @config.dispatchers.each do |conf|
    targets = conf["targets"]
    conf["threads"].to_i.times do |index|
      Thread.fork(@disps, targets, index) do |group, thread_targets, thread_index|
        dispatch_loop(group, thread_targets, thread_index)
      end
    end
    @dispatch_targets.concat(targets).concat(';')
    @logger.info { "dispatch targets are : #{@dispatch_targets}" }
  end
  @logger.info "queue manager has forked dispatchers"
rescue Exception => err
  @logger.warn { "Error in starting dipatchers #{err}" }
  @logger.warn { err.backtrace.join("\n") }
  raise err
end

#start_originalObject



33
# File 'lib/ap4r/queue_manager_ext.rb', line 33

# Keeps the current +start+ reachable under the name +start_original+; the
# replacement +start+ calls it before starting dispatchers and carriers.
alias :start_original :start

#stopObject



70
71
72
73
74
75
76
77
78
# File 'lib/ap4r/queue_manager_ext.rb', line 70

# Stops the queue manager under @global_lock; does nothing unless this
# instance is the active manager.
#
# Shutdown order: registered lifecycle listeners, then carriers, then
# dispatchers, and finally the original +stop+ (as +stop_original+).
def stop
  @global_lock.synchronize do
    if @@active == self
      @lifecycle_listeners.each do |listener|
        listener.stop
      end
      stop_carriers
      stop_dispatchers
      stop_original
    end
  end
end

#stop_carriersObject



253
254
255
256
257
258
# File 'lib/ap4r/queue_manager_ext.rb', line 253

# Stops every carrier thread: flags each thread in @carriers with the
# thread-local :dying marker, then waits for each of them to finish.
# Returns nil right after logging when no carrier group exists.
def stop_carriers
  @logger.info { "stop_carriers #{@carriers}" }
  return unless @carriers

  @carriers.list.each do |carrier|
    carrier[:dying] = true
  end
  @carriers.list.each(&:join)
end

#stop_dispatchersObject



183
184
185
186
187
188
# File 'lib/ap4r/queue_manager_ext.rb', line 183

# Stops every dispatcher thread: flags each thread in @disps with the
# thread-local :dying marker, then waits for each of them to finish.
# Returns nil right after logging when no dispatcher group exists.
def stop_dispatchers
  @logger.info { "stop_dispatchers #{@disps}" }
  return unless @disps

  @disps.list.each do |dispatcher|
    dispatcher[:dying] = true
  end
  @disps.list.each(&:join)
end

#stop_originalObject



34
# File 'lib/ap4r/queue_manager_ext.rb', line 34

# Keeps the current +stop+ reachable under the name +stop_original+; the
# replacement +stop+ calls it last, after carriers and dispatchers.
alias :stop_original :stop