Class: ASIR::Transport

Inherits:
Object
  • Object
show all
Includes:
AdditionalData, Initialization, Log, Message::Delay, ThreadVariable, Conduit
Defined in:
lib/asir/transport.rb,
lib/asir/transport/file.rb,
lib/asir/transport/http.rb,
lib/asir/transport/null.rb,
lib/asir/transport/rack.rb,
lib/asir/transport/demux.rb,
lib/asir/transport/local.rb,
lib/asir/transport/retry.rb,
lib/asir/transport/buffer.rb,
lib/asir/transport/stream.rb,
lib/asir/transport/thread.rb,
lib/asir/transport/webrick.rb,
lib/asir/transport/database.rb,
lib/asir/transport/fallback.rb,
lib/asir/transport/broadcast.rb,
lib/asir/transport/composite.rb,
lib/asir/transport/delegation.rb,
lib/asir/transport/payload_io.rb,
lib/asir/transport/subprocess.rb,
lib/asir/transport/tcp_socket.rb,
lib/asir/transport/connection_oriented.rb

Overview

!SLIDE Transport

Client: Send the Message to the Service. Service: Receive the Message from the Client. Service: Invoke the Message. Service: Send the Result to the Client. Client: Receive the Result from the Service.

Direct Known Subclasses

Broadcast, Buffer, Database, Demux, Fallback, HTTP, Local, Null, Retry, Stream

Defined Under Namespace

Modules: Composite, Delegation, PayloadIO Classes: Broadcast, Buffer, ConnectionOriented, Database, Demux, Fallback, File, HTTP, Local, Null, Rack, Retry, Stream, Subprocess, TcpSocket, Thread, Webrick

Constant Summary

Constants included from ThreadVariable

ASIR::ThreadVariable::DEBUG, ASIR::ThreadVariable::EMPTY_HASH, ASIR::ThreadVariable::SETTER

Instance Attribute Summary collapse

Attributes included from Log

#_logger

Instance Method Summary collapse

Methods included from Log

#_log, #_log_enabled=, #_log_enabled?, #_log_format, #_log_result, enabled, enabled=, included

Methods included from AdditionalData

#[], #[]=, #_additional_data, #additional_data, #additional_data!, #additional_data=, included

Methods included from Message::Delay

#relative_message_delay!, #wait_for_delay!

Methods included from ThreadVariable

included, setter

Constructor Details

#initialize(*args) ⇒ Transport

!SLIDE END



98
99
100
101
# File 'lib/asir/transport.rb', line 98

# Sets the default verbosity level (0) and then hands all constructor
# arguments on, unchanged, to the next #initialize in the ancestor chain.
def initialize(*args)
  @verbose = 0
  super
end

Instance Attribute Details

#after_invoke_messageObject

Proc to call after #invoke_message! trans.after_invoke_message.call(trans, state)



120
121
122
# File 'lib/asir/transport.rb', line 120

# Reader for the Proc run at the end of #invoke_message!.
# It is invoked there as: after_invoke_message.call(transport, state)
def after_invoke_message; @after_invoke_message; end

#after_receive_messageObject

A Proc to call within #receive_message, after #_receive_message. trans.after_receive_message.call(trans, state)



108
109
110
# File 'lib/asir/transport.rb', line 108

# Reader for the Proc run inside #receive_message once a Message has been
# received and decoded. Invoked as: after_receive_message.call(transport, state)
def after_receive_message; @after_receive_message; end

#around_serve_messageObject

A Proc to call around #serve_message!. trans.around_serve_message.call(trans, state, &blk) The Proc should call the given block and return the block's result. May be used to set up and tear down global resources during Message processing.



205
206
207
# File 'lib/asir/transport.rb', line 205

# Reader for the Proc that #around_serve_message! wraps around message
# serving. Invoked as: around_serve_message.call(transport, state, &blk)
def around_serve_message; @around_serve_message; end

#before_send_messageObject

A Proc to call within #send_message, before #_send_message. trans.before_send_message.call(trans, state)



