Module: RSMP::Proxy::Modules::Send

Included in:
RSMP::Proxy
Defined in:
lib/rsmp/proxy/modules/send.rb

Overview

Message sending functionality. Handles sending messages, validation, and buffering.

Instance Method Summary

Instance Method Details

#apply_nts_message_attributes(message) ⇒ Object



69
70
71
72
# File 'lib/rsmp/proxy/modules/send.rb', line 69

# Fills in the 'ntsOId' and 'xNId' attributes of a message from +main+.
#
# Each attribute falls back to an empty string when +main+ is nil/false or
# when the corresponding reader on +main+ returns nil/false.
#
# @param message [#attributes] object whose +attributes+ hash is updated in place
# @return [Object] the value stored under 'xNId'
def apply_nts_message_attributes(message)
  node = main
  attrs = message.attributes
  attrs['ntsOId'] = (node && node.ntsoid) || ''
  attrs['xNId'] = (node && node.xnid) || ''
end

#buffer_message(message) ⇒ Object



31
32
33
34
# File 'lib/rsmp/proxy/modules/send.rb', line 31

# Placeholder hook for a message that could not be sent.
#
# The body is currently empty: nothing is stored or logged, and the method
# returns nil. In this file it is invoked by #send_message when an IOError
# is rescued.
#
# @param message [Object] the message that could not be sent (currently unused)
# @return [nil]
def buffer_message(message)
  # TODO: not implemented yet; the log call below is commented out.
  # log "Cannot send #{message.type} because the connection is closed.", message: message, level: :error
end

#handle_send_schema_error(message, error) ⇒ Object



7
8
9
10
11
12
# File 'lib/rsmp/proxy/modules/send.rb', line 7

# Reports a schema validation failure that occurred while sending a message.
#
# Builds a description listing each entry of +error.schemas+ as
# "first: last", logs it at :error level, and passes a copy of the error
# (same class, message extended with the message JSON) to +distribute_error+.
#
# @param message [#type, #json] the message that failed validation
# @param error [Exception, #schemas] the validation error that was rescued
# @return [Object] whatever +distribute_error+ returns
def handle_send_schema_error(message, error)
  schema_list = error.schemas.map { |entry| "#{entry.first}: #{entry.last}" }.join(', ')
  summary = "Could not send #{message.type} because schema validation failed " \
            "(#{schema_list}): #{error.message}"
  log(summary, message: message, level: :error)
  # Exception#exception with a string returns a copy of the error carrying the new message.
  distribute_error(error.exception("#{summary} #{message.json}"))
end

#log_send(message, reason = nil) ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/rsmp/proxy/modules/send.rb', line 36

# Logs that a message was sent.
#
# The log text is "Sent <type>", followed by the reason when one is given.
# Messages of type 'MessageNotAck' are logged at :warning level, all others
# at :log level.
#
# @param message [#type] the message that was sent
# @param reason [Object, nil] optional text appended to the log line
# @return [Object] whatever +log+ returns
def log_send(message, reason = nil)
  text = reason ? "Sent #{message.type} #{reason}" : "Sent #{message.type}"
  severity = message.type == 'MessageNotAck' ? :warning : :log
  log(text, message: message, level: severity)
end

#send_and_optionally_collect(message, options) ⇒ Object



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/rsmp/proxy/modules/send.rb', line 50

# Sends a message and, when requested, collects via a caller-supplied collector.
#
# When +options+ has a truthy :collect or :collect! entry, an async subtask is
# started on +@task+ before the message is sent. The subtask yields the collect
# options to the given block to obtain a collector, calls +collect+ on it and,
# for :collect!, also calls +ok!+. The message is then sent and the subtask is
# waited on. Without collect options the message is simply sent.
#
# @param message [Object] the message to send
# @param options [Hash] may contain :collect, :collect! and :validate
# @yield [collect_options] called inside the subtask to create the collector
# @yieldreturn [#collect, #ok!] the collector
# @return [Hash] +{ sent: message }+, plus +collector:+ when collecting
def send_and_optionally_collect(message, options)
  collect_options = options[:collect] || options[:collect!]

  # Plain send: nothing to collect.
  unless collect_options
    send_message(message, validate: options[:validate])
    return { sent: message }
  end

  # The subtask is started before sending.
  collecting = @task.async do |subtask|
    subtask.annotate 'send_and_optionally_collect'
    yield(collect_options).tap do |collector| # block creates the collector
      collector.collect
      collector.ok! if options[:collect!] # bang version: let the collector raise any errors
    end
  end

  send_message(message, validate: options[:validate])
  { sent: message, collector: collecting.wait }
end

#send_message(message, reason = nil, validate: true, force: false) ⇒ Object



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/rsmp/proxy/modules/send.rb', line 14

# Sends a message over +@protocol+.
#
# Steps, in order: mark the message as outgoing, generate its JSON, validate
# it against +schemas+ (unless +validate+ is exactly +false+), write the JSON,
# register the expected acknowledgement, distribute the message and log it.
#
# An IOError raised along the way (including the one raised here when
# +@protocol+ is missing) is rescued and the message is handed to
# +buffer_message+. SchemaError and RSMP::Schema::Error are rescued and handed
# to +handle_send_schema_error+. NotReady is not rescued here.
#
# @param message [Object] the message to send
# @param reason [Object, nil] passed on to +log_send+
# @param validate [Boolean, nil] only an explicit +false+ skips validation
# @param force [Boolean] when truthy, skips the +connected?+ check
# @raise [NotReady] when not forced and +connected?+ is falsy
# @return [Object] the result of +log_send+, or of the invoked rescue handler
def send_message(message, reason = nil, validate: true, force: false)
  raise NotReady unless force || connected?
  raise IOError unless @protocol

  message.direction = :out
  message.generate_json
  message.validate(schemas) unless validate == false
  @protocol.write_lines(message.json)
  expect_acknowledgement(message)
  distribute(message)
  log_send(message, reason)
rescue IOError
  buffer_message(message)
rescue SchemaError, RSMP::Schema::Error => error
  handle_send_schema_error(message, error)
end