Class: WampClient::Session

Inherits:
Object
  • Object
show all
Includes:
Check
Defined in:
lib/wamp_client/session.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Check

included

Constructor Details

#initialize(transport, options = {}) ⇒ Session

Constructor

Parameters:

  • transport (WampClient::Transport::Base)

    The transport that the session will use

  • options (Hash) (defaults to: {})

    Hash containing different session options

Options Hash (options):

  • :authid (String)

    The authentication ID

  • :authmethods (Array)

    Different auth methods that this client supports



180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
# File 'lib/wamp_client/session.rb', line 180

# Creates a closed session bound to +transport+.
#
# All bookkeeping tables start empty and the transport's on_message hook is
# wired to #_receive_message so every incoming message reaches the session.
#
# @param transport [WampClient::Transport::Base] the transport the session will use
# @param options [Hash, nil] session options; nil is treated as {}
# @option options [String] :authid the authentication ID
# @option options [Array] :authmethods auth methods that this client supports
# @option options [Object] :verbose when truthy, sent/received messages are printed
def initialize(transport, options={})

  # Parameters
  self.id = nil
  self.realm = nil
  self.options = options || {}
  self.verbose = self.options[:verbose]

  # Outstanding requests, grouped by request type and keyed by request ID
  self._requests = {
    publish: {},
    subscribe: {},
    unsubscribe: {},
    call: {},
    register: {},
    unregister: {}
  }

  # Init Subs and Regs in place
  self._subscriptions = {}
  self._registrations = {}
  self._defers = {}

  # Setup Transport
  self.transport = transport
  self.transport.on_message do |msg|
    self._receive_message(msg)
  end

  # Other parameters
  self._goodbye_sent = false

  # Setup session callbacks. @on_challenge is read by #_receive_message, so
  # it is initialised here together with the other two callbacks.
  @on_join = nil
  @on_leave = nil
  @on_challenge = nil

end

Instance Attribute Details

#_defers ⇒ Object

Private attributes



173
174
175
# File 'lib/wamp_client/session.rb', line 173

# Reader for the table of call defers that are still pending.
#
# @return [Hash] defers keyed by invocation request ID
def _defers
  @_defers
end

#_goodbye_sent ⇒ Object

Private attributes



173
174
175
# File 'lib/wamp_client/session.rb', line 173

# Reader for the flag that records whether this client already sent a
# Goodbye message (set by #leave, cleared once the router's Goodbye arrives).
#
# @return [Boolean]
def _goodbye_sent
  @_goodbye_sent
end

#_registrations ⇒ Object

Private attributes



173
174
175
# File 'lib/wamp_client/session.rb', line 173

# Reader for the table of active registrations.
#
# @return [Hash] registrations keyed by registration ID
def _registrations
  @_registrations
end

#_requests ⇒ Object

Private attributes



173
174
175
# File 'lib/wamp_client/session.rb', line 173

# Reader for the outstanding requests.
#
# @return [Hash] one inner hash per request type (:publish, :subscribe,
#   :unsubscribe, :call, :register, :unregister), each keyed by request ID
def _requests
  @_requests
end

#_subscriptions ⇒ Object

Private attributes



173
174
175
# File 'lib/wamp_client/session.rb', line 173

# Reader for the table of active subscriptions.
#
# @return [Hash] subscriptions keyed by subscription ID
def _subscriptions
  @_subscriptions
end

#id ⇒ Object

Returns the value of attribute id.



170
171
172
# File 'lib/wamp_client/session.rb', line 170

# Reader for the session ID.
#
# @return [Object, nil] the session ID taken from the Welcome message, or
#   nil while the session is not open
def id
  @id
end

#options ⇒ Object

Returns the value of attribute options.



170
171
172
# File 'lib/wamp_client/session.rb', line 170

# Reader for the session options.
#
# @return [Hash] the options given to the constructor ({} when nil was given)
def options
  @options
end

#realm ⇒ Object

Returns the value of attribute realm.



170
171
172
# File 'lib/wamp_client/session.rb', line 170

# Reader for the realm.
#
# @return [String, nil] the realm passed to #join; reset to nil when the
#   session is closed by a Goodbye message
def realm
  @realm
end

#transport ⇒ Object

Returns the value of attribute transport.



170
171
172
# File 'lib/wamp_client/session.rb', line 170

# Reader for the transport.
#
# @return [Object] the transport given to the constructor; the session sends
#   message payloads through its send_message method
def transport
  @transport
end

#verbose ⇒ Object

Returns the value of attribute verbose.



170
171
172
# File 'lib/wamp_client/session.rb', line 170

# Reader for the verbose flag.
#
# @return [Object] the :verbose option; when truthy, sent and received
#   messages are printed with puts
def verbose
  @verbose
end

Instance Method Details

#_error_to_hash(msg) ⇒ Object

Converts an error message to a hash

Parameters:



