Class: WampClient::Session

Inherits:
Object
  • Object
show all
Includes:
Check
Defined in:
lib/wamp_client/session.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Check

included

Constructor Details

#initialize(transport, options = {}) ⇒ Session

Constructor

Parameters:

  • transport (WampClient::Transport::Base)

    The transport that the session will use

  • options (Hash) (defaults to: {})

    Hash containing different session options

Options Hash (options):

  • :authid (String)

    The authentication ID

  • :authmethods (Array)

    Different auth methods that this client supports



194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
# File 'lib/wamp_client/session.rb', line 194

# Builds a session bound to +transport+. The session starts closed:
# its id and realm are nil until set elsewhere.
#
# @param transport [WampClient::Transport::Base] transport the session sends and receives on
# @param options [Hash, nil] session options; nil is treated as an empty hash.
#   +:verbose+ is read here and stored in the +verbose+ attribute.
def initialize(transport, options={})

  # Session state
  self.id = nil
  self.realm = nil
  self.options = options || {}
  self.verbose = self.options[:verbose]

  # One table of outstanding requests per request kind
  request_kinds = [:publish, :subscribe, :unsubscribe, :call, :register, :unregister]
  self._requests = request_kinds.each_with_object({}) { |kind, table| table[kind] = {} }

  # Active subscriptions, registrations and deferred invocations
  self._subscriptions = {}
  self._registrations = {}
  self._defers = {}

  # Every message delivered by the transport is routed to _receive_message
  self.transport = transport
  self.transport.on_message { |raw| self._receive_message(raw) }

  # No GOODBYE has been sent yet
  self._goodbye_sent = false

  # Session callbacks are unset until registered through on_join/on_leave/on_challenge
  @on_join = @on_leave = @on_challenge = nil

end

Instance Attribute Details

#_defersObject

Private attributes



187
188
189
# File 'lib/wamp_client/session.rb', line 187

# Private attribute reader.
#
# @return [Object] the current value of the +@_defers+ instance variable
def _defers
  @_defers
end

#_goodbye_sentObject

Private attributes



187
188
189
# File 'lib/wamp_client/session.rb', line 187

# Private attribute reader.
#
# @return [Object] the current value of the +@_goodbye_sent+ instance variable
def _goodbye_sent
  @_goodbye_sent
end

#_registrationsObject

Private attributes



187
188
189
# File 'lib/wamp_client/session.rb', line 187

# Private attribute reader.
#
# @return [Object] the current value of the +@_registrations+ instance variable
def _registrations
  @_registrations
end

#_requestsObject

Private attributes



187
188
189
# File 'lib/wamp_client/session.rb', line 187

# Private attribute reader.
#
# @return [Object] the current value of the +@_requests+ instance variable
def _requests
  @_requests
end

#_subscriptionsObject

Private attributes



187
188
189
# File 'lib/wamp_client/session.rb', line 187

# Private attribute reader.
#
# @return [Object] the current value of the +@_subscriptions+ instance variable
def _subscriptions
  @_subscriptions
end

#idObject

Returns the value of attribute id.



184
185
186
# File 'lib/wamp_client/session.rb', line 184

# Attribute reader for the session id.
#
# @return [Object] the current value of the +@id+ instance variable
def id
  @id
end

#optionsObject

Returns the value of attribute options.



184
185
186
# File 'lib/wamp_client/session.rb', line 184

# Attribute reader for the session options.
#
# @return [Object] the current value of the +@options+ instance variable
def options
  @options
end

#realmObject

Returns the value of attribute realm.



184
185
186
# File 'lib/wamp_client/session.rb', line 184

# Attribute reader for the realm.
#
# @return [Object] the current value of the +@realm+ instance variable
def realm
  @realm
end

#transportObject

Returns the value of attribute transport.



184
185
186
# File 'lib/wamp_client/session.rb', line 184

# Attribute reader for the transport.
#
# @return [Object] the current value of the +@transport+ instance variable
def transport
  @transport
end

#verboseObject

Returns the value of attribute verbose.



184
185
186
# File 'lib/wamp_client/session.rb', line 184

# Attribute reader for the verbose flag.
#
# @return [Object] the current value of the +@verbose+ instance variable
def verbose
  @verbose
end

Instance Method Details

#_error_to_hash(msg) ⇒ Object

Converts an error message to a hash

Parameters:



286
287
288
289
290
291
292
# File 'lib/wamp_client/session.rb', line 286

# Converts an error message into a plain hash.
#
# @param msg [#error, #arguments, #argumentskw] the error message
# @return [Hash] hash with the keys +:error+, +:args+ and +:kwargs+
def _error_to_hash(msg)
  error_value = msg.error
  positional = msg.arguments
  keyword = msg.argumentskw

  { error: error_value, args: positional, kwargs: keyword }
end

#_generate_idObject

Generates an ID according to the specification (Section 5.1.2)



280
281
282
# File 'lib/wamp_client/session.rb', line 280

