Class: Tupelo::Archiver::Worker

Inherits:
Client::Worker
Includes:
Funl::HistoryWorker
Defined in:
lib/tupelo/archiver/worker.rb

Constant Summary

Constants inherited from Client::Worker

Client::Worker::GET_TUPLESPACE

Instance Attribute Summary

Attributes inherited from Client::Worker

#arc, #blobber, #client, #client_id, #cmd_queue, #delta, #global_tick, #local_tick, #message_class, #msg_reader_thread, #notify_waiters, #prep_waiters, #read_waiters, #seq, #start_tick, #subspaces, #trans_waiters, #worker_thread

Instance Method Summary

Methods inherited from Client::Worker

#<<, #at, #collect_tags, #handle_matcher, #handle_one_request, #handle_seq_closed, #handle_transaction, #handle_unwaiter, #handle_waiter, #in_thread?, #is_meta_tuple?, #log, #make_template, #observe_started_client, #pot_for, #read_messages_from_seq, #record_history, #rot_for, #run_msg_reader_thread, #run_request_loop, #run_worker_thread, #send_transaction, #sniff_meta_tuple, #start, #stop!, #update_to_tick

Constructor Details

#initialize(*args, **opts) ⇒ Worker

Returns a new instance of Worker.



7
8
9
10
11
# File 'lib/tupelo/archiver/worker.rb', line 7

# Builds the archiver worker.
#
# Positional arguments are passed on to the parent initializer. Keyword
# options are not forwarded; they are retained in @opts.
def initialize(*args, **opts)
  super(*args)
  @opts = opts
  # Each tick gets its own fresh array of actions the first time it is accessed.
  @scheduled_actions = Hash.new { |by_tick, tick| by_tick[tick] = [] }
end

Instance Method Details

#at_tick(tick, &action) ⇒ Object



85
86
87
# File 'lib/tupelo/archiver/worker.rb', line 85

# Queues +action+ under +tick+ in @scheduled_actions, after any actions
# already queued for that tick. Returns the list of actions for that tick.
def at_tick(tick, &action)
  @scheduled_actions[tick].push(action)
end

#fork_for_op(op, sub_delta, tick, stream, req) ⇒ Object



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/tupelo/archiver/worker.rb', line 62

# Forks a child that serves a single archiver operation over +stream+.
#
# Inside the forked block:
# * GET_TUPLESPACE is answered by #send_tuplespace;
# * "new client" and "get range" raise "Unimplemented";
# * any other op raises "Unknown operation: ...".
# An EOFError is logged at debug level; any other StandardError is logged
# at error level. Neither propagates out of the forked block.
#
# The method-level ensure clause closes req.io on the way out.
#
# NOTE(review): the pid returned by fork is not waited on or detached here;
# confirm that children are reaped elsewhere.
def fork_for_op(op, sub_delta, tick, stream, req)
  fork do
    begin
      case op
      when "new client", "get range" ### "get range": handle this in Funl::HistoryWorker
        raise "Unimplemented" ###
      when GET_TUPLESPACE
        send_tuplespace(stream, sub_delta)
      else
        raise "Unknown operation: #{op.inspect}"
      end
    rescue EOFError
      log.debug { "#{stream.peer_name} disconnected from archiver" }
    rescue => err
      log.error("in fork for #{stream || req.io}: #{err.inspect}")
    end
  end
ensure
  req.io.close
end

#handle_client_request(req) ⇒ Object



28
29
30
31
32
33
34
35
# File 'lib/tupelo/archiver/worker.rb', line 28

# Routes an incoming client request: a Tupelo::Archiver::ForkRequest is
# handled by #handle_fork_request; anything else is deferred to the parent
# implementation.
def handle_client_request(req)
  # Same `===` class test that the equivalent case/when would perform.
  return handle_fork_request(req) if Tupelo::Archiver::ForkRequest === req

  super
end

#handle_fork_request(req) ⇒ Object



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/tupelo/archiver/worker.rb', line 37

