Class: Fleck::Client::Request
- Inherits:
-
Object
- Object
- Fleck::Client::Request
- Includes:
- Loggable
- Defined in:
- lib/fleck/client/request.rb
Instance Attribute Summary collapse
-
#completed ⇒ Object
readonly
Returns the value of attribute completed.
-
#expired ⇒ Object
readonly
Returns the value of attribute expired.
-
#id ⇒ Object
readonly
Returns the value of attribute id.
-
#response ⇒ Object
Returns the value of attribute response.
Instance Method Summary collapse
- #cancel! ⇒ Object
- #complete! ⇒ Object
- #expire! ⇒ Object
- #expired? ⇒ Boolean
-
#initialize(client, routing_key, reply_to, action: nil, version: nil, headers: {}, params: {}, timeout: nil, multiple_responses: false, rmq_options: {}, &callback) ⇒ Request
constructor
A new instance of Request.
- #send!(async = false) ⇒ Object
Methods included from Loggable
Constructor Details
#initialize(client, routing_key, reply_to, action: nil, version: nil, headers: {}, params: {}, timeout: nil, multiple_responses: false, rmq_options: {}, &callback) ⇒ Request
Returns a new instance of Request.
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/fleck/client/request.rb', line 8

# Prepares a request for publishing: assigns it a UUID, records the routing
# information, builds the publish options and serialises the message body.
# Nothing is published here; that happens in #send!.
#
# @param client [Object] owner of this request; +local_ip+ is read from it here
# @param routing_key [Object] stored and used as the +routing_key+ publish option
# @param reply_to [Object] used as the +reply_to+ publish option
# @param action [Object, nil] action name; when nil, falls back to
#   +headers[:action]+ and then +headers['action']+
# @param version [Object, nil] version; when nil, falls back to
#   +headers[:version]+ and then +headers['version']+
# @param headers [Hash] message headers. NOTE: mutated in place, +:version+
#   and +:ip+ are written into the hash the caller passed
# @param params [Hash] message parameters; +filtered!+ is called on it
# @param timeout [Numeric, nil] multiplied by 1000 and truncated to an Integer
#   (presumably seconds converted to milliseconds -- confirm against callers)
# @param multiple_responses [Boolean] stored; consulted by #expire!
# @param rmq_options [Hash] optional overrides for the +:mandatory+,
#   +:persistent+, +:priority+ and +:app_id+ publish options
# @yield optional callback, stored in +@callback+
def initialize(client, routing_key, reply_to, action: nil, version: nil, headers: {}, params: {}, timeout: nil, multiple_responses: false, rmq_options: {}, &callback)
  @id = SecureRandom.uuid
  logger.progname += " #{@id}"

  logger.debug "Preparing new request"

  @client             = client
  @response           = nil
  @lock               = Mutex.new
  @condition          = ConditionVariable.new
  @callback           = callback
  @started_at         = nil
  @ended_at           = nil
  @completed          = false
  @async              = false
  @action             = action  || headers[:action]  || headers['action']
  @version            = version || headers[:version] || headers['version']
  @routing_key        = routing_key
  @timeout            = timeout.nil? ? nil : (timeout * 1000).to_i
  @multiple_responses = multiple_responses
  @ztimer_slot        = nil
  @expired            = false
  # NOTE(review): `filtered!` is not defined in this file; its exact effect on
  # the hash cannot be confirmed from here.
  @params             = params.filtered!

  headers[:version] = @version
  headers[:ip]      = @client.local_ip

  # Default to mandatory delivery, but honour an explicit `mandatory: false`
  # (`value || true` can never evaluate to false).
  mandatory = rmq_options[:mandatory]
  mandatory = true if mandatory.nil?

  # NOTE(review): `type` uses the raw `action` argument rather than @action,
  # so an action supplied only through `headers` is not reflected here.
  @options = {
    routing_key:      @routing_key,
    reply_to:         reply_to,
    correlation_id:   @id,
    type:             action,
    headers:          headers,
    mandatory:        mandatory,
    persistent:       rmq_options[:persistent] || false,
    content_type:     'application/json',
    content_encoding: 'UTF-8'
  }.filtered!
  @options[:priority]   = rmq_options[:priority] unless rmq_options[:priority].nil?
  @options[:app_id]     = rmq_options[:app_id] || Fleck.config.app_name
  @options[:expiration] = @timeout

  @message = Oj.dump({headers: headers, params: @params}, mode: :compat)

  logger.debug "Request prepared"
end
Instance Attribute Details
#completed ⇒ Object (readonly)
Returns the value of attribute completed.
6 7 8 |
# File 'lib/fleck/client/request.rb', line 6

