Class: Emissary::Operator

Inherits:
Object show all
Includes:
OperatorStatistics
Defined in:
lib/emissary/operator.rb,
lib/emissary/operator/amqp.rb

Defined Under Namespace

Modules: AMQP

Constant Summary collapse

DEFAULT_STATUS_INTERVAL =
3600
DEFAULT_MAX_WORKERS =
50
MAX_WORKER_TTL =
60

Constants included from OperatorStatistics

Emissary::OperatorStatistics::RX_COUNT_MUTEX, Emissary::OperatorStatistics::TX_COUNT_MUTEX

Instance Attribute Summary collapse

Attributes included from OperatorStatistics

#rx_count, #tx_count

Class Method Summary collapse

Instance Method Summary collapse

Methods included from OperatorStatistics

#increment_rx_count, #increment_tx_count

Constructor Details

#initialize(config, *args) ⇒ Operator

Returns a new instance of Operator.



87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/emissary/operator.rb', line 87

def initialize(config, *args)
  @config    = config
  @workers   = (args[0][:max_workers] || DEFAULT_MAX_WORKERS rescue DEFAULT_MAX_WORKERS)

  @agents    = WorkQueue.new(@workers, nil, MAX_WORKER_TTL)
  @publisher = WorkQueue.new(@workers, nil, MAX_WORKER_TTL)

  @timer     = nil
  @stats     = WorkQueue.new(1, nil, MAX_WORKER_TTL)

  @rx_count  = 0
  @tx_count  = 0

  @shutting_down = false
  @connected = false
end

Instance Attribute Details

#configObject (readonly)

Returns the value of attribute config.



64
65
66
# File 'lib/emissary/operator.rb', line 64

def config
  @config
end

#shutting_downObject (readonly)

Returns the value of attribute shutting_down.



64
65
66
# File 'lib/emissary/operator.rb', line 64

def shutting_down
  @shutting_down
end

#signatureObject (readonly)

Returns the value of attribute signature.



64
65
66
# File 'lib/emissary/operator.rb', line 64

def signature
  @signature
end

Class Method Details

.new(config, *args) ⇒ Object

Override .new so subclasses don’t have to call super and can ignore connection-specific arguments



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/emissary/operator.rb', line 69

def self.new(config, *args)
  allocate.instance_eval do
    # Store signature
    @signature = config[:signature]

    # Call a superclass's #initialize if it has one
    initialize(config, *args)

    # post initialize callback
    post_init

    # set signature nil
    @signature ||= Digest::MD5.hexdigest(config.to_s)
    
    self
  end
end

Instance Method Details

#acknowledge(message) ⇒ Object



121
122
# File 'lib/emissary/operator.rb', line 121

def acknowledge message
end

#closeObject

Raises:

  • (NotImplementedError)


131
132
133
# File 'lib/emissary/operator.rb', line 131

def close
  raise NotImplementedError, 'The close method must be defined by the operator module'
end

#connectObject

Raises:

  • (NotImplementedError)


109
110
111
# File 'lib/emissary/operator.rb', line 109

def connect
  raise NotImplementedError, 'The connect method must be defined by the operator module'
end

#connected?Boolean

Returns:

  • (Boolean)


104
# File 'lib/emissary/operator.rb', line 104

def connected?() @connected; end

#disconnectObject



143
144
145
146
# File 'lib/emissary/operator.rb', line 143

def disconnect
  close
  @connected = false
end

#enabled?(what) ⇒ Boolean

Returns:

  • (Boolean)


171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
# File 'lib/emissary/operator.rb', line 171

def enabled? what
  unless [ :startup, :shutdown, :stats ].include? what.to_sym
    Emissary.logger.debug "Testing '#{what}' - it's disabled. Not a valid option."
    return false
  end
  
  unless config[what]
    Emissary.logger.debug "Testing '#{what}' - it's disabled. Missing from configuration."
    return false
  end
  
  if (config[:disable]||[]).include? what.to_s
    Emissary.logger.debug "Testing '#{what}' - it's disabled. Listed in 'disable' configuration option."
    return false
  end
  
  Emissary.logger.debug "Testing '#{what}' - it's enabled.."
  return true
end

#notify(type) ⇒ Object



241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
# File 'lib/emissary/operator.rb', line 241

def notify type
  return unless enabled? type and EM.reactor_running?
  
  message = Emissary::Message.new(:data => { :agent => :emissary, :method => type })
  case type
    when :startup, :shutdown
      message.recipient = config[type]
    when :stats
      message.agent = :stats
      message.method = :gather
  end

  Emissary.logger.notice "Running #{type.to_s.capitalize} Notifier"
  receive message
end

#post_initObject



106
107
# File 'lib/emissary/operator.rb', line 106

def post_init
end

#receive(message) ⇒ Object



199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
# File 'lib/emissary/operator.rb', line 199

