Class: Legion::Extensions::Actors::Subscription

Inherits:
Object
  • Object
show all
Includes:
Concurrent::Async, Base, Helpers::Transport
Defined in:
lib/legion/extensions/actors/subscription.rb

Instance Method Summary collapse

Methods included from Helpers::Transport

#build_default_exchange, #default_exchange, #exchanges, #messages, #queues, #transport_class, #transport_path

Methods included from Helpers::Base

#actor_class, #actor_const, #actor_name, #calling_class, #calling_class_array, #from_json, #full_path, #lex_class, #lex_const, #lex_name, #normalize, #runner_class, #runner_const, #runner_name, #to_dotted_hash

Methods included from Base

#args, #check_subtask?, #enabled?, #function, #generate_task?, #manual, #runner, #use_runner?

Methods included from Helpers::Lex

#default_settings, included

Methods included from Helpers::Logger

#handle_exception, #log

Methods included from Helpers::Core

#find_setting, #settings

Constructor Details

#initialize(**_options) ⇒ Subscription

Returns a new instance of Subscription.



12
13
14
15
16
17
18
# File 'lib/legion/extensions/actors/subscription.rb', line 12

# Sets up the actor and stores a fresh instance of its queue class in @queue.
#
# Any StandardError raised along the way is logged at fatal level (message
# first, then backtrace) and swallowed, so @queue may be left unset.
#
# @param _options [Hash] accepted for interface compatibility; not read here
def initialize(**_options)
  super()
  queue_class = queue
  @queue = queue_class.new
rescue StandardError => error
  [error.message, error.backtrace].each { |detail| log.fatal detail }
end

Instance Method Details

#block ⇒ Object



47
48
49
# File 'lib/legion/extensions/actors/subscription.rb', line 47

# Always returns false.
#
# Presumably signals whether the subscription should block the calling
# thread — TODO confirm. NOTE(review): #subscribe passes a literal
# `block: false` to the queue instead of this value, so overriding this
# method may have no effect there; verify against the intended design.
#
# @return [Boolean] false
def block
  false
end

#cancel ⇒ Object



38
39
40
41
42
43
44
45
# File 'lib/legion/extensions/actors/subscription.rb', line 38

# Stops consuming from the queue and closes its channel.
#
# Safe to call when there is nothing to tear down: it returns early if the
# queue was never built (the constructor swallows errors) or its channel is no
# longer active, and it skips the consumer cancel when #subscribe has not
# assigned @consumer yet, so the channel is still closed in that case.
#
# @return [true] always
def cancel
  return true unless @queue&.channel&.active

  log.debug "Closing subscription to #{@queue.name}"
  @consumer&.cancel
  @queue.channel.close
  true
end

#consumers ⇒ Object



51
52
53
# File 'lib/legion/extensions/actors/subscription.rb', line 51

# Always returns 1.
#
# Presumably the number of consumers to start for this actor — TODO confirm;
# nothing in the code shown for this class reads it.
#
# @return [Integer] 1
def consumers
  1
end

#create_queue ⇒ Object



20
21
22
23
24
25
26
27
# File 'lib/legion/extensions/actors/subscription.rb', line 20

# Defines a queue class for this actor and binds a new queue to the default exchange.
#
# An anonymous subclass of Legion::Transport::Queue is registered under
# `queues` with the actor's constant name. The class is then looked up again
# through #queue_string, instantiated, and bound twice: once with the bare
# actor name and once with the "<lex>.<actor>.#" wildcard routing key.
#
# @return [Object] the result of the second bind call
def create_queue
  queues.const_set(actor_const, Class.new(Legion::Transport::Queue))
  exchange = default_exchange.new
  actor_queue = Kernel.const_get(queue_string).new

  actor_queue.bind(exchange, routing_key: actor_name)
  actor_queue.bind(exchange, routing_key: "#{lex_name}.#{actor_name}.#")
end

#delay_start ⇒ Object



59
60
61
# File 'lib/legion/extensions/actors/subscription.rb', line 59

# Number of seconds #subscribe sleeps before it starts consuming.
#
# Returns 0 here, i.e. no delay.
#
# @return [Integer] 0
def delay_start
  0
end

#find_function(message = {}) ⇒ Object



91
92
93
94
95
96
97
98
# File 'lib/legion/extensions/actors/subscription.rb', line 91

# Picks the runner function to invoke for a message.
#
# Methods defined directly on the actor class win, checked in this order:
# runner_function, function, action. Failing that, the message's own
# :function entry is used, and finally the inherited #function.
#
# @param message [Hash] decoded message; only the :function key is consulted
# @return [Object] the function identifier chosen by the rules above
def find_function(message = {})
  own_methods = actor_class.instance_methods(false)

  if own_methods.include?(:runner_function)
    runner_function
  elsif own_methods.include?(:function)
    function
  elsif own_methods.include?(:action)
    action
  elsif message.key?(:function)
    message[:function]
  else
    function
  end
end

#include_metadata_in_message? ⇒ Boolean

Returns:

  • (Boolean)


63
64
65
# File 'lib/legion/extensions/actors/subscription.rb', line 63

# Whether #process_message should merge the message headers and routing key
# into the decoded message hash.
#
# @return [Boolean] true
def include_metadata_in_message?
  true
end

