Class: Fluent::BufferedResqueOutput
- Inherits:
-
BufferedOutput
- Object
- BufferedOutput
- Fluent::BufferedResqueOutput
- Includes:
- SetTagKeyMixin, SetTimeKeyMixin
- Defined in:
- lib/fluent/plugin/out_buffered_resque.rb
Instance Method Summary
- #configure(conf) ⇒ Object
- #enqueue(queue, klass, args) ⇒ Object
- #format(tag, time, record) ⇒ Object
-
#initialize ⇒ BufferedResqueOutput
constructor
A new instance of BufferedResqueOutput.
- #redis ⇒ Object
-
#redis=(server) ⇒ Object
Sets the Redis connection (code from resque.rb).
- #shutdown ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ BufferedResqueOutput
Returns a new instance of BufferedResqueOutput.
23 24 25 26 27 28 |
# File 'lib/fluent/plugin/out_buffered_resque.rb', line 23
#
# Builds the plugin instance and loads the libraries it relies on.
# The requires are performed here, at construction time, rather than
# at file load.
def initialize
  super
  %w[multi_json redis redis-namespace].each { |library| require library }
end
Instance Method Details
#configure(conf) ⇒ Object
30 31 32 33 34 35 36 |
# File 'lib/fluent/plugin/out_buffered_resque.rb', line 30
#
# Reads plugin settings from +conf+ after the parent class has
# processed it.
#
# conf - configuration object indexed by String keys. The keys read
#        here are 'worker_class', 'worker_class_name_tag' (defaults
#        to 'class') and 'redis'.
#
# When 'redis' is present it is handed to #redis=; otherwise the
# connection is left untouched.
def configure(conf)
  super
  @worker_class = conf['worker_class']
  @worker_class_name_tag = conf['worker_class_name_tag'] || 'class'

  redis_server = conf['redis']
  self.redis = redis_server if redis_server
end
#enqueue(queue, klass, args) ⇒ Object
66 67 68 69 |
# File 'lib/fluent/plugin/out_buffered_resque.rb', line 66
#
# Pushes one job onto a queue through the #redis connection.
#
# queue - queue name; registered in the :queues set and used to build
#         the "queue:<name>" list key.
# klass - worker class name stored under the :class key of the payload.
# args  - job argument; wrapped in a single-element array under :args.
#
# Returns whatever the rpush call returns.
def enqueue(queue, klass, args)
  redis.sadd(:queues, queue.to_s)
  job_payload = ::MultiJson.encode(:class => klass, :args => [args])
  redis.rpush("queue:#{queue}", job_payload)
end
#format(tag, time, record) ⇒ Object
79 80 81 |
# File 'lib/fluent/plugin/out_buffered_resque.rb', line 79
#
# Serializes one event as a msgpack-encoded [tag, time, record] triple.
# #write reads the same triple back via chunk.msgpack_each.
def format(tag, time, record)
  event = [tag, time, record]
  event.to_msgpack
end
#redis ⇒ Object
60 61 62 63 64 |
# File 'lib/fluent/plugin/out_buffered_resque.rb', line 60
#
# Returns the current Redis connection, creating a default one on
# first use.
#
# An existing @redis is returned as-is unless it is unset or still a
# String. Otherwise a default is routed through #redis=: the result of
# Redis.connect when that method exists, or the address string
# "localhost:6379" when it does not.
def redis
  usable = @redis && !@redis.kind_of?(String)
  return @redis if usable

  default_server =
    if Redis.respond_to?(:connect)
      Redis.connect
    else
      "localhost:6379"
    end
  self.redis = default_server
  self.redis
end
#redis=(server) ⇒ Object
Sets the Redis connection (code from resque.rb).
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/fluent/plugin/out_buffered_resque.rb', line 39
#
# Sets the Redis connection used by this plugin (code from resque.rb).
#
# server - one of:
#          * a String containing "redis://": treated as a connection URL;
#          * any other String, in the form "host:port[:db][/namespace]";
#          * a Redis::Namespace: stored unchanged;
#          * anything else: wrapped in a Redis::Namespace under :resque.
#
# For String input the connection is wrapped in a Redis::Namespace
# using the namespace from the string, or :resque when none is given.
def redis=(server)
  case server
  when String
    if server =~ %r{redis://}
      url_options = { :url => server, :thread_safe => true }
      # Redis.connect is not available in every redis-rb release. The
      # #redis reader already guards it with respond_to?, so do the same
      # here instead of raising NoMethodError on a URL setting.
      redis = if Redis.respond_to?(:connect)
                Redis.connect(url_options)
              else
                Redis.new(url_options)
              end
    else
      server, namespace = server.split('/', 2)
      host, port, db = server.split(':')
      redis = Redis.new(:host => host, :port => port,
                        :thread_safe => true, :db => db)
    end
    # namespace is only ever set by the host:port form above.
    namespace ||= :resque

    @redis = Redis::Namespace.new(namespace, :redis => redis)
  when Redis::Namespace
    @redis = server
  else
    @redis = Redis::Namespace.new(:resque, :redis => server)
  end
end
#shutdown ⇒ Object
75 76 77 |
# File 'lib/fluent/plugin/out_buffered_resque.rb', line 75
#
# Shutdown hook. Adds no behavior of its own and defers entirely to
# the parent class.
def shutdown
  super
end
#start ⇒ Object
71 72 73 |
# File 'lib/fluent/plugin/out_buffered_resque.rb', line 71
#
# Start hook. Adds no behavior of its own and defers entirely to the
# parent class.
def start
  super
end
#write(chunk) ⇒ Object
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 |
# File 'lib/fluent/plugin/out_buffered_resque.rb', line 83
#
# Flushes one buffered chunk by turning its records into queued jobs.
#
# chunk - buffer chunk; must respond to #msgpack_each (yielding
#         tag, time, record) and, when @queue_mapped is set, to #key.
#
# The target queue is chunk.key when @queue_mapped is truthy, otherwise
# @queue.
# NOTE(review): @queue_mapped, @queue and @bulk_queueing are not assigned
# anywhere in the visible code — presumably declared configuration
# parameters; confirm.
#
# Two modes:
# * bulk (@worker_class set and @bulk_queueing truthy): every record has
#   its @worker_class_name_tag key removed, and all records are enqueued
#   together as a single job.
# * per record: the worker class is @worker_class, or else the value
#   removed from the record under @worker_class_name_tag. Records with
#   no usable class are logged as errors and not enqueued.
def write(chunk)
  queue_name = @queue_mapped ? chunk.key : @queue
  klass = @worker_class

  if klass && @bulk_queueing
    batch = []
    chunk.msgpack_each do |_tag, _time, record|
      record.delete(@worker_class_name_tag)
      batch << record
    end
    log.debug("Flushing #{batch.size} records to #{queue_name}:#{klass}")
    enqueue(queue_name, klass, batch)
  else
    chunk.msgpack_each do |_tag, _time, record|
      klass = @worker_class || record.delete(@worker_class_name_tag)
      if !klass || klass.empty?
        log.error("Neither worker_class param nor #{@worker_class_name_tag} record key was supplied.")
      else
        log.debug("Enqueuing one record to #{queue_name}:#{klass}")
        enqueue(queue_name, klass, record)
      end
    end
  end
end