def receive message
  @agents.enqueue_b {
    begin
      raise message.errors.first unless message.errors.empty? or not message.errors.first.is_a? Exception
      Emissary.logger.debug " ---> [DISPATCHER] Dispatching new message ... "
      Emissary.dispatch(message, config, self).activate
      # ack message if need be (operator dependant)
      received message
    rescue ::Emissary::Error::InvalidMessageFormat => e
      Emissary.logger.warning e.message
      rejected message, :requeue => true
      # if it was an encoding error, then we are done - nothing more we can do
    rescue Exception => e
      Emissary.logger.error "AgentThread Error: #{e.class.name}: #{e.message}\n\t#{e.backtrace.join("\n\t")}"
      send message.error(e)
      rejected message, :requeue => true
    else
      increment_rx_count
    end
    Emissary.logger.debug " ---> [DISPATCHER] tasks/workers: #{@agents.cur_tasks}/#{@agents.cur_threads}"
  }
end

#received(message) ⇒ Object



191
192
193
# File 'lib/emissary/operator.rb', line 191

def received message
  acknowledge message
end

#reject(message, requeue = true) ⇒ Object



124
125
# File 'lib/emissary/operator.rb', line 124

def reject message, requeue = true
end

#rejected(message, opts = { :requeue => true }) ⇒ Object



195
196
197
# File 'lib/emissary/operator.rb', line 195

def rejected message, opts = { :requeue => true }
  reject message, opts
end

#runObject



135
136
137
138
139
140
141
# File 'lib/emissary/operator.rb', line 135

def run
  @connected = !!connect
  subscribe 
  schedule_statistics_gatherer
  notify :startup
  connected?
end

#schedule_statistics_gathererObject



257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
# File 'lib/emissary/operator.rb', line 257

def schedule_statistics_gatherer
  stats_interval = enabled?(:stats) && config[:stats][:interval] ? config[:stats][:interval].to_i : DEFAULT_STATUS_INTERVAL
  
  # setup agent to process sending of messages
  @timer = EM.add_periodic_timer(stats_interval) do
    rx = rx_count; tx = tx_count
    rx_throughput = sprintf "%0.4f", (rx.to_f / stats_interval.to_f)
    tx_throughput = sprintf "%0.4f", (tx.to_f / stats_interval.to_f)
    
    Emissary.logger.notice "[statistics] publisher tasks/workers: #{@publisher.cur_tasks}/#{@publisher.cur_threads}"
    Emissary.logger.notice "[statistics] dispatcher tasks/workers: #{@agents.cur_tasks}/#{@agents.cur_threads}"
    Emissary.logger.notice "[statistics] #{tx} in #{stats_interval} seconds - tx rate: #{tx_throughput}/sec"
    Emissary.logger.notice "[statistics] #{rx} in #{stats_interval} seconds - rx rate: #{rx_throughput}/sec"
    
    notify :stats
  end
end

#send(message) ⇒ Object



222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
# File 'lib/emissary/operator.rb', line 222

def send message
  @publisher.enqueue_b {
    Emissary.logger.debug " ---> [PUBLISHER]  Sending new message ... "
    begin
      unless message.will_loop?
        Emissary.logger.debug "[PUBLISHER] -- Sending message..."
        send_data message
        increment_tx_count
      else
        Emissary.logger.notice "Not sending message destined for myself - would loop."
      end
    rescue Exception => e
      Emissary.logger.error "PublisherThread Error: #{e.class.name}: #{e.message}\n\t#{e.backtrace.join("\n\t")}"
      @shutting_down = true
    end
    Emissary.logger.debug " ---> [PUBLISHER]  tasks/workers: #{@publisher.cur_tasks}/#{@publisher.cur_threads}"
  }
end

#send_dataObject

Raises:

  • (NotImplementedError)


127
128
129
# File 'lib/emissary/operator.rb', line 127

def send_data
  raise NotImplementedError, 'The send_data method must be defined by the operator module'
end

#shutdown!Object



150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# File 'lib/emissary/operator.rb', line 150

def shutdown!
  unless shutting_down?
    @shutting_down = true
  
    Emissary.logger.info "Cancelling periodic timer for statistics gatherer..."
    @timer.cancel
    
    Emissary.logger.notice "Shutting down..."
    notify :shutdown
  
    Emissary.logger.info "Shutting down agent workqueue..."
    @agents.join
  
    Emissary.logger.info "Shutting down publisher workqueue..."
    @publisher.join
  
    Emissary.logger.info "Disconnecting..."
    disconnect
  end
end

#shutting_down?Boolean

Returns:

  • (Boolean)


148
# File 'lib/emissary/operator.rb', line 148

def shutting_down?() @shutting_down; end

#subscribeObject

Raises:

  • (NotImplementedError)


113
114
115
# File 'lib/emissary/operator.rb', line 113

def subscribe
  raise NotImplementedError, 'The subscrie method must be defined by the operator module'
end

#unsubscribeObject

Raises:

  • (NotImplementedError)


117
118
119
# File 'lib/emissary/operator.rb', line 117

def unsubscribe
  raise NotImplementedError, 'The unsubscribe method must be defined by the operator module'
end