Class: ASIR::Transport::Resque

Inherits:
ConnectionOriented show all
Includes:
PollThrottle
Defined in:
lib/asir/transport/resque.rb

Overview

!SLIDE Resque Transport

Constant Summary collapse

# Name used when nothing is configured: queues_ falls back to it for the
# worker queue list and namespace_ falls back to it for the Redis namespace.
DEFAULT_QUEUE =
'asir'.freeze

Constants included from PayloadIO

PayloadIO::FOOTER, PayloadIO::HEADER

Constants included from UriConfig

UriConfig::S_LOCALHOST, UriConfig::S_TCP

Constants included from ASIR::ThreadVariable

ASIR::ThreadVariable::DEBUG, ASIR::ThreadVariable::EMPTY_HASH, ASIR::ThreadVariable::SETTER

Instance Attribute Summary collapse

Attributes included from UriConfig

#host, #host_default, #path, #path_default, #port, #port_default, #scheme, #scheme_default, #uri

Attributes inherited from ASIR::Transport

#after_receive_message, #before_send_message, #decoder, #encoder, #invoker, #message_count, #needs_message_identifier, #needs_message_timestamp, #on_exception, #on_result_exception, #one_way, #running, #verbose

Attributes included from Log

#_logger

Class Method Summary collapse

Instance Method Summary collapse

Methods included from PollThrottle

#poll_throttle

Methods inherited from ConnectionOriented

#_after_connect!, #_before_close!, #_connect!, #_server_close!, #connect!, #prepare_server!, #run_server!, #serve_connection!, #stream

Methods included from PayloadIO

#_read, #_read_line_and_expect!, #_write, #close

Methods included from UriConfig

#_uri, #address, #protocol

Methods inherited from Stream

#_serve_stream!, #serve_stream!

Methods inherited from ASIR::Transport

#_subclass_responsibility, #invoke_message!, #needs_message_identifier?, #needs_message_timestamp?, #receive_message, #receive_result, #send_message, #send_result, #serve_message!, #stop!, #with_server_signals!

Methods included from Log

#_log, #_log_enabled=, #_log_enabled?, #_log_format, #_log_result, enabled, enabled=, included

Methods included from AdditionalData

#[], #[]=, #additional_data, #additional_data!, #additional_data=, included

Methods included from Message::Delay

#relative_message_delay!, #wait_for_delay!

Methods included from ASIR::ThreadVariable

included, setter

Constructor Details

#initialize(*args) ⇒ Resque

Returns a new instance of Resque.



14
15
16
17
18
19
# File 'lib/asir/transport/resque.rb', line 14

# Builds a Resque transport.
#
# The URI defaults (scheme 'redis', port 6379) are assigned before the
# superclass initializer runs -- presumably so it can apply them while
# processing *args; confirm against the superclass.  Afterwards the
# transport is forced into one-way mode.
#
# @param args forwarded unchanged to the superclass initializer
def initialize(*args)
  @scheme_default = 'redis'.freeze
  @port_default = 6379
  super
  self.one_way = true
end

Instance Attribute Details

#namespace ⇒ Object

Returns the value of attribute namespace.



12
13
14
# File 'lib/asir/transport/resque.rb', line 12

# Reader for the namespace attribute.
# namespace_ consults it first and falls back to DEFAULT_QUEUE.
#
# @return [Object, nil] the assigned namespace, or nil when none was set
def namespace
  defined?(@namespace) ? @namespace : nil
end

#queue ⇒ Object

Defaults to ‘asir’.



78
79
80
# File 'lib/asir/transport/resque.rb', line 78

# Name of the queue that _send_message enqueues to.
#
# Falls back to DEFAULT_QUEUE when no queue has been assigned, so the
# documented default ('asir') actually holds and messages are never
# enqueued to a nil queue.
#
# @return [Object] the assigned queue, or DEFAULT_QUEUE when unset
def queue
  @queue || DEFAULT_QUEUE
end

#queues ⇒ Object

Returns the value of attribute queues.



12
13
14
# File 'lib/asir/transport/resque.rb', line 12

# Reader for the queues attribute.
# queues_ treats the value as a collection (it calls #empty? and #freeze
# on it) and supplies the default when the collection is empty.
#
# @return [Object, nil] the assigned queues, or nil when none were set
def queues
  defined?(@queues) ? @queues : nil
end

#throttle ⇒ Object

Returns the value of attribute throttle.



12
13
14
# File 'lib/asir/transport/resque.rb', line 12

# Reader for the throttle attribute.
# serve_stream_message! hands this value to poll_throttle.
#
# @return [Object, nil] the assigned throttle settings, or nil when unset
def throttle
  defined?(@throttle) ? @throttle : nil
end

Class Method Details

