Class: NatsWork::Message
- Inherits:
-
Object
- Object
- NatsWork::Message
- Defined in:
- lib/natswork/message.rb
Constant Summary collapse
- PROTOCOL_VERSION =
'1.0.0'
- TYPE_JOB_DISPATCH =
'job.dispatch'
- TYPE_JOB_RESULT =
'job.result'
- TYPE_JOB_ERROR =
'job.error'
- VALID_TYPES =
[ TYPE_JOB_DISPATCH, TYPE_JOB_RESULT, TYPE_JOB_ERROR ].freeze
Instance Attribute Summary collapse
-
#arguments ⇒ Object
Returns the value of attribute arguments.
-
#created_at ⇒ Object
Returns the value of attribute created_at.
-
#enqueued_at ⇒ Object
Returns the value of attribute enqueued_at.
-
#error ⇒ Object
Returns the value of attribute error.
-
#job_class ⇒ Object
Returns the value of attribute job_class.
-
#job_id ⇒ Object
Returns the value of attribute job_id.
-
#max_retries ⇒ Object
Returns the value of attribute max_retries.
-
#metadata ⇒ Object
Returns the value of attribute metadata.
-
#queue ⇒ Object
Returns the value of attribute queue.
-
#reply_to ⇒ Object
Returns the value of attribute reply_to.
-
#result ⇒ Object
Returns the value of attribute result.
-
#retry_count ⇒ Object
Returns the value of attribute retry_count.
-
#timeout ⇒ Object
Returns the value of attribute timeout.
-
#type ⇒ Object
Returns the value of attribute type.
Class Method Summary collapse
Instance Method Summary collapse
-
#initialize(options = {}) ⇒ Message
constructor
A new instance of Message.
- #to_hash ⇒ Object
- #to_json(*args) ⇒ Object
- #valid? ⇒ Boolean
- #validate! ⇒ Object
Constructor Details
#initialize(options = {}) ⇒ Message
Returns a new instance of Message.
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/natswork/message.rb', line 26

# Builds a message from an options hash, falling back to a default for each
# option that is absent. Defaults are applied with `||`, so an explicit
# nil or false value is treated the same as a missing key.
#
# @param options [Hash] symbol-keyed attributes. Keys read here: :type,
#   :job_id, :job_class, :queue, :arguments, :retry_count, :max_retries,
#   :timeout, :reply_to, :metadata, :created_at, :enqueued_at, :result, :error.
#   Any other key is ignored.
# @return [Message] a new instance of Message
def initialize(options = {})
  @type = options[:type] || TYPE_JOB_DISPATCH
  @job_id = options[:job_id] || SecureRandom.uuid
  @job_class = options[:job_class]
  @queue = options[:queue]
  @arguments = options[:arguments] || []
  @retry_count = options[:retry_count] || 0
  @max_retries = options[:max_retries] || 3
  @timeout = options[:timeout] || 30
  @reply_to = options[:reply_to]
  @metadata = options[:metadata] || {}
  @created_at = options[:created_at] || Time.now.iso8601
  # When no enqueue time is given, reuse the creation timestamp.
  @enqueued_at = options[:enqueued_at] || @created_at
  @result = options[:result]
  @error = options[:error]
end
Instance Attribute Details
#arguments ⇒ Object
Returns the value of attribute arguments.
22 23 24 |
# File 'lib/natswork/message.rb', line 22 def arguments @arguments end |
#created_at ⇒ Object
Returns the value of attribute created_at.
22 23 24 |
# File 'lib/natswork/message.rb', line 22 def created_at @created_at end |
#enqueued_at ⇒ Object
Returns the value of attribute enqueued_at.
22 23 24 |
# File 'lib/natswork/message.rb', line 22 def enqueued_at @enqueued_at end |
#error ⇒ Object
Returns the value of attribute error.
22 23 24 |
# File 'lib/natswork/message.rb', line 22 def error @error end |
#job_class ⇒ Object
Returns the value of attribute job_class.
22 23 24 |
# File 'lib/natswork/message.rb', line 22 def job_class @job_class end |
#job_id ⇒ Object
Returns the value of attribute job_id.
22 23 24 |
# File 'lib/natswork/message.rb', line 22 def job_id @job_id end |
#max_retries ⇒ Object
Returns the value of attribute max_retries.
22 23 24 |
# File 'lib/natswork/message.rb', line 22 def max_retries @max_retries end |
#metadata ⇒ Object
Returns the value of attribute metadata.
22 23 24 |
# File 'lib/natswork/message.rb', line 22 def metadata @metadata end |
#queue ⇒ Object
Returns the value of attribute queue.
22 23 24 |
# File 'lib/natswork/message.rb', line 22 def queue @queue end |
#reply_to ⇒ Object
Returns the value of attribute reply_to.
22 23 24 |
# File 'lib/natswork/message.rb', line 22 def reply_to @reply_to end |
#result ⇒ Object
Returns the value of attribute result.
22 23 24 |
# File 'lib/natswork/message.rb', line 22 def result @result end |
#retry_count ⇒ Object
Returns the value of attribute retry_count.
22 23 24 |
# File 'lib/natswork/message.rb', line 22 def retry_count @retry_count end |
#timeout ⇒ Object
Returns the value of attribute timeout.
22 23 24 |
# File 'lib/natswork/message.rb', line 22 def timeout @timeout end |
#type ⇒ Object
Returns the value of attribute type.
22 23 24 |
# File 'lib/natswork/message.rb', line 22 def type @type end |
Class Method Details
.from_json(json_string) ⇒ Object
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 |
# File 'lib/natswork/message.rb', line 86

