Class: HistoryCommander

Inherits:
EventMachine::FileTail
  • Object
show all
Defined in:
lib/history_commander.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(path, startpos = -1, mode = "full") ⇒ HistoryCommander

path <~String> File path to monitor. startpos <~Integer> File position at which to start tailing the file; the default of -1 starts at the end of the file. mode <~String> Set to “full” for read/write mode; any other value selects write-only mode.



21
22
23
24
25
26
27
28
29
30
31
# File 'lib/history_commander.rb', line 21

# Starts tailing +path+ and prepares everything needed to publish its lines.
#
# After handing +path+ and +startpos+ to the superclass, the file is copied
# to #safe_path, the per-instance identity (uuid, host, user) is captured,
# and the 'global_history' fanout exchange is obtained for publishing.
#
# @param path [String] file to tail; also copied to "#{path}_safe"
# @param startpos [Integer] start position forwarded unchanged to the
#   superclass constructor (defaults to -1)
# @param mode [String] when equal to "full" the instance also calls
#   #subscribe; any other value skips the subscription
def initialize(path, startpos = -1, mode = "full")
  super(path, startpos)
  FileUtils.cp(path, safe_path)

  # Identity attached to every published line.
  @pause = false
  @uuid  = UUIDTools::UUID.random_create.to_s
  @host  = `hostname`.chomp
  @user  = `whoami`.chomp

  # Line splitting and the outbound exchange used by #receive_data.
  @buffer = BufferedTokenizer.new
  @global_history_fanout = MQ.new.fanout('global_history')

  subscribe if mode == "full"
end

Instance Attribute Details

#pause ⇒ Object

Returns the value of attribute pause.



12
13
14
# File 'lib/history_commander.rb', line 12

# Reader for the @pause flag.
#
# Within this file the flag is set to false in #initialize, toggled to true
# and back to false by the #subscribe callback while it appends to the
# history file, and consulted by #schedule_next_read, which schedules no
# read while it is truthy.
#
# @return [Object] the current value of @pause (true/false as assigned here)
def pause
  @pause
end

#uuid ⇒ Object

Returns the value of attribute uuid.



11
12
13
# File 'lib/history_commander.rb', line 11

# Reader for the identifier generated for this instance.
#
# @uuid is assigned in #initialize from UUIDTools::UUID.random_create.to_s.
# It is sent as the :uuid field of every payload built in #receive_data,
# used as the queue name in #subscribe, and compared against incoming
# messages there so that this instance's own messages are not written back.
#
# @return [String] the value of @uuid
def uuid
  @uuid
end

Instance Method Details

#receive_data(data) ⇒ Object

Receive data from the FileTail and submit it to the MQ



35
36
37
38
39
40
41
42
43
# File 'lib/history_commander.rb', line 35

# Publishes every complete line found in +data+ to the global history fanout.
#
# +data+ is fed through @buffer (whatever #extract returns is iterated), and
# each resulting line is wrapped in a JSON object carrying this instance's
# uuid, host and user alongside the line itself.
#
# @param data [String] chunk handed over by the file tail
# @return [Object] the collection returned by @buffer.extract
def receive_data(data)
  complete_lines = @buffer.extract(data)
  complete_lines.each do |command|
    envelope = {
      :uuid    => @uuid,
      :message => command,
      :host    => @host,
      :user    => @user
    }
    @global_history_fanout.publish(envelope.to_json)
  end
end

#safe_path ⇒ Object



14
15
16
# File 'lib/history_commander.rb', line 14

# Location of the copy of the monitored file made in #initialize.
#
# @return [String] the monitored path with "_safe" appended
def safe_path
  path.to_s + "_safe"
end

#schedule_next_read ⇒ Object



45
46
47
48
49
50
51
# File 'lib/history_commander.rb', line 45

# Arms a one-shot EventMachine timer that calls #read after @naptime,
# unless the instance is currently paused.
#
# @return [Object, nil] whatever EventMachine.add_timer returns, or nil
#   when @pause is truthy and nothing was scheduled
def schedule_next_read
  return if @pause

  EventMachine.add_timer(@naptime) { read }
end

#skip_ahead ⇒ Object



53
54
55
56
57
# File 'lib/history_commander.rb', line 53

# Moves the recorded position (@pos) to the current end of @file.
#
# #subscribe calls this right after appending a remote line to the history
# file so that the appended text lies behind the recorded position.
#
# The previous version wrapped this body in a second, nested
# `def skip_ahead`. The first call therefore only redefined the method and
# never touched @pos; seeking started with the second call. The seek now
# happens on every call, including the first.
#
# @return [Integer] the new offset reported by IO#sysseek
def skip_ahead
  @pos = @file.sysseek(0, IO::SEEK_END)
end

#subscribe ⇒ Object

Subscribe to the global history exchange and sync the history file with any new inbound global history. Pauses FileTail and skips the output when writing to the history file.



60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/history_commander.rb', line 60

# Subscribes to the 'global_history' fanout and mirrors lines published by
# other instances into the local history file.
#
# A queue named after this instance's uuid is bound to the exchange. Every
# delivery is parsed as JSON, echoed to stdout, and -- unless it carries
# this instance's own uuid -- appended to +path+. While appending, @pause
# is set so #schedule_next_read will not schedule a read, and #skip_ahead
# is called afterwards to move past the text just written.
#
# @pause is reset in an `ensure` block: previously a failure while opening
# or writing the file (or in #skip_ahead) left @pause stuck at true, after
# which #schedule_next_read would never schedule another read. The
# exception itself still propagates unchanged.
#
# @return [Object] whatever the queue's #subscribe call returns
def subscribe
  @subscription = MQ.new
  queue    = @subscription.queue(@uuid)
  exchange = @subscription.fanout('global_history')

  queue.bind(exchange).subscribe do |result|
    x = Mash.new(JSON::parse(result))
    puts "received: #{x[:uuid]} #{x[:user]}@#{x[:host]}$ #{x[:message]}"

    # Our own lines come back through the fanout; they are already on disk.
    next if x[:uuid] == @uuid

    @pause = true
    begin
      File.open(path, "a") { |f| f.puts(x[:message]) }
      skip_ahead
    ensure
      # Never leave the tail paused, even if the append failed.
      @pause = false
    end
    schedule_next_read
  end
end