Class: ASS::Server

Inherits:
Object
  • Object
show all
Defined in:
lib/ass/server.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name, opts = {}) ⇒ Server

Returns a new instance of Server.



4
5
6
7
8
# File 'lib/ass/server.rb', line 4

# Creates a server and declares the exchange it is reached through.
#
# @param name [Object] kept as the server name and also used as the exchange name
# @param opts [Hash] handed unchanged to ASS.mq.fanout
def initialize(name, opts = {})
  @name = name
  # The server's exchange is a fanout, so the routing key is ignored.
  fanout_exchange = ASS.mq.fanout(name, opts)
  @exchange = fanout_exchange
end

Instance Attribute Details

#name ⇒ Object (readonly)

Returns the value of attribute name.



2
3
4
# File 'lib/ass/server.rb', line 2

# Reader for the server name.
#
# @return [Object] the value held in @name, which #initialize sets from its
#   +name+ argument
def name
  @name
end

Instance Method Details

#call(name, method, data, opts = {}, meta = nil) ⇒ Object



117
118
119
120
121
122
123
124
125
# File 'lib/ass/server.rb', line 117

# Sends a request through ASS.call, asking for the reply to come back to
# this server unless the caller names another destination.
#
# @param name [Object] forwarded to ASS.call as its first argument
# @param method [Object] forwarded to ASS.call
# @param data [Object] forwarded to ASS.call
# @param opts [Hash] forwarded to ASS.call after :reply_to is filled in; the
#   hash passed by the caller is not modified
# @param meta [Object, nil] forwarded to ASS.call
# @return [Object] whatever ASS.call returns
def call(name, method, data, opts = {}, meta = nil)
  # A truthy :reply_to from the caller wins; otherwise replies target this server.
  call_opts = opts.merge(:reply_to => opts[:reply_to] || self.name)
  ASS.call(name, method, data, call_opts, meta)
end

#cast(name, method, data, opts = {}, meta = nil) ⇒ Object



127
128
129
130
131
132
133
134
# File 'lib/ass/server.rb', line 127

# Sends a one-way request through ASS.call: no reply destination is given, so
# the remote server will not reply.
#
# @param name [Object] forwarded to ASS.call as its first argument
# @param method [Object] forwarded to ASS.call
# @param data [Object] forwarded to ASS.call
# @param opts [Hash] forwarded to ASS.call with :reply_to forced to nil; the
#   hash passed by the caller is not modified
# @param meta [Object, nil] forwarded to ASS.call
# @return [Object] whatever ASS.call returns
def cast(name, method, data, opts = {}, meta = nil)
  # Any :reply_to supplied by the caller is deliberately overridden with nil.
  ASS.call(name, method, data, opts.merge(:reply_to => nil), meta)
end

#exchange ⇒ Object



10
11
12
# File 'lib/ass/server.rb', line 10

# Reader for the server's exchange.
#
# @return [Object] the value held in @exchange, which #initialize obtains
#   from ASS.mq.fanout
def exchange
  @exchange
end

#inspect ⇒ Object



136
137
138
# File 'lib/ass/server.rb', line 136

# Short human-readable form: the class followed by the server name.
#
# @return [String] text of the form "#<Class name>"
def inspect
  klass = self.class
  label = self.name
  "#<#{klass} #{label}>"
end

#queue(opts = {}) ⇒ Object



14
15
16
17
18
19
20
# File 'lib/ass/server.rb', line 14

# Declares this server's queue (named after the server) and binds it to the
# server's exchange. Only the first call does any work; once @queue is set,
# later calls return immediately and their +opts+ are ignored.
#
# @param opts [Hash] handed to ASS.mq.queue on the first call
# @return [self] the server itself, not the queue
def queue(opts = {})
  return self if @queue

  @queue = ASS.mq.queue(self.name, opts)
  @queue.bind(self.exchange)
  self
end

#react(_callback = nil, _opts = nil, &_block) ⇒ Object

Takes the options available to MQ::Queue#subscribe.



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/ass/server.rb', line 23

