Class: MCollective::Message

Inherits:
Object
  • Object
show all
Defined in:
lib/mcollective/message.rb

Overview

Container for a message, its headers, agent, collective and other metadata.

Constant Summary collapse

# Message types a message may carry. Frozen so the shared list cannot be
# mutated at runtime.
VALIDTYPES = [:message, :request, :direct_request, :reply].freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(payload, message, options = {}) ⇒ Message

payload - the message body without headers etc, just the text
message - the original message received from the middleware
options[:base64] - is the body base64 encoded?
options[:agent] - the agent the message is for/from
options[:collective] - the collective it is for/from
options[:headers] - the message headers
options[:type] - an indicator about the type of message: :message, :request, :direct_request or :reply
options[:request] - if this is a reply, this should hold the message we are replying to
options[:filter] - for requests, the filter to encode into the message
options[:options] - the normal client options hash
options[:ttl] - the maximum number of seconds this message can be valid for (as written, the constructor reads the TTL from options[:options][:ttl] and falls back to Config.instance.ttl)
options[:expected_msgid] - in the case of replies, this is the msgid it is expecting in the replies
options[:requestid] - specific request id to use, else one will be generated



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/mcollective/message.rb', line 23

# Creates a message wrapping a payload and the raw middleware message.
#
# payload - the message body without headers
# message - the original message received from the middleware
# options - Hash of optional settings, merged over these defaults:
#           :base64 (false), :agent (nil), :headers ({}), :type (:message),
#           :request (nil), :filter (Util.empty_filter), :options ({}),
#           :ttl (60), :expected_msgid (nil), :requestid (nil),
#           :collective (nil)
#
# Supplying :request turns the message into a :reply that takes its agent
# and collective from that request. base64_decode! is invoked last, so a
# payload flagged as base64 is decoded before the constructor returns.
def initialize(payload, message, options = {})
  defaults = {
    :base64         => false,
    :agent          => nil,
    :headers        => {},
    :type           => :message,
    :request        => nil,
    :filter         => Util.empty_filter,
    :options        => {},
    :ttl            => 60,
    :expected_msgid => nil,
    :requestid      => nil,
    :collective     => nil
  }
  settings = defaults.merge(options)

  @payload = payload
  @message = message

  @base64 = settings[:base64]
  @headers = settings[:headers]
  @filter = settings[:filter]
  @options = settings[:options]
  @requestid = settings[:requestid]
  @expected_msgid = settings[:expected_msgid]

  @discovered_hosts = nil
  @reply_to = nil
  @validated = false
  @msgtime = 0

  # The TTL is read from the nested client options hash, not from the
  # top-level :ttl key, and falls back to the configured default.
  @ttl = @options[:ttl] || Config.instance.ttl

  replied_to = settings[:request]
  if replied_to
    # A reply mirrors the addressing of the request it answers.
    @request = replied_to
    @type = :reply
    @agent = request.agent
    @collective = request.collective
  else
    @type = settings[:type]
    @agent = settings[:agent]
    @collective = settings[:collective]
  end

  base64_decode!
end

Instance Attribute Details

#agent ⇒ Object

Returns the value of attribute agent.



5
6
7
# File 'lib/mcollective/message.rb', line 5

# The agent this message is for or from. The constructor takes it from
# the request being replied to when one is supplied, otherwise from
# options[:agent]; decode! may overwrite it from the decoded payload.
def agent
  @agent
end

#collective ⇒ Object

Returns the value of attribute collective.



5
6
7
# File 'lib/mcollective/message.rb', line 5

# The collective this message is for or from. The constructor takes it
# from the request being replied to when one is supplied, otherwise from
# options[:collective]; decode! may overwrite it from the decoded payload.
def collective
  @collective
end

#discovered_hosts ⇒ Object

Returns the value of attribute discovered_hosts.



6
7
8
# File 'lib/mcollective/message.rb', line 6

# Hosts known from discovery; nil after construction. publish compares
# its size with the direct addressing threshold to decide whether to
# send a direct request.
def discovered_hosts
  @discovered_hosts
end

#expected_msgid ⇒ Object

Returns the value of attribute expected_msgid.



4
5
6
# File 'lib/mcollective/message.rb', line 4

# For replies, the msgid expected in the replies; set from
# options[:expected_msgid] by the constructor.
def expected_msgid
  @expected_msgid
end

#filter ⇒ Object

Returns the value of attribute filter.



5
6
7
# File 'lib/mcollective/message.rb', line 5

# For requests, the filter encoded into the message. Defaults to
# Util.empty_filter; decode! may overwrite it from the decoded payload.
def filter
  @filter
end

#headers ⇒ Object

Returns the value of attribute headers.



5
6
7
# File 'lib/mcollective/message.rb', line 5

# The message headers, set from options[:headers] (an empty Hash by
# default). decode! reads the "mc_sender" header when logging a failure.
def headers
  @headers
end

#message ⇒ Object (readonly)

Returns the value of attribute message.



4
5
6
# File 'lib/mcollective/message.rb', line 4

# The original message received from the middleware, exactly as passed
# to the constructor.
def message
  @message
end

#msgtime ⇒ Object (readonly)

Returns the value of attribute msgtime.



4
5
6
# File 'lib/mcollective/message.rb', line 4

# Creation time of the message. Starts at 0 and is replaced by the
# :msgtime entry of the decoded payload in decode!; validate subtracts it
# from the current UTC epoch seconds to compute the message age.
def msgtime
  @msgtime
end

#options ⇒ Object

Returns the value of attribute options.



6
7
8
# File 'lib/mcollective/message.rb', line 6

# The normal client options hash, taken from options[:options] in the
# constructor (an empty Hash by default). Its :ttl entry, when present,
# supplies the message TTL.
def options
  @options
end

#payload ⇒ Object (readonly)

Returns the value of attribute payload.



4
5
6
# File 'lib/mcollective/message.rb', line 4

# The message body without headers. It is replaced in place by
# base64_decode!, base64_encode!, encode! and decode!.
def payload
  @payload
end

#reply_to ⇒ Object

Returns the value of attribute reply_to.



4
5
6
# File 'lib/mcollective/message.rb', line 4

# Reply target for this message; the constructor initialises it to nil.
# NOTE(review): nothing visible here assigns another value - confirm how
# callers are expected to set it.
def reply_to
  @reply_to
end

#request ⇒ Object (readonly)

Returns the value of attribute request.



4
5
6
# File 'lib/mcollective/message.rb', line 4

# The message being replied to. Only assigned when options[:request] was
# given to the constructor; nil otherwise.
def request
  @request
end

#requestid ⇒ Object

Returns the value of attribute requestid.



6
7
8
# File 'lib/mcollective/message.rb', line 6

# The request id. Comes from options[:requestid]; encode! generates one
# for requests that have none and copies the original request's id for
# replies; decode! may overwrite it from the decoded payload.
def requestid
  @requestid
end

#ttl ⇒ Object

Returns the value of attribute ttl.



6
7
8
# File 'lib/mcollective/message.rb', line 6

# Maximum age, in seconds, that validate accepts for this message. The
# constructor reads it from the :ttl entry of the client options hash,
# falling back to Config.instance.ttl; decode! may overwrite it from the
# decoded payload.
def ttl
  @ttl
end

#type ⇒ Object

Returns the value of attribute type.



4
5
6
# File 'lib/mcollective/message.rb', line 4

# The message type. Set from options[:type] (:message by default), forced
# to :reply when a request was supplied to the constructor, and switched
# to :direct_request by publish when direct addressing applies.
def type
  @type
end

#validated ⇒ Object (readonly)

Returns the value of attribute validated.



4
5
6
# File 'lib/mcollective/message.rb', line 4

# Whether validate has completed successfully; false after construction
# and set to true at the end of validate.
def validated
  @validated
end

Instance Method Details

#base64? ⇒ Boolean

Returns:

  • (Boolean)


129
130
131
# File 'lib/mcollective/message.rb', line 129

# Reports the base64 flag: the value given as options[:base64], cleared
# by base64_decode! and set by base64_encode!.
def base64?
  @base64
end

#base64_decode! ⇒ Object



115
116
117
118
119
120
# File 'lib/mcollective/message.rb', line 115

# Decodes the payload in place when it is flagged as base64 encoded and
# clears the flag afterwards. Does nothing when the flag is not set.
def base64_decode!
  if @base64
    @payload = SSL.base64_decode(@payload)
    @base64 = false
  end
end

#base64_encode! ⇒ Object



122
123
124
125
126
127
# File 'lib/mcollective/message.rb', line 122

# Encodes the payload in place as base64 and sets the flag, unless the
# payload is already flagged as encoded.
def base64_encode!
  unless @base64
    @payload = SSL.base64_encode(@payload)
    @base64 = true
  end
end

#create_reqid ⇒ Object



241
242
243
244
245
246
# File 'lib/mcollective/message.rb', line 241

# Builds a new request id from a UUID.
def create_reqid
  # Dashes are stripped so ids keep the same format as in previous
  # versions; they should simply be more unique than the old ones.
  SSL.uuid.delete("-")
end

#decode! ⇒ Object



183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
# File 'lib/mcollective/message.rb', line 183

# Decodes the payload through the security plugin and copies the metadata
# found in the decoded payload (:collective, :agent, :filter, :requestid,
# :ttl, :msgtime) onto this message.
#
# Only :request and :reply messages can be decoded. When decoding a
# request fails the error is re-raised; when decoding a reply fails the
# failure is logged, the message is left undecoded and nil is returned.
#
# Only StandardError is rescued, so interrupts, exit requests and other
# non-standard exceptions always propagate instead of being logged and
# dropped on the reply path.
#
# Raises RuntimeError for any other message type, and for a request whose
# callerid the security plugin does not accept.
def decode!
  raise "Cannot decode message type #{type}" unless [:request, :reply].include?(type)

  begin
    @payload = PluginManager["security_plugin"].decodemsg(self)
  rescue StandardError => e
    # If we're a server receiving a request, reraise
    raise if type == :request

    # We're in the client, log and carry on as best we can

    # Note: mc_sender is unverified.  The verified identity is in the
    # payload we just failed to decode
    Log.warn("Failed to decode a message from '#{headers["mc_sender"]}': #{e}")
    return
  end

  if type == :request
    raise 'callerid in request is not valid, surpressing reply to potentially forged request' unless PluginManager["security_plugin"].valid_callerid?(payload[:callerid])
  end

  [:collective, :agent, :filter, :requestid, :ttl, :msgtime].each do |prop|
    instance_variable_set("@#{prop}", payload[prop]) if payload.include?(prop)
  end
end

#description ⇒ Object



133
134
135
136
137
138
139
# File 'lib/mcollective/message.rb', line 133

# Returns a one-line, human readable summary of the message: request id,
# agent, collective and originator ("callerid@senderid", or just the
# sender id when the payload carries no :callerid key).
#
# The originator is built with interpolation so that a missing or
# non-String :senderid / :callerid yields an empty or stringified part
# rather than raising while an error message is being composed.
def description
  origin = payload[:senderid].to_s
  origin = "#{payload[:callerid]}@#{origin}" if payload.include?(:callerid)

  "#{requestid} for agent '#{agent}' in collective '#{collective}' from #{origin}"
end

#encode! ⇒ Object



141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
# File 'lib/mcollective/message.rb', line 141

# Encodes the payload through the security plugin according to the
# message type and returns the encoded payload.
#
# Replies reuse the request id and caller id of the request they answer.
# Requests and direct requests validate a non-empty compound filter and
# get a generated request id when none has been set.
#
# Raises RuntimeError when a reply has no associated request, when the
# caller id of the original request is not valid, or when the message
# type cannot be encoded.
def encode!
  kind = type

  if kind == :reply
    original = request
    raise "Cannot encode a reply message if no request has been associated with it" unless original

    caller_id = original.payload[:callerid]
    unless PluginManager["security_plugin"].valid_callerid?(caller_id)
      raise 'callerid in original request is not valid, surpressing reply to potentially forged request'
    end

    @requestid = original.payload[:requestid]
    @payload = PluginManager["security_plugin"].encodereply(agent, payload, requestid, caller_id)
  elsif [:request, :direct_request].include?(kind)
    compound = @filter["compound"]
    validate_compound_filter(compound) unless compound.empty?

    @requestid ||= create_reqid
    @payload = PluginManager["security_plugin"].encoderequest(Config.instance.identity, payload, requestid, filter, agent, collective, ttl)
  else
    raise "Cannot encode #{type} messages"
  end
end

#publish ⇒ Object

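Publish the message through the connector plugin. When discovered hosts are known, direct addressing is enabled and the number of hosts is within the direct addressing threshold, the message is first switched to a direct request.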



228
229
230
231
232
233
234
235
236
237
238
239
# File 'lib/mcollective/message.rb', line 228

# Hands the message to the connector plugin for publishing and returns
# whatever the connector returns.
#
# When hosts from discovery have been supplied, direct addressing is
# enabled in the configuration and the host count does not exceed the
# configured threshold, the message type is first changed to
# :direct_request so the call is made point to point.
def publish
  settings = Config.instance
  targets = @discovered_hosts

  use_direct = targets &&
               settings.direct_addressing &&
               targets.size <= settings.direct_addressing_threshold

  if use_direct
    self.type = :direct_request
    Log.debug("Handling #{requestid} as a direct request")
  end

  connector = PluginManager['connector_plugin']
  connector.publish(self)
end

#validate ⇒ Object

Perform validation against the message by checking filters and ttl

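Raises:

  • (RuntimeError) — if the message is not a :request
  • (MsgTTLExpired) — if the message is older than its TTL
  • (NotTargettedAtUs) — if the message does not pass the filters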



212
213
214
215
216
217
218
219
220
221
222
223
224
225
# File 'lib/mcollective/message.rb', line 212

# Validates a request by checking its age against the TTL and its filter
# against the security plugin, then marks the message as validated.
#
# Returns true on success.
# Raises RuntimeError when the message is not a :request, MsgTTLExpired
# when the message is older than its TTL (the expiry is also counted in
# the global stats), and NotTargettedAtUs when the filter does not match.
def validate
  raise "Can only validate request messages" if type != :request

  created = msgtime
  limit = ttl
  age = Time.now.utc.to_i - created

  if age > limit
    PluginManager["global_stats"].ttlexpired
    raise(MsgTTLExpired, "Message #{description} created at #{created} is #{age} seconds old, TTL is #{limit}. Rejecting message.")
  end

  targeted = PluginManager["security_plugin"].validate_filter?(payload[:filter])
  raise(NotTargettedAtUs, "Message #{description} does not pass filters. Ignoring message.") unless targeted

  @validated = true
end

#validate_compound_filter(compound_filter) ⇒ Object



159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
# File 'lib/mcollective/message.rb', line 159

# Validates every data-plugin function call ("fstatement") found in a
# compound filter against the DDL of the data plugin it refers to.
#
# compound_filter - a collection of filters, each a collection of
#                   statements; statements without an "fstatement" entry
#                   are skipped
#
# The "params" of each function call are replaced in place by their
# DDL-transformed form. A call whose "value" is missing or is not an
# output of the data plugin is reported through DDL.validation_fail!.
def validate_compound_filter(compound_filter)
  compound_filter.each do |filter|
    filter.each do |statement|
      call = statement["fstatement"]
      next unless call

      function = call["name"]
      plugin = Data.pluginname(function)
      wanted = call["value"]

      ddl = DDL.new(plugin, :data)

      # numbers and booleans entered as strings are parsed into proper
      # data types so that the DDL validation will pass
      call["params"] = Data.ddl_transform_input(ddl, call["params"])

      Data.ddl_validate(ddl, call["params"])

      next if wanted && Data.ddl_has_output?(ddl, wanted)

      DDL.validation_fail!(:PLMC41, "Data plugin '%{functionname}()' does not return a '%{value}' value", :error, {:functionname => function, :value => wanted})
    end
  end
end