Class: ASIR::Transport

Inherits:
Object
  • Object
show all
Includes:
AdditionalData, Initialization, Log, Message::Delay, ThreadVariable, Conduit
Defined in:
lib/asir/transport.rb,
lib/asir/transport/zmq.rb,
lib/asir/transport/file.rb,
lib/asir/transport/http.rb,
lib/asir/transport/null.rb,
lib/asir/transport/rack.rb,
lib/asir/transport/demux.rb,
lib/asir/transport/local.rb,
lib/asir/transport/retry.rb,
lib/asir/transport/buffer.rb,
lib/asir/transport/resque.rb,
lib/asir/transport/stream.rb,
lib/asir/transport/webrick.rb,
lib/asir/transport/fallback.rb,
lib/asir/transport/beanstalk.rb,
lib/asir/transport/broadcast.rb,
lib/asir/transport/composite.rb,
lib/asir/transport/delegation.rb,
lib/asir/transport/payload_io.rb,
lib/asir/transport/subprocess.rb,
lib/asir/transport/tcp_socket.rb,
lib/asir/transport/connection_oriented.rb

Overview

!SLIDE Transport

Client: Send the Message to the Service. Service: Receive the Message from the Client. Service: Invoke the Message. Service: Send the Result to the Client. Client: Receive the Result from the Service.

Direct Known Subclasses

Broadcast, Buffer, Demux, Fallback, HTTP, Local, Null, Retry, Stream

Defined Under Namespace

Modules: Composite, Delegation, PayloadIO Classes: Beanstalk, Broadcast, Buffer, ConnectionOriented, Demux, Fallback, File, HTTP, Local, Null, Rack, Resque, Retry, Stream, Subprocess, TcpSocket, Webrick, Zmq

Constant Summary

Constants included from ThreadVariable

ASIR::ThreadVariable::DEBUG, ASIR::ThreadVariable::EMPTY_HASH, ASIR::ThreadVariable::SETTER

Instance Attribute Summary collapse

Attributes included from Log

#_logger

Instance Method Summary collapse

Methods included from Log

#_log, #_log_enabled=, #_log_enabled?, #_log_format, #_log_result, enabled, enabled=, included

Methods included from AdditionalData

#[], #[]=, #additional_data, #additional_data!, #additional_data=, included

Methods included from Message::Delay

#relative_message_delay!, #wait_for_delay!

Methods included from ThreadVariable

included, setter

Constructor Details

#initialize(*args) ⇒ Transport

!SLIDE END



91
92
93
94
# File 'lib/asir/transport.rb', line 91

# Builds a Transport. The verbosity level starts at 0 before the arguments
# are handed, unchanged, to the inherited initializer.
def initialize(*args)
  @verbose = 0
  super
end

Instance Attribute Details

#after_receive_messageObject

A Proc to call within #receive_message, after #_receive_message. Invoked as: trans.after_receive_message.call(trans, message)



101
102
103
# File 'lib/asir/transport.rb', line 101

# Hook Proc that #receive_message runs after decoding a message; it is
# invoked as hook.call(transport, message). Nil when no hook is set.
def after_receive_message; @after_receive_message; end

#before_send_messageObject

A Proc to call within #send_message, before #_send_message. Invoked as: trans.before_send_message.call(trans, message)



105
106
107
# File 'lib/asir/transport.rb', line 105

# Hook Proc that #send_message runs before the message is encoded and sent;
# it is invoked as hook.call(transport, message). Nil when no hook is set.
def before_send_message; @before_send_message; end

#decoderObject

Returns the value of attribute decoder.



18
19
20
# File 'lib/asir/transport.rb', line 18

# The coder used for Results: #receive_result decodes result payloads with a
# copy of it and #send_result encodes results with a copy of it.
def decoder; @decoder; end

#encoderObject

!SLIDE Transport Support …



209
210
211
# File 'lib/asir/transport.rb', line 209

