Class: Hoodoo::Communicators::Pool

Inherits:
Object
  • Object
show all
Defined in:
lib/hoodoo/communicators/pool.rb

Overview

Maintains a pool of object instances which are expected to be communicating with “the outside world” in some way. A message sent to the pool is replicated to all the communicators in that pool. Some communicators are fast, which means they are called synchronously and expected to return very quickly. Some communicators are slow, which means they are called asynchronously through a work queue.

See #add for more information.

Defined Under Namespace

Classes: QueueEntry, QueueWithTimeout

Constant Summary collapse

MAX_SLOW_QUEUE_SIZE =

Hoodoo::Communicators::Slow subclass communicators are called in their own Threads via a processing Queue. There is the potential for a flood of communications to cause the queue to back up considerably, so a maximum number of messages is defined. If the queue size is _equal to or greater_ than this amount when a message arrives, it will be dropped and a ‘dropped message’ count incremented.

50
THREAD_EXIT_TIMEOUT =

When asking slow communicator threads to exit, a timeout must be used in case the thread doesn’t seem to be responsive. This is the timeout value in seconds - it can take a floating point or integer value.

5
THREAD_WAIT_TIMEOUT =

Analogous to THREAD_WAIT_TIMEOUT but used when waiting for a processing Thread to drain its Queue, without asking it to exit.

5

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializePool

Create a new pool of communicators - instances of subclasses of Hoodoo::Communicators::Fast or Hoodoo::Communicators::Slow, are added with #add and called with #communicate.



59
60
61
62
# File 'lib/hoodoo/communicators/pool.rb', line 59

def initialize
  @pool  = {}
  @group = ::ThreadGroup.new
end

Instance Attribute Details

#groupObject

Retrieve the ThreadGroup instance managing the collection of slow communicator threads. This is mostly used for testing purposes and has little general purpose utility.



53
54
55
# File 'lib/hoodoo/communicators/pool.rb', line 53

def group
  @group
end

Instance Method Details

#add(communicator) ⇒ Object

Add a communicator instance to the pool. Future calls to #communicate will call the same-named method in that instance.

Subclasses of Hoodoo::Communicators::Slow are called within a processing Thread. Subclasses of Hoodoo::Communicators::Fast are called inline. The instances are called in the order of addition, but since each slow communicator runs in its own Thread, the execution order is indeterminate for such instances.

If a slow communicator’s inbound message queue length matches or exceeds MAX_SLOW_QUEUE_SIZE, messages for that specific communicator will start being dropped until the communicator clears the backlog and at last one space opens on the queue. Slow communicators can detect when this has happened by implementing Hoodoo::Communicators::Slow#dropped in the subclass.

If you pass the same instance more than once, the subsequent calls are ignored. You can add many instances of the same class if that’s useful for any reason.

Returns the passed-in communicator instance parameter, for convenience.

communicator

Instance is to be added to the pool. Must be either a Hoodoo::Communicators::Fast or Hoodoo::Communicators::Slow subclass instance.



90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/hoodoo/communicators/pool.rb', line 90

def add( communicator )
  unless ( communicator.class < Hoodoo::Communicators::Fast ||
           communicator.class < Hoodoo::Communicators::Slow )
    raise "Hoodoo::Communicators::Pool\#add must be called with an instance of a subclass of Hoodoo::Communicators::Fast or Hoodoo::Communicators::Slow only"
  end

  return if @pool.has_key?( communicator )

  if communicator.is_a?( Hoodoo::Communicators::Fast )
    add_fast_communicator( communicator )
  else
    add_slow_communicator( communicator )
  end

  return communicator
end

#communicate(object) ⇒ Object

Call the #communicate method on each communicator instance added via #add. Each instance is called in the same order as corresponding calls are made to the pool. Across instances, fast communicators are called in the order they were added to the pool, but since each slow communicator runs in its own Thread, execution order is indeterminate.

object

Parameter passed to the communicator subclass instance #communicate methods.



149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
# File 'lib/hoodoo/communicators/pool.rb', line 149

def communicate( object )
  @pool.each do | communicator, item |

    if item.has_key?( :fast )
      begin
        communicator.communicate( object )
      rescue => exception
        handle_exception( exception, communicator, object )
      end

    else
      data       = item[ :slow       ]
      thread     = data[ :thread     ]
      work_queue = data[ :work_queue ]

      # This is inaccurate if one or more "dropped messages" reports are
      # on the queue, but since some communicators might report them in
      # the same way as other messages, it's not necessarily incorrect
      # either.
      #
      if work_queue.size < MAX_SLOW_QUEUE_SIZE
        dropped = thread[ :dropped_messages ]

        if dropped > 0
          thread[ :dropped_messages ] = 0

          # Opposite of comment above on MAX_SLOW_QUEUE_SIZE check...
          # Yes, this takes up a queue entry and the payload addition
          # afterwards might take it one above max size, but that's OK
          # since this is just a "dropped messages" report and though
          # some communicators might deal with them slowly, others may
          # just ignore them.
          #
          work_queue << QueueEntry.new( dropped: dropped )
        end

        work_queue << QueueEntry.new( payload: object )

      else
        thread[ :dropped_messages ] += 1

      end
    end

  end
