Class: Yup::State

Inherits:
Object
  • Object
show all
Defined in:
lib/yup/state.rb,
lib/yup/request_forwarder.rb

Defined Under Namespace

Classes: FeedbackHandler, RequestForwarder

Constant Summary collapse

RE_LEN =
1000

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(path, name, feedback_channel) ⇒ State

Returns a new instance of State.



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/yup/state.rb', line 22

# Sets up the persistent queue that backs this state.
#
# Ensures the directory +path+ exists, opens a Bdb environment in it
# (DB_CREATE | DB_INIT_MPOOL | DB_INIT_CDB) and opens a QUEUE database
# called +name+ whose fixed record length is RE_LEN.
#
# @param path [String] directory holding the Bdb environment
# @param name [String] name of the queue database inside the environment
# @param feedback_channel [String] stored as-is; used by #to_feedback as a
#   UNIX socket path
def initialize(path, name, feedback_channel)
  @path, @name, @feedback_channel = path, name, feedback_channel

  # Work on a private copy so changing progname does not affect Yup.logger.
  @logger = Yup.logger.clone
  @logger.progname = "Yup::State"

  FileUtils.mkdir_p(@path)

  env_flags = Bdb::DB_CREATE | Bdb::DB_INIT_MPOOL | Bdb::DB_INIT_CDB
  # @env keeps whatever Env#open returns, exactly as before.
  @env = Bdb::Env.new(0).open(@path, env_flags, 0)

  @queue = @env.db
  # The record length is set before the database is opened.
  @queue.re_len = RE_LEN
  @queue.open(nil, @name, nil, Bdb::Db::QUEUE, Bdb::DB_CREATE, 0)
end

Instance Attribute Details

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



14
15
16
# File 'lib/yup/state.rb', line 14

# Read-only accessor for the queue handle stored in @queue.
#
# @return [Object] the current value of @queue
def queue
  @queue
end

Class Method Details

.repair_if_need(path) ⇒ Object



16
17
18
19
20
# File 'lib/yup/state.rb', line 16

# Opens the Bdb environment at +path+ with DB_RECOVER and closes it again.
#
# The environment handle is now closed even when opening fails, so a failed
# attempt no longer leaves the handle open until garbage collection.
#
# @param path [String] directory holding the Bdb environment
# @return [Object] whatever Bdb::Env#close returns
# @raise whatever Bdb::Env#open raised, re-raised after the handle is closed
def self.repair_if_need(path)
  env = Bdb::Env.new(0)
  begin
    env.open(path, Bdb::DB_CREATE | Bdb::DB_INIT_TXN | Bdb::DB_RECOVER, 0)
  rescue StandardError
    # Discard the handle before propagating the failure.
    # NOTE(review): assumes Bdb errors descend from StandardError -- confirm.
    env.close
    raise
  end
  env.close
end

Instance Method Details

#bpop ⇒ Object



47
48
49
50
51
# File 'lib/yup/state.rb', line 47

# Takes the next record off the queue and returns it unchanged.
#
# The record is fetched with Bdb::DB_CONSUME_WAIT (presumably waits while the
# queue is empty -- confirm against the Berkeley DB docs). Only the debug log
# line uses the stripped form; the caller receives the raw record.
#
# @return [String] the record as returned by the queue
def bpop
  @queue.get(nil, "", nil, Bdb::DB_CONSUME_WAIT).tap do |record|
    @logger.debug { "Bpoped: #{record.strip}" }
  end
end

#close ⇒ Object



60
61
62
# File 'lib/yup/state.rb', line 60

# Closes the queue database handle.
#
# Only @queue is closed here; the environment handle in @env is not touched.
#
# @return [Object] whatever the queue's #close returns
def close
  no_flags = 0
  @queue.close(no_flags)
end

#push(data) ⇒ Object



38
39
40
41
42
43
44
45
# File 'lib/yup/state.rb', line 38

# Appends +data+ to the queue, split into pieces of at most RE_LEN characters.
#
# Each piece is stored as its own record with Bdb::DB_APPEND. Nothing is
# enqueued for empty +data+, and no trailing empty record is written when the
# length of +data+ is an exact multiple of RE_LEN.
#
# NOTE(review): slicing counts characters, while the queue's record length is
# presumably measured in bytes -- confirm callers only pass single-byte data.
#
# @param data [String] payload to enqueue
# @return [nil]
def push(data)
  @logger.debug { "Push: #{data}" }
  offset = 0
  # String#slice returns "" (not nil) when the offset equals the length, so
  # the loop is bounded by length rather than by waiting for nil.
  while offset < data.length
    @queue.put(nil, "", data.slice(offset, RE_LEN), Bdb::DB_APPEND)
    offset += RE_LEN
  end
end

#to_feedback(data) ⇒ Object



53
54
55
56
57
58
# File 'lib/yup/state.rb', line 53

# Sends +data+ over a fresh UNIX socket connection to the feedback channel.
#
# A new connection to @feedback_channel is opened for every call and is
# closed afterwards, even when sending raises.
#
# @param data [String] payload to send; only the debug log uses the stripped form
# @return [nil]
def to_feedback(data)
  @logger.debug { "Push to the feedback channel: #{data.strip}" }
  sock = UNIXSocket.new(@feedback_channel)
  begin
    sock.send(data, 0)
  ensure
    # Always release the descriptor, including on a failed send.
    sock.close
  end
  nil
end