Class: Watership::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/watership.rb

Instance Method Summary collapse

Constructor Details

#initialize(uri, env) ⇒ Client

Returns a new instance of Client.



69
70
71
# File 'lib/watership.rb', line 69

# Stores the broker location and the environment name for later use.
#
# @param uri connection target, handed unchanged to Bunny.new when the
#   connection is first opened
# @param env [String] environment name; #notify only reports errors when it
#   equals 'production'
def initialize(uri, env)
  @uri = uri
  @env = env
end

Instance Method Details

#clear_channel ⇒ Object



150
151
152
153
154
155
156
157
158
159
160
# File 'lib/watership.rb', line 150

# Closes the channel cached for the current thread (if it can be closed) and
# always forgets it afterwards.
#
# Exceptions listed in CLEAR_CHANNEL_EXCEPTIONS are logged and swallowed
# (the original author notes they mean the channel is already closed); any
# other exception propagates, but the cached slot is still reset.
#
# @return [Object, nil] result of the close call or of the logger call, or
#   nil when nothing closable was cached
def clear_channel
  channel = thread[:bunny_channel]
  return unless channel.respond_to?(:close)

  begin
    channel.close
  rescue *CLEAR_CHANNEL_EXCEPTIONS => error
    logger.error("Exception #{error.class.name} got raised in Watership#clear_channel")
  end
ensure
  # Runs on every exit path, including the early return above.
  thread[:bunny_channel] = nil
end

#clear_connection ⇒ Object



129
130
131
132
133
134
135
136
137
138
139
# File 'lib/watership.rb', line 129

# Closes the memoized Bunny connection (if it can be closed) and always
# drops the reference afterwards.
#
# Exceptions listed in CLEAR_CONNECTION_EXCEPTIONS are logged and swallowed
# (the original author notes they mean the connection is already closed);
# any other exception propagates, but @bunny_connection is still reset.
#
# @return [Object, nil] result of the close call or of the logger call, or
#   nil when no closable connection was held
def clear_connection
  session = @bunny_connection
  return unless session.respond_to?(:close)

  begin
    session.close
  rescue *CLEAR_CONNECTION_EXCEPTIONS => error
    logger.error("Exception #{error.class.name} got raised in Watership#clear_connection")
  end
ensure
  # Runs on every exit path, including the early return above.
  @bunny_connection = nil
end

#close ⇒ Object



108
109
110
111
# File 'lib/watership.rb', line 108

# Releases broker resources held by this client: first the channel cached
# for the current thread, then the memoized connection.
#
# @return [Object, nil] whatever #clear_connection returns
def close
  clear_channel
  clear_connection
end

#connect_with_queue(name, options = {}) ⇒ Object



98
99
100
101
102
# File 'lib/watership.rb', line 98

# Declares (or looks up) a queue on the current thread's channel.
#
# Queues are durable unless the caller overrides :durable in +options+;
# the caller's hash is not modified.
#
# @param name queue name passed to channel.queue
# @param options [Hash] extra queue options merged over { durable: true }
# @return [Object, nil] the value returned by channel.queue, or nil when no
#   channel is available
def connect_with_queue(name, options = {})
  with_channel do |channel|
    next unless channel

    defaults = { durable: true }
    channel.queue(name, defaults.merge(options))
  end
end

#connection ⇒ Object (also known as: create_connection)



119
120
121
122
123
124
125
126
# File 'lib/watership.rb', line 119

# Returns the memoized Bunny connection, opening and starting one from @uri
# on first use.
#
# A Timeout::Error raised while creating or starting the connection is
# logged and re-raised; nothing is memoized in that case, so the next call
# tries again.
#
# @return [Object] the started connection (the Bunny.new result)
# @raise [Timeout::Error] when opening the connection times out
def connection
  return @bunny_connection if @bunny_connection

  session = Bunny.new(@uri)
  session.start
  @bunny_connection = session
rescue Timeout::Error => error
  logger.error("Exception #{error.class.name} got raised in Watership#connection")
  raise error
end

#create_channel ⇒ Object



146
147
148
# File 'lib/watership.rb', line 146