# Reader for the completion flag. It starts out false in the constructor and
# is set to true by #complete!.
#
# @return [Boolean] current value of +@completed+
def completed
  @completed
end
#expired ⇒ Object (readonly)
Returns the value of attribute expired.
6 7 8 |
# File 'lib/fleck/client/request.rb', line 6

# Reader for the expiry flag. It starts out false in the constructor and is
# set to true by #expire! when the request is cancelled on timeout.
#
# @return [Boolean] current value of +@expired+
def expired
  @expired
end
#id ⇒ Object (readonly)
Returns the value of attribute id.
6 7 8 |
# File 'lib/fleck/client/request.rb', line 6

# Reader for the request identifier: the UUID generated in the constructor,
# which is also used as the +correlation_id+ publish option.
#
# @return [String] current value of +@id+
def id
  @id
end
#response ⇒ Object
Returns the value of attribute response.
6 7 8 |
# File 'lib/fleck/client/request.rb', line 6

# Reader for the response attached to this request. It is nil until a
# response has been assigned (for example by #cancel!).
#
# @return [Object, nil] current value of +@response+
def response
  @response
end
Instance Method Details
#cancel! ⇒ Object
96 97 98 99 |
# File 'lib/fleck/client/request.rb', line 96

# Aborts the request: logs a warning, then assigns a synthetic
# 503 "Service Unavailable" response through the +response=+ writer.
#
# @return [Fleck::Client::Response] the response that was assigned
def cancel!
  logger.warn "Request canceled!"

  unavailable = {status: 503, errors: ['Service Unavailable'], body: nil}
  payload     = Oj.dump(unavailable, mode: :compat)

  self.response = Fleck::Client::Response.new(payload)
end
#complete! ⇒ Object
85 86 87 88 89 90 91 92 93 94 |
# File 'lib/fleck/client/request.rb', line 85

# Marks the request as finished: cancels the pending timeout timer (if any),
# records the end time, wakes up a caller blocked in a synchronous #send!
# and asks the client to forget this request.
#
# Safe to call on a request that was never sent (+@started_at+ still nil):
# the elapsed time is simply not reported in that case.
def complete!
  @ztimer_slot.cancel! if @ztimer_slot

  @lock.synchronize do
    @completed = true
    @ended_at  = Time.now.to_f
    mode       = @async ? 'async' : 'synchronized'

    if @started_at
      elapsed_ms = ((@ended_at - @started_at).round(5) * 1000).round(2)
      logger.debug "Done #{mode} in #{elapsed_ms} ms"
    else
      # Without a start time the subtraction would raise inside the critical
      # section, skipping both the signal and the client clean-up below.
      logger.debug "Done #{mode} before being sent"
    end

    # Only a synchronous sender waits on the condition variable.
    @condition.signal unless @async
    @client.remove_request(@id)
  end
end
#expire! ⇒ Object
101 102 103 104 105 106 107 108 109 110 111 112 113 |
# File 'lib/fleck/client/request.rb', line 101

# Timeout handler (scheduled from #send!).
#
# * multiple-response request that already has a response: just #complete!
# * single-response request that is already completed: nothing to do
# * anything else: flag the request as expired and #cancel! it
def expire!
  if @multiple_responses
    return complete! unless @response.nil?
  elsif @completed
    return
  end

  @expired = true
  cancel!
end
#expired? ⇒ Boolean
115 116 117 |
# File 'lib/fleck/client/request.rb', line 115

# Predicate form of the expiry flag.
#
# @return [Boolean] value of +@expired+ (set to true by #expire! when the
#   request is cancelled on timeout)
def expired?
  @expired
end
#send!(async = false) ⇒ Object
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/fleck/client/request.rb', line 65

# Publishes the prepared message through the client.
#
# When a timeout was configured, a timer is armed first so that #expire! runs
# once it elapses. In synchronous mode the calling thread then blocks until
# the request has been completed; in asynchronous mode it returns right after
# publishing.
#
# @param async [Boolean] when true, do not wait for completion
def send!(async = false)
  @started_at = Time.now.to_f
  @async      = async
  logger.debug("Sending request with (options: #{@options}, message: #{@params})")

  @ztimer_slot = Ztimer.after(@timeout){ expire! } if @timeout

  @client.publish(@message, @options)

  @lock.synchronize do
    # @completed is checked under the lock, so a completion that happened
    # between publish and this point is not missed.
    unless @async || @completed
      logger.debug("Waiting for response")
      # Re-check the flag after every wakeup: a condition variable may wake
      # its waiter without the request actually having completed.
      @condition.wait(@lock) until @completed
      logger.debug("Request terminated.")
    end
  end
end