Class: Message::Filters

Inherits:
Object
  • Object
show all
Defined in:
lib/message/filters.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Filters

Returns a new instance of Filters.



7
8
9
10
11
12
13
14
15
# File 'lib/message/filters.rb', line 7

# Builds an empty filter registry plus the default configuration, then
# registers every built-in filter listed by #defaults.
#
# @data maps a filter type to an array of entries; looking up an unseen type
# stores and returns a fresh empty array. Each registered entry is a
# two-element array: the filter's name and the bound Method object for it.
#
# @return [Filters] a new instance of Filters
def initialize
  @data = Hash.new { |registry, type| registry[type] = [] }
  # The default error callback accepts (type, job, msg, e) and does nothing.
  @config = { :error_handling_callback => lambda {|type, job, msg, e| } }
  defaults.each { |type, name| self[type] << [name, method(name)] }
end

Instance Attribute Details

#config ⇒ Object (readonly)

Returns the value of attribute config.



5
6
7
# File 'lib/message/filters.rb', line 5

# Read-only access to the configuration held in @config.
#
# #initialize sets @config to a Hash whose :error_handling_callback entry is a
# lambda taking (type, job, msg, e) with an empty body.
#
# @return [Object] the value of @config
def config
  @config
end

Instance Method Details

#[](type) ⇒ Object



17
18
19
# File 'lib/message/filters.rb', line 17

# Looks up the entries registered under +type+ in @data.
#
# @data is created in #initialize with a default block, so asking for a type
# that has no entries yet stores and returns a new empty array, which is what
# lets callers write `filters[type] << entry`.
#
# @param type [Object] the lookup key (#defaults uses :process and :enq)
# @return [Object] the value stored in @data for +type+
def [](type)
  @data[type]
end

#benchmarking(filter, job) ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/message/filters.rb', line 44

# Wraps +filter+ so that every message it yields is timed while the caller's
# processor block handles it, with a log line before and after.
#
# @param filter [#call] invoked with the size argument and a per-message block
# @param job [#name] its name prefixes both log lines
# @return [Proc] a lambda taking +size+ and a processor block; the value
#   returned to +filter+ for each message is whatever the processor returned
def benchmarking(filter, job)
  lambda do |size, &processor|
    filter.call(size) do |msg|
      Message.logger.info { "#{job.name}: processing one message"}
      result = nil
      # Benchmark.realtime returns elapsed seconds; the processor's own return
      # value is captured through the closure.
      elapsed = Benchmark.realtime { result = processor.call(msg) }
      Message.logger.info { "#{job.name}: processed in #{(1000 * elapsed).to_i}ms" }
      result
    end
  end
end

#defaults ⇒ Object



21
22
23
24
25
26
27
# File 'lib/message/filters.rb', line 21

# Lists the built-in filters as [type, method_name] pairs, in the order they
# are registered by #initialize: error handling for :process and :enq first,
# then benchmarking for :process.
#
# @return [Array<Array<Symbol>>] a fresh array of [type, method_name] pairs
def defaults
  pairs = [:process, :enq].map { |type| [type, :error_handling] }
  pairs << [:process, :benchmarking]
end

#error_handling(filter, job) ⇒ Object



29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/message/filters.rb', line 29

# Wraps +filter+ so that its work runs inside log_error.
#
# The returned lambda serves two call shapes:
# * with a processor block (the :process case): the whole filter call runs
#   inside log_error with a nil message, and each yielded message is
#   processed inside its own log_error call carrying that message;
# * without a block (the :enq case): the filter call runs inside log_error
#   with +arg+ passed as the message.
#
# NOTE(review): log_error is defined outside this view; presumably it reports
# failures through config[:error_handling_callback] -- confirm.
#
# @param filter [#call] the filter being wrapped
# @param job [Object] passed through to log_error unchanged
# @return [Proc] a lambda taking +arg+ and an optional processor block
def error_handling(filter, job)
  lambda do |arg, &processor|
    if processor
      log_error(:process, job, nil) do
        filter.call(arg) do |msg|
          log_error(:process, job, msg) { processor.call(msg) }
        end
      end
    else
      log_error(:enq, job, arg) { filter.call(arg) }
    end
  end
end