271
272
273
274
275
276
277
# File 'lib/wamp_client/session.rb', line 271

# Builds the error hash that is handed to request callbacks.
#
# @param msg [#error, #arguments, #argumentskw] the received error message
# @return [Hash] hash with the keys :error, :args and :kwargs (in that order)
def _error_to_hash(msg)
  converted = {}
  converted[:error] = msg.error
  converted[:args] = msg.arguments
  converted[:kwargs] = msg.argumentskw
  converted
end

#_generate_idObject

Generates an ID according to the specification (Section 5.1.2)



265
266
267
# File 'lib/wamp_client/session.rb', line 265

# Draws a random request ID.
#
# @return [Integer] a random integer between 0 and 2**53 (both inclusive)
def _generate_id
  largest_id = 2**53 # 9007199254740992
  rand(0..largest_id)
end

#_process_CALL_error(msg) ⇒ Object

Processes an error from a call request

Parameters:



930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
# File 'lib/wamp_client/session.rb', line 930

# Handles an Error message that answers a call request.
#
# The pending call is removed and its callback (if any) is invoked as
# callback(nil, error_hash, details).
#
# @param msg [WampClient::Message::Error] the received error
def _process_CALL_error(msg)
  # Drop the pending call; an unknown request ID is ignored
  pending = self._requests[:call].delete(msg.request_request)
  return unless pending

  details = msg.details || {}
  details[:procedure] ||= pending[:p]
  details[:type] = 'call'

  callback = pending[:c]
  callback.call(nil, self._error_to_hash(msg), details) if callback
end

#_process_EVENT(msg) ⇒ Object

Processes an event from the broker

Parameters:



456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
# File 'lib/wamp_client/session.rb', line 456

# Delivers a received event to the handler of the matching subscription.
#
# The handler is invoked as handler(args, kwargs, details) where details
# carries the publication ID under :publication. Events for unknown
# subscriptions are ignored.
#
# @param msg [WampClient::Message::Event] the received event
def _process_EVENT(msg)
  event_args = msg.publish_arguments || []
  event_kwargs = msg.publish_argumentskw || {}

  subscription = self._subscriptions[msg.subscribed_subscription]
  return unless subscription

  details = msg.details || {}
  details[:publication] = msg.published_publication

  handler = subscription.handler
  handler.call(event_args, event_kwargs, details) if handler
end

#_process_INTERRUPT(msg) ⇒ Object

Processes the interrupt

Parameters:



770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
# File 'lib/wamp_client/session.rb', line 770

# Handles an Interrupt message for an invocation that is still deferred.
#
# When the defer's registration has an interrupt handler, it is invoked as
# handler(request, mode); its return value (or the error it raised) is sent
# back as the invocation error, defaulting to 'interrupt'. The defer is then
# dropped. Interrupts for unknown requests are ignored.
#
# @param msg [WampClient::Message::Interrupt] the received interrupt
def _process_INTERRUPT(msg)

  request = msg.invocation_request
  # Tolerate a message without options instead of raising NoMethodError
  mode = (msg.options || {})[:mode]

  defer = self._defers[request]
  return unless defer

  registration = self._registrations[defer.registration]
  if registration
    # If it exists, call the interrupt handler to inform it of the interrupt
    interrupt_handler = registration.i_handler
    error = nil
    if interrupt_handler
      begin
        error = interrupt_handler.call(request, mode)
      rescue StandardError, ScriptError => e
        # Handler failures are reported to the caller. SystemExit, signals
        # and NoMemoryError are deliberately not rescued so they propagate.
        error = e
      end
    end

    error ||= 'interrupt'

    # Send the error back to the client
    self._send_INVOCATION_error(request, error, true)
  end

  # Delete the defer
  self._defers.delete(request)

end

#_process_INVOCATION(msg) ⇒ Object

Processes an invocation of a registered procedure

Parameters:



712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
# File 'lib/wamp_client/session.rb', line 712

# Runs the handler of the invoked registration and sends back its outcome.
#
# The handler is invoked as handler(args, kwargs, details) with the request
# ID stored under details[:request]. A plain return value is sent as the
# result. A WampClient::Defer::CallDefer is stored in the defer table and
# answered later through its complete/error (and, for progressive defers,
# progress) callbacks. An error raised by the handler is sent back as the
# invocation error. Invocations for unknown registrations, or registrations
# without a handler, are ignored.
#
# @param msg [WampClient::Message::Invocation] the received invocation
def _process_INVOCATION(msg)

  request = msg.request
  args = msg.call_arguments || []
  kwargs = msg.call_argumentskw || {}

  details = msg.details || {}
  details[:request] = request

  registration = self._registrations[msg.registered_registration]
  return unless registration

  handler = registration.handler
  return unless handler

  begin
    value = handler.call(args, kwargs, details)

    # If a defer was returned, handle accordingly
    if value.is_a? WampClient::Defer::CallDefer
      value.request = request
      value.registration = msg.registered_registration

      # Store the defer
      self._defers[request] = value

      # On complete, send the result
      value.on_complete do |defer, result|
        self._send_INVOCATION_result(defer.request, result, {}, true)
        self._defers.delete(defer.request)
      end

      # On error, send the error
      value.on_error do |defer, error|
        self._send_INVOCATION_error(defer.request, error, true)
        self._defers.delete(defer.request)
      end

      # For progressive, return the progress
      if value.is_a? WampClient::Defer::ProgressiveCallDefer
        value.on_progress do |defer, result|
          self._send_INVOCATION_result(defer.request, result, {progress: true}, true)
        end
      end

    # Else it was a normal response
    else
      self._send_INVOCATION_result(request, value)
    end

  rescue StandardError, ScriptError => error
    # Handler failures are reported to the caller. SystemExit, signals and
    # NoMemoryError are deliberately not rescued so they propagate.
    self._send_INVOCATION_error(request, error)
  end
