Class: ASIR::Transport::Resque
- Inherits:
-
ConnectionOriented
- Object
- ASIR::Transport
- Stream
- ConnectionOriented
- ASIR::Transport::Resque
- Includes:
- PollThrottle
- Defined in:
- lib/asir/transport/resque.rb
Overview
!SLIDE Resque Transport
Constant Summary collapse
- DEFAULT_QUEUE =
'asir'.freeze
Constants included from PayloadIO
PayloadIO::FOOTER, PayloadIO::HEADER
Constants included from UriConfig
UriConfig::S_LOCALHOST, UriConfig::S_TCP
Constants included from ASIR::ThreadVariable
ASIR::ThreadVariable::DEBUG, ASIR::ThreadVariable::EMPTY_HASH, ASIR::ThreadVariable::SETTER
Instance Attribute Summary collapse
-
#namespace ⇒ Object
Returns the value of attribute namespace.
-
#queue ⇒ Object
Defaults to ‘asir’.
-
#queues ⇒ Object
Returns the value of attribute queues.
-
#throttle ⇒ Object
Returns the value of attribute throttle.
Attributes included from UriConfig
#host, #host_default, #path, #path_default, #port, #port_default, #scheme, #scheme_default, #uri
Attributes inherited from ASIR::Transport
#after_receive_message, #before_send_message, #decoder, #encoder, #invoker, #message_count, #needs_message_identifier, #needs_message_timestamp, #on_exception, #on_result_exception, #one_way, #running, #verbose
Attributes included from Log
Class Method Summary collapse
-
.perform(payload) ⇒ Object
Class method entry point from Resque::Job.perform.
Instance Method Summary collapse
-
#_client_connect! ⇒ Object
!SLIDE Resque client.
-
#_receive_message(payload, additional_data) ⇒ Object
is actual payload.
- #_receive_result(message, opaque_result) ⇒ Object
- #_send_message(message, message_payload) ⇒ Object
- #_send_result(message, result, result_payload, stream, message_state) ⇒ Object
-
#_server! ⇒ Object
!SLIDE Resque server (worker).
- #_server_accept_connection!(server) ⇒ Object
-
#_server_close_connection!(in_stream, out_stream) ⇒ Object
Nothing to be closed for Resque.
- #_start_conduit! ⇒ Object
-
#initialize(*args) ⇒ Resque
constructor
A new instance of Resque.
-
#namespace_ ⇒ Object
Defaults to ‘asir’.
-
#queues_ ⇒ Object
Defaults to [ ‘asir’ ].
- #resque_connect! ⇒ Object
- #resque_disconnect! ⇒ Object
- #resque_uri ⇒ Object
- #resque_worker ⇒ Object
-
#serve_stream_message!(in_stream, out_stream) ⇒ Object
ignored.
- #server_on_start! ⇒ Object
- #server_on_stop! ⇒ Object
-
#stream_eof?(stream) ⇒ Boolean
Resque is message-oriented, process only one message per “connection”.
Methods included from PollThrottle
Methods inherited from ConnectionOriented
#_after_connect!, #_before_close!, #_connect!, #_server_close!, #connect!, #prepare_server!, #run_server!, #serve_connection!, #stream
Methods included from PayloadIO
#_read, #_read_line_and_expect!, #_write, #close
Methods included from UriConfig
Methods inherited from Stream
#_serve_stream!, #serve_stream!
Methods inherited from ASIR::Transport
#_subclass_responsibility, #invoke_message!, #needs_message_identifier?, #needs_message_timestamp?, #receive_message, #receive_result, #send_message, #send_result, #serve_message!, #stop!, #with_server_signals!
Methods included from Log
#_log, #_log_enabled=, #_log_enabled?, #_log_format, #_log_result, enabled, enabled=, included
Methods included from AdditionalData
#[], #[]=, #additional_data, #additional_data!, #additional_data=, included
Methods included from Message::Delay
#relative_message_delay!, #wait_for_delay!
Methods included from ASIR::ThreadVariable
Constructor Details
#initialize(*args) ⇒ Resque
Returns a new instance of Resque.
14 15 16 17 18 19 |
# File 'lib/asir/transport/resque.rb', line 14 def initialize *args @port_default = 6379 @scheme_default = 'redis'.freeze super self.one_way = true end |
Instance Attribute Details
#namespace ⇒ Object
Returns the value of attribute namespace.
12 13 14 |
# File 'lib/asir/transport/resque.rb', line 12 def namespace @namespace end |
#queue ⇒ Object
Defaults to ‘asir’.
78 79 80 |
# File 'lib/asir/transport/resque.rb', line 78 def queue @queue end |
#queues ⇒ Object
Returns the value of attribute queues.
12 13 14 |
# File 'lib/asir/transport/resque.rb', line 12 def queues @queues end |
#throttle ⇒ Object
Returns the value of attribute throttle.
12 13 14 |
# File 'lib/asir/transport/resque.rb', line 12 def throttle @throttle end |
Class Method Details
.perform(payload) ⇒ Object
Class method entry point from Resque::Job.perform.
120 121 122 123 124 125 |
# File 'lib/asir/transport/resque.rb', line 120 def self.perform payload # $stderr.puts " #{self} process_job payload=#{payload.inspect}" t = Thread.current[:asir_transport_resque_instance] # Pass payload as in_stream; _receive_message will return it. t. payload, nil end |
Instance Method Details
#_client_connect! ⇒ Object
!SLIDE Resque client.
23 24 25 26 27 28 |
# File 'lib/asir/transport/resque.rb', line 23 def _client_connect! # $stderr.puts " #{$$} #{self} _client_connect!" resque_connect! rescue ::Exception => exc raise exc.class, "#{self.class} #{uri}: #{exc.}", exc.backtrace end |
#_receive_message(payload, additional_data) ⇒ Object
is actual payload
127 128 129 130 |
# File 'lib/asir/transport/resque.rb', line 127 def payload, additional_data # is actual payload # $stderr.puts " #{$$} #{self} _receive_message payload=#{payload.inspect}" [ payload, nil ] end |
#_receive_result(message, opaque_result) ⇒ Object
40 41 42 43 |
# File 'lib/asir/transport/resque.rb', line 40 def _receive_result , opaque_result return nil if one_way || .one_way super end |
#_send_message(message, message_payload) ⇒ Object
50 51 52 53 54 55 |
# File 'lib/asir/transport/resque.rb', line 50 def , stream.with_stream! do | io | # Force connect $stderr.puts " #{$$} #{self} _send_message #{.inspect} to queue=#{queue.inspect} as #{self.class} :process_job" if @verbose >= 2 ::Resque.enqueue_to(queue, self.class, ) end end |
#_send_result(message, result, result_payload, stream, message_state) ⇒ Object
45 46 47 48 |
# File 'lib/asir/transport/resque.rb', line 45 def _send_result , result, result_payload, stream, return nil if one_way || .one_way super end |
#_server! ⇒ Object
!SLIDE Resque server (worker).
32 33 34 35 36 37 38 |
# File 'lib/asir/transport/resque.rb', line 32 def _server! # $stderr.puts " #{$$} #{self} _server!" resque_connect! resque_worker rescue ::Exception => exc raise exc.class, "#{self.class} #{uri}: #{exc.}", exc.backtrace end |
#_server_accept_connection!(server) ⇒ Object
89 90 91 |
# File 'lib/asir/transport/resque.rb', line 89 def _server_accept_connection! server [ server, server ] end |
#_server_close_connection!(in_stream, out_stream) ⇒ Object
Nothing to be closed for Resque.
99 100 101 |
# File 'lib/asir/transport/resque.rb', line 99 def _server_close_connection! in_stream, out_stream # NOTHING end |
#_start_conduit! ⇒ Object
197 198 199 200 201 202 203 204 205 206 207 208 |
# File 'lib/asir/transport/resque.rb', line 197 def _start_conduit! @redis_dir ||= "/tmp" @redis_conf ||= "#{@redis_dir}/asir-redis-#{port}.conf" @redis_log ||= "#{@redis_dir}/asir-redis-#{port}.log" ::File.open(@redis_conf, "w+") do | out | out.puts "daemonize no" out.puts "port #{port}" out.puts "loglevel warning" out.puts "logfile #{@redis_log}" end exec "redis-server", @redis_conf end |
#namespace_ ⇒ Object
Defaults to ‘asir’.
83 84 85 |
# File 'lib/asir/transport/resque.rb', line 83 def namespace_ @namespace_ ||= namespace || DEFAULT_QUEUE end |
#queues_ ⇒ Object
Defaults to [ ‘asir’ ].
72 73 74 75 |
# File 'lib/asir/transport/resque.rb', line 72 def queues_ @queues_ ||= queues.empty? ? [ DEFAULT_QUEUE ] : queues.freeze end |
#resque_connect! ⇒ Object
144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 |
# File 'lib/asir/transport/resque.rb', line 144 def resque_connect! @redis_config = { :host => host, :port => port, :thread_safe => true, } @redis = ::Redis.new(@redis_config) if namespace_ ::Resque.redis = @redis = ::Redis::Namespace.new(namespace_, :redis => @redis) ::Resque.redis.namespace = namespace_ else ::Resque.redis = @redis end # $stderr.puts " *** #{$$} #{self} resque_connect! #{@redis.inspect}" @redis end |
#resque_disconnect! ⇒ Object
164 165 166 |
# File 'lib/asir/transport/resque.rb', line 164 def resque_disconnect! ::Resque.redis = nil end |
#resque_uri ⇒ Object
134 135 136 137 138 139 140 141 142 |
# File 'lib/asir/transport/resque.rb', line 134 def resque_uri @resque_uri ||= ( unless scheme == 'redis' raise ArgumentError, "Invalid resque URI: #{uri.inspect}" end _uri ) end |
#resque_worker ⇒ Object
168 169 170 |
# File 'lib/asir/transport/resque.rb', line 168 def resque_worker @resque_worker ||= ::Resque::Worker.new(queues_) end |
#serve_stream_message!(in_stream, out_stream) ⇒ Object
ignored
103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 |
# File 'lib/asir/transport/resque.rb', line 103 def in_stream, out_stream # ignored save = Thread.current[:asir_transport_resque_instance] Thread.current[:asir_transport_resque_instance] = self poll_throttle throttle do $stderr.puts " #{$$} #{self} serve_stream_message!: resque_worker = #{resque_worker} on queues #{resque_worker.queues}" if @verbose >= 3 if job = resque_worker.reserve $stderr.puts " #{$$} #{self} serve_stream_message! job=#{job.class}:#{job.inspect}" if @verbose >= 2 resque_worker.process(job) end job end self ensure Thread.current[:asir_transport_resque_instance] = save end |
#server_on_start! ⇒ Object
172 173 174 175 176 177 178 179 180 181 |
# File 'lib/asir/transport/resque.rb', line 172 def server_on_start! # prune_dead_workers expects processes to have "resque " in the name. @save_progname ||= $0.dup $0 = "resque #{$0}" if worker = resque_worker worker.prune_dead_workers worker.register_worker end self end |
#server_on_stop! ⇒ Object
183 184 185 186 187 188 189 190 191 192 193 |
# File 'lib/asir/transport/resque.rb', line 183 def server_on_stop! $0 = @save_progname if @save_progname if worker = @resque_worker worker.unregister_worker end self rescue Redis::CannotConnectError # This error is not actionable since server # is stopping. nil end |
#stream_eof?(stream) ⇒ Boolean
Resque is message-oriented, process only one message per “connection”.
94 95 96 |
# File 'lib/asir/transport/resque.rb', line 94 def stream_eof? stream false end |