Module: Hsdq::Receiver

Included in:
Hsdq
Defined in:
lib/hsdq/receiver.rb

Overview

This module processes the incoming messages and calls one of 5 handler methods (ack, request, callback, feedback or error) depending on the type of the message.

Instance Method Summary collapse

Instance Method Details

#check_whitelist(spark, options) ⇒ Boolean, Hash

Call whitelisted? to verify that the topic and task are legit.



164
165
166
167
168
169
170
171
# File 'lib/hsdq/receiver.rb', line 164

# Check the spark against the topic/task whitelist.
#
# When whitelisted? returns a falsy value an ArgumentError is raised and
# immediately rescued here; any StandardError raised by whitelisted? itself
# is handled the same way: the spark is passed to reject_spark together with
# the exception, and false is returned.
#
# @param spark [Hash] the spark to verify
# @param options [Hash] forwarded untouched to whitelisted?
# @return [Boolean] true when whitelisted, false once the spark was rejected
def check_whitelist(spark, options)
  raise ArgumentError, "Illegal argument in topic or task" unless whitelisted?(spark, options)

  true
rescue => e
  reject_spark spark, e
  false
end

#get_burst(spark, _options = {}) ⇒ Object

Manage pulling:

  • the burst (persistent action) associated with the spark from the matching Redis hash
  • if needed the context data


102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/hsdq/receiver.rb', line 102

# Pull the burst attached to the spark and, for a response carrying a
# context, the stored initial request as well.
#
# Both values are read as JSON strings from the hash addressed by
# hsdq_key(spark) and parsed with symbolized keys. A missing value is
# replaced by an empty Hash.
#
# @param spark [Hash] the spark; :context and (through response?) :type are read
# @param _options [Hash] unused
# @return [Array(Hash, Hash)] the burst and the burst context
def get_burst(spark, _options={})
  context_h   = spark[:context]
  fetch_burst = -> { cx_data.hget hsdq_key(spark), burst_key(spark) }

  raw_burst, raw_context =
    if response?(spark) && context_h
      # remember who to reply to before reading anything
      sent_to context_h[:previous_sender]
      fetch_request = -> { cx_data.hget hsdq_key(spark), "request_#{context_h[:spark_uid]}" }
      # both reads go through pull_burst so they are issued together
      pull_burst(fetch_burst, fetch_request)
    else
      pull_burst_only fetch_burst
    end

  # decode the burst first, then the context; absent values become {}
  [raw_burst, raw_context].map do |json|
    json ? JSON.parse(json, {symbolize_names: true}) : {}
  end
end

#get_spark(raw_spark) ⇒ Object

blpop returns an array [list_name, data]; when given an array, this method returns its last element, otherwise the value itself.



48
49
50
# File 'lib/hsdq/receiver.rb', line 48

# Extract the spark payload from what was popped from the list.
#
# @param raw_spark [Array, Object] either an array whose last element is the
#   payload, or the payload itself
# @return [Object] the last element of an Array, otherwise raw_spark unchanged
def get_spark(raw_spark)
  return raw_spark unless raw_spark.is_a?(Array)

  raw_spark.last
end

#h_spark(raw_spark) ⇒ Hash

Returns the spark (ephemeral part of the message) from the message list, parsed from JSON into a Hash with symbolized keys.



55
56
57
# File 'lib/hsdq/receiver.rb', line 55

# Decode the raw spark into a Hash.
#
# The payload is first extracted with get_spark, then parsed as JSON with
# symbolized keys.
#
# @param raw_spark [Array, String] value handed over to get_spark
# @return [Hash] the parsed spark
def h_spark(raw_spark)
  payload = get_spark(raw_spark)
  JSON.parse(payload, {symbolize_names: true})
end

#hsdq_ack(message, context) ⇒ Object

Placeholder for ack received. You must override hsdq_ack in your HsdqXxx class

See Also:



17
# File 'lib/hsdq/receiver.rb', line 17

