Class: DotNetServices::MessageBuffer
Inherits: Object
- Object
- DotNetServices::MessageBuffer
Defined in: lib/dot_net_services/message_buffer.rb
Overview
The MessageBuffer class is used for creating Volatile Message Buffer (VMB) endpoints on the .NET Services bus, and retrieving messages from them.
A VMB is a temporary message buffer that can be used for asynchronous communications between senders and receivers of messages. A process may request creation of a VMB on any URL within a solution namespace. Any HTTP requests (other than GETs) to that URL are stored in the buffer, until some process retrieves them by sending an HTTP X-RETRIEVE request to the management endpoint.
A VMB can exist on its own; it is not tied to the lifecycle of the process that originally created it. Further information about VMBs can be found on the .NET Services portal [go.microsoft.com/fwlink/?LinkID=129428].
Usage examples
A MessageBuffer instance represents a VMB endpoint. Typical usage looks as follows:
# Note: the .open_and_poll signature documented below also takes a
# subscription_endpoint as its second argument.
error_handler = lambda { |error| logger.error(error) }
buffer = DotNetServices::MessageBuffer.open_and_poll(
  "MySolution/MyVMB",
  {:username => 'MySolution', :password => 'my_password'},
  error_handler) do |message|
  ... process the message
end
This invocation does all of the following:
- Creates a MessageBuffer instance associated with the specified endpoint (/MySolution/MyVMB).
- Obtains a security token from the Identity service (see Session for details).
- Creates a VMB on the .NET Services bus if it doesn’t exist yet.
- Immediately starts polling it.
- If it retrieves a message, passes it to the block. The message is an instance of Net::HTTP::Request that looks exactly the same as what the sender originally sent to the bus.
- If an error occurs while polling, passes the error to the error_handler. Polling then continues.
Guidelines
When an application (which in this case means a process) needs to process multiple types of messages, Microsoft recommends creating a single VMB per application, routing all message types through that VMB, and dispatching messages to the appropriate processors within the application itself.
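The in-application routing recommended above could be sketched roughly as follows. This is an illustration only, not part of the library: the MessageRouter class and the X-Message-Type header are hypothetical, and a Net::HTTP::Post object stands in for a message retrieved from the buffer.

```ruby
require 'net/http'

# Hypothetical header used to tag each message with its type; the bus does
# not define it, the senders in this sketch are assumed to set it.
TYPE_HEADER = 'X-Message-Type'

# Routes messages retrieved from a single VMB to per-type processors.
class MessageRouter
  def initialize
    @processors = {}
  end

  # Registers a block as the processor for a message type.
  def on(type, &processor)
    @processors[type] = processor
    self
  end

  # Looks up the processor by the type header and calls it with the message.
  # Raises if no processor is registered, so an error handler can see it.
  def route(message)
    type = message[TYPE_HEADER]
    processor = @processors.fetch(type) do
      raise ArgumentError, "No processor registered for message type #{type.inspect}"
    end
    processor.call(message)
  end
end

router = MessageRouter.new
router.on('order')   { |msg| "order: #{msg.body}" }
router.on('invoice') { |msg| "invoice: #{msg.body}" }

# Stand-in for a message retrieved from the buffer.
message = Net::HTTP::Post.new('/MySolution/MyVMB')
message[TYPE_HEADER] = 'order'
message.body = 'order payload'

puts router.route(message)
```

Inside the block given to open_and_poll, a router like this would be the only thing called, which keeps one buffer per process while still separating the processors.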
In the current version of the API, we provide no explicit support for clustering message processors. If you use MessageBuffer#open_and_poll, it’s recommended that you run only one copy of a message processor. If clustering is required (for high availability reasons), you can use lower-level MessageBuffer methods to do it.
.NET Services VMBs provide pub-sub and multicast functionality that can be used even over plain HTTP, which is amazing. However, we don’t support this functionality in the current version of our library.
Instance Attribute Summary
- #session ⇒ Object (readonly)
  Returns the value of attribute session.
Class Method Summary
- .open(name, auth_data, &block) ⇒ Object
  Creates a MessageBuffer instance, acquires a security token, registers a VMB if necessary.
- .open_and_poll(name, subscription_endpoint, auth_data, error_handler = nil, &block) ⇒ Object
  Invoke MessageBuffer#open to create a MessageBuffer instance, subscribe the buffer to receive messages posted to the subscription_endpoint, then poll it until the containing thread or process is terminated.
Instance Method Summary
- #delete ⇒ Object
  Delete the message buffer.
- #expires ⇒ Object
  Queries the VMB management endpoint for the VMB expiry time.
- #initialize(name, auth_data) ⇒ MessageBuffer (constructor)
  Initializes a new MessageBuffer instance.
- #keep_alive ⇒ Object
  Performs an empty POST to the VMB management endpoint, which extends the VMB expiry time without retrieving any messages.
- #open ⇒ Object
  Initiates a VMB session by acquiring a security token, checking whether there is already a VMB at the endpoint, and creating a VMB if necessary.
- #open_and_poll(subscription_endpoint, error_handler = nil, &block) ⇒ Object
  Open the buffer (see #open), subscribe the buffer to receive messages posted to the subscription_endpoint, then poll it until the containing thread or process is terminated.
- #poll(timeout = 15) ⇒ Object
  Poll VMB endpoint for new messages; return the message if one was retrieved, or nil if it wasn’t.
- #register ⇒ Object
  Register the message buffer (by sending X-CREATEMB to the management endpoint).
- #subscribe(target_path) ⇒ Object
  Subscribe to an endpoint.
- #unsubscribe(target_path) ⇒ Object
  Unsubscribe from an endpoint.
Constructor Details
#initialize(name, auth_data) ⇒ MessageBuffer
Initializes a new MessageBuffer instance. Does not create a VMB on the .NET Services bus (use #open for that).
# File 'lib/dot_net_services/message_buffer.rb', line 89
def initialize(name, auth_data)
  @name = name
  @session = Session.new(name, auth_data)
end
Instance Attribute Details
#session ⇒ Object (readonly)
Returns the value of attribute session.
# File 'lib/dot_net_services/message_buffer.rb', line 54
def session
  @session
end
Class Method Details
.open(name, auth_data, &block) ⇒ Object
Creates a MessageBuffer instance, acquires a security token, registers a VMB if necessary. Unlike Session#open, auth_data is mandatory here. If given a block, passes the MessageBuffer instance to the block and is meant to close it at the end of the block (graceful closing is still marked TODO in the source). Returns the result of the block if called with a block, or the opened buffer instance otherwise.
# File 'lib/dot_net_services/message_buffer.rb', line 62
def open(name, auth_data, &block)
  buffer = MessageBuffer.new(name, auth_data).open
  if block_given?
    begin
      return yield(buffer)
    ensure
      # TODO gracefully close the buffer
    end
  else
    return buffer
  end
end
.open_and_poll(name, subscription_endpoint, auth_data, error_handler = nil, &block) ⇒ Object
Invoke MessageBuffer#open to create a MessageBuffer instance, subscribe the buffer to receive messages posted to the subscription_endpoint, then poll it until the containing thread or process is terminated.
Whenever a message is retrieved from the buffer, this method passes it to the block. When an error occurs (while polling, or raised by the block), it is passed to error_handler, which should be a lambda or an object with a public handle(error) method. If no error_handler is provided, the error is printed to STDERR and then ignored.
# File 'lib/dot_net_services/message_buffer.rb', line 82
def open_and_poll(name, subscription_endpoint, auth_data, error_handler=nil, &block)
  MessageBuffer.new(name, auth_data).open_and_poll(subscription_endpoint, error_handler, &block)
end
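The two accepted kinds of error_handler can be illustrated with a small standalone sketch. CollectingErrorHandler and dispatch_error are hypothetical names introduced here; the dispatch rule itself (call a Proc, otherwise send handle(error)) follows the description above.

```ruby
# A handler object: anything with a public handle(error) method qualifies.
class CollectingErrorHandler
  attr_reader :errors

  def initialize
    @errors = []
  end

  def handle(error)
    @errors << error
  end
end

# Mirrors the dispatch rule described above: call a Proc directly,
# otherwise send handle(error) to the object.
def dispatch_error(error_handler, error)
  if error_handler.is_a?(Proc)
    error_handler.call(error)
  else
    error_handler.handle(error)
  end
end

seen = []
lambda_handler = lambda { |error| seen << error.message }
object_handler = CollectingErrorHandler.new

dispatch_error(lambda_handler, RuntimeError.new('poll failed'))
dispatch_error(object_handler, RuntimeError.new('block failed'))

puts seen.inspect
puts object_handler.errors.map(&:message).inspect
```

An object handler is convenient when errors need state around them, such as counting failures or throttling log output; a lambda is enough for simple logging.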
Instance Method Details
#delete ⇒ Object
Delete the message buffer. This removes the buffer entirely from the bus, not just the local reference to it.
# File 'lib/dot_net_services/message_buffer.rb', line 201
def delete
  response = @session.delete
  unless response.is_a?(Net::HTTPNoContent)
    raise "Deleting VMB failed. Response was #{response.class.name}"
  end
  self
end
#expires ⇒ Object
Queries the VMB management endpoint for the VMB expiry time. With every #poll (HTTP X-RETRIEVE to the VMB management endpoint) the bus sets the expiry time of this VMB to 30 minutes later. If 30 minutes pass and there are no further polls, the bus automatically deletes the buffer.
# File 'lib/dot_net_services/message_buffer.rb', line 213
def expires
  response = @session.get_from_relay
  unless response.is_a?(Net::HTTPOK)
    raise "Querying expiry status of VMB failed. Response was #{response.class}"
  end
  expires_header = response["expires"]
  raise "Querying expiry status of VMB failed. Response doesn't have expires: header" unless expires_header
  DateTime.parse(expires_header)
end
#keep_alive ⇒ Object
Performs an empty POST to the VMB management endpoint, which extends the VMB expiry time without retrieving any messages.
# File 'lib/dot_net_services/message_buffer.rb', line 227
def keep_alive
  # if we don't pass a linefeed as a body to the VMB management endpoint, it responds with HTTP 411 Length Required
  response = @session.post_to_relay "\n"
  case response
  when Net::HTTPSuccess
    self
  else
    raise "POST to the VMB management endpoint failed. Response was #{response.class}"
  end
end
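A process that holds a buffer without polling it regularly could combine #expires and #keep_alive along these lines. This is a sketch under assumptions: refresh_if_expiring and its 300-second margin are hypothetical, and FakeBuffer is a stand-in so the example runs without the bus.

```ruby
require 'date'

# Calls keep_alive when the buffer is due to expire within margin_seconds.
# Returns true if a keep-alive was sent, false otherwise.
def refresh_if_expiring(buffer, margin_seconds = 300, now = DateTime.now)
  # DateTime subtraction yields a fraction of a day; convert it to seconds.
  seconds_left = ((buffer.expires - now) * 24 * 60 * 60).to_f
  return false if seconds_left > margin_seconds
  buffer.keep_alive
  true
end

# Hypothetical stand-in for a MessageBuffer with a fixed expiry time.
class FakeBuffer
  attr_reader :expires, :keep_alive_count

  def initialize(expires)
    @expires = expires
    @keep_alive_count = 0
  end

  def keep_alive
    @keep_alive_count += 1
    self
  end
end

now = DateTime.new(2009, 3, 1, 12, 0, 0)
fresh = FakeBuffer.new(DateTime.new(2009, 3, 1, 12, 25, 0)) # 25 minutes left
stale = FakeBuffer.new(DateTime.new(2009, 3, 1, 12, 2, 0))  # 2 minutes left

puts refresh_if_expiring(fresh, 300, now)
puts refresh_if_expiring(stale, 300, now)
```

With a real buffer, each call to #expires is itself a request to the management endpoint, so a check like this would normally run on a coarse schedule rather than in a tight loop.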
#open ⇒ Object
Initiates a VMB session by:
- acquiring a security token
- checking whether there is already a VMB at the endpoint
- creating a VMB if necessary
# File 'lib/dot_net_services/message_buffer.rb', line 113
def open
  register unless @session.get_from_relay.is_a?(Net::HTTPSuccess)
  self
end
#open_and_poll(subscription_endpoint, error_handler = nil, &block) ⇒ Object
Open the buffer (see #open), subscribe the buffer to receive messages posted to the subscription_endpoint, then poll it until the containing thread or process is terminated.
Whenever a message is retrieved from the buffer, this method passes it to the block. When an error occurs (while polling, or raised by the block), it is passed to error_handler, which should be a lambda or an object with a public handle(error) method. If no error_handler is provided, the error is printed to STDERR and then ignored.
# File 'lib/dot_net_services/message_buffer.rb', line 126
def open_and_poll(subscription_endpoint, error_handler=nil, &block)
  raise "MessageBuffer#open_and_poll requires a block" unless block_given?

  @subscription_endpoint = subscription_endpoint

  open
  subscribe(@subscription_endpoint)

  begin
    loop do
      begin
        message = poll
        block.call(message) if message
      rescue Object => error
        if error_handler
          if error_handler.is_a?(Proc)
            error_handler.call(error)
          else
            error_handler.handle(error)
          end
        else
          STDERR.puts
          STDERR.puts "Message Buffer Error"
          STDERR.puts error.message
          STDERR.puts error.backtrace.map { |line| " #{line}"}
          STDERR.puts
        end
      end
    end
  ensure
    @subscription_endpoint = nil
  end
end
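Because #open_and_poll loops until its thread or process ends, code that needs an explicit stop condition has to build its own loop on top of #poll. The following is a minimal sketch of such a loop, not the library's implementation: poll_until is a hypothetical helper, ScriptedBuffer is a stand-in that replays scripted poll results, and the sketch rescues StandardError rather than Object as the source does.

```ruby
# Hypothetical stand-in for MessageBuffer: poll returns scripted results
# (a message, nil for "no message", or an exception to raise).
class ScriptedBuffer
  def initialize(results)
    @results = results
  end

  def poll(timeout = 15)
    result = @results.shift
    raise result if result.is_a?(Exception)
    result
  end

  def exhausted?
    @results.empty?
  end
end

# Polls until keep_running returns false. Messages go to the block, errors
# (from polling or from the block) go to on_error, and polling then continues.
def poll_until(buffer, keep_running, on_error)
  while keep_running.call
    begin
      message = buffer.poll
      yield message if message
    rescue StandardError => error
      on_error.call(error)
    end
  end
end

buffer = ScriptedBuffer.new(['first', nil, RuntimeError.new('bus unavailable'), 'second'])
received = []
errors = []

poll_until(buffer, lambda { !buffer.exhausted? }, lambda { |e| errors << e.message }) do |message|
  received << message
end

puts received.inspect
puts errors.inspect
```

With a real MessageBuffer, the caller would invoke #open and #subscribe before entering a loop like this, and keep_running could check a shutdown flag set by a signal handler.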
#poll(timeout = 15) ⇒ Object
Poll VMB endpoint for new messages; return the message if one was retrieved, or nil if it wasn’t.
Raises an exception if the response from the bus contains an error code (which usually means that the VMB is not registered, but may mean other things, for example if the bus itself is not accessible for some reason).
The timeout parameter regulates how long a polling request will be held by the bus if there are no messages.
An HTTP server that holds HTTP connections because it has nothing to respond with is somewhat uncommon, so a more detailed explanation is due.
Normally, any HTTP interaction begins with the client opening a TCP socket to the server. The client then writes the HTTP request into that socket and waits until the server responds and closes the socket. If the server doesn’t respond for a long time (a minute, usually), the client drops the socket and reports a timeout.
Many times, when you poll a VMB endpoint, it will have no messages. If the bus simply came back immediately with a “No Content” response, a VMB subscriber would have to poll the buffer constantly, abusing the bus infrastructure.
.NET Services gets around this problem by making the client connection hang for some time, either until there is a message, or until a certain number of seconds has passed with still no message. That number of seconds is what the timeout parameter specifies. Microsoft suggested that 10-20 seconds is reasonable for production purposes. The default value is 15 seconds.
# File 'lib/dot_net_services/message_buffer.rb', line 183
def poll(timeout=15)
  response = @session.retrieve(:encoding => 'asreply', :timeout => timeout)
  case response
  when Net::HTTPNoContent
    return nil
  when Net::HTTPNotFound
    register
    subscribe(@subscription_endpoint) if @subscription_endpoint
    return nil
  when Net::HTTPSuccess
    return response
  else
    raise "Retrieving messages failed. Response was #{response.class.name}" unless response.is_a?(Net::HTTPSuccess)
  end
end
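One practical consequence of the held connection is that the client's own read timeout should be longer than the polling timeout, otherwise the client gives up before the bus answers. The sketch below shows this with plain Net::HTTP; long_poll_client, the 5-second slack, and the example.invalid host are assumptions, and how the library's Session configures its connections is not shown in this page.

```ruby
require 'net/http'

# Builds a Net::HTTP client whose read timeout exceeds the time the bus may
# hold a polling request. The 5-second slack is an arbitrary assumption.
def long_poll_client(host, port, poll_timeout, slack = 5)
  http = Net::HTTP.new(host, port) # does not open a connection yet
  http.open_timeout = slack
  http.read_timeout = poll_timeout + slack
  http
end

client = long_poll_client('example.invalid', 80, 15)
puts client.read_timeout
```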
#register ⇒ Object
Register the message buffer (by sending X-CREATEMB to the management endpoint).
Returns the buffer. Raises an exception if the creation is not successful.
# File 'lib/dot_net_services/message_buffer.rb', line 97
def register
  createmb_response = @session.createmb
  unless createmb_response.is_a?(Net::HTTPCreated)
    # the buffer may already be there
    unless @session.get_from_relay.is_a?(Net::HTTPSuccess)
      raise "Creating VMB failed. Service responded with #{createmb_response.class.name}"
    end
  end
  self
end
#subscribe(target_path) ⇒ Object
Subscribe to an endpoint.
HTTP messages sent to the endpoint will be routed to this message buffer.
# File 'lib/dot_net_services/message_buffer.rb', line 241
def subscribe(target_path)
  # Changes for .Net Services March 2009 CTP release (M5)
  tmpurl = @session.authenticator.username + "." + DotNetServices.root_url + "/" + target_path

  # Replace unwanted slashes
  tmpurl = tmpurl.gsub("\/\/\/", "\/")
  tmpurl = tmpurl.gsub("\/\/", "\/")
  subscription_endpoint_url = "http://" + tmpurl

  subscribe_response = @session.subscribe(:target => subscription_endpoint_url)

  case subscribe_response
  when Net::HTTPSuccess
    return self
  when Net::HTTPConflict
    unsubscribe(target_path)
    resubscribe_response = @session.subscribe(:target => subscription_endpoint_url)
    unless resubscribe_response.is_a?(Net::HTTPSuccess)
      raise "Second X-SUBSCRIBE to VMB management endpoint failed. Response was #{resubscribe_response.class}"
    end
  else
    raise "X-SUBSCRIBE to VMB management endpoint failed. Response was #{subscribe_response.class}"
  end
end
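The URL construction at the top of #subscribe can be tried in isolation. The sketch below extracts the same string operations into a hypothetical subscription_url helper; the root URL value used here (example.invalid/services/) is a placeholder assumption, not the real value of DotNetServices.root_url.

```ruby
# Builds the subscription endpoint URL the same way #subscribe does:
# concatenate the parts, then collapse triple and double slashes.
def subscription_url(username, root_url, target_path)
  tmpurl = username + "." + root_url + "/" + target_path
  tmpurl = tmpurl.gsub("///", "/")
  tmpurl = tmpurl.gsub("//", "/")
  "http://" + tmpurl
end

# Placeholder root URL with a trailing slash, target path with a leading slash.
puts subscription_url("MySolution", "example.invalid/services/", "/MySolution/MyVMB")
# Same parts without the extra slashes produce the same URL.
puts subscription_url("MySolution", "example.invalid/services", "MySolution/MyVMB")
```

The slash cleanup runs before the http:// prefix is added, which is why the scheme's own double slash survives.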
#unsubscribe(target_path) ⇒ Object
Unsubscribe from an endpoint.
# File 'lib/dot_net_services/message_buffer.rb', line 266
def unsubscribe(target_path)
  # Changes for .Net Services March 2009 CTP release (M5)
  tmpurl = @session.authenticator.username + "." + DotNetServices.root_url + "/" + target_path

  # Replace unwanted slashes
  tmpurl = tmpurl.gsub("\/\/\/", "\/")
  tmpurl = tmpurl.gsub("\/\/", "\/")
  subscription_endpoint_url = "http://" + tmpurl

  unsubscribe_response = @session.unsubscribe(:target => subscription_endpoint_url)
  unless unsubscribe_response.is_a?(Net::HTTPSuccess)
    # Report the unsubscribe response (the original referenced an undefined subscribe_response here)
    raise "X-UNSUBSCRIBE to VMB management endpoint failed. Response was #{unsubscribe_response.class}"
  end
  self
end