Class: Fleck::Client::Request

Inherits:
Object
  • Object
show all
Includes:
Loggable
Defined in:
lib/fleck/client/request.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Loggable

#log_error, #logger

Constructor Details

#initialize(client, routing_key, reply_to, action: nil, version: nil, headers: {}, params: {}, timeout: nil, multiple_responses: false, rmq_options: {}, &callback) ⇒ Request

Returns a new instance of Request.



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/fleck/client/request.rb', line 8

# Prepares a request: generates its id, stores the bookkeeping state used by
# send! / complete! / expire!, builds the publish options and serializes the
# message body.
#
# Side effects visible here: the id is appended to +logger.progname+,
# +headers+ is mutated (+:version+ and +:ip+ are written into it) and
# +filtered!+ is invoked on the +params+ hash that was passed in.
#
# @param client [Object] owner of the request; +local_ip+ is read from it here
# @param routing_key [String] stored and used as the +:routing_key+ option
# @param reply_to [String] used as the +:reply_to+ option
# @param action [String, nil] falls back to +headers[:action]+ / +headers['action']+
#   for +@action+
# @param version [String, nil] falls back to +headers[:version]+ / +headers['version']+
# @param headers [Hash] message headers (mutated, see above)
# @param params [Hash] message parameters
# @param timeout [Numeric, nil] multiplied by 1000 and truncated to an Integer
#   (presumably seconds converted to milliseconds — confirm against callers);
#   also used as the +:expiration+ option
# @param multiple_responses [Boolean] stored for use by expire!
# @param rmq_options [Hash] optional +:mandatory+ (default +true+),
#   +:persistent+ (default +false+), +:priority+ (only set when not nil) and
#   +:app_id+ (default +Fleck.config.app_name+)
# @param callback [Proc] optional block, stored in +@callback+
def initialize(client, routing_key, reply_to, action: nil, version: nil, headers: {}, params: {}, timeout: nil, multiple_responses: false, rmq_options: {}, &callback)
  @id              = SecureRandom.uuid
  logger.progname += " #{@id}"

  logger.debug "Preparing new request"

  @client             = client
  @response           = nil
  @lock               = Mutex.new
  @condition          = ConditionVariable.new
  @callback           = callback
  @started_at         = nil
  @ended_at           = nil
  @completed          = false
  @async              = false
  @action             = action  || headers[:action]  || headers['action']
  @version            = version || headers[:version] || headers['version']
  @routing_key        = routing_key
  @timeout            = (timeout * 1000).to_i unless timeout.nil?
  @multiple_responses = multiple_responses
  @ztimer_slot        = nil
  @expired            = false
  @params             = params.filtered!

  headers[:version] = @version
  headers[:ip]      = @client.local_ip

  # `rmq_options[:mandatory] || true` can never be false, so an explicit
  # `mandatory: false` was silently ignored. Default to true only when the
  # option is absent (nil).
  mandatory = rmq_options[:mandatory]
  mandatory = true if mandatory.nil?

  # NOTE(review): :type is taken from the raw `action` argument, not from
  # @action (which also considers the headers) — confirm this is intended.
  @options = {
    routing_key:      @routing_key,
    reply_to:         reply_to,
    correlation_id:   @id,
    type:             action,
    headers:          headers,
    mandatory:        mandatory,
    persistent:       rmq_options[:persistent] || false,
    content_type:     'application/json',
    content_encoding: 'UTF-8'
  }.filtered!
  @options[:priority]   = rmq_options[:priority] unless rmq_options[:priority].nil?
  @options[:app_id]     = rmq_options[:app_id] || Fleck.config.app_name
  @options[:expiration] = @timeout

  @message = Oj.dump({headers: headers, params: @params}, mode: :compat)

  logger.debug "Request prepared"
end

Instance Attribute Details

#completed ⇒ Object (readonly)

Returns the value of attribute completed.



6
7
8
# File 'lib/fleck/client/request.rb', line 6

# Reader for the completion flag. The flag starts out as +false+ in the
# constructor and is set to +true+ by complete!.
#
# @return [Boolean]
def completed
  @completed
end

#expired ⇒ Object (readonly)

Returns the value of attribute expired.



6
7
8
# File 'lib/fleck/client/request.rb', line 6

