Class: ReactiveRecord::Broadcast

Inherits:
Object
  • Object
show all
Defined in:
lib/reactive_record/broadcast.rb

Defined Under Namespace

Classes: SendPacket

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(id) ⇒ Broadcast

Returns a new instance of Broadcast.



128
129
130
131
132
133
# File 'lib/reactive_record/broadcast.rb', line 128

# Builds an empty broadcast.
#
# @param id the key this broadcast is registered under; `in_transit` passes
#   the hash key here and `complete!` later deletes that same key
def initialize(id)
  @id = id
  # Channels that have reported so far (appended to by #receive).
  @received = Set.new
  # Attribute values carried by the broadcast.
  @record = {}
  # attr => change entries; #integrity_check reads `.last` of each entry.
  @previous_changes = {}
end

Instance Attribute Details

#record ⇒ Object (readonly)

private



118
119
120
# File 'lib/reactive_record/broadcast.rb', line 118

# Reader for @record, the attribute hash held by this broadcast.
def record
  @record
end

Class Method Details

.after_commit(operation, model) ⇒ Object



4
5
6
7
8
9
10
11
12
13
14
# File 'lib/reactive_record/broadcast.rb', line 4

# Broadcasts a committed change for +model+.
#
# For every packet yielded by Hyperloop::InternalPolicy.regulate_broadcast the
# packet is either run directly through SendPacket, or - when this process is
# not the server and a root path is known - relayed via send_to_server.
#
# @param operation the operation being broadcast (passed through unchanged)
# @param model the record handed to the broadcast policy
# @return whatever regulate_broadcast returns, or nil when the
#   "missing hyperloop_connections table" error is swallowed
# @raise [ActiveRecord::StatementInvalid] for any other statement error
def self.after_commit(operation, model)
  Hyperloop::InternalPolicy.regulate_broadcast(model) do |packet|
    if Hyperloop.on_server? || !Hyperloop::Connection.root_path
      SendPacket.run(packet, operation: operation)
    else
      begin
        send_to_server(operation, packet)
      rescue StandardError
        # server no longer running so ignore
        nil
      end
    end
  end
rescue ActiveRecord::StatementInvalid => statement_error
  # Only the exact "table is missing" message is tolerated.
  return if statement_error.message == "Could not find table 'hyperloop_connections'"

  raise statement_error
end

.in_transit ⇒ Object



124
125
126
# File 'lib/reactive_record/broadcast.rb', line 124

# Class-level table of broadcasts that are still being assembled.
#
# Looking up an id that is not present builds a new broadcast for that id,
# stores it under the id and returns it.
#
# @return [Hash] the memoized table (same object on every call)
def self.in_transit
  @in_transit ||= Hash.new do |pending, broadcast_id|
    pending[broadcast_id] = new(broadcast_id)
  end
end

.open_channels ⇒ Object



120
121
122
# File 'lib/reactive_record/broadcast.rb', line 120

# Lazily created class-level Set; the same instance is returned on every call.
# NOTE(review): nothing visible here adds to this set, and #receive reads
# Hyperloop::IncomingBroadcast.open_channels instead - confirm how the two relate.
def self.open_channels
  @open_channels ||= Set.new
end

.send_to_server(operation, data) ⇒ Object



16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/reactive_record/broadcast.rb', line 16

# Relays a broadcast packet to the server process.
#
# The root path is read exactly once, so the value that passes the guard is
# the value handed to SendPacket.remote, and no authorization work is done
# when there is no server to talk to.
#
# @param operation the operation being broadcast
# @param data [Hash] packet data; :channel and :broadcast_id are used to
#   build the authorization
# @return the packet returned by SendPacket.remote
# @raise [RuntimeError] 'no server running' when no root path is available
# @raise the packet's error when the remote side rejected it
def self.send_to_server(operation, data)
  root_path = Hyperloop::Connection.root_path
  raise 'no server running' unless root_path

  salt = SecureRandom.hex
  authorization = Hyperloop.authorization(salt, data[:channel], data[:broadcast_id])
  packet = SendPacket.remote(
    root_path,
    data,
    operation: operation,
    salt: salt,
    authorization: authorization
  )
  raise packet.error if packet.rejected?

  packet
end

.to_self(record, data = {}) ⇒ Object



65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/reactive_record/broadcast.rb', line 65

# Simulates an incoming packet after a local save, so the local client state
# is synced the same way a remote broadcast would sync it.
#
# @param record the saved model; must respond to #new?, #destroyed? and
#   #backing_record
# @param data [Hash] attribute values to sync (handed to #local, which stores
#   the hash itself and may add :id to it)
# @return whatever ReactiveRecord::Collection.sync_scopes returns
def self.to_self(record, data = {})
  operation = if record.new?
                :create
              elsif record.destroyed?
                :destroy
              else
                :change
              end
  # initialize(id) takes a required id. This throwaway broadcast is never
  # registered in in_transit, so pass nil explicitly instead of calling `new`
  # with no arguments (which fails wherever arity is enforced).
  dummy_broadcast = new(nil).local(operation, record, data)
  record.backing_record.sync! data unless operation == :destroy
  ReactiveRecord::Collection.sync_scopes dummy_broadcast