# Generates a random ID.
#
# @return [Integer] a random integer between 0 and 2**53, both inclusive
def _generate_id
  rand(0..(2**53))
end

#_process_CALL_error(msg) ⇒ Object

Processes an error from a call request

Parameters:



961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
# File 'lib/wamp_client/session.rb', line 961

# Handles an ERROR answering a CALL request: drops the pending call and
# reports the failure to its callback as (nil, error_hash, details).
#
# @param msg [Object] the error message; +request_request+ and +details+ are read from it
def _process_CALL_error(msg)
  # Forget the pending call; unknown request ids are ignored
  pending = self._requests[:call].delete(msg.request_request)
  return unless pending

  info = msg.details || {}
  info[:procedure] ||= pending[:p]
  info[:type] = 'call'
  info[:session] = self

  callback = pending[:c]
  callback.call(nil, self._error_to_hash(msg), info) if callback
end

#_process_EVENT(msg) ⇒ Object

Processes an event from the broker

Parameters:



473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
# File 'lib/wamp_client/session.rb', line 473

# Handles an EVENT: looks up the subscription it belongs to and invokes
# that subscription's handler with (args, kwargs, details).
#
# @param msg [Object] the event message
def _process_EVENT(msg)
  # Missing payload parts default to empty collections
  positional = msg.publish_arguments || []
  keyword = msg.publish_argumentskw || {}

  subscription = self._subscriptions[msg.subscribed_subscription]
  return unless subscription

  info = msg.details || {}
  info[:publication] = msg.published_publication
  info[:session] = self

  handler = subscription.handler
  handler.call(positional, keyword, info) if handler
end

#_process_INTERRUPT(msg) ⇒ Object

Processes the interrupt

Parameters:



798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
# File 'lib/wamp_client/session.rb', line 798

# Handles an INTERRUPT for a deferred invocation.
#
# If a defer is stored for the interrupted request, the interrupt handler of
# its registration (when present) is called with (request, mode). The handler's
# return value -- or the error it raised -- is sent back as the invocation
# error; 'interrupt' is used when the handler yields nothing. The defer is
# removed afterwards. Unknown requests are ignored.
#
# Only StandardError and ScriptError (e.g. NotImplementedError) raised by the
# handler are converted into an error response; process-control exceptions
# such as Interrupt or SystemExit propagate to the caller.
#
# @param msg [Object] the interrupt message; +invocation_request+ and +options+ are read
def _process_INTERRUPT(msg)

  request = msg.invocation_request
  # Tolerate a message without options instead of raising NoMethodError
  mode = (msg.options || {})[:mode]

  defer = self._defers[request]
  return unless defer

  registration = self._registrations[defer.registration]
  if registration
    # If it exists, call the interrupt handler to inform it of the interrupt
    handler = registration.i_handler
    error = nil
    if handler
      begin
        error = handler.call(request, mode)
      rescue StandardError, ScriptError => e
        error = e
      end
    end

    error ||= 'interrupt'

    # Send the error back to the client
    self._send_INVOCATION_error(request, error, true)
  end

  # Delete the defer
  self._defers.delete(request)

end

#_process_INVOCATION(msg) ⇒ Object

Processes an invocation received from the router

Parameters:



739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
# File 'lib/wamp_client/session.rb', line 739

# Handles an INVOCATION: runs the handler of the matching registration with
# (args, kwargs, details) and sends back its outcome.
#
# * A plain return value is sent through #yield.
# * A WampClient::Defer::CallDefer is stored in +_defers+ and wired so that its
#   completion, error and (for ProgressiveCallDefer) progress callbacks send
#   the corresponding response later.
# * A StandardError or ScriptError (e.g. NotImplementedError) raised while
#   handling is sent back as an invocation error. Process-control exceptions
#   such as Interrupt or SystemExit are not swallowed and propagate.
#
# Invocations for unknown registrations, or registrations without a handler,
# are ignored.
#
# @param msg [Object] the invocation message
def _process_INVOCATION(msg)

  request = msg.request
  args = msg.call_arguments || []
  kwargs = msg.call_argumentskw || {}

  details = msg.details || {}
  details[:request] = request
  details[:session] = self

  registration = self._registrations[msg.registered_registration]
  handler = registration && registration.handler
  return unless handler

  begin
    value = handler.call(args, kwargs, details)

    # If a defer was returned, handle accordingly
    if value.is_a? WampClient::Defer::CallDefer
      value.request = request
      value.registration = msg.registered_registration

      # Store the defer
      self._defers[request] = value

      # On complete, send the result
      value.on_complete do |defer, result|
        self.yield(defer.request, result, {}, true)
        self._defers.delete(defer.request)
      end

      # On error, send the error
      value.on_error do |defer, error|
        self._send_INVOCATION_error(defer.request, error, true)
        self._defers.delete(defer.request)
      end

      # For progressive, return the progress
      if value.is_a? WampClient::Defer::ProgressiveCallDefer
        value.on_progress do |defer, result|
          self.yield(defer.request, result, {progress: true}, true)
        end
      end

    # Else it was a normal response
    else
      self.yield(request, value)
    end

  rescue StandardError, ScriptError => error
    # Report handler failures to the caller instead of crashing the session
    self._send_INVOCATION_error(request, error)
  end
