Class: Fluent::ResqueOutput

Inherits:
BufferedOutput
  • Object
show all
Includes:
SetTagKeyMixin, SetTimeKeyMixin
Defined in:
lib/fluent/plugin/out_resque.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ ResqueOutput

Returns a new instance of ResqueOutput.



16
17
18
19
20
21
# File 'lib/fluent/plugin/out_resque.rb', line 16

# Builds the plugin instance and loads the client libraries it relies on.
#
# The libraries are required here, at construction time, rather than when the
# file itself is loaded.
def initialize
  super
  %w[multi_json redis redis-namespace].each { |library| require library }
end

Instance Method Details

#configure(conf) ⇒ Object



23
24
25
26
27
28
# File 'lib/fluent/plugin/out_resque.rb', line 23

# Applies plugin configuration.
#
# Reads two optional settings from +conf+:
# * 'worker_class_name_tag' - record key that holds the job class name
#   (falls back to 'class' when absent).
# * 'redis' - server specification handed to #redis= when present.
#
# @param conf [#[]] configuration element, looked up by string key
def configure(conf)
  super

  @worker_class_name_tag = conf['worker_class_name_tag'] || 'class'

  redis_server = conf['redis']
  self.redis = redis_server if redis_server
end

#enqueue(queue, klass, args) ⇒ Object



58
59
60
61
# File 'lib/fluent/plugin/out_resque.rb', line 58

# Pushes one job onto a queue through the namespaced Redis client.
#
# The queue name is first added to the :queues set, then the JSON-encoded
# job ({class, args}) is appended to the "queue:<name>" list. +args+ is
# wrapped in a one-element array, so the job receives it as a single argument.
#
# @param queue [#to_s] queue name
# @param klass [Object] job class name stored under :class
# @param args [Object] the single job argument
# @return [Object] whatever the client's rpush returns
def enqueue(queue, klass, args)
  queue_name = queue.to_s
  redis.sadd(:queues, queue_name)

  job = { :class => klass, :args => [args] }
  payload = ::MultiJson.encode(job)
  redis.rpush("queue:#{queue}", payload)
end

#format(tag, time, record) ⇒ Object



71
72
73
# File 'lib/fluent/plugin/out_resque.rb', line 71

# Serializes one event for buffering.
#
# @param tag [Object] event tag
# @param time [Object] event time
# @param record [Object] event record
# @return [Object] the result of calling to_msgpack on [tag, time, record]
def format(tag, time, record)
  event = [tag, time, record]
  event.to_msgpack
end

#redis ⇒ Object



52
53
54
55
56
# File 'lib/fluent/plugin/out_resque.rb', line 52

# Returns the Redis client, creating a default one on first use.
#
# When @redis is unset (or still a String), a default server is assigned
# through #redis=: the result of Redis.connect if the client library offers
# that method, otherwise the string "localhost:6379". The accessor is then
# re-read so the caller gets whatever #redis= stored.
def redis
  if !@redis || @redis.kind_of?(String)
    self.redis = Redis.respond_to?(:connect) ? Redis.connect : "localhost:6379"
    self.redis
  else
    @redis
  end
end

#redis=(server) ⇒ Object

Sets the Redis connection. The server-parsing code is adapted from resque.rb.



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/fluent/plugin/out_resque.rb', line 31

# Sets the Redis connection, always storing a Redis::Namespace in @redis.
#
# Accepted values for +server+:
# * a String containing "redis://" - treated as a connection URL;
# * any other String - parsed as "host:port:db/namespace", where the db and
#   the "/namespace" suffix are optional;
# * a Redis::Namespace - stored unchanged;
# * anything else - assumed to be a Redis client and wrapped in the :resque
#   namespace.
#
# String forms use the :resque namespace unless one is given after the "/".
#
# @param server [String, Redis::Namespace, Object] server specification
def redis=(server)
  case server
  when String
    if server =~ /redis\:\/\//
      url_options = { :url => server, :thread_safe => true }
      # Redis.connect is not available in every client version (the #redis
      # reader guards it the same way), so fall back to Redis.new with :url
      # instead of raising NoMethodError.
      client = if Redis.respond_to?(:connect)
                 Redis.connect(url_options)
               else
                 Redis.new(url_options)
               end
    else
      address, namespace = server.split('/', 2)
      host, port, db = address.split(':')
      client = Redis.new(:host => host, :port => port,
                         :thread_safe => true, :db => db)
    end
    # The URL form never sets a namespace, so it always gets the default.
    namespace ||= :resque

    @redis = Redis::Namespace.new(namespace, :redis => client)
  when Redis::Namespace
    @redis = server
  else
    @redis = Redis::Namespace.new(:resque, :redis => server)
  end
end

#shutdown ⇒ Object



67
68
69
# File 'lib/fluent/plugin/out_resque.rb', line 67

# Shuts the plugin down. This override adds no behavior of its own; it only
# delegates to the parent class's implementation.
def shutdown
  super
end

#start ⇒ Object



63
64
65
# File 'lib/fluent/plugin/out_resque.rb', line 63

# Starts the plugin. This override adds no behavior of its own; it only
# delegates to the parent class's implementation.
def start
  super
end

#write(chunk) ⇒ Object



75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/fluent/plugin/out_resque.rb', line 75

# Enqueues every record of a buffered chunk as a job.
#
# The target queue is the chunk's key when @queue_mapped is set, otherwise
# @queue. For each record, the value stored under @worker_class_name_tag is
# removed and used as the job class name; the rest of the record becomes the
# job argument passed to #enqueue.
#
# Records whose class name is missing, empty, or not a String are reported
# through $log.error and skipped. Requiring a String keeps values that do not
# respond to #empty? (for example an Integer) from raising NoMethodError and
# aborting the remaining records of the chunk.
#
# @param chunk [#msgpack_each, #key] buffered chunk yielding
#   (tag, time, record) triples
def write(chunk)
  queue_name = @queue_mapped ? chunk.key : @queue

  chunk.msgpack_each do |_tag, _time, record|
    klass = record.delete(@worker_class_name_tag)
    if klass.is_a?(String) && !klass.empty?
      enqueue(queue_name, klass, record)
    else
      $log.error("record have not #{@worker_class_name_tag} key.")
    end
  end
end