Class: ActiveMessaging::ThreadedPoller

Inherits:
Object
  • Object
show all
Includes:
Celluloid
Defined in:
lib/activemessaging/threaded_poller.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection = 'default', configuration = {}) ⇒ ThreadedPoller

connection is a string, name of the connection from broker.yml to use for this threaded poller instance

configuration is a list of hashes, each of which describes a group of worker threads; for each group, define what priorities those workers will process

[
  {
    :pool_size  => 1,      # number of workers of this type
    :priorities => [1,2,3] # what message priorities this thread will process
  }
]


33
34
35
36
37
# File 'lib/activemessaging/threaded_poller.rb', line 33

# Builds a poller for one broker connection.
#
# @param connection name of the connection (from broker.yml) this poller uses
# @param configuration list of hashes, one per group of workers, e.g.
#   [{:pool_size => 1, :priorities => [1,2,3]}]; nil or an empty value
#   (including the {} default) selects a single group of 3 workers
def initialize(connection='default', configuration={})
  # The {} default is truthy, so a plain `configuration || default` would never
  # fall back; an empty configuration would later start no workers at all.
  configuration = [{:pool_size => 3}] if configuration.nil? || configuration.empty?
  self.configuration = configuration
  self.connection = connection
end

Instance Attribute Details

#busyObject

Returns the value of attribute busy.



18
19
20
# File 'lib/activemessaging/threaded_poller.rb', line 18

# Reader for the busy attribute: returns whatever is stored in @busy
# (nil until it has been assigned).
def busy; @busy; end

#configurationObject

Returns the value of attribute configuration.



18
19
20
# File 'lib/activemessaging/threaded_poller.rb', line 18

# Reader for the configuration attribute: returns whatever is stored in
# @configuration (nil until it has been assigned).
def configuration; @configuration; end

#connectionObject

Returns the value of attribute connection.



18
19
20
# File 'lib/activemessaging/threaded_poller.rb', line 18

# Reader for the connection attribute: returns whatever is stored in
# @connection (nil until it has been assigned).
def connection; @connection; end

#receiverObject

Returns the value of attribute receiver.



18
19
20
# File 'lib/activemessaging/threaded_poller.rb', line 18

# Reader for the receiver attribute: returns whatever is stored in @receiver
# (nil until it has been assigned).
def receiver; @receiver; end

#runningObject

Returns the value of attribute running.



18
19
20
# File 'lib/activemessaging/threaded_poller.rb', line 18

# Reader for the running attribute: returns whatever is stored in @running
# (nil until it has been assigned).
def running; @running; end

#workersObject

Returns the value of attribute workers.



18
19
20
# File 'lib/activemessaging/threaded_poller.rb', line 18

# Reader for the workers attribute: returns whatever is stored in @workers
# (nil until it has been assigned).
def workers; @workers; end

Instance Method Details

#died(worker, reason) ⇒ Object



120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/activemessaging/threaded_poller.rb', line 120

# Reacts to a worker that has died.
# NOTE(review): presumably registered as the Celluloid exit handler for linked
# workers — confirm where this is wired up.
#
# While the poller is running, a fresh linked worker is created, added to the
# ready list and handed to the receiver. Once the poller is no longer running,
# no replacement is made; when the last busy worker is gone, :shutdown is
# signalled.
#
# @param worker the worker that died
# @param reason why it died; only its class is written to the log
def died(worker, reason)
  # The worker may have died while busy or while still waiting in the ready
  # list; drop it from both so no dead reference is left behind.
  busy.delete(worker)
  workers.delete(worker)

  if running
    logger.info "uh oh, #{worker.inspect} died because of #{reason.class}"
    # NOTE(review): start creates workers with their configuration group as a
    # second argument; the replacement is created without one here — confirm
    # whether the dead worker's group should be carried over.
    replacement = Worker.new_link(current_actor)
    workers << replacement
    receive(replacement)
  else
    logger.info "check to see if busy is empty: #{busy.inspect}"
    if busy.empty?
      logger.info "all died: signal stopped"
      after(0){ signal(:shutdown) }
    end
  end
end

#dispatch(message, worker) ⇒ Object



99
100
101
102
103
# File 'lib/activemessaging/threaded_poller.rb', line 99

# Hands a message to a worker: the worker leaves the ready list, is recorded
# as busy, and is then told to execute the message via execute!.
#
# @param message the message to process
# @param worker the worker that will process it
# @return whatever worker.execute! returns
def dispatch(message, worker)
  workers.delete(worker)
  busy.push(worker)
  worker.execute!(message)
