Class: Droonga::JobQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/droonga/job_queue.rb

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(database_path, queue_name) ⇒ JobQueue

Returns a new instance of JobQueue.



36
37
38
39
# File 'lib/droonga/job_queue.rb', line 36

# Remembers where the database lives and which queue to use. Nothing is
# opened here; the Groonga context and database are created later by #open.
#
# @param database_path [Object] value later passed to
#   Groonga::Context#open_database (presumably a String path — confirm)
# @param queue_name [Object] key later used to look the queue up in the
#   context (presumably a String — confirm)
def initialize(database_path, queue_name)
  @database_path = database_path
  @queue_name = queue_name
end

Class Method Details

.ensure_schema(database_path, queue_name) ⇒ Object



24
25
26
27
# File 'lib/droonga/job_queue.rb', line 24

# Builds a JobQueueSchema for the given database and queue and asks it to
# make sure the schema exists.
#
# @param database_path [Object] forwarded unchanged to JobQueueSchema.new
# @param queue_name [Object] forwarded unchanged to JobQueueSchema.new
# @return [Object] whatever JobQueueSchema#ensure_created returns
def ensure_schema(database_path, queue_name)
  JobQueueSchema.new(database_path, queue_name).ensure_created
end

.open(database_path, queue_name) ⇒ Object



29
30
31
32
33
# File 'lib/droonga/job_queue.rb', line 29

# Convenience constructor: instantiates a job queue for the given database
# and queue name, opens it, and hands back the opened instance.
#
# @param database_path [Object] forwarded unchanged to .new
# @param queue_name [Object] forwarded unchanged to .new
# @return [JobQueue] the new instance, after #open has been called on it
def open(database_path, queue_name)
  new(database_path, queue_name).tap { |opened_queue| opened_queue.open }
end

Instance Method Details

#close ⇒ Object



74
75
76
77
78
79
80
81
# File 'lib/droonga/job_queue.rb', line 74

# Releases the queue, database and context held by this instance.
#
# The database and the context are closed independently of each other: the
# context is still closed when no database was ever opened (for example when
# #open failed after creating the context) and when closing the database
# raises. All references are dropped before anything is closed, so calling
# this again, or before #open, does nothing.
#
# @return [nil]
# @raise [StandardError] whatever the database's or context's +close+ raises
def close
  @queue = nil
  database = @database
  context = @context
  # Clear the handles first so a failure below cannot leave stale references.
  @database = @context = nil
  begin
    database.close if database
  ensure
    # Runs even if closing the database raised, so the context never leaks.
    context.close if context
  end
  nil
end

#log_tag ⇒ Object



83
84
85
# File 'lib/droonga/job_queue.rb', line 83

# Prefix for log lines: the parent process id, this process's id, then the
# literal "job_queue".
#
# @return [String]
def log_tag
  format("[%s][%s] job_queue", Process.ppid, Process.pid)
end

#open ⇒ Object



41
42
43
44
45
46
47
# File 'lib/droonga/job_queue.rb', line 41

# Creates a fresh Groonga context, opens the database at @database_path in
# it, and looks up the queue stored under @queue_name.
#
# @return [Object] the result of the context lookup for @queue_name
def open
  @context = Groonga::Context.new
  @database = @context.open_database(@database_path)
  # The encoding is set only after the database has been opened; keep this
  # order. NOTE(review): presumably :none keeps Groonga from transcoding the
  # packed message bytes — confirm against the Rroonga documentation.
  @context.encoding = :none

  # NOTE(review): the lookup result is not checked here; presumably it is nil
  # when the queue has not been created yet — confirm.
  @queue = @context[@queue_name]
end

#pull_message ⇒ Object



58
59
60
61
62
63
64
65
66
67
68
# File 'lib/droonga/job_queue.rb', line 58

# Takes one record from the queue, reads its packed message, deletes the
# record, and returns the unpacked message.
#
# @return [Object, nil] the MessagePack-decoded message, or nil when the
#   queue yielded no record (or the record carried no message)
def pull_message
  payload = nil
  @queue.pull do |record|
    next unless record

    # Read the message before deleting the record that carries it.
    payload = record.message
    record.delete
  end
  payload ? MessagePack.unpack(payload) : nil
end

#push_message(message) ⇒ Object



49
50
51
52
53
54
55
56
# File 'lib/droonga/job_queue.rb', line 49

# Serializes the message with +to_msgpack+ and stores it in a newly pushed
# queue record, tracing the start and end of the operation on $log.
#
# @param message [#to_msgpack] the object to enqueue
# @return [Object] the return value of the final $log.trace call
def push_message(message)
  $log.trace("#{log_tag}: push_message: start")
  payload = message.to_msgpack
  @queue.push { |record| record.message = payload }
  $log.trace("#{log_tag}: push_message: done")
end

#unblock ⇒ Object



70
71
72
# File 'lib/droonga/job_queue.rb', line 70

# Delegates to the underlying queue's +unblock+.
#
# NOTE(review): presumably this wakes a consumer blocked in #pull_message —
# confirm against the Groonga queue documentation.
# Requires an opened queue: @queue is only assigned by #open and is reset to
# nil by #close, so calling this outside that window raises NoMethodError.
#
# @return [Object] whatever the queue's +unblock+ returns
def unblock
  @queue.unblock
end