end

#_process_PUBLISH_error(msg) ⇒ Object

Processes an error from a publish request

Parameters:



584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
# File 'lib/wamp_client/session.rb', line 584

# Handles an Error message that answers a publish request.
#
# The pending publish is removed and its callback (if any) is invoked as
# callback(nil, error_hash, details).
#
# @param msg [WampClient::Message::Error] the received error
def _process_PUBLISH_error(msg)
  # Drop the pending publish; an unknown request ID is ignored
  pending = self._requests[:publish].delete(msg.request_request)
  return unless pending

  details = msg.details || {}
  details[:topic] ||= pending[:t]
  details[:type] = 'publish'

  callback = pending[:c]
  callback.call(nil, self._error_to_hash(msg), details) if callback
end

#_process_PUBLISHED(msg) ⇒ Object

Processes the response to a publish request

Parameters:



565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
# File 'lib/wamp_client/session.rb', line 565

# Handles the Published acknowledgement of a publish request.
#
# The pending publish is removed and its callback (if any) is invoked as
# callback(request_hash, nil, details), where request_hash is the stored
# request entry and details holds :topic, :type and :publication.
#
# @param msg [WampClient::Message::Published] the received acknowledgement
def _process_PUBLISHED(msg)
  # Drop the pending publish; an unknown request ID is ignored
  pending = self._requests[:publish].delete(msg.publish_request)
  return unless pending

  details = {
    topic: pending[:t],
    type: 'publish',
    publication: msg.publication
  }

  callback = pending[:c]
  callback.call(pending, nil, details) if callback
end

#_process_REGISTER_error(msg) ⇒ Object

Processes an error from a request

Parameters:



651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
# File 'lib/wamp_client/session.rb', line 651

# Handles an Error message that answers a register request.
#
# The pending registration is removed and its callback (if any) is invoked
# as callback(nil, error_hash, details).
#
# @param msg [WampClient::Message::Error] the received error
def _process_REGISTER_error(msg)
  # Drop the pending registration; an unknown request ID is ignored
  pending = self._requests[:register].delete(msg.request_request)
  return unless pending

  details = msg.details || {}
  details[:procedure] ||= pending[:p]
  details[:type] = 'register'

  callback = pending[:c]
  callback.call(nil, self._error_to_hash(msg), details) if callback
end

#_process_REGISTERED(msg) ⇒ Object

Processes the response to a register request

Parameters:



631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
# File 'lib/wamp_client/session.rb', line 631

# Handles the Registered acknowledgement of a register request.
#
# The pending request is removed, a Registration is created and stored under
# the registration ID, and the callback (if any) is invoked as
# callback(registration, nil, details).
#
# @param msg [WampClient::Message::Registered] the received acknowledgement
def _process_REGISTERED(msg)
  # Drop the pending registration request; an unknown request ID is ignored
  pending = self._requests[:register].delete(msg.register_request)
  return unless pending

  registration = Registration.new(pending[:p], pending[:h], pending[:o], pending[:i], self, msg.registration)
  self._registrations[msg.registration] = registration

  details = { procedure: pending[:p], type: 'register' }

  callback = pending[:c]
  callback.call(registration, nil, details) if callback
end

#_process_RESULT(msg) ⇒ Object

Processes the response to a call request

Parameters:



909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
# File 'lib/wamp_client/session.rb', line 909

# Handles a Result message that answers a call request.
#
# The pending call is removed unless the result is flagged as progress and
# the call was made with the :receive_progress option. The callback (if any)
# is invoked as callback(call_result, nil, details).
#
# @param msg [WampClient::Message::Result] the received result
def _process_RESULT(msg)
  details = msg.details || {}

  request_id = msg.call_request
  pending = self._requests[:call][request_id]

  # A progressive result keeps the request alive for the results that follow
  more_to_come = details[:progress] && pending && pending[:o][:receive_progress]
  self._requests[:call].delete(request_id) unless more_to_come

  return unless pending

  details[:procedure] ||= pending[:p]
  details[:type] = 'call'

  callback = pending[:c]
  callback.call(CallResult.new(msg.yield_arguments, msg.yield_argumentskw), nil, details) if callback
