Class: ASIR::Transport

Inherits:
Object
  • Object
show all
Includes:
AdditionalData, Initialization, Log, Message::Delay, ThreadVariable, Conduit
Defined in:
lib/asir/transport.rb,
lib/asir/transport/file.rb,
lib/asir/transport/http.rb,
lib/asir/transport/null.rb,
lib/asir/transport/rack.rb,
lib/asir/transport/demux.rb,
lib/asir/transport/local.rb,
lib/asir/transport/retry.rb,
lib/asir/transport/buffer.rb,
lib/asir/transport/stream.rb,
lib/asir/transport/thread.rb,
lib/asir/transport/webrick.rb,
lib/asir/transport/database.rb,
lib/asir/transport/fallback.rb,
lib/asir/transport/broadcast.rb,
lib/asir/transport/composite.rb,
lib/asir/transport/delegation.rb,
lib/asir/transport/payload_io.rb,
lib/asir/transport/subprocess.rb,
lib/asir/transport/tcp_socket.rb,
lib/asir/transport/connection_oriented.rb

Overview

!SLIDE Transport

Client: Send the Message to the Service. Service: Receive the Message from the Client. Service: Invoke the Message. Service: Send the Result to the Client. Client: Receive the Result from the Service.

Direct Known Subclasses

Broadcast, Buffer, Database, Demux, Fallback, HTTP, Local, Null, Retry, Stream

Defined Under Namespace

Modules: Composite, Delegation, PayloadIO Classes: Broadcast, Buffer, ConnectionOriented, Database, Demux, Fallback, File, HTTP, Local, Null, Rack, Retry, Stream, Subprocess, TcpSocket, Thread, Webrick

Constant Summary

Constants included from ThreadVariable

ASIR::ThreadVariable::DEBUG, ASIR::ThreadVariable::EMPTY_HASH, ASIR::ThreadVariable::SETTER

Instance Attribute Summary collapse

Attributes included from Log

#_logger

Instance Method Summary collapse

Methods included from Log

#_log, #_log_enabled=, #_log_enabled?, #_log_format, #_log_result, enabled, enabled=, included

Methods included from AdditionalData

#[], #[]=, #_additional_data, #additional_data, #additional_data!, #additional_data=, included

Methods included from Message::Delay

#relative_message_delay!, #wait_for_delay!

Methods included from ThreadVariable

included, setter

Constructor Details

#initialize(*args) ⇒ Transport

!SLIDE END



94
95
96
97
# File 'lib/asir/transport.rb', line 94

# Builds a Transport. Verbosity starts at 0 before the inherited
# initializer runs; all arguments are handed to it unchanged.
def initialize(*args)
  @verbose = 0
  super
end

Instance Attribute Details

#after_invoke_messageObject

Proc to call after #invoke_message! trans.after_invoke_message.call(trans, message, result)



116
117
118
# File 'lib/asir/transport.rb', line 116

# Reader for the Proc that #serve_message! calls as
# .call(self, message, result) once #invoke_message! has returned.
def after_invoke_message; @after_invoke_message; end

#after_receive_messageObject

A Proc to call within #receive_message, after #_receive_message. trans.after_receive_message.call(trans, message)



104
105
106
# File 'lib/asir/transport.rb', line 104

# Reader for the Proc that #receive_message calls as .call(self, message)
# after a payload has been received and decoded.
def after_receive_message; @after_receive_message; end

#before_send_messageObject

A Proc to call within #send_message, before #_send_message. trans.before_send_message.call(trans, message)



108
109
110
# File 'lib/asir/transport.rb', line 108

# Reader for the Proc that #send_message calls as .call(self, message)
# before the Message is encoded and sent.
def before_send_message; @before_send_message; end

#coder_needs_result_messageObject

Returns the value of attribute coder_needs_result_message.



68
69
70
# File 'lib/asir/transport.rb', line 68

# Reader for the flag consulted by #send_result: when it is falsy,
# result.message is cleared before the Result is encoded.
def coder_needs_result_message; @coder_needs_result_message; end

#decoderObject

Returns the value of attribute decoder.



18
19
20
# File 'lib/asir/transport.rb', line 18

# Reader for the coder used on the Result side: #receive_result decodes
# with it and #send_result encodes with it.
def decoder; @decoder; end

#encoderObject

!SLIDE Transport Support …



219
220
221
# File 'lib/asir/transport.rb', line 219

# Reader for the coder used on the Message side: #send_message encodes
# with it and #receive_message decodes with it.
def encoder; @encoder; end