# Reader for the expiration flag. The flag starts out as +false+ in the
# constructor and is set to +true+ by expire! on the branches that cancel
# the request.
#
# @return [Boolean]
def expired
  @expired
end

#id ⇒ Object (readonly)

Returns the value of attribute id.



6
7
8
# File 'lib/fleck/client/request.rb', line 6

# Reader for the request id: a UUID generated with SecureRandom.uuid in the
# constructor, where it is also used as the +:correlation_id+ publish option.
#
# @return [String]
def id
  @id
end

#response ⇒ Object

Returns the value of attribute response.



6
7
8
# File 'lib/fleck/client/request.rb', line 6

# Reader for the response currently stored on this request. It is +nil+
# after construction and stays so until a response is assigned.
#
# @return [Object, nil]
def response
  @response
end

Instance Method Details

#cancel! ⇒ Object



96
97
98
99
# File 'lib/fleck/client/request.rb', line 96

# Cancels the request: logs a warning and assigns a synthetic
# 503 "Service Unavailable" response through the +response=+ writer.
#
# @return [Fleck::Client::Response] the response that was assigned
def cancel!
  logger.warn "Request canceled!"

  unavailable = { status: 503, errors: ['Service Unavailable'], body: nil }
  serialized  = Oj.dump(unavailable, mode: :compat)
  self.response = Fleck::Client::Response.new(serialized)
end

#complete! ⇒ Object



85
86
87
88
89
90
91
92
93
94
# File 'lib/fleck/client/request.rb', line 85

# Marks the request as completed: cancels the pending timer slot (if any),
# records the end time, logs the elapsed time, wakes a synchronous waiter
# and asks the client to forget this request.
#
# @return [Object] whatever +@client.remove_request+ returns
def complete!
  @ztimer_slot.cancel! if @ztimer_slot
  @lock.synchronize do
    @completed = true
    @ended_at  = Time.now.to_f

    # @started_at is only assigned by send!. If completion is reached before
    # that, report a zero duration instead of raising on `Float - nil` while
    # holding the lock (which would skip the signal and the clean-up below).
    started_at = @started_at || @ended_at
    elapsed_ms = ((@ended_at - started_at).round(5) * 1000).round(2)

    logger.debug "Done #{@async ? 'async' : 'synchronized'} in #{elapsed_ms} ms"
    @condition.signal unless @async
    @client.remove_request(@id)
  end
end

#expire! ⇒ Object



101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/fleck/client/request.rb', line 101

# Handles expiration of the request.
#
# * multiple-responses mode with a response already stored: the request is
#   simply completed;
# * single-response mode and already completed: nothing happens;
# * otherwise: the request is flagged as expired and cancelled.
#
# @return [Object, nil] the result of complete! or cancel!, or +nil+ when
#   nothing had to be done
def expire!
  return complete! if @multiple_responses && !@response.nil?
  return if !@multiple_responses && @completed

  @expired = true
  cancel!
end

#expired? ⇒ Boolean

Returns:

  • (Boolean)


115
116
117
# File 'lib/fleck/client/request.rb', line 115

# Predicate form of the expiration flag.
#
# @return [Boolean] the current value of +@expired+
def expired?
  @expired
end

#send!(async = false) ⇒ Object



65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/fleck/client/request.rb', line 65

# Publishes the prepared message through the client and, unless +async+ is
# set, blocks the calling thread until the request has been completed.
#
# When a timeout was configured, a Ztimer slot is scheduled first so that
# expire! runs after +@timeout+.
#
# @param async [Boolean] when truthy, return right after publishing instead
#   of waiting for completion
# @return [Object, nil] result of the last debug log call when the method
#   waited, +nil+ otherwise
def send!(async = false)
  @started_at = Time.now.to_f
  @async = async
  logger.debug("Sending request with (options: #{@options}, message: #{@params})")

  @ztimer_slot = Ztimer.after(@timeout) { expire! } if @timeout

  @client.publish(@message, @options)

  @lock.synchronize do
    unless @async || @completed
      logger.debug("Waiting for response")
      # Condition variables may wake up without a matching signal; re-check
      # the completion flag and keep waiting until it is actually set.
      # NOTE(review): this relies on every signal being preceded by
      # `@completed = true`, as complete! does — confirm no other signaller.
      @condition.wait(@lock) until @completed
      logger.debug("Request terminated.")
    end
  end
end