end

#_process_SUBSCRIBE_error(msg) ⇒ Object

Processes an error from a request

Parameters:



438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
# File 'lib/wamp_client/session.rb', line 438

# Handles an Error message that answers a subscribe request.
#
# The pending subscription is removed and its callback (if any) is invoked
# as callback(nil, error_hash, details).
#
# @param msg [WampClient::Message::Error] the received error
def _process_SUBSCRIBE_error(msg)
  # Drop the pending subscription; an unknown request ID is ignored
  pending = self._requests[:subscribe].delete(msg.request_request)
  return unless pending

  details = msg.details || {}
  details[:topic] ||= pending[:t]
  details[:type] = 'subscribe'

  callback = pending[:c]
  callback.call(nil, self._error_to_hash(msg), details) if callback
end

#_process_SUBSCRIBED(msg) ⇒ Object

Processes the response to a subscribe request

Parameters:



418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
# File 'lib/wamp_client/session.rb', line 418

# Handles the Subscribed acknowledgement of a subscribe request.
#
# The pending request is removed, a Subscription is created and stored under
# the subscription ID, and the callback (if any) is invoked as
# callback(subscription, nil, details).
#
# @param msg [WampClient::Message::Subscribed] the received acknowledgement
def _process_SUBSCRIBED(msg)
  # Drop the pending subscribe request; an unknown request ID is ignored
  pending = self._requests[:subscribe].delete(msg.subscribe_request)
  return unless pending

  details = { topic: pending[:t], type: 'subscribe' }

  subscription = Subscription.new(pending[:t], pending[:h], pending[:o], self, msg.subscription)
  self._subscriptions[msg.subscription] = subscription

  callback = pending[:c]
  callback.call(subscription, nil, details) if callback
end

#_process_UNREGISTER_error(msg) ⇒ Object

Processes an error from a request

Parameters:



847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
# File 'lib/wamp_client/session.rb', line 847

# Handles an Error message that answers an unregister request.
#
# The pending unregistration is removed and its callback (if any) is invoked
# as callback(nil, error_hash, details).
#
# @param msg [WampClient::Message::Error] the received error
def _process_UNREGISTER_error(msg)
  # Drop the pending unregistration; an unknown request ID is ignored
  pending = self._requests[:unregister].delete(msg.request_request)
  return unless pending

  details = msg.details || {}
  details[:procedure] ||= pending[:r].procedure
  details[:type] = 'unregister'

  callback = pending[:c]
  callback.call(nil, self._error_to_hash(msg), details) if callback
end

#_process_UNREGISTERED(msg) ⇒ Object

Processes the response to an unregister request

Parameters:



827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
# File 'lib/wamp_client/session.rb', line 827

# Handles the Unregistered acknowledgement of an unregister request.
#
# The pending request is removed, the registration is dropped from the
# active registrations, and the callback (if any) is invoked as
# callback(registration, nil, details).
#
# @param msg [WampClient::Message::Unregistered] the received acknowledgement
def _process_UNREGISTERED(msg)
  # Drop the pending unregister request; an unknown request ID is ignored
  pending = self._requests[:unregister].delete(msg.unregister_request)
  return unless pending

  registration = pending[:r]
  self._registrations.delete(registration.id)

  details = { procedure: registration.procedure, type: 'unregister' }

  callback = pending[:c]
  callback.call(registration, nil, details) if callback
end

#_process_UNSUBSCRIBE_error(msg) ⇒ Object

Processes an error from a request

Parameters:



518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
# File 'lib/wamp_client/session.rb', line 518

# Handles an Error message that answers an unsubscribe request.
#
# The pending unsubscription is removed and its callback (if any) is invoked
# as callback(nil, error_hash, details).
#
# @param msg [WampClient::Message::Error] the received error
def _process_UNSUBSCRIBE_error(msg)
  # Drop the pending unsubscription; an unknown request ID is ignored
  pending = self._requests[:unsubscribe].delete(msg.request_request)
  return unless pending

  details = msg.details || {}
  details[:topic] ||= pending[:s].topic
  details[:type] = 'unsubscribe'

  callback = pending[:c]
  callback.call(nil, self._error_to_hash(msg), details) if callback
end

#_process_UNSUBSCRIBED(msg) ⇒ Object

Processes the response to an unsubscribe request

Parameters:



497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
# File 'lib/wamp_client/session.rb', line 497