# Subscribes to this server's queue and hands every incoming message to a
# callback object produced by ASS::CallbackFactory.
#
# Two call forms are accepted:
#   react(callback, opts)    # callback passed positionally
#   react(opts) { ... }      # block is the callback; first argument is opts
#
# The first call subscribes to the queue. Every later call only replaces the
# callback factory and returns before touching the subscription, so its opts
# (including :ack) are ignored.
#
# @param _callback [Object, Hash, nil] the callback, or the options hash when
#   a block is given
# @param _opts [Hash, nil] passed to @queue.subscribe; :ack is also read here
#   to decide whether messages are acknowledged. nil is treated as {}
# @param _block [Proc] optional callback given as a block
# @return [self]
def react(_callback=nil,_opts=nil,&_block)
  # Block form: the block is the callback and the first positional argument
  # is really the options hash.
  if _block
    _opts = _callback
    _callback = _block
  end
  _opts = {} if _opts.nil?
  
  # second call would just swap out the callback.
  @factory = ASS::CallbackFactory.new(_callback)
  
  # Already subscribed: the new factory is picked up by the existing
  # subscription block below, which reads @factory for each message.
  return(self) if @subscribed
  @subscribed = true
  @ack = _opts[:ack]
  self.queue unless @queue

  # yikes!! potential for scary bugs
  @queue.subscribe(_opts) do |info,payload|
    # NOTE(review): this unmarshals whatever arrives on the queue. Marshal.load
    # is not safe on untrusted data -- confirm every publisher is trusted.
    payload = ::Marshal.load(payload)
    #p [info,payload]
    callback_object = @factory.callback_for(self,info,payload)
    proc { #|callback_object=prepare_callback(@callback,info,payload)|
      # Work handed to EM.defer: invoke on_call with the message data.
      # with_handlers is defined outside this view; presumably it yields a
      # [status, value] pair, since `done` indexes result[0] and result[1]
      # -- TODO confirm.
      operation = proc {
        with_handlers do
          callback_object.send(:on_call,payload[:data])
        end
      }
      # Completion callback given to EM.defer; receives the result of
      # `operation` and dispatches on its status.
      done = proc { |result|
        # the client MUST exist, otherwise it's an error.
        ## FIXME it's bad if the server dies b/c
        ## the client isn't there. It's bad that
        ## this can cause the server to fail.
        ##
        ## I am not sure what happens if message
        ## is unroutable. I think it's just
        ## silently dropped unless the mandatory
        ## option is given.
        # A status other than the four handled below matches no branch:
        # nothing is sent and the message is not acked.
        case status = result[0]
        when :ok
          # Reply only when the sender supplied a reply_to.
          if info.reply_to
            data = result[1]
            # respond with cast (we don't want
            # to get a response to our response,
            # then respond to the response of
            # this response, and so on.)
            ASS.cast(info.reply_to,
                     payload[:method],
                     data, {
                       :routing_key => info.routing_key,
                       :message_id => info.message_id},
                     payload[:meta])
          end
          info.ack if @ack
        when :resend
          # resend the same message
          # (published again to this server's own name, then the original is acked)
          ASS.call(self.name,
                   payload[:method],
                   payload[:data], {
                     :reply_to => info.reply_to, # this could be nil for cast
                     :routing_key => info.routing_key,
                     :message_id => info.message_id},
                   payload[:meta])
          info.ack if @ack
        when :discard
          # no response back to client
          info.ack if @ack
        when :error
          # programmatic error. The message is acked only if the callback
          # object's on_error handles it without raising; otherwise the error
          # is printed to stderr and ASS.stop is called, leaving it unacked.
          error = result[1]
          if callback_object.respond_to?(:on_error)
            begin
              callback_object.on_error(error,payload[:data])
              info.ack if @ack # successful error handling
            rescue => more_error
              # on_error itself raised: report it and stop.
              $stderr.puts more_error
              $stderr.puts more_error.backtrace
              ASS.stop
            end
          else
            # unhandled error
            $stderr.puts error
            $stderr.puts error.backtrace
            ASS.stop
          end
          # don't ack (beyond the successful on_error path above).
        end
      }
      EM.defer operation, done
    }.call

    
  end
  self
end