# Parses a JSON document and builds a Message from it.
#
# The document must be a JSON object. Keys are symbolized at every nesting
# level (symbolize_names: true), so nested metadata keys are Symbols too.
# A protocol_version that differs from PROTOCOL_VERSION is accepted as long
# as its major component matches. When the metadata carries a truthy
# :compressed flag, the arguments are passed through Compression.decompress
# and the compression bookkeeping keys are removed from the metadata so the
# resulting message describes its (now decompressed) arguments accurately.
#
# @param json_string [String] the serialized message
# @return [Message] the result of new(data)
# @raise [InvalidMessageError] if the input is not parseable JSON, is not a
#   JSON object, or declares an unsupported major protocol version
def self.from_json(json_string)
  data =
    begin
      JSON.parse(json_string, symbolize_names: true)
    rescue JSON::ParserError, TypeError => e
      # TypeError covers non-String input such as nil.
      raise InvalidMessageError, "Invalid JSON: #{e.message}"
    end

  raise InvalidMessageError, 'Invalid message: expected a JSON object' unless data.is_a?(Hash)

  version = data[:protocol_version]
  if version && version != PROTOCOL_VERSION
    # to_s guards against a numeric version such as 2 or 1.0.
    major_version = version.to_s.split('.').first
    supported_major = PROTOCOL_VERSION.split('.').first

    if major_version != supported_major
      raise InvalidMessageError, "Unsupported protocol version: #{version}"
    end
  end

  # Decompress arguments if compressed. The flag is looked up by Symbol
  # because symbolize_names also converts the keys of nested objects.
  metadata = data[:metadata]
  if metadata.is_a?(Hash) && metadata[:compressed]
    compressed_payload = { compressed: true, data: data[:arguments] }
    data[:arguments] = Compression.decompress(compressed_payload)
    # Drop the flags so a later serialization does not advertise compression
    # for arguments that are no longer compressed.
    data[:metadata] = metadata.reject { |key, _| %i[compressed compression_ratio].include?(key) }
  end

  new(data)
end
Instance Method Details
#to_hash ⇒ Object
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/natswork/message.rb', line 63

# Returns the message as a symbol-keyed Hash.
#
# The core attributes and :protocol_version are always present. :reply_to
# and :error are added only when set to a truthy value; :result is added
# whenever it is not nil, so a job result of `false` is preserved.
# The :arguments and :metadata entries reference the message's own objects
# (they are not copied).
#
# @return [Hash{Symbol => Object}]
def to_hash
  hash = {
    type: @type,
    job_id: @job_id,
    job_class: @job_class,
    queue: @queue,
    arguments: @arguments,
    retry_count: @retry_count,
    max_retries: @max_retries,
    timeout: @timeout,
    metadata: @metadata,
    created_at: @created_at,
    enqueued_at: @enqueued_at,
    protocol_version: PROTOCOL_VERSION
  }

  hash[:reply_to] = @reply_to if @reply_to
  # nil means "no result"; false is a legitimate result value and is kept.
  hash[:result] = @result unless @result.nil?
  hash[:error] = @error if @error

  hash
end
#to_json(*args) ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/natswork/message.rb', line 43

# Serializes the message to JSON.
#
# Starts from #to_hash. When Compression.should_compress? accepts the
# arguments and Compression.compress reports success (:compressed truthy),
# the arguments are replaced by the compressed data and the metadata gains
# the string keys 'compressed' and 'compression_ratio'.
#
# The metadata annotations are written to a new Hash: #to_hash hands back the
# message's own @metadata object, and serializing must not alter it.
#
# @param args forwarded unchanged to Hash#to_json
# @return [String] the JSON document
def to_json(*args)
  hash = to_hash

  # Check if arguments should be compressed
  if Compression.should_compress?(@arguments)
    compression_result = Compression.compress(@arguments)

    if compression_result[:compressed]
      hash[:arguments] = compression_result[:data]
      # merge returns a new Hash, leaving @metadata untouched.
      hash[:metadata] = (hash[:metadata] || {}).merge(
        'compressed' => true,
        'compression_ratio' => Compression.compression_ratio(
          compression_result[:original_size],
          compression_result[:compressed_size]
        )
      )
    end
  end

  hash.to_json(*args)
end
#valid? ⇒ Boolean
114 115 116 117 118 119 |
# File 'lib/natswork/message.rb', line 114

# Reports whether the message passes the basic checks: both job_class and
# queue are set (truthy) and the type is one of VALID_TYPES.
#
# @return [Boolean]
def valid?
  required_present = @job_class && @queue
  return false unless required_present

  VALID_TYPES.include?(@type)
end
#validate! ⇒ Object
121 122 123 124 125 126 127 128 129 |
# File 'lib/natswork/message.rb', line 121

# Raising counterpart of the validity check. The checks run in a fixed order
# (job_class, then queue, then type) and only the first failure is reported.
#
# @return [true] when every check passes
# @raise [InvalidMessageError] describing the first failed check
def validate!
  failure =
    if !@job_class
      'job_class is required'
    elsif !@queue
      'queue is required'
    elsif !VALID_TYPES.include?(@type)
      "Invalid message type: #{@type}"
    end

  raise InvalidMessageError, failure if failure

  true
end