Class: BackgrounDRb::RailsWorkerProxy

Inherits:
Object
  • Object
show all
Defined in:
lib/backgroundrb/rails_worker_proxy.rb

Overview

A Worker proxy, which uses method_missing for delegating method calls to the workers

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(p_worker_name, p_worker_key = nil, p_middle_man = nil) ⇒ RailsWorkerProxy

create new worker proxy



7
8
9
10
11
12
# File 'lib/backgroundrb/rails_worker_proxy.rb', line 7

# Set up a proxy for one worker.
#
# p_worker_name - kept in @worker_name
# p_worker_key  - kept in @worker_key (nil when omitted)
# p_middle_man  - kept in @middle_man (nil when omitted)
#
# @tried_connections always starts out as an empty list.
def initialize(p_worker_name, p_worker_key = nil, p_middle_man = nil)
  @worker_name, @worker_key, @middle_man = p_worker_name, p_worker_key, p_middle_man
  @tried_connections = []
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(method_id, *args) ⇒ Object



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/backgroundrb/rails_worker_proxy.rb', line 14

# Turns an unknown method call on the proxy into a request for the worker.
#
# Only the first positional argument is inspected; when present it is read
# with values_at / [] for the keys :arg (or :args), :job_key, :host,
# :scheduled_at, :priority and :timeout.
#
# Dispatch depends on the prefix of the called method's name:
# * async_<name>  -> run_method(host, :ask_work, ...) for <name>
# * enq_<name>    -> enqueue_task(...); raises NoJobKey when :job_key is blank
# * deq_<name>    -> dequeue_task(...); raises NoJobKey when :job_key is blank
# * anything else -> run_method(host, :send_request, ...) with the name as given
# The enq_/deq_ prefixes are matched case-insensitively, async_ is not.
def method_missing(method_id, *args)
  worker_method = method_id.to_s
  arguments = args.first

  arg, job_key, host_info, scheduled_at, priority =
    arguments && arguments.values_at(:arg, :job_key, :host, :scheduled_at, :priority)

  # :args is accepted as an alternative spelling of :arg
  arg ||= arguments && arguments[:args]

  # take the caller's time in UTC when it can be converted, otherwise "now"
  new_schedule =
    if scheduled_at && scheduled_at.respond_to?(:utc)
      scheduled_at.utc
    else
      Time.now.utc
    end

  case worker_method
  when /^async_(\w+)/
    method_name = $1
    request = compact(:worker => worker_name, :worker_key => worker_key,
                      :worker_method => method_name, :job_key => job_key, :arg => arg)
    run_method(host_info, :ask_work, request)
  when /^enq_(\w+)/i
    raise NoJobKey.new("Must specify a job key with enqueued tasks") if job_key.blank?
    method_name = $1
    marshalled_args = Marshal.dump(arg)
    job = compact(:worker_name => worker_name.to_s, :worker_key => worker_key.to_s,
                  :worker_method => method_name.to_s, :job_key => job_key.to_s,
                  :priority => priority, :args => marshalled_args,
                  :timeout => arguments ? arguments[:timeout] : nil,
                  :scheduled_at => new_schedule)
    enqueue_task(job)
  when /^deq_(\w+)/i
    raise NoJobKey.new("Must specify a job key to dequeue tasks") if job_key.blank?
    method_name = $1
    job = compact(:worker_name => worker_name.to_s, :worker_key => worker_key.to_s,
                  :worker_method => method_name.to_s, :job_key => job_key.to_s)
    dequeue_task(job)
  else
    request = compact(:worker => worker_name, :worker_key => worker_key,
                      :worker_method => worker_method, :job_key => job_key, :arg => arg)
    run_method(host_info, :send_request, request)
  end
end

Instance Attribute Details

#data ⇒ Object

Returns the value of attribute data.



4
5
6
# File 'lib/backgroundrb/rails_worker_proxy.rb', line 4

# Plain reader for the data attribute; nil until something assigns @data.
def data; @data; end

#middle_man ⇒ Object

Returns the value of attribute middle_man.



4
5
6
# File 'lib/backgroundrb/rails_worker_proxy.rb', line 4

# Plain reader for the middle_man attribute; nil until @middle_man is assigned.
def middle_man; @middle_man; end

#worker_key ⇒ Object

Returns the value of attribute worker_key.



4
5
6
# File 'lib/backgroundrb/rails_worker_proxy.rb', line 4

# Plain reader for the worker_key attribute; nil until @worker_key is assigned.
def worker_key; @worker_key; end

#worker_method ⇒ Object

Returns the value of attribute worker_method.



4
5
6
# File 'lib/backgroundrb/rails_worker_proxy.rb', line 4

