Class: InternetHakai::ConcurrencyManager

Inherits:
Object
  • Object
show all
Defined in:
lib/internethakai/hakairev.rb,
lib/internethakai/concurrency_manager.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeConcurrencyManager

Returns a new instance of ConcurrencyManager.



4
5
6
7
8
9
10
# File 'lib/internethakai/concurrency_manager.rb', line 4

def initialize
    @fork_mode = false
    @rev_mode = false
    @process = 0
    @suffix = ''
    @mutex = nil
end

Instance Attribute Details

#processObject

Returns the value of attribute process.



11
12
13
# File 'lib/internethakai/concurrency_manager.rb', line 11

def process
  @process
end

#suffixObject

Returns the value of attribute suffix.



11
12
13
# File 'lib/internethakai/concurrency_manager.rb', line 11

def suffix
  @suffix
end

Instance Method Details

#create_id(loop_c, th_c) ⇒ Object



175
176
177
# File 'lib/internethakai/concurrency_manager.rb', line 175

def create_id loop_c, th_c
    (loop_c.to_s + sprintf(@id_tmpl, th_c) + @suffix).to_i
end

#fork_reportObject



314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
# File 'lib/internethakai/concurrency_manager.rb', line 314

def fork_report
    @forkreport = true
    require 'pstore'
    result = nil
    time = nil
    count = nil
    @forkdb.transaction do
        result = @forkdb['result']
        time = @forkdb['time']
        count = @forkdb['request_count']
    end
    @logger.run "\nTimePerRequest: #{time / (count / @config['max_request_show'])}\n", 2 if time && count
    @logger.run "RequestPerSec: #{count / (time/1000)}\n", 2 if time && count

    reporter = BaseHandler::get_handler(@org_fork_reporter)
    reporter.run(result)
    reporter = nil
end

#on_completeObject

1プロセス終了時に呼ばれる



162
163
164
165
166
167
168
169
170
171
172
173
174
# File 'lib/internethakai/concurrency_manager.rb', line 162

def on_complete
    total = Time::now - @starttime
    @request_pool = BaseHandler::get_handler(@config['request_pool'])
    @request_pool.close
    #puts "thread collect start"
    thread_collect
    #puts "thread collect end"
    report_collect
    #puts "report collect end"
    @logger.run("TotalTime: " + total.to_s + "\n", 2) unless @fork_mode
    @failure = true if $error
    @cv.signal if @cv
end

#page_checkObject



107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/internethakai/concurrency_manager.rb', line 107

def page_check
    keys = @result_record.keys
    pages = []
    keys.each do |key|
        err = @result_record[key][:errorcount] || 0
        if(err > 0)
            pages << key
        end
    end
    unless pages.empty?
        @logger.run("ERROR URL: #{pages.join(',')}\n", 2)
    end
end

#prepare_forkObject



31
32
33
34
35
36
# File 'lib/internethakai/concurrency_manager.rb', line 31

def prepare_fork
     return if @fork_mode
     @concurrency = @config['max_scenario']
     @process_max = @config['max_process'] || 2
     @fork_mode = true
end

#prepare_revObject



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/internethakai/concurrency_manager.rb', line 12

def prepare_rev
    return if @rev_mode #2回実行しないように
    require 'rubygems'
    require 'rev'
    require 'internethakai/hakairev'
    #TimeRegistProcess::run
    if @config['ssl']
        @config['http_client'] = 'RevSslClient'
    else
        @config['http_client'] = 'RevHttpClient'
    end
    @config['scenario_executer'] = 'NonBlockScenarioExecuter'
    @config['request_pool'] = 'RequestPoolNonThread'
    @config['queue'] = 'TaskQueue'
    #@config['response_handler'] = 'PipeTimeRegister'
    @config['response_handler'] = 'RevTimeRegister'
    @config['scan']
    @rev_mode = true
end

#report_collectObject



207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
# File 'lib/internethakai/concurrency_manager.rb', line 207

def report_collect
    d = (Time::now - @starttime)*1000
    if @fork_mode && @forkdb
        @forkdb.transaction do
            if not @forkdb['time']
                @forkdb['time'] = d
            elsif @forkdb['time'] < d
                @forkdb['time'] = d
            end
            @forkdb['request_count'] = 0 unless @forkdb['request_count']
            @forkdb['request_count'] += $REQUEST_COUNT
        end
    else
        @logger.run "\nTimePerRequest: #{d / ($REQUEST_COUNT / @config['max_request'])}\n", 2
        @logger.run "RequestPerSec: #{$REQUEST_COUNT / (d/1000)}\n", 2
    end
    record = ResponseRecord::new
    @threads.each do |k, th|
        if th[:result]
            record += th[:result].get_record
        end
    end
    @logger.run("\n\n", 2) unless @fork_mode
    reporter = BaseHandler::get_handler(@config["reporter"])
    reporter.run(record)
    @result_record = record
end

#rev_executerObject



11
12
13
14
# File 'lib/internethakai/hakairev.rb', line 11

def rev_executer
    @register.exec
    @scenario_executer.run_one_action
