Class: ASIR::Transport

Inherits:
Object
  • Object
show all
Includes:
AdditionalData, Initialization, Log, Message::Delay, ThreadVariable, Conduit
Defined in:
lib/asir/transport.rb,
lib/asir/transport/file.rb,
lib/asir/transport/http.rb,
lib/asir/transport/null.rb,
lib/asir/transport/rack.rb,
lib/asir/transport/demux.rb,
lib/asir/transport/local.rb,
lib/asir/transport/retry.rb,
lib/asir/transport/buffer.rb,
lib/asir/transport/stream.rb,
lib/asir/transport/thread.rb,
lib/asir/transport/webrick.rb,
lib/asir/transport/database.rb,
lib/asir/transport/fallback.rb,
lib/asir/transport/broadcast.rb,
lib/asir/transport/composite.rb,
lib/asir/transport/delegation.rb,
lib/asir/transport/payload_io.rb,
lib/asir/transport/subprocess.rb,
lib/asir/transport/tcp_socket.rb,
lib/asir/transport/connection_oriented.rb

Overview

!SLIDE Transport

Client: Send the Message to the Service. Service: Receive the Message from the Client. Service: Invoke the Message. Service: Send the Result to the Client. Client: Receive the Result from the Service.

Direct Known Subclasses

Broadcast, Buffer, Database, Demux, Fallback, HTTP, Local, Null, Retry, Stream

Defined Under Namespace

Modules: Composite, Delegation, PayloadIO Classes: Broadcast, Buffer, ConnectionOriented, Database, Demux, Fallback, File, HTTP, Local, Null, Rack, Retry, Stream, Subprocess, TcpSocket, Thread, Webrick

Constant Summary

Constants included from ThreadVariable

ASIR::ThreadVariable::DEBUG, ASIR::ThreadVariable::EMPTY_HASH, ASIR::ThreadVariable::SETTER

Instance Attribute Summary collapse

Attributes included from Log

#_logger

Instance Method Summary collapse

Methods included from Log

#_log, #_log_enabled=, #_log_enabled?, #_log_format, #_log_result, enabled, enabled=, included

Methods included from AdditionalData

#[], #[]=, #_additional_data, #additional_data, #additional_data!, #additional_data=, included

Methods included from Message::Delay

#relative_message_delay!, #wait_for_delay!

Methods included from ThreadVariable

included, setter

Constructor Details

#initialize(*args) ⇒ Transport

!SLIDE END



98
99
100
101
# File 'lib/asir/transport.rb', line 98

def initialize *args
  @verbose = 0
  super
end

Instance Attribute Details

#after_invoke_messageObject

Proc to call after #invoke_message! trans.after_invoke_message.call(trans, message, result)



120
121
122
# File 'lib/asir/transport.rb', line 120

def after_invoke_message
  @after_invoke_message
end

#after_receive_messageObject

A Proc to call within #receive_message, after #_receive_message. trans.after_receive_message(trans, message)



108
109
110
# File 'lib/asir/transport.rb', line 108

def after_receive_message
  @after_receive_message
end

#before_send_messageObject

A Proc to call within #send_message, before #_send_message. trans.before_send_message(trans, message)



112
113
114
# File 'lib/asir/transport.rb', line 112

def before_send_message
  @before_send_message
end

#coder_needs_result_messageObject

Returns the value of attribute coder_needs_result_message.



67
68
69
# File 'lib/asir/transport.rb', line 67

def coder_needs_result_message
  @coder_needs_result_message
end

#decoderObject

Returns the value of attribute decoder.



19
20
21
# File 'lib/asir/transport.rb', line 19

def decoder
  @decoder
end

#encoderObject

!SLIDE Transport Support …



223
224
225
# File 'lib/asir/transport.rb', line 223

def encoder
  @encoder
end

#invokerObject

The Invoker responsible for invoking the Message.



259
260
261
# File 'lib/asir/transport.rb', line 259

def invoker
  @invoker
end

#message_countObject

Incremented for each message sent or received.



104
105
106
# File 'lib/asir/transport.rb', line 104

def message_count
  @message_count
end

#needs_message_identifierObject

Returns the value of attribute needs_message_identifier.



129
130
131
# File 'lib/asir/transport.rb', line 129

def needs_message_identifier
  @needs_message_identifier
end

#needs_message_timestampObject

Returns the value of attribute needs_message_timestamp.



129
130
131
# File 'lib/asir/transport.rb', line 129

def needs_message_timestamp
  @needs_message_timestamp
end

#on_exceptionObject

Proc to call with exception, if exception occurs within #serve_message!, but outside Message#invoke!.

trans.on_exception.call(trans, exception, :message, message_state) trans.on_exception.call(trans, exception, :result, message_state)



127
128
129
# File 'lib/asir/transport.rb', line 127

def on_exception
  @on_exception
end

#on_result_exceptionObject

Proc to call with #invoke_message! if result.exception. trans.on_result_exception.call(trans, result)



116
117
118
# File 'lib/asir/transport.rb', line 116

def on_result_exception
  @on_result_exception
end

#one_wayObject

Returns the value of attribute one_way.



19
20
21
# File 'lib/asir/transport.rb', line 19

def one_way
  @one_way
end

#runningObject

!SLIDE Transport Server Support



189
190
191
# File 'lib/asir/transport.rb', line 189

def running
  @running
end

#verboseObject

Returns the value of attribute verbose.



133
134
135
# File 'lib/asir/transport.rb', line 133

def verbose
  @verbose
end

Instance Method Details

#_subclass_responsibility(*args) ⇒ Object Also known as: _send_message, _receive_message, _send_result, _receive_result



135
136
137
# File 'lib/asir/transport.rb', line 135