112
113
114
# File 'lib/asir/transport.rb', line 112

# Reader for the Proc run inside #send_message just before #_send_message.
# Invoked as: before_send_message.call(transport, state)
def before_send_message; @before_send_message; end

#coder_needs_result_messageObject

Returns the value of attribute coder_needs_result_message.



67
68
69
# File 'lib/asir/transport.rb', line 67

# Reader for the flag consulted by #send_result: unless it is truthy, the
# Result's message is cleared before the Result is encoded.
def coder_needs_result_message; @coder_needs_result_message; end

#decoderObject

Returns the value of attribute decoder.



19
20
21
# File 'lib/asir/transport.rb', line 19

# Reader for the coder that handles Result payloads: #send_result encodes
# with it and #receive_result decodes with it.
def decoder; @decoder; end

#encoderObject

!SLIDE Transport Support …



251
252
253
# File 'lib/asir/transport.rb', line 251

# Reader for the coder that handles Message payloads: #send_message encodes
# with it and #receive_message decodes with it.
def encoder; @encoder; end

#invokerObject

The Invoker responsible for invoking the Message.



294
295
296
# File 'lib/asir/transport.rb', line 294

# Reader for the Invoker; #invoke_message! and #receive_result call
# invoker.invoke!(object, transport) on it.
def invoker; @invoker; end

#message_countObject

Incremented for each message sent or received.



104
105
106
# File 'lib/asir/transport.rb', line 104

# Reader for the counter bumped by every #send_message and #receive_message
# call. nil until the first message is sent or received.
def message_count; @message_count; end

#needs_message_identifierObject

Returns the value of attribute needs_message_identifier.



129
130
131
# File 'lib/asir/transport.rb', line 129

# Reader for the flag that #needs_message_identifier? reports.
def needs_message_identifier; @needs_message_identifier; end

#needs_message_timestampObject

Returns the value of attribute needs_message_timestamp.



129
130
131
# File 'lib/asir/transport.rb', line 129

# Reader for the flag that #needs_message_timestamp? reports.
def needs_message_timestamp; @needs_message_timestamp; end

#on_exceptionObject

Proc to call with exception, if exception occurs within #serve_message!, but outside Message#invoke!.

trans.on_exception.call(trans, exception, :message, message_state) trans.on_exception.call(trans, exception, :result, message_state)



127
128
129
# File 'lib/asir/transport.rb', line 127

# Reader for the Proc that #serve_message! calls when it rescues an
# exception. Invoked as on_exception.call(transport, exc, phase, state),
# where phase is :message or :result.
def on_exception; @on_exception; end

#on_result_exceptionObject

Proc to call within #invoke_message! if state.result.exception is set. trans.on_result_exception.call(trans, state)



116
117
118
# File 'lib/asir/transport.rb', line 116

# Reader for the Proc that #invoke_message! calls when the Result carries
# an exception. Invoked as: on_result_exception.call(transport, state)
def on_result_exception; @on_result_exception; end

#one_wayObject

Returns the value of attribute one_way.



19
20
21
# File 'lib/asir/transport.rb', line 19

# Reader for the one-way flag. When truthy, #send_result hands the Result to
# the Message's block instead of sending it, and #receive_result skips the
# Message's block.
def one_way; @one_way; end

#runningObject

!SLIDE Transport Server Support



209
210
211
# File 'lib/asir/transport.rb', line 209

# Reader for the running flag; #stop! sets it to false.
def running; @running; end

#verboseObject

Returns the value of attribute verbose.



133
134
135
# File 'lib/asir/transport.rb', line 133

# Reader for the verbosity level; #initialize sets it to 0.
def verbose; @verbose; end

Instance Method Details

#_after_invoke_message(state) ⇒ Object



282
283
# File 'lib/asir/transport.rb', line 282

# No-op hook called by #invoke_message! after the Message has been invoked
# and before the #after_invoke_message Proc runs. Always returns nil.
def _after_invoke_message(state); end

#_subclass_responsibility(*args) ⇒ Object Also known as: _send_message, _receive_message, _send_result, _receive_result