.perform(payload) ⇒ Object

Class method entry point from Resque::Job.perform.



120
121
122
123
124
125
# File 'lib/asir/transport/resque.rb', line 120

# Class-level entry point invoked by Resque::Job.perform.
#
# Looks up the transport instance registered on the current thread under
# :asir_transport_resque_instance (serve_stream_message! sets it) and asks
# it to serve the message.  The payload is passed in the in_stream
# position; _receive_message returns it unchanged.
#
# NOTE(review): if no transport is registered on this thread the lookup
# yields nil and this raises NoMethodError.
#
# @param payload [Object] the job payload handed over by Resque
# @return [Object] whatever serve_message! returns
def self.perform(payload)
  Thread.current[:asir_transport_resque_instance].serve_message!(payload, nil)
end

Instance Method Details

#_client_connect! ⇒ Object

!SLIDE Resque client.



23
24
25
26
27
28
# File 'lib/asir/transport/resque.rb', line 23

# Client-side connect: delegates to resque_connect!.
#
# Any failure is re-raised as the same exception class, with the message
# prefixed by this transport's class and URI and the original backtrace
# kept.
#
# The re-raise starts from the rescued exception *object*, not its class:
# Kernel#raise then calls exc.exception(message), which clones the
# exception and swaps the message instead of invoking the class
# constructor.  That keeps this working for exception classes whose
# constructors do not accept a single message string, preserves any extra
# state on the exception, and stops SystemCallError subclasses from
# prepending their errno text a second time.
#
# @return [Object] whatever resque_connect! returns
# @raise [Exception] same class as the underlying failure
def _client_connect!
  # $stderr.puts "  #{$$} #{self} _client_connect!"
  resque_connect!
rescue ::Exception => exc
  raise exc, "#{self.class} #{uri}: #{exc.message}", exc.backtrace
end

#_receive_message(payload, additional_data) ⇒ Object

The `payload` argument is the actual message payload, passed in place of an input stream; it is returned unchanged.



127
128
129
130
# File 'lib/asir/transport/resque.rb', line 127

# Nothing has to be read here: the first argument already is the message
# payload (.perform passes it in the position a stream would occupy).
#
# @param payload [Object] the message payload, returned as-is
# @param additional_data [Object] unused
# @return [Array] two elements: the payload, then nil
def _receive_message(payload, additional_data)
  [payload, nil]
end

#_receive_result(message, opaque_result) ⇒ Object



40
41
42
43
# File 'lib/asir/transport/resque.rb', line 40

# Receives the result for a message, unless the exchange is one-way.
#
# When either this transport or the message itself is marked one_way there
# is no result to wait for and nil is returned; otherwise the inherited
# implementation is used with the same arguments.
#
# @param message [Object] must respond to #one_way
# @param opaque_result [Object] passed through to the inherited method
# @return [nil, Object] nil for one-way, else the inherited result
def _receive_result(message, opaque_result)
  if one_way || message.one_way
    nil
  else
    super
  end
end

#_send_message(message, message_payload) ⇒ Object



50
51
52
53
54
55
# File 'lib/asir/transport/resque.rb', line 50

# Enqueues the encoded message onto the Resque queue.
#
# stream.with_stream! is used only to force the connection to be
# established; the yielded stream object itself is not used.  The job is
# enqueued with this transport's class as the job class, so Resque will
# later call the class-level perform with the payload.
#
# @param message [Object] the message being sent (not used directly here)
# @param message_payload [Object] encoded payload to enqueue
# @return [Object] whatever the with_stream! block call returns
def _send_message(message, message_payload)
  stream.with_stream! do |_io|
    if @verbose >= 2
      $stderr.puts "  #{$$} #{self} _send_message #{message_payload.inspect} to queue=#{queue.inspect} as #{self.class} :process_job"
    end
    ::Resque.enqueue_to(queue, self.class, message_payload)
  end
end

#_send_result(message, result, result_payload, stream, message_state) ⇒ Object



45
46
47
48
# File 'lib/asir/transport/resque.rb', line 45

# Sends a result back, unless the exchange is one-way.
#
# When either this transport or the message is marked one_way nothing is
# sent and nil is returned; otherwise the inherited implementation is
# called with the same arguments.
#
# @param message [Object] must respond to #one_way
# @return [nil, Object] nil for one-way, else the inherited result
def _send_result(message, result, result_payload, stream, message_state)
  if one_way || message.one_way
    nil
  else
    super
  end
end

#_server! ⇒ Object

!SLIDE Resque server (worker).



32
33
34
35
36
37
38
# File 'lib/asir/transport/resque.rb', line 32

