Class: Bricolage::SQSMessage
- Inherits:
-
Object
- Object
- Bricolage::SQSMessage
- Defined in:
- lib/bricolage/sqsdatasource.rb
Direct Known Subclasses
Bricolage::StreamingLoad::DispatcherMessage, Bricolage::StreamingLoad::LoaderMessage, UnknownSQSMessage
Constant Summary collapse
- SQS_EVENT_SOURCE =
'bricolage:system'
Instance Attribute Summary collapse
-
#delay_seconds ⇒ Object
readonly
Valid only for sending messages.
-
#message_id ⇒ Object
readonly
Valid only for received messages.
-
#name ⇒ Object
readonly
abstract init_message(**message_params).
-
#receipt_handle ⇒ Object
readonly
Returns the value of attribute receipt_handle.
-
#source ⇒ Object
readonly
Returns the value of attribute source.
-
#time ⇒ Object
readonly
Returns the value of attribute time.
Class Method Summary collapse
-
.create(name:, time: Time.now.getutc, source: SQS_EVENT_SOURCE, delay_seconds: 0, **message_params) ⇒ Object
Writer interface.
-
.for_sqs_record(msg, rec) ⇒ Object
abstract SQSMessage.get_concrete_class(msg, rec).
- .for_sqs_result(result) ⇒ Object
- .get_event_time(rec) ⇒ Object
- .parse_sqs_record(msg, rec) ⇒ Object
Instance Method Summary collapse
- #body ⇒ Object
-
#initialize(name:, time:, source:, message_id: nil, receipt_handle: nil, delay_seconds: nil, **message_params) ⇒ SQSMessage
constructor
A new instance of SQSMessage.
Constructor Details
#initialize(name:, time:, source:, message_id: nil, receipt_handle: nil, delay_seconds: nil, **message_params) ⇒ SQSMessage
Returns a new instance of SQSMessage.
390 391 392 393 394 395 396 397 398 399 400 401 402 403 |
# File 'lib/bricolage/sqsdatasource.rb', line 390 def initialize(name:, time:, source:, message_id: nil, receipt_handle: nil, delay_seconds: nil, **) @name = name @time = time @source = source @message_id = @receipt_handle = receipt_handle @delay_seconds = delay_seconds (**) end |
Instance Attribute Details
#delay_seconds ⇒ Object (readonly)
Valid only for sending messages
418 419 420 |
# File 'lib/bricolage/sqsdatasource.rb', line 418 def delay_seconds @delay_seconds end |
#message_id ⇒ Object (readonly)
Valid only for received messages
413 414 415 |
# File 'lib/bricolage/sqsdatasource.rb', line 413 def @message_id end |
#name ⇒ Object (readonly)
abstract init_message(**message_params)
407 408 409 |
# File 'lib/bricolage/sqsdatasource.rb', line 407 def name @name end |
#receipt_handle ⇒ Object (readonly)
Returns the value of attribute receipt_handle.
414 415 416 |
# File 'lib/bricolage/sqsdatasource.rb', line 414 def receipt_handle @receipt_handle end |
#source ⇒ Object (readonly)
Returns the value of attribute source.
409 410 411 |
# File 'lib/bricolage/sqsdatasource.rb', line 409 def source @source end |
#time ⇒ Object (readonly)
Returns the value of attribute time.
408 409 410 |
# File 'lib/bricolage/sqsdatasource.rb', line 408 def time @time end |
Class Method Details
.create(name:, time: Time.now.getutc, source: SQS_EVENT_SOURCE, delay_seconds: 0, **message_params) ⇒ Object
Writer interface
351 352 353 354 355 356 357 358 |
# File 'lib/bricolage/sqsdatasource.rb', line 351 def SQSMessage.create( name:, time: Time.now.getutc, source: SQS_EVENT_SOURCE, delay_seconds: 0, **) new(name: name, time: time, source: source, delay_seconds: delay_seconds, **) end |
.for_sqs_record(msg, rec) ⇒ Object
abstract SQSMessage.get_concrete_class(msg, rec)
370 371 372 |
# File 'lib/bricolage/sqsdatasource.rb', line 370 def SQSMessage.for_sqs_record(msg, rec) new(** SQSMessage.parse_sqs_record(msg, rec).merge(parse_sqs_record(msg, rec))) end |
.for_sqs_result(result) ⇒ Object
360 361 362 363 364 365 366 |
# File 'lib/bricolage/sqsdatasource.rb', line 360 def SQSMessage.for_sqs_result(result) result..flat_map {|msg| body = JSON.parse(msg.body) records = body['Records'] or next [UnknownSQSMessage.for_sqs_record(msg, nil)] records.map {|rec| get_concrete_class(msg, rec).for_sqs_record(msg, rec) } } end |
.get_event_time(rec) ⇒ Object
384 385 386 387 388 |
# File 'lib/bricolage/sqsdatasource.rb', line 384 def SQSMessage.get_event_time(rec) return nil unless rec str = rec['eventTime'] or return nil Time.parse(str) rescue nil end |
.parse_sqs_record(msg, rec) ⇒ Object
374 375 376 377 378 379 380 381 382 |
# File 'lib/bricolage/sqsdatasource.rb', line 374 def SQSMessage.parse_sqs_record(msg, rec) { message_id: msg., receipt_handle: msg.receipt_handle, name: (rec ? rec['eventName'] : nil), time: get_event_time(rec), source: (rec ? rec['eventSource'] : nil) } end |
Instance Method Details
#body ⇒ Object
420 421 422 423 424 425 426 427 428 429 430 |
# File 'lib/bricolage/sqsdatasource.rb', line 420 def body obj = {} [ ['eventName', @name], ['eventTime', (@time ? @time.iso8601 : nil)], ['eventSource', @source] ].each do |name, value| obj[name] = value if value end obj end |