end

#remove(communicator) ⇒ Object

Remove a communicator previously added by #add. See that for details.

It is harmless to try and remove communicator instances more than once or to try to remove something that was never added in the first place; the call simply has no side effects.

If removing a slow communicator, its thread will be terminated with default timeout value of THREAD_EXIT_TIMEOUT seconds. For this reason, removing a slow communicator may take a long time.

Returns the passed-in communicator instance parameter, for convenience.

communicator

Instance is to be removed from the pool. Must be either a Hoodoo::Communicators::Fast or Hoodoo::Communicators::Slow subclass instance.



123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/hoodoo/communicators/pool.rb', line 123

def remove( communicator )
  unless ( communicator.class < Hoodoo::Communicators::Fast ||
           communicator.class < Hoodoo::Communicators::Slow )
    raise "Hoodoo::Communicators::Pool\#remove must be called with an instance of a subclass of Hoodoo::Communicators::Fast or Hoodoo::Communicators::Slow only"
  end

  return unless @pool.has_key?( communicator )

  if communicator.is_a?( Hoodoo::Communicators::Fast )
    remove_fast_communicator( communicator )
  else
    remove_slow_communicator( communicator )
  end

  return communicator
end

#terminate(per_instance_timeout: THREAD_EXIT_TIMEOUT) ⇒ Object

The communication pool is “emptied” by this call, going back to a clean state as if just initialised. New workers can be added via #add and then called via #communicate if you so wish.

Hoodoo::Communciators::Fast subclass instances are removed immediately without complications.

Hoodoo::Communicators::Slow subclass instances in the communication pool are called via a worker Thread; this method shuts down all such worker Threads, clearing their work queues and asking each one to exit (politely). There is no mechanism (other than overall Ruby process exit) available to shut down the Threads by force, so some Threads may not respond and time out.

When this method exits, all workers will have either exited or timed out and possibly still be running, but are considered too slow or dead. No further communications are made to them.

The following named parameters are supported:

per_instance_timeout

Timeout for each slow communicator Thread in seconds. Optional. Default is the value in THREAD_EXIT_TIMEOUT. For example, with three slow communicators in the pool and all three reached a 5 second timeout, the termination method would not return for 15 seconds (3 * 5 seconds full timeout).



277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
# File 'lib/hoodoo/communicators/pool.rb', line 277

def terminate( per_instance_timeout: THREAD_EXIT_TIMEOUT )
  loop do
    klass, item = @pool.shift() # Hash#shift -> remove a key/value pair.
    break if klass.nil?

    next unless item.has_key?( :slow )
    data = item[ :slow ]

    request_termination_for(
      thread:     data[ :thread     ],
      work_queue: data[ :work_queue ],
      timeout:    per_instance_timeout
    )
  end
end

#wait(per_instance_timeout: THREAD_WAIT_TIMEOUT, communicator: nil) ⇒ Object

This method is only useful if there are any Hoodoo::Communicators::Slow subclass instances in the communication pool. Each instance is called via a worker Thread; this method waits for each communicator to drain its queue before returning. This is useful if you have a requirement to wait for all communications to finish on all threads, presumably for wider synchronisation reasons.

Since fast communicators are called synchronously there is never any need to wait for them, so this call ignores such pool entries.

The following named parameters are supported:

per_instance_timeout

Timeout for each slow communicator Thread in seconds. Optional. Default is the value in THREAD_WAIT_TIMEOUT.

communicator

If you want to wait for specific instance only (see #add), pass it here. If the instance is a fast communicator, or any object not added to the pool, then there is no error raised. The method simply returns immediately.



218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
# File 'lib/hoodoo/communicators/pool.rb', line 218

def wait( per_instance_timeout: THREAD_WAIT_TIMEOUT,
          communicator:         nil )

  if communicator.nil?
    @pool.each do | communicator, item |
      next unless item.has_key?( :slow )
      data = item[ :slow ]

      wait_for(
        work_queue: data[ :work_queue ],
        sync_queue: data[ :sync_queue ],
        timeout:    per_instance_timeout
      )
    end

  else
    return unless @pool.has_key?( communicator )
    item = @pool[ communicator ]

    return unless item.has_key?( :slow )
    data = item[ :slow ]

    wait_for(
      work_queue: data[ :work_queue ],
      sync_queue: data[ :sync_queue ],
      timeout:    per_instance_timeout
    )

  end
end