Class: ASIR::Channel

Inherits:
Object
  • Object
show all
Includes:
Initialization, RetryBehavior
Defined in:
lib/asir/channel.rb

Overview

Generic I/O Channel abstraction. Handles stream per Thread and forked child processes.

Constant Summary collapse

ON_ERROR =
lambda do | channel, exc, action, stream |
  channel.close rescue nil
  raise exc
end
ON_CLOSE =
lambda do | channel, stream |
  stream.close rescue nil if stream
end
ON_RETRY =
lambda do | channel, exc, action |
end

Instance Attribute Summary collapse

Attributes included from RetryBehavior

#try_max, #try_sleep, #try_sleep_increment, #try_sleep_max

Instance Method Summary collapse

Methods included from RetryBehavior

#with_retry

Constructor Details

#initialize(opts = nil) ⇒ Channel

Returns a new instance of Channel.



23
24
25
26
27
28
29
30
31
32
33
# File 'lib/asir/channel.rb', line 23

def initialize opts = nil
  @mutex = Mutex.new
  @on_close = ON_CLOSE
  @on_error = ON_ERROR
  # @on_retry = ON_RETRY
  self.try_max = 10
  self.try_sleep = 0.1
  self.try_sleep_increment = 0.1
  self.try_sleep_max = 10
  super
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(sel, *args, &blk) ⇒ Object

Delegate to actual stream.



94
95
96
97
98
# File 'lib/asir/channel.rb', line 94

def method_missing sel, *args, &blk
  with_stream! do | obj |
    obj.__send__(sel, *args, &blk)
  end
end

Instance Attribute Details

#on_closeObject

Returns the value of attribute on_close.



11
12
13
# File 'lib/asir/channel.rb', line 11

def on_close
  @on_close
end

#on_connectObject

Returns the value of attribute on_connect.



11
12
13
# File 'lib/asir/channel.rb', line 11

def on_connect
  @on_connect
end

#on_errorObject

Returns the value of attribute on_error.



11
12
13
# File 'lib/asir/channel.rb', line 11

def on_error
  @on_error
end

#on_retryObject

Returns the value of attribute on_retry.



11
12
13
# File 'lib/asir/channel.rb', line 11

def on_retry
  @on_retry
end

Instance Method Details

#_streamObject

Returns the stream for this Channel, or nil.



125
126
127
# File 'lib/asir/channel.rb', line 125

def _stream
  _streams[self]
end

#_streamsObject

Returns a Thread-specific mapping, unique to this process id. Maps from Channel objects to actual stream.



112
113
114
115
116
117
118
119
120
121
122
# File 'lib/asir/channel.rb', line 112

def _streams
  @mutex.synchronize do
  streams = Thread.current[:'ASIR::Channel._streams'] ||= { }
  if  @owning_process != $$ || # child process?
      @owning_process > $$     # PIDs wrapped around?
    @owning_process = $$
    streams.clear
  end
  streams
  end
end

#closeObject

Invokes @on_close.call(self, stream). On Exception, invokes @on_error.call(self, exc, :close, stream).



69
70
71
72
73
74
75
76
77
78
# File 'lib/asir/channel.rb', line 69

def close
  if stream = _stream
    self.stream = nil
    @on_close.call(self, stream) if @on_close
  end
rescue *Error::Unrecoverable.modules
  raise
rescue ::Exception => exc
  handle_error!(exc, :close, stream)
end

#connect!Object

Invokes @on_connect.call(self). On Exception, invokes @on_error.call(self, exc, :connect, nil).



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/asir/channel.rb', line 44

def connect!
  n_try = nil
  with_retry do | action, data |
    case action
    when :try
      n_try = data
      @on_connect.call(self)
    when :retry #, exc
      exc = data
      case exc
      when *Error::Unrecoverable.modules
        raise exc
      end
      $stderr.puts "RETRY: #{n_try}: ERROR : #{data.inspect}"
      @on_retry.call(self, exc, :connect) if @on_retry
    when :failed
      exc = data
      $stderr.puts "FAILED: #{n_try}: ERROR : #{data.inspect}"
      handle_error!(exc, :connect, nil)
    end
  end
end

#handle_error!(exc, action, stream) ⇒ Object

Dispatches exception and arguments if @on_error is defined. Otherwise, reraise exception.



102
103
104
105
106
107
108
# File 'lib/asir/channel.rb', line 102

def handle_error! exc, action, stream
  if @on_error
    @on_error.call(self, exc, action, stream)
  else
    raise exc
  end
end

#streamObject

Returns IO stream for current Thread. Automatically calls #connect! if stream is created.



37
38
39
40
# File 'lib/asir/channel.rb', line 37

def stream
  _streams[self] ||= 
    connect!
end

#stream=(x) ⇒ Object

Sets the stream for this Channel, or nil.



130
131
132
133
134
135
136
# File 'lib/asir/channel.rb', line 130

def stream= x
  if x == nil
    _streams.delete(self)
  else
    _streams[self] = x
  end
end

#with_stream!Object

Yield #stream to block. On Exception, invokes @on_error.call(self, exc, :with_stream, stream).



82
83
84
85
86
87
88
89
90
91
# File 'lib/asir/channel.rb', line 82

def with_stream!
  x = stream
  begin
    yield x
  rescue *Error::Unrecoverable.modules
    raise
  rescue ::Exception => exc
    handle_error!(exc, :with_stream, x)
  end
end