Class: DotNetServices::MessageBuffer

Inherits:
Object
  • Object
show all
Defined in:
lib/dot_net_services/message_buffer.rb

Overview

The MessageBuffer class is used for creating Volatile Message Buffer (VMB) endpoints on the .NET Services bus, and retrieving messages from them.

A VMB is a temporary message buffer that can be used for asynchronous communications between senders and receivers of messages. A process may request creation of a VMB on any URL within a solution namespace. Any HTTP requests (other than GETs) to that URL are stored in the buffer, until some process retrieves them by sending an HTTP X-RETRIEVE request to the management endpoint.

VMB can exist by itself, not tied to the lifecycle of a process that originally created it. Further information about VMBs can be found in .NET Services portal [go.microsoft.com/fwlink/?LinkID=129428].

Usage examples

MessageBuffer instance represents a VMB endpoint. Typical usage looks as follows:

error_handler = lambda { |error| logger.error(error) }

buffer = DotNetServices::MessageBuffer.open_and_poll(
        "MySolution/MyVMB",
        {:username => 'MySolution', :password => 'my_password'},
        error_handler) do
  |message|
  ... process the message
end

This invocation does all of the following:

  • Creates a MessageBuffer instance associated with a specified endpoint (/MySolution/MyVMB)

  • obains a security token from trhe Identity service (see Session for details on that).

  • Creates a VMB on the .NET Services bus if it doesn’t exist yet

  • immediately starts polling it

  • if it retrieves a message, it passes it to the block. The message is an instance of Net::HTTP::Request that looks exactly the same as what the sender originally sent to the bus.

  • if an error occurs while polling, the error is passed to the error_handler block. Polling then continues.

Guidelines

In cases where an application (which in this case means a process) needs to process multiple types of messages, Microsoft recommends to create a single VMB per application, route all message types through the same VMB, and route messages to appropriate processors within the application itself.

In the current version of the API, we provide no explicit support for clustering message processors. If you use MessageBuffer#open_and_poll(), it’s recommended that you only run one cipy of a message processor. If clustering is required (for high availability reasons), you can use lower-level MessageBuffer methods to do it.

.NET Services VMBs provide pub-sub and multicast functionality which can be used even over plain HTTP. Which is amazing. However, we don’t support this functionality in the current version of our library.

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name, auth_data) ⇒ MessageBuffer

