1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
|
//! Blocking queue implementation aimed primarily at passing messages
//! between threads.
const std = @import("std");
const builtin = @import("builtin");
const assert = std.debug.assert;
const Allocator = std.mem.Allocator;
/// Returns a blocking queue implementation for type T.
///
/// This is tailor made for ghostty usage so it isn't meant to be maximally
/// generic, but I'm happy to make it more generic over time. Traits of this
/// queue that are specific to our usage:
///
///   - Fixed size. We expect our queue to quickly drain and also not be
///     too large so we prefer a fixed size queue for now.
///   - No blocking pop. We use an external event loop mechanism such as
///     eventfd to notify our waiter that data is available so we don't
///     need to implement a blocking pop.
///   - Drain function. Most queues usually pop one at a time. We have
///     a mechanism for draining since on every IO loop our TTY drains
///     the full queue so we can get rid of the overhead of a ton of
///     locks and bounds checking and do a one-time drain.
///
/// One key usage pattern is that our blocking queues are single producer
/// single consumer (SPSC). This should let us do some interesting optimizations
/// in the future. At the time of writing this, the blocking queue implementation
/// is purposely naive to build something quickly, but we should benchmark
/// and make this more optimized as necessary.
pub fn BlockingQueue(
    comptime T: type,
    comptime capacity: usize,
) type {
    return struct {
        const Self = @This();

        // The type we use for queue size types. We can optimize this
        // in the future to be the correct bit-size for our preallocated
        // size for this queue.
        pub const Size = u32;

        // The bounds of this queue. We recast this to Size so we can do math.
        const bounds: Size = @intCast(capacity);

        /// Specifies the timeout for an operation.
        pub const Timeout = union(enum) {
            /// Fail instantly (non-blocking).
            instant: void,

            /// Block until the operation can complete.
            forever: void,

            /// Wait for at most this many nanoseconds. The wait is not
            /// restarted if the waiter is woken while the queue is still
            /// full, so the operation may fail before the full timeout
            /// has elapsed.
            ns: u64,
        };

        /// Our data. The values are undefined until they are written.
        data: [bounds]T = undefined,

        /// The next location to write (next empty loc) and next location
        /// to read (next non-empty loc). The number of written elements.
        write: Size = 0,
        read: Size = 0,
        len: Size = 0,

        /// The big mutex that must be held to read/write.
        mutex: std.Thread.Mutex = .{},

        /// A CV for being notified when the queue is no longer full. This is
        /// used for writing. Note we DON'T have a CV for waiting on the
        /// queue not being EMPTY because we use external notifiers for that.
        cond_not_full: std.Thread.Condition = .{},

        /// Number of producers currently blocked in `push` waiting for
        /// space. Only touched while `mutex` is held.
        not_full_waiters: usize = 0,

        /// Allocate the blocking queue on the heap. The returned pointer
        /// must be released with `destroy` using the same allocator.
        pub fn create(alloc: Allocator) !*Self {
            const ptr = try alloc.create(Self);
            // Nothing below can fail, so no errdefer is required. Every
            // field has a default, which yields an empty queue.
            ptr.* = .{};
            return ptr;
        }

        /// Free all the resources for this queue. This should only be
        /// called once all producers and consumers have quit.
        pub fn destroy(self: *Self, alloc: Allocator) void {
            self.* = undefined;
            alloc.destroy(self);
        }

        /// Push a value to the queue. This returns the total size of the
        /// queue (unread items) after the push. A return value of zero
        /// means that the push failed and the value was NOT enqueued.
        ///
        /// Behavior when the queue is full depends on `timeout`:
        ///   - `.instant` fails immediately.
        ///   - `.forever` blocks until space is available.
        ///   - `.ns` waits once for up to the given duration and fails if
        ///     the queue is still full afterwards.
        pub fn push(self: *Self, value: T, timeout: Timeout) Size {
            self.mutex.lock();
            defer self.mutex.unlock();

            if (self.full()) {
                switch (timeout) {
                    // If we're not waiting, then we failed to write.
                    .instant => return 0,

                    .forever => {
                        self.not_full_waiters += 1;
                        defer self.not_full_waiters -= 1;

                        // Condition variables may wake up spuriously, and
                        // a signal does not guarantee that space is still
                        // free by the time we reacquire the mutex. Always
                        // re-check the predicate so that a "forever" push
                        // never silently drops the value.
                        while (self.full()) self.cond_not_full.wait(&self.mutex);
                    },

                    .ns => |ns| {
                        self.not_full_waiters += 1;
                        defer self.not_full_waiters -= 1;
                        self.cond_not_full.timedWait(&self.mutex, ns) catch return 0;

                        // We were woken before the timeout but there is
                        // still no room, so report failure to the caller.
                        if (self.full()) return 0;
                    },
                }
            }

            // Add our data and update our accounting. The write index
            // wraps around at the end of the backing array.
            self.data[self.write] = value;
            self.write += 1;
            if (self.write >= bounds) self.write -= bounds;
            self.len += 1;

            return self.len;
        }

        /// Pop a value from the queue without blocking. Returns null if
        /// the queue is empty.
        pub fn pop(self: *Self) ?T {
            self.mutex.lock();
            defer self.mutex.unlock();

            // If we're empty we have nothing
            if (self.len == 0) return null;

            // Get the index we're going to read data from and do some
            // accounting. We don't copy the value here to avoid copying twice.
            const n = self.read;
            self.read += 1;
            if (self.read >= bounds) self.read -= bounds;
            self.len -= 1;

            // If we have producers waiting on a full queue, notify.
            if (self.not_full_waiters > 0) self.cond_not_full.signal();

            return self.data[n];
        }

        /// Pop all values from the queue. This will hold the big mutex
        /// until `deinit` is called on the return value. This is used if
        /// you know you're going to "pop" and utilize all the values
        /// quickly to avoid many locks, bounds checks, and cv signals.
        pub fn drain(self: *Self) DrainIterator {
            self.mutex.lock();
            return .{ .queue = self };
        }

        /// Iterator returned by `drain`. The queue mutex is held for the
        /// entire lifetime of the iterator; `deinit` must be called to
        /// release it.
        pub const DrainIterator = struct {
            queue: *Self,

            /// Returns the next queued value, or null once the queue is
            /// empty.
            pub fn next(self: *DrainIterator) ?T {
                if (self.queue.len == 0) return null;

                // Read and account
                const n = self.queue.read;
                self.queue.read += 1;
                if (self.queue.read >= bounds) self.queue.read -= bounds;
                self.queue.len -= 1;

                return self.queue.data[n];
            }

            /// Wakes a waiting producer (if space was freed) and releases
            /// the queue mutex taken by `drain`.
            pub fn deinit(self: *DrainIterator) void {
                // If we have producers waiting on a full queue, notify.
                // Skip the signal when nothing was drained: waking a
                // producer while the queue is still full only makes a
                // timed push give up early.
                if (self.queue.not_full_waiters > 0 and !self.queue.full()) {
                    self.queue.cond_not_full.signal();
                }

                // Unlock
                self.queue.mutex.unlock();
            }
        };

        /// Returns true if the queue is full. This is not public because
        /// it requires the lock to be held.
        inline fn full(self: *Self) bool {
            return self.len == bounds;
        }
    };
}
test "basic push and pop" {
    const testing = std.testing;
    const Q = BlockingQueue(u64, 4);

    const q = try Q.create(testing.allocator);
    defer q.destroy(testing.allocator);

    // A brand new queue has nothing to hand out.
    try testing.expect(q.pop() == null);

    // Fill every slot; each successful push reports the resulting length.
    const values = [_]u64{ 1, 2, 3, 4 };
    for (values, 1..) |value, expected_len| {
        try testing.expectEqual(
            @as(Q.Size, @intCast(expected_len)),
            q.push(value, .{ .instant = {} }),
        );
    }

    // One more than the capacity is rejected with a zero result.
    try testing.expectEqual(@as(Q.Size, 0), q.push(5, .{ .instant = {} }));

    // Values come back out in the order they went in.
    for (values) |value| {
        try testing.expect(q.pop().? == value);
    }
    try testing.expect(q.pop() == null);

    // Draining an empty queue yields nothing and releases the lock again.
    var drained = q.drain();
    try testing.expect(drained.next() == null);
    drained.deinit();

    // The queue remains usable after the drain.
    try testing.expectEqual(@as(Q.Size, 1), q.push(1, .{ .instant = {} }));
}
test "timed push" {
    const testing = std.testing;
    const Q = BlockingQueue(u64, 1);
    const failed: Q.Size = 0;

    const q = try Q.create(testing.allocator);
    defer q.destroy(testing.allocator);

    // The single slot accepts exactly one value; the next instant push fails.
    try testing.expectEqual(@as(Q.Size, 1), q.push(1, .{ .instant = {} }));
    try testing.expectEqual(failed, q.push(2, .{ .instant = {} }));

    // Nothing consumes the queue, so a push bounded to 1000ns fails as well.
    try testing.expectEqual(failed, q.push(2, .{ .ns = 1000 }));
}
|