Class: NatsWork::Message

Inherits:
Object
Defined in:
lib/natswork/message.rb

Constant Summary collapse

# Protocol version written into every hash produced by #to_hash.
PROTOCOL_VERSION = '1.0.0'

# Identifiers for the kinds of message this class can carry.
TYPE_JOB_DISPATCH = 'job.dispatch'
TYPE_JOB_RESULT = 'job.result'
TYPE_JOB_ERROR = 'job.error'

# The full set of accepted message types, consulted by #valid? and #validate!.
VALID_TYPES = [TYPE_JOB_DISPATCH, TYPE_JOB_RESULT, TYPE_JOB_ERROR].freeze

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Message

Returns a new instance of Message.



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/natswork/message.rb', line 26

# Populates a message from a symbol-keyed options hash. Keys that are not
# read below are ignored; missing (nil/false) values fall back to the
# defaults shown on each line.
#
# @param options [Hash] message attributes keyed by symbol
def initialize(options = {})
  opts = options

  # What the message is and where it goes.
  @type      = opts[:type] || TYPE_JOB_DISPATCH
  @job_id    = opts[:job_id] || SecureRandom.uuid
  @job_class = opts[:job_class]
  @queue     = opts[:queue]
  @arguments = opts[:arguments] || []

  # Retry and timeout settings.
  @retry_count = opts[:retry_count] || 0
  @max_retries = opts[:max_retries] || 3
  @timeout     = opts[:timeout] || 30

  @reply_to = opts[:reply_to]
  @metadata = opts[:metadata] || {}

  # enqueued_at falls back to created_at, so created_at must be settled first.
  @created_at  = opts[:created_at] || Time.now.iso8601
  @enqueued_at = opts[:enqueued_at] || @created_at

  # Outcome fields; left as given (nil when absent).
  @result = opts[:result]
  @error  = opts[:error]
end

Instance Attribute Details

#arguments ⇒ Object

Returns the value of attribute arguments.



22
23
24
# File 'lib/natswork/message.rb', line 22

# Reader for the job arguments.
#
# @return [Object] the value supplied as :arguments at construction,
#   or the empty array the constructor substitutes when none was given
def arguments
  @arguments
end

#created_at ⇒ Object

Returns the value of attribute created_at.



22
23
24
# File 'lib/natswork/message.rb', line 22

# Reader for the creation timestamp.
#
# @return [Object] the value supplied as :created_at at construction,
#   or the Time.now.iso8601 string the constructor generates when none was given
def created_at
  @created_at
end

#enqueued_at ⇒ Object

Returns the value of attribute enqueued_at.



22
23
24
# File 'lib/natswork/message.rb', line 22

# Reader for the enqueue timestamp.
#
# @return [Object] the value supplied as :enqueued_at at construction;
#   the constructor falls back to the created_at value when none was given
def enqueued_at
  @enqueued_at
end

#error ⇒ Object

Returns the value of attribute error.



22
23
24
# File 'lib/natswork/message.rb', line 22

# Reader for the error payload.
#
# @return [Object, nil] the value supplied as :error at construction
#   (the constructor applies no default)
def error
  @error
end

#job_class ⇒ Object

Returns the value of attribute job_class.



22
23
24
# File 'lib/natswork/message.rb', line 22

# Reader for the job class.
#
# @return [Object, nil] the value supplied as :job_class at construction
#   (no default); #valid? and #validate! reject a message without it
def job_class
  @job_class
end

#job_id ⇒ Object

Returns the value of attribute job_id.



22
23
24
# File 'lib/natswork/message.rb', line 22

# Reader for the job identifier.
#
# @return [Object] the value supplied as :job_id at construction,
#   or the SecureRandom.uuid the constructor generates when none was given
def job_id
  @job_id
end

#max_retries ⇒ Object

Returns the value of attribute max_retries.



22
23
24
# File 'lib/natswork/message.rb', line 22

# Reader for the retry limit.
#
# @return [Object] the value supplied as :max_retries at construction,
#   or 3 when none was given
def max_retries
  @max_retries
end

#metadata ⇒ Object

Returns the value of attribute metadata.



22
23
24
# File 'lib/natswork/message.rb', line 22

# Reader for the message metadata.
#
# @return [Object] the value supplied as :metadata at construction,
#   or the empty hash the constructor substitutes when none was given
def metadata
  @metadata
end

#queue ⇒ Object

Returns the value of attribute queue.



22
23
24
# File 'lib/natswork/message.rb', line 22

# Reader for the queue.
#
# @return [Object, nil] the value supplied as :queue at construction
#   (no default); #valid? and #validate! reject a message without it
def queue
  @queue
end

#reply_to ⇒ Object

Returns the value of attribute reply_to.



22
23
24
# File 'lib/natswork/message.rb', line 22

# Reader for the reply target.
#
# @return [Object, nil] the value supplied as :reply_to at construction
#   (no default); #to_hash only emits the key when this is set
def reply_to
  @reply_to
end

#result ⇒ Object

Returns the value of attribute result.



22
23
24
# File 'lib/natswork/message.rb', line 22

# Reader for the result payload.
#
# @return [Object, nil] the value supplied as :result at construction
#   (no default); #to_hash only emits the key when this is truthy
def result
  @result
end

#retry_count ⇒ Object

Returns the value of attribute retry_count.



22
23
24
# File 'lib/natswork/message.rb', line 22