# Plain reader for the worker_method attribute; nil until @worker_method is assigned.
def worker_method; @worker_method; end

#worker_name ⇒ Object

Returns the value of attribute worker_name.



4
5
6
# File 'lib/backgroundrb/rails_worker_proxy.rb', line 4

# Plain reader for the worker_name attribute; nil until @worker_name is assigned.
def worker_name; @worker_name; end

Instance Method Details

#ask_result(job_key) ⇒ Object

get results back from the cache. Cache can be in-memory worker cache or memcache based cache



114
115
116
117
118
119
120
121
122
# File 'lib/backgroundrb/rails_worker_proxy.rb', line 114

# Look up the stored result for +job_key+.
#
# When BDRB_CONFIG[:backgroundrb][:result_storage] is 'memcache' the value
# comes from return_result_from_memcache; otherwise every backend connection
# is asked via ask_result and the answers go through return_result.
def ask_result(job_key)
  options = compact(:worker => worker_name, :worker_key => worker_key, :job_key => job_key)
  return return_result_from_memcache(options) if BDRB_CONFIG[:backgroundrb][:result_storage] == 'memcache'

  answers = middle_man.backend_connections.map { |conn| conn.ask_result(options) }
  return_result(answers)
end

#choose_connection(host_info) ⇒ Object

choose the backend connection(s) to use for the given host_info



165
166
167
168
169
170
171
172
# File 'lib/backgroundrb/rails_worker_proxy.rb', line 165

# Pick what to talk to, based on +host_info+:
# * :all     - middle_man.backend_connections
# * :local   - middle_man.find_local
# * a String - middle_man.find_connection(host_info)
# * other    - middle_man.choose_server
def choose_connection(host_info)
  case host_info
  when :all
    middle_man.backend_connections
  when :local
    middle_man.find_local
  when String
    middle_man.find_connection(host_info)
  else
    middle_man.choose_server
  end
end

#compact(options = { }) ⇒ Object

helper method to compact a hash and for getting rid of nil parameters



175
176
177
178
# File 'lib/backgroundrb/rails_worker_proxy.rb', line 175

# Strip every entry whose value is nil out of +options+.
#
# The hash passed in is modified in place and is also the return value;
# false values are kept, only nil is removed.
def compact(options = {})
  options.reject! { |_key, value| value.nil? }
  # reject! answers nil when nothing was removed, so hand the hash back explicitly
  options
end

#delete ⇒ Object

delete a worker



157
158
159
160
161
162
# File 'lib/backgroundrb/rails_worker_proxy.rb', line 157

# Ask every backend connection to delete this worker (identified by
# worker_name and, when set, worker_key) and return worker_key.
def delete
  middle_man.backend_connections.each { |connection|
    # a fresh options hash is built for each connection
    connection.delete_worker(compact(:worker => worker_name, :worker_key => worker_key))
  }
  worker_key
end

#dequeue_task(options = {}) ⇒ Object

remove tasks from the worker pool



55
56
57
# File 'lib/backgroundrb/rails_worker_proxy.rb', line 55

# Hand +options+ to BdrbJobQueue.remove_job and return whatever it returns.
def dequeue_task(options = {})
  BdrbJobQueue.remove_job(options)
end

#enqueue_task(options = {}) ⇒ Object

enqueue tasks to the worker pool



50
51
52
# File 'lib/backgroundrb/rails_worker_proxy.rb', line 50

# Hand +options+ to BdrbJobQueue.insert_job and return whatever it returns.
def enqueue_task(options = {})
  BdrbJobQueue.insert_job(options)
end

#gen_key(options) ⇒ Object

generate worker key



132
133
134
135
# File 'lib/backgroundrb/rails_worker_proxy.rb', line 132

# Build the cache key: the :worker, :worker_key and :job_key entries of
# +options+, in that order, with nil entries skipped, joined by "_".
def gen_key(options)
  parts = [:worker, :worker_key, :job_key].map { |name| options[name] }
  parts.compact.join('_')
end

#invoke_on_connection(connection, method_name, options = {}) ⇒ Object

choose a backgroundrb server connection and invoke worker method on it.

Raises:
  • (NoServerAvailable)


107
108
109
110
# File 'lib/backgroundrb/rails_worker_proxy.rb', line 107

# Call +method_name+ on +connection+, passing +options+ as the only argument.
#
# Raises NoServerAvailable when +connection+ is nil or false.
def invoke_on_connection(connection, method_name, options = {})
  if connection
    connection.send(method_name, options)
  else
    raise NoServerAvailable.new("No BackgrounDRb is found running")
  end
end

#process_result(t_result) ⇒ Object



87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/backgroundrb/rails_worker_proxy.rb', line 87