end

Instance Method Details

#complete! ⇒ Object



167
168
169
# File 'lib/reactive_record/broadcast.rb', line 167

# Removes the entry keyed by @id from the class-level `in_transit` table.
#
# @return whatever `delete` returns for that key (#receive yields this value)
def complete!
  self.class.in_transit.delete @id
end

#destroyed? ⇒ Boolean

Returns:

  • (Boolean)


104
105
106
# File 'lib/reactive_record/broadcast.rb', line 104

# @return the @destroyed flag. #local and #receive assign it as
#   `operation == :destroy`; it is nil until one of them has run.
def destroyed?
  @destroyed
end

#integrity_check ⇒ Object



184
185
186
187
188
189
190
191
192
# File 'lib/reactive_record/broadcast.rb', line 184

# Verifies that every entry in @previous_changes is backed by @record.
#
# An entry is consistent when @record has the attribute and its value equals
# the last element of the change. The first inconsistent entry is logged at
# :error level and aborts the check.
#
# @return [Hash] @previous_changes when every entry is consistent
# @raise [RuntimeError] "Broadcast Integrity Error" on the first mismatch
def integrity_check
  mismatch = @previous_changes.find do |attr, change|
    !(@record.key?(attr) && @record[attr] == change.last)
  end
  return @previous_changes unless mismatch

  attr, change = mismatch
  React::IsomorphicHelpers.log "Broadcast contained change to #{attr} -> #{change.last} "\
                               "without corresponding value in attributes (#{@record}).\n",
                               :error
  raise "Broadcast Integrity Error"
end

#klass ⇒ Object



108
109
110
# File 'lib/reactive_record/broadcast.rb', line 108

# Resolves the class name stored in @klass to the constant it names.
# @klass is set by #local (from `record.class.name`) or by #receive (from
# `params.klass`); const_get will raise if it is unset or names no constant.
def klass
  Object.const_get(@klass)
end

#local(operation, record, data) ⇒ Object



135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/reactive_record/broadcast.rb', line 135

# Populates this broadcast from a locally saved record rather than from an
# incoming packet.
#
# @param operation [Symbol] :create and :destroy set the new?/destroyed?
#   flags; any other value leaves both false
# @param record the model instance; this method calls #class, #id, #changes
#   and #backing_record on it
# @param data [Hash] attribute values. Stored as @record WITHOUT copying, so
#   the caller's hash gains an :id key when the record has an id.
# @return [self]
def local(operation, record, data)
  @destroyed = operation == :destroy
  @is_new = operation == :create
  @klass = record.class.name
  @record = data
  # The backing record's destroyed flag is cleared before reading record.id,
  # then set to this broadcast's @destroyed value.
  # NOTE(review): presumably record.id is not readable while the backing
  # record is flagged destroyed - confirm. Statement order matters here.
  record.backing_record.destroyed = false
  @record[:id] = record.id if record.id
  record.backing_record.destroyed = @destroyed
  @backing_record = record.backing_record
  @previous_changes = record.changes
  # Disabled alternative that derived @previous_changes from `data`:
  # attributes = record.attributes
  # data.each do |k, v|
  #   next if klass.reflect_on_association(k) || attributes[k] == v
  #   @previous_changes[k] = [attributes[k], v]
  # end
  self
end

#merge_current_values(br) ⇒ Object



206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
# File 'lib/reactive_record/broadcast.rb', line 206

# Rebuilds the record as it looked before this broadcast's changes, layered
# over the backing record's current attributes.
#
# For each changed attribute the "before" value is the first element of the
# change (for :id, the id in #record). If the backing record already holds a
# value for the attribute that matches neither the before nor the after value,
# the attribute was changed locally: a warning is logged and nil is returned.
#
# @param br backing record; must respond to #attributes and #convert
# @return the result of klass._react_param_conversion, or nil on a local conflict
def merge_current_values(br)
  prior_values = @previous_changes.each_with_object({}) do |(attr, change), memo|
    before = attr == :id ? record[:id] : change.first
    conflicting = br.attributes.key?(attr) &&
                  br.attributes[attr] != br.convert(attr, before) &&
                  br.attributes[attr] != br.convert(attr, change.last)
    if conflicting
      React::IsomorphicHelpers.log "warning #{attr} has changed locally - will force a reload.\n"\
           "local value: #{br.attributes[attr]} remote value: #{br.convert(attr, before)}->#{br.convert(attr, change.last)}",
           :warning
      return nil
    end
    memo[attr] = before
  end
  # TODO: verify - it used to be current_values.merge(br.attributes)
  klass._react_param_conversion(br.attributes.merge(prior_values))
end

#new? ⇒ Boolean

Returns:

  • (Boolean)


100
101
102
# File 'lib/reactive_record/broadcast.rb', line 100

# @return the @is_new flag. #local sets it to `operation == :create`;
#   #receive additionally requires that no backing record was found.
#   It is nil until one of them has run.
def new?
  @is_new