#invokerObject

The Invoker responsible for invoking the Message.



251
252
253
# File 'lib/asir/transport.rb', line 251

# Reader for the Invoker; #invoke_message! calls invoker.invoke!(message, self).
def invoker; @invoker; end

#message_countObject

Incremented for each message sent or received.



100
101
102
# File 'lib/asir/transport.rb', line 100

# Reader for the counter that #send_message and #receive_message each
# increment by one per call; nil until the first message.
def message_count; @message_count; end

#needs_message_identifierObject

Returns the value of attribute needs_message_identifier.



125
126
127
# File 'lib/asir/transport.rb', line 125

# Reader for the flag returned by #needs_message_identifier?; when truthy,
# #send_message calls create_identifier! on the Message.
def needs_message_identifier; @needs_message_identifier; end

#needs_message_timestampObject

Returns the value of attribute needs_message_timestamp.



125
126
127
# File 'lib/asir/transport.rb', line 125

# Reader for the flag returned by #needs_message_timestamp?; when truthy,
# #send_message calls create_timestamp! on the Message.
def needs_message_timestamp; @needs_message_timestamp; end

#on_exceptionObject

Proc to call with exception, if exception occurs within #serve_message!, but outside Message#invoke!.

trans.on_exception.call(trans, exception, :message, Message_instance, nil) trans.on_exception.call(trans, exception, :result, Message_instance, Result_instance)



123
124
125
# File 'lib/asir/transport.rb', line 123

# Reader for the Proc that #serve_message! calls when it rescues an
# exception: .call(self, exc, :message, message, nil) for failures while
# serving, .call(self, exc, :result, message, result) for failures while
# sending the Result.
def on_exception; @on_exception; end

#on_result_exceptionObject

Proc to call within #invoke_message! if result.exception. trans.on_result_exception.call(trans, result)



112
113
114
# File 'lib/asir/transport.rb', line 112

# Reader for the Proc that #invoke_message! calls as .call(self, result)
# when result.exception is truthy.
def on_result_exception; @on_result_exception; end

#one_wayObject

Returns the value of attribute one_way.



18
19
20
# File 'lib/asir/transport.rb', line 18

# Reader for the transport-level one-way flag checked by #receive_result
# and #send_result when deciding whether to call message.block.
def one_way; @one_way; end

#runningObject

!SLIDE Transport Server Support



185
186
187
# File 'lib/asir/transport.rb', line 185

# Reader for the running flag; #stop! sets it to false.
def running; @running; end

#verboseObject

Returns the value of attribute verbose.



129
130
131
# File 'lib/asir/transport.rb', line 129

# Reader for the verbosity level; #initialize sets it to 0.
def verbose; @verbose; end

Instance Method Details

#_subclass_responsibility(*args) ⇒ Object Also known as: _send_message, _receive_message, _send_result, _receive_result



131
132
133
# File 'lib/asir/transport.rb', line 131

# Placeholder for operations a concrete Transport has to provide.
# This page lists it as also known as _send_message, _receive_message,
# _send_result and _receive_result, so calling any of those on a transport
# that has not overridden them ends up here.
#
# @param args [Array] accepted and ignored.
# @raise [Error::SubclassResponsibility] always.
def _subclass_responsibility(*args)
  # The comma is required: without it Ruby parses the expression as a call
  # to a method named SubclassResponsibility on Error and fails with
  # NoMethodError instead of raising the intended error.
  raise Error::SubclassResponsibility, "subclass responsibility"
end

#invoke_message!(message) ⇒ Object

Invokes the Message object, returns a Result object.



230
231
232
233
234
235
236
237
238
239
240
241
242
243
# File 'lib/asir/transport.rb', line 230

# Invokes +message+ through the invoker and returns the Result it produces.
#
# The work runs inside Transport.with_attr!(:current, self) and
# with_attr!(:message, message) -- presumably scoping those attributes to
# the duration of the block (helpers are defined outside this excerpt).
# wait_for_delay! is called on the message before it is invoked.
#
# @param message the Message to invoke.
# @return the value returned by invoker.invoke!(message, self).
def invoke_message!(message)
  outcome = nil
  Transport.with_attr!(:current, self) do
    with_attr!(:message, message) do
      wait_for_delay!(message)
      outcome = invoker.invoke!(message, self)
      # Exception hook: runs only when a handler is installed and the
      # Result carries an exception.
      handler = @on_result_exception
      handler.call(self, outcome) if handler && outcome.exception
    end
  end
  outcome
