Module: RabbitJobs

Extended by:
RabbitJobs
Included in:
RabbitJobs
Defined in:
lib/rabbit_jobs.rb,
lib/rabbit_jobs/util.rb,
lib/rabbit_jobs/worker.rb,
lib/rabbit_jobs/helpers.rb,
lib/rabbit_jobs/version.rb,
lib/rabbit_jobs/publisher.rb,
lib/rabbit_jobs/scheduler.rb,
lib/rabbit_jobs/amqp_helper.rb,
lib/rabbit_jobs/error_mailer.rb,
lib/rabbit_jobs/configuration.rb

Defined Under Namespace

Modules: Helpers, Job, Publisher Classes: AmqpHelper, Configuration, ErrorMailer, Scheduler, Util, Worker

Constant Summary collapse

VERSION =
"0.6.0"

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#loggerObject



115
116
117
118
119
120
121
122
123
# File 'lib/rabbit_jobs.rb', line 115

def logger
  unless @logger
    @logger = Logger.new($stdout)
    @logger.level = Logger::INFO
    @logger.formatter = nil
    @logger.progname = 'rj'
  end
  @logger
end

Instance Method Details

#_run_after_fork_callbacksObject



131
132
133
134
135
136
# File 'lib/rabbit_jobs.rb', line 131

def _run_after_fork_callbacks
  @_after_fork_callbacks ||= []
  @_after_fork_callbacks.each { |callback|
    callback.call
  }
end

#after_fork(&block) ⇒ Object



125
126
127
128
129
# File 'lib/rabbit_jobs.rb', line 125

def after_fork(&block)
  raise unless block_given?
  @_after_fork_callbacks ||= []
  @_after_fork_callbacks << block
end

#before_process_message(&block) ⇒ Object



138
139
140
141
142
# File 'lib/rabbit_jobs.rb', line 138

def before_process_message(&block)
  raise unless block_given?
  @before_process_message_callbacks ||= []
  @before_process_message_callbacks << block
end

#configObject



14
15
16
# File 'lib/rabbit_jobs/configuration.rb', line 14

def config
  @@configuration ||= load_config
end

#configure {|@@configuration| ... } ⇒ Object

Yields:

  • (@@configuration)


9
10
11
12
# File 'lib/rabbit_jobs/configuration.rb', line 9

def configure
  @@configuration ||= Configuration.new
  yield @@configuration if block_given?
end

#direct_publish_to(routing_key, payload, ex = {}, &block) ⇒ Object



83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/rabbit_jobs.rb', line 83

def direct_publish_to(routing_key, payload, ex = {}, &block)
  if RJ.running?
    RJ::Publisher.direct_publish_to(routing_key, payload, ex, &block)
  else
    RJ.run {
      RJ::Publisher.direct_publish_to(routing_key, payload, ex) {
        RJ.stop {
          yield if block_given?
        }
      }
    }
  end
end

#load_config(config_file = nil) ⇒ Object



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/rabbit_jobs/configuration.rb', line 18

def load_config(config_file = nil)
  @@configuration ||= nil

  config_file ||= defined?(Rails) && Rails.respond_to?(:root) && Rails.root.join('config/rabbit_jobs.yml')
  if config_file
    if File.exists?(config_file)
      @@configuration ||= Configuration.new
      @@configuration.load_file(config_file)
    end
  end

  unless @@configuration
    self.configure do |c|
      c.prefix 'rabbit_jobs'
    end
  end

  @@configuration
end

#publish(klass, *params, &block) ⇒ Object



55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/rabbit_jobs.rb', line 55

def publish(klass, *params, &block)
  if RJ.running?
    RJ::Publisher.publish(klass, *params, &block)
  else
    RJ.run {
      RJ::Publisher.publish(klass, *params) {
        RJ.stop {
          yield if block_given?
        }
      }
    }
  end
end

#publish_to(routing_key, klass, *params, &block) ⇒ Object



69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/rabbit_jobs.rb', line 69

def publish_to(routing_key, klass, *params, &block)
  if RJ.running?
    RJ::Publisher.publish_to(routing_key, klass, *params, &block)
  else
    RJ.run {
      RJ::Publisher.publish_to(routing_key, klass, *params) {
        RJ.stop {
          yield if block_given?
        }
      }
    }
  end
end

#purge_queue(*routing_keys, &block) ⇒ Object



97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/rabbit_jobs.rb', line 97

def purge_queue(*routing_keys, &block)
  if RJ.running?
    RJ::Publisher.purge_queue(*routing_keys, &block)
  else
    messages_count = 0
    RJ.run {
      RJ::Publisher.purge_queue(*routing_keys) { |count|
        messages_count = count
        RJ.stop {
          yield(count) if block_given?
        }
      }
    }
    messages_count
  end
end

#run_before_process_message_callbacksObject



144
145
146
147
148
149
150
# File 'lib/rabbit_jobs.rb', line 144

def run_before_process_message_callbacks
  @before_process_message_callbacks ||= []
  @before_process_message_callbacks.each { |callback|
    return false unless callback.call
  }
  return true
end

#running?Boolean

Returns:

  • (Boolean)


51
52
53
# File 'lib/rabbit_jobs.rb', line 51

def running?
  EM.reactor_running?
end

#startObject Also known as: run



22
23
24
25
26
27
28
29
# File 'lib/rabbit_jobs.rb', line 22

def start
  raise unless block_given?
  raise if EM.reactor_running?

  EM.run {
    yield
  }
end

#stopObject



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/rabbit_jobs.rb', line 33

def stop
  RJ::Publisher.cleanup

  if AMQP.connection
    AMQP.connection.disconnect {
      AMQP.connection = nil
      AMQP.channel = nil
      EM.stop {
        yield if block_given?
      }
    }
  else
    EM.stop {
      yield if block_given?
    }
  end
end