Class: Tupelo::PersistentArchiver::Worker

Inherits:
Client::Worker
Includes:
Funl::HistoryWorker
Defined in:
lib/tupelo/tuplets/persistent-archiver/worker.rb

Constant Summary

Constants inherited from Client::Worker

Client::Worker::GET_TUPLESPACE

Instance Attribute Summary

Attributes inherited from Client::Worker

#arc, #blobber, #client, #client_id, #cmd_queue, #delta, #global_tick, #local_tick, #message_class, #msg_reader_thread, #notify_waiters, #prep_waiters, #read_waiters, #seq, #start_tick, #subspaces, #trans_waiters, #tuplespace, #worker_thread

Instance Method Summary

Methods inherited from Client::Worker

#<<, #at, #collect_tags, #handle_matcher, #handle_one_request, #handle_seq_closed, #handle_transaction, #handle_unwaiter, #handle_waiter, #in_thread?, #is_meta_tuple?, #log, #make_template, #observe_started_client, #pot_for, #read_messages_from_seq, #record_history, #rot_for, #run_msg_reader_thread, #run_request_loop, #run_worker_thread, #send_transaction, #sniff_meta_tuple, #start, #stop, #stop!, #update_to_tick

Constructor Details

#initialize(*args) ⇒ Worker

Returns a new instance of Worker.



7
8
9
10
# File 'lib/tupelo/tuplets/persistent-archiver/worker.rb', line 7

# Set up the worker and give it an empty schedule of tick-triggered actions.
#
# All arguments are passed through unchanged to the superclass constructor.
def initialize *args
  super
  # Maps a tick to the list of actions registered for it. Looking up a tick
  # that has no entry yet stores, and returns, a fresh empty array.
  @scheduled_actions = Hash.new do |schedule, tick|
    schedule[tick] = []
  end
end

Instance Method Details

#at_tick(tick, &action) ⇒ Object



69
70
71
# File 'lib/tupelo/tuplets/persistent-archiver/worker.rb', line 69

# Register +action+ to be run for +tick+.
#
# Several actions may be registered for the same tick; they are kept in the
# order they were added. Returns the list of actions now pending for +tick+.
def at_tick tick, &action
  pending = @scheduled_actions[tick]
  pending.push action
end

#fork_for_op(op, tags, tick, stream, req) ⇒ Object



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/tupelo/tuplets/persistent-archiver/worker.rb', line 46

# Serve one archiver request from a forked child process.
#
# op::     operation name received from the client
# tags::   passed on to #send_tuplespace for GET_TUPLESPACE requests
# tick::   tick the request was made for (not used inside this method)
# stream:: object stream for the client connection
# req::    the originating request; +req.io+ is closed in the ensure clause
#          when this method exits
#
# Only GET_TUPLESPACE is implemented. Any StandardError raised while serving
# the request is logged inside the forked block instead of propagating.
# Returns whatever +fork+ returns.
def fork_for_op op, tags, tick, stream, req
  fork do
    begin
      case op
      when "new client", "get range" ### "get range": handle this in Funl::HistoryWorker
        raise "Unimplemented" ###
      when GET_TUPLESPACE
        send_tuplespace stream, tags
      else
        raise "Unknown operation: #{op.inspect}"
      end
    rescue EOFError
      log.debug {"#{stream.peer_name} disconnected from archiver"}
    rescue => err
      log.error "in fork for #{stream || req.io}: #{err.inspect}"
    end
  end
ensure
  req.io.close
end

#handle_client_request(req) ⇒ Object



12
13
14
15
16
17
18
19
# File 'lib/tupelo/tuplets/persistent-archiver/worker.rb', line 12

# Dispatch an incoming client request.
#
# Requests matching Tupelo::Archiver::ForkRequest go to
# #handle_fork_request; everything else is left to the superclass.
def handle_client_request req
  return super unless Tupelo::Archiver::ForkRequest === req
  handle_fork_request req
end

#handle_fork_request(req) ⇒ Object



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/tupelo/tuplets/persistent-archiver/worker.rb', line 21

# Handle a request that is to be served from a forked child process.
#
# Reads the [op, tags, tick] triple from the client's stream. If +tick+ is
# not ahead of the worker's +global_tick+, the request is served immediately
# via #fork_for_op; otherwise it is deferred with #at_tick until that tick.
#
# If the request cannot be read -- the peer disconnected (EOFError) or any
# other StandardError -- the failure is logged, +req.io+ is closed, and the
# request is dropped; nil is returned in that case.
def handle_fork_request req
  stream = client.arc_server_stream_for req.io

  begin
    op, tags, tick = stream.read
  rescue EOFError
    log.debug {"#{stream.peer_name} disconnected from archiver"}
    # fork_for_op is never reached on this path, so its ensure clause will
    # not close the connection; do it here.
    req.io.close
    return
  rescue => ex
    log.error "in fork for #{stream || req.io}: #{ex.inspect}"
    # Nothing was read, so there is nothing to serve. Falling through would
    # compare a nil tick against global_tick and raise NoMethodError.
    req.io.close
    return
  end

  log.info {
    "#{stream.peer_name} requested #{op.inspect} at tick=#{tick}" +
      (tags ? " on #{tags}" : "")}

  if tick <= global_tick
    fork_for_op op, tags, tick, stream, req
  else
    # Not caught up to the requested tick yet: serve once we get there.
    at_tick tick do
      fork_for_op op, tags, tick, stream, req
    end
  end
end

#handle_message(msg) ⇒ Object



73
74
75
76
77
78
79
# File 'lib/tupelo/tuplets/persistent-archiver/worker.rb', line 73

# Process +msg+ via the superclass, then run whatever was scheduled for the
# resulting global tick.
#
# The actions registered for exactly the current +global_tick+ are removed
# from the schedule and called in the order they are stored. Returns the list
# of actions that ran, or nil when nothing was scheduled for this tick.
def handle_message msg
  super
  due = @scheduled_actions.delete(global_tick)
  due.each(&:call) if due
end

#send_tuplespace(stream, templates) ⇒ Object



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/tupelo/tuplets/persistent-archiver/worker.rb', line 81

# Write the worker's tuplespace contents to +stream+.
#
# The stream receives, in order: a one-element array holding the global
# tick, then each tuple repeated as many times as its count, then +nil+ as a
# terminator. When +templates+ is given, each entry is wrapped in a
# Tupelo::Client::Template and only tuples matched (via +===+) by at least
# one of them are sent. Returns the result of the final write.
def send_tuplespace stream, templates
  log.info do
    "send_tuplespace to #{stream.peer_name} at tick #{global_tick.inspect} " +
      (templates ? " with templates #{templates.inspect}" : "")
  end

  stream << [global_tick]

  # nil means "no filtering"; an empty list matches nothing.
  matchers = templates &&
    templates.map {|spec| Tupelo::Client::Template.new spec}

  tuplespace.each do |tuple, count|
    ## optimize the match if templates have simple form, such as
    ##   [ [str1, nil, ...], [str2, nil, ...], ...]
    next if matchers && matchers.none? {|matcher| matcher === tuple}

    ## optimization: use stream.write_to_buffer, or dump the tuple once
    ## and send the cached serialization count times
    count.times {stream << tuple}
  end

  stream << nil # terminator
  ## stream.flush or close if write_to_buffer used above
end