Class: Execache

Inherits:
Object
  • Object
show all
Defined in:
lib/execache.rb,
lib/execache/client.rb

Defined Under Namespace

Classes: Client

Instance Method Summary collapse

Constructor Details

#initialize(yaml) ⇒ Execache

Returns a new instance of Execache.



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/execache.rb', line 17

def initialize(yaml)
  options = YAML.load(File.read(yaml))

  puts "\nStarting execache server (redis @ #{options['redis']})..."

  redis = Redis.connect(:url => "redis://#{options['redis']}")
  retries = 0
  
  begin
    while true
      request = redis.lpop('execache:request')
      if request
        Timeout.timeout(60) do
          request = Yajl::Parser.parse(request)
          channel = request.delete('channel')
          force = request.delete('channel')
          commands = []
          pending = false

          request.each do |cmd_type, cmd_options|
            # Command with preliminary args
            command = [
              options[cmd_type]['command'],
              cmd_options['args']
            ]
            
            # Fill results with caches if present
            cmd_options['groups'].each do |group|
              cache_key = Digest::SHA1.hexdigest(
                "#{group['cache_key'] || cmd_options['args']} #{group['args']}"
              )
              group['cache_key'] = cache_key = "execache:cache:#{cache_key}"
              cache = redis.get(cache_key)
              
              if cache && cache == '[PENDING]'
                pending = true
                group['result'] = true
              elsif !force && !group['force'] && cache
                group['result'] = Yajl::Parser.parse(cache)
              else
                pending = true
                redis.set(cache_key, '[PENDING]')
                redis.expire(cache_key, 60) # Timeout incase execution fails
                command << group['args']
              end
            end
            
            # Add command to be executed if not all args are cached
            if command.length > 2
              cmd_options['cmd'] = command.join(' ')
            end
          end

          if pending
            # Execute command in thread, cache results
            unless redis.get('execache:wait')
              Thread.new do
                Timeout.timeout(60) do
                  redis.set('execache:wait', '1')
                  redis.expire('execache:wait', 120)
                  request.each do |cmd_type, cmd_options|
                    if cmd_options['cmd']
                      separators = options[cmd_type]['separators'] || {}
                      separators['group'] ||= "[END]"
                      separators['result'] ||= "\n"
                      output = `#{cmd_options['cmd']}`
                      output = output.split(separators['group'] + separators['result'])
                      output = output.collect { |r| r.split(separators['result']) }
                    end

                    cmd_options['groups'].each do |group|
                      unless group['result']
                        redis.set(
                          group['cache_key'],
                          Yajl::Encoder.encode(output.shift)
                        )
                        if group['ttl']
                          redis.expire(group['cache_key'], group['ttl'])
                        end
                      end
                    end
                  end
                  redis.del('execache:wait')
                end
              end
            end
          else
            response = request.inject({}) do |hash, (cmd_type, cmd_options)|
              hash[cmd_type] = []

              cmd_options['groups'].each do |group|
                hash[cmd_type] << group['result']
              end

              hash
            end
          end
          
          redis.publish(
            "execache:response:#{channel}",
            pending ? '[PENDING]' : Yajl::Encoder.encode(response)
          )
        end
      end
      sleep(1.0 / 1000.0)
    end
  rescue Interrupt
    shut_down
  rescue Exception => e
    puts "\nError: #{e.message}"
    puts "\t#{e.backtrace.join("\n\t")}"
    retries += 1
    shut_down if retries >= 10
    retry
  end
end

Instance Method Details

#shut_downObject



134
135
136
137
# File 'lib/execache.rb', line 134

def shut_down
  puts "\nShutting down execache server..."
  exit
end