end

#_process_PUBLISH_error(msg) ⇒ Object

Processes an error from a publish request

Parameters:



605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
# File 'lib/wamp_client/session.rb', line 605

# Handles an ERROR answering a PUBLISH request: drops the pending publish and
# reports the failure to its callback as (nil, error_hash, details).
#
# @param msg [Object] the error message; +request_request+ and +details+ are read from it
def _process_PUBLISH_error(msg)
  # Forget the pending publish; unknown request ids are ignored
  pending = self._requests[:publish].delete(msg.request_request)
  return unless pending

  info = msg.details || {}
  info[:topic] ||= pending[:t]
  info[:type] = 'publish'
  info[:session] = self

  callback = pending[:c]
  callback.call(nil, self._error_to_hash(msg), info) if callback
end

#_process_PUBLISHED(msg) ⇒ Object

Processes the response to a publish request

Parameters:



585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
# File 'lib/wamp_client/session.rb', line 585

# Handles a PUBLISHED acknowledgement: drops the pending publish and calls its
# callback with (pending_request_hash, nil, details).
#
# @param msg [Object] the published message; +publish_request+ and +publication+ are read
def _process_PUBLISHED(msg)
  pending = self._requests[:publish].delete(msg.publish_request)
  return unless pending

  info = {
    topic: pending[:t],
    type: 'publish',
    publication: msg.publication,
    session: self
  }

  callback = pending[:c]
  callback.call(pending, nil, info) if callback
end

#_process_REGISTER_error(msg) ⇒ Object

Processes an error from a request

Parameters:



674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
# File 'lib/wamp_client/session.rb', line 674

# Handles an ERROR answering a REGISTER request: drops the pending registration
# and reports the failure to its callback as (nil, error_hash, details).
#
# @param msg [Object] the error message; +request_request+ and +details+ are read from it
def _process_REGISTER_error(msg)
  # Forget the pending registration; unknown request ids are ignored
  pending = self._requests[:register].delete(msg.request_request)
  return unless pending

  info = msg.details || {}
  info[:procedure] ||= pending[:p]
  info[:type] = 'register'
  info[:session] = self

  callback = pending[:c]
  callback.call(nil, self._error_to_hash(msg), info) if callback
end

#_process_REGISTERED(msg) ⇒ Object

Processes the response to a register request

Parameters:



653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
# File 'lib/wamp_client/session.rb', line 653

# Handles a REGISTERED response: turns the pending register request into a
# Registration, stores it under the id from the message, and calls the
# request's callback with (registration, nil, details).
#
# @param msg [Object] the registered message; +register_request+ and +registration+ are read
def _process_REGISTERED(msg)
  pending = self._requests[:register].delete(msg.register_request)
  return unless pending

  registration = Registration.new(pending[:p], pending[:h], pending[:o], pending[:i], self, msg.registration)
  self._registrations[msg.registration] = registration

  info = { procedure: pending[:p], type: 'register', session: self }

  callback = pending[:c]
  callback.call(registration, nil, info) if callback
end

#_process_RESULT(msg) ⇒ Object

Processes the response to a call request

Parameters:



939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
# File 'lib/wamp_client/session.rb', line 939

# Handles a RESULT for a CALL request and passes a CallResult to the call's
# callback as (result, nil, details).
#
# The pending call is kept only for a progressive result, i.e. when the
# details carry +:progress+ and the call was issued with the
# +:receive_progress+ option; otherwise it is removed.
#
# @param msg [Object] the result message
def _process_RESULT(msg)
  info = msg.details || {}
  pending = self._requests[:call][msg.call_request]

  # More results are expected only for a progressive call
  expect_more = info[:progress] && pending && pending[:o][:receive_progress]
  self._requests[:call].delete(msg.call_request) unless expect_more

  return unless pending

  info[:procedure] ||= pending[:p]
  info[:type] = 'call'
  info[:session] = self

  callback = pending[:c]
  callback.call(CallResult.new(msg.yield_arguments, msg.yield_argumentskw), nil, info) if callback
end

#_process_SUBSCRIBE_error(msg) ⇒ Object

Processes an error from a request

Parameters:



454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
# File 'lib/wamp_client/session.rb', line 454

# Handles an ERROR answering a SUBSCRIBE request: drops the pending
# subscription and reports the failure to its callback as
# (nil, error_hash, details).
#
# @param msg [Object] the error message; +request_request+ and +details+ are read from it
def _process_SUBSCRIBE_error(msg)
  # Forget the pending subscription; unknown request ids are ignored
  pending = self._requests[:subscribe].delete(msg.request_request)
  return unless pending

  info = msg.details || {}
  info[:topic] ||= pending[:t]
  info[:type] = 'subscribe'
  info[:session] = self

  callback = pending[:c]
  callback.call(nil, self._error_to_hash(msg), info) if callback
