Class: Message::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/message/worker.rb

Defined Under Namespace

Modules: Enqueue

Constant Summary collapse

DEFAULT_JOB_NAME =
'message-worker-default'
DEFAULT_PROCESS_SIZE =
10
DEFAULT_PROCESS_INTERVAL =
5

Class Attribute Summary collapse

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(job_name) ⇒ Worker

Returns a new instance of Worker.



72
73
74
# File 'lib/message/worker.rb', line 72

def initialize(job_name)
  @job_name = job_name
end

Class Attribute Details

.default_jobObject

Returns the value of attribute default_job.



33
34
35
# File 'lib/message/worker.rb', line 33

def default_job
  @default_job
end

.syncObject

Returns the value of attribute sync.



33
34
35
# File 'lib/message/worker.rb', line 33

def sync
  @sync
end

Instance Attribute Details

#job_nameObject (readonly)

Returns the value of attribute job_name.



70
71
72
# File 'lib/message/worker.rb', line 70

def job_name
  @job_name
end

Class Method Details

.callbacksObject



55
56
57
# File 'lib/message/worker.rb', line 55

def callbacks
  @callbacks ||= {:start => [], :crash => [], :stop => []}
end

.defaultObject



39
40
41
# File 'lib/message/worker.rb', line 39

def default
  new(default_job)
end

.jobsObject



51
52
53
# File 'lib/message/worker.rb', line 51

def jobs
  @jobs ||= RUBY_PLATFORM =~ /java/ ? java.util.concurrent.ConcurrentHashMap.new : {}
end

.process(*args) ⇒ Object



43
44
45
# File 'lib/message/worker.rb', line 43

def process(*args)
  default.process(*args)
end

.resetObject



59
60
61
62
63
# File 'lib/message/worker.rb', line 59

def reset
  @default_job = nil
  @sync = nil
  @jobs = nil
end

.start(*args) ⇒ Object



47
48
49
# File 'lib/message/worker.rb', line 47

def start(*args)
  default.start(*args)
end

Instance Method Details

#enq(work) ⇒ Object Also known as: <<



106
107
108
109
110
111
112
# File 'lib/message/worker.rb', line 106

def enq(work)
  if self.class.sync
    process_work(work)
  else
    job.enq(YAML.dump(work))
  end
end

#process(size = 1) ⇒ Object



102
103
104
# File 'lib/message/worker.rb', line 102

def process(size=1)
  job.process(size)
end

#start(options = {}) ⇒ Object



76
77
78
79
80
81
82
# File 'lib/message/worker.rb', line 76

def start(options={})
  Thread.start do
    self.work_in_thread(options) do |size|
      process(size)
    end
  end
end

#work_in_thread(options, &block) ⇒ Object



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/message/worker.rb', line 84

def work_in_thread(options, &block)
  size = options[:size] || DEFAULT_PROCESS_SIZE
  interval = options[:interval] || DEFAULT_PROCESS_INTERVAL
  delay = options[:delay] || 10 + rand(20)
  begin
    callback(:start, options)
    sleep delay if delay > 0
    loop do
      yield(size)
      sleep interval
    end
  rescue => e
    callback(:crash, e)
  ensure
    callback(:stop)
  end
end