Class: Skein::Client::Worker

Inherits:
Skein::Connected show all
Defined in:
lib/skein/client/worker.rb

Instance Attribute Summary

Attributes inherited from Skein::Connected

#channel, #connection, #context, #ident

Instance Method Summary collapse

Methods inherited from Skein::Connected

#lock

Constructor Details

#initialize(queue_name, exchange_name: nil, connection: nil, context: nil) ⇒ Worker

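Creates a worker that consumes from the named durable queue, optionally binds that queue to a durable direct exchange, and starts a subscriber thread that handles each delivery.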



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/skein/client/worker.rb', line 6

# Builds a worker for +queue_name+ and starts the thread that consumes it.
#
# All setup runs inside the inherited +lock+ block. The queue is declared
# with +durable: true+. When +exchange_name+ is given, a direct exchange is
# declared (also +durable: true+) and the queue is bound to it.
#
# @param queue_name name handed to +channel.queue+
# @param exchange_name optional name handed to +channel.direct+; skipped
#   when nil
# @param connection forwarded unchanged to the superclass constructor
# @param context forwarded unchanged to the superclass constructor
def initialize(queue_name, exchange_name: nil, connection: nil, context: nil)
  super(connection: connection, context: context)

  lock do
    @reply_exchange = channel.default_exchange
    @queue = channel.queue(queue_name, durable: true)

    if exchange_name
      @exchange = channel.direct(exchange_name, durable: true)
      @queue.bind(@exchange)
    end

    @handler = Skein::Handler.for(self)

    @thread = Thread.new do
      # NOTE: this is the class-level setter, so it affects every thread in
      # the process, not only this one.
      Thread.abort_on_exception = true

      Skein::Adapter.subscribe(@queue) do |message, tag, reply_queue|
        @handler.handle(message) do |response|
          # NOTE(review): the second argument is presumably the "multiple"
          # flag of the channel API -- confirm against the adapter in use.
          channel.acknowledge(tag, true)

          # Only deliveries that carry a reply address get a response.
          next unless reply_queue

          @reply_exchange.publish(
            response,
            routing_key: reply_queue,
            content_type: 'application/json'
          )
        end
      end
    end
  end
end

Instance Method Details

#async? ⇒ Boolean

Returns:

  • (Boolean)


52
53
54
55
56
# File 'lib/skein/client/worker.rb', line 52

# Reports whether this worker uses async, callback-style delegation.
#
# @return [Boolean] always +false+ in this class
def async?
  # Subclasses that need async callback-style delegation should override
  # this method so that it returns `true`.
  false
end

#close ⇒ Object



41
42
43
44
45
46
# File 'lib/skein/client/worker.rb', line 41

# Stops the worker thread, then delegates to the superclass +close+.
#
# The thread is only killed and joined when one exists, matching the nil
# check in +join+, so calling this on a worker without a thread no longer
# raises NoMethodError. The superclass +close+ runs from an +ensure+ clause
# so it is still invoked when joining the thread re-raises the exception
# that terminated it.
#
# @return whatever the superclass +close+ returns
# @raise any exception propagated by joining the worker thread
def close
  result = nil

  begin
    if @thread
      @thread.kill
      @thread.join
    end
  ensure
    # The value of an ensure clause is discarded, so capture the superclass
    # result explicitly to keep the original return value.
    result = super
  end

  result
end

#join ⇒ Object



48
49
50
# File 'lib/skein/client/worker.rb', line 48

# Waits for the worker thread to finish, if there is one.
#
# @return the result of +Thread#join+, or the falsy value of +@thread+
#   when no thread has been set
def join
  worker = @thread
  worker && worker.join
end