end

#process_previous_changes ⇒ Object



194
195
196
197
198
199
200
201
202
203
204
# File 'lib/reactive_record/broadcast.rb', line 194

# Strips attributes that did not actually change from this broadcast.
#
# Does nothing when there is no backing record. Otherwise the broadcast is
# integrity-checked and, unless it represents a destroy, every attribute for
# which value_changed? is falsy is removed from both @record and
# @previous_changes.
#
# @return [self]
# @raise [RuntimeError] when integrity_check fails
def process_previous_changes
  if @backing_record
    integrity_check
    unless destroyed?
      # Iterate over a snapshot so entries can be deleted from @record safely.
      @record.to_a.each do |attr, value|
        next if value_changed?(attr, value)

        @record.delete(attr)
        @previous_changes.delete(attr)
      end
    end
  end
  self
end

#receive(params) ⇒ Object



153
154
155
156
157
158
159
160
161
162
163
164
165
# File 'lib/reactive_record/broadcast.rb', line 153

# Folds one incoming packet into this broadcast and yields once every
# expected channel has reported.
#
# @param params packet object; this method reads #operation, #channels,
#   #channel, #klass, #record and #previous_changes from it
# @yield the return value of #complete!, once the set of received channels
#   equals the expected channel set
def receive(params)
  # Overwritten by every packet, so the most recent packet decides.
  @destroyed = params.operation == :destroy
  # Fixed by the first packet: the open channels that the packet also lists.
  @channels ||= Hyperloop::IncomingBroadcast.open_channels.intersection params.channels
  @received << params.channel
  @klass ||= params.klass
  @record.merge! params.record
  @previous_changes.merge! params.previous_changes
  # NOTE(review): presumably when_not_saving defers this block until no save
  # of klass is in progress - confirm against ReactiveRecord::Base.
  ReactiveRecord::Base.when_not_saving(klass) do
    @backing_record = ReactiveRecord::Base.exists?(klass, params.record[:id])
    # A :create only counts as new when no backing record was found.
    @is_new = params.operation == :create && !@backing_record
    yield complete! if @channels == @received
  end
end

#record_with_current_values ⇒ Object



79
80
81
82
83
84
85
86
87
88
# File 'lib/reactive_record/broadcast.rb', line 79

# Produces the record in its state before this broadcast is applied.
#
# Uses @backing_record when present, otherwise looks the record up by id.
# A destroyed broadcast yields the backing record's ar_instance; any other
# broadcast yields the result of merge_current_values. All of this runs
# inside ReactiveRecord::Base.load_data.
#
# @return whatever ReactiveRecord::Base.load_data returns for the block
def record_with_current_values
  ReactiveRecord::Base.load_data do
    current = @backing_record
    current ||= klass.find(record[:id]).backing_record
    destroyed? ? current.ar_instance : merge_current_values(current)
  end
end

#record_with_new_values ⇒ Object



90
91
92
93
94
95
96
97
98
# File 'lib/reactive_record/broadcast.rb', line 90

# Converts the broadcast's attribute hash into a model instance.
#
# For a destroyed broadcast the instance's backing record has its
# associations destroyed; otherwise, for a new broadcast, its collections are
# initialized. Destroyed takes precedence over new.
#
# @return the instance produced by klass._react_param_conversion
def record_with_new_values
  instance = klass._react_param_conversion(record)
  if destroyed?
    instance.backing_record.destroy_associations
  elsif new?
    instance.backing_record.initialize_collections
  end
  instance
end

#to_s ⇒ Object



112
113
114
# File 'lib/reactive_record/broadcast.rb', line 112

# One-line summary: the resolved class, the record hash and the new? /
# destroyed? flags. It calls #klass, so it fails if @klass cannot be resolved.
def to_s
  "klass: #{klass} record: #{record} new?: #{new?} destroyed?: #{destroyed?}"
end

#value_changed?(attr, value) ⇒ Boolean

Returns:

  • (Boolean)


171
172
173
174
175
176
177
178
179
180
181
182
# File 'lib/reactive_record/broadcast.rb', line 171

# Tells whether +value+ differs from what the backing record has synced for
# +attr+.
#
# * The primary key always counts as changed.
# * A synced attribute is compared against the converted incoming value.
# * An unsynced foreign key is compared against the id of the synced
#   associated record, when there is one.
# * With nothing to compare against, any value other than nil/false counts as
#   a change.
#
# Always returns a real Boolean; the fall-through paths used to return the
# raw attribute value.
#
# @param attr the attribute name
# @param value the incoming value for that attribute
# @return [Boolean]
def value_changed?(attr, value)
  attrs = @backing_record.synced_attributes
  return true if attr == @backing_record.primary_key
  return attrs[attr] != @backing_record.convert(attr, value) if attrs.key?(attr)

  assoc = klass.reflect_on_association_by_foreign_key(attr)
  child = assoc && attrs[assoc.attribute]
  return value != child.id if child

  # Same truthiness as before, coerced so the predicate honours its contract.
  !!value
end