Class: Arf::RPC::Responder

Inherits:
Object
  • Object
show all
Defined in:
lib/arf/rpc/responder.rb

Instance Method Summary collapse

Constructor Details

#initialize(str, input_stream_type, output_stream_type) ⇒ Responder

Returns a new instance of Responder.



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/arf/rpc/responder.rb', line 6

# Sets up the response, input-stream and output-stream bookkeeping for one
# call and registers this object as the handler of +str+.
#
# @param str [Object] stream object; it receives +attach_handler(self)+ here
#   and +write_data+ from the sending methods.
# @param input_stream_type [Object, nil] when non-nil, this responder may
#   send stream items.
# @param output_stream_type [Object, nil] when non-nil, this responder may
#   receive stream items.
def initialize(str, input_stream_type, output_stream_type)
  @stream = str
  @metadata = Metadata.new

  # Response state; the lock/condition pair lets callers block until a
  # response has been stored.
  @response = nil
  @response_error = nil
  @response_lock = Thread::Mutex.new
  @response_cond = Thread::ConditionVariable.new

  # NOTE(review): the receiving side is typed by +output_stream_type+ and the
  # sending side by +input_stream_type+ — presumably because this object sits
  # at the opposite end from where those types are declared; confirm.
  @input_stream_type = output_stream_type
  @has_input_stream = !@input_stream_type.nil?
  @input_stream_started = false
  @input_stream_completed = false
  @input_stream_error = nil
  @input_stream_items = Thread::Queue.new
  @input_stream_closed = false
  @input_stream_closed_lock = Thread::Mutex.new

  @output_stream_type = input_stream_type
  @has_output_stream = !@output_stream_type.nil?
  @output_stream_started = false
  @output_stream_started_lock = Thread::Mutex.new
  @output_stream_error = nil
  @output_stream_closed = false
  @output_stream_closed_lock = Thread::Mutex.new

  # Attach last so every instance variable exists before the stream can
  # deliver data to this handler.
  str.attach_handler(self)
end

Instance Method Details

#_wait_response(throw_error: true) ⇒ Object

Raises:

  • (@response_error)


136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/arf/rpc/responder.rb', line 136

# Blocks the calling thread until a response has been stored in +@response+.
#
# The predicate is re-checked while holding +@response_lock+, so a broadcast
# that happens between an unsynchronized check and the wait cannot be missed,
# and a wakeup without a stored response simply waits again.
#
# @param throw_error [Boolean] when true, raises +@response_error+ if one is
#   set — both when it was already present on entry and when it arrived while
#   this call was waiting.
# @return [nil]
# @raise [Exception] whatever is stored in +@response_error+ (only when
#   +throw_error+ is true).
def _wait_response(throw_error: true)
  raise @response_error if @response_error && throw_error

  @response_lock.synchronize do
    @response_cond.wait(@response_lock) until @response
  end

  # The response may have carried an error status; surface it to the first
  # waiter too, not only to calls made after the response arrived.
  raise @response_error if @response_error && throw_error
end

#close_recv ⇒ Object



127
128
129
130
131
132
133
134
# File 'lib/arf/rpc/responder.rb', line 127

# Marks the receiving side as closed and closes the queue of received items.
# Does nothing when the closed flag is already set.
#
# @return [Object, nil] the result of closing the item queue, or nil when the
#   receiving side was already closed.
def close_recv
  unless @input_stream_closed
    @input_stream_closed_lock.synchronize do
      @input_stream_closed = true
      @input_stream_items.close
    end
  end
end

#close_send ⇒ Object



118
119
120
121
122
123
124
125
# File 'lib/arf/rpc/responder.rb', line 118

# Closes the sending side by writing an encoded +EndStream+ message to the
# underlying stream, at most once.
#
# The closed flag is checked again after acquiring the lock: two threads can
# both pass the unsynchronized fast-path check, and without the second check
# each of them would write its own +EndStream+.
#
# @return [Object, nil] the result of +@stream.write_data+, or nil when the
#   sending side was already closed.
def close_send
  return if @output_stream_closed

  @output_stream_closed_lock.synchronize do
    return if @output_stream_closed

    @output_stream_closed = true
    @stream.write_data(BaseMessage.encode(EndStream.new))
  end
end

#each ⇒ Object



106
107
108
109
110
111
112
113
114
115
116
# File 'lib/arf/rpc/responder.rb', line 106

# Yields every item obtained from +recv+ until +recv+ returns nil.
#
# A +StopIteration+ raised by +recv+ or by the block ends the iteration
# quietly. When called without a block an Enumerator is returned instead of
# consuming an item and then failing on +yield+.
#
# @yieldparam item [Object] the next received item.
# @return [self] when a block is given.
# @return [Enumerator] when no block is given.
def each
  return enum_for(:each) unless block_given?

  loop do
    item = recv
    break if item.nil?

    yield item
  rescue StopIteration
    break
  end
  self
end

#handle_data(value) ⇒ Object



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/arf/rpc/responder.rb', line 35

