This project is trying to help writing concurrent program with ruby a little easier.

select_chan

select channels like golang in ruby

Chan is buffered or non-buffered, just like make(chan Type,n) or make(chan Type) in golang but not statically typed in my implementation.

example:

relative 'rbgo'

include Rbgo::Channel

ch1 = Chan.new  # non-buffer channel
ch2 = Chan.new(2) # buffer channel 
ch3 = Chan.new(1)

ch3 << 'hello'

select_chan(
  on_read(chan: ch1){|obj, ok| # obj is the value read from ch1, ok indicates success or failure by close
    #do when read success
  },
  on_read(chan: ch2){
    #do when read success
  },
  on_write(chan: ch3, obj: 'world'){
    #do when write success
  }
){ puts 'call default block' }

go

create lightweight routine like golang

Routine does not create a thread instead it is picked by a thread in pool.

Routine use fiber to yield processor, so you can call Fiber.yield, just like Gosched() in golang.

If routine raise exception it does not affect other routine. Check Routine#alive? to see if it is alive, and check Routine#error to see if something went wrong.

If routine is suspended by a blocking operation for enough long time, a new thread will be created to handle other routine. Suspended thread will exit when it completes current routine.

example:

require 'rbgo'

using Rbgo::CoRunExtensions

wg = Rbgo::WaitGroup.new

wg.add(1)
go do
  puts 'start' 

  Fiber.yield  # like Gosched()

  puts 'end'

  wg.done
end

wg.add(1)
go do
  sleep 1
  puts 'sleep end'
  wg.done
end

wg.wait
puts 'wg.wait done'


# the job that takes very long time or never returns such as 'loop'.
# use Fiber.yield to let other yield jobs on the same thread to run.

go do           
  loop do
    # do some job
    # ...
    Fiber.yield 
  end
end

# Or use go!        

go! do         

# go! force a new thread be created to handle this block, and exit when finish.
# use go! to do blocking operations

end

# Once do block only once

once = Rbgo::Once.new
2.times do
  go do
    once.do { puts 'only once' }
  end
end



# Actor handle message sequentially

actor = Rbgo::Actor.new do|msg, actor|     
          case msg
          when :msg1
            # do some work 
          when :msg2
            # do some work
          when :msg3
            actor.send_msg :msg1 
          end
        end

actor.send_msg :msg1 #won't block

# TaskList do task in order but do it asynchronously, task may be executed in different thread

task_list = Rbgo::TaskList.new

task_1 = proc do
  puts 'first'
  1
end

task_2 = proc do |last_result|
  puts 'second'
  puts "last task result #{last_result}"
  sleep 5
  2
end

task_3 = proc do |last_result|
  puts 'third'
  puts "last task result #{last_result}"
  3
end

task_list << task_1
task_list.add(task_2, timeout: 2, skip_on_exception: true)
task_list << task_3

task_list.start
task_list.wait
p 'wait done.'                          
p task_list.complete?
p task_list.last_error

# Reentrant/ReadWirte Mutex and Semaphore

m = Rbgo::ReentrantMutex.new
m.synchronize do
  m.synchronize do
    puts "I'm here!"
  end
end

m2 = Rbgo::RWMutex.new
5.times do
  go do
    m2.synchronize_r do
      puts "got the read lock"
    end
  end
end
go do
  m2.synchronize_w do
    puts "got the write lock"
    m2.synchronize_r do
      puts 'now, downgrade to read lock'
    end
  end
end

sleep 2

go do
  s = Rbgo::Semaphore.new(5)

  s.acquire(4) do
    puts 'got the permits'
  end

  s.acquire_all do
    puts 'got all available permits'
  end

  ok = s.try_acquire do
    puts 'try success'
  end
  puts 'try ok!' if ok
end

IOMachine

IOMachine wrap nio4r to do IO operation asynchronously.

support platforms: MRI, JRuby.

require 'rbgo'

io_r, io_w = IO.pipe
machine = Rbgo::IOMachine.new

receipt1 = machine.do_read(io_r, length: 100) # when length > 0, result nil if have not read anything yet when read complete.
receipt2 = machine.do_read(io_r, length: 100) # when length >0, result data bytes length up to 100 if have read some when read complete.
receipt3 = machine.do_read(io_r, length: 0) # when length == 0, result ""
receipt4 = machine.do_read(io_r, length: nil) # when length == nil, read until EOF. return "" if have not read anything.

io_w.write("a"*100)
io_w.write("b"*100)
io_w.write("c"*100)
io_w.close # cause EOF

# if the same io object, and the same read/write operation, operations will complete in sequence.
# so receipt1 complete first, and the receipt2 ...
# if the same io object, but not the same read/write operation,
# or not the same io object, operations will complete in arbitrary order.
receipt1.wait
p receipt1.res # aaa...
receipt2.wait
p receipt2.res # bbb...
receipt3.wait
p receipt3.res # ""
receipt4.wait
p receipt4.res # ccc...

io_r, io_w = IO.pipe
receipt1 = machine.do_write(io_w, str: "hello world!")

receipt1.wait
p receipt1.res # number of bytes written, may be less than str.bytesize if exception raised

yield_read / yield_write

use IOMachine to do IO operation asynchronously, but write code in a sequential way.

No callback and another callback...

require 'rbgo'
using Rbgo::CoRunExtensions

io_r, io_w = IO.pipe
go do
  data = io_r.yield_read(100) # have blocking semantics, but execute asynchronously
                              # this operation will *NOT* block the current thread
                              # when io operation complete, thread will resume to execute from this point.
  p data
end

sleep 2

io_w.write("haha I'm crazy.")
io_w.close

# NOTICE yield_read / yield_write will do yield only in CoRun::Routine.new(*args, new_thread: false, &blk) block
# in other place yield_read / yield_write will do normal IO#read / IO#write
# for example: 
# go do
#   io_r.yield_read # will do yield
# end
# 
# go do
#   fiber = Fiber.new do
#     io_r.yield_read # will not do yield. just do normal IO#read
#   end
#   fiber.resume
# end
# 

NetworkService

open TCP or UDP service

Because service handles network request in async mode, it can handle many requests concurrently. If use some Non-GIL ruby implementations such as TruffleRuby or JRuby, it can utilize all your CPU cores.

Use IO#yield_read IO#yield_write to do IO operations

require 'rbgo'

using Rbgo::CoRunExtensions

#localhost, port 3000
tcp_service = Rbgo::NetworkServiceFactory.open_tcp_service(3000) do|sock, _|
  sock.yield_read_line("\r\n\r\n") # read http request
  sock.close_read
  sock.yield_write("HTTP/1.1 200 OK \r\n\r\nHello World!")
  sock.close_write
  sock.close
end                 



p "start tcp service: #{[tcp_service.host, tcp_service.port, tcp_service.type]}"           
sleep 5
tcp_service.stop



#localhost, port auto pick
udp_service = Rbgo::NetworkServiceFactory.open_udp_service(0) do|msg, reply_msg|
  p msg
  reply_msg.reply("I receive your message")
end

p "start udp service: #{[udp_service.host, udp_service.port, udp_service.type]}"      
sleep 5
udp_service.stop


sleep