Class: Legion::Extensions::Actors::Subscription
- Inherits:
-
Object
- Object
- Legion::Extensions::Actors::Subscription
show all
- Includes:
- Concurrent::Async, Base, Helpers::Transport
- Defined in:
- lib/legion/extensions/actors/subscription.rb
Instance Method Summary
collapse
#build_default_exchange, #default_exchange, #exchanges, #messages, #queues, #transport_class, #transport_path
#actor_class, #actor_const, #actor_name, #calling_class, #calling_class_array, #from_json, #full_path, #lex_class, #lex_const, #lex_name, #normalize, #runner_class, #runner_const, #runner_name, #to_dotted_hash
Methods included from Base
#args, #check_subtask?, #enabled?, #function, #generate_task?, #manual, #runner, #use_runner?
#default_settings, included
#handle_exception, #log
#find_setting, #settings
Constructor Details
#initialize(**_options) ⇒ Subscription
Returns a new instance of Subscription.
12
13
14
15
16
17
18
|
# File 'lib/legion/extensions/actors/subscription.rb', line 12
def initialize(**_options)
super()
@queue = queue.new
rescue StandardError => e
log.fatal e.message
log.fatal e.backtrace
end
|
Instance Method Details
#block ⇒ Object
47
48
49
|
# File 'lib/legion/extensions/actors/subscription.rb', line 47
def block
false
end
|
#cancel ⇒ Object
38
39
40
41
42
43
44
45
|
# File 'lib/legion/extensions/actors/subscription.rb', line 38
def cancel
return true unless @queue.channel.active
log.debug "Closing subscription to #{@queue.name}"
@consumer.cancel
@queue.channel.close
true
end
|
#consumers ⇒ Object
51
52
53
|
# File 'lib/legion/extensions/actors/subscription.rb', line 51
def consumers
1
end
|
#create_queue ⇒ Object
20
21
22
23
24
25
26
27
|
# File 'lib/legion/extensions/actors/subscription.rb', line 20
def create_queue
queues.const_set(actor_const, Class.new(Legion::Transport::Queue))
exchange_object = default_exchange.new
queue_object = Kernel.const_get(queue_string).new
queue_object.bind(exchange_object, routing_key: actor_name)
queue_object.bind(exchange_object, routing_key: "#{lex_name}.#{actor_name}.#")
end
|
#delay_start ⇒ Object
59
60
61
|
# File 'lib/legion/extensions/actors/subscription.rb', line 59
def delay_start
0
end
|
#find_function(message = {}) ⇒ Object
91
92
93
94
95
96
97
98
|
# File 'lib/legion/extensions/actors/subscription.rb', line 91
def find_function(message = {})
return runner_function if actor_class.instance_methods(false).include?(:runner_function)
return function if actor_class.instance_methods(false).include?(:function)
return action if actor_class.instance_methods(false).include?(:action)
return message[:function] if message.key? :function
function
end
|
63
64
65
|
# File 'lib/legion/extensions/actors/subscription.rb', line 63
def include_metadata_in_message?
true
end
|
#manual_ack ⇒ Object
55
56
57
|
# File 'lib/legion/extensions/actors/subscription.rb', line 55
def manual_ack
true
end
|
#process_message(message, metadata, delivery_info) ⇒ Object
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
|
# File 'lib/legion/extensions/actors/subscription.rb', line 67
def process_message(message, metadata, delivery_info)
payload = if metadata[:content_encoding] && metadata[:content_encoding] == 'encrypted/cs'
Legion::Crypt.decrypt(message, metadata[:headers]['iv'])
elsif metadata[:content_encoding] && metadata[:content_encoding] == 'encrypted/pk'
Legion::Crypt.decrypt_from_keypair(metadata[:headers][:public_key], message)
else
message
end
message = if metadata[:content_type] == 'application/json'
Legion::JSON.load(payload)
else
{ value: payload }
end
if include_metadata_in_message?
message = message.merge(metadata[:headers].transform_keys(&:to_sym))
message[:routing_key] = delivery_info[:routing_key]
end
message[:timestamp] = (message[:timestamp_in_ms] / 1000).round if message.key?(:timestamp_in_ms) && !message.key?(:timestamp)
message[:datetime] = Time.at(message[:timestamp].to_i).to_datetime.to_s if message.key?(:timestamp)
message
end
|
#queue ⇒ Object
29
30
31
32
|
# File 'lib/legion/extensions/actors/subscription.rb', line 29
def queue
create_queue unless queues.const_defined?(actor_const)
Kernel.const_get queue_string
end
|
#queue_string ⇒ Object
34
35
36
|
# File 'lib/legion/extensions/actors/subscription.rb', line 34
def queue_string
@queue_string ||= "#{queues}::#{actor_const}"
end
|
#subscribe ⇒ Object
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
|
# File 'lib/legion/extensions/actors/subscription.rb', line 100
def subscribe
require 'legion/extensions/tasker/runners/updater'
sleep(delay_start)
consumer_tag = "#{Legion::Settings[:client][:name]}_#{lex_name}_#{runner_name}_#{Thread.current.object_id}"
on_cancellation = block { cancel }
@consumer = @queue.subscribe(manual_ack: manual_ack, block: false, consumer_tag: consumer_tag, on_cancellation: on_cancellation) do |delivery_info, metadata, payload|
message = process_message(payload, metadata, delivery_info)
if use_runner?
Legion::Runner.run(**message,
runner_class: runner_class,
function: find_function(message),
check_subtask: check_subtask?,
generate_task: generate_task?)
else
runner_class.send(find_function(message), **message)
end
@queue.acknowledge(delivery_info.delivery_tag) if manual_ack
cancel if Legion::Settings[:client][:shutting_down]
rescue StandardError => e
Legion::Logging.error e.message
Legion::Logging.error e.backtrace
Legion::Logging.error message
Legion::Logging.error function
@queue.reject(delivery_info.delivery_tag) if manual_ack
end
end
|