Class: ConcurrentWorker::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/concurrent_worker/worker.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(*args, **options, &work_block) ⇒ Worker

Returns a new instance of Worker.



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/concurrent_worker/worker.rb', line 26

# Builds a worker: stores its arguments and options, prepares callback lists
# and the request counter, and mixes the concurrency backend into this
# instance's singleton class.
#
# @param args [Array] positional arguments, kept as-is in @args
# @param options [Hash] keyword options, kept in @options. Keys read here:
#   :snd_queue_max (falls back to 2), :result_callback_interrupt and
#   :retired_callback_interrupt (each falls back to :immediate), and :type
#   (:process selects ConcurrentProcess; :thread or anything else selects
#   ConcurrentThread)
# @param work_block [Proc, nil] optional block registered as :work_block
def initialize(*args, **options, &work_block)
  @args = args
  @options = options
  set_block(:work_block, &work_block) if work_block

  @state = :idle
  @result_callbacks = []
  @retired_callbacks = []

  @snd_queue_max = @options[:snd_queue_max] || 2
  @req_counter = RequestCounter.new
  %i[result_callback_interrupt retired_callback_interrupt].each do |key|
    @options[key] ||= :immediate
  end

  # Mixed into the singleton class so only this instance gains the backend.
  backend = case @options[:type]
            when :process then ConcurrentProcess
            else ConcurrentThread
            end
  singleton_class.include(backend)
end

Instance Attribute Details

#channel ⇒ Object

Returns the value of attribute channel.



3
4
5
# File 'lib/concurrent_worker/worker.rb', line 3

# Reader for the +channel+ attribute.
#
# @return [Object, nil] the current value of @channel; nil while nothing has
#   assigned it (the assignment is not part of this listing)
def channel
  @channel
end

#req_counter ⇒ Object (readonly)

Worker : worker class

+cncr_block         : concurrent processing block : thread(as ConcurrentThread)/process(as ConcurrentProcess)
  +base_block       : user defined preparation to exec 'work block'
    +loop_block     : loop of receiving request and exec 'work block'
      +work_block   : user requested work

These blocks are executed with the worker's ‘instance_exec’ method, so they can all share the worker's instance variables (@xxxx).



14
15
16
# File 'lib/concurrent_worker/worker.rb', line 14

# Reader for the request counter.
#
# @return [Object] the value of @req_counter; #initialize assigns it
#   RequestCounter.new
def req_counter
  @req_counter
end

#snd_queue_max ⇒ Object (readonly)

Worker : worker class

+cncr_block         : concurrent processing block : thread(as ConcurrentThread)/process(as ConcurrentProcess)
  +base_block       : user defined preparation to exec 'work block'
    +loop_block     : loop of receiving request and exec 'work block'
      +work_block   : user requested work

These blocks are executed with the worker's ‘instance_exec’ method, so they can all share the worker's instance variables (@xxxx).



14
15
16
# File 'lib/concurrent_worker/worker.rb', line 14

# Reader for the send-queue limit.
#
# @return [Object] the value of @snd_queue_max; #initialize takes it from
#   options[:snd_queue_max] and falls back to 2
def snd_queue_max
  @snd_queue_max
end

Instance Method Details

#add_callback(&callback) ⇒ Object



56
57
58
59
# File 'lib/concurrent_worker/worker.rb', line 56

# Registers a block to be run by #call_result_callbacks.
#
# @yield the callback; it is later called with the splatted result arguments
# @return [Array<Proc>] the result-callback list, including the new entry
# @raise [RuntimeError] "block is nil" when no block is given
def add_callback(&callback)
  raise "block is nil" if callback.nil?

  @result_callbacks << callback
end

#add_retired_callback(&callback) ⇒ Object



75
76
77
78
# File 'lib/concurrent_worker/worker.rb', line 75

# Registers a block to be run by #call_retired_callbacks.
#
# @yield the callback; it is later called without arguments
# @return [Array<Proc>] the retired-callback list, including the new entry
# @raise [RuntimeError] "block is nil" when no block is given
def add_retired_callback(&callback)
  raise "block is nil" if callback.nil?

  @retired_callbacks << callback
end

#call_result_callbacks(args) ⇒ Object



64
65
66
67
68
69
70
71
72
73
# File 'lib/concurrent_worker/worker.rb', line 64