# Interpret the reply (or replies) that came back from a worker invocation.
#
# For a Hash reply:
# * when :result is true and :type is :response, returns t_result[:data] if
#   :result_flag is "ok" and raises RemoteWorkerError otherwise;
# * otherwise returns the string "ok" when :result_flag is "ok", raises
#   RemoteWorkerError when it is "error", and returns nil for anything else.
# An Array reply is returned untouched; any other value yields nil.
#
# The :type test is a comparison (==). Written with a single = it would
# overwrite t_result[:type] and always be truthy, so the check would never
# discriminate and the caller's hash would be modified.
def process_result(t_result)
  case t_result
  when Hash
    if t_result[:result] == true && t_result[:type] == :response
      return t_result[:data] if t_result[:result_flag] == "ok"
      raise RemoteWorkerError.new("Error while executing worker method")
    elsif t_result[:result_flag] == "ok"
      "ok"
    elsif t_result[:result_flag] == "error"
      raise RemoteWorkerError.new("Error while executing worker method")
    end
  when Array
    t_result
  end
end

#reset_memcache_result(job_key, value) ⇒ Object

reset result within memcache for given key



143
144
145
146
147
148
149
# File 'lib/backgroundrb/rails_worker_proxy.rb', line 143

# Store +value+ in middle_man.cache under the key gen_key derives from
# worker_name, worker_key and +job_key+, then return +value+.
def reset_memcache_result(job_key, value)
  cache_key = gen_key(compact(:worker => worker_name, :worker_key => worker_key, :job_key => job_key))
  middle_man.cache[cache_key] = value
  value
end

#return_result(result) ⇒ Object



151
152
153
154
# File 'lib/backgroundrb/rails_worker_proxy.rb', line 151

# Coerce +result+ with Array() and unwrap it when it holds at most one
# element: an empty list gives nil, a single-element list gives that
# element, anything longer is returned as the list.
def return_result(result)
  items = Array(result)
  return items.first if items.size <= 1

  items
end

#return_result_from_memcache(options = {}) ⇒ Object

return result from memcache



138
139
140
# File 'lib/backgroundrb/rails_worker_proxy.rb', line 138

# Read middle_man.cache at the key gen_key builds from +options+.
def return_result_from_memcache(options = {})
  cache_key = gen_key(options)
  middle_man.cache[cache_key]
end

#run_method(host_info, method_name, worker_options = {}) ⇒ Object

invoke method on worker

Raises:
  • (NoServerAvailable)


60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/backgroundrb/rails_worker_proxy.rb', line 60

# Invoke +method_name+ on the connection(s) selected by +host_info+ and
# return the reply after it has gone through return_result and
# process_result.
#
# * :local or a String - a single call on the chosen connection.
# * :all               - one call per connection; a BdrbConnError from any
#                        of them is reported as NoServerAvailable.
# * anything else      - call the chosen connection and, on BdrbConnError,
#                        retry on the connection returned by
#                        middle_man.find_next_except_these.
#
# Raises NoServerAvailable when no connection is chosen, when the :all
# broadcast hits a BdrbConnError, or when failover has no connection left.
def run_method(host_info, method_name, worker_options = {})
  result = []
  connection = choose_connection(host_info)
  raise NoServerAvailable.new("No BackgrounDRb server is found running") if connection.blank?

  if host_info == :local || host_info.is_a?(String)
    result << invoke_on_connection(connection, method_name, worker_options)
  elsif host_info == :all
    begin
      connection.each { |conn| result << invoke_on_connection(conn, method_name, worker_options) }
    rescue BdrbConnError
      raise NoServerAvailable.new("No BackgrounDRb server is found running")
    end
  else
    # remember which servers were attempted so failover does not revisit them
    @tried_connections = [connection.server_info]
    begin
      result << invoke_on_connection(connection, method_name, worker_options)
    rescue BdrbConnError
      connection = middle_man.find_next_except_these(@tried_connections)
      # A nil candidate used to surface as NoMethodError on server_info;
      # report it with the same error the rest of this method uses.
      raise NoServerAvailable.new("No BackgrounDRb server is found running") unless connection
      @tried_connections << connection.server_info
      retry
    end
  end
  process_result(return_result(result))
end

#worker_info ⇒ Object

return runtime information about worker



125
126
127
128
129
# File 'lib/backgroundrb/rails_worker_proxy.rb', line 125

# Ask every backend connection for worker_info about this worker
# (worker_name plus worker_key when set) and pass the collected answers
# through return_result.
def worker_info
  answers = middle_man.backend_connections.map do |conn|
    conn.worker_info(compact(:worker => worker_name, :worker_key => worker_key))
  end
  return_result(answers)
end