Class: NatsWork::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/natswork/worker.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection, options = {}) ⇒ Worker

Returns a new instance of Worker.



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/natswork/worker.rb', line 11

def initialize(connection, options = {})
  @connection = connection
  @name = options[:name] || generate_worker_name
  @queues = Array(options[:queues] || 'default')
  @concurrency = options[:concurrency] || 10

  @running = false
  @paused = false
  @stopping = false
  @mutex = Mutex.new

  @jobs_processed = Concurrent::AtomicFixnum.new(0)
  @jobs_failed = Concurrent::AtomicFixnum.new(0)
  @active_jobs = Concurrent::AtomicFixnum.new(0)

  @heartbeat_thread = nil
  @polling_threads = []
  @started_at = nil
end

Instance Attribute Details

#concurrencyObject (readonly)

Returns the value of attribute concurrency.



9
10
11
# File 'lib/natswork/worker.rb', line 9

def concurrency
  @concurrency
end

#connectionObject (readonly)

Returns the value of attribute connection.



9
10
11
# File 'lib/natswork/worker.rb', line 9

def connection
  @connection
end

#nameObject (readonly)

Returns the value of attribute name.



9
10
11
# File 'lib/natswork/worker.rb', line 9

def name
  @name
end

#queuesObject (readonly)

Returns the value of attribute queues.



9
10
11
# File 'lib/natswork/worker.rb', line 9

def queues
  @queues
end

Instance Method Details

#accepting_jobs?Boolean

Returns:

  • (Boolean)


83
84
85
# File 'lib/natswork/worker.rb', line 83

def accepting_jobs?
  @running && !@paused && !@stopping
end

#graceful_shutdown(timeout: 30) ⇒ Object



114
115
116
117
118
119
120
121
122
# File 'lib/natswork/worker.rb', line 114

def graceful_shutdown(timeout: 30)
  stop_thread = Thread.new { stop }
  stop_thread.join(timeout)

  return unless stop_thread.alive?

  stop_thread.kill
  force_shutdown
end

#heartbeatObject



101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/natswork/worker.rb', line 101

def heartbeat
  @connection.publish('natswork.workers.heartbeat', {
                        worker_id: @name,
                        status: current_status,
                        queues: @queues,
                        concurrency: @concurrency,
                        jobs_processed: @jobs_processed.value,
                        jobs_failed: @jobs_failed.value,
                        active_jobs: @active_jobs.value,
                        timestamp: Time.now.to_f
                      })
end

#pauseObject



59
60
61
62
63
# File 'lib/natswork/worker.rb', line 59

def pause
  @mutex.synchronize do
    @paused = true
  end
end

#paused?Boolean

Returns:

  • (Boolean)


75
76
77
# File 'lib/natswork/worker.rb', line 75

def paused?
  @paused
end

#resumeObject



65
66
67
68
69
# File 'lib/natswork/worker.rb', line 65

def resume
  @mutex.synchronize do
    @paused = false
  end
end

#running?Boolean

Returns:

  • (Boolean)


71
72
73
# File 'lib/natswork/worker.rb', line 71

def running?
  @running
end

#startObject



31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/natswork/worker.rb', line 31

def start
  @mutex.synchronize do
    return if @running

    @running = true
    @stopping = false
    @started_at = Time.now

    start_heartbeat
    start_polling
  end
end

#statsObject



87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/natswork/worker.rb', line 87

def stats
  {
    name: @name,
    status: current_status,
    queues: @queues,
    concurrency: @concurrency,
    jobs_processed: @jobs_processed.value,
    jobs_failed: @jobs_failed.value,
    active_jobs: @active_jobs.value,
    started_at: @started_at,
    uptime: @started_at ? Time.now - @started_at : 0
  }
end

#stopObject



44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/natswork/worker.rb', line 44

def stop
  @mutex.synchronize do
    return unless @running

    @stopping = true
    wait_for_jobs

    stop_polling
    stop_heartbeat

    @running = false
    @stopping = false
  end
end

#stopping?Boolean

Returns:

  • (Boolean)


79
80
81
# File 'lib/natswork/worker.rb', line 79

def stopping?
  @stopping
end