Class: FaaStRuby::Local::Processor

Inherits:
Object
  • Object
show all
Includes:
Logger
Defined in:
lib/faastruby/local/processors/processor.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logger

#debug, puts, #puts

Constructor Details

#initialize(queue) ⇒ Processor

Returns a new instance of Processor.



6
7
8
9
10
11
12
13
# File 'lib/faastruby/local/processors/processor.rb', line 6

# Builds a processor around the given queue.
#
# Sets up two independent locks: one guarding the ignore list and one
# guarding the per-name thread records, plus the (initially empty) hashes
# they protect.
#
# @param queue [Object] stored in @queue; #start calls +pop+ on it
def initialize(queue)
  debug "initialize(#{queue.inspect})"
  @queue = queue

  # Locks first, then the state each of them protects.
  @mutex = Mutex.new
  @threads_mutex = Mutex.new

  @ignore = {}
  @threads = {}
end

Instance Attribute Details

#queue ⇒ Object

Returns the value of attribute queue.



5
6
7
# File 'lib/faastruby/local/processors/processor.rb', line 5

# Reader for the queue attribute.
#
# @return [Object] the value held in @queue (assigned from the constructor argument)
def queue
  @queue
end

#thread ⇒ Object

Returns the value of attribute thread.



5
6
7
# File 'lib/faastruby/local/processors/processor.rb', line 5

# Reader for the thread attribute.
#
# @return [Object, nil] whatever is held in @thread; nil when it has not been
#   assigned (presumably the Thread created by #start -- verify against the class)
def thread
  @thread
end

Instance Method Details

#add_ignore(entry) ⇒ Object



42
43
44
45
46
47
48
# File 'lib/faastruby/local/processors/processor.rb', line 42

# Marks +entry+ as ignored.
#
# The ignore hash is only touched while holding @mutex.
#
# @param entry [Object] key to flag in the ignore list
# @return [Object] whatever the final +debug+ call returns
def add_ignore(entry)
  debug "add_ignore(#{entry})"
  @mutex.synchronize do
    @ignore.store(entry, true)
    stored = @ignore[entry]
    debug "Added #{stored}"
  end
end

#add_thread(name, action, &block) ⇒ Object



72
73
74
75
76
77
# File 'lib/faastruby/local/processors/processor.rb', line 72

# Starts a worker thread for +action+ on +name+ and records it in @threads.
#
# The record is merged into whatever actions are already tracked for +name+.
# Replacing the whole per-name hash would drop the records of other actions
# that are still running, leaving #kill_thread unable to find them.
#
# @param name [Object] first-level key in @threads
# @param action [Object] second-level key in @threads
# @yield the work to run in the new thread (forwarded to #start_thread)
# @return [Hash] the action => thread records now held for +name+
def add_thread(name, action, &block)
  debug __method__
  @threads_mutex.synchronize do
    records = (@threads[name] ||= {})
    records[action] = start_thread(name, action, &block)
    records
  end
end

#get_thread(name, action) ⇒ Object



88
89
90
91
92
93
94
# File 'lib/faastruby/local/processors/processor.rb', line 88

# Looks up the thread recorded for +action+ on +name+.
#
# @threads is read while holding @threads_mutex.
#
# @param name [Object] first-level key in @threads
# @param action [Object] second-level key in @threads
# @return [Object, nil] the recorded entry, or nil when either key is absent
def get_thread(name, action)
  debug __method__
  @threads_mutex.synchronize do
    actions = @threads[name]
    # Guard against an untracked name before indexing by action.
    actions && actions[action]
  end
end

#kill_thread(name, action) ⇒ Object



104
105
106
107
108
109
110
111
112
113
# File 'lib/faastruby/local/processors/processor.rb', line 104

# Kills the thread recorded for +action+ on +name+, if there is one, and
# drops its record.
#
# Lookup, kill and removal all happen while holding @threads_mutex.
#
# @param name [Object] first-level key in @threads
# @param action [Object] second-level key in @threads
# @return [Object, nil] the removed record, or nil when nothing was recorded
def kill_thread(name, action)
  debug __method__
  @threads_mutex.synchronize do
    actions = @threads[name]
    if actions && actions[action]
      puts "Killing previous '#{action}' action for '#{name}'."
      Thread.kill(actions[action])
      actions.delete(action)
    end
  end
end

#present_in_ignore_list?(entry) ⇒ Boolean

Returns:

  • (Boolean)


50
51
52
53
54
55
56
# File 'lib/faastruby/local/processors/processor.rb', line 50

