Class: Hoth::Providers::BeanstalkdProvider

Inherits:
Object
  • Object
show all
Defined in:
lib/hoth/providers/beanstalkd_provider.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(module_name = nil) ⇒ BeanstalkdProvider

Returns a new instance of BeanstalkdProvider.

Raises:

  • (ArgumentError)


14
15
16
17
18
# File 'lib/hoth/providers/beanstalkd_provider.rb', line 14

# Creates a provider for the given module.
#
# Stores the name in @module_name and then calls
# identify_services_to_listen_for.
#
# @param module_name [Object] name of the module this provider serves;
#   the parameter defaults to nil, but a nil (or false) value is rejected
# @raise [ArgumentError] when module_name is nil or false
def initialize(module_name = nil)
  unless module_name
    raise ArgumentError, "You have to specify the module name for the beanstalkd provider"
  end

  @module_name = module_name
  identify_services_to_listen_for
end

Instance Attribute Details

#module_name ⇒ Object (readonly)

Returns the value of attribute module_name.



12
13
14
# File 'lib/hoth/providers/beanstalkd_provider.rb', line 12

# Reader for the module name that the constructor stores in @module_name.
#
# @return [Object] the current value of @module_name
def module_name
  @module_name
end

#services_to_listen_for ⇒ Object (readonly)

Returns the value of attribute services_to_listen_for.



12
13
14
# File 'lib/hoth/providers/beanstalkd_provider.rb', line 12

# Reader for the services this provider consumes jobs for; #listen iterates
# over the returned value with +each+.
#
# NOTE(review): presumably populated by identify_services_to_listen_for
# during construction -- the assignment is not visible here; confirm.
#
# @return [Object] the current value of @services_to_listen_for
def services_to_listen_for
  @services_to_listen_for
end

Instance Method Details

#listen ⇒ Object



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/hoth/providers/beanstalkd_provider.rb', line 20

# Consumes beanstalkd jobs for every entry in services_to_listen_for,
# inside an EM.run block.
#
# For each service an EMJack connection is opened to the service's endpoint
# (host and port) and told to watch the tube named by the service's
# transport. For every job received:
#
# 1. The service is looked up in ServiceRegistry by name.
# 2. The job body is decoded with that service's transport encoder.
# 3. The decoded parameters are splatted into a call on Hoth::Services
#    under the service's name.
#
# Errors raised while decoding or invoking are logged as a warning and not
# re-raised, and the job is deleted whether or not the invocation succeeded.
#
# @raise [ServiceNotFoundException] from within the job callback when the
#   registry lookup returns nil; this happens before the begin/ensure, so
#   the job is not deleted in that case
def listen
  EM.run do
    services_to_listen_for.each do |service|
      connection = EMJack::Connection.new(
        :host => service.endpoint.host,
        :port => service.endpoint.port
      )
      connection.watch(service.transport.tube_name)

      connection.each_job do |job|
        target = ServiceRegistry.locate_service(service.name)

        if target.nil?
          raise ServiceNotFoundException, "The requested service '#{service.name}' was not found!"
        end

        begin
          arguments = target.transport.encoder.decode(job.body)
          Hoth::Logger.debug "decoded_params: #{arguments}"
          Hoth::Services.send(service.name, *arguments)
        rescue => error
          Hoth::Logger.warn "An error occured while invoking the service: #{error.message}"
        ensure
          # Always remove the job, even when the invocation failed.
          connection.delete(job)
        end
      end
    end
  end
end