end

#needs_message_identifier?(m) ⇒ Boolean

Returns:

  • (Boolean)


126
# File 'lib/asir/transport.rb', line 126

# Tells #send_message whether to create an identifier on the Message.
# The message argument is accepted but not inspected here; the answer is
# the @needs_message_identifier setting.
def needs_message_identifier?(m)
  @needs_message_identifier
end

#needs_message_timestamp?(m) ⇒ Boolean

Returns:

  • (Boolean)


127
# File 'lib/asir/transport.rb', line 127

# Tells #send_message whether to create a timestamp on the Message.
# The message argument is accepted but not inspected here; the answer is
# the @needs_message_timestamp setting.
def needs_message_timestamp?(m)
  @needs_message_timestamp
end

#receive_message(stream) ⇒ Object

!SLIDE Transport#receive_message Receive Message payload from stream.



39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/asir/transport.rb', line 39

# Receives one Message payload from +stream+ and decodes it.
#
# Bumps @message_count, asks _receive_message for a [payload, state] pair
# (it is also given a fresh Hash to fill with additional data), replaces
# the payload in that pair with the decoded Message, merges the
# additional data into the Message, and finally runs the
# after_receive_message hook if one is set.
#
# @param stream passed through to _receive_message.
# @return the pair from _receive_message with element 0 replaced by the
#   decoded Message, or the falsy value _receive_message returned.
def receive_message(stream)
  @message_count = (@message_count || 0) + 1
  extra = {}
  received = _receive_message(stream, extra)
  if received
    decoded = encoder.prepare.decode(received.first)
    received[0] = decoded
    decoded.additional_data!.update(extra) if decoded
    # The hook runs even when decoding produced nothing.
    @after_receive_message.call(self, decoded) if @after_receive_message
  end
  received
end

#receive_result(message, opaque_result) ⇒ Object

!SLIDE Transport#receive_result Receive Result from stream:

  • Receive Result payload

  • Decode Result.

  • Extract Result result or exception.



78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/asir/transport.rb', line 78

# Receives and decodes the Result for +message+, then unpacks it.
#
# @param message the Message that was sent.
# @param opaque_result whatever _send_message returned; handed to
#   _receive_result untouched.
# @return nil when nothing was decoded or the message is one-way;
#   otherwise the value of invoker.invoke!(exception, self) if the Result
#   carries an exception, else result.result.
def receive_result(message, opaque_result)
  payload = _receive_result(message, opaque_result)
  result = decoder.prepare.decode(payload)
  return nil unless result && !message.one_way

  failure = result.exception
  return invoker.invoke!(failure, self) if failure

  # The message block is only consulted when this transport is not one-way.
  message.block.call(result) unless @one_way || !message.block
  result.result
end

#send_message(message) ⇒ Object

!SLIDE Transport#send_message

  • Encode Message.

  • Send encoded Message.

  • Receive decoded Result.



25
26
27
28
29
30
31
32
33
34
# File 'lib/asir/transport.rb', line 25

# Sends +message+ and returns what #receive_result produces for it.
#
# Steps, in order: bump @message_count; create a timestamp and/or
# identifier on the message when the corresponding needs_*? predicate says
# so; run the before_send_message hook; call relative_message_delay!;
# encode the message; hand message and payload to _send_message; pass its
# return value to #receive_result.
#
# @param message the Message to send.
# @return the value of receive_result(message, opaque_result).
def send_message(message)
  @message_count = (@message_count || 0) + 1
  message.create_timestamp! if needs_message_timestamp?(message)
  message.create_identifier! if needs_message_identifier?(message)
  hook = @before_send_message
  hook.call(self, message) if hook
  relative_message_delay!(message)
  payload = encoder.prepare.encode(message)
  receive_result(message, _send_message(message, payload))
end

#send_result(result, stream, message_state) ⇒ Object

!SLIDE Transport#send_result Send Result to stream.



57
58
59
60
61
62
63
64
65
66
67
# File 'lib/asir/transport.rb', line 57

# Delivers +result+ for the Message it belongs to.
#
# When this transport is one-way and the message has a block, the block is
# called with the result and nothing is written to +stream+. Otherwise the
# result is encoded and passed to _send_result together with +stream+ and
# +message_state+.
#
# @param result the Result to deliver; its message is set to nil before
#   encoding unless @coder_needs_result_message is truthy.
# @param stream passed through to _send_result.
# @param message_state passed through to _send_result.
# @return the block's return value, or whatever _send_result returns.
def send_result(result, stream, message_state)
  message = result.message
  return message.block.call(result) if @one_way && message.block

  # Avoid shipping the whole Message back with the Result.
  result.message = nil unless @coder_needs_result_message
  payload = decoder.prepare.encode(result)
  _send_result(message, result, payload, stream, message_state)
