Class: Refinery::EventPublisher

Inherits:
Object
  • Object
show all
Includes:
Configurable, Loggable, Queueable, Utilities
Defined in:
lib/refinery/event_publisher.rb

Overview

Publish events.

Constant Summary collapse

STARTING =

:nodoc:

'starting'
RUNNING =

:nodoc:

'running'
STOPPED =

:nodoc:

'stopped'

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Utilities

#camelize, #decode_message, #encode_message, #host_info

Methods included from Queueable

#queue, #with_queue

Methods included from Configurable

#config, #config=

Methods included from Loggable

#logger

Constructor Details

#initialize(options = {}) ⇒ EventPublisher

Initialize the event publisher

Options:

  • :debug: Set to true to enable debug logging

  • :config: Provide a file path to load that config



20
21
22
23
24
25
# File 'lib/refinery/event_publisher.rb', line 20

def initialize(options={})
  logger.level = Logger::INFO if options[:verbose]
  logger.level = Logger::DEBUG if options[:debug]
  config.load_file(options[:config]) if options[:config]
  self.publishers_directory = options[:publishers] if options[:publishers]
end

Instance Attribute Details

#publishers_directoryObject

The directory where publishers are found. Defaults to ./publishers



38
39
40
# File 'lib/refinery/event_publisher.rb', line 38

def publishers_directory
  @publishers_directory
end

Instance Method Details

#publishersObject

A hash of all publisher classes mapped to last modified timestamps.



43
44
45
# File 'lib/refinery/event_publisher.rb', line 43

def publishers
  @publishers ||= {}
end

#runObject

Run the event publisher



60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/refinery/event_publisher.rb', line 60

def run
  @state = RUNNING
  logger.info "Starting event publisher"
  config['processors'].each do |key, settings|
    run_publisher(key, settings)
  end
  
  begin
    threads.each { |thread| thread.join }
  rescue Interrupt => e
  end
  
  logger.info "Exiting event publisher"
end

#run_once(key) ⇒ Object

Run the specified publisher once and return

Raises:

  • (RuntimeError)


48
49
50
51
52
53
54
55
56
57
# File 'lib/refinery/event_publisher.rb', line 48

def run_once(key)
  prefix = config['prefix'] || ''
  settings = config['processors'][key]
  raise RuntimeError, "No processor configuration found for #{key}" unless settings
  queue_name = settings['queue'] || key
  queue_name = "#{prefix}#{queue_name}"
  waiting_queue_name = "#{queue_name}_waiting"
  logger.debug "Using queue #{waiting_queue_name}"
  load_publisher_class(key).new(waiting_queue_name).execute
end

#running?Boolean

Return true if the event publisher is running

Returns:

  • (Boolean)


33
34
35
# File 'lib/refinery/event_publisher.rb', line 33

def running?
  state == RUNNING
end

#stateObject

Get the event publisher state



28
29
30
# File 'lib/refinery/event_publisher.rb', line 28

def state
  @state ||= STARTING
end