# Runs every registered result callback with +args+ splatted, then removes
# one entry from the request counter.
#
# The counter is popped in an +ensure+ clause, so a callback that raises (or
# an interrupt delivered while callbacks run) can no longer leave the request
# counted forever; the exception still propagates to the caller.
#
# @param args [Array] values passed to each callback as callback.call(*args)
# @return [Object] whatever @req_counter.pop returns
def call_result_callbacks(args)
  # Interrupts are deferred for the whole method and re-enabled, according to
  # options[:result_callback_interrupt], only while the callbacks run.
  Thread.handle_interrupt(Object => :never) do
    popped = nil
    begin
      Thread.handle_interrupt(Object => @options[:result_callback_interrupt]) do
        @result_callbacks.each { |callback| callback.call(*args) }
      end
    ensure
      popped = @req_counter.pop
    end
    popped
  end
end

#call_retired_callbacks ⇒ Object



83
84
85
86
87
88
89
# File 'lib/concurrent_worker/worker.rb', line 83

# Calls every registered retired callback, without arguments, in
# registration order.
#
# Interrupt delivery while the callbacks run follows
# options[:retired_callback_interrupt].
#
# @return [Array<Proc>] the retired-callback list
def call_retired_callbacks
  interrupt_mode = @options[:retired_callback_interrupt]
  Thread.handle_interrupt(Object => interrupt_mode) { @retired_callbacks.each(&:call) }
end

#clear_callbacks ⇒ Object



60
61
62
# File 'lib/concurrent_worker/worker.rb', line 60

# Removes every callback registered through #add_callback.
#
# @return [Array] the now-empty @result_callbacks array
def clear_callbacks
  @result_callbacks.clear
end

#clear_retired_callbacks ⇒ Object



79
80
81
# File 'lib/concurrent_worker/worker.rb', line 79

# Removes every callback registered through #add_retired_callback.
#
# @return [Array] the now-empty @retired_callbacks array
def clear_retired_callbacks
  @retired_callbacks.clear
end

#define_block(symbol, &block) ⇒ Object



108
109
110
111
112
113
# File 'lib/concurrent_worker/worker.rb', line 108

# Wraps +block+ in a proc that runs it through instance_exec, and stores that
# wrapper in the instance variable named after +symbol+ (e.g. @work_block).
#
# @param symbol [Symbol, String] instance variable name without the "@"
# @yield the body to run later; it receives the wrapper's arguments
# @return [Proc] the stored wrapper
def define_block(symbol, &block)
  wrapper = proc { |*args| instance_exec(*args, &block) }
  instance_variable_set("@#{symbol}", wrapper)
end

#define_block_yield(symbol) ⇒ Object



115
116
117
118
119
120
121
122
123
124
# File 'lib/concurrent_worker/worker.rb', line 115

# Defines a singleton method yield_<symbol> on this object. The method looks
# up the block stored in @<symbol> at call time and invokes it with the given
# arguments.
#
# @param symbol [Symbol, String] block name, e.g. :work_block
# @raise [RuntimeError] from the generated method, "block <symbol> is not
#   defined", when @<symbol> is nil or false at call time
def define_block_yield(symbol)
  define_singleton_method("yield_#{symbol}") do |*args|
    stored = instance_variable_get("@#{symbol}")
    raise "block #{symbol} is not defined" unless stored

    stored.call(*args)
  end
end

#join ⇒ Object



187
188
189
190
191
192
193
194
# File 'lib/concurrent_worker/worker.rb', line 187

# Waits until the request counter drops below 1, sends the quit request, then
# waits for the concurrent part via wait_cncr_proc. Does nothing unless the
# worker state is :run.
#
# @return [Object, nil] nil when the worker is not running, otherwise the
#   result of wait_cncr_proc
def join
  return unless @state == :run

  @req_counter.wait_until_less_than(1)
  quit
  wait_cncr_proc
end

#queue_available? ⇒ Boolean

Returns:

  • (Boolean)


22
23
24
# File 'lib/concurrent_worker/worker.rb', line 22

# Tells whether another request can be queued: the queue must not be closed
# and the request counter's size must be below @snd_queue_max.
#
# @return [Boolean]
def queue_available?
  return false if queue_closed?

  @req_counter.size < @snd_queue_max
end

#queue_closed? ⇒ Boolean

Returns:

  • (Boolean)


16
17
18
# File 'lib/concurrent_worker/worker.rb', line 16

# Reports whether the request counter has been closed.
#
# @return [Boolean] the result of @req_counter.closed? (RequestCounter itself
#   is defined outside this listing)
def queue_closed?
  @req_counter.closed?
end

#queue_empty? ⇒ Boolean

Returns:

  • (Boolean)


19
20
21
# File 'lib/concurrent_worker/worker.rb', line 19

# Tells whether no request is outstanding: the queue must not be closed and
# the request counter's size must be 0. A closed queue is never reported as
# empty.
#
# @return [Boolean]
def queue_empty?
  return false if queue_closed?

  @req_counter.size == 0
end

#quit ⇒ Object



