Class: ASIR::Channel

Inherits:
Object
  • Object
show all
Includes:
Initialization, RetryBehavior
Defined in:
lib/asir/channel.rb

Overview

Generic I/O Channel abstraction. Maintains a separate stream per Thread and handles forked child processes.

Constant Summary collapse

# Error callback: makes a best-effort attempt to close the channel and then
# re-raises the exception it was given.
# The action and stream arguments are accepted but not used.
ON_ERROR = lambda do | channel, exc, action, stream |
  begin
    channel.close
  rescue StandardError
    nil # a failing close must not replace the exception being reported
  end
  raise exc
end
# Close callback: closes the given stream when there is one, ignoring any
# StandardError raised by the close itself. The channel argument is unused.
ON_CLOSE = lambda do | channel, stream |
  if stream
    begin
      stream.close
    rescue StandardError
      nil
    end
  end
end
# No-op retry callback taking (channel, exc, action); always returns nil.
ON_RETRY = lambda { | channel, exc, action | }

Instance Attribute Summary collapse

Attributes included from RetryBehavior

#try_max, #try_sleep, #try_sleep_increment, #try_sleep_max

Instance Method Summary collapse

Methods included from RetryBehavior

#with_retry

Constructor Details

#initialize(opts = nil) ⇒ Channel

Returns a new instance of Channel.



22
23
24
25
26
27
28
29
30
31
# File 'lib/asir/channel.rb', line 22

# Sets up a new Channel.
#
# Installs ON_CLOSE and ON_ERROR as the close/error callbacks, assigns the
# retry settings, and then calls super, which forwards +opts+ unchanged.
# NOTE(review): the defaults are assigned before super, presumably so that
# values supplied in +opts+ can override them -- confirm against the
# Initialization mixin.
def initialize(opts = nil)
  # Callbacks.
  @on_error = ON_ERROR
  @on_close = ON_CLOSE
  # @on_retry = ON_RETRY

  # Retry settings (writers come from RetryBehavior).
  self.try_max             = 10
  self.try_sleep           = 0.1
  self.try_sleep_increment = 0.1
  self.try_sleep_max       = 10

  super
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(sel, *args, &blk) ⇒ Object

Delegate to actual stream.



84
85
86
87
88
# File 'lib/asir/channel.rb', line 84

# Forwards any otherwise-undefined method call to the object yielded by
# #with_stream!, passing the selector, arguments and block through.
def method_missing(sel, *args, &blk)
  with_stream! { | target | target.__send__(sel, *args, &blk) }
end

Instance Attribute Details

#on_close ⇒ Object

Returns the value of attribute on_close.



10
11
12
# File 'lib/asir/channel.rb', line 10

# Reader for @on_close, the callable that #close invokes as
# @on_close.call(self, stream).
def on_close; @on_close; end

#on_connect ⇒ Object

Returns the value of attribute on_connect.



10
11
12
# File 'lib/asir/channel.rb', line 10

# Reader for @on_connect, the callable that #connect! invokes as
# @on_connect.call(self).
def on_connect; @on_connect; end

#on_error ⇒ Object

Returns the value of attribute on_error.



10
11
12
# File 'lib/asir/channel.rb', line 10

# Reader for @on_error, the callable that #handle_error! invokes as
# @on_error.call(self, exc, action, stream).
def on_error; @on_error; end

#on_retry ⇒ Object

Returns the value of attribute on_retry.



10
11
12
# File 'lib/asir/channel.rb', line 10

# Reader for @on_retry, the optional callable that #connect! invokes as
# @on_retry.call(self, exc, :connect) before a retry.
def on_retry; @on_retry; end

Instance Method Details

#_stream ⇒ Object

Returns the stream for this Channel, or nil.



113
114
115
# File 'lib/asir/channel.rb', line 113

# Looks up this Channel's entry in the mapping returned by #_streams.
# Returns the stored stream, or nil when there is none; never connects.
def _stream
  current_streams = _streams
  current_streams[self]
end

#_streams ⇒ Object

Returns a Thread-specific mapping, unique to this process id. Maps from Channel objects to actual stream.



102
103
104
105
106
107
108
109
110
# File 'lib/asir/channel.rb', line 102