135
136
137
# File 'lib/asir/transport.rb', line 135

# Placeholder for operations a concrete Transport must supply.
# The page lists #_send_message, #_receive_message, #_send_result and
# #_receive_result as aliases of this method.
#
# Accepts and ignores any arguments.
# Always raises Error::SubclassResponsibility.
def _subclass_responsibility(*args)
  # The comma matters: without it Ruby parses the line as the method call
  # Error::SubclassResponsibility("...") and raises NoMethodError instead.
  raise Error::SubclassResponsibility, "subclass responsibility"
end

#around_serve_message!(state, &blk) ⇒ Object

!SLIDE pause



193
194
195
196
197
198
199
# File 'lib/asir/transport.rb', line 193

# Runs the given block, routed through the @around_serve_message Proc when
# one is configured.
#
# With a Proc set, it is called as call(self, state, &blk) and its return
# value is returned. Without one, the block is yielded to directly and the
# block's value is returned.
def around_serve_message!(state, &blk)
  return yield unless @around_serve_message
  @around_serve_message.call(self, state, &blk)
end

#invoke_message!(state) ⇒ Object

Invokes the Message object and stores the resulting Result object in state.result; returns the Transport itself.



262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
# File 'lib/asir/transport.rb', line 262

# Invokes state.message through the #invoker and stores the outcome in
# state.result.
#
# The invocation runs inside nested with_attr! scopes for :current (on
# Transport), :message_state and :message, after wait_for_delay! has been
# called with the Message. If the Result carries an exception and
# @on_result_exception is set, that Proc is called with (self, state) while
# still inside those scopes.
#
# Once the scopes are left, #_after_invoke_message and then the
# @after_invoke_message Proc (if set) are called.
#
# Returns self.
def invoke_message!(state)
  Transport.with_attr!(:current, self) do
    with_attr!(:message_state, state) do
      with_attr!(:message, state.message) do
        wait_for_delay!(state.message)
        state.result = invoker.invoke!(state.message, self)
        # Hook for Results that carry an exception.
        @on_result_exception.call(self, state) if @on_result_exception && state.result.exception
      end
    end
  end
  _after_invoke_message(state)
  @after_invoke_message.call(self, state) if @after_invoke_message
  self
end

#needs_message_identifier?(m) ⇒ Boolean

Returns:

  • (Boolean)


130
# File 'lib/asir/transport.rb', line 130

# Reports whether an identifier should be created for Message +m+ before it
# is sent. This implementation ignores +m+ and returns the
# @needs_message_identifier setting.
def needs_message_identifier?(m)
  @needs_message_identifier
end

#needs_message_timestamp?(m) ⇒ Boolean

Returns:

  • (Boolean)


131
# File 'lib/asir/transport.rb', line 131

# Reports whether a timestamp should be created for Message +m+ before it is
# sent. This implementation ignores +m+ and returns the
# @needs_message_timestamp setting.
def needs_message_timestamp?(m)
  @needs_message_timestamp
end

#receive_message(state) ⇒ Object

!SLIDE Transport#receive_message Receive Message payload from stream.



40
41
42
43
44
45
46
47
48
49
# File 'lib/asir/transport.rb', line 40

# Receives one Message into +state+.
#
# Steps:
# * bump @message_count;
# * ask #_receive_message to fill in the state; stop if it returns a falsy value;
# * decode state.message_payload with the encoder and store it in state.message;
#   stop if nothing was decoded;
# * merge state.additional_data (when present) into the Message's additional data;
# * call the @after_receive_message Proc (when set) with (self, state).
#
# Returns self when a Message was received and decoded, nil otherwise.
def receive_message(state)
  @message_count = (@message_count || 0) + 1 # NOT THREAD-SAFE
  return nil unless _receive_message(state)

  message = state.message = encoder.prepare.decode(state.message_payload)
  return nil unless message

  extra = state.additional_data
  message.additional_data!.update(extra) if extra
  @after_receive_message.call(self, state) if @after_receive_message
  self