# Handles the Unsubscribed acknowledgement of an unsubscribe request.
#
# The pending request is removed, the subscription is dropped from the
# active subscriptions, and the callback (if any) is invoked as
# callback(subscription, nil, details).
#
# @param msg [WampClient::Message::Unsubscribed] the received acknowledgement
def _process_UNSUBSCRIBED(msg)
  # Drop the pending unsubscribe request; an unknown request ID is ignored
  pending = self._requests[:unsubscribe].delete(msg.unsubscribe_request)
  return unless pending

  subscription = pending[:s]
  self._subscriptions.delete(subscription.id)

  details = { topic: pending[:s].topic, type: 'unsubscribe' }

  callback = pending[:c]
  callback.call(subscription, nil, details) if callback
end

#_receive_message(msg) ⇒ Object

Processes received messages

Parameters:

  • msg (Array)


288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
# File 'lib/wamp_client/session.rb', line 288

# Parses a raw message from the transport and dispatches it.
#
# While the session is closed only Welcome, Challenge and Abort are acted on.
# While it is open, Goodbye closes the session, Error messages are routed by
# the type of the request they answer, and all other known messages go to
# their _process_* handler. Anything else is ignored.
#
# @param msg [Array] the raw message received from the transport
def _receive_message(msg)
  message = WampClient::Message::Base.parse(msg)

  if self.verbose
    if message
      puts 'RX: ' + message.to_s
    else
      puts 'RX(non-wamp): ' + msg.to_s
    end
  end

  if self.id.nil?
    # WAMP Session is not open
    case message
    when WampClient::Message::Welcome
      self.id = message.session
      @on_join.call(message.details) unless @on_join.nil?
    when WampClient::Message::Challenge
      # Without a challenge handler both values stay nil and the defaults apply
      signature, extra = @on_challenge.call(message.authmethod, message.extra) if @on_challenge

      authenticate = WampClient::Message::Authenticate.new(signature || '', extra || {})
      self._send_message(authenticate)
    when WampClient::Message::Abort
      @on_leave.call(message.reason, message.details) unless @on_leave.nil?
    end

  elsif message.is_a? WampClient::Message::Goodbye
    # If we didn't send the goodbye, respond
    unless self._goodbye_sent
      self._send_message(WampClient::Message::Goodbye.new({}, 'wamp.error.goodbye_and_out'))
    end

    # Close out session
    self.id = nil
    self.realm = nil
    self._goodbye_sent = false
    @on_leave.call(message.reason, message.details) unless @on_leave.nil?

  elsif message.is_a? WampClient::Message::Error
    # Errors are routed by the type of the request they answer
    case message.request_type
    when WampClient::Message::Types::SUBSCRIBE
      self._process_SUBSCRIBE_error(message)
    when WampClient::Message::Types::UNSUBSCRIBE
      self._process_UNSUBSCRIBE_error(message)
    when WampClient::Message::Types::PUBLISH
      self._process_PUBLISH_error(message)
    when WampClient::Message::Types::REGISTER
      self._process_REGISTER_error(message)
    when WampClient::Message::Types::UNREGISTER
      self._process_UNREGISTER_error(message)
    when WampClient::Message::Types::CALL
      self._process_CALL_error(message)
    else
      # TODO: Some Error??  Not Implemented yet
    end

  else
    # Regular messages of an open session
    case message
    when WampClient::Message::Subscribed
      self._process_SUBSCRIBED(message)
    when WampClient::Message::Unsubscribed
      self._process_UNSUBSCRIBED(message)
    when WampClient::Message::Published
      self._process_PUBLISHED(message)
    when WampClient::Message::Event
      self._process_EVENT(message)
    when WampClient::Message::Registered
      self._process_REGISTERED(message)
    when WampClient::Message::Unregistered
      self._process_UNREGISTERED(message)
    when WampClient::Message::Invocation
      self._process_INVOCATION(message)
    when WampClient::Message::Interrupt
      self._process_INTERRUPT(message)
    when WampClient::Message::Result
      self._process_RESULT(message)
    else
      # TODO: Some Error??  Not Implemented yet
    end
  end
end

#_send_INVOCATION_error(request, error, check_defer = false) ⇒ Object

Sends an error back to the caller

Parameters:

  • request (Integer)
    • The request ID

  • error


670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
# File 'lib/wamp_client/session.rb', line 670

# Sends an Error message for an invocation back to the caller.
#
# Anything that is not a CallError is wrapped in a 'wamp.error.runtime'
# CallError whose args hold the error's string form (no args for nil).
#
# @param request [Integer] the request ID
# @param error [CallError, Object, nil] the error to report
# @param check_defer [Boolean] when true, nothing is sent unless a defer is
#   still stored for the request
def _send_INVOCATION_error(request, error, check_defer=false)
  # Prevent responses for defers that have already completed or had an error
  return if check_defer && !self._defers[request]

  call_error =
    if error.nil?
      CallError.new('wamp.error.runtime')
    elsif error.is_a?(CallError)
      error
    else
      CallError.new('wamp.error.runtime', [error.to_s])
    end

  error_msg = WampClient::Message::Error.new(WampClient::Message::Types::INVOCATION, request, {}, call_error.error, call_error.args, call_error.kwargs)
  self._send_message(error_msg)