end

#_process_SUBSCRIBED(msg) ⇒ Object

Processes the response to a subscribe request

Parameters:



433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
# File 'lib/wamp_client/session.rb', line 433

# Handles a SUBSCRIBED response: turns the pending subscribe request into a
# Subscription, stores it under the id from the message, and calls the
# request's callback with (subscription, nil, details).
#
# @param msg [Object] the subscribed message; +subscribe_request+ and +subscription+ are read
def _process_SUBSCRIBED(msg)
  pending = self._requests[:subscribe].delete(msg.subscribe_request)
  return unless pending

  info = { topic: pending[:t], type: 'subscribe', session: self }

  subscription = Subscription.new(pending[:t], pending[:h], pending[:o], self, msg.subscription)
  self._subscriptions[msg.subscription] = subscription

  callback = pending[:c]
  callback.call(subscription, nil, info) if callback
end

#_process_UNREGISTER_error(msg) ⇒ Object

Processes an error from a request

Parameters:



876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
# File 'lib/wamp_client/session.rb', line 876

# Handles an ERROR answering an UNREGISTER request: drops the pending
# unregistration and reports the failure to its callback as
# (nil, error_hash, details).
#
# @param msg [Object] the error message; +request_request+ and +details+ are read from it
def _process_UNREGISTER_error(msg)
  # Forget the pending unregistration; unknown request ids are ignored
  pending = self._requests[:unregister].delete(msg.request_request)
  return unless pending

  info = msg.details || {}
  # Fall back to the procedure of the registration being removed
  info[:procedure] ||= pending[:r].procedure
  info[:type] = 'unregister'
  info[:session] = self

  callback = pending[:c]
  callback.call(nil, self._error_to_hash(msg), info) if callback
end

#_process_UNREGISTERED(msg) ⇒ Object

Processes the response to an unregister request

Parameters:



855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
# File 'lib/wamp_client/session.rb', line 855

# Handles an UNREGISTERED response: drops the pending unregister request,
# removes the registration from the active ones, and calls the request's
# callback with (registration, nil, details).
#
# @param msg [Object] the unregistered message; +unregister_request+ is read
def _process_UNREGISTERED(msg)
  pending = self._requests[:unregister].delete(msg.unregister_request)
  return unless pending

  registration = pending[:r]
  self._registrations.delete(registration.id)

  info = { procedure: registration.procedure, type: 'unregister', session: self }

  callback = pending[:c]
  callback.call(registration, nil, info) if callback
end

#_process_UNSUBSCRIBE_error(msg) ⇒ Object

Processes an error from a request

Parameters:



537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
# File 'lib/wamp_client/session.rb', line 537

# Handles an ERROR answering an UNSUBSCRIBE request: drops the pending
# unsubscription and reports the failure to its callback as
# (nil, error_hash, details).
#
# @param msg [Object] the error message; +request_request+ and +details+ are read from it
def _process_UNSUBSCRIBE_error(msg)
  # Forget the pending unsubscription; unknown request ids are ignored
  pending = self._requests[:unsubscribe].delete(msg.request_request)
  return unless pending

  info = msg.details || {}
  # Fall back to the topic of the subscription being removed
  info[:topic] ||= pending[:s].topic
  info[:type] = 'unsubscribe'
  info[:session] = self

  callback = pending[:c]
  callback.call(nil, self._error_to_hash(msg), info) if callback
end

#_process_UNSUBSCRIBED(msg) ⇒ Object

Processes the response to an unsubscribe request

Parameters:



515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
# File 'lib/wamp_client/session.rb', line 515

# Handles an UNSUBSCRIBED response: drops the pending unsubscribe request,
# removes the subscription from the active ones, and calls the request's
# callback with (subscription, nil, details).
#
# @param msg [Object] the unsubscribed message; +unsubscribe_request+ is read
def _process_UNSUBSCRIBED(msg)
  pending = self._requests[:unsubscribe].delete(msg.unsubscribe_request)
  return unless pending

  subscription = pending[:s]
  self._subscriptions.delete(subscription.id)

  info = { topic: pending[:s].topic, type: 'unsubscribe', session: self }

  callback = pending[:c]
  callback.call(subscription, nil, info) if callback
end

#_receive_message(msg) ⇒ Object

Processes received messages

Parameters:

  • msg (Array)


303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
# File 'lib/wamp_client/session.rb', line 303