Initializes a new MessageBuffer instance. Does not create a VMB on the .NET Services bus (use #open for that)



89
90
91
92
# File 'lib/dot_net_services/message_buffer.rb', line 89

def initialize(name, auth_data)
  @name = name
  @session = Session.new(name, auth_data)
end

Instance Attribute Details

#sessionObject (readonly)

Returns the value of attribute session.



54
55
56
# File 'lib/dot_net_services/message_buffer.rb', line 54

def session
  @session
end

Class Method Details

.open(name, auth_data, &block) ⇒ Object

Creates a MessageBuffer instance, acquires a security token, registers a VMB if necessary. Unlike Session#open, auth_data is mandatory here. If given a block, passes the MessageBuffer instance to the block, and closes it at the end of the block returns the result of the block (if called with a block), or the buffer instance opened



62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/dot_net_services/message_buffer.rb', line 62

def open(name, auth_data, &block)
  buffer = MessageBuffer.new(name, auth_data).open
  if block_given?
    begin
      return yield(buffer)
    ensure
      # TODO gracefully close the buffer

    end
  else
    return buffer
  end
end

.open_and_poll(name, subscription_endpoint, auth_data, error_handler = nil, &block) ⇒ Object

Invoke MessageBuffer#open to create a MessageBuffer instance, subscribe the buffer to receive messages posted to the subscription_endpoint, then poll it until the containing thread or process is terminated.

Whenever a message is retrieved from the buffer, this method passes it to the block. When an error occurs (while polling, or raised by the block), it is passed to error_handler, which should be a lambda, or an object with handle(error) public method. If no error_handler is provided, the error is printed out to STDERR and then ignored.



82
83
84
# File 'lib/dot_net_services/message_buffer.rb', line 82

def open_and_poll(name, subscription_endpoint, auth_data, error_handler=nil, &block)
  MessageBuffer.new(name, auth_data).open_and_poll(subscription_endpoint, error_handler, &block)
end

Instance Method Details

#deleteObject

Delete the message buffer. This removes the buffer entirely from the bus, not just the local reference to it.



201
202
203
204
205
206
207
208
# File 'lib/dot_net_services/message_buffer.rb', line 201

def delete
  response = @session.delete

  unless response.is_a?(Net::HTTPNoContent)
    raise "Deleting VMB failed. Response was #{response.class.name}"
  end
  self
end

#expiresObject

Queries the VMB management endpoint for the VMB expiry time. With every #poll (HTTP X-RETRIEVE to the VMB management endpoint) the bus sets the expiry time of this VMB to 30 minutes later. If 30 minutes pass and there are no further polls, the bus automatically delets the buffer.



213
214
215
216
217
218
219
220
221
222
223
# File 'lib/dot_net_services/message_buffer.rb', line 213

def expires
  response = @session.get_from_relay

  unless response.is_a?(Net::HTTPOK)
    raise "Querying expiry status of VMB failed. Response was #{response.class}"
  end
  
  expires_header = response["expires"]
  raise "Querying expiry status of VMB failed. Response doesn't have expires: header" unless expires_header
  DateTime.parse(expires_header)
end

#keep_aliveObject

Performs an empty POST to the VMB management endpoint, which extends the VMB expiry time without retrieving any messages.



227
228
229
230
231
232
233
234
235
236
# File 'lib/dot_net_services/message_buffer.rb', line 227

def keep_alive
  # if we don't pass a linefeed as a body to the VMB management endpoint, it responds with HTTP 411 Length Required

  response = @session.post_to_relay "\n"
  case response
  when Net::HTTPSuccess
    self
  else
    raise "POST to the VMB management endpoint failed. Response was #{response.class}"
  end
end

#openObject

Initiates a VMB session by:

  • acquiring a security token

  • checks if there is already a VMB at the endpoint

  • creating a VMB if necessary



113
114
115
116
# File 'lib/dot_net_services/message_buffer.rb', line 113

def open
  register unless @session.get_from_relay.is_a?(Net::HTTPSuccess)
  self
end

#open_and_poll(subscription_endpoint, error_handler = nil, &block) ⇒ Object

Open the buffer (see #open), subscribe the buffer to receive messages posted to the subscription_endpoint, then poll it until the containing thread or process is terminated.

Whenever a message is retrieved from the buffer, this method passes it to the block. When an error occurs (while polling, or raised by the block), it is passed to error_handler, which should be a lambda, or an object with handle(error) public method. If no error_handler is provided, the error is printed out to STDERR and then ignored.



126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/dot_net_services/message_buffer.rb', line 126

def open_and_poll(subscription_endpoint, error_handler=nil, &block)
  raise "MessageBuffer#open_and_poll requires a block" unless block_given?

  @subscription_endpoint = subscription_endpoint 

  open
  subscribe(@subscription_endpoint)

  begin
    loop do
      begin
        message = poll
        block.call(message) if message
      rescue Object => error
        if error_handler
          if error_handler.is_a?(Proc)
            error_handler.call(error)
          else
            error_handler.handle(error)
          end
        else
          STDERR.puts
          STDERR.puts "Message Buffer Error"
          STDERR.puts error.message
          STDERR.puts error.backtrace.map { |line| "  #{line}"}
          STDERR.puts
        end
      end
    end
  ensure
    @subscription_endpoint = nil
  end
end

#poll(timeout = 15) ⇒ Object

Poll VMB endpoint for new messages; return the message if one was retrieved, or nil if it wasn’t.

Raises an exception if the response from the bus contains an error code (which usually means that the VMB is not registered, but may mean other things, for example if the bus itself is not accessible for some reason).

timeout parameter regulates how long a polling request will be held by the bus if there are no messages.

An HTTP server that holds HTTP connections because it has nothing to respond with is somewhat uncommon, so a more detailed explanation is due.

Normally, any HTTP interaction begins by opening of a TCP socket from the client to the server. The client then writes the HTTP request into that socket and waits until the server responds back and closes the socket. If the server doesn’t respond for a long time (a minute, usually), the client drops the socket and declares timeout.

Many times, when you poll a VMB endpoint, it will have no messages. If the bus simply came back immediately with “No Content” response, this would cause a VMB subscriber needs to constantly poll the buffer, abusing the bus infrastructure.

.NET Services gets around this problem by making the client connection hang for some time, either until there is a message, or a certain number of seconds has passed, and there is still no message. That number of seconds is what the timeout parameter specifies. Microsoft suggested that 10-20 seconds is reasonable for production purposes. Default value is 15 seconds.



183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
# File 'lib/dot_net_services/message_buffer.rb', line 183

def poll(timeout=15)
  response = @session.retrieve(:encoding => 'asreply', :timeout => timeout)
  case response
  when Net::HTTPNoContent
    return nil
  when Net::HTTPNotFound
    register
    subscribe(@subscription_endpoint) if @subscription_endpoint
    return nil
  when Net::HTTPSuccess
    return response
  else
    raise "Retrieving messages failed. Response was #{response.class.name}" unless response.is_a?(Net::HTTPSuccess)
  end
end

#registerObject

Register the message buffer (by sending X-CREATEMB to the management endpoint).

Returns the buffer. Raises an exception if the creation is not successful.



97
98
99
100
101
102
103
104
105
106
107
# File 'lib/dot_net_services/message_buffer.rb', line 97

def register
  createmb_response = @session.createmb

  unless createmb_response.is_a?(Net::HTTPCreated)
    # the buffer may already be there

    unless @session.get_from_relay.is_a?(Net::HTTPSuccess)
      raise "Creating VMB failed. Service responded with #{createmb_response.class.name}"
    end
  end
  self
end

#subscribe(target_path) ⇒ Object

Subscribe to an endpoint.

HTTP messages sent to the endpoint will be routed to this message buffer



241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
# File 'lib/dot_net_services/message_buffer.rb', line 241

def subscribe(target_path)
  # Changes for .Net Services March 2009 CTP release (M5)

  tmpurl = @session.authenticator.username + "." + DotNetServices.root_url + "/" + target_path

  # Replace unwanted slashes

  tmpurl = tmpurl.gsub("\/\/\/", "\/")
  tmpurl = tmpurl.gsub("\/\/", "\/")
  subscription_endpoint_url = "http://" + tmpurl

  subscribe_response = @session.subscribe(:target => subscription_endpoint_url)
  case subscribe_response
  when Net::HTTPSuccess
    return self
  when Net::HTTPConflict
    unsubscribe(target_path)
    resubscribe_response = @session.subscribe(:target => subscription_endpoint_url)
    unless resubscribe_response.is_a?(Net::HTTPSuccess)
      raise "Second X-SUBSCRIBE to VMB management endpoint failed. Response was #{resubscribe_response.class}"
    end
  else
    raise "X-SUBSCRIBE to VMB management endpoint failed. Response was #{subscribe_response.class}"
  end
end

#unsubscribe(target_path) ⇒ Object

Unsubscribe from an endpoint.



266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
# File 'lib/dot_net_services/message_buffer.rb', line 266

def unsubscribe(target_path)
  # Changes for .Net Services March 2009 CTP release (M5)

  tmpurl = @session.authenticator.username + "." + DotNetServices.root_url + "/" + target_path

  # Replace unwanted slashes

  tmpurl = tmpurl.gsub("\/\/\/", "\/")
  tmpurl = tmpurl.gsub("\/\/", "\/")
  subscription_endpoint_url = "http://" + tmpurl

  unsubscribe_response = @session.unsubscribe(:target => subscription_endpoint_url)
  unless unsubscribe_response.is_a?(Net::HTTPSuccess)
    raise "X-UNSUBSCRIBE to VMB management endpoint failed. Response was #{subscribe_response.class}"
  end
  self
end