Class: Message::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/message/worker.rb

Defined Under Namespace

Modules: Enqueue

Constant Summary collapse

DEFAULT_JOB_NAME =
'message-worker-default'
DEFAULT_PROCESS_SIZE =
10
DEFAULT_PROCESS_INTERVAL =
5

Class Attribute Summary collapse

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(job_name) ⇒ Worker

Returns a new instance of Worker.



64
65
66
# File 'lib/message/worker.rb', line 64

def initialize(job_name)
  @job_name = job_name
end

Class Attribute Details

.default_jobObject

Returns the value of attribute default_job.



33
34
35
# File 'lib/message/worker.rb', line 33

def default_job
  @default_job
end

.syncObject

Returns the value of attribute sync.



33
34
35
# File 'lib/message/worker.rb', line 33

def sync
  @sync
end

Instance Attribute Details

#job_nameObject (readonly)

Returns the value of attribute job_name.



62
63
64
# File 'lib/message/worker.rb', line 62

def job_name
  @job_name
end

Class Method Details

.defaultObject



39
40
41
# File 'lib/message/worker.rb', line 39

def default
  new(default_job)
end

.jobsObject



51
52
53
# File 'lib/message/worker.rb', line 51

def jobs
  @jobs ||= RUBY_PLATFORM =~ /java/ ? java.util.concurrent.ConcurrentHashMap.new : {}
end

.process(*args) ⇒ Object



43
44
45
# File 'lib/message/worker.rb', line 43

def process(*args)
  default.process(*args)
end

.resetObject



55
56
57
58
59
# File 'lib/message/worker.rb', line 55

def reset
  @default_job = nil
  @sync = nil
  @jobs = nil
end

.start(*args) ⇒ Object



47
48
49
# File 'lib/message/worker.rb', line 47

def start(*args)
  default.start(*args)
end

Instance Method Details

#enq(work) ⇒ Object Also known as: <<



89
90
91
92
93
94
95
# File 'lib/message/worker.rb', line 89

def enq(work)
  if self.class.sync
    process_work(work)
  else
    job.enq(YAML.dump(work))
  end
end

#process(size = 1) ⇒ Object



85
86
87
# File 'lib/message/worker.rb', line 85

def process(size=1)
  job.process(size)
end

#start(options = {}) ⇒ Object



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/message/worker.rb', line 68

def start(options={})
  size = options[:size] || DEFAULT_PROCESS_SIZE
  interval = options[:interval] || DEFAULT_PROCESS_INTERVAL
  Thread.start do
    begin
      log(:info) { "start" }
      loop do
        process(size)
        sleep interval
      end
      log(:info) { "stopped" }
    rescue => e
      log(:error) { "crashed: #{e.message}\n#{e.backtrace.join("\n")}"}
    end
  end
end