# The coder used for Messages: #send_message encodes messages with a copy of
# it and #receive_message decodes message payloads with a copy of it.
def encoder; @encoder; end

#invokerObject

The Invoker responsible for invoking the Message.



241
242
243
# File 'lib/asir/transport.rb', line 241

# The Invoker responsible for invoking the Message (see #invoke_message!).
def invoker; @invoker; end

#message_countObject

Incremented for each message sent or received.



97
98
99
# File 'lib/asir/transport.rb', line 97

# Running total of messages handled; #send_message and #receive_message each
# add one per call. Nil until the first message is counted.
def message_count; @message_count; end

#needs_message_identifierObject

Returns the value of attribute needs_message_identifier.



118
119
120
# File 'lib/asir/transport.rb', line 118

# Flag reported by #needs_message_identifier?; when truthy, #send_message
# asks each message to create an identifier.
def needs_message_identifier; @needs_message_identifier; end

#needs_message_timestampObject

Returns the value of attribute needs_message_timestamp.



118
119
120
# File 'lib/asir/transport.rb', line 118

# Flag reported by #needs_message_timestamp?; when truthy, #send_message
# asks each message to create a timestamp.
def needs_message_timestamp; @needs_message_timestamp; end

#on_exceptionObject

Proc to call with exception, if exception occurs within #serve_message!, but outside Message#invoke!.

trans.on_exception.call(trans, exception, :message, Message_instance, nil) trans.on_exception.call(trans, exception, :result, Message_instance, Result_instance)



116
117
118
# File 'lib/asir/transport.rb', line 116

# Hook Proc that #serve_message! calls when an exception escapes message
# handling or result sending. It receives
# (transport, exception, :message or :result, message, result).
def on_exception; @on_exception; end

#on_result_exceptionObject

Proc to call within #invoke_message! if result.exception is set. trans.on_result_exception.call(trans, result)



109
110
111
# File 'lib/asir/transport.rb', line 109

# Hook Proc that #invoke_message! calls with (transport, result) when the
# Result carries an exception. Nil when no hook is set.
def on_result_exception; @on_result_exception; end

#one_wayObject

Returns the value of attribute one_way.



18
19
20
# File 'lib/asir/transport.rb', line 18

# One-way flag consulted by #send_result and #receive_result when deciding
# whether to run a message's block.
def one_way; @one_way; end

#runningObject

!SLIDE Transport Server Support



175
176
177
# File 'lib/asir/transport.rb', line 175

# Server running flag; #stop! sets it to false.
def running; @running; end

#verboseObject

Returns the value of attribute verbose.



122
123
124
# File 'lib/asir/transport.rb', line 122

# Verbosity level; #initialize starts it at 0.
def verbose; @verbose; end

Instance Method Details

#_subclass_responsibility(*args) ⇒ Object Also known as: _send_message, _receive_message, _send_result, _receive_result



124
125
126
# File 'lib/asir/transport.rb', line 124

# Placeholder for operations a concrete transport must supply; the class
# documentation lists _send_message, _receive_message, _send_result and
# _receive_result as aliases of it. Any arguments are accepted and ignored.
#
# @raise [Error::SubclassResponsibility] always
def _subclass_responsibility(*args)
  # The comma matters: without it Ruby parses the line as a call to a method
  # named SubclassResponsibility on Error and fails with NoMethodError
  # instead of raising the intended exception.
  raise Error::SubclassResponsibility, "subclass responsibility"
end

#invoke_message!(message) ⇒ Object

Invokes the Message object, returns a Result object.



220
221
222
223
224
225
226
227
228
229
230
231
232
233
# File 'lib/asir/transport.rb', line 220

# Invokes +message+ through the invoker and returns the resulting Result.
#
# While the invocation runs, this transport is bound as Transport's :current
# attribute and +message+ as this transport's :message attribute. Any delay
# requested for the message is waited out first.
def invoke_message!(message)
  outcome = nil
  Transport.with_attr!(:current, self) do
    with_attr!(:message, message) do
      wait_for_delay!(message)
      outcome = invoker.invoke!(message, self)
      # Give the result-exception hook a look at failed invocations.
      hook = @on_result_exception
      hook.call(self, outcome) if hook && outcome.exception
    end
  end
  outcome
