Class: Appfuel::Service::Worker

Inherits:
Object
  • Object
show all
Includes:
Application::AppContainer, Application::Dispatcher, Sneakers::Worker
Defined in:
lib/appfuel/service/worker.rb

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.allow_all_action? ⇒ Boolean

Returns:

  • (Boolean)


31
32
33
# File 'lib/appfuel/service/worker.rb', line 31

# Reports whether this worker currently accepts every action route.
#
# Reads the `@allow_all_actions` flag. The flag may never have been
# assigned, in which case it is nil; the result is coerced so this
# predicate always answers with a real boolean.
#
# @return [Boolean] true only when the flag holds a truthy value
def allow_all_action?
  @allow_all_actions ? true : false
end

.container_class_type ⇒ Object



27
28
29
# File 'lib/appfuel/service/worker.rb', line 27

# Fixed label for the container class type of this worker.
#
# @return [String] always 'consumers'
def container_class_type; 'consumers'; end

.inherited(klass) ⇒ Object



10
11
12
13
14
15
16
17
18
19
20
21
# File 'lib/appfuel/service/worker.rb', line 10

# Subclass hook: records every new worker class in the app container.
#
# Ensures a list exists under "<top_container_key>.consumer_keys",
# registers the subclass under its own container_class_path, and appends
# that path to the list. The new subclass then starts out allowing all
# actions.
#
# @param klass [Class] the class that has just inherited from this one
# @return [Boolean] true
def inherited(klass)
  container = klass.app_container
  consumer_key = "#{klass.top_container_key}.consumer_keys"
  container.register(consumer_key, []) unless container.key?(consumer_key)

  key = klass.container_class_path
  container.register(key, klass)
  container[consumer_key] << key

  # Inside this hook `self` is the parent class, so a bare
  # `@allow_all_actions = true` would set the parent's flag and leave the
  # subclass's flag unset. Set it on the subclass explicitly.
  klass.instance_variable_set(:@allow_all_actions, true)
end

.register(routes) ⇒ Object



35
36
37
38
39
40
41
42
43
44
# File 'lib/appfuel/service/worker.rb', line 35

# Restricts this worker to an explicit list of action routes.
#
# A successful call turns the allow-all flag off and replaces any
# previously registered routes. The argument is validated before any state
# is changed, so a rejected call leaves the existing configuration intact.
#
# @param routes [String, Array] a single route or a list of routes
# @raise [RuntimeError] when routes is neither a String nor an Array
# @return [Array] the list of routes that was stored
def register(routes)
  routes = [routes] if routes.is_a?(String)
  fail "register accepts only String or Array" unless routes.is_a?(Array)

  @allow_all_actions = false
  @registered_actions = routes
end

.registered?(route) ⇒ Boolean

Returns:

  • (Boolean)


50
51
52
53
54
# File 'lib/appfuel/service/worker.rb', line 50

# Tells whether the given action route may be handled by this worker.
#
# @param route [String] action route to look up
# @return [Boolean] true when all actions are allowed, otherwise whether
#   the route is present in registered_actions
def registered?(route)
  # The predicate defined on this class is `allow_all_action?`.
  return true if allow_all_action?

  registered_actions.include?(route)
end

.registered_actions ⇒ Object



46
47
48
# File 'lib/appfuel/service/worker.rb', line 46

# Routes stored for this worker, lazily defaulting to an empty list.
#
# @return [Array] the stored routes; a new empty Array is created and
#   remembered on first access when nothing has been stored yet
def registered_actions
  @registered_actions = [] unless @registered_actions
  @registered_actions
end

.top_container_key ⇒ Object



23
24
25
# File 'lib/appfuel/service/worker.rb', line 23

# Top-level container key for this worker; `inherited` uses it as the
# prefix of the "<key>.consumer_keys" entry.
#
# @return [String] always "message_brokers"
def top_container_key; "message_brokers"; end

Instance Method Details

#logger ⇒ Object



115
116
117
# File 'lib/appfuel/service/worker.rb', line 115

# Logger looked up from the app container under :logger and cached on the
# instance after the first truthy lookup.
#
# @return [Object] whatever app_container[:logger] holds
def logger
  return @logger if @logger

  @logger = app_container[:logger]
end

#publish_rpc(request, response) ⇒ Nil

Publish a response for the rpc request.

Parameters:

  • request (MsgRequest)
  • response (Appfuel::Response)

Returns:

  • (Nil)


105
106
107
108
109
110
111
112
113
# File 'lib/appfuel/service/worker.rb', line 105

# Publishes the response for an rpc request.
#
# The JSON form of the response is published with the request's
# correlation id, with the request's reply_to as routing key, and with
# the action route carried in the "action_route" header.
#
# @param request [MsgRequest] supplies correlation_id, reply_to and
#   action_route
# @param response [Appfuel::Response] serialized with to_json
# @return [nil]
def publish_rpc(request, response)
  reply_options = {}
  reply_options[:correlation_id] = request.correlation_id
  reply_options[:routing_key]    = request.reply_to
  reply_options[:headers]        = { "action_route" => request.action_route }

  payload = response.to_json
  publish(payload, reply_options)
  nil
end

#rpc?(properties) ⇒ Boolean

Returns:

  • (Boolean)


96
97
98
# File 'lib/appfuel/service/worker.rb', line 96

# Tells whether the message properties describe an rpc call, i.e. both a
# correlation id and a reply_to are present.
#
# The result is coerced to a strict boolean so the raw reply_to value (or
# nil) is never returned to the caller.
#
# @param properties [Bunny::MessageProperties] must respond to
#   correlation_id and reply_to
# @return [Boolean]
def rpc?(properties)
  (properties.correlation_id && properties.reply_to) ? true : false
end

#work_with_params(msg, delivery_info, properties) ⇒ Appfuel::Response

Sneakers worker hook to handle messages from RabbitMQ

Parameters:

  • msg (String)

    JSON string of inputs

  • delivery_info (Bunny::Delivery::Info)
  • properties (Bunny::MessageProperties)

Returns:

  • (Appfuel::Response)


64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/appfuel/service/worker.rb', line 64

# Sneakers worker hook that handles one message.
#
# Steps, in order:
#   1. Build a request from the message. If that raises, a request built
#      from '{}' is used only to report the failure.
#   2. Let handle_invalid_routes veto the action route.
#   3. Dispatch the request against the app container.
#   4. Log the formatted errors when the response is a failure.
#   5. Publish the response when the properties describe an rpc call.
#
# Every path, including each failure path, ends by acknowledging the
# message with ack!.
#
# @param msg [String] JSON string of inputs
# @param delivery_info [Bunny::Delivery::Info]
# @param properties [Bunny::MessageProperties]
# @return [Object] the result of ack!
def work_with_params(msg, delivery_info, properties)
  request =
    begin
      create_request(msg, delivery_info, properties)
    rescue => build_error
      # The real request could not be built; use an empty one so the
      # failure can still be reported.
      fallback = create_request('{}', delivery_info, properties)
      handle_exception("failed to build request", build_error, fallback)
      return ack!
    end

  handle_invalid_routes(request.action_route) do |route_error|
    handle_exception("dispatch not allowed", route_error, request)
    return ack!
  end

  response =
    begin
      dispatch(request, app_container)
    rescue => dispatch_error
      handle_exception("failed to dispatch", dispatch_error, request)
      return ack!
    end

  logger.error "[#{request.action_route}] #{response.errors.format}" if response.failure?
  publish_rpc(request, response) if rpc?(properties)

  ack!
end