Class: Liebre::Actor::RPC::Server::Core

Inherits:
Object
  • Object
show all
Includes:
Concurrent::Async
Defined in:
lib/liebre/actor/rpc/server/core.rb

Constant Summary collapse

OPTS =
{:block => false, :manual_ack => false}

Instance Method Summary collapse

Constructor Details

#initialize(server, resources, context, callback_class) ⇒ Core

Returns a new instance of Core.



10
11
12
13
14
15
# File 'lib/liebre/actor/rpc/server/core.rb', line 10

# Records the four collaborators on the instance. Nothing else happens here:
# no method is called on any of the arguments.
#
# @param server [Object] stored in @server
# @param resources [Object] stored in @resources
# @param context [Object] stored in @context
# @param callback_class [Object] stored in @callback_class
#   (presumably a class instantiated once per message - confirm against the
#   reader used by #handle)
def initialize(server, resources, context, callback_class)
  @server = server
  @resources = resources
  @context = context
  @callback_class = callback_class
end

Instance Method Details

#clean ⇒ Object



46
47
48
49
# File 'lib/liebre/actor/rpc/server/core.rb', line 46

# Deletes the queue, then the exchange, in that order.
#
# @return [Object] whatever `exchange.delete` returns
def clean
  doomed_queue = queue
  doomed_queue.delete

  doomed_exchange = exchange
  doomed_exchange.delete
end

#failed(meta, error) ⇒ Object



43
44
# File 'lib/liebre/actor/rpc/server/core.rb', line 43

# No-op failure hook: both arguments are accepted and ignored.
#
# @param meta [Object] unused
# @param error [Object] unused
# @return [nil]
def failed(meta, error)
  # nothing to do
end

#handle(meta, payload) ⇒ Object



28
29
30
31
32
33
34
# File 'lib/liebre/actor/rpc/server/core.rb', line 28

# Builds a callback for this message and hands the message to the handler.
#
# A new `callback_class` instance is created from `server` and `meta`. The
# handler is then called with the payload, the meta and that callback. Any
# error the handler yields to its block is passed to the callback's `failed`.
#
# @param meta [Object] message metadata, passed through untouched
# @param payload [Object] message body, passed through untouched
# @return [Object] whatever `handler.call` returns
def handle(meta, payload)
  responder = callback_class.new(server, meta)

  handler.call(payload, meta, responder) { |failure| responder.failed(failure) }
end

#reply(meta, response, opts = {}) ⇒ Object



36
37
38
39
40
41
# File 'lib/liebre/actor/rpc/server/core.rb', line 36

# Publishes a response on the exchange, addressed from the request's meta.
#
# The publish options are `opts` with :routing_key set to `meta.reply_to` and
# :correlation_id set to `meta.correlation_id`. Values the caller supplied
# under those two keys are overridden. `merge` returns a new hash, so the
# caller's `opts` is left untouched.
#
# @param meta [#reply_to, #correlation_id] metadata of the request
# @param response [Object] body handed to `exchange.publish`
# @param opts [Hash] extra publish options
# @return [Object] whatever `exchange.publish` returns
def reply(meta, response, opts = {})
  routing = {
    :routing_key    => meta.reply_to,
    :correlation_id => meta.correlation_id
  }
  # Merge before touching the exchange, matching the original evaluation order.
  publish_opts = opts.merge(routing)

  exchange.publish(response, publish_opts)
end

#start ⇒ Object



17
18
19
20
21
22
# File 'lib/liebre/actor/rpc/server/core.rb', line 17

# Subscribes to the queue with OPTS and forwards every delivery to the server.
#
# For each delivery the block's first argument is ignored; the meta and the
# payload are passed to `server.handle`.
#
# @return [Object] the value of `exchange`
#   (NOTE(review): it is evaluated after subscribing, presumably so the
#   exchange gets set up at start time - confirm against its definition)
def start
  queue.subscribe(OPTS) { |_info, meta, payload| server.handle(meta, payload) }

  exchange
end

#stop ⇒ Object



24
25
26
# File 'lib/liebre/actor/rpc/server/core.rb', line 24

# Closes the channel.
#
# @return [Object] whatever `chan.close` returns
def stop
  channel = chan
  channel.close
end