# Reader for the retry counter.
#
# @return [Object] the value supplied as :retry_count at construction,
#   or 0 when none was given
def retry_count
  @retry_count
end

#timeout ⇒ Object

Returns the value of attribute timeout.



22
23
24
# File 'lib/natswork/message.rb', line 22

# Reader for the timeout.
#
# @return [Object] the value supplied as :timeout at construction,
#   or 30 when none was given (unit not stated here — TODO confirm)
def timeout
  @timeout
end

#type ⇒ Object

Returns the value of attribute type.



22
23
24
# File 'lib/natswork/message.rb', line 22

# Reader for the message type.
#
# @return [Object] the value supplied as :type at construction,
#   or TYPE_JOB_DISPATCH when none was given
def type
  @type
end

Class Method Details

.from_json(json_string) ⇒ Object



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/natswork/message.rb', line 86

# Parses a JSON document and builds a message from it.
#
# The document is parsed with symbolized keys and handed to .new. A
# protocol_version that differs from PROTOCOL_VERSION is accepted as long as
# its major component matches. When the metadata carries the compressed flag,
# the arguments are passed through Compression.decompress first.
#
# @param json_string [String] serialized message
# @return [Message] the message built from the parsed attributes
# @raise [InvalidMessageError] if the text is not valid JSON, is valid JSON
#   but not an object, or declares a different major protocol version
def self.from_json(json_string)
  begin
    data = JSON.parse(json_string, symbolize_names: true)
  rescue JSON::ParserError => e
    raise InvalidMessageError, "Invalid JSON: #{e.message}"
  end

  # "[]", "42" or "null" parse successfully but cannot describe a message;
  # reject them here instead of failing later with TypeError/NoMethodError.
  raise InvalidMessageError, 'Invalid JSON: expected an object' unless data.is_a?(Hash)

  version = data[:protocol_version]
  if version && version != PROTOCOL_VERSION
    # to_s keeps a numeric version field (e.g. 2) from crashing on #split.
    major_version = version.to_s.split('.').first
    supported_major = PROTOCOL_VERSION.split('.').first

    raise InvalidMessageError, "Unsupported protocol version: #{version}" if major_version != supported_major
  end

  # symbolize_names: true also symbolizes nested keys, so the flag serialized
  # as "compressed" arrives as :compressed. The string key is still checked
  # in case the parser options ever change.
  metadata = data[:metadata]
  if metadata.is_a?(Hash) && (metadata[:compressed] || metadata['compressed'])
    compressed_payload = {
      compressed: true,
      data: data[:arguments]
    }
    data[:arguments] = Compression.decompress(compressed_payload)
  end

  new(data)
end

Instance Method Details

#to_hash ⇒ Object



63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/natswork/message.rb', line 63

# Collects the message attributes into a symbol-keyed hash.
#
# The core attributes and the protocol version are always present;
# :reply_to, :result and :error are appended only when their value is truthy.
#
# @return [Hash] the message attributes
def to_hash
  always_present = {
    type: @type,
    job_id: @job_id,
    job_class: @job_class,
    queue: @queue,
    arguments: @arguments,
    retry_count: @retry_count,
    max_retries: @max_retries,
    timeout: @timeout,
    metadata: @metadata,
    created_at: @created_at,
    enqueued_at: @enqueued_at,
    protocol_version: PROTOCOL_VERSION
  }
  when_set = { reply_to: @reply_to, result: @result, error: @error }

  always_present.merge(when_set.select { |_key, value| value })
end

#to_json(*args) ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/natswork/message.rb', line 43

# Serializes the message to JSON, compressing the arguments when
# Compression.should_compress? says so.
#
# When compression is applied, the serialized metadata gains the
# 'compressed' flag and a 'compression_ratio' entry. The instance's own
# metadata is left untouched.
#
# @param args forwarded unchanged to Hash#to_json
# @return [String] the JSON document
def to_json(*args)
  hash = to_hash

  if Compression.should_compress?(@arguments)
    compression_result = Compression.compress(@arguments)
    if compression_result[:compressed]
      hash[:arguments] = compression_result[:data]
      # to_hash hands back the very Hash stored in @metadata, so the flags are
      # merged into a fresh Hash instead of being written into that object.
      hash[:metadata] = (hash[:metadata] || {}).merge(
        'compressed' => true,
        'compression_ratio' => Compression.compression_ratio(
          compression_result[:original_size],
          compression_result[:compressed_size]
        )
      )
    end
  end

  hash.to_json(*args)
end

#valid? ⇒ Boolean

Returns:

  • (Boolean)


114
115
116
117
118
119
# File 'lib/natswork/message.rb', line 114

# Reports whether the message is complete enough to be used: both job_class
# and queue must be set, and type must be one of VALID_TYPES.
#
# @return [Boolean]
def valid?
  [@job_class, @queue].all? && VALID_TYPES.include?(@type)
end

#validate! ⇒ Object



121
122
123
124
125
126
127
128
129
# File 'lib/natswork/message.rb', line 121

# Checks the message and raises on the first problem found, in this order:
# missing job_class, missing queue, type not in VALID_TYPES.
#
# @return [true] when every check passes
# @raise [InvalidMessageError] describing the first failed check
def validate!
  problem =
    if !@job_class
      'job_class is required'
    elsif !@queue
      'queue is required'
    elsif !VALID_TYPES.include?(@type)
      "Invalid message type: #{@type}"
    end

  raise InvalidMessageError, problem if problem

  true
end