# Server-side (worker) setup: connects via resque_connect! and returns the
# memoized Resque worker.
#
# Any failure is re-raised as the same exception class, with the message
# prefixed by this transport's class and URI and the original backtrace
# kept.
#
# The re-raise starts from the rescued exception *object*, not its class:
# Kernel#raise then calls exc.exception(message), which clones the
# exception and swaps the message instead of invoking the class
# constructor.  That keeps this working for exception classes whose
# constructors do not accept a single message string, preserves any extra
# state on the exception, and stops SystemCallError subclasses from
# prepending their errno text a second time.
#
# @return [Object] the value of resque_worker
# @raise [Exception] same class as the underlying failure
def _server!
  # $stderr.puts "  #{$$} #{self} _server!"
  resque_connect!
  resque_worker
rescue ::Exception => exc
  raise exc, "#{self.class} #{uri}: #{exc.message}", exc.backtrace
end

#_server_accept_connection!(server) ⇒ Object



89
90
91
# File 'lib/asir/transport/resque.rb', line 89

# "Accepts" a connection by returning the server object twice.
# The pair is presumably consumed as (in_stream, out_stream), matching the
# other connection hooks in this class -- confirm against the caller.
#
# @param server [Object]
# @return [Array] two-element array holding the same server object twice
def _server_accept_connection!(server)
  in_stream = out_stream = server
  [in_stream, out_stream]
end

#_server_close_connection!(in_stream, out_stream) ⇒ Object

Nothing to be closed for Resque.



99
100
101
# File 'lib/asir/transport/resque.rb', line 99

# Connection-close hook.  There is nothing to close for Resque, so this is
# deliberately a no-op.
#
# @return [nil]
def _server_close_connection!(in_stream, out_stream)
  nil
end

#_start_conduit! ⇒ Object



197
198
199
200
201
202
203
204
205
206
207
208
# File 'lib/asir/transport/resque.rb', line 197

# Starts a redis-server for this transport's port.
#
# Writes a minimal configuration file (foreground mode, the transport's
# port, warning-level logging to a per-port log file) and then replaces the
# current process with redis-server via exec, so on success this method
# does not return.  Directory, config path and log path default to
# per-port names under /tmp unless already set on the instance.
def _start_conduit!
  @redis_dir  ||= "/tmp"
  @redis_conf ||= "#{@redis_dir}/asir-redis-#{port}.conf"
  @redis_log  ||= "#{@redis_dir}/asir-redis-#{port}.log"
  ::File.open(@redis_conf, "w+") do |conf|
    # IO#puts writes each array element on its own line.
    conf.puts([
      "daemonize no",
      "port #{port}",
      "loglevel warning",
      "logfile #{@redis_log}",
    ])
  end
  exec "redis-server", @redis_conf
end

#namespace_ ⇒ Object

Defaults to ‘asir’.



83
84
85
# File 'lib/asir/transport/resque.rb', line 83

# Effective Redis namespace: the configured namespace, or DEFAULT_QUEUE
# when none is set.  The first truthy result is cached in @namespace_, so
# later changes to namespace are not picked up.
#
# @return [Object] the effective namespace
def namespace_
  return @namespace_ if @namespace_
  @namespace_ = namespace || DEFAULT_QUEUE
end

#queues_ ⇒ Object

Defaults to [ ‘asir’ ].



72
73
74
75
# File 'lib/asir/transport/resque.rb', line 72

# Effective list of queues for the worker.
#
# Returns [ DEFAULT_QUEUE ] when no queues are configured -- either the
# queues attribute was never assigned (nil) or it is empty.  Otherwise the
# configured collection itself is frozen and returned.  The result is
# cached in @queues_.
#
# The nil check matters because the queues reader returns nil until it is
# assigned; calling #empty? on it directly would raise NoMethodError
# instead of producing the documented default.
#
# @return [Array, Object] the default list or the frozen configured queues
def queues_
  @queues_ ||=
    begin
      configured = queues
      if configured.nil? || configured.empty?
        [ DEFAULT_QUEUE ]
      else
        configured.freeze
      end
    end
end

#resque_connect! ⇒ Object



144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
# File 'lib/asir/transport/resque.rb', line 144

# Creates the Redis connection for this transport and installs it as the
# connection Resque uses.
#
# A plain Redis client is built from host and port.  When namespace_ is
# truthy the client is wrapped in a Redis::Namespace, that wrapper is
# handed to Resque, and the namespace is then set again on whatever
# Resque.redis returns.  Otherwise the plain client is handed to Resque.
#
# @return [Object] the connection stored in @redis
def resque_connect!
  @redis_config = { :host => host, :port => port, :thread_safe => true }
  @redis = ::Redis.new(@redis_config)

  unless namespace_
    ::Resque.redis = @redis
    return @redis
  end

  @redis = ::Redis::Namespace.new(namespace_, :redis => @redis)
  ::Resque.redis = @redis
  ::Resque.redis.namespace = namespace_
  # $stderr.puts "  *** #{$$} #{self} resque_connect! #{@redis.inspect}"
  @redis
