Class: Bio::BaseSpace::Consumer

Inherits:
Object
  • Object
show all
Defined in:
lib/basespace/model/multipart_upload.rb

Overview

Multipart file upload consumer class.

TODO This file is not yet ported as the multipartFileUpload class is just mentioned in the comment section of the BaseSpaceAPI file.

Instance Method Summary collapse

Constructor Details

#initialize(task_queue, result_queue, pause_event, halt_event) ⇒ Consumer

Returns a new instance of Consumer.



85
86
87
88
89
90
91
92
93
94
# File 'lib/basespace/model/multipart_upload.rb', line 85

def initialize(task_queue, result_queue, pause_event, halt_event)
  # TODO http://stackoverflow.com/questions/710785/working-with-multiple-processes-in-ruby
  #      http://stackoverflow.com/questions/855805/please-introduce-a-multi-processing-library-in-perl-or-ruby
  #      http://docs.python.jp/2.6/library/multiprocessing.html
  #multiprocessing.Process.__init__(self)
  @task_queue    = task_queue
  @result_queue  = result_queue
  @pause         = pauseEvent
  @halt          = haltEvent
end

Instance Method Details

#runObject

TODO



97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/basespace/model/multipart_upload.rb', line 97

def run
  proc_name = self.name
  while True
    unless self.pause.is_set()
      next_task = self.task_queue.get()
    end
          
    if next_task is None or self.halt.is_set() # check if we are out of jobs or have been halted
      # Poison pill means shutdown
      puts "#{proc_name}: Exiting"
      self.task_queue.task_done()
      break
    elsif self.pause.is_set()                   # if we have been paused, sleep for a bit then check back
      puts "#{proc_name}: Paused"
      time.sleep(3)                                       
    else                                       # do some work
      puts "#{proc_name}: #{next_task}"
      answer = next_task()
      self.task_queue.task_done()
      if answer.state == 1                   # case everything went well
        self.result_queue.put(answer)
      else                                   # case something sent wrong
        if next_task.attempt < 3
          self.task_queue.put(next_task)  # queue the guy for a retry
        else                               # problems, shutting down this party
          self.halt.set()                 # halt all other process
        end
      end
    end
  end
end