# Default handler for a received ack; simply delegates to placeholder.
# Intended to be overridden in the including class.
#
# @param message [Object] unused by the default implementation
# @param context [Object] unused by the default implementation
# @return [Object] whatever placeholder returns
def hsdq_ack(message, context)
  placeholder
end

#hsdq_add_authorized_tasks(*tasks) ⇒ Array



241
242
243
# File 'lib/hsdq/receiver.rb', line 241

# Append tasks to the list of authorized tasks.
#
# The current list (from hsdq_authorized_tasks) and the given tasks are
# combined and flattened; duplicates are kept.
#
# @param tasks [Array] tasks to add, nested arrays are flattened
# @return [Array] the new list of authorized tasks
def hsdq_add_authorized_tasks(*tasks)
  current = hsdq_authorized_tasks
  @hsdq_authorized_tasks = [current, tasks].flatten
end

#hsdq_add_authorized_topics(*topics) ⇒ Array



247
248
249
# File 'lib/hsdq/receiver.rb', line 247

# Append topics to the list of authorized topics.
#
# The current list (from hsdq_authorized_topics) and the given topics are
# combined and flattened; duplicates are kept.
#
# @param topics [Array] topics to add, nested arrays are flattened
# @return [Array] the new list of authorized topics
def hsdq_add_authorized_topics(*topics)
  current = hsdq_authorized_topics
  @hsdq_authorized_topics = [current, topics].flatten
end

#hsdq_authorized_tasks(*tasks) ⇒ Array

Cached value of the tasks authorized to be processed



210
211
212
213
214
215
216
# File 'lib/hsdq/receiver.rb', line 210

# Read or replace the cached list of authorized tasks.
#
# With at least one truthy argument the list is replaced by the flattened
# arguments. Otherwise the cached list is returned, initialised on first use
# from hsdq_opts[:tasks].
#
# NOTE: when hsdq_opts[:tasks] is nil the cached value becomes [nil], which
# is not an empty list.
#
# @param tasks [Array] optional replacement list
# @return [Array] the authorized tasks
def hsdq_authorized_tasks(*tasks)
  return (@hsdq_authorized_tasks = tasks.flatten) if tasks.any?

  @hsdq_authorized_tasks ||= [hsdq_opts[:tasks]].flatten
end

#hsdq_authorized_topics(*topics) ⇒ Object

Cached value of the topics authorized to be processed



219
220
221
222
223
224
225
# File 'lib/hsdq/receiver.rb', line 219

# Read or replace the cached list of authorized topics.
#
# With at least one truthy argument the list is replaced by the flattened
# arguments. Otherwise the cached list is returned, initialised on first use
# from hsdq_opts[:topics].
#
# NOTE: when hsdq_opts[:topics] is nil the cached value becomes [nil], which
# is not an empty list.
#
# @param topics [Array] optional replacement list
# @return [Array] the authorized topics
def hsdq_authorized_topics(*topics)
  return (@hsdq_authorized_topics = topics.flatten) if topics.any?

  @hsdq_authorized_topics ||= [hsdq_opts[:topics]].flatten
end

#hsdq_authorized_types ⇒ Object

Array of the internal authorized message types



203
204
205
# File 'lib/hsdq/receiver.rb', line 203

# List the message types handled internally.
#
# @return [Array<Symbol>] :request, :ack, :feedback, :callback and :error
def hsdq_authorized_types
  %i[request ack feedback callback error]
end

#hsdq_callback(message, context) ⇒ Object

Placeholder for callback received. You must override hsdq_callback in your HsdqXxx class

See Also:



20
# File 'lib/hsdq/receiver.rb', line 20

# Default handler for a received callback; simply delegates to placeholder.
# Intended to be overridden in the including class.
#
# @param message [Object] unused by the default implementation
# @param context [Object] unused by the default implementation
# @return [Object] whatever placeholder returns
def hsdq_callback(message, context)
  placeholder
end

#hsdq_error(message, context) ⇒ Object