end

#rev_loop_runObject



91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/internethakai/hakairev.rb', line 91

def rev_loop_run
    loop = Rev::Loop::default
    q = @queue
    rq = @request_pool
    clq = @client_queue
    while @rev_running
        flag = false
        if loop.has_active_watchers?
            loop.run_nonblock
        end
        if !rq.empty? and !clq.empty?
            a = rq.get
            cl = clq.get
            #a.client_queue = clq
            #puts "set: #{cl.object_id}"
            a.set_client(cl)
            a.run
            flag = true
        end
        if loop.has_active_watchers?
            loop.run_nonblock
        end
        if !q.empty?
            q.run
            flag = true
        end
        if !flag
            loop.run_once
        end
    end
end

#rev_on_completeObject



15
16
17
18
19
# File 'lib/internethakai/hakairev.rb', line 15

def rev_on_complete
    @queue.clear
    on_complete
    rev_stop
end

#rev_scenario_pushObject



20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/internethakai/hakairev.rb', line 20

def rev_scenario_push
    flag = true
    @scenarios.each do |sc|
        flag = sc.next
    end
    unless flag
        #@logger.run("all scenario push\n", 2)
        @request_pool.quit
        @threads[:scenario].kill if @threads[:scenario]
    end
    flag
end

#rev_startObject



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/internethakai/hakairev.rb', line 40

def rev_start
    @request_pool = BaseHandler::get_handler(@config['request_pool'])
    th_cnt = 1
    th_id = (sprintf(@id_tmpl, th_cnt) + @suffix).to_i
    BaseHandler::set_thread_id(th_id)
    @queue = BaseHandler::get_handler('TaskQueue')
    TimerFactory::new(@config['timeout'], @config['max_request'], Rev::Loop::default)
    @client_queue = ClientQueue::new(@config['max_request'])
    1.upto(@config['max_request']) do |i|
        client_cls =  BaseHandler::get_class(@config['http_client'])
        cl = client_cls::create(@config['host'], @config['port'])
        cl.timeout = @config['timeout']
        cl.set_headers(@config['header'])
        cl.tfactory = TimerFactory::instance
        cl.client_queue = @client_queue
        @client_queue.add(cl)
    end

    Thread::current[:heartbeat] = Time::now
    report = BaseHandler::get_handler(@config["response_handler"])
    Thread.current[:result] = report

    @scenarios.each do |sc|
        #最初のアクションだけセット
        #sc.each do |act|
        #    act.client_queue = @client_queue
        #end
        sc.init
    end
    if @config.has_key?('total_duration')
        @config['infinity'] = true
        timer = Rev::TimerWatcher::new(@config['total_duration'], false)
        timer.on_timer(&method(:rev_on_complete))
        timer.attach(Rev::Loop::default)
        timer.enable unless timer.enabled?
    end

    @threads[:main] = Thread::current
    @starttime = Time::now
    @scenario_handler.set_on_complete(&method(:rev_on_complete))
    @scenario_handler.set_on_turn_complete(&@register.method(:exec))
    #rev_executer_run
    @rev_running = true
    #start_task_thread
    rev_loop_run
    if @failure
        exit 1
    else
        exit 0
    end
end

#rev_stopObject



122
123
124
# File 'lib/internethakai/hakairev.rb', line 122

def rev_stop
    @rev_running = false
end

#run(config, basepath, starttime) ⇒ Object



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/internethakai/concurrency_manager.rb', line 37

def run config, basepath, starttime
    @config = config
    @scenario_handler = BaseHandler::get_handler("ScenarioHandler")
    @failure = false
    @basepath = basepath
    BaseHandler::set_config(@config)
    @logger = BaseHandler::get_handler(@config["logger"])
    @threads = {}

    @concurrency = @config['max_scenario']
    if @config['rev']
        prepare_rev
    end
    if @config['max_process'] > 1
        prepare_fork
    end
    @register = BaseHandler::get_handler(@config['response_handler'])
    reporter = BaseHandler::get_handler(@config["reporter"])

    digit = @config['max_request'].to_s.size
    @id_tmpl = "%0#{digit}d"
    if @config['max_process'] > 1
        run_fork
    else
        run_process
    end
end

#run_executer(th_cnt) ⇒ Object



149
150
151
152
153
154
155
156
157
158
159
160
# File 'lib/internethakai/concurrency_manager.rb', line 149

def run_executer th_cnt
    th_id = (sprintf(@id_tmpl, th_cnt) + @suffix).to_i
    report = BaseHandler::get_handler(@config["response_handler"])
    Thread.current[:result] = report
    BaseHandler::set_thread_id(th_id)
    cls = BaseHandler::get_class(@config['scenario_executer'])
    @scenario_handler.set_on_turn_complete(&@register.method(:exec))
    exec = cls::new
    exec.set_on_complete &method(:on_complete)
    @starttime = Time::now
    exec.run
end

#run_forkObject



234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
# File 'lib/internethakai/concurrency_manager.rb', line 234

