Class: Concord::Computation

Inherits:
Object
Defined in:
lib/concord.rb

Overview

Thrift service definition. Wraps a user-defined computation.

Constructor Details

#initialize(handler: nil, proxy_host: nil, proxy_port: nil) ⇒ Computation

Initialize a new `Computation`. The constructor only stores its arguments; registration with the proxy happens in #register_with_scheduler, which .serve calls.

Parameters:

  • handler (Object) (defaults to: nil)

    The user-defined computation

  • proxy_host (String) (defaults to: nil)

    The address the proxy is listening on

  • proxy_port (Fixnum) (defaults to: nil)

    The port the proxy is listening on



# File 'lib/concord.rb', line 87

def initialize(handler: nil, proxy_host: nil, proxy_port: nil)
  self.handler = handler
  self.proxy_host = proxy_host
  self.proxy_port = proxy_port
end

Instance Attribute Details

#handler ⇒ Object

Returns the value of attribute handler.



# File 'lib/concord.rb', line 81

def handler
  @handler
end

#proxy_host ⇒ Object

Returns the value of attribute proxy_host.



# File 'lib/concord.rb', line 81

def proxy_host
  @proxy_host
end

#proxy_port ⇒ Object

Returns the value of attribute proxy_port.



# File 'lib/concord.rb', line 81

def proxy_port
  @proxy_port
end

Class Method Details

.serve(computation) ⇒ Object

Initialize a new `Computation` and start serving it. This is the only method that users call directly.

Parameters:

  • computation (Object)

    The user-defined computation



# File 'lib/concord.rb', line 102

def self.serve(computation)
  listen_address = ENV[::Concord::Thrift::KConcordEnvKeyClientListenAddr]
  proxy_address = ENV[::Concord::Thrift::KConcordEnvKeyClientProxyAddr]
  listen_host, listen_port = listen_address.split(':')
  proxy_host, proxy_port = proxy_address.split(':')

  handler = self.new(handler: computation,
                     proxy_host: proxy_host,
                     proxy_port: Integer(proxy_port))

  processor = Thrift::ComputationService::Processor.new(handler)
  transport = ::Thrift::ServerSocket.new(listen_host, Integer(listen_port))
  transport_factory = ::Thrift::FramedTransportFactory.new
  protocol_factory = ::Thrift::BinaryProtocolAcceleratedFactory.new
  # Client computations MUST use a simple blocking server: process_timer
  # and process_record both run as callbacks in the work thread pool, so
  # with any other server two callbacks could run at once and make the
  # user code multi-threaded. We guarantee a single thread for each
  # callback.
  server = ::Thrift::SimpleServer.new(processor,
                                      transport,
                                      transport_factory,
                                      protocol_factory)
  # Register with the localhost proxy. Note that this method is `oneway',
  # which means it finishes after the final TCP 'ack'.
  handler.register_with_scheduler
  server.serve
end
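
For orientation, here is a minimal sketch of a handler object that could be passed to `serve`. The callback names `init`, `process_record`, `process_timer`, and `destroy` are the ones this class invokes on the handler; the class name `RecordCounter` and its `count` reader are hypothetical, and the metadata callback is left out, so this is an illustration rather than a complete computation.

```ruby
# Hypothetical handler: counts the records it receives.
class RecordCounter
  attr_reader :count

  # Called once when the framework is ready to send records.
  def init(ctx)
    @count = 0
  end

  # Called for every upstream record; ctx is the per-call context.
  def process_record(ctx, record)
    @count += 1
  end

  # Called when a timer fires; this sketch does not schedule any.
  def process_timer(ctx, key, time)
  end

  # Called before the proxy kills the process.
  def destroy
    @count = nil
  end
end
```

With the concord library loaded and a metadata callback added, such an object would be handed over with `Concord::Computation.serve(RecordCounter.new)`.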

Instance Method Details

#boltMetadata ⇒ Concord::Thrift::ComputationMetadata

Returns the user-defined computation metadata.

Returns:

  • (Concord::Thrift::ComputationMetadata)

    The user-defined computation metadata



# File 'lib/concord.rb', line 178

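def boltMetadata
  metadata = nil
  log_failure do
    metadata = handler.metadata
  end
  # enrich_metadata: helper name assumed; the identifier is missing from this listing
  enrich_metadata(metadata)
end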

#boltProcessRecords(records) ⇒ Object

Process records from upstream. Wraps the user method in transactions, which are returned to the proxy upon completion.

Parameters:

  • records

    Records received from upstream



# File 'lib/concord.rb', line 134

def boltProcessRecords(records)
  txs = []
  records.each do |record|
    ctx = ComputationContext.new(self)
    log_failure do
      handler.process_record(ctx, record)
    end
    txs.push(ctx.transaction)
  end
  txs
end
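
The one-context-per-record pattern above can be sketched with stand-ins. `StubContext`, its `produce` method, `UpcaseHandler`, and `process_records` are all hypothetical names used only for illustration; the real `ComputationContext` and its transaction type are not shown on this page.

```ruby
# Stand-in for ComputationContext: collects what the handler emits.
class StubContext
  attr_reader :transaction

  def initialize
    @transaction = [] # the real transaction is assumed to be a Thrift struct
  end

  # Hypothetical emit method, used only by this sketch.
  def produce(stream, key)
    @transaction << [stream, key]
  end
end

# Hypothetical handler that emits an upcased copy of each record.
class UpcaseHandler
  def process_record(ctx, record)
    ctx.produce('upcased', record.upcase)
  end
end

# Mirrors the loop above: a fresh context per record, one transaction each.
def process_records(handler, records)
  records.map do |record|
    ctx = StubContext.new
    handler.process_record(ctx, record)
    ctx.transaction
  end
end

txs = process_records(UpcaseHandler.new, %w[a b])
```

Because each record gets its own context, the returned array has exactly one transaction per input record, in input order.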

#boltProcessTimer(key, time) ⇒ Object

Process a timer callback from the proxy. Wraps the user method in a transaction, which is returned to the proxy upon completion.

Parameters:

  • key (String)

    Callback identifier

  • time (Fixnum)

    Time this callback was scheduled to trigger.



# File 'lib/concord.rb', line 150

def boltProcessTimer(key, time)
  ctx = ComputationContext.new(self)
  log_failure do
    handler.process_timer(ctx, key, time)
  end
  ctx.transaction
end

#destroy ⇒ Object

The class destructor. Use this method to perform any cleanup before the proxy kills the process in which this instance resides.



# File 'lib/concord.rb', line 171

def destroy
  log_failure do
    handler.destroy
  end
end

#enrich_stream(stream) ⇒ Object



# File 'lib/concord.rb', line 230

def enrich_stream(stream)
  sm = ::Concord::Thrift::StreamMetadata.new
  if stream.is_a?(Array)
    sm.name = stream.first
    sm.grouping = stream.last
  else
    sm.name = stream
  end
  sm
end
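
As the listing shows, a stream may be given either as a bare name or as an array whose first element is the name and whose last element is the grouping. The sketch below reproduces that logic against a plain `Struct` so it runs without the Thrift types; the `StreamMetadata` struct here is a stand-in and `:group_by` is a made-up grouping value.

```ruby
# Stand-in for ::Concord::Thrift::StreamMetadata (assumption: only these two fields matter here).
StreamMetadata = Struct.new(:name, :grouping)

def enrich_stream(stream)
  sm = StreamMetadata.new
  if stream.is_a?(Array)
    # [name, grouping] form
    sm.name = stream.first
    sm.grouping = stream.last
  else
    # bare name form; grouping is left untouched
    sm.name = stream
  end
  sm
end

plain   = enrich_stream('words')
grouped = enrich_stream(['words', :group_by])
```

In this stub an unset grouping is `nil`; the real Thrift struct may supply its own default.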

#get_state(key) ⇒ String

Retrieve a binary blob stored in the proxy state

Parameters:

  • key (String)

    Key to fetch from data store

Returns:

  • (String)

    Binary blob of data



# File 'lib/concord.rb', line 189

def get_state(key)
  proxy.getState(key)
end

#init ⇒ Object

The initialization function, called when the framework is ready to start sending records to the computation. Wraps the user method in a transaction, which is returned to the proxy upon completion.



# File 'lib/concord.rb', line 161

def init
  ctx = ComputationContext.new(self)
  log_failure do
    handler.init(ctx)
  end
  ctx.transaction
end

#register_with_scheduler ⇒ Object



# File 'lib/concord.rb', line 93

def register_with_scheduler
  log_failure do
    proxy.registerWithScheduler()
  end
end

#set_state(key, value) ⇒ Object

Store a binary blob, identified by a key, in the proxy state

Parameters:

  • key (String)

    Key to set in data store

  • value (String)

    Binary blob



# File 'lib/concord.rb', line 196

def set_state(key, value)
  proxy.setState(key, value)
end
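
Since state values are binary blobs, callers have to serialize their own data before `set_state` and decode it after `get_state`. The following is a minimal sketch of that round trip using an in-memory stand-in for the proxy; `StubProxy` is hypothetical, and the 8-byte big-endian encoding is just one possible choice, not something the library prescribes.

```ruby
# Hypothetical in-memory stand-in for the proxy's key/value store.
class StubProxy
  def initialize
    @store = {}
  end

  def setState(key, value)
    @store[key] = value
  end

  def getState(key)
    @store[key]
  end
end

proxy = StubProxy.new

# Encode an integer as an 8-byte big-endian unsigned blob.
blob = [42].pack('Q>')
proxy.setState('count', blob)

# Decode the blob back into an integer.
restored = proxy.getState('count').unpack1('Q>')
```

What the real proxy returns for a key that was never set is not documented here, so production code should check before decoding.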