Placeholder for error received. You must override hsdq_error in your HsdqXxx class

See Also:



26
# File 'lib/hsdq/receiver.rb', line 26

# Default handler for a received error; simply delegates to placeholder.
# Intended to be overridden in the including class.
#
# @param message [Object] unused by the default implementation
# @param context [Object] unused by the default implementation
# @return [Object] whatever placeholder returns
def hsdq_error(message, context)
  placeholder
end

#hsdq_feedback(message, context) ⇒ Object

Placeholder for feedback received. You must override hsdq_feedback in your HsdqXxx class

See Also:



23
# File 'lib/hsdq/receiver.rb', line 23

# Default handler for received feedback; simply delegates to placeholder.
# Intended to be overridden in the including class.
#
# @param message [Object] unused by the default implementation
# @param context [Object] unused by the default implementation
# @return [Object] whatever placeholder returns
def hsdq_feedback(message, context)
  placeholder
end

#hsdq_ignit(raw_spark, options) ⇒ Object

Send the ACK and start the processing for the message just received.
The processing will be either executed synchronously or a new thread will be started based on the configuration.



33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/hsdq/receiver.rb', line 33

# Entry point for a message just pulled from the list.
#
# The raw spark is decoded, send_ack is called (before validation), and when
# the spark is valid it is processed by sparkle, either inline or through
# hsdq_start_thread when hsdq_opts[:threaded] is set.
#
# @param raw_spark [Array, String] the value handed over to h_spark
# @param options [Hash] forwarded to valid_spark? and sparkle
# @return [Object, nil] nil when the spark is invalid, otherwise the result of
#   sparkle or of hsdq_start_thread
def hsdq_ignit(raw_spark, options)
  spark = h_spark raw_spark
  send_ack spark
  return unless valid_spark? spark, options
  return sparkle(spark, options) unless hsdq_opts[:threaded]

  # :nocov:
  hsdq_start_thread -> { sparkle spark, options }
  # :nocov:
end

#hsdq_request(message, context) ⇒ String

Placeholder for request received. You must override hsdq_request in your HsdqXxx class
After this method has run, there will be no further processing.



14
# File 'lib/hsdq/receiver.rb', line 14

# Default handler for a received request; simply delegates to placeholder.
# Intended to be overridden in the including class.
#
# @param message [Object] unused by the default implementation
# @param context [Object] unused by the default implementation
# @return [Object] whatever placeholder returns
def hsdq_request(message, context)
  placeholder
end

#hsdq_set_authorized_tasks(*tasks) ⇒ Array



229
230
231
# File 'lib/hsdq/receiver.rb', line 229

# Replace the list of authorized tasks.
#
# Calling it without arguments stores an empty list.
#
# @param tasks [Array] the new tasks, nested arrays are flattened
# @return [Array] the stored list
def hsdq_set_authorized_tasks(*tasks)
  flat_tasks = tasks.flatten
  @hsdq_authorized_tasks = flat_tasks
end

#hsdq_set_authorized_topics(*topics) ⇒ Array



235
236
237
# File 'lib/hsdq/receiver.rb', line 235

# Replace the list of authorized topics.
#
# Calling it without arguments stores an empty list.
#
# @param topics [Array] the new topics, nested arrays are flattened
# @return [Array] the stored list
def hsdq_set_authorized_topics(*topics)
  flat_topics = topics.flatten
  @hsdq_authorized_topics = flat_topics
end

#pull_burst(burst_p, burst_context_p) ⇒ array

Execute a multi transaction to get the burst and the context from Redis in a single call



128
129
130
131
132
133
# File 'lib/hsdq/receiver.rb', line 128

# Run both read procs inside a single cx_data.multi block.
#
# @param burst_p [Proc] reads the burst
# @param burst_context_p [Proc] reads the burst context
# @return [Object] whatever cx_data.multi returns; get_burst destructures it
#   as [burst, burst_context]
def pull_burst(burst_p, burst_context_p)
  # order matters: the burst is queued first, the context second
  cx_data.multi { burst_p.call; burst_context_p.call }