end

#resque_disconnect! ⇒ Object



164
165
166
# File 'lib/asir/transport/resque.rb', line 164

# Detaches Resque from its Redis connection by assigning nil to
# Resque.redis.  The connection held in @redis is left untouched.
#
# @return [nil]
def resque_disconnect!
  ::Resque.redis = nil
  nil
end

#resque_uri ⇒ Object



134
135
136
137
138
139
140
141
142
# File 'lib/asir/transport/resque.rb', line 134

# The transport URI, validated to use the 'redis' scheme.
# The value of _uri is cached in @resque_uri after the first successful
# call; the scheme is only checked while nothing is cached.
#
# @return [Object] the value of _uri
# @raise [ArgumentError] when scheme is not 'redis'
def resque_uri
  return @resque_uri if @resque_uri
  raise ArgumentError, "Invalid resque URI: #{uri.inspect}" if scheme != 'redis'
  @resque_uri = _uri
end

#resque_worker ⇒ Object



168
169
170
# File 'lib/asir/transport/resque.rb', line 168

# The Resque worker for this transport, created on first use from queues_
# and cached in @resque_worker.
#
# @return [Resque::Worker]
def resque_worker
  return @resque_worker if @resque_worker
  @resque_worker = ::Resque::Worker.new(queues_)
end

#serve_stream_message!(in_stream, out_stream) ⇒ Object

Both `in_stream` and `out_stream` are ignored.



103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/asir/transport/resque.rb', line 103

# Polls Resque once for a job and processes it.
#
# Both stream arguments are ignored.  While polling, this transport is
# registered on the current thread under :asir_transport_resque_instance
# so the class-level perform can find it; the previous value is restored
# afterwards, even when an exception is raised.
#
# The block given to poll_throttle reserves a job from the worker,
# processes it if there is one, and evaluates to the job (nil when the
# queues were empty).
#
# @return [self]
def serve_stream_message!(in_stream, out_stream)
  slot = :asir_transport_resque_instance
  previous = Thread.current[slot]
  Thread.current[slot] = self
  poll_throttle(throttle) do
    if @verbose >= 3
      $stderr.puts "  #{$$} #{self} serve_stream_message!: resque_worker = #{resque_worker} on queues #{resque_worker.queues}"
    end
    job = resque_worker.reserve
    if job
      if @verbose >= 2
        $stderr.puts "  #{$$} #{self} serve_stream_message! job=#{job.class}:#{job.inspect}"
      end
      resque_worker.process(job)
    end
    job
  end
  self
ensure
  Thread.current[slot] = previous
end

#server_on_start! ⇒ Object



172
173
174
175
176
177
178
179
180
181
# File 'lib/asir/transport/resque.rb', line 172

# Server start hook.
#
# Remembers the original program name (first call only) and prefixes the
# current one with "resque ", because prune_dead_workers expects worker
# processes to have "resque " in their name.  Then, if a worker is
# available, prunes dead workers and registers this one.
#
# @return [self]
def server_on_start!
  # $PROGRAM_NAME is the built-in alias of $0.
  @save_progname ||= $PROGRAM_NAME.dup
  $PROGRAM_NAME = "resque #{$PROGRAM_NAME}"
  worker = resque_worker
  if worker
    worker.prune_dead_workers
    worker.register_worker
  end
  self
end

#server_on_stop! ⇒ Object



183
184
185
186
187
188
189
190
191
192
193
# File 'lib/asir/transport/resque.rb', line 183

# Server stop hook.
#
# Restores the program name saved by server_on_start! (if any) and
# unregisters the worker if one was created.  A Redis connection failure
# at this point is swallowed, since nothing useful can be done about it
# while the server is stopping.
#
# @return [self, nil] self normally; nil when Redis could not be reached
def server_on_stop!
  # $PROGRAM_NAME is the built-in alias of $0.
  if @save_progname
    $PROGRAM_NAME = @save_progname
  end
  worker = @resque_worker
  worker.unregister_worker if worker
  self
rescue Redis::CannotConnectError
  # Not actionable: the server is already shutting down.
  nil
end

#stream_eof?(stream) ⇒ Boolean

Resque is message-oriented, process only one message per “connection”.

Returns:

  • (Boolean)


94
95
96
# File 'lib/asir/transport/resque.rb', line 94

# End-of-stream check.  Always reports false, whatever stream is given.
#
# @param stream [Object] unused
# @return [Boolean] always false
def stream_eof?(stream)
  false
end