end

#serve_message!(in_stream, out_stream) ⇒ Object

!SLIDE Serve a Message.



141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
# File 'lib/asir/transport.rb', line 141

# Serves a single Message: receives it from +in_stream+, invokes it, runs
# the after_invoke_message hook, and -- in the ensure section -- sends the
# Result to +out_stream+ when one is given.
#
# Any exception raised while receiving/invoking is logged, reported to the
# on_exception hook with :message, and (if the message had been received
# but no Result produced yet) turned into a Result carrying the exception.
# Exceptions raised while sending the Result are logged and reported to
# on_exception with :result.
#
# Returns self when a message was received and handled without an
# exception, nil when nothing was received. After a rescued exception the
# return value is whatever the rescue clause's last expression yields.
# Re-raises the original exception when it matched one of
# Error::Unforwardable.unforwardable.
def serve_message! in_stream, out_stream
  # Flags track how far processing got, so the ensure section knows what
  # still has to be done.
  message = message_state = message_ok = result = result_ok = nil
  exception = original_exception = unforwardable_exception = nil
  message, message_state = receive_message(in_stream)
  if message
    message_ok = true
    result = invoke_message!(message)
    result_ok = true
    if @after_invoke_message
      @after_invoke_message.call(self, message, result)
    end
    self
  else
    nil
  end
rescue ::Exception => exc
  # Deliberately broad: everything is caught so it can be logged, reported
  # and, where possible, forwarded in a Result.
  exception = original_exception = exc
  _log [ :message_error, exc ]
  @on_exception.call(self, exc, :message, message, nil) if @on_exception
ensure
  begin
    if message_ok
      # An exception before result_ok was set means no Result exists yet;
      # build one that carries the exception.
      if exception && ! result_ok
        case exception
        # presumably a list of exception classes that must not be sent
        # back as-is; they are wrapped for the Result and re-raised below.
        when *Error::Unforwardable.unforwardable
          unforwardable_exception = exception = Error::Unforwardable.new(exception)
        end
        result = Result.new(message, nil, exception)
      end
      if out_stream
        send_result(result, out_stream, message_state)
      end
    end
  rescue ::Exception => exc
    # Failures while sending the Result are reported but not propagated.
    _log [ :result_error, exc ]
    @on_exception.call(self, exc, :result, message, result) if @on_exception
  end
  # The unwrapped original is raised, not the Unforwardable wrapper.
  raise original_exception if unforwardable_exception
end

#stop!(force = false) ⇒ Object

Raises:



187
188
189
190
191
192
# File 'lib/asir/transport.rb', line 187

# Marks the transport as no longer running and, if the object responds to
# stop_server!, calls it.
#
# @param force when truthy, Error::Terminate is raised after stopping.
# @return self when +force+ is falsy.
# @raise [Error::Terminate] when +force+ is truthy.
def stop!(force = false)
  @running = false
  stop_server! if respond_to?(:stop_server!)
  return self unless force

  raise Error::Terminate
end

#with_server_signals!Object



194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
# File 'lib/asir/transport.rb', line 194

# Runs the given block with TERM and HUP handlers installed, restoring the
# previous handlers afterwards.
#
# Each handler calls #stop! and stores an ASIR::Error::Terminate in
# @signal_exception. Once the block has finished, a stored exception is
# cleared and raised. The previous handlers are put back in the ensure
# section; errors while restoring are ignored.
#
# @yield the work to run while the handlers are active.
# @return nil when the block finishes and no signal was recorded.
# @raise [ASIR::Error::Terminate] when a handler fired during the block.
def with_server_signals!
  previous_handlers = {}
  %w[TERM HUP].each do |signame|
    handler = proc do |*args|
      stop!
      @signal_exception = ::ASIR::Error::Terminate.new("#{self} by SIG#{signame} #{args.inspect} in #{__FILE__}:#{__LINE__}")
    end
    previous_handlers[signame] = Signal.trap(signame, handler)
  end
  yield
  pending = @signal_exception
  if pending
    @signal_exception = nil
    raise pending
  end
ensure
  previous_handlers.each do |signame, handler|
    Signal.trap(signame, handler) rescue nil
  end
end