Module: FIR::Util::ClassMethods

Includes:
Config
Defined in:
lib/tail_f_rabbitmq/util.rb

Constant Summary

Constants included from Config

Config::CONFIG_PATH

Instance Method Summary

Methods included from Config

#config, #default_config, #reload_config, #reset_config, #write_config

Instance Method Details

#monitor(file_path) ⇒ Object



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/tail_f_rabbitmq/util.rb', line 11

# Watches +file_path+ for newly created files and forwards their contents
# to the configured queue.
#
# Opens a Bunny connection to config[:url], creates a channel, declares the
# auto-delete queue named by config[:queue] and keeps the channel's default
# exchange for publishing. It then registers a recursive :create watch on
# +file_path+. Every created file whose name ends with config[:file_filter]
# (or every created file when no filter is configured) is tailed in its own
# thread; each chunk read is passed to #send_msg, and tailing of that file
# is asked to stop once a chunk contains config[:eof].
#
# When config[:eof] is not set the file is simply followed without an
# end-of-file marker instead of raising inside the worker thread.
#
# This method blocks in +notifier.run+.
#
# @param file_path [String] path handed to the notifier's recursive watch
def monitor(file_path)
  @conn = Bunny.new(config[:url])
  @conn.start
  @ch = @conn.create_channel
  @q  = @ch.queue(config[:queue], auto_delete: true)
  @x  = @ch.default_exchange

  notifier = INotify::Notifier.new
  file_filter = config[:file_filter]
  notifier.watch(file_path, :create, :recursive) do |event|
    # Skip files that do not match the optional suffix filter.
    next unless file_filter.nil? || event.name.end_with?(file_filter)

    # One thread per created file so a long-running tail never blocks the
    # directory watcher.
    Thread.new do
      tail_f(event.absolute_name) do |lines, queue|
        send_msg(event.name, lines)
        # String#include? raises TypeError on nil, so only look for the
        # marker when one is configured.
        eof = config[:eof]
        queue.stop if eof && lines.include?(eof)
      end
    end
  end
  notifier.run
end

#send_msg(name, line) ⇒ Object



43
44
45
46
47
48
49
# File 'lib/tail_f_rabbitmq/util.rb', line 43

# Publishes one chunk of file content as a JSON message.
#
# The message is a JSON object with the keys "name" and "line", published
# through @x with the name of @q as routing key. Nothing is published when
# +line+ equals the empty string.
#
# @param name [Object] value stored under the "name" key
# @param line [Object] value stored under the "line" key
# @return [Object, nil] the result of the publish call, or nil when skipped
def send_msg(name, line)
  return if line == ""

  payload = { name: name, line: line }.to_json
  @x.publish(payload, routing_key: @q.name)
end

#tail_f(file_path) ⇒ Object



32
33
34
35
36
37
38
39
40
41
# File 'lib/tail_f_rabbitmq/util.rb', line 32

# Follows +file_path+ and yields newly available content to the block.
#
# The file is read once immediately, then again from the current position
# every time a :modify event is reported for it. Each read is yielded
# together with the notifier, so the block can end the follow loop by
# calling +stop+ on it. The file handle is closed when the block passed to
# File.open finishes.
#
# NOTE(review): whether a +stop+ issued during the very first yield (before
# +queue.run+ has started) takes effect depends on the rb-inotify version;
# verify against the version in use.
#
# @param file_path [String] path of the file to follow
# @yieldparam lines [String] content read since the previous yield
# @yieldparam queue [INotify::Notifier] notifier driving this follow loop
def tail_f(file_path)
  queue = INotify::Notifier.new
  # File.open rather than Kernel#open: the latter treats a path starting
  # with "|" as a command to spawn.
  File.open(file_path) do |file|
    # Handle once up front: the data already present when the file was created.
    yield file.read, queue
    queue.watch(file_path, :modify) do
      yield file.read, queue
    end
    queue.run
  end
end