end

#receive_result(state) ⇒ Object

!SLIDE Transport#receive_result Receive Result from stream:

  • Receive Result payload

  • Decode Result.

  • Extract Result result or exception.

  • Invoke Exception or return Result value.



78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/asir/transport.rb', line 78

# Receives the Result for a previously sent Message.
#
# Returns nil when #_receive_result returns a falsy value, when no Result is
# available, or when the Message is one-way. Otherwise the Result is taken
# from state.result (decoding state.result_payload with the decoder only if
# state.result is not already set) and linked back to its Message.
#
# If the Result carries an exception, it is handed to invoker.invoke! and nil
# is returned should that call return. Otherwise the Message's block (if
# any, and unless this Transport is one-way) is called with the Result, and
# result.result is returned.
def receive_result(state)
  return nil unless _receive_result(state)

  result = (state.result ||= decoder.prepare.decode(state.result_payload))
  message = state.message
  return nil unless result && !message.one_way

  result.message = message
  if (exc = result.exception)
    invoker.invoke!(exc, self)
    return nil
  end

  message.block.call(result) if !@one_way && message.block
  result.result
end

#send_message(message) ⇒ Object

!SLIDE Transport#send_message

  • Encode Message.

  • Send encoded Message.

  • Receive decoded Result.



26
27
28
29
30
31
32
33
34
35
# File 'lib/asir/transport.rb', line 26

# Sends +message+ and returns whatever #receive_result returns for it.
#
# Steps:
# * bump @message_count;
# * create a timestamp and/or identifier on the Message when
#   #needs_message_timestamp? / #needs_message_identifier? say so;
# * call relative_message_delay! with the Message;
# * encode the Message with the encoder and wrap both in a Message::State;
# * call the @before_send_message Proc (when set) with (self, state);
# * pass the state to #_send_message, then to #receive_result.
def send_message(message)
  @message_count = (@message_count || 0) + 1 # NOT THREAD-SAFE
  message.create_timestamp! if needs_message_timestamp?(message)
  message.create_identifier! if needs_message_identifier?(message)
  relative_message_delay!(message)

  payload = encoder.prepare.encode(message)
  state = Message::State.new(:message => message, :message_payload => payload)

  @before_send_message.call(self, state) if @before_send_message
  _send_message(state)
  receive_result(state)
end

#send_result(state) ⇒ Object

!SLIDE Transport#send_result Send Result to stream.



55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/asir/transport.rb', line 55

# Delivers state.result.
#
# On a one-way Transport whose Message has a block, the Result is handed
# straight to that block and nothing is sent; the block's value is returned.
#
# Otherwise the Result is encoded with the decoder into state.result_payload
# and passed to #_send_result, whose value is returned. Before encoding, the
# Result's message is cleared unless @coder_needs_result_message is set.
def send_result(state)
  result = state.result
  message = state.message
  return message.block.call(result) if @one_way && message.block

  # Avoid sending back entire Message in Result.
  result.message = nil unless @coder_needs_result_message
  state.result_payload = decoder.prepare.encode(result)
  _send_result(state)
end

#serve_message!(in_stream, out_stream) ⇒ Object

!SLIDE Serve a Message.



145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
# File 'lib/asir/transport.rb', line 145

# Serves a single Message: receive it from +in_stream+, invoke it, and send
# the Result to +out_stream+ (when out_stream is truthy).
#
# All work happens inside #around_serve_message!, so a configured
# around_serve_message Proc wraps it; the value returned here is whatever
# #around_serve_message! returns.
#
# Exception handling:
# * Exceptions matching Error::Unrecoverable.modules are re-raised untouched.
# * Any other exception raised while receiving or invoking is logged and
#   reported to @on_exception with :message; if the Message had already been
#   received, it is turned into a Result carrying the exception.
# * Exceptions matching Error::Unforwardable.modules are wrapped in
#   Error::Unforwardable for the Result, and the original exception is
#   re-raised after the Result has been handled.
# * Exceptions raised while building or sending the Result are logged and
#   reported to @on_exception with :result.
def serve_message! in_stream, out_stream
  # message_ok: a Message was received; result_ok: its invocation completed.
  state = message_ok = result = result_ok = nil
  exception = original_exception = unforwardable_exception = nil
  state = Message::State.new(:in_stream => in_stream, :out_stream => out_stream)
  around_serve_message!(state) do
    begin
  if receive_message(state)
    message_ok = true
    invoke_message!(state)
    result_ok = true
    self
  else
    # No Message was received.
    nil
  end
