Class: Libcouchbase::ResultsFiber

Inherits:
Results
  • Object
show all
Defined in:
lib/libcouchbase/results_fiber.rb

Direct Known Subclasses

ResultsEM, ResultsLibuv

Instance Attribute Summary

Attributes inherited from Results

#complete_result_set, #metadata, #query_completed, #query_in_progress

Instance Method Summary collapse

Constructor Details

#initialize(query, &row_modifier) ⇒ ResultsFiber

Returns a new instance of ResultsFiber.



7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/libcouchbase/results_fiber.rb', line 7

# Prepares an idle result set wrapped around +query+. Nothing is executed
# here; only the bookkeeping state is initialised.
#
# @param query the query object to run (a view or n1ql query)
# @param row_modifier [Proc, nil] optional block, stored as-is in
#   @row_modifier (how it is applied to rows is not visible in this method)
def initialize(query, &row_modifier)
    # No query has been issued yet, so every status flag starts out false
    @query_in_progress = @query_completed = @complete_result_set = false

    # Rows collected so far, and the fiber currently waiting on them (none)
    @results, @fiber = [], nil

    # Resuming a fiber that is parked inside a yield to user code could let
    # that fiber finish before processing is done, which surfaces as dead
    # fiber errors. This flag records whether resuming is currently allowed.
    @resume_results = true

    # This could be a view or n1ql query
    @query = query
    @row_modifier = row_modifier
end

Instance Method Details

#cancel ⇒ Object



158
159
160
161
162
# File 'lib/libcouchbase/results_fiber.rb', line 158

# Flags this result set as cancelled, asks the underlying query to cancel
# and then calls +resume+.
#
# @return whatever +resume+ returns (+resume+ is defined outside this method)
def cancel
    @cancelled = true

    active_query = @query
    active_query.cancel

    # The flag and the query cancellation are both in place before resuming
    resume
end

#count ⇒ Object



124
125
126
127
# File 'lib/libcouchbase/results_fiber.rb', line 124

# Asks the query for its count, based on the stored metadata.
#
# When no metadata is available yet, +first+ is called beforehand
# (presumably so that the metadata gets populated - it is not assigned
# in this method).
#
# @return the value of +@query.get_count+ for the current metadata
def count
    # Only trigger a fetch when metadata is still missing
    @metadata || first
    @query.get_count(@metadata)
end

#each(&blk) ⇒ Object



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/libcouchbase/results_fiber.rb', line 69

# Yields every row of the result set to the block, keeping the rows in
# @results as they are visited.
#
# Without a block, an enumerator over +load_all+ is returned instead.
# When the complete result set is already held, the buffered rows are
# iterated directly. Otherwise +perform+ is called and rows are yielded as
# they show up in @results, calling +resume+ whenever the buffer has been
# exhausted and the query has not completed.
#
# @yield [row] each buffered row, in order
# @return [self] when a block is given
def each(&blk)
    # return a valid enumerator
    return load_all.each unless block_given?

    if @complete_result_set
        @results.each(&blk)
        return self
    end

    perform
    @fiber = Fiber.current

    begin
        cursor = 0
        # Finished only once the query is complete AND every row was yielded
        until @query_completed && cursor >= @results.length
            if cursor < @results.length
                # Resuming is disallowed while user code runs inside the yield
                @resume_results = false
                yield @results[cursor]
                cursor += 1
            else
                # Nothing buffered: allow resuming again and call resume
                @resume_results = true
                resume
            end
        end
    ensure
        # cancel is executed on break or error
        @resume_results = true
        cancel unless @query_completed
        @fiber = nil
    end

    self
end

#first ⇒ Object



104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/libcouchbase/results_fiber.rb', line 104

# Returns the first row of the result set.
#
# If the complete result set is held, or at least one row is already
# buffered, the answer comes straight from @results. Otherwise +perform+
# is called with a limit of 1 and +resume+ is called repeatedly until the
# query reports completion.
#
# @return the first buffered row, or nil when there is none
def first
    return @results[0] if @complete_result_set || !@results.empty?

    perform is_complete: false, limit: 1

    @fiber = Fiber.current
    begin
        resume until @query_completed
    ensure
        @fiber = nil
    end

    @results[0]
end

#options(**opts) ⇒ Object



26
27
28
29
# File 'lib/libcouchbase/results_fiber.rb', line 26

# Resets this result set and merges +opts+ into the query's options.
#
# +reset+ runs first, so this raises when a query is in progress.
#
# @param opts keyword options merged (in place) into +@query.options+
# @return the result of +merge!+ on the query's options
def options(**opts)
    reset

    current = @query.options
    current.merge!(opts)
end

#reset ⇒ Object



62
63
64
65
66
67
# File 'lib/libcouchbase/results_fiber.rb', line 62

# Discards the buffered rows and marks the result set as incomplete.
#
# NOTE(review): @query_completed and @metadata are left untouched here;
# confirm whether they are reset elsewhere before a new query runs.
#
# @raise [RuntimeError] 'query in progress' when a query is still running
# @return [Array] the (now empty) row buffer
def reset
    if @query_in_progress
        raise 'query in progress'
    end

    @query_in_progress = @complete_result_set = false
    @results.clear
end

#stream(&blk) ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/libcouchbase/results_fiber.rb', line 32

# Yields rows to the block as they become available, removing each row
# from @results before it is yielded.
#
# When the complete result set is already held, the buffered rows are
# iterated directly (and kept). Otherwise +perform+ is called with
# +is_complete: false+ and +resume+ is called whenever the buffer is empty
# and the query has not completed.
#
# @yield [row] each row, shifted off the front of the buffer
# @return [self]
def stream(&blk)
    if @complete_result_set
        @results.each(&blk)
        return self
    end

    perform is_complete: false
    @fiber = Fiber.current

    begin
        # Finished only once the query is complete AND the buffer is drained
        until @query_completed && @results.empty?
            if @results.empty?
                # Nothing buffered: allow resuming again and call resume
                @resume_results = true
                resume
            else
                # Resuming is disallowed while user code runs inside the yield
                @resume_results = false
                yield @results.shift
            end
        end
    ensure
        # cancel is executed on break or error
        @resume_results = true
        cancel unless @query_completed
        @fiber = nil
    end

    self
end

#take(num) ⇒ Object



129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
# File 'lib/libcouchbase/results_fiber.rb', line 129

# Returns up to +num+ rows from the start of the result set.
#
# If the complete result set is held, or at least +num+ rows are already
# buffered, the rows are sliced straight out of @results. Otherwise
# +perform+ is called with +limit: num+ and rows are copied out of the
# buffer as they show up, calling +resume+ until the query has completed.
#
# @param num [Integer] maximum number of rows to return
# @return [Array] at most +num+ rows
def take(num)
    return @results[0...num] if @complete_result_set || @results.length >= num

    perform is_complete: false, limit: num
    @fiber = Fiber.current
    taken = []

    begin
        cursor = 0
        # Keep going until the query is complete and there is nothing left
        # to copy (buffer exhausted or +num+ rows already collected)
        until @query_completed && (cursor >= @results.length || cursor >= num)
            if cursor < @results.length && cursor < num
                taken << @results[cursor]
                cursor += 1
            else
                resume
            end
        end
    ensure
        @fiber = nil
    end

    taken
end