# Entry point for everything the transport delivers: parses the raw message
# and dispatches it according to the session state.
#
# While the session is closed (id is nil) only WELCOME, CHALLENGE and ABORT
# are acted upon. While it is open, GOODBYE closes the session, ERROR
# messages are routed by the type of the request they answer, and all other
# known messages go to the matching +_process_*+ method. Anything else is
# ignored.
#
# @param msg [Object] the raw message handed over by the transport
def _receive_message(msg)

  message = WampClient::Message::Base.parse(msg)

  if self.verbose
    if message
      puts 'RX: ' + message.to_s
    else
      puts 'RX(non-wamp): ' + msg.to_s
    end
  end

  if self.id.nil?
    # Session is not open yet
    case message
    when WampClient::Message::Welcome
      self.id = message.session
      @on_join.call(message.details) unless @on_join.nil?
    when WampClient::Message::Challenge
      # Let the challenge callback (if any) compute the signature
      signature, extra = @on_challenge ? @on_challenge.call(message.authmethod, message.extra) : [nil, nil]

      signature ||= ''
      extra ||= {}

      self._send_message(WampClient::Message::Authenticate.new(signature, extra))
    when WampClient::Message::Abort
      @on_leave.call(message.reason, message.details) unless @on_leave.nil?
    end

  elsif message.is_a? WampClient::Message::Goodbye
    # Answer the goodbye unless this side initiated it
    unless self._goodbye_sent
      reply = WampClient::Message::Goodbye.new({}, 'wamp.error.goodbye_and_out')
      self._send_message(reply)
    end

    # Close out session
    self.id = nil
    self.realm = nil
    self._goodbye_sent = false
    @on_leave.call(message.reason, message.details) unless @on_leave.nil?

  elsif message.is_a? WampClient::Message::Error
    # Route the error by the type of the request it answers
    case message.request_type
    when WampClient::Message::Types::SUBSCRIBE
      self._process_SUBSCRIBE_error(message)
    when WampClient::Message::Types::UNSUBSCRIBE
      self._process_UNSUBSCRIBE_error(message)
    when WampClient::Message::Types::PUBLISH
      self._process_PUBLISH_error(message)
    when WampClient::Message::Types::REGISTER
      self._process_REGISTER_error(message)
    when WampClient::Message::Types::UNREGISTER
      self._process_UNREGISTER_error(message)
    when WampClient::Message::Types::CALL
      self._process_CALL_error(message)
    else
      # TODO: Some Error??  Not Implemented yet
    end

  else
    # Regular messages of an open session
    case message
    when WampClient::Message::Subscribed
      self._process_SUBSCRIBED(message)
    when WampClient::Message::Unsubscribed
      self._process_UNSUBSCRIBED(message)
    when WampClient::Message::Published
      self._process_PUBLISHED(message)
    when WampClient::Message::Event
      self._process_EVENT(message)
    when WampClient::Message::Registered
      self._process_REGISTERED(message)
    when WampClient::Message::Unregistered
      self._process_UNREGISTERED(message)
    when WampClient::Message::Invocation
      self._process_INVOCATION(message)
    when WampClient::Message::Interrupt
      self._process_INTERRUPT(message)
    when WampClient::Message::Result
      self._process_RESULT(message)
    else
      # TODO: Some Error??  Not Implemented yet
    end
  end

end

#_send_INVOCATION_error(request, error, check_defer = false) ⇒ Object

Sends an error back to the caller

Parameters:

  • request (Integer)
    • The request ID

  • error


694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
# File 'lib/wamp_client/session.rb', line 694

# Sends an ERROR for an invocation back to the caller.
#
# Anything that is not a CallError is wrapped in a 'wamp.error.runtime'
# CallError; non-nil values contribute their string form as the single argument.
#
# @param request [Integer] the request ID
# @param error [CallError, Object, nil] the error to report
# @param check_defer [Boolean] when true, nothing is sent unless a defer is
#   still stored for +request+ (prevents responses for defers that already
#   completed or failed)
def _send_INVOCATION_error(request, error, check_defer=false)
  return if check_defer && !self._defers[request]

  call_error =
    if error.nil?
      CallError.new('wamp.error.runtime')
    elsif error.is_a?(CallError)
      error
    else
      CallError.new('wamp.error.runtime', [error.to_s])
    end

  error_msg = WampClient::Message::Error.new(
    WampClient::Message::Types::INVOCATION, request, {}, call_error.error, call_error.args, call_error.kwargs
  )
  self._send_message(error_msg)
end

#_send_message(msg) ⇒ Object

Sends a message

Parameters:



296
297
298
299
# File 'lib/wamp_client/session.rb', line 296

# Sends a message through the transport, logging it first when verbose.
#
# @param msg [#payload, #to_s] the message to send
# @return [Object] whatever the transport's +send_message+ returns
def _send_message(msg)
  if self.verbose
    puts 'TX: ' + msg.to_s
  end

  payload = msg.payload
  self.transport.send_message(payload)
end

#call(procedure, args = nil, kwargs = nil, options = {}, &callback) ⇒ Call

Calls a remote procedure