end

#_send_INVOCATION_result(request, result, options = {}, check_defer = false) ⇒ Object

Sends a result for the invocation



687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
# File 'lib/wamp_client/session.rb', line 687

# Sends the result of an invocation back to the caller.
#
# nil becomes an empty CallResult and any other plain value is wrapped as
# the single positional argument of a CallResult. A CallError result is
# reported through #_send_INVOCATION_error instead of a Yield message.
#
# @param request [Integer] the request ID
# @param result [CallResult, CallError, Object, nil] the result to report
# @param options [Hash] options for the Yield message
# @param check_defer [Boolean] when true, nothing is sent unless a defer is
#   still stored for the request
def _send_INVOCATION_result(request, result, options={}, check_defer=false)
  # Prevent responses for defers that have already completed or had an error
  return if check_defer && !self._defers[request]

  # A returned CallError is sent as an error rather than a result
  return self._send_INVOCATION_error(request, result) if result.is_a?(CallError)

  call_result =
    if result.nil?
      CallResult.new
    elsif result.is_a?(CallResult)
      result
    else
      CallResult.new([result])
    end

  yield_msg = WampClient::Message::Yield.new(request, options, call_result.args, call_result.kwargs)
  self._send_message(yield_msg)
end

#_send_message(msg) ⇒ Object

Sends a message

Parameters:



281
282
283
284
# File 'lib/wamp_client/session.rb', line 281

# Sends a message through the transport, printing it first when verbose.
#
# @param msg [#payload, #to_s] the message to send
def _send_message(msg)
  if self.verbose
    puts 'TX: ' + msg.to_s
  end

  payload = msg.payload
  self.transport.send_message(payload)
end

#call(procedure, args = nil, kwargs = nil, options = {}, &callback) ⇒ Call

Calls a procedure

Parameters:

  • procedure (String)

    The procedure to invoke

  • args (Array) (defaults to: nil)

    The arguments

  • kwargs (Hash) (defaults to: nil)

    The keyword arguments

  • options (Hash) (defaults to: {})

    The options for the call

  • callback (block)

    The callback(result, error, details) called to signal if the call was a success or not

Returns:

  • (Call)

    An object representing the call



874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
# File 'lib/wamp_client/session.rb', line 874

# Calls a procedure.
#
# The request is recorded as pending, a Call message is sent, and when
# options[:timeout] is positive a transport timer is armed that cancels the
# call if it is still pending once the timer fires.
#
# @param procedure [String] the procedure to invoke
# @param args [Array, nil] the arguments
# @param kwargs [Hash, nil] the keyword arguments
# @param options [Hash] the options for the call
# @param callback [block] callback(result, error, details) signalling the outcome
# @return [Call] an object representing the call
# @raise [RuntimeError] if the session is not open
def call(procedure, args=nil, kwargs=nil, options={}, &callback)
  raise RuntimeError, "Session must be open to call 'call'" unless is_open?

  validator = self.class
  validator.check_uri('procedure', procedure)
  validator.check_dict('options', options)
  validator.check_list('args', args, true)
  validator.check_dict('kwargs', kwargs, true)

  # Create a new call request
  request = self._generate_id
  self._requests[:call][request] = {p: procedure, a: args, k: kwargs, o: options, c: callback}

  # Send the message
  self._send_message(WampClient::Message::Call.new(request, options, procedure, args, kwargs))

  call = Call.new(self, request)

  # Timeout Logic
  timeout = options[:timeout]
  if timeout && timeout > 0
    self.transport.timer(timeout) do
      # Once the timer expires, if the call hasn't completed, cancel it
      call.cancel if self._requests[:call][call.id]
    end
  end

  call
end

#cancel(call, mode = 'skip') ⇒ Object

Cancels a call

Parameters:

  • call (Call)
    • The call object

  • mode (String) (defaults to: 'skip')
    • The cancellation mode. Options are ‘skip’, ‘kill’, ‘killnowait’



953
954
955
956
957
958
959
960
961
962
963
# File 'lib/wamp_client/session.rb', line 953

# Cancels a call by sending a Cancel message for its request ID.
#
# @param call [Call] the call object
# @param mode [String] the cancellation mode sent in the message options
# @raise [RuntimeError] if the session is not open
def cancel(call, mode='skip')
  raise RuntimeError, "Session must be open to call 'cancel'" unless is_open?

  self.class.check_nil('call', call, false)

  # Send the message
  self._send_message(WampClient::Message::Cancel.new(call.id, { mode: mode }))
end

#is_open? ⇒ Boolean

Returns ‘true’ if the session is open

Returns:

  • (Boolean)


219
220
221
# File 'lib/wamp_client/session.rb', line 219

# Tells whether the session is open, i.e. whether a session ID is set.
#
# @return [Boolean] true when the session ID is not nil
def is_open?
  session_id = self.id
  !session_id.nil?
end