# Serves one fork request: reads the request triple from the peer, then
# either forks immediately or defers the fork until the requested tick.
#
# The stream obtained for req.io is read once and is expected to yield
# [op, sub_delta, tick]. If +tick+ is not ahead of global_tick the operation
# is forked right away via #fork_for_op; otherwise the fork is queued with
# #at_tick.
#
# If the read fails, the failure is logged (EOFError at debug level, any
# other StandardError at error level) and the request is dropped.
#
# NOTE(review): on those early returns req.io is not explicitly closed here
# (only #fork_for_op closes it); confirm whether that is intended.
def handle_fork_request req
  stream = client.arc_server_stream_for req.io

  begin
    op, sub_delta, tick = stream.read
  rescue EOFError
    log.debug {"#{stream.peer_name} disconnected from archiver"}
    return
  rescue => ex
    log.error "in fork for #{stream || req.io}: #{ex.inspect}"
    # Nothing was read, so op/sub_delta/tick are all nil. Falling through
    # would raise NoMethodError on `tick <= global_tick` below.
    return
  end

  log.info {
    "#{stream.peer_name} requested #{op.inspect} at tick=#{tick}" +
      (sub_delta ? " for #{sub_delta}" : "")}

  if tick <= global_tick
    fork_for_op op, sub_delta, tick, stream, req
  else
    # The peer asked for a tick we have not reached yet: defer the fork.
    at_tick tick do
      fork_for_op op, sub_delta, tick, stream, req
    end
  end
end

#handle_message(msg) ⇒ Object



89
90
91
92
93
94
95
# File 'lib/tupelo/archiver/worker.rb', line 89

# Lets the parent class process +msg+, then removes and runs, in order,
# every action that was scheduled for the current global_tick.
# Returns nil when nothing was scheduled for that tick.
def handle_message(msg)
  super
  due = @scheduled_actions.delete(global_tick)
  return unless due

  due.each(&:call)
end

#send_tuplespace(stream, sub_delta) ⇒ Object



97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/tupelo/archiver/worker.rb', line 97

# Streams the archived tuplespace to +stream+.
#
# Written to the stream, in order: [global_tick], then every selected tuple
# repeated +count+ times, then a nil terminator.
#
# Selection: when sub_delta["request_all"] is truthy every tuple is sent;
# otherwise only tuples matched (via ===) by at least one subspace whose tag
# is included in sub_delta["request_tags"].
#
# NOTE(review): sub_delta is indexed unconditionally although the log line
# tolerates nil; presumably callers always pass a hash here, confirm.
def send_tuplespace(stream, sub_delta)
  log.info do
    pieces = [
      "send_tuplespace to #{stream.peer_name} ",
      "at tick #{global_tick.inspect} "
    ]
    pieces << " with sub_delta #{sub_delta.inspect}" if sub_delta
    pieces.join
  end

  stream << [global_tick]

  ## better: make use of sub_delta["subscribed_*"] to reduce what
  ## has to be sent.

  wanted =
    if sub_delta["request_all"]
      ->(_tuple) { true }
    else
      tags = sub_delta["request_tags"] ## use set
      chosen = subspaces.select { |space| tags.include?(space.tag) }
      ## alternately, store tags with tuples (risk if dynamic spaces)
      ## optimize this if templates have simple form, such as
      ##   [ [str1, nil, ...], [str2, nil, ...], ...]
      ->(tuple) { chosen.any? { |space| space === tuple } }
    end

  tuplespace.each do |tuple, count|
    next unless wanted.call(tuple)

    ## just dump and send str * count? optimize this, and cache the serial
    ## optimization: use stream.write_to_buffer
    count.times { stream << tuple }
  end

  stream << nil # terminator
  ## stream.flush or close if write_to_buffer used above
end

#stop ⇒ Object



23
24
25
26
# File 'lib/tupelo/archiver/worker.rb', line 23

# Runs the parent class's stop, then calls flush on the tuplespace,
# passing the current global_tick.
def stop
  super
  space = tuplespace
  space.flush(global_tick)
end

#tuplespace ⇒ Object



13
14
15
16
17
18
19
20
21
# File 'lib/tupelo/archiver/worker.rb', line 13

# Returns the worker's tuplespace, resolving it on first use and caching
# the result in @tuplespace.
#
# If client.tuplespace responds to :new, it is instantiated with the keyword
# options saved in @opts; otherwise client.tuplespace is used as-is.
def tuplespace
  return @tuplespace if @tuplespace

  instantiable = client.tuplespace.respond_to?(:new)
  @tuplespace = instantiable ? client.tuplespace.new(**@opts) : client.tuplespace
end