# Tells whether +entry+ is currently flagged in the ignore list.
#
# The lookup runs while holding @mutex, and the outcome is logged through
# +debug+ as a literal true/false.
#
# @param entry [Object] key to look up in the ignore list
# @return [Object, nil] the stored flag for +entry+ (nil when absent)
def present_in_ignore_list?(entry)
  debug "present_in_ignore_list(#{entry})"
  @mutex.synchronize do
    flag = @ignore[entry]
    if flag
      debug(true)
    else
      debug(false)
    end
    flag
  end
end

#remove_ignore(entry) ⇒ Object



58
59
60
61
62
63
64
# File 'lib/faastruby/local/processors/processor.rb', line 58

# Removes +entry+ from the ignore list.
#
# The ignore hash is only touched while holding @mutex; afterwards the
# method logs whether a lookup of +entry+ now yields nil.
#
# @param entry [Object] key to drop from the ignore list
# @return [Object] whatever the final +debug+ call returns
def remove_ignore(entry)
  debug "remove_ignore(#{entry})"
  @mutex.synchronize do
    @ignore.delete(entry)
    gone = @ignore[entry].nil?
    debug "Removed: is nil? #{gone}"
  end
end

#remove_thread_record(name, action) ⇒ Object



96
97
98
99
100
101
102
# File 'lib/faastruby/local/processors/processor.rb', line 96

# Drops the record for +action+ on +name+ without touching the thread itself.
#
# Runs while holding @threads_mutex.
#
# @param name [Object] first-level key in @threads
# @param action [Object] second-level key in @threads
# @return [Object, nil] the removed record, or nil when nothing was recorded
def remove_thread_record(name, action)
  @threads_mutex.synchronize do
    actions = @threads[name]
    actions.delete(action) if actions && actions[action]
  end
end

#run(name, action, &block) ⇒ Object



66
67
68
69
70
# File 'lib/faastruby/local/processors/processor.rb', line 66

# Replaces any thread recorded for +action+ on +name+ with a fresh one.
#
# First asks #kill_thread to stop and forget the previous thread, then hands
# the block to #add_thread to start and record the new one.
#
# @param name [Object] passed through to #kill_thread and #add_thread
# @param action [Object] passed through to #kill_thread and #add_thread
# @yield the work for the new thread
# @return [Object] whatever #add_thread returns
def run(name, action, &block)
  debug(__method__)
  kill_thread(name, action)
  add_thread(name, action, &block)
end

#should_ignore?(event) ⇒ Boolean

Returns:

  • (Boolean)


33
34
35
36
37
38
39
40
# File 'lib/faastruby/local/processors/processor.rb', line 33

# Decides whether +event+ must be skipped.
#
# An event is skipped when its +dirname+ is flagged in the ignore list
# (checked through #present_in_ignore_list?).
#
# @param event [#dirname] the event under consideration
# @return [Boolean] true when the event is to be skipped, false otherwise
def should_ignore?(event)
  debug "should_ignore?(#{event.inspect})"
  return false unless present_in_ignore_list?(event.dirname)

  debug "SKIP #{event}"
  true
end

#start ⇒ Object



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/faastruby/local/processors/processor.rb', line 15

# Spawns the event loop thread and remembers it in @thread.
#
# The loop pops events from +queue+ forever. Events accepted by
# #should_ignore? are skipped; every other event is dispatched with +send+ to
# the method named by +event.type+. A StandardError raised while handling an
# event is printed to STDOUT and the loop moves on to the next event.
#
# The thread is stored in the @thread instance variable (not a local), so the
# +thread+ reader can actually return it.
#
# @return [Thread] the thread running the loop
def start
  debug "start"
  @thread = Thread.new do
    loop do
      begin
        event = queue.pop
        next if should_ignore?(event)
        send(event.type, event)
      rescue StandardError => e
        # String.disable_colorization= is not core Ruby; presumably supplied
        # by a colorizing library loaded elsewhere -- verify.
        String.disable_colorization = true
        STDOUT.puts e.full_message
        String.disable_colorization = false
      end
    end
  end
end

#start_thread(name, action, &block) ⇒ Object



79
80
81
82
83
84
85
86
# File 'lib/faastruby/local/processors/processor.rb', line 79

# Runs the given block in a new thread and clears the (name, action) record
# once the block has finished.
#
# If the block raises, the #remove_thread_record call is not reached.
#
# NOTE(review): +Thread.report_on_exception=+ below is the class-level
# setter, which per the Ruby docs sets the default for threads created
# afterwards; confirm whether the per-thread
# +Thread.current.report_on_exception = false+ was intended.
#
# @param name [Object] passed to #remove_thread_record when the block completes
# @param action [Object] passed to #remove_thread_record when the block completes
# @yield the work to run inside the new thread
# @return [Thread] the newly created thread
def start_thread(name, action, &block)
  debug(__method__)
  worker = Thread.new do
    Thread.report_on_exception = false
    yield
    remove_thread_record(name, action)
  end
  worker
end