Parameters:

  • procedure (String)

    The procedure to invoke

  • args (Array) (defaults to: nil)

    The arguments

  • kwargs (Hash) (defaults to: nil)

    The keyword arguments

  • options (Hash) (defaults to: {})

    The options for the call

  • callback (block)

    The callback(result, error, details) called to signal if the call was a success or not

Returns:

  • (Call)

    An object representing the call



904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
# File 'lib/wamp_client/session.rb', line 904

# Calls a remote procedure.
#
# The request is stored as pending, a CALL message is sent, and a Call
# object is returned. When +options[:timeout]+ is positive, a transport
# timer is armed that cancels the call if it is still pending on expiry.
#
# @param procedure [String] the procedure to invoke
# @param args [Array, nil] the arguments
# @param kwargs [Hash, nil] the keyword arguments
# @param options [Hash] the options for the call
# @param callback [block] callback(result, error, details) signalling the outcome
# @return [Call] an object representing the call
# @raise [RuntimeError] if the session is not open
def call(procedure, args=nil, kwargs=nil, options={}, &callback)
  raise RuntimeError, "Session must be open to call 'call'" unless is_open?

  checker = self.class
  checker.check_uri('procedure', procedure)
  checker.check_dict('options', options)
  checker.check_list('args', args, true)
  checker.check_dict('kwargs', kwargs, true)

  # Remember the request until its result or error arrives
  request = self._generate_id
  self._requests[:call][request] = {p: procedure, a: args, k: kwargs, o: options, c: callback}

  self._send_message(WampClient::Message::Call.new(request, options, procedure, args, kwargs))

  handle = Call.new(self, request)

  timeout = options[:timeout]
  if timeout && timeout > 0
    self.transport.add_timer(timeout) do
      # Once the timer expires, if the call hasn't completed, cancel it
      handle.cancel if self._requests[:call][handle.id]
    end
  end

  handle
end

#cancel(call, mode = 'skip') ⇒ Object

Cancels a call

Parameters:

  • call (Call)
    • The call object

  • mode (String) (defaults to: 'skip')
    • The cancel mode. Options are ‘skip’, ‘kill’, ‘killnowait’



985
986
987
988
989
990
991
992
993
994
995
# File 'lib/wamp_client/session.rb', line 985

# Cancels a call by sending a CANCEL message for its request id.
#
# @param call [Call] the call object
# @param mode [String] the cancel mode sent in the options
#   (documented values: 'skip', 'kill', 'killnowait')
# @raise [RuntimeError] if the session is not open
def cancel(call, mode='skip')
  raise RuntimeError, "Session must be open to call 'cancel'" unless is_open?

  self.class.check_nil('call', call, false)

  self._send_message(WampClient::Message::Cancel.new(call.id, { mode: mode }))
end

#is_open?Boolean

Returns ‘true’ if the session is open

Returns:

  • (Boolean)


234
235
236
# File 'lib/wamp_client/session.rb', line 234

# Tells whether the session is open, i.e. whether it has an id.
#
# @return [Boolean] true when +id+ is not nil
def is_open?
  !id.nil?
end

#join(realm) ⇒ Object

Joins the WAMP Router

Parameters:

  • realm (String)

    The name of the realm



240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
# File 'lib/wamp_client/session.rb', line 240

# Joins the WAMP router by sending a HELLO for the given realm.
#
# The HELLO details carry the client roles and agent string, plus
# +:authid+ / +:authmethods+ when present in the session options.
#
# @param realm [String] the name of the realm
# @raise [RuntimeError] if the session is already open
def join(realm)
  raise RuntimeError, "Session must be closed to call 'join'" if is_open?

  self.class.check_uri('realm', realm)

  self.realm = realm

  hello_details = {
    roles: WAMP_FEATURES,
    agent: "Ruby-WampClient-#{WampClient::VERSION}"
  }
  # Authentication fields are optional
  hello_details[:authid] = self.options[:authid] if self.options[:authid]
  hello_details[:authmethods] = self.options[:authmethods] if self.options[:authmethods]

  self._send_message(WampClient::Message::Hello.new(realm, hello_details))
end

#leave(reason = 'wamp.close.normal', message = 'user initiated') ⇒ Object

Leaves the WAMP Router

Parameters:

  • reason (String) (defaults to: 'wamp.close.normal')

    URI signalling the reason for leaving



262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
# File 'lib/wamp_client/session.rb', line 262

# Leaves the WAMP router by sending a GOODBYE and remembering that it was sent.
#
# @param reason [String] URI signalling the reason for leaving
# @param message [String] text placed in the GOODBYE details under +:message+
# @raise [RuntimeError] if the session is not open
def leave(reason='wamp.close.normal', message='user initiated')
  raise RuntimeError, "Session must be opened to call 'leave'" unless is_open?

  checker = self.class
  checker.check_uri('reason', reason, true)
  checker.check_string('message', message, true)

  farewell = WampClient::Message::Goodbye.new({ message: message }, reason)
  self._send_message(farewell)

  # Recorded so that the router's GOODBYE is not answered a second time
  self._goodbye_sent = true
