Class: ASIR::Channel
- Inherits:
-
Object
- Object
- ASIR::Channel
- Includes:
- Initialization, RetryBehavior
- Defined in:
- lib/asir/channel.rb
Overview
Generic I/O Channel abstraction. Handles stream per Thread and forked child processes.
Constant Summary collapse
- ON_ERROR =
lambda do | channel, exc, action, stream | channel.close rescue nil raise exc end
- ON_CLOSE =
lambda do | channel, stream | stream.close rescue nil if stream end
- ON_RETRY =
lambda do | channel, exc, action | end
Instance Attribute Summary collapse
-
#on_close ⇒ Object
Returns the value of attribute on_close.
-
#on_connect ⇒ Object
Returns the value of attribute on_connect.
-
#on_error ⇒ Object
Returns the value of attribute on_error.
-
#on_retry ⇒ Object
Returns the value of attribute on_retry.
Attributes included from RetryBehavior
#try_max, #try_sleep, #try_sleep_increment, #try_sleep_max
Instance Method Summary collapse
-
#_stream ⇒ Object
Returns the stream for this Channel, or nil.
-
#_streams ⇒ Object
Returns a Thread-specific mapping, unique to this process id.
-
#close ⇒ Object
Invokes @on_close.call(self, stream).
-
#connect! ⇒ Object
Invokes @on_connect.call(self).
-
#handle_error!(exc, action, stream) ⇒ Object
Dispatches exception and arguments if @on_error is defined.
-
#initialize(opts = nil) ⇒ Channel
constructor
A new instance of Channel.
-
#method_missing(sel, *args, &blk) ⇒ Object
Delegate to actual stream.
-
#stream ⇒ Object
Returns IO stream for current Thread.
-
#stream=(x) ⇒ Object
Sets the stream for this Channel, or nil.
-
#with_stream! ⇒ Object
Yield #stream to block.
Methods included from RetryBehavior
Constructor Details
#initialize(opts = nil) ⇒ Channel
Returns a new instance of Channel.
22 23 24 25 26 27 28 29 30 31 |
# File 'lib/asir/channel.rb', line 22 def initialize opts = nil @on_close = ON_CLOSE @on_error = ON_ERROR # @on_retry = ON_RETRY self.try_max = 10 self.try_sleep = 0.1 self.try_sleep_increment = 0.1 self.try_sleep_max = 10 super end |
Dynamic Method Handling
This class handles dynamic methods through the method_missing method
#method_missing(sel, *args, &blk) ⇒ Object
Delegate to actual stream.
84 85 86 87 88 |
# File 'lib/asir/channel.rb', line 84 def method_missing sel, *args, &blk with_stream! do | obj | obj.__send__(sel, *args, &blk) end end |
Instance Attribute Details
#on_close ⇒ Object
Returns the value of attribute on_close.
10 11 12 |
# File 'lib/asir/channel.rb', line 10 def on_close @on_close end |
#on_connect ⇒ Object
Returns the value of attribute on_connect.
10 11 12 |
# File 'lib/asir/channel.rb', line 10 def on_connect @on_connect end |
#on_error ⇒ Object
Returns the value of attribute on_error.
10 11 12 |
# File 'lib/asir/channel.rb', line 10 def on_error @on_error end |
#on_retry ⇒ Object
Returns the value of attribute on_retry.
10 11 12 |
# File 'lib/asir/channel.rb', line 10 def on_retry @on_retry end |
Instance Method Details
#_stream ⇒ Object
Returns the stream for this Channel, or nil.
113 114 115 |
# File 'lib/asir/channel.rb', line 113 def _stream _streams[self] end |
#_streams ⇒ Object
Returns a Thread-specific mapping, unique to this process id. Maps from Channel objects to actual stream.
102 103 104 105 106 107 108 109 110 |
# File 'lib/asir/channel.rb', line 102 def _streams streams = Thread.current[:'ASIR::Channel._streams'] ||= { } if @owning_process != $$ || # child process? @owning_process > $$ # PIDs wrapped around? @owning_process = $$ streams.clear end streams end |
#close ⇒ Object
Invokes @on_close.call(self, stream). On Exception, invokes @on_error.call(self, exc, :close, stream).
63 64 65 66 67 68 69 70 |
# File 'lib/asir/channel.rb', line 63 def close if stream = _stream self.stream = nil @on_close.call(self, stream) if @on_close end rescue ::Exception => exc handle_error!(exc, :close, stream) end |
#connect! ⇒ Object
Invokes @on_connect.call(self). On Exception, invokes @on_error.call(self, exc, :connect, nil).
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/asir/channel.rb', line 42 def connect! n_try = nil with_retry do | action, data | case action when :try n_try = data @on_connect.call(self) when :retry #, exc exc = data $stderr.puts "RETRY: #{n_try}: ERROR : #{data.inspect}" @on_retry.call(self, exc, :connect) if @on_retry when :failed exc = data $stderr.puts "FAILED: #{n_try}: ERROR : #{data.inspect}" handle_error!(exc, :connect, nil) end end end |
#handle_error!(exc, action, stream) ⇒ Object
Dispatches exception and arguments if @on_error is defined. Otherwise, reraise exception.
92 93 94 95 96 97 98 |
# File 'lib/asir/channel.rb', line 92 def handle_error! exc, action, stream if @on_error @on_error.call(self, exc, action, stream) else raise exc end end |
#stream ⇒ Object
Returns IO stream for current Thread. Automatically calls #connect! if stream is created.
35 36 37 38 |
# File 'lib/asir/channel.rb', line 35 def stream _streams[self] ||= connect! end |
#stream=(x) ⇒ Object
Sets the stream for this Channel, or nil.
118 119 120 121 122 123 124 |
# File 'lib/asir/channel.rb', line 118 def stream= x if x == nil _streams.delete(self) else _streams[self] = x end end |
#with_stream! ⇒ Object
Yield #stream to block. On Exception, invokes @on_error.call(self, exc, :with_stream, stream).
74 75 76 77 78 79 80 81 |
# File 'lib/asir/channel.rb', line 74 def with_stream! x = stream begin yield x rescue ::Exception => exc handle_error!(exc, :with_stream, x) end end |