def run_fork
    org_reporter = @config['reporter']

    @logger.run("Process: #{@process_max}\n", 2)
    @parent = false

    flag = false
    Signal::trap(:USR1){
        return if flag
        flag = true
        #parent
        puts "error: SIGUSR1"
        @pids.each do |pid|
            Process::kill(:USR2, pid)
        end
    }
    Signal::trap(:INT){
        flag = true
        puts "error: SIGINT"
        @pids.each do |pid|
            Process::kill(:USR2, pid) if pid
        end
    }
    Signal::trap(:USR2){
        #child
        report_collect
        exit
    }

    #各プロセスにはpstoreファイルに集計させる
    @config['reporter'] = 'PStoreReporter'
    fname = 'tmp.db'
    fpath = File::join(@basepath, fname)
    if FileTest::exists?(fpath)
        raise 'cannot create db' unless FileTest::writable?(fpath)
        raise 'cannot create db' unless File::delete(fpath)
    end
    db = PStore::new(fpath)
    @forkdb = db

    reporter = BaseHandler::get_handler(@config["reporter"])
    reporter.set_dir(@basepath)
    reporter.init_filename(fname)

    begin
        @pids = []
        @process = @process_max
        digit = @process_max.to_s.size
        tmpl = "%0#{digit}d"
        1.upto(@process_max) do |i|
            @pids << Process::fork {
                @suffix = sprintf(tmpl, i)
                run_process
            }
            #1プロセス目以外は出力を少なく
            @scenario_handler.silence if i==2
        end
        @forkreport = false
        @org_fork_reporter = org_reporter
        results = Process::waitall
        failure = false
        results.each do |r|
            if r[1].to_i != 0
                failure = true
                break
            end
        end
        @logger.run "\n", 2
        @logger.run "result: ", 2
        unless failure
            @logger.run "success\n", 2
        else
            @logger.run "failure\n", 2
        end
        fork_report
    ensure
        fork_report unless @forkreport
        File::delete(fpath) if File::exists?(fpath)
    end
end

#run_processObject



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/internethakai/concurrency_manager.rb', line 64

def run_process
    #並列数だけシナリオのコピーをつくっておく
    @config['request_pool'] = 'RequestPoolNonThread' if @config["max_request"] <= 1
    @scenarios = []
    1.upto(@concurrency) do
        @scenarios << @scenario_handler.create_scenario(@suffix)
    end
    @scenario_handler.on_scenarios_created
    $REQUEST_COUNT = 0
    begin
        if @rev_mode
            rev_start
        elsif @config["max_request"] <= 1
            single_start
        else
            thread_start
        end
    rescue SystemExit
        return
    rescue Interrupt => e
        if @fork_mode
            #死ぬ前に親に連絡
            Process::kill(:USR1, Process::ppid)
        else
            report_collect
        end
    end
end

#single_startObject



92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/internethakai/concurrency_manager.rb', line 92

def single_start
    th_cnt = 1
    @threads[1] = Thread::current
    @scenarios.each do |sc|
        sc.init
    end
    #シングルスレッドのときは先にプールにためておく
    run_executer th_cnt
    if @failure
        page_check
        exit 1
    else
        exit 0
    end
end

#start_task_threadObject



32
33
34
35
36
37
38
39
# File 'lib/internethakai/hakairev.rb', line 32

def start_task_thread
    q = @queue
    Thread::start(q) do |q|
        while @rev_running
            q.run
        end
    end
end

#thread_collectObject



178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
# File 'lib/internethakai/concurrency_manager.rb', line 178

def thread_collect
    count = 0
    status = '.'
    flag = false
    count = 0
    @threads.each do |k, th|
        if(th[:result])
            status = 't'
            ect = 0
            th[:result].get_record.each do |name,res|
                ect += 1 if res[:errorcount] > 0
            end
            if ect > 0
                status = 'e'
                flag = true unless flag
            end
            #th.kill if th!=Thread::current && k!=:main
        end
        #@logger.run status, 2
    end
    @logger.run "\n", 2 unless @fork_mode
    @logger.run "result: ", 2 unless @fork_mode
    if flag
        @logger.run "failure\n", 2 unless @fork_mode
    else
        @logger.run "success\n", 2 unless @fork_mode
    end
    @failure = flag
end

#thread_startObject



120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/internethakai/concurrency_manager.rb', line 120

def thread_start
    @mutex = Mutex::new
    @request_pool = BaseHandler::get_handler(@config['request_pool'])
    @cv = ConditionVariable::new

    @scenarios.each do |sc|
        sc.init
    end
    Thread.abort_on_exception
    @scenario_handler.set_on_turn_complete &method(:on_complete)
    1.upto(@config["max_request"]) do |th_cnt|
        @threads[th_cnt] = Thread::new(th_cnt) do |th_cnt|
            run_executer th_cnt
        end
    end
    @threads[1].join
    @threads[:main] = Thread::current
    @mutex.synchronize{
        @cv.wait(@mutex)
    }
    @threads.each do |k,th|
        th.kill if th!=Thread::current && k!=:main
    end
    if @failure
        exit 1
    else
        exit 0
    end
end