def _subclass_responsibility *args
  raise Error::SubclassResponsibility "subclass responsibility"
end

#invoke_message!(state) ⇒ Object

Invokes the Message object, returns a Result object.



234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
# File 'lib/asir/transport.rb', line 234

def invoke_message! state
  Transport.with_attr! :current, self do
    with_attr! :message_state, state do
    with_attr! :message, state.message do
      wait_for_delay! state.message
      state.result = invoker.invoke!(state.message, self)
      # Hook for Exceptions.
      if @on_result_exception && state.result.exception
        @on_result_exception.call(self, state)
      end
    end
  end
  end
  self
end

#needs_message_identifier?(m) ⇒ Boolean

Returns:

  • (Boolean)


130
# File 'lib/asir/transport.rb', line 130

def needs_message_identifier? m; @needs_message_identifier; end

#needs_message_timestamp?(m) ⇒ Boolean

Returns:

  • (Boolean)


131
# File 'lib/asir/transport.rb', line 131

def needs_message_timestamp?  m; @needs_message_timestamp; end

#receive_message(state) ⇒ Object

!SLIDE Transport#receive_message Receive Message payload from stream.



40
41
42
43
44
45
46
47
48
49
# File 'lib/asir/transport.rb', line 40

def receive_message state
  @message_count ||= 0; @message_count += 1 # NOT THREAD-SAFE
  if received = _receive_message(state)
    if message = state.message = encoder.prepare.decode(state.message_payload)
      message.additional_data!.update(state.additional_data) if state.additional_data
      @after_receive_message.call(self, state) if @after_receive_message
      self
    end
  end
end

#receive_result(state) ⇒ Object

!SLIDE Transport#receive_result Receieve Result from stream:

  • Receive Result payload

  • Decode Result.

  • Extract Result result or exception.

  • Invoke Exception or return Result value.



78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/asir/transport.rb', line 78

def receive_result state
  value = nil
  return value unless _receive_result(state)
  result = state.result ||= decoder.prepare.decode(state.result_payload)
  message = state.message
  if result && ! message.one_way
    result.message = message
    if exc = result.exception
      invoker.invoke!(exc, self)
    else
      if ! @one_way && message.block
        message.block.call(result)
      end
      value = result.result
    end
  end
  value
end

#send_message(message) ⇒ Object

!SLIDE Transport#send_message

  • Encode Message.

  • Send encoded Message.

  • Receive decoded Result.



26
27
28
29
30
31
32
33
34
35
# File 'lib/asir/transport.rb', line 26

def send_message message
  @message_count ||= 0; @message_count += 1 # NOT THREAD-SAFE
  message.create_timestamp! if needs_message_timestamp? message
  message.create_identifier! if needs_message_identifier? message
  relative_message_delay! message
  state = Message::State.new(:message => message, :message_payload => encoder.prepare.encode(message))
  @before_send_message.call(self, state) if @before_send_message
  _send_message(state)
  receive_result(state)
end

#send_result(state) ⇒ Object

!SLIDE Transport#send_result Send Result to stream.



55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/asir/transport.rb', line 55

def send_result state
  result = state.result
  message = state.message
  if @one_way && message.block
    message.block.call(result)
  else
    # Avoid sending back entire Message in Result.
    result.message = nil unless @coder_needs_result_message
    state.result_payload = decoder.prepare.encode(result)
    _send_result(state)
  end
end

#serve_message!(in_stream, out_stream) ⇒ Object

!SLIDE Serve a Message.



145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
# File 'lib/asir/transport.rb', line 145

def serve_message! in_stream, out_stream
  state = message_ok = result = result_ok = nil
  exception = original_exception = unforwardable_exception = nil
  state = Message::State.new(:in_stream => in_stream, :out_stream => out_stream)
  if receive_message(state)
    message_ok = true
    invoke_message!(state)
    result_ok = true
    if @after_invoke_message
      @after_invoke_message.call(self, state)
    end
    self
  else
    nil
  end
rescue ::Exception => exc
  exception = original_exception = exc
  _log [ :message_error, exc ]
  @on_exception.call(self, exc, :message, state) if @on_exception
ensure
  begin
    if message_ok
      if exception && ! result_ok
        case exception
        when *Error::Unforwardable.unforwardable
          unforwardable_exception = exception = Error::Unforwardable.new(exception)
        end
        state.result = Result.new(state.message, nil, exception)
      end
      if out_stream
        send_result(state)
      end
    end
  rescue ::Exception => exc
    _log [ :result_error, exc, exc.backtrace ]
    @on_exception.call(self, exc, :result, state) if @on_exception
  end
  raise original_exception if unforwardable_exception
end

#stop!(force = false) ⇒ Object

Raises:



191
192
193
194
195
196
# File 'lib/asir/transport.rb', line 191

def stop! force = false
  @running = false
  stop_server! if respond_to?(:stop_server!)
  raise Error::Terminate if force
  self
end

#with_server_signals!Object



198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
# File 'lib/asir/transport.rb', line 198

def with_server_signals!
  old_trap = { }
  [ "TERM", "HUP" ].each do | sig |
    trap = proc do | *args |
      stop!
      @signal_exception = ::ASIR::Error::Terminate.new("#{self} by SIG#{sig} #{args.inspect} in #{__FILE__}:#{__LINE__}")
    end
    old_trap[sig] = Signal.trap(sig, trap)
  end
  yield
  if exc = @signal_exception
    @signal_exception = nil
    raise exc
  end
ensure
  # $stderr.puts "old_trap = #{old_trap.inspect}"
  old_trap.each do | sig, trap |
    Signal.trap(sig, trap) rescue nil
  end
end