Class: Concord::Computation

Inherits:
Object
  • Object
show all
Defined in:
lib/concord.rb

Overview

Thrift service definition. Wraps a user-defined computation.

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(handler: nil, proxy_host: nil, proxy_port: nil) ⇒ Computation

Initialize a new Computation and register it with the proxy

Parameters:

  • handler (Object) (defaults to: nil)

    The user-defined computation

  • proxy_host (String) (defaults to: nil)

    The address the proxy is listening on

  • proxy_port (FixNum) (defaults to: nil)

    The port the proxy is listening on



87
88
89
90
91
# File 'lib/concord.rb', line 87

def initialize(handler: nil, proxy_host: nil, proxy_port: nil)
  self.handler = handler
  self.proxy_host = proxy_host
  self.proxy_port = proxy_port
end

Instance Attribute Details

#handlerObject

Returns the value of attribute handler.



81
82
83
# File 'lib/concord.rb', line 81

def handler
  @handler
end

#proxy_hostObject

Returns the value of attribute proxy_host.



81
82
83
# File 'lib/concord.rb', line 81

def proxy_host
  @proxy_host
end

#proxy_portObject

Returns the value of attribute proxy_port.



81
82
83
# File 'lib/concord.rb', line 81

def proxy_port
  @proxy_port
end

Class Method Details

.serve(computation) ⇒ Object

Initialize a new Computation and start serving it. This is the only method directly called by users.

Parameters:

  • computation (Object)

    The user-defined computation



102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/concord.rb', line 102

def self.serve(computation)
  listen_address = ENV[::Concord::Thrift::KConcordEnvKeyClientListenAddr]
  proxy_address = ENV[::Concord::Thrift::KConcordEnvKeyClientProxyAddr]
  listen_host, listen_port = listen_address.split(':')
  proxy_host, proxy_port = proxy_address.split(':')

  handler = self.new(handler: computation,
                     proxy_host: proxy_host,
                     proxy_port: Integer(proxy_port))

  processor = Thrift::ComputationService::Processor.new(handler)
  transport = ::Thrift::ServerSocket.new(listen_host, Integer(listen_port))
  transport_factory = ::Thrift::FramedTransportFactory.new
  protocol_factory = ::Thrift::BinaryProtocolAcceleratedFactory.new
  server = ::Thrift::ThreadedServer.new(processor,
                                        transport,
                                        transport_factory,
                                        protocol_factory)
  t = Thread.new{
    server.serve
  }
  handler.register_with_scheduler
  t.join
end

Instance Method Details

#boltMetadataConcord::Thrift::ComputationMetadata

Returns The user-defined computation metadata.

Returns:



162
163
164
165
166
167
168
# File 'lib/concord.rb', line 162

def 
   = nil
  log_failure do
     = handler.
  end
  ()
end

#boltProcessRecord(record) ⇒ Object

Process an upstream record. Wraps the user method in a transaction, which is returned to the proxy upon completion.

Parameters:



130
131
132
133
134
135
136
# File 'lib/concord.rb', line 130

def boltProcessRecord(record)
  ctx = ComputationContext.new(self)
  log_failure do
    handler.process_record(ctx, record)
  end
  ctx.transaction
end

#boltProcessTimer(key, time) ⇒ Object

Process a timer callback from the proxy. Wraps the user method in a transaction, which is returned to the proxy upon completion.

Parameters:

  • key (String)

    Callback identifier

  • time (FixNum)

    Time this callback was scheduled to trigger.



142
143
144
145
146
147
148
# File 'lib/concord.rb', line 142

def boltProcessTimer(key, time)
  ctx = ComputationContext.new(self)
  log_failure do
    handler.process_timer(ctx, key, time)
  end
  ctx.transaction
end

#get_state(key) ⇒ String

Retrieve a binary blob stored in the proxy state

Parameters:

  • key (String)

    Key to fetch from data store

Returns:

  • (String)

    Binary blob of data



173
174
175
# File 'lib/concord.rb', line 173

def get_state(key)
  proxy.getState(key)
end

#initObject

The initialization function, called when the framework is ready to start sending the computation records. Wraps the user method in a transaction, which is returned to the proxy upon completion.



153
154
155
156
157
158
159
# File 'lib/concord.rb', line 153

def init
  ctx = ComputationContext.new(self)
  log_failure do
    handler.init(ctx)
  end
  ctx.transaction
end

#register_with_schedulerObject



93
94
95
96
97
# File 'lib/concord.rb', line 93

def register_with_scheduler
  log_failure do
    proxy.registerWithScheduler()
  end
end

#set_state(key, value) ⇒ Object

Store a binary blob, identified by a key, in the proxy state

Parameters:

  • key (String)

    Key to set in data store

  • value (String)

    Binary blob



180
181
182
# File 'lib/concord.rb', line 180

def set_state(key, value)
  proxy.setState(key, value)
end