# Decodes an incoming payload and updates responder state according to the
# kind of message it contains.
#
# * +Response+       — stores the response, merges its metadata, records
#   either its params (status +:ok+) or a +Status::BadStatus+ error, then
#   wakes every thread waiting on the response condition.
# * +StartStream+    — flags the input stream as started.
# * +EndStream+      — flags the input stream as completed and closes the
#   item queue.
# * +StreamMetadata+ — replaces the stored metadata.
# * +StreamItem+     — enqueues the item's value unless the input stream has
#   completed or the receiving side was closed.
#
# NOTE(review): the message accessor is written as +metadata+ below; the name
# was missing in the text this was recovered from — confirm against the
# message classes.
#
# @param value [Object] raw payload handed to +BaseMessage.initialize_from+.
def handle_data(value)
  msg = BaseMessage.initialize_from(value)
  case msg
  when Response
    @response_lock.synchronize do
      @response = msg
      @metadata.merge!(msg.metadata)
      if @response.status == :ok
        @params = msg.params
      else
        @response_error = Status::BadStatus.new(
          @response.status,
          @response.metadata.get("arf-status-description")
        )
      end
      @response_cond.broadcast
    end

  when StartStream
    @input_stream_started = true

  when EndStream
    @input_stream_completed = true
    @input_stream_items.close

  when StreamMetadata
    @metadata.replace!(msg.metadata)

  when StreamItem
    return if @input_stream_completed

    # Enqueue while still holding the lock: close_recv closes the queue under
    # the same lock, so the push can no longer land on a queue that was closed
    # between the check and the push.
    @input_stream_closed_lock.synchronize do
      return if @input_stream_closed

      @input_stream_items << msg.value
    end
  end
end

#metadata ⇒ Object



168
169
170
171
172
173
174
# File 'lib/arf/rpc/responder.rb', line 168

# Returns the metadata collected for this call, blocking until a response has
# been received.
#
# @return [Object] the object held in +@metadata+.
# @raise [Exception] anything +_wait_response+ raises.
def metadata
  # It makes little sense to access metadata before a response is
  # received. Given that:
  _wait_response

  @metadata
end

#normalize_response_params ⇒ Object



158
159
160
161
162
163
164
165
166
# File 'lib/arf/rpc/responder.rb', line 158

# Collapses the response's params into the most convenient shape.
#
# @return [nil] when the response carries no params.
# @return [Object] the single param when there is exactly one.
# @return [Object] the params collection itself otherwise.
def normalize_response_params
  values = @response.params
  return nil if values.empty?

  values.length == 1 ? values.first : values
end

#params ⇒ Object



153
154
155
156
# File 'lib/arf/rpc/responder.rb', line 153

# Waits for the response and returns its params in normalized form.
#
# @return [Object, nil] whatever +normalize_response_params+ returns.
# @raise [Exception] anything +_wait_response+ raises.
def params
  _wait_response(throw_error: true)

  normalize_response_params
end

#push(value, **kwargs) ⇒ Object Also known as: <<

Raises:

  • (ArgumentError)


73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/arf/rpc/responder.rb', line 73

# Sends one item on the output stream, writing a +StartStream+ message first
# if none has been written yet.
#
# Only +value+ is sent; +kwargs+ is inspected solely to reject calls that
# supply both.
#
# @param value [Object] the item to send.
# @param kwargs [Hash] must be empty whenever +value+ is truthy.
# @return [Object, nil] +value+, or nil when the sending side is already
#   closed (in which case nothing is written).
# @raise [ArgumentError] when both a truthy +value+ and kwargs are given.
# @raise [RPC::NoStreamError] when there is no output stream.
# @raise [Exception] whatever is stored in +@output_stream_error+.
def push(value, **kwargs)
  if value && kwargs.any?
    raise ArgumentError, "#push receives either a value or kwargs, not both."
  end

  raise RPC::NoStreamError, false unless @has_output_stream
  raise @output_stream_error if @output_stream_error

  # TODO: Warn? Error?
  # Cheap unsynchronized check first, then the authoritative one under the lock.
  return if @output_stream_closed
  return if @output_stream_closed_lock.synchronize { @output_stream_closed }

  @output_stream_started_lock.synchronize do
    unless @output_stream_started
      @stream.write_data(BaseMessage.encode(StartStream.new))
      @output_stream_started = true
    end
  end

  item = StreamItem.new(value: value)
  @stream.write_data(BaseMessage.encode(item))
  value
end

#recv ⇒ Object

Raises:

  • (RPC::NoStreamError)


99
100
101
102
103
104
# File 'lib/arf/rpc/responder.rb', line 99

# Takes the next item off the queue of received stream items, blocking while
# the queue is open and empty.
#
# @return [Object, nil] the next item; nil once the queue is closed and
#   drained.
# @raise [RPC::NoStreamError] when there is no input stream.
# @raise [Exception] whatever is stored in +@input_stream_error+.
def recv
  if @has_input_stream
    pending_error = @input_stream_error
    raise pending_error if pending_error

    @input_stream_items.pop
  else
    raise RPC::NoStreamError, true
  end
end

#status ⇒ Object



176
177
178
179
# File 'lib/arf/rpc/responder.rb', line 176

# Waits for the response without raising a stored response error, then
# reports the response's status.
#
# @return [Object] the value of +@response.status+.
def status
  _wait_response(throw_error: false)

  reply = @response
  reply.status
end

#success? ⇒ Boolean

Returns:

  • (Boolean)


181
182
183
# File 'lib/arf/rpc/responder.rb', line 181

# Tells whether the call finished with the +:ok+ status. Goes through
# +status+, so it waits for the response first.
#
# @return [Boolean] true when +status+ equals +:ok+.
def success?
  outcome = status
  outcome == :ok
end