Class: FaaStRuby::Local::Processor
- Inherits:
-
Object
- Object
- FaaStRuby::Local::Processor
show all
- Includes:
- Logger
- Defined in:
- lib/faastruby/local/processors/processor.rb
Instance Attribute Summary collapse
Instance Method Summary
collapse
Methods included from Logger
#debug, puts, #puts
Constructor Details
#initialize(queue) ⇒ Processor
Returns a new instance of Processor.
6
7
8
9
10
11
12
13
|
# File 'lib/faastruby/local/processors/processor.rb', line 6
def initialize(queue)
debug "initialize(#{queue.inspect})"
@queue = queue
@ignore = {}
@mutex = Mutex.new
@threads_mutex = Mutex.new
@threads = {}
end
|
Instance Attribute Details
#queue ⇒ Object
Returns the value of attribute queue.
5
6
7
|
# File 'lib/faastruby/local/processors/processor.rb', line 5
def queue
@queue
end
|
#thread ⇒ Object
Returns the value of attribute thread.
5
6
7
|
# File 'lib/faastruby/local/processors/processor.rb', line 5
def thread
@thread
end
|
Instance Method Details
#add_ignore(entry) ⇒ Object
42
43
44
45
46
47
48
|
# File 'lib/faastruby/local/processors/processor.rb', line 42
def add_ignore(entry)
debug "add_ignore(#{entry})"
@mutex.synchronize do
@ignore[entry] = true
debug "Added #{@ignore[entry]}"
end
end
|
#add_thread(name, action, &block) ⇒ Object
72
73
74
75
76
77
|
# File 'lib/faastruby/local/processors/processor.rb', line 72
def add_thread(name, action, &block)
debug __method__
@threads_mutex.synchronize do
@threads[name] = {action => start_thread(name, action, &block)}
end
end
|
#get_thread(name, action) ⇒ Object
88
89
90
91
92
93
94
|
# File 'lib/faastruby/local/processors/processor.rb', line 88
def get_thread(name, action)
debug __method__
@threads_mutex.synchronize do
return
return nil
end
end
|
#kill_thread(name, action) ⇒ Object
104
105
106
107
108
109
110
111
112
113
|
# File 'lib/faastruby/local/processors/processor.rb', line 104
def kill_thread(name, action)
debug __method__
@threads_mutex.synchronize do
if @threads[name] && @threads[name][action]
puts "Killing previous '#{action}' action for '#{name}'."
Thread.kill @threads[name][action]
@threads[name].delete(action)
end
end
end
|
#present_in_ignore_list?(entry) ⇒ Boolean
50
51
52
53
54
55
56
|
# File 'lib/faastruby/local/processors/processor.rb', line 50
def present_in_ignore_list?(entry)
debug "present_in_ignore_list(#{entry})"
@mutex.synchronize do
@ignore[entry] ? debug(true) : debug(false)
@ignore[entry]
end
end
|
#remove_ignore(entry) ⇒ Object
58
59
60
61
62
63
64
|
# File 'lib/faastruby/local/processors/processor.rb', line 58
def remove_ignore(entry)
debug "remove_ignore(#{entry})"
@mutex.synchronize do
@ignore.delete(entry)
debug "Removed: is nil? #{@ignore[entry].nil?}"
end
end
|
#remove_thread_record(name, action) ⇒ Object
96
97
98
99
100
101
102
|
# File 'lib/faastruby/local/processors/processor.rb', line 96
def remove_thread_record(name, action)
@threads_mutex.synchronize do
if @threads[name] && @threads[name][action]
@threads[name].delete(action)
end
end
end
|
#run(name, action, &block) ⇒ Object
66
67
68
69
70
|
# File 'lib/faastruby/local/processors/processor.rb', line 66
def run(name, action, &block)
debug __method__
kill_thread(name, action)
add_thread(name, action, &block)
end
|
#should_ignore?(event) ⇒ Boolean
33
34
35
36
37
38
39
40
|
# File 'lib/faastruby/local/processors/processor.rb', line 33
def should_ignore?(event)
debug "should_ignore?(#{event.inspect})"
if present_in_ignore_list?(event.dirname)
debug "SKIP #{event}"
return true
end
return false
end
|
#start ⇒ Object
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
# File 'lib/faastruby/local/processors/processor.rb', line 15
def start
debug "start"
thread = Thread.new do
loop do
begin
event = queue.pop
next if should_ignore?(event)
send(event.type, event)
rescue StandardError => e
String.disable_colorization = true
STDOUT.puts e.full_message
String.disable_colorization = false
next
end
end
end
end
|
#start_thread(name, action, &block) ⇒ Object
79
80
81
82
83
84
85
86
|
# File 'lib/faastruby/local/processors/processor.rb', line 79
def start_thread(name, action, &block)
debug __method__
Thread.new do
Thread.report_on_exception = false
yield
remove_thread_record(name, action)
end
end
|