end

#pull_burst_only(burst_p) ⇒ Object

If there is no context this method is used instead of pull_burst

See Also:



138
139
140
# File 'lib/hsdq/receiver.rb', line 138

# Read the burst alone, for the case where no context has to be pulled.
#
# @param burst_p [Proc] reads the burst
# @return [Array] the burst followed by nil in place of the context
def pull_burst_only(burst_p)
  burst = burst_p.call
  [burst, nil]
end

#reject_spark(spark, e) ⇒ Hash

Send an error message back to the sender



182
183
184
185
186
187
188
189
190
191
192
193
# File 'lib/hsdq/receiver.rb', line 182

# Build an error message for a rejected spark and send it back to the sender.
#
# The message is also printed to standard output before being sent with
# hsdq_send_error.
#
# @param spark [Hash] the rejected spark; :sender and :uid are read
# @param e [Exception] the reason of the rejection, its message is sent as data
# @return [Hash] the error message that was sent
def reject_spark(spark, e)
  {
    sent_to: spark[:sender],
    uid:     spark[:uid],
    sender:  channel,
    params:  spark,
    data:    e.message
  }.tap do |error|
    puts "sending error message: #{error}"
    hsdq_send_error error
  end
end

#response?(spark) ⇒ boolean



264
265
266
# File 'lib/hsdq/receiver.rb', line 264

# Tell whether the spark is a response (callback, feedback or error).
#
# @param spark [Hash] the spark; :type must respond to to_sym
# @return [Boolean] true for a response type, false otherwise
def response?(spark)
  case spark[:type].to_sym
  when :callback, :feedback, :error then true
  else false
  end
end

#send_ack(spark) ⇒ Object

Send the ack back to the sender in case of a request



196
197
198
199
200
# File 'lib/hsdq/receiver.rb', line 196

# Acknowledge a request to its sender; other message types are ignored.
#
# The ack is the spark itself with :sent_to set to the original sender and
# :sender set to this channel.
#
# @param spark [Hash] the received spark; :type may be 'request' or :request
# @return [Object, nil] the result of hsdq_send_ack, nil when nothing is sent
def send_ack(spark)
  if ['request', :request].include?(spark[:type])
    hsdq_send_ack spark.merge(sent_to: spark[:sender], sender: channel)
  end
end

#set_context(spark) ⇒ Object

Save for future use context data into the thread_store



90
91
92
93
94
95
# File 'lib/hsdq/receiver.rb', line 90

# Record the data needed for the next hop of a request.
#
# The spark's sender is passed to both sent_to and previous_sender, then the
# reply target and spark uid are handed to context_params.
#
# @param spark [Hash] the spark; :sender, :previous_sender and :spark_uid are read
# @return [Object] whatever context_params returns
def set_context(spark)
  sender = spark[:sender]
  sent_to sender
  previous_sender sender

  next_hop = { reply_to: spark[:previous_sender], spark_uid: spark[:spark_uid] }
  context_params(next_hop)
end

#sparkle(spark, options) ⇒ Object

Entry point for the task to process, this is what is executed in the threads when a message is pulled.

  • Pull the burst (line with the request or response) from the hash
  • Pull the context related to a response if it exists
  • Set values for the next hop context in case of a request.
  • Call one of the 5 methods (request, ack, callback, feedback, error) in your hsdqXxx class (or the placeholder) based on the message type


67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/hsdq/receiver.rb', line 67