rescue *Error::Unrecoverable.modules
  # Unrecoverable errors pass straight through to the caller.
  raise
rescue ::Exception => exc
  # Remember the failure so the ensure clause can report it as a Result.
  exception = original_exception = exc
  _log [ :message_error, exc, exc.backtrace ]
  @on_exception.call(self, exc, :message, state) if @on_exception
ensure
  begin
    # Only a successfully received Message gets a Result sent back.
    if message_ok
      # The invocation did not complete: synthesize a Result from the exception.
      if exception && ! result_ok
        case exception
        when *Error::Unforwardable.modules
          # Send a wrapper instead of the exception itself; the original is
          # re-raised below.
          unforwardable_exception = exception = Error::Unforwardable.new(exception)
        end
        state.result = Result.new(state.message, nil, exception)
      end
      if out_stream
        send_result(state)
      end
    end
  rescue *Error::Unrecoverable.modules
    raise
  rescue ::Exception => exc
    # Failures while sending the Result are reported but not re-raised.
    _log [ :result_error, exc, exc.backtrace ]
    @on_exception.call(self, exc, :result, state) if @on_exception
  end
  # An unforwardable exception is still raised locally after the Result is handled.
  raise original_exception if unforwardable_exception

    end # begin
  end # around_serve_message!
end

#stop!(force = false) ⇒ Object

Raises:

  • (Error::Terminate)


211
212
213
214
215
216
# File 'lib/asir/transport.rb', line 211

# Marks the Transport as no longer running and stops its server.
#
# Sets @running to false and calls #stop_server! when the receiver responds
# to it. Raises Error::Terminate when +force+ is truthy or a forced stop is
# in effect (@force_stop, see #with_force_stop!). Otherwise returns self.
def stop!(force = false)
  @running = false
  stop_server! if respond_to?(:stop_server!)
  terminate = force || @force_stop
  raise Error::Terminate if terminate
  self
end

#with_force_stop!Object



218
219
220
221
222
223
224
# File 'lib/asir/transport.rb', line 218

# Runs the given block with @force_stop set to true, so that #stop! raises
# Error::Terminate while the block runs. The previous @force_stop value is
# restored afterwards, even if the block raises.
#
# Returns the block's value.
def with_force_stop!
  previous = @force_stop
  begin
    @force_stop = true
    yield
  ensure
    @force_stop = previous
  end
end

#with_server_signals!Object



226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
# File 'lib/asir/transport.rb', line 226

# Runs the given block with TERM and HUP signal handlers installed.
#
# Each handler calls #stop! and then records an ::ASIR::Error::Terminate in
# @signal_exception. After the block returns, a recorded exception is
# cleared and raised. The previous handlers are restored on the way out,
# whether or not anything was raised; errors while restoring are ignored.
#
# Returns nil when the block completes and no signal exception is pending.
def with_server_signals!
  saved_handlers = { }
  [ "TERM", "HUP" ].each do |sig|
    handler = proc do |*args|
      stop!
      @signal_exception = ::ASIR::Error::Terminate.new("#{self} by SIG#{sig} #{args.inspect} in #{__FILE__}:#{__LINE__}")
    end
    saved_handlers[sig] = Signal.trap(sig, handler)
  end
  yield
  pending = @signal_exception
  if pending
    @signal_exception = nil
    raise pending
  end
ensure
  saved_handlers.each do |sig, handler|
    begin
      Signal.trap(sig, handler)
    rescue
      nil
    end
  end
end