# Returns the channel cached for the current thread, opening one on the
# shared connection the first time (or after the cached slot was cleared).
#
# @return [Object] the cached or newly created channel
def create_channel
  slots = thread
  slots[:bunny_channel] ||= connection.create_channel
end

#enqueue(options = {}) ⇒ Object



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/watership.rb', line 73

# Publishes a JSON-encoded message to a queue.
#
# Recognised keys in +options+ (the caller's hash is not modified):
#   :queue    - name of the queue to publish to
#   :message  - payload, serialised with JSON.generate
#   :fallback - optional callable invoked when publishing fails
# Every remaining key is forwarded to #connect_with_queue as a queue option.
#
# On any failure the fallback (if given) is called, the error is passed to
# #notify, its class name is logged, and the error is re-raised. For errors
# matching CONNECTION_EXCEPTIONS the current thread's channel is also
# cleared.
#
# @param options [Hash]
# @return [Object] the value returned by the queue's publish call
# @raise [ArgumentError] when the retired :name key is supplied
def enqueue(options = {})
  raise ArgumentError, ":name is no longer valid. Use :queue instead" if options[:name]

  queue_options = options.dup
  message  = queue_options.delete(:message)
  name     = queue_options.delete(:queue)
  fallback = queue_options.delete(:fallback)

  connect_with_queue(name, queue_options).publish(JSON.generate(message))
rescue *CONNECTION_EXCEPTIONS, StandardError => error
  # fallback is still nil here if the failure happened before it was read.
  fallback.call if fallback
  notify(error)
  # Same `===` matching a rescue clause performs against each listed class.
  clear_channel if [*CONNECTION_EXCEPTIONS].any? { |klass| klass === error }
  logger.error(error.class.name)
  raise error
end

#logger ⇒ Object



167
168
169
# File 'lib/watership.rb', line 167

# Logger used by this client; delegates to the module-level Watership.logger
# (defined outside this method).
#
# @return [Object] whatever Watership.logger returns
def logger
  Watership.logger
end

#notify(exception) ⇒ Object



162
163
164
165
# File 'lib/watership.rb', line 162

# Reports an exception to whichever error trackers are loaded, but only
# when the client was built with env == 'production'.
#
# Bugsnag and Airbrake are both optional; each is used only if its constant
# is defined. Airbrake 5+ dropped `notify_or_ignore`, so that method is used
# only when the loaded Airbrake still provides it and `Airbrake.notify` is
# used otherwise. Without this fallback the reporter itself would raise
# NoMethodError and mask the exception being reported.
#
# @param exception [Exception] the error to report
# @return [Object, nil] the Airbrake call's result, or nil when nothing was
#   sent to Airbrake
def notify(exception)
  return unless @env == 'production'

  Bugsnag.notify(exception) if defined?(Bugsnag)
  return unless defined?(Airbrake)

  if Airbrake.respond_to?(:notify_or_ignore)
    Airbrake.notify_or_ignore(exception)
  else
    Airbrake.notify(exception)
  end
end

#reconnect ⇒ Object



113
114
115
116
117
# File 'lib/watership.rb', line 113

# Drops the current thread's channel and the memoized connection, then
# opens a fresh channel (which in turn opens a new connection through
# #connection).
#
# @return [true] always true when no exception is raised; errors from
#   #close or #create_channel propagate to the caller
def reconnect
  close
  create_channel
  true
end

#thread ⇒ Object



104
105
106
# File 'lib/watership.rb', line 104

# Scratch hash private to this client and the current thread, kept in
# Thread.current under a key derived from this object's object_id. It is
# created empty on first access and reused afterwards.
#
# @return [Hash] the per-thread storage (holds :bunny_channel)
def thread
  slot = :"watership_#{object_id}"
  Thread.current[slot] ||= {}
end

#with_channel {|channel| ... } ⇒ Object

Yields:

  • (channel)


141
142
143
144
# File 'lib/watership.rb', line 141

# Yields the current thread's channel (creating it if needed) to the block.
# The block is skipped when no channel could be obtained.
#
# @yieldparam channel [Object] the value returned by #create_channel
# @return [Object, nil] the block's result, or nil when there is no channel
def with_channel
  channel = create_channel
  return unless channel

  yield channel
end