#join(realm) ⇒ Object

Joins the WAMP Router

Parameters:

  • realm (String)

    The name of the realm



225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
# File 'lib/wamp_client/session.rb', line 225

# Joins the WAMP router by sending a Hello message for the realm.
#
# The Hello details carry the client roles and agent string, plus :authid
# and :authmethods when they are present in the session options.
#
# @param realm [String] the name of the realm
# @raise [RuntimeError] if the session is already open
def join(realm)
  raise RuntimeError, "Session must be closed to call 'join'" if is_open?

  self.class.check_uri('realm', realm)

  self.realm = realm

  details = {
    roles: WAMP_FEATURES,
    agent: "Ruby-WampClient-#{WampClient::VERSION}"
  }

  # Authentication details are only included when configured
  authid = self.options[:authid]
  details[:authid] = authid if authid
  authmethods = self.options[:authmethods]
  details[:authmethods] = authmethods if authmethods

  # Send Hello message
  self._send_message(WampClient::Message::Hello.new(realm, details))
end

#leave(reason = 'wamp.close.normal', message = 'user initiated') ⇒ Object

Leaves the WAMP Router

Parameters:

  • reason (String) (defaults to: 'wamp.close.normal')

    URI signalling the reason for leaving



247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
# File 'lib/wamp_client/session.rb', line 247

# Leaves the WAMP router by sending a Goodbye message.
#
# The session records that it sent the Goodbye so that the router's Goodbye
# is not answered a second time.
#
# @param reason [String] URI signalling the reason for leaving
# @param message [String] text placed in the Goodbye details under :message
# @raise [RuntimeError] if the session is not open
def leave(reason='wamp.close.normal', message='user initiated')
  raise RuntimeError, "Session must be opened to call 'leave'" unless is_open?

  validator = self.class
  validator.check_uri('reason', reason, true)
  validator.check_string('message', message, true)

  # Send Goodbye message
  goodbye = WampClient::Message::Goodbye.new({ message: message }, reason)
  self._send_message(goodbye)
  self._goodbye_sent = true
end

#on_challenge(&on_challenge) ⇒ Object



166
167
168
# File 'lib/wamp_client/session.rb', line 166

# Registers the block that answers authentication challenges.
#
# While the session is closed, a received Challenge message invokes the block
# as block(authmethod, extra); its return value is destructured into
# signature and extra for the Authenticate reply.
#
# @param on_challenge [block] the challenge handler
def on_challenge(&on_challenge)
  @on_challenge = on_challenge
end

#on_join(&on_join) ⇒ Object



148
149
150
# File 'lib/wamp_client/session.rb', line 148

# Registers the block invoked when the session has been joined.
#
# The block is called as block(details) with the details of the Welcome
# message.
#
# @param on_join [block] the join handler
def on_join(&on_join)
  @on_join = on_join
end

#on_leave(&on_leave) ⇒ Object



156
157
158
# File 'lib/wamp_client/session.rb', line 156

# Registers the block invoked when the session is left.
#
# The block is called as block(reason, details) when an Abort message
# arrives while the session is closed, or a Goodbye message arrives while it
# is open.
#
# @param on_leave [block] the leave handler
def on_leave(&on_leave)
  @on_leave = on_leave
end

#publish(topic, args = nil, kwargs = nil, options = {}, &callback) ⇒ Object

Publishes an event to a topic

Parameters:

  • topic (String)

    The topic to publish the event to

  • args (Array) (defaults to: nil)

    The arguments

  • kwargs (Hash) (defaults to: nil)

    The keyword arguments

  • options (Hash) (defaults to: {})

    The options for the publish

  • callback (block)

    The callback(publish, error, details) called to signal if the publish was a success or not



544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
# File 'lib/wamp_client/session.rb', line 544

# Publishes an event to a topic.
#
# The request is only recorded as pending (so that the callback can be
# invoked later) when options[:acknowledge] is truthy.
#
# @param topic [String] the topic to publish the event to
# @param args [Array, nil] the arguments
# @param kwargs [Hash, nil] the keyword arguments
# @param options [Hash] the options for the publish
# @param callback [block] callback(publish, error, details) signalling the outcome
# @raise [RuntimeError] if the session is not open
def publish(topic, args=nil, kwargs=nil, options={}, &callback)
  raise RuntimeError, "Session must be open to call 'publish'" unless is_open?

  validator = self.class
  validator.check_uri('topic', topic)
  validator.check_dict('options', options)
  validator.check_list('args', args, true)
  validator.check_dict('kwargs', kwargs, true)

  # Create a new publish request
  request = self._generate_id
  if options[:acknowledge]
    self._requests[:publish][request] = {t: topic, a: args, k: kwargs, o: options, c: callback}
  end

  # Send the message
  self._send_message(WampClient::Message::Publish.new(request, options, topic, args, kwargs))
end

#register(procedure, handler, options = nil, interrupt = nil, &callback) ⇒ Object