end

#executed(worker) ⇒ Object



105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/activemessaging/threaded_poller.rb', line 105

# Called with a worker that has finished its message.
#
# The worker is always removed from the busy list. While the poller is
# running it goes back on the ready list and is handed to the receiver again.
# Otherwise the worker is terminated (if still alive) and, once no busy
# workers remain, :shutdown is signalled.
#
# @param worker the worker that finished executing
def executed(worker)
  busy.delete(worker)

  unless running
    # shutting down: retire this worker instead of recycling it
    worker.terminate if worker.alive?
    return unless busy.empty?

    logger.info "all executed: signal stopped"
    return after(0) { signal(:shutdown) }
  end

  workers << worker
  receive(worker)
end

#log_statusObject

Recursive method: uses Celluloid's ‘after’ to keep rescheduling itself every 10 seconds.



89
90
91
92
93
# File 'lib/activemessaging/threaded_poller.rb', line 89

# Writes a one-line status summary (connection, ready-worker count,
# busy-worker count, running flag) at debug level and schedules itself to run
# again in 10 seconds via `after`. Does nothing, and schedules nothing, when
# the logger is not at debug level.
def log_status
  if logger.debug?
    logger.debug("ActiveMessaging::ThreadedPoller: conn:#{connection}, #{workers.count}, #{busy.count}, #{running}")
    after(10) { log_status }
  end
end

#loggerObject



141
# File 'lib/activemessaging/threaded_poller.rb', line 141

# Shortcut to the logger exposed by ActiveMessaging.logger.
def logger
  ActiveMessaging.logger
end

#receive(worker) ⇒ Object



95
96
97
# File 'lib/activemessaging/threaded_poller.rb', line 95

# Asks the receiver to fetch a message for the given worker by calling
# receiver.receive!(worker). Nothing happens (and nil is returned) unless a
# receiver exists, the poller is running, and a worker was given.
#
# @param worker the worker that is ready for a message
def receive(worker)
  return unless receiver && running && worker

  receiver.receive!(worker)
end

#startObject



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/activemessaging/threaded_poller.rb', line 39

# Brings the poller up: resets its state, subscribes through the gateway,
# creates the single message receiver, builds the worker groups described by
# the configuration, and hands every worker to the receiver.
def start
  logger.info "ActiveMessaging::ThreadedPoller start"

  # ready workers, workers currently processing, and the running flag that
  # lets threads stop gracefully
  self.workers = []
  self.busy = []
  self.running = true

  # subscribe creates the connections based on subscriptions in processors
  # (a connection cannot be found or used until this has been called)
  ActiveMessaging::Gateway.subscribe

  # only one message receiver actor is needed; it uses this poller's connection
  broker_connection = ActiveMessaging::Gateway.connection(connection)
  self.receiver = MessageReceiver.new(current_actor, broker_connection)

  # one linked worker per slot in each configured group (1 slot when a group
  # does not say how many)
  configuration.each do |group|
    slots = group[:pool_size] || 1
    slots.times { workers << Worker.new_link(current_actor, group) }
  end

  # only after every worker exists are they offered to the receiver
  workers.each { |idle_worker| receive(idle_worker) }

  # at debug level, logs worker info every 10 seconds
  log_status
end

#stopObject



71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/activemessaging/threaded_poller.rb', line 71

# Begins a graceful shutdown: clears the running flag, terminates the
# receiver and every worker still in the ready list, and signals :shutdown
# straight away when the poller is already fully stopped.
def stop
  logger.info "ActiveMessaging::ThreadedPoller stop"

  # clearing the flag tells busy workers not to pick up another message (they
  # are not interrupted) and tells the message receiver to stop fetching
  self.running = false

  receiver.terminate! if receiver.alive?
  logger.info "ActiveMessaging::ThreadedPoller receiver terminated"

  # only waiting workers are shut down here; running ones are allowed to finish
  workers.each do |idle_worker|
    idle_worker.terminate! if idle_worker.alive?
  end
  logger.info "ActiveMessaging::ThreadedPoller workers terminated"

  return unless stopped?

  after(0) { signal(:shutdown) }
end

#stopped?Boolean

Returns:

  • (Boolean)


137
138
139
# File 'lib/activemessaging/threaded_poller.rb', line 137

# True only when the poller is not running and no worker is busy.
#
# @return [Boolean]
def stopped?
  return false if running

  busy.empty?
end