175
176
177
178
179
180
181
182
183
184
185
# File 'lib/concurrent_worker/worker.rb', line 175

# Asks the running worker to stop by sending a nil request (the default loop
# block ends when it receives a falsy request).
#
# @return [Boolean, nil] nil when the worker state is not :run, true when the
#   request was sent, false when sending raised ClosedQueueError or IOError
def quit
  return unless @state == :run

  send_req(nil)
  true
rescue ClosedQueueError, IOError
  false
end

#req(*args, &work_block) ⇒ Object



161
162
163
164
165
166
167
168
169
170
171
172
173
# File 'lib/concurrent_worker/worker.rb', line 161

# Submits a request to the worker, starting it first when its state is not
# :run. Blocks in @req_counter.wait_until_less_than(@snd_queue_max) before
# the request is counted and sent.
#
# @param args [Array] arguments for the work block
# @param work_block [Proc, nil] optional work block sent with the request
# @return [Boolean] true when the request was counted and sent, false when
#   counting or sending raised ClosedQueueError or IOError
def req(*args, &work_block)
  run unless @state == :run
  @req_counter.wait_until_less_than(@snd_queue_max)

  # Errors from the two calls above propagate; only push/send are rescued.
  # NOTE(review): when send_req fails after the push succeeded, the counter
  # entry is not removed here.
  begin
    @req_counter.push([args, work_block])
    send_req([args, work_block])
    true
  rescue ClosedQueueError, IOError
    false
  end
end

#result_handle_thread(&recv_block) ⇒ Object



91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/concurrent_worker/worker.rb', line 91

# Starts a thread that runs +recv_block+ and then always performs the
# worker's shutdown steps.
#
# @yield the receive loop to run inside the new thread
# @return [Thread] the started thread
def result_handle_thread(&recv_block)
  Thread.new do
    # Asynchronous interrupts are deferred for the whole thread body ...
    Thread.handle_interrupt(Object => :never) do
      begin
        # ... and delivered immediately only while recv_block runs.
        Thread.handle_interrupt(Object => :immediate) do
          recv_block.call
        end
      ensure
        # Runs however recv_block ended: close the request counter, close the
        # channel, then notify the retired callbacks (which apply their own
        # interrupt setting in call_retired_callbacks).
        @req_counter.close
        channel_close
        call_retired_callbacks
      end
    end
  end
end

#run ⇒ Object



154
155
156
157
158
159
# File 'lib/concurrent_worker/worker.rb', line 154

# Marks the worker as running, installs the default loop and base blocks when
# none has been set, and starts the concurrent part via cncr_block.
#
# @return [Object] the result of cncr_block
def run
  @state = :run

  loop_block_set = instance_variable_defined?(:@loop_block) && @loop_block
  set_default_loop_block unless loop_block_set

  base_block_set = instance_variable_defined?(:@base_block) && @base_block
  set_default_base_block unless base_block_set

  cncr_block
end

#set_block(symbol, &block) ⇒ Object



126
127
128
129
130
131
132
133
134
# File 'lib/concurrent_worker/worker.rb', line 126

# Installs +block+ as one of the worker blocks: stores it via define_block
# and creates the matching yield_<symbol> method via define_block_yield.
#
# @param symbol [Symbol] one of :base_block, :loop_block, :work_block
# @yield the block body to install
# @return [Object] the result of define_block_yield
# @raise [RuntimeError] "block is nil" when no block is given, or
#   "<symbol> is not used as worker block" for any other symbol
def set_block(symbol, &block)
  raise "block is nil" if block.nil?

  unless %i[base_block loop_block work_block].include?(symbol)
    raise "#{symbol} is not used as worker block"
  end

  define_block(symbol, &block)
  define_block_yield(symbol)
end

#set_default_base_block ⇒ Object



148
149
150
151
152
# File 'lib/concurrent_worker/worker.rb', line 148

# Installs the default :base_block, which only calls yield_loop_block.
#
# @return [Object] the result of set_block
def set_default_base_block
  set_block(:base_block) { yield_loop_block }
end

#set_default_loop_block ⇒ Object



136
137
138
139
140
141
142
143
144
145
146
# File 'lib/concurrent_worker/worker.rb', line 136

# Installs the default :loop_block. It keeps taking requests from receive_req
# until one is nil or false. Each request is an [args, work_block] pair: a
# supplied work_block replaces the current :work_block, then the work block is
# run with the args and its result is sent back wrapped in a one-element
# array.
#
# @return [Object] the result of set_block
def set_default_loop_block
  set_block(:loop_block) do
    while (request = receive_req)
      work_args, requested_block = request
      set_block(:work_block, &requested_block) if requested_block
      result = yield_work_block(*work_args)
      send_res([result])
    end
  end
end