end

#needs_message_identifier?(m) ⇒ Boolean

Returns:

  • (Boolean)


119
# File 'lib/asir/transport.rb', line 119

# Tells #send_message whether message +m+ should get an identifier.
# The answer is the transport-wide flag; +m+ itself is not inspected.
def needs_message_identifier?(m)
  @needs_message_identifier
end

#needs_message_timestamp?(m) ⇒ Boolean

Returns:

  • (Boolean)


120
# File 'lib/asir/transport.rb', line 120

# Tells #send_message whether message +m+ should get a timestamp.
# The answer is the transport-wide flag; +m+ itself is not inspected.
def needs_message_timestamp?(m)
  @needs_message_timestamp
end

#receive_message(stream) ⇒ Object

!SLIDE Transport#receive_message Receive Message payload from stream.



39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/asir/transport.rb', line 39

# Receives one Message payload from +stream+ and decodes it.
#
# Returns whatever #_receive_message returned (the code treats it as a
# [payload, state] pair) with its first element replaced by the decoded
# message, or the falsy value unchanged when nothing was received.
def receive_message(stream)
  @message_count = (@message_count || 0) + 1
  extra = {}
  received = _receive_message(stream, extra)
  if received
    # Messages are decoded with a private copy of the encoder.
    decoded = encoder.dup.decode(received.first)
    received[0] = decoded
    # Merge in whatever the transport collected while receiving.
    decoded.additional_data!.update(extra) if decoded
    @after_receive_message.call(self, decoded) if @after_receive_message
  end
  received
end

#receive_result(message, opaque_result) ⇒ Object

!SLIDE Transport#receive_result Receive Result from stream:

  • Receive Result payload

  • Decode Result.

  • Extract Result result or exception.



75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/asir/transport.rb', line 75

# Receives and decodes the Result for +message+.
#
# Returns nil when there is no result or the message is one-way. When the
# result carries an exception, returns whatever the invoker does with it.
# Otherwise returns the result's value, after first passing the Result to
# the message's block if it has one and this transport is not one-way.
def receive_result(message, opaque_result)
  payload = _receive_result(message, opaque_result)
  result = decoder.dup.decode(payload)
  return nil unless result && !message.one_way

  failure = result.exception
  return invoker.invoke!(failure, self) if failure

  message.block.call(result) unless @one_way || !message.block
  result.result
end

#send_message(message) ⇒ Object

!SLIDE Transport#send_message

  • Encode Message.

  • Send encoded Message.

  • Receive decoded Result.



25
26
27
28
29
30
31
32
33
34
# File 'lib/asir/transport.rb', line 25

# Sends +message+ to the service and returns what #receive_result yields
# for it.
#
# Steps, in order: count the message, stamp it with a timestamp and an
# identifier when this transport asks for them, run the before-send hook,
# apply the relative message delay, encode, send, then receive the result.
def send_message(message)
  @message_count = (@message_count || 0) + 1
  if needs_message_timestamp?(message)
    message.create_timestamp!
  end
  if needs_message_identifier?(message)
    message.create_identifier!
  end
  if @before_send_message
    @before_send_message.call(self, message)
  end
  relative_message_delay!(message)
  # Messages are encoded with a private copy of the encoder.
  payload = encoder.dup.encode(message)
  receive_result(message, _send_message(message, payload))
end

#send_result(result, stream, message_state) ⇒ Object

!SLIDE Transport#send_result Send Result to stream.



57
58
59
60
61
62
63
64
65
66
# File 'lib/asir/transport.rb', line 57