Register to a procedure

Parameters:

  • procedure (String)

    The procedure to register for

  • handler (lambda)

    The handler(args, kwargs, details) when an invocation is received

  • options (Hash, nil) (defaults to: nil)

    The options for the registration

  • interrupt (lambda) (defaults to: nil)

    The handler(request, mode) when an interrupt is received

  • callback (block)

    The callback(registration, error, details) called to signal if the registration was a success or not



610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
# File 'lib/wamp_client/session.rb', line 610

# Registers a procedure.
#
# The request is recorded as pending and a Register message is sent.
#
# @param procedure [String] the procedure to register for
# @param handler [lambda] handler(args, kwargs, details) run when an invocation is received
# @param options [Hash, nil] the options for the registration; nil is treated as {}
# @param interrupt [lambda, nil] handler(request, mode) run when an interrupt is received
# @param callback [block] callback(registration, error, details) signalling the outcome
# @raise [RuntimeError] if the session is not open
def register(procedure, handler, options=nil, interrupt=nil, &callback)
  raise RuntimeError, "Session must be open to call 'register'" unless is_open?

  options ||= {}

  validator = self.class
  validator.check_uri('procedure', procedure)
  validator.check_nil('handler', handler, false)

  # Create a new registration request
  request = self._generate_id
  self._requests[:register][request] = {p: procedure, h: handler, i: interrupt, o: options, c: callback}

  # Send the message
  self._send_message(WampClient::Message::Register.new(request, options, procedure))
end

#subscribe(topic, handler, options = {}, &callback) ⇒ Object

Subscribes to a topic

Parameters:

  • topic (String)

    The topic to subscribe to

  • handler (lambda)

    The handler(args, kwargs, details) when an event is received

  • options (Hash) (defaults to: {})

    The options for the subscription

  • callback (block)

    The callback(subscription, error, details) called to signal if the subscription was a success or not



398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
# File 'lib/wamp_client/session.rb', line 398

# Subscribes to a topic.
#
# The request is recorded as pending and a Subscribe message is sent.
#
# @param topic [String] the topic to subscribe to
# @param handler [lambda] handler(args, kwargs, details) run when an event is received
# @param options [Hash] the options for the subscription
# @param callback [block] callback(subscription, error, details) signalling the outcome
# @raise [RuntimeError] if the session is not open
def subscribe(topic, handler, options={}, &callback)
  raise RuntimeError, "Session must be open to call 'subscribe'" unless is_open?

  validator = self.class
  validator.check_uri('topic', topic)
  validator.check_dict('options', options)
  validator.check_nil('handler', handler, false)

  # Create a new subscribe request
  request = self._generate_id
  self._requests[:subscribe][request] = {t: topic, h: handler, o: options, c: callback}

  # Send the message
  self._send_message(WampClient::Message::Subscribe.new(request, options, topic))
end

#unregister(registration, &callback) ⇒ Object

Unregisters from a procedure

Parameters:

  • registration (Registration)

    The registration object from when the registration was created

  • callback (block)

    The callback(registration, error, details) called to signal if the unregistration was a success or not



809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
# File 'lib/wamp_client/session.rb', line 809

# Unregisters from a procedure.
#
# The request is recorded as pending and an Unregister message carrying the
# registration's ID is sent.
#
# @param registration [Registration] the registration object from when the registration was created
# @param callback [block] callback(registration, error, details) signalling the outcome
# @raise [RuntimeError] if the session is not open
def unregister(registration, &callback)
  raise RuntimeError, "Session must be open to call 'unregister'" unless is_open?

  self.class.check_nil('registration', registration, false)

  # Create a new unregister request
  request = self._generate_id
  self._requests[:unregister][request] = { r: registration, c: callback }

  # Send the message
  self._send_message(WampClient::Message::Unregister.new(request, registration.id))
end

#unsubscribe(subscription, &callback) ⇒ Object

Unsubscribes from a subscription

Parameters:

  • subscription (Subscription)

    The subscription object from when the subscription was created

  • callback (block)

    The callback(subscription, error, details) called to signal if the unsubscription was a success or not



479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
# File 'lib/wamp_client/session.rb', line 479

# Unsubscribes from a subscription.
#
# The request is recorded as pending and an Unsubscribe message carrying the
# subscription's ID is sent.
#
# @param subscription [Subscription] the subscription object from when the subscription was created
# @param callback [block] callback(subscription, error, details) signalling the outcome
# @raise [RuntimeError] if the session is not open
def unsubscribe(subscription, &callback)
  raise RuntimeError, "Session must be open to call 'unsubscribe'" unless is_open?

  self.class.check_nil('subscription', subscription, false)

  # Create a new unsubscribe request
  request = self._generate_id
  self._requests[:unsubscribe][request] = { s: subscription, c: callback }

  # Send the message
  self._send_message(WampClient::Message::Unsubscribe.new(request, subscription.id))
end