Class: Isono::MessagingClient

Inherits:
Node
  • Object
show all
Includes:
Logger
Defined in:
lib/isono/messaging_client.rb

Overview

mc = MessagingClient.start puts mc.request(‘endpoint’, ‘func1’, xxxx, xxxx) puts mc.request(‘endpoint’, ‘func2’, xxx, xxx)

mc = MessagingClient.start endpoint = mc.sync_rpc(‘endpoint’) endpoint.func1(xxxx, xxxx) endpoint.func2(xxx, xxx)

Examples:

Sync RPC call with object method.

Sync RPC call using delegated object

Defined Under Namespace

Classes: RpcSyncDelegator

Instance Attribute Summary

Attributes inherited from Node

#boot_token, #manifest, #value_objects

Attributes included from AmqpClient

#amqp_client, #mq

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Logger

included, initialize

Methods inherited from Node

#after_close, #after_connect, #before_close, #before_connect, inherited, instance, #node_id, stop

Methods included from EventObservable

#add_observer, #add_observer_once, #fire_event, #remove_observer

Methods included from AmqpClient

#after_close, #after_connect, #amq, #amqp_server_uri, #before_close, #before_connect, #close, #connect, #connected?, #create_channel, #on_close, #on_connect, #on_disconnected, #publish_to

Constructor Details

#initialize(m = nil, &blk) ⇒ MessagingClient

Returns a new instance of MessagingClient.



55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/isono/messaging_client.rb', line 55

def initialize(m=nil, &blk)
  m ||= Manifest.new(Dir.pwd) {
    node_name 'msgclient'
    node_instance_id Util.gen_id

    load_module NodeModules::EventChannel
    load_module NodeModules::RpcChannel
    load_module NodeModules::JobChannel
  }
  m.instance_eval(&blk) if blk
  super(m)
end

Class Method Details

.start(amqp_uri, manifest = nil, &blk) ⇒ Object



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/isono/messaging_client.rb', line 21

def self.start(amqp_uri, manifest=nil, &blk)
  node = self.new(manifest, &blk)

  if EventMachine.reactor_thread?
    EventMachine.schedule {
      node.connect(amqp_uri)
    }
  else
    q = ::Queue.new
    EventMachine.schedule {
      node.connect(amqp_uri) { |type|
        q << type
      }
    }
    case q.deq
    when :success
    when :error
      raise "Connection failed: #{amqp_uri}"
    end
  end

  node
end

Instance Method Details

#event_publish(evname, opts = {}) ⇒ Object



111
112
113
# File 'lib/isono/messaging_client.rb', line 111

def event_publish(evname, opts={})
  NodeModules::EventChannel.new(self).publish(evname, opts)
end

#request(endpoint, key, *args, &blk) ⇒ Object



102
103
104
105
# File 'lib/isono/messaging_client.rb', line 102

def request(endpoint, key, *args, &blk)
  rpc = NodeModules::RpcChannel.new(self)
  rpc.request(endpoint, key, *args, &blk)
end

#stopObject



45
46
47
48
49
50
51
52
53
# File 'lib/isono/messaging_client.rb', line 45

def stop
  if connected?
    close {
      EventMachine.schedule {
        EventMachine.stop
      }
    }
  end
end

#submit(job_endpoint, key, *args) ⇒ Object



107
108
109
# File 'lib/isono/messaging_client.rb', line 107

def submit(job_endpoint, key, *args)
  NodeModules::JobChannel.new(self).submit(job_endpoint, key, *args)
end

#sync_rpc(endpoint, opts = {}) ⇒ Object



97
98
99
100
# File 'lib/isono/messaging_client.rb', line 97

def sync_rpc(endpoint, opts={})
  rpc = NodeModules::RpcChannel.new(self)
  RpcSyncDelegator.new(rpc, endpoint, opts)
end