Class: Fluent::BufferedResqueOutput

Inherits:
BufferedOutput
  • Object
show all
Includes:
SetTagKeyMixin, SetTimeKeyMixin
Defined in:
lib/fluent/plugin/out_buffered_resque.rb

Instance Method Summary collapse

Constructor Details

#initializeBufferedResqueOutput

Returns a new instance of BufferedResqueOutput.



23
24
25
26
27
28
# File 'lib/fluent/plugin/out_buffered_resque.rb', line 23

def initialize
  super
  require 'multi_json'
  require 'redis'
  require 'redis-namespace'
end

Instance Method Details

#configure(conf) ⇒ Object



30
31
32
33
34
35
36
# File 'lib/fluent/plugin/out_buffered_resque.rb', line 30

def configure(conf)
  super

  @worker_class_name_tag = conf['worker_class_name_tag'] || 'class'
  @worker_class = conf['worker_class']
  self.redis = conf['redis'] if conf['redis']
end

#enqueue(queue, klass, args) ⇒ Object



66
67
68
69
# File 'lib/fluent/plugin/out_buffered_resque.rb', line 66

def enqueue(queue, klass, args)
  redis.sadd(:queues, queue.to_s)
  redis.rpush("queue:#{queue}", ::MultiJson.encode(:class => klass, :args => [args]))
end

#format(tag, time, record) ⇒ Object



79
80
81
# File 'lib/fluent/plugin/out_buffered_resque.rb', line 79

def format(tag, time, record)
  [tag, time, record].to_msgpack
end

#redisObject



60
61
62
63
64
# File 'lib/fluent/plugin/out_buffered_resque.rb', line 60

def redis
  return @redis if @redis && !@redis.kind_of?(String)
  self.redis = Redis.respond_to?(:connect) ? Redis.connect : "localhost:6379"
  self.redis
end

#redis=(server) ⇒ Object

code from resque.rb



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/fluent/plugin/out_buffered_resque.rb', line 39

def redis=(server)
  case server
  when String
    if server =~ /redis\:\/\//
      redis = Redis.connect(:url => server, :thread_safe => true)
    else
      server, namespace = server.split('/', 2)
      host, port, db = server.split(':')
      redis = Redis.new(:host => host, :port => port,
                        :thread_safe => true, :db => db)
    end
    namespace ||= :resque

    @redis = Redis::Namespace.new(namespace, :redis => redis)
  when Redis::Namespace
    @redis = server
  else
    @redis = Redis::Namespace.new(:resque, :redis => server)
  end
end

#shutdownObject



75
76
77
# File 'lib/fluent/plugin/out_buffered_resque.rb', line 75

def shutdown
  super
end

#startObject



71
72
73
# File 'lib/fluent/plugin/out_buffered_resque.rb', line 71

def start
  super
end

#write(chunk) ⇒ Object



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/fluent/plugin/out_buffered_resque.rb', line 83

def write(chunk)
  queue_name = @queue_mapped ? chunk.key : @queue

  if klass = @worker_class and @bulk_queueing
    records = []
    chunk.msgpack_each {|tag, time, record|
      record.delete(@worker_class_name_tag)
      records << record
    }
    log.debug("Flushing #{records.size} records to #{queue_name}:#{klass}")
    enqueue(queue_name, klass, records)
  else
    chunk.msgpack_each {|tag, time, record|
      klass = @worker_class || record.delete(@worker_class_name_tag)
      if klass && !klass.empty?
        log.debug("Enqueuing one record to #{queue_name}:#{klass}")
        enqueue(queue_name, klass, record)
      else
        log.error("Neither worker_class param nor #{@worker_class_name_tag} record key was supplied.")
      end
    }
  end
end