end

#on(event, &callback) ⇒ Object

Simple setter for callbacks



171
172
173
174
175
176
177
178
179
180
181
182
# File 'lib/wamp_client/session.rb', line 171

# Registers a session callback by event name.
#
# @param event [Symbol] one of :join, :challenge or :leave
# @param callback [block] the callback, forwarded to on_join / on_challenge / on_leave
# @raise [RuntimeError] if the event is unknown
def on(event, &callback)
  setter = { join: :on_join, challenge: :on_challenge, leave: :on_leave }[event]
  raise RuntimeError, "Unknown on(event) '#{event}'" unless setter

  __send__(setter, &callback)
end

#on_challenge(&on_challenge) ⇒ Object



166
167
168
# File 'lib/wamp_client/session.rb', line 166

# Stores the block in +@on_challenge+, replacing any previous one.
# Calling it without a block clears the callback.
#
# @param on_challenge [block] the callback to store
# @return [Proc, nil] the stored callback
def on_challenge(&on_challenge)
  @on_challenge = on_challenge
end

#on_join(&on_join) ⇒ Object



148
149
150
# File 'lib/wamp_client/session.rb', line 148

# Stores the block in +@on_join+, replacing any previous one.
# Calling it without a block clears the callback.
#
# @param on_join [block] the callback to store
# @return [Proc, nil] the stored callback
def on_join(&on_join)
  @on_join = on_join
end

#on_leave(&on_leave) ⇒ Object



156
157
158
# File 'lib/wamp_client/session.rb', line 156

# Stores the block in +@on_leave+, replacing any previous one.
# Calling it without a block clears the callback.
#
# @param on_leave [block] the callback to store
# @return [Proc, nil] the stored callback
def on_leave(&on_leave)
  @on_leave = on_leave
end

#publish(topic, args = nil, kwargs = nil, options = {}, &callback) ⇒ Object

Publishes an event to a topic

Parameters:

  • topic (String)

    The topic to publish the event to

  • args (Array) (defaults to: nil)

    The arguments

  • kwargs (Hash) (defaults to: nil)

    The keyword arguments

  • options (Hash) (defaults to: {})

    The options for the publish

  • callback (block)

    The callback(publish, error, details) called to signal if the publish was a success or not



564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
# File 'lib/wamp_client/session.rb', line 564

# Publishes an event to a topic.
#
# The request is tracked as pending only when +options[:acknowledge]+ is
# set, because only then can a callback be invoked for it.
#
# @param topic [String] the topic to publish the event to
# @param args [Array, nil] the arguments
# @param kwargs [Hash, nil] the keyword arguments
# @param options [Hash] the options for the publish
# @param callback [block] callback(publish, error, details) signalling the outcome
# @raise [RuntimeError] if the session is not open
def publish(topic, args=nil, kwargs=nil, options={}, &callback)
  raise RuntimeError, "Session must be open to call 'publish'" unless is_open?

  checker = self.class
  checker.check_uri('topic', topic)
  checker.check_dict('options', options)
  checker.check_list('args', args, true)
  checker.check_dict('kwargs', kwargs, true)

  request = self._generate_id
  if options[:acknowledge]
    self._requests[:publish][request] = {t: topic, a: args, k: kwargs, o: options, c: callback}
  end

  self._send_message(WampClient::Message::Publish.new(request, options, topic, args, kwargs))
end

#register(procedure, handler, options = nil, interrupt = nil, &callback) ⇒ Object

Register to a procedure

Parameters:

  • procedure (String)

    The procedure to register for

  • handler (lambda)

    The handler(args, kwargs, details) when an invocation is received

  • options (Hash, nil) (defaults to: nil)

    The options for the registration

  • interrupt (lambda) (defaults to: nil)

    The handler(request, mode) when an interrupt is received

  • callback (block)

    The callback(registration, error, details) called to signal if the registration was a success or not



632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
# File 'lib/wamp_client/session.rb', line 632

# Registers a procedure: stores a pending register request and sends a
# REGISTER message.
#
# @param procedure [String] the procedure to register for
# @param handler [lambda] handler(args, kwargs, details) run when an invocation is received
# @param options [Hash, nil] the options for the registration; nil becomes an empty hash
# @param interrupt [lambda, nil] handler(request, mode) run when an interrupt is received
# @param callback [block] callback(registration, error, details) signalling the outcome
# @raise [RuntimeError] if the session is not open
def register(procedure, handler, options=nil, interrupt=nil, &callback)
  raise RuntimeError, "Session must be open to call 'register'" unless is_open?

  options ||= {}

  checker = self.class
  checker.check_uri('procedure', procedure)
  checker.check_nil('handler', handler, false)

  # Remember the request until REGISTERED or an error arrives
  request = self._generate_id
  self._requests[:register][request] = {p: procedure, h: handler, i: interrupt, o: options, c: callback}

  self._send_message(WampClient::Message::Register.new(request, options, procedure))
