Class: Fluent::ResqueOutput
- Inherits:
-
BufferedOutput
- Object
- BufferedOutput
- Fluent::ResqueOutput
- Includes:
- SetTagKeyMixin, SetTimeKeyMixin
- Defined in:
- lib/fluent/plugin/out_resque.rb
Instance Method Summary
- #configure(conf) ⇒ Object
- #enqueue(queue, klass, args) ⇒ Object
- #format(tag, time, record) ⇒ Object
-
#initialize ⇒ ResqueOutput
constructor
A new instance of ResqueOutput.
- #redis ⇒ Object
-
#redis=(server) ⇒ Object
code from resque.rb.
- #shutdown ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ ResqueOutput
Returns a new instance of ResqueOutput.
16 17 18 19 20 21 |
# File 'lib/fluent/plugin/out_resque.rb', line 16

# Builds the output plugin and loads the libraries it relies on.
#
# The libraries are required here, at instantiation, rather than at file
# load time. They are loaded in the same order as listed.
def initialize
  super
  %w[multi_json redis redis-namespace].each { |library| require library }
end
Instance Method Details
#configure(conf) ⇒ Object
23 24 25 26 27 28 |
# File 'lib/fluent/plugin/out_resque.rb', line 23

# Applies the plugin configuration.
#
# Reads two keys from +conf+:
# * 'worker_class_name_tag' - record key holding the worker class name
#   (falls back to 'class' when absent).
# * 'redis' - server specification handed to #redis= when present.
#
# @param conf [#[]] configuration element passed in by the framework
def configure(conf)
  super

  @worker_class_name_tag = conf['worker_class_name_tag'] || 'class'

  redis_server = conf['redis']
  self.redis = redis_server if redis_server
end
#enqueue(queue, klass, args) ⇒ Object
58 59 60 61 |
# File 'lib/fluent/plugin/out_resque.rb', line 58

# Pushes one job onto a queue list in Redis.
#
# The queue name is first added to the :queues set, then a JSON payload of
# the form {class: klass, args: [args]} is appended to "queue:<queue>".
#
# @param queue [#to_s] queue name
# @param klass [Object] value stored under the payload's :class key
# @param args [Object] value wrapped in a one-element array under :args
# @return [Object] whatever the Redis client returns from rpush
def enqueue(queue, klass, args)
  connection = redis
  connection.sadd(:queues, queue.to_s)

  list_key = "queue:#{queue}"
  job = ::MultiJson.encode(:class => klass, :args => [args])
  connection.rpush(list_key, job)
end
#format(tag, time, record) ⇒ Object
71 72 73 |
# File 'lib/fluent/plugin/out_resque.rb', line 71

# Serializes a single event as a MessagePack-encoded three-element array
# of tag, time and record, in that order.
#
# @param tag [Object] event tag
# @param time [Object] event time
# @param record [Object] event record
# @return [Object] result of Array#to_msgpack
def format(tag, time, record)
  event = [tag, time, record]
  event.to_msgpack
end
#redis ⇒ Object
52 53 54 55 56 |
# File 'lib/fluent/plugin/out_resque.rb', line 52

# Returns the Redis connection, creating a default one on first use.
#
# A stored value is reused only when it is set and is not a String.
# Otherwise a default is routed through #redis=: Redis.connect when the
# Redis class offers it, or the address "localhost:6379" when it does not.
def redis
  usable = @redis && !@redis.kind_of?(String)
  return @redis if usable

  fallback = Redis.respond_to?(:connect) ? Redis.connect : "localhost:6379"
  self.redis = fallback
  self.redis
end
#redis=(server) ⇒ Object
Code taken from resque.rb.
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/fluent/plugin/out_resque.rb', line 31

# Sets the Redis connection from one of several kinds of value.
#
# * String containing "redis://" - passed to Redis.connect as :url.
# * Any other String - parsed as "host:port:db/namespace"; the part after
#   the first "/" is the namespace, the part before is split on ":".
# * Redis::Namespace - stored as is.
# * Anything else - wrapped in a Redis::Namespace named :resque.
#
# String inputs are wrapped in a Redis::Namespace as well, using the parsed
# namespace or :resque when none was given.
#
# @param server [String, Redis::Namespace, Object] connection specification
def redis=(server)
  @redis =
    case server
    when String
      namespace = nil
      if server =~ /redis\:\/\//
        client = Redis.connect(:url => server, :thread_safe => true)
      else
        address, namespace = server.split('/', 2)
        host, port, db = address.split(':')
        client = Redis.new(:host => host, :port => port, :thread_safe => true, :db => db)
      end
      Redis::Namespace.new(namespace || :resque, :redis => client)
    when Redis::Namespace
      server
    else
      Redis::Namespace.new(:resque, :redis => server)
    end
end
#shutdown ⇒ Object
67 68 69 |
# File 'lib/fluent/plugin/out_resque.rb', line 67

# Shutdown hook. Adds no behavior of its own and only delegates to the
# parent class implementation.
def shutdown
  super
end
#start ⇒ Object
63 64 65 |
# File 'lib/fluent/plugin/out_resque.rb', line 63

# Start hook. Adds no behavior of its own and only delegates to the
# parent class implementation.
def start
  super
end
#write(chunk) ⇒ Object
75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/fluent/plugin/out_resque.rb', line 75

# Enqueues every record of a chunk as a job.
#
# The target queue is chunk.key when @queue_mapped is set, otherwise @queue.
# For each record, the value stored under @worker_class_name_tag is removed
# from the record and used as the job class; the remaining record becomes
# the job argument. Records without a usable class name are not enqueued
# and an error is logged instead.
#
# @param chunk [#key, #msgpack_each] buffered chunk yielding tag, time, record
# @return [Object] whatever chunk.msgpack_each returns
def write(chunk)
  queue_name = @queue_mapped ? chunk.key : @queue

  chunk.msgpack_each do |_tag, _time, record|
    klass = record.delete(@worker_class_name_tag)

    # Only a non-empty String is a usable class name. Checking the type first
    # keeps a record whose value is, for example, a number from raising
    # NoMethodError on #empty? and escaping this method mid-chunk.
    if klass.is_a?(String) && !klass.empty?
      enqueue(queue_name, klass, record)
    else
      $log.error("record have not #{@worker_class_name_tag} key.")
    end
  end
end