# Delivers +result+ for its message.
#
# On a one-way transport whose message has a block, the Result goes to that
# block and nothing is sent. Otherwise the Result is detached from its
# Message, encoded, and handed to #_send_result with +stream+ and
# +message_state+.
def send_result(result, stream, message_state)
  message = result.message
  return message.block.call(result) if @one_way && message.block

  # Drop the back-reference so the whole Message is not sent back.
  result.message = nil
  payload = decoder.dup.encode(result)
  _send_result(message, result, payload, stream, message_state)
end

#serve_message!(in_stream, out_stream) ⇒ Object

!SLIDE Serve a Message.



134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# File 'lib/asir/transport.rb', line 134

# Serves one Message: receives it from +in_stream+, invokes it, and sends
# the Result to +out_stream+ (when one is given).
#
# Returns self when a message was received and invoked, nil when
# #receive_message produced no message. When an exception is rescued the
# return value is that of the rescue clause.
#
# Exceptions raised while receiving or invoking are logged and reported to
# the @on_exception hook rather than propagated, except that an exception
# matching Error::Unforwardable.unforwardable is re-raised after the Result
# has been handled.
def serve_message! in_stream, out_stream
  # Declared up front so the rescue/ensure clauses can inspect how far
  # processing got.
  message = message_state = message_ok = result = result_ok = nil
  exception = original_exception = unforwardable_exception = nil
  message, message_state = receive_message(in_stream)
  if message
    message_ok = true
    result = invoke_message!(message)
    result_ok = true
    self
  else
    nil
  end
rescue ::Exception => exc
  # Remember the failure for the ensure clause, then log and report it.
  exception = original_exception = exc
  _log [ :message_error, exc ]
  @on_exception.call(self, exc, :message, message, nil) if @on_exception
ensure
  begin
    # Only answer when a message was actually received.
    if message_ok
      # The invocation itself failed: build a Result that carries the
      # exception back in place of a normal result.
      if exception && ! result_ok
        case exception
        when *Error::Unforwardable.unforwardable
          # Wrap exception types that are flagged as unforwardable; the
          # original is re-raised at the end of this method.
          unforwardable_exception = exception = Error::Unforwardable.new(exception)
        end
        result = Result.new(message, nil, exception)
      end
      if out_stream
        send_result(result, out_stream, message_state)
      end
    end
  rescue ::Exception => exc
    # Failures while building or sending the Result are logged and reported,
    # not propagated.
    _log [ :result_error, exc ]
    @on_exception.call(self, exc, :result, message, result) if @on_exception
  end
  raise original_exception if unforwardable_exception
end

#stop!(force = false) ⇒ Object

Raises:

  • (Error::Terminate) — when force is true

177
178
179
180
181
182
# File 'lib/asir/transport.rb', line 177

# Marks the transport as no longer running and, when the transport responds
# to #stop_server!, calls it.
#
# Returns self, unless +force+ is truthy, in which case Error::Terminate is
# raised after the steps above.
def stop!(force = false)
  @running = false
  if respond_to?(:stop_server!)
    stop_server!
  end
  if force
    raise Error::Terminate
  end
  self
end

#with_server_signals!Object



184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
# File 'lib/asir/transport.rb', line 184

# Runs the given block with TERM and HUP handlers installed that stop this
# transport and record an ASIR::Error::Terminate. After the block returns,
# a recorded exception is cleared and raised. The previous handlers are
# restored in every case.
def with_server_signals!
  saved_handlers = {}
  %w[TERM HUP].each do |signal_name|
    saved_handlers[signal_name] = Signal.trap(signal_name) do |*args|
      stop!
      @signal_exception = ::ASIR::Error::Terminate.new("#{self} by SIG#{signal_name} #{args.inspect} in #{__FILE__}:#{__LINE__}")
    end
  end
  yield
  pending = @signal_exception
  if pending
    @signal_exception = nil
    raise pending
  end
ensure
  saved_handlers.each_pair do |signal_name, handler|
    begin
      Signal.trap(signal_name, handler)
    rescue StandardError
      # Restoring is best effort; a handler that cannot be reinstalled is skipped.
      nil
    end
  end
end