# Returns the current Thread's Hash mapping Channel objects to their streams.
#
# Each Channel records the process id it last ran under in @owning_process.
# When that differs from the current pid ($$) -- on first use, or after a
# fork -- the new pid is recorded and this Channel's own entry is dropped,
# so a stream opened under another pid is not reused.
#
# Only this Channel's entry is removed. Clearing the whole Hash would also
# discard the live streams of every other Channel used by this Thread as
# soon as a new Channel is used for the first time.
def _streams
  streams = Thread.current[:'ASIR::Channel._streams'] ||= { }
  if @owning_process != $$ # first use, or the pid changed (forked child)
    @owning_process = $$
    streams.delete(self)   # dropped without closing
  end
  streams
end

#close ⇒ Object

Invokes @on_close.call(self, stream). On Exception, invokes @on_error.call(self, exc, :close, stream).



63
64
65
66
67
68
69
70
# File 'lib/asir/channel.rb', line 63

# Closes this Channel's stream for the current Thread, if there is one.
#
# The stream is removed from the mapping first and then passed to
# @on_close.call(self, stream) when @on_close is set. Any exception raised
# along the way is handed to #handle_error! with action :close.
# Returns nil when there was no stream or no @on_close callback.
def close
  current = _stream
  return nil unless current
  self.stream = nil # forget the stream before running the callback
  return nil unless @on_close
  @on_close.call(self, current)
rescue ::Exception => error
  handle_error!(error, :close, current)
end

#connect! ⇒ Object

Invokes @on_connect.call(self). On Exception, invokes @on_error.call(self, exc, :connect, nil).



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/asir/channel.rb', line 42

def connect!
  n_try = nil
  with_retry do | action, data |
    case action
    when :try
      n_try = data
      @on_connect.call(self)
    when :retry #, exc
      exc = data
      $stderr.puts "RETRY: #{n_try}: ERROR : #{data.inspect}"
      @on_retry.call(self, exc, :connect) if @on_retry
    when :failed
      exc = data
      $stderr.puts "FAILED: #{n_try}: ERROR : #{data.inspect}"
      handle_error!(exc, :connect, nil)
    end
  end
end

#handle_error!(exc, action, stream) ⇒ Object

Dispatches the exception and its arguments to @on_error if one is set. Otherwise, re-raises the exception.



92
93
94
95
96
97
98
# File 'lib/asir/channel.rb', line 92

# Reports an exception raised while performing +action+ on +stream+.
#
# With no @on_error callback the exception is simply re-raised; otherwise
# the result of @on_error.call(self, exc, action, stream) is returned.
def handle_error!(exc, action, stream)
  raise exc unless @on_error
  @on_error.call(self, exc, action, stream)
end

#stream ⇒ Object

Returns the IO stream for the current Thread. Automatically calls #connect! to create the stream if none exists yet.



35
36
37
38
# File 'lib/asir/channel.rb', line 35

# Returns this Channel's stream from the #_streams mapping.
# When the entry is missing (or falsy), the result of #connect! is stored
# in the mapping and returned.
def stream
  streams = _streams
  streams[self] || (streams[self] = connect!)
end

#stream=(x) ⇒ Object

Sets the stream for this Channel, or nil.



118
119
120
121
122
123
124
# File 'lib/asir/channel.rb', line 118

# Sets this Channel's stream in the mapping returned by #_streams.
# Passing nil removes the entry instead of storing nil.
#
# The nil test uses #nil? rather than `x == nil`, so a stream object with
# a permissive or delegating #== is never mistaken for nil and dropped.
def stream=(x)
  if x.nil?
    _streams.delete(self)
  else
    _streams[self] = x
  end
end

#with_stream! ⇒ Object

Yield #stream to block. On Exception, invokes @on_error.call(self, exc, :with_stream, stream).



74
75
76
77
78
79
80
81
# File 'lib/asir/channel.rb', line 74

# Yields #stream to the block and returns the block's result.
#
# The stream is obtained before the protected region, so an exception
# raised by #stream itself is not routed through this rescue. Any exception
# raised by the block is passed to #handle_error! with action :with_stream.
def with_stream!
  current = stream
  begin
    yield(current)
  rescue ::Exception => error
    handle_error!(error, :with_stream, current)
  end
end