end

#subscribe(topic, handler, options = {}, &callback) ⇒ Object

Subscribes to a topic

Parameters:

  • topic (String)

    The topic to subscribe to

  • handler (lambda)

    The handler(args, kwargs, details) when an event is received

  • options (Hash) (defaults to: {})

    The options for the subscription

  • callback (block)

    The callback(subscription, error, details) called to signal if the subscription was a success or not



413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
# File 'lib/wamp_client/session.rb', line 413

# Subscribes to a topic: stores a pending subscribe request and sends a
# SUBSCRIBE message.
#
# @param topic [String] the topic to subscribe to
# @param handler [lambda] handler(args, kwargs, details) run when an event is received
# @param options [Hash] the options for the subscription
# @param callback [block] callback signalling whether the subscription succeeded
# @raise [RuntimeError] if the session is not open
def subscribe(topic, handler, options={}, &callback)
  raise RuntimeError, "Session must be open to call 'subscribe'" unless is_open?

  checker = self.class
  checker.check_uri('topic', topic)
  checker.check_dict('options', options)
  checker.check_nil('handler', handler, false)

  # Remember the request until SUBSCRIBED or an error arrives
  request = self._generate_id
  self._requests[:subscribe][request] = {t: topic, h: handler, o: options, c: callback}

  self._send_message(WampClient::Message::Subscribe.new(request, options, topic))
end

#unregister(registration, &callback) ⇒ Object

Unregisters from a procedure

Parameters:

  • registration (Registration)

    The registration object from when the registration was created

  • callback (block)

    The callback(registration, error, details) called to signal if the unregistration was a success or not



837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
# File 'lib/wamp_client/session.rb', line 837

# Unregisters from a procedure: stores a pending unregister request and
# sends an UNREGISTER message for the registration's id.
#
# @param registration [Registration] the registration object from when the registration was created
# @param callback [block] callback(registration, error, details) signalling the outcome
# @raise [RuntimeError] if the session is not open
def unregister(registration, &callback)
  raise RuntimeError, "Session must be open to call 'unregister'" unless is_open?

  self.class.check_nil('registration', registration, false)

  # Remember the request until UNREGISTERED or an error arrives
  request = self._generate_id
  self._requests[:unregister][request] = { r: registration, c: callback }

  self._send_message(WampClient::Message::Unregister.new(request, registration.id))
end

#unsubscribe(subscription, &callback) ⇒ Object

Unsubscribes from a subscription

Parameters:

  • subscription (Subscription)

    The subscription object from when the subscription was created

  • callback (block)

    The callback(subscription, error, details) called to signal if the subscription was a success or not



497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
# File 'lib/wamp_client/session.rb', line 497

# Unsubscribes from a subscription: stores a pending unsubscribe request and
# sends an UNSUBSCRIBE message for the subscription's id.
#
# @param subscription [Subscription] the subscription object from when the subscription was created
# @param callback [block] callback(subscription, error, details) signalling the outcome
# @raise [RuntimeError] if the session is not open
def unsubscribe(subscription, &callback)
  raise RuntimeError, "Session must be open to call 'unsubscribe'" unless is_open?

  self.class.check_nil('subscription', subscription, false)

  # Remember the request until UNSUBSCRIBED or an error arrives
  request = self._generate_id
  self._requests[:unsubscribe][request] = { s: subscription, c: callback }

  self._send_message(WampClient::Message::Unsubscribe.new(request, subscription.id))
end

#yield(request, result, options = {}, check_defer = false) ⇒ Object

Sends a result for the invocation

Parameters:

  • request (Integer)
    • The id of the request

  • result (CallError, CallResult, anything)
    • If it is a CallError, the error will be returned

  • options (Hash) (defaults to: {})
    • The options to be sent with the yield



714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
# File 'lib/wamp_client/session.rb', line 714

# Sends the outcome of an invocation back to the caller.
#
# nil becomes an empty CallResult, a CallError is sent as an invocation
# error, a CallResult is sent as is, and any other value is wrapped as the
# single argument of a CallResult.
#
# @param request [Integer] the id of the request
# @param result [CallError, CallResult, Object, nil] the outcome to send
# @param options [Hash] the options to be sent with the yield
# @param check_defer [Boolean] when true, nothing is sent unless a defer is
#   still stored for +request+ (prevents responses for defers that already
#   completed or failed)
def yield(request, result, options={}, check_defer=false)
  return if check_defer && !self._defers[request]

  outcome =
    if result.nil?
      CallResult.new
    elsif result.is_a?(CallError) || result.is_a?(CallResult)
      result
    else
      CallResult.new([result])
    end

  if outcome.is_a?(CallError)
    self._send_INVOCATION_error(request, outcome)
  else
    self._send_message(WampClient::Message::Yield.new(request, options, outcome.args, outcome.kwargs))
  end
end