Module: Hsdq::Receiver
- Included in:
- Hsdq
- Defined in:
- lib/hsdq/receiver.rb
Overview
This module process the incoming messages and call one of 5 messages (ack, request, callback, feedback or error) depending on the type of the message.
Instance Method Summary collapse
-
#check_whitelist(spark, options) ⇒ Boolean, Hash
Call whitelisted? to verify the the topic and task are legit.
-
#get_burst(spark, _options = {}) ⇒ Object
Manage pulling: - the burst (persistent action) associated with the spark from the matching Redis hash - if needed the context data.
-
#get_spark(raw_spark) ⇒ Object
blpop return an array [list_name, data].
-
#h_spark(raw_spark) ⇒ Hash
return the spark (ephemeral part of the message) from the message list.
-
#hsdq_ack(message, context) ⇒ Object
**Placeholder for ack received.
-
#hsdq_add_authorized_tasks(*tasks) ⇒ Array
The authoriced tasks.
-
#hsdq_add_authorized_topics(*topics) ⇒ Array
The authoriced topics.
-
#hsdq_authorized_tasks(*tasks) ⇒ Array
Cached value of the tasks authorized to be processed.
-
#hsdq_authorized_topics(*topics) ⇒ Object
Cached value of the topics authorized to be processed.
-
#hsdq_authorized_types ⇒ Object
Hash of the internal authorized message types.
-
#hsdq_callback(message, context) ⇒ Object
**Placeholder for callback received.
-
#hsdq_error(message, context) ⇒ Object
**Placeholder for error received.
-
#hsdq_feedback(message, context) ⇒ Object
**Placeholder for feedback received.
-
#hsdq_ignit(raw_spark, options) ⇒ Object
Send the ACk and start the processing for the message just received.
The processing will be either executed synchronously or a new thread will be started based on the configuration. -
#hsdq_request(message, context) ⇒ String
**Placeholder for request received.
-
#hsdq_set_authorized_tasks(*tasks) ⇒ Array
The authoriced tasks.
-
#hsdq_set_authorized_topics(*topics) ⇒ Array
The authoriced topics.
-
#pull_burst(burst_p, burst_context_p) ⇒ array
Execute a multi transaction to get the burst and the context from Redis in a single call.
-
#pull_burst_only(burst_p) ⇒ Object
If there is no context this method is used instead of pull_burst.
-
#reject_spark(spark, e) ⇒ Hash
Send an error message back to the sender.
-
#response?(spark) ⇒ boolean
True if the message received is a response.
-
#send_ack(spark) ⇒ Object
Send the ack back to the sender in case of a request.
-
#set_context(spark) ⇒ Object
Save for future use context data into the thread_store.
-
#sparkle(spark, options) ⇒ Object
Entry point for the task to process, this is what is executed in the threads when a message is pulled.
-
#valid_spark?(spark, options) ⇒ Boolean, Hash
Spark validation, call valid_type?.
-
#valid_task?(spark, _options) ⇒ Boolean
test the task against the list of authorised tasks.
- #valid_topic?(spark, _options) ⇒ Boolean
-
#whitelisted?(spark, options) ⇒ Boolean
validate the topic and the task.
Instance Method Details
#check_whitelist(spark, options) ⇒ Boolean, Hash
Call whitelisted? to verify the the topic and task are legit.
164 165 166 167 168 169 170 171 |
# File 'lib/hsdq/receiver.rb', line 164 def check_whitelist(spark, ) begin whitelisted?(spark, ) ? true : (raise ArgumentError.new("Illegal argument in topic or task")) rescue => e reject_spark spark, e false end end |
#get_burst(spark, _options = {}) ⇒ Object
Manage pulling:
- the burst (persistent action) associated with the spark from the matching Redis hash
- if needed the context data
102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 |
# File 'lib/hsdq/receiver.rb', line 102 def get_burst(spark, ={}) # get the context parameters context_h = spark[:context] burst_p = -> { cx_data.hget hsdq_key(spark), burst_key(spark) } if response?(spark) && context_h # save previous_sender in thread_store for later reply sent_to context_h[:previous_sender] # set the proc for multi redis to pull the initial request burst_context_p = -> { cx_data.hget hsdq_key(spark), "request_#{context_h[:spark_uid]}" } # exec the redis multi burst_j, burst_context_j = pull_burst(burst_p, burst_context_p) else burst_j, burst_context_j = pull_burst_only burst_p end burst = burst_j ? (JSON.parse burst_j, {symbolize_names: true}) : {} burst_context = burst_context_j ? (JSON.parse burst_context_j, {symbolize_names: true}) : {} [burst, burst_context] end |
#get_spark(raw_spark) ⇒ Object
blpop return an array [list_name, data]
48 49 50 |
# File 'lib/hsdq/receiver.rb', line 48 def get_spark(raw_spark) raw_spark.kind_of?(Array) ? raw_spark.last : raw_spark end |
#h_spark(raw_spark) ⇒ Hash
return the spark (ephemeral part of the message) from the message list
55 56 57 |
# File 'lib/hsdq/receiver.rb', line 55 def h_spark(raw_spark) JSON.parse get_spark(raw_spark), {symbolize_names: true} end |
#hsdq_ack(message, context) ⇒ Object
Placeholder for ack received. You must override hsdq_request in your HsdqXxx class
17 |
# File 'lib/hsdq/receiver.rb', line 17 def hsdq_ack(, context); placeholder; end |
#hsdq_add_authorized_tasks(*tasks) ⇒ Array
241 242 243 |
# File 'lib/hsdq/receiver.rb', line 241 def (*tasks) = [, tasks].flatten end |
#hsdq_add_authorized_topics(*topics) ⇒ Array
247 248 249 |
# File 'lib/hsdq/receiver.rb', line 247 def (*topics) = [, topics].flatten end |
#hsdq_authorized_tasks(*tasks) ⇒ Array
Cached value of the tasks authorized to be processed
210 211 212 213 214 215 216 |
# File 'lib/hsdq/receiver.rb', line 210 def (*tasks) if tasks.any? = [tasks].flatten else ||= [hsdq_opts[:tasks]].flatten end end |
#hsdq_authorized_topics(*topics) ⇒ Object
Cached value of the topics authorized to be processed
219 220 221 222 223 224 225 |
# File 'lib/hsdq/receiver.rb', line 219 def (*topics) if topics.any? = [topics].flatten else ||= [hsdq_opts[:topics]].flatten end end |
#hsdq_authorized_types ⇒ Object
Hash of the internal authorized message types
203 204 205 |
# File 'lib/hsdq/receiver.rb', line 203 def [:request, :ack, :feedback, :callback, :error] end |
#hsdq_callback(message, context) ⇒ Object
Placeholder for callback received. You must override hsdq_request in your HsdqXxx class
20 |
# File 'lib/hsdq/receiver.rb', line 20 def hsdq_callback(, context); placeholder; end |
#hsdq_error(message, context) ⇒ Object
Placeholder for error received. You must override hsdq_request in your HsdqXxx class
26 |
# File 'lib/hsdq/receiver.rb', line 26 def hsdq_error(, context); placeholder; end |
#hsdq_feedback(message, context) ⇒ Object
Placeholder for feedback received. You must override hsdq_request in your HsdqXxx class
23 |
# File 'lib/hsdq/receiver.rb', line 23 def hsdq_feedback(, context); placeholder; end |
#hsdq_ignit(raw_spark, options) ⇒ Object
Send the ACk and start the processing for the message just received.
The processing will be either executed synchronously or a new thread will be started based on the configuration.
33 34 35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/hsdq/receiver.rb', line 33 def hsdq_ignit(raw_spark, ) spark = h_spark raw_spark send_ack spark if valid_spark? spark, if hsdq_opts[:threaded] # :nocov: hsdq_start_thread -> { sparkle spark, } # :nocov: else sparkle spark, end end end |
#hsdq_request(message, context) ⇒ String
Placeholder for request received. You must override hsdq_request in your HsdqXxx class
After this method has run, there will be no further processing.
14 |
# File 'lib/hsdq/receiver.rb', line 14 def hsdq_request(, context); placeholder; end |
#hsdq_set_authorized_tasks(*tasks) ⇒ Array
229 230 231 |
# File 'lib/hsdq/receiver.rb', line 229 def (*tasks) = tasks.flatten end |
#hsdq_set_authorized_topics(*topics) ⇒ Array
235 236 237 |
# File 'lib/hsdq/receiver.rb', line 235 def (*topics) = topics.flatten end |
#pull_burst(burst_p, burst_context_p) ⇒ array
Execute a multi transaction to get the burst and the context from Redis in a single call
128 129 130 131 132 133 |
# File 'lib/hsdq/receiver.rb', line 128 def pull_burst(burst_p, burst_context_p) cx_data.multi do burst_p.call burst_context_p.call end end |
#pull_burst_only(burst_p) ⇒ Object
If there is no context this method is used instead of pull_burst
138 139 140 |
# File 'lib/hsdq/receiver.rb', line 138 def pull_burst_only(burst_p) [burst_p.call, nil] end |
#reject_spark(spark, e) ⇒ Hash
Send an error message back to the sender
182 183 184 185 186 187 188 189 190 191 192 193 |
# File 'lib/hsdq/receiver.rb', line 182 def reject_spark(spark, e) error = { sent_to: spark[:sender], uid: spark[:uid], sender: channel, params: spark, data: e. } puts "sending error message: #{error}" hsdq_send_error error error end |
#response?(spark) ⇒ boolean
264 265 266 |
# File 'lib/hsdq/receiver.rb', line 264 def response?(spark) [:callback, :feedback, :error].include? spark[:type].to_sym end |
#send_ack(spark) ⇒ Object
Send the ack back to the sender in case of a request
196 197 198 199 200 |
# File 'lib/hsdq/receiver.rb', line 196 def send_ack(spark) return unless ['request', :request].include? spark[:type] ack_msg = spark.merge sent_to: spark[:sender], sender: channel hsdq_send_ack ack_msg end |
#set_context(spark) ⇒ Object
Save for future use context data into the thread_store
90 91 92 93 94 95 |
# File 'lib/hsdq/receiver.rb', line 90 def set_context(spark) # store in thread_store for later use sent_to spark[:sender] previous_sender spark[:sender] context_params({ reply_to: spark[:previous_sender], spark_uid: spark[:spark_uid]}) end |
#sparkle(spark, options) ⇒ Object
Entry point for the task to process, this is what is executed in the threads when a message is pulled.
- Pull the burst (line with the request or response) from the the hash
- Pull the context related to a response if it exist
- Set values for the next hop context in case of a request.
- Call one of the 5 methods (request, ack, callback, feedback, error) in your hsdqXxx class (or the placeholder) based on the message type
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/hsdq/receiver.rb', line 67 def sparkle(spark, ) puts spark.inspect burst, ctx_burst = get_burst spark, context ctx_burst case spark[:type].to_sym when :ack hsdq_ack burst, context when :callback hsdq_callback burst, context when :feedback hsdq_feedback burst, context when :error hsdq_error burst, context when :request set_context spark hsdq_request burst, context end end |
#valid_spark?(spark, options) ⇒ Boolean, Hash
Spark validation, call valid_type?. If invalid:
- an error is sent back to the sender
- false is returned to the processing to stop the action.
149 150 151 152 153 154 155 156 157 |
# File 'lib/hsdq/receiver.rb', line 149 def valid_spark?(spark, ) begin raise ArgumentError.new("Illegal type #{spark[:type]}") unless valid_type? spark[:type] 'request' == spark[:type] ? check_whitelist(spark, ) : true rescue => e reject_spark spark, e false end end |
#valid_task?(spark, _options) ⇒ Boolean
test the task against the list of authorised tasks
253 254 255 256 |
# File 'lib/hsdq/receiver.rb', line 253 def valid_task?(spark, ) return true if spark[:task].nil? || .empty? .include?(spark[:task].to_sym) end |
#valid_topic?(spark, _options) ⇒ Boolean
258 259 260 261 |
# File 'lib/hsdq/receiver.rb', line 258 def valid_topic?(spark, ) return true if spark[:topic].nil? || .empty? .include?(spark[:topic].to_sym) end |
#whitelisted?(spark, options) ⇒ Boolean
validate the topic and the task
174 175 176 |
# File 'lib/hsdq/receiver.rb', line 174 def whitelisted?(spark, ) valid_topic?(spark, ) && valid_task?(spark, ) end |