# Process one spark: load its burst and dispatch to the handler matching the
# message type.
#
# The spark is printed to standard output, the burst and its context are
# pulled with get_burst, and the context is stored through context(...).
# For a request, set_context is called before the handler. An unknown type
# is silently ignored.
#
# @param spark [Hash] the spark; :type must respond to to_sym
# @param options [Hash] forwarded to get_burst
# @return [Object, nil] the handler's result, nil for an unknown type
def sparkle(spark, options)
  puts spark.inspect
  payload, payload_context = get_burst spark, options
  context payload_context

  case spark[:type].to_sym
  when :ack      then hsdq_ack payload, context
  when :callback then hsdq_callback payload, context
  when :feedback then hsdq_feedback payload, context
  when :error    then hsdq_error payload, context
  when :request
    # next-hop data must be in place before the request handler runs
    set_context spark
    hsdq_request payload, context
  end
end

#valid_spark?(spark, options) ⇒ Boolean, Hash

Spark validation, call valid_type?. If invalid:

  • an error is sent back to the sender
  • false is returned to the processing to stop the action.


149
150
151
152
153
154
155
156
157
# File 'lib/hsdq/receiver.rb', line 149

# Validate a spark before it is processed.
#
# The type is checked with valid_type?; a request is additionally checked
# against the whitelist. The request test compares the textual form of the
# type so that both 'request' and :request trigger the whitelist check, in
# line with send_ack and sparkle which accept either form.
#
# On an illegal type (or any StandardError raised while validating) the spark
# is passed to reject_spark and false is returned.
#
# @param spark [Hash] the spark; :type is read
# @param options [Hash] forwarded to check_whitelist
# @return [Boolean] true when the spark may be processed, false otherwise
def valid_spark?(spark, options)
  raise ArgumentError, "Illegal type #{spark[:type]}" unless valid_type? spark[:type]

  spark[:type].to_s == 'request' ? check_whitelist(spark, options) : true
rescue => e
  reject_spark spark, e
  false
end

#valid_task?(spark, _options) ⇒ Boolean

test the task against the list of authorised tasks



253
254
255
256
# File 'lib/hsdq/receiver.rb', line 253

# Test the spark's task against the list of authorized tasks.
#
# A spark without a task, or an empty list of authorized tasks, is always
# accepted. Otherwise the task must match one of the authorized entries.
# Entries and task are compared by their textual form, so the list may hold
# Symbols or Strings, and a non-String task coming from the message is
# simply refused instead of raising NoMethodError on to_sym.
#
# @param spark [Hash] the spark; :task is read
# @param _options [Hash] unused
# @return [Boolean] true when the task may be processed
def valid_task?(spark, _options)
  task = spark[:task]
  return true if task.nil? || hsdq_authorized_tasks.empty?

  # nil entries never authorize anything, drop them before comparing
  hsdq_authorized_tasks.compact.map(&:to_s).include?(task.to_s)
end

#valid_topic?(spark, _options) ⇒ Boolean



258
259
260
261
# File 'lib/hsdq/receiver.rb', line 258

# Test the spark's topic against the list of authorized topics.
#
# A spark without a topic, or an empty list of authorized topics, is always
# accepted. Otherwise the topic must match one of the authorized entries.
# Entries and topic are compared by their textual form, so the list may hold
# Symbols or Strings, and a non-String topic coming from the message is
# simply refused instead of raising NoMethodError on to_sym.
#
# @param spark [Hash] the spark; :topic is read
# @param _options [Hash] unused
# @return [Boolean] true when the topic may be processed
def valid_topic?(spark, _options)
  topic = spark[:topic]
  return true if topic.nil? || hsdq_authorized_topics.empty?

  # nil entries never authorize anything, drop them before comparing
  hsdq_authorized_topics.compact.map(&:to_s).include?(topic.to_s)
end

#whitelisted?(spark, options) ⇒ Boolean

validate the topic and the task



174
175
176
# File 'lib/hsdq/receiver.rb', line 174

# Tell whether both the topic and the task of the spark are authorized.
#
# The task is only checked when the topic check passed.
#
# @param spark [Hash] the spark to check
# @param options [Hash] forwarded to valid_topic? and valid_task?
# @return [Boolean] truthy when both checks pass
def whitelisted?(spark, options)
  topic_ok = valid_topic?(spark, options)
  topic_ok && valid_task?(spark, options)
end