#manual_ack ⇒ Object



55
56
57
# File 'lib/legion/extensions/actors/subscription.rb', line 55

# Whether deliveries are acknowledged manually.
#
# #subscribe passes this value to the queue subscription and, when it is
# true, acknowledges each delivery after dispatch or rejects it on error.
#
# @return [Boolean] true
def manual_ack
  true
end

#process_message(message, metadata, delivery_info) ⇒ Object



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/legion/extensions/actors/subscription.rb', line 67

# Decodes a raw delivery into the message hash handed to the runner.
#
# Steps: decrypt the payload when the content encoding marks it as encrypted,
# parse it as JSON when the content type says so (otherwise wrap it as
# `{ value: payload }`), optionally merge the headers and routing key, then
# derive :timestamp and :datetime from whatever timestamp fields are present.
#
# @param message [String] raw payload as delivered
# @param metadata [#[]] message properties; :content_encoding, :content_type
#   and :headers are read
# @param delivery_info [#[]] delivery details; :routing_key is read
# @return [Hash] the decoded message
def process_message(message, metadata, delivery_info)
  payload = case metadata[:content_encoding]
            when 'encrypted/cs'
              Legion::Crypt.decrypt(message, metadata[:headers]['iv'])
            when 'encrypted/pk'
              # NOTE(review): this header is read with a symbol key while 'iv' above
              # uses a string key — confirm which form the headers actually carry.
              Legion::Crypt.decrypt_from_keypair(metadata[:headers][:public_key], message)
            else
              message
            end

  message = if metadata[:content_type] == 'application/json'
              Legion::JSON.load(payload)
            else
              { value: payload }
            end

  if include_metadata_in_message?
    # Headers may be absent on a delivery; treat that as "no headers" instead
    # of failing the whole message on nil.
    headers = metadata[:headers] || {}
    message = message.merge(headers.transform_keys(&:to_sym))
    message[:routing_key] = delivery_info[:routing_key]
  end

  # An explicit :timestamp wins; otherwise derive seconds from milliseconds.
  message[:timestamp] = (message[:timestamp_in_ms] / 1000).round if message.key?(:timestamp_in_ms) && !message.key?(:timestamp)
  message[:datetime] = Time.at(message[:timestamp].to_i).to_datetime.to_s if message.key?(:timestamp)
  message
end

#queue ⇒ Object



29
30
31
32
# File 'lib/legion/extensions/actors/subscription.rb', line 29

# Resolves the queue class for this actor, creating it first when `queues`
# does not report a constant named after the actor.
#
# NOTE(review): const_defined? is called with its default ancestor lookup, so
# a same-named constant reachable from outside `queues` would also count as
# defined — confirm this is intended.
#
# @return [Object] the constant named by #queue_string
def queue
  already_defined = queues.const_defined?(actor_const)
  create_queue unless already_defined

  Kernel.const_get(queue_string)
end

#queue_string ⇒ Object



34
35
36
# File 'lib/legion/extensions/actors/subscription.rb', line 34

# Fully qualified constant path of this actor's queue class, built as
# "<queues>::<actor_const>" and memoized in @queue_string.
#
# @return [String]
def queue_string
  return @queue_string if @queue_string

  @queue_string = "#{queues}::#{actor_const}"
end

#subscribe ⇒ Object



100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# File 'lib/legion/extensions/actors/subscription.rb', line 100

# Starts consuming from @queue and dispatches every delivery.
#
# Each delivery is decoded with #process_message and then either handed to
# Legion::Runner.run (when #use_runner? is true) or sent directly to
# runner_class. With manual acknowledgement enabled the delivery is
# acknowledged after a successful dispatch and rejected when a StandardError
# is raised; the error is logged and not re-raised. After a delivery, the
# subscription cancels itself if the client is flagged as shutting down.
#
# @return [Object] whatever @queue.subscribe returns; also stored in @consumer
def subscribe
  # Loaded on first subscribe rather than at file load time.
  require 'legion/extensions/tasker/runners/updater'
  sleep(delay_start)
  consumer_tag = "#{Legion::Settings[:client][:name]}_#{lex_name}_#{runner_name}_#{Thread.current.object_id}"
  # Must be a real Proc: writing `block { cancel }` would invoke the #block
  # predicate, drop the braces, and hand the queue a boolean instead of a callback.
  on_cancellation = proc { cancel }
  @consumer = @queue.subscribe(manual_ack: manual_ack, block: false, consumer_tag: consumer_tag, on_cancellation: on_cancellation) do |delivery_info, metadata, payload|
    message = process_message(payload, metadata, delivery_info)
    if use_runner?
      Legion::Runner.run(**message,
                         runner_class:  runner_class,
                         function:      find_function(message),
                         check_subtask: check_subtask?,
                         generate_task: generate_task?)
    else
      runner_class.send(find_function(message), **message)
    end
    @queue.acknowledge(delivery_info.delivery_tag) if manual_ack

    cancel if Legion::Settings[:client][:shutting_down]
  rescue StandardError => e
    Legion::Logging.error e.message
    Legion::Logging.error e.backtrace
    # message is nil here when decoding itself failed.
    Legion::Logging.error message
    Legion::Logging.error function
    @queue.reject(delivery_info.delivery_tag) if manual_ack
  end
end