1 /**
2  * Simple idiomatic dlang wrapper around linux io_uring
3  * (see: https://kernel.dk/io_uring.pdf) asynchronous API.
4  */
5 module during;
6 
7 version (linux) {}
8 else static assert(0, "io_uring is available on linux only");
9 
10 public import during.io_uring;
11 import during.openat2;
12 
13 import core.atomic : MemoryOrder;
14 debug import core.stdc.stdio;
15 import core.stdc.stdlib;
16 import core.sys.linux.epoll;
17 import core.sys.linux.errno;
18 import core.sys.linux.sys.mman;
19 import core.sys.linux.unistd;
20 import core.sys.posix.signal;
21 import core.sys.posix.sys.socket;
22 import core.sys.posix.sys.uio;
23 import std.algorithm.comparison : among;
24 
25 nothrow @nogc:
26 
27 /**
 * Set up a new instance of io_uring in the provided `Uring` structure.
29  *
30  * Params:
31  *     uring = `Uring` structure to be initialized (must not be already initialized)
32  *     entries = Number of entries to initialize uring with
33  *     flags = `SetupFlags` to use to initialize uring.
34  *
 * Returns: On success it returns 0, `-errno` otherwise.
36  */
37 int setup(ref Uring uring, uint entries = 128, SetupFlags flags = SetupFlags.NONE)
38 {
39     assert(uring.payload is null, "Uring is already initialized");
40     uring.payload = cast(UringDesc*)calloc(1, UringDesc.sizeof);
41     if (uring.payload is null) return -errno;
42 
43     uring.payload.params.flags = flags;
44     uring.payload.refs = 1;
45     auto r = io_uring_setup(entries, uring.payload.params);
46     if (r < 0) return -errno;
47 
48     uring.payload.fd = r;
49 
50     if (uring.payload.mapRings() < 0)
51     {
52         dispose(uring);
53         return -errno;
54     }
55 
56     // debug printf("uring(%d): setup\n", uring.payload.fd);
57 
58     return 0;
59 }
60 
61 /**
62  * Main entry point to work with io_uring.
63  *
64  * It hides `SubmissionQueue` and `CompletionQueue` behind standard range interface.
65  * We put in `SubmissionEntry` entries and take out `CompletionEntry` entries.
66  *
67  * Use predefined `prepXX` methods to fill required fields of `SubmissionEntry` before `put` or during `putWith`.
68  *
 * Note: `prepXX` functions don't touch the previous state of an entry, they just fill in the operation's
 * properties. Entries are cleared automatically when prepared using `putWith`, which makes that interface
 * less error prone. So when preparing your own `SubmissionEntry` (outside the submission queue) that would
 * be added to the submission queue using `put`, be sure it's cleared if it's reused for multiple operations.
73  */
74 struct Uring
75 {
76     nothrow @nogc:
77 
78     private UringDesc* payload;
79     private void checkInitialized() const @safe pure
80     {
81         assert(payload !is null, "Uring hasn't been initialized yet");
82     }
83 
84     /// Copy constructor
85     this(ref return scope Uring rhs) @safe pure
86     {
87         assert(rhs.payload !is null, "rhs payload is null");
88         // debug printf("uring(%d): copy\n", rhs.payload.fd);
89         this.payload = rhs.payload;
90         this.payload.refs++;
91     }
92 
93     /// Destructor
94     ~this() @safe
95     {
96         dispose(this);
97     }
98 
99     /// Native io_uring file descriptor
100     int fd() const @safe pure
101     {
102         checkInitialized();
103         return payload.fd;
104     }
105 
106     /// io_uring parameters
107     SetupParameters params() const @safe pure return
108     {
109         checkInitialized();
110         return payload.params;
111     }
112 
    /// Check if the completion queue is empty (no `CompletionEntry` to process).
114     bool empty() const @safe pure
115     {
116         checkInitialized();
117         return payload.cq.empty;
118     }
119 
    /// Check if the submission queue is full (no space for another `SubmissionEntry`).
121     bool full() const @safe pure
122     {
123         checkInitialized();
124         return payload.sq.full;
125     }
126 
127     /// Available space in submission queue before it becomes full
128     size_t capacity() const @safe pure
129     {
130         checkInitialized();
131         return payload.sq.capacity;
132     }
133 
134     /// Number of entries in completion queue
135     size_t length() const @safe pure
136     {
137         checkInitialized();
138         return payload.cq.length;
139     }
140 
141     /// Get first `CompletionEntry` from cq ring
142     ref CompletionEntry front() @safe pure return
143     {
144         checkInitialized();
145         return payload.cq.front;
146     }
147 
148     /// Move to next `CompletionEntry`
149     void popFront() @safe pure
150     {
151         checkInitialized();
152         return payload.cq.popFront;
153     }
154 
155     /**
156      * Adds new entry to the `SubmissionQueue`.
157      *
     * Note that this just adds the entry to the queue and doesn't advance the tail
     * marker the kernel sees. For that, `finishSq()` needs to be called next.
     *
     * Also note that to actually submit the new entries to the kernel,
     * `submit()` must be called.
163      *
164      * Params:
165      *     FN    = Function to fill next entry in queue by `ref` (should be faster).
166      *             It is expected to be in a form of `void function(ARGS)(ref SubmissionEntry, auto ref ARGS)`.
167      *             Note that in this case queue entry is cleaned first before function is called.
168      *     entry = Custom built `SubmissionEntry` to be posted as is.
169      *             Note that in this case it is copied whole over one in the `SubmissionQueue`.
170      *     args  = Optional arguments passed to the function
171      *
172      * Returns: reference to `Uring` structure so it's possible to chain multiple commands.
173      */
174     ref Uring put()(auto ref SubmissionEntry entry) @safe pure return
175     {
176         checkInitialized();
177         payload.sq.put(entry);
178         return this;
179     }
180 
181     /// ditto
182     ref Uring putWith(alias FN, ARGS...)(auto ref ARGS args) return
183     {
184         import std.functional : forward;
185         checkInitialized();
186         payload.sq.putWith!FN(forward!args);
187         return this;
188     }
189 
190     /**
191      * Similar to `put(SubmissionEntry)` but in this case we can provide our custom type (args) to be filled
192      * to next `SubmissionEntry` in queue.
193      *
194      * Fields in the provided type must use the same names as in `SubmissionEntry` to be automagically copied.
195      *
196      * Params:
197      *   op = Custom operation definition.
     * Returns: reference to `Uring` structure so it's possible to chain multiple commands.
199      */
200     ref Uring put(OP)(auto ref OP op) return
201         if (!is(OP == SubmissionEntry))
202     {
203         checkInitialized();
204         payload.sq.put(op);
205         return this;
206     }
207 
208     /**
     * Advances the userspace submission queue and returns the last `SubmissionEntry`.
210      */
211     ref SubmissionEntry next()() @safe pure
212     {
213         checkInitialized();
214         return payload.sq.next();
215     }
216 
217     /**
     * If the completion queue is full, new events may be dropped.
     * This value records the number of dropped events.
220      */
221     uint overflow() const @safe pure
222     {
223         checkInitialized();
224         return payload.cq.overflow;
225     }
226 
227     /// Counter of invalid submissions (out-of-bound index in submission array)
228     uint dropped() const @safe pure
229     {
230         checkInitialized();
231         return payload.sq.dropped;
232     }
233 
234     /**
     * Submits queued `SubmissionEntry` entries to be processed by the kernel.
236      *
237      * Params:
238      *     want  = number of `CompletionEntries` to wait for.
239      *             If 0, this just submits queued entries and returns.
     *             If > 0, it blocks until at least the wanted number of entries has completed.
241      *     sig   = See io_uring_enter(2) man page
242      *
243      * Returns: Number of submitted entries on success, `-errno` on error
244      */
245     int submit(uint want = 0, const sigset_t* sig = null) @trusted
246     {
247         checkInitialized();
248 
249         auto len = cast(int)payload.sq.length;
250         if (len > 0) // anything to submit?
251         {
252             EnterFlags flags;
253             if (want > 0) flags |= EnterFlags.GETEVENTS;
254 
255             payload.sq.flushTail(); // advance queue index
256 
257             if (payload.params.flags & SetupFlags.SQPOLL)
258             {
259                 if (payload.sq.flags & SubmissionQueueFlags.NEED_WAKEUP)
260                     flags |= EnterFlags.SQ_WAKEUP;
261                 else if (want == 0) return len; // fast poll
262             }
263             auto r = io_uring_enter(payload.fd, len, want, flags, sig);
264             if (r < 0) return -errno;
265             return r;
266         }
267         else if (want > 0) return wait(want); // just simple wait
268         return 0;
269     }
270 
271     /**
     * Similar to `submit`, but with this method we just wait for the required number
     * of `CompletionEntries`.
274      *
275      * Returns: `0` on success, `-errno` on error
276      */
277     int wait(uint want = 1, const sigset_t* sig = null) @trusted
278     {
279         pragma(inline);
280         checkInitialized();
281         assert(want > 0, "Invalid want value");
282 
283         if (payload.cq.length >= want) return 0; // we don't need to syscall
284 
285         auto r = io_uring_enter(payload.fd, 0, want, EnterFlags.GETEVENTS, sig);
286         if (r < 0) return -errno;
287         return 0;
288     }
289 
290     /**
     * Register a single buffer or a group of buffers to be mapped into the kernel for faster buffered operations.
     *
     * To use the buffers, the application must use the fixed variants of the operations,
     * `READ_FIXED` or `WRITE_FIXED`, in the `SubmissionEntry`, with the used `buf_index` set
     * in the entry extra data.
     *
     * An application can increase or decrease the size or number of registered buffers by first
     * unregistering the existing buffers, and then issuing a new call to io_uring_register() with
     * the new buffers.
     *
     * Params:
     *   buffers = Buffers to be registered
303      *
304      * Returns: On success, returns 0. On error, `-errno` is returned.
305      */
306     int registerBuffers(T)(T buffers)
307         if (is(T == ubyte[]) || is(T == ubyte[][])) // TODO: something else?
308     {
309         checkInitialized();
310         assert(buffers.length, "Empty buffer");
311 
312         if (payload.regBuffers !is null)
313             return -EBUSY; // buffers were already registered
314 
315         static if (is(T == ubyte[]))
316         {
317             auto p = malloc(iovec.sizeof);
318             if (p is null) return -errno;
319             payload.regBuffers = (cast(iovec*)p)[0..1];
320             payload.regBuffers[0].iov_base = cast(void*)&buffers[0];
321             payload.regBuffers[0].iov_len = buffers.length;
322         }
323         else static if (is(T == ubyte[][]))
324         {
325             auto p = malloc(buffers.length * iovec.sizeof);
326             if (p is null) return -errno;
327             payload.regBuffers = (cast(iovec*)p)[0..buffers.length];
328 
329             foreach (i, b; buffers)
330             {
331                 assert(b.length, "Empty buffer");
332                 payload.regBuffers[i].iov_base = cast(void*)&b[0];
333                 payload.regBuffers[i].iov_len = b.length;
334             }
335         }
336 
        // nr_args is the number of iovec structures being registered
        auto r = io_uring_register(
                payload.fd,
                RegisterOpCode.REGISTER_BUFFERS,
                cast(const(void)*)&payload.regBuffers[0], cast(uint)payload.regBuffers.length
            );
342 
343         if (r < 0) return -errno;
344         return 0;
345     }
346 
347     /**
348      * Releases all previously registered buffers associated with the `io_uring` instance.
349      *
350      * An application need not unregister buffers explicitly before shutting down the io_uring instance.
351      *
352      * Returns: On success, returns 0. On error, `-errno` is returned.
353      */
354     int unregisterBuffers() @trusted
355     {
356         checkInitialized();
357 
358         if (payload.regBuffers is null)
359             return -ENXIO; // no buffers were registered
360 
361         free(cast(void*)&payload.regBuffers[0]);
362         payload.regBuffers = null;
363 
364         auto r = io_uring_register(payload.fd, RegisterOpCode.UNREGISTER_BUFFERS, null, 0);
365         if (r < 0) return -errno;
366         return 0;
367     }
368 
369     /**
370      * Register files for I/O.
371      *
372      * To make use of the registered files, the `IOSQE_FIXED_FILE` flag must be set in the flags
373      * member of the `SubmissionEntry`, and the `fd` member is set to the index of the file in the
374      * file descriptor array.
375      *
376      * Files are automatically unregistered when the `io_uring` instance is torn down. An application
377      * need only unregister if it wishes to register a new set of fds.
378      *
     * Use `-1` as a file descriptor to mark it as reserved in the array.
     *
     * Params: fds = array of file descriptors to be registered
381      *
382      * Returns: On success, returns 0. On error, `-errno` is returned.
383      */
384     int registerFiles(const(int)[] fds)
385     {
386         checkInitialized();
387         assert(fds.length, "No file descriptors provided");
388         assert(fds.length < uint.max, "Too many file descriptors");
389 
390         // arg contains a pointer to an array of nr_args file descriptors (signed 32 bit integers).
391         auto r = io_uring_register(payload.fd, RegisterOpCode.REGISTER_FILES, &fds[0], cast(uint)fds.length);
392         if (r < 0) return -errno;
393         return 0;
394     }
395 
396     /*
397      * Register an update for an existing file set. The updates will start at
398      * `off` in the original array.
399      *
400      * Use `-1` as a file descriptor to mark it as reserved in the array.
401      *
402      * Params:
403      *      off = offset to the original registered files to be updated
404      *      files = array of file descriptors to update with
405      *
406      * Returns: number of files updated on success, -errno on failure.
407      */
408     int registerFilesUpdate(uint off, const(int)[] fds) @trusted
409     {
410         struct Update
411         {
412             uint offset;
413             uint _resv;
414             ulong pfds;
415         }
416 
417         static assert (Update.sizeof == 16);
418 
419         checkInitialized();
420         assert(fds.length, "No file descriptors provided to update");
421         assert(fds.length < uint.max, "Too many file descriptors");
422 
423         Update u = { offset: off, pfds: cast(ulong)&fds[0] };
424         auto r = io_uring_register(
425             payload.fd,
426             RegisterOpCode.REGISTER_FILES_UPDATE,
427             &u, cast(uint)fds.length);
428         if (r < 0) return -errno;
429         return 0;
430     }
431 
432     /**
433      * All previously registered files associated with the `io_uring` instance will be unregistered.
434      *
435      * Files are automatically unregistered when the `io_uring` instance is torn down. An application
436      * need only unregister if it wishes to register a new set of fds.
437      *
438      * Returns: On success, returns 0. On error, `-errno` is returned.
439      */
440     int unregisterFiles() @trusted
441     {
442         checkInitialized();
443         auto r = io_uring_register(payload.fd, RegisterOpCode.UNREGISTER_FILES, null, 0);
444         if (r < 0) return -errno;
445         return 0;
446     }
447 
448     /**
     * Registers an event file descriptor that will be used as a notification mechanism on completion
     * queue changes.
     *
     * Params: eventFD = event file descriptor to be notified about completion queue changes
453      *
454      * Returns: On success, returns 0. On error, `-errno` is returned.
455      */
456     int registerEventFD(int eventFD) @trusted
457     {
458         checkInitialized();
459         auto r = io_uring_register(payload.fd, RegisterOpCode.REGISTER_EVENTFD, &eventFD, 1);
460         if (r < 0) return -errno;
461         return 0;
462     }
463 
464     /**
465      * Unregister previously registered notification event file descriptor.
466      *
467      * Returns: On success, returns 0. On error, `-errno` is returned.
468      */
469     int unregisterEventFD() @trusted
470     {
471         checkInitialized();
472         auto r = io_uring_register(payload.fd, RegisterOpCode.UNREGISTER_EVENTFD, null, 0);
473         if (r < 0) return -errno;
474         return 0;
475     }
476 }
477 
478 /**
479  * Uses custom operation definition to fill fields of `SubmissionEntry`.
480  * Can be used in cases, when builtin prep* functions aren't enough.
481  *
482  * Custom definition fields must correspond to fields of `SubmissionEntry` for this to work.
483  *
 * Note: This doesn't touch the previous state of the entry, it just fills the corresponding fields.
 *       So it may be necessary to call `clear` on the entry first (depending on usage).
486  *
487  * Params:
488  *   entry = entry to set parameters to
489  *   op = operation to fill entry with (can be custom type)
490  */
491 ref SubmissionEntry fill(E)(return ref SubmissionEntry entry, auto ref E op)
492 {
493     pragma(inline);
494     import std.traits : hasMember, FieldNameTuple;
495 
496     // fill entry from provided operation fields (they must have same name as in SubmissionEntry)
497     foreach (m; FieldNameTuple!E)
498     {
499         static assert(hasMember!(SubmissionEntry, m), "unknown member: " ~ E.stringof ~ "." ~ m);
500         __traits(getMember, entry, m) = __traits(getMember, op, m);
501     }
502 
503     return entry;
504 }
505 
506 /**
507  * Template function to help set `SubmissionEntry` `user_data` field.
508  *
509  * Params:
510  *      entry = `SubmissionEntry` to prepare
511  *      data = data to set to the `SubmissionEntry`
512  *
 * Note: The data is passed by reference and must stay valid for the whole duration of the operation.
514  */
515 ref SubmissionEntry setUserData(D)(return ref SubmissionEntry entry, ref D data) @trusted
516 {
517     pragma(inline);
518     entry.user_data = cast(ulong)(cast(void*)&data);
519     return entry;
520 }
521 
522 /**
523  * Template function to help set `SubmissionEntry` `user_data` field. This differs to `setUserData`
524  * in that it emplaces the provided data directly into SQE `user_data` field and not the pointer to
525  * the data.
526  *
 * Because of that, the data must be exactly `ulong.sizeof` bytes in size.
528  */
529 ref SubmissionEntry setUserDataRaw(D)(return ref SubmissionEntry entry, auto ref D data) @trusted
530     if (D.sizeof == ulong.sizeof)
531 {
532     pragma(inline);
533     entry.user_data = *(cast(ulong*)(cast(void*)&data));
534     return entry;
535 }
536 
537 /**
538  * Helper function to retrieve data set directly to the `CompletionEntry` user_data (set by `setUserDataRaw`).
539  */
540 D userDataAs(D)(ref CompletionEntry entry) @trusted
541     if (D.sizeof == ulong.sizeof)
542 {
543     pragma(inline);
544     return *(cast(D*)(cast(void*)&entry.user_data));
545 }
546 
547 ref SubmissionEntry prepRW(return ref SubmissionEntry entry, Operation op,
548     int fd = -1, const void* addr = null, uint len = 0, ulong offset = 0) @safe
549 {
550     pragma(inline);
551     entry.opcode = op;
552     entry.fd = fd;
553     entry.off = offset;
554     entry.flags = SubmissionEntryFlags.NONE;
555     entry.ioprio = 0;
556     entry.addr = cast(ulong)addr;
557     entry.len = len;
558     entry.rw_flags = ReadWriteFlags.NONE;
    entry.user_data = 0;
560     entry.__pad2[0] = entry.__pad2[1] = entry.__pad2[2] = 0;
561     return entry;
562 }
563 
564 /**
565  * Prepares `nop` operation.
566  *
567  * Params:
568  *      entry = `SubmissionEntry` to prepare
569  */
570 ref SubmissionEntry prepNop(return ref SubmissionEntry entry) @safe
571 {
572     entry.prepRW(Operation.NOP);
573     return entry;
574 }
575 
576 /**
577  * Prepares `readv` operation.
578  *
579  * Params:
580  *      entry = `SubmissionEntry` to prepare
581  *      fd = file descriptor of file we are operating on
 *      buffer = iovec buffers to be used by the operation
 *      offset = offset within the file to operate at
584  */
585 ref SubmissionEntry prepReadv(V)(return ref SubmissionEntry entry, int fd, ref const V buffer, long offset) @trusted
586     if (is(V == iovec[]) || is(V == iovec))
587 {
588     static if (is(V == iovec[]))
589     {
590         assert(buffer.length, "Empty buffer");
591         assert(buffer.length < uint.max, "Too many iovec buffers");
592         return entry.prepRW(Operation.READV, fd, cast(void*)&buffer[0], cast(uint)buffer.length, offset);
593     }
594     else return entry.prepRW(Operation.READV, fd, cast(void*)&buffer, 1, offset);
595 }
596 
597 /**
598  * Prepares `writev` operation.
599  *
600  * Params:
601  *      entry = `SubmissionEntry` to prepare
602  *      fd = file descriptor of file we are operating on
 *      buffer = iovec buffers to be used by the operation
 *      offset = offset within the file to operate at
605  */
606 ref SubmissionEntry prepWritev(V)(return ref SubmissionEntry entry, int fd, ref const V buffer, long offset) @trusted
607     if (is(V == iovec[]) || is(V == iovec))
608 {
609     static if (is(V == iovec[]))
610     {
611         assert(buffer.length, "Empty buffer");
612         assert(buffer.length < uint.max, "Too many iovec buffers");
613         return entry.prepRW(Operation.WRITEV, fd, cast(void*)&buffer[0], cast(uint)buffer.length, offset);
614     }
615     else return entry.prepRW(Operation.WRITEV, fd, cast(void*)&buffer, 1, offset);
616 }
617 
618 /**
619  * Prepares `read_fixed` operation.
620  *
621  * Params:
622  *      entry = `SubmissionEntry` to prepare
623  *      fd = file descriptor of file we are operating on
 *      offset = offset within the file
625  *      buffer = slice to preregistered buffer
626  *      bufferIndex = index to the preregistered buffers array buffer belongs to
627  */
628 ref SubmissionEntry prepReadFixed(return ref SubmissionEntry entry, int fd, long offset, ubyte[] buffer, ushort bufferIndex) @safe
629 {
630     assert(buffer.length, "Empty buffer");
631     assert(buffer.length < uint.max, "Buffer too large");
632     entry.prepRW(Operation.READ_FIXED, fd, cast(void*)&buffer[0], cast(uint)buffer.length, offset);
633     entry.buf_index = bufferIndex;
634     return entry;
635 }
636 
637 /**
638  * Prepares `write_fixed` operation.
639  *
640  * Params:
641  *      entry = `SubmissionEntry` to prepare
642  *      fd = file descriptor of file we are operating on
 *      offset = offset within the file
644  *      buffer = slice to preregistered buffer
645  *      bufferIndex = index to the preregistered buffers array buffer belongs to
646  */
647 ref SubmissionEntry prepWriteFixed(return ref SubmissionEntry entry, int fd, long offset, ubyte[] buffer, ushort bufferIndex) @safe
648 {
649     assert(buffer.length, "Empty buffer");
650     assert(buffer.length < uint.max, "Buffer too large");
651     entry.prepRW(Operation.WRITE_FIXED, fd, cast(void*)&buffer[0], cast(uint)buffer.length, offset);
652     entry.buf_index = bufferIndex;
653     return entry;
654 }
655 
656 /**
657  * Prepares `recvmsg(2)` operation.
658  *
659  * Params:
660  *      entry = `SubmissionEntry` to prepare
661  *      fd = file descriptor of file we are operating on
662  *      msg = message to operate with
663  *      flags = `recvmsg` operation flags
664  *
665  * Note: Available from Linux 5.3
666  *
667  * See_Also: `recvmsg(2)` man page for details.
668  */
669 ref SubmissionEntry prepRecvMsg(return ref SubmissionEntry entry, int fd, ref msghdr msg, MsgFlags flags = MsgFlags.NONE) @trusted
670 {
671     entry.prepRW(Operation.RECVMSG, fd, cast(void*)&msg, 1, 0);
672     entry.msg_flags = flags;
673     return entry;
674 }
675 
676 /**
677  * Prepares `sendmsg(2)` operation.
678  *
679  * Params:
680  *      entry = `SubmissionEntry` to prepare
681  *      fd = file descriptor of file we are operating on
682  *      msg = message to operate with
683  *      flags = `sendmsg` operation flags
684  *
685  * Note: Available from Linux 5.3
686  *
687  * See_Also: `sendmsg(2)` man page for details.
688  */
689 ref SubmissionEntry prepSendMsg(return ref SubmissionEntry entry, int fd, ref msghdr msg, MsgFlags flags = MsgFlags.NONE) @trusted
690 {
691     entry.prepRW(Operation.SENDMSG, fd, cast(void*)&msg, 1, 0);
692     entry.msg_flags = flags;
693     return entry;
694 }
695 
696 /**
697  * Prepares `fsync` operation.
698  *
699  * Params:
700  *      entry = `SubmissionEntry` to prepare
701  *      fd = file descriptor of a file to call `fsync` on
702  *      flags = `fsync` operation flags
703  */
704 ref SubmissionEntry prepFsync(return ref SubmissionEntry entry, int fd, FsyncFlags flags = FsyncFlags.NORMAL) @safe
705 {
706     entry.prepRW(Operation.FSYNC, fd);
707     entry.fsync_flags = flags;
708     return entry;
709 }
710 
711 /**
712  * Poll the fd specified in the submission queue entry for the events specified in the poll_events
713  * field. Unlike poll or epoll without `EPOLLONESHOT`, this interface always works in one shot mode.
714  * That is, once the poll operation is completed, it will have to be resubmitted.
715  *
716  * Params:
717  *      entry = `SubmissionEntry` to prepare
718  *      fd = file descriptor to poll
719  *      events = events to poll on the FD
720  */
721 ref SubmissionEntry prepPollAdd(return ref SubmissionEntry entry, int fd, PollEvents events) @safe
722 {
723     import std.system : endian, Endian;
724 
725     entry.prepRW(Operation.POLL_ADD, fd);
726     static if (endian == Endian.bigEndian)
727         entry.poll_events32 = (events & 0x0000ffffUL) << 16 | (events & 0xffff0000) >> 16;
728     else
729         entry.poll_events32 = events;
730     return entry;
731 }
732 
733 /**
734  * Remove an existing poll request. If found, the res field of the `CompletionEntry` will contain
735  * `0`.  If not found, res will contain `-ENOENT`.
736  *
737  * Params:
738  *      entry = `SubmissionEntry` to prepare
 *      userData = user data associated with the previously issued poll operation
740  */
741 ref SubmissionEntry prepPollRemove(D)(return ref SubmissionEntry entry, ref D userData) @trusted
742 {
743     return entry.prepRW(Operation.POLL_REMOVE, -1, cast(void*)&userData);
744 }
745 
746 /**
747  * Prepares `sync_file_range(2)` operation.
748  *
 * Sync a file segment with disk. This permits fine control when synchronizing the open file referred to
 * by the file descriptor fd with disk.
751  *
752  * If `len` is 0, then all bytes from `offset` through to the end of file are synchronized.
753  *
754  * Params:
755  *      entry = `SubmissionEntry` to prepare
756  *      fd = is the file descriptor to sync
757  *      offset = the starting byte of the file range to be synchronized
758  *      len = the length of the range to be synchronized, in bytes
759  *      flags = the flags for the command.
760  *
761  * See_Also: `sync_file_range(2)` for the general description of the related system call.
762  *
763  * Note: available from Linux 5.2
764  */
765 ref SubmissionEntry prepSyncFileRange(return ref SubmissionEntry entry, int fd, ulong offset, uint len,
766     SyncFileRangeFlags flags = SyncFileRangeFlags.WRITE_AND_WAIT) @safe
767 {
    entry.prepRW(Operation.SYNC_FILE_RANGE, fd, null, len, offset);
    entry.sync_range_flags = flags;
773     return entry;
774 }
775 
776 /**
777  * This command will register a timeout operation.
778  *
779  * A timeout will trigger a wakeup event on the completion ring for anyone waiting for events. A
780  * timeout condition is met when either the specified timeout expires, or the specified number of
781  * events have completed. Either condition will trigger the event. The request will complete with
782  * `-ETIME` if the timeout got completed through expiration of the timer, or `0` if the timeout got
783  * completed through requests completing on their own. If the timeout was cancelled before it
784  * expired, the request will complete with `-ECANCELED`.
785  *
786  * Applications may delete existing timeouts before they occur with `TIMEOUT_REMOVE` operation.
787  *
788  * Params:
789  *      entry = `SubmissionEntry` to prepare
790  *      time = reference to `time64` data structure
791  *      count = completion event count
792  *      flags = define if it's a relative or absolute time
793  *
794  * Note: Available from Linux 5.4
795  */
796 ref SubmissionEntry prepTimeout(return ref SubmissionEntry entry, ref KernelTimespec time,
797     ulong count = 0, TimeoutFlags flags = TimeoutFlags.REL) @trusted
798 {
799     entry.prepRW(Operation.TIMEOUT, -1, cast(void*)&time, 1, count);
800     entry.timeout_flags = flags;
801     return entry;
802 }
803 
804 /**
 * Prepares operation to remove an existing timeout registered using the `TIMEOUT` operation.
806  *
807  * Attempt to remove an existing timeout operation. If the specified timeout request is found and
808  * cancelled successfully, this request will terminate with a result value of `-ECANCELED`. If the
809  * timeout request was found but expiration was already in progress, this request will terminate
810  * with a result value of `-EALREADY`. If the timeout request wasn't found, the request will
811  * terminate with a result value of `-ENOENT`.
812  *
813  * Params:
814  *      entry = `SubmissionEntry` to prepare
815  *      userData = user data provided with the previously issued timeout operation
816  *
817  * Note: Available from Linux 5.5
818  */
819 ref SubmissionEntry prepTimeoutRemove(D)(return ref SubmissionEntry entry, ref D userData) @trusted
820 {
821     return entry.prepRW(Operation.TIMEOUT_REMOVE, -1, cast(void*)&userData);
822 }
823 
824 /**
825  * Prepares `accept4(2)` operation.
826  *
 * See_Also: `accept4(2)` for the general description of the related system call.
828  *
829  * Params:
830  *      entry = `SubmissionEntry` to prepare
831  *      fd = socket file descriptor
 *      addr = reference to one of the sockaddr structures to be filled with the accepted client address
833  *      addrlen = reference to addrlen field that would be filled with accepted client address length
834  *
835  * Note: Available from Linux 5.5
836  */
837 ref SubmissionEntry prepAccept(ADDR)(return ref SubmissionEntry entry, int fd, ref ADDR addr, ref socklen_t addrlen,
838     AcceptFlags flags = AcceptFlags.NONE) @trusted
839 {
840     entry.prepRW(Operation.ACCEPT, fd, cast(void*)&addr, 0, cast(ulong)(cast(void*)&addrlen));
841     entry.accept_flags = flags;
842     return entry;
843 }
844 
845 /**
846  * Prepares operation that cancels existing async work.
847  *
 * This works with any read/write request, accept, send/recvmsg, etc. There’s an important
849  * distinction to make here with the different kinds of commands. A read/write on a regular file
850  * will generally be waiting for IO completion in an uninterruptible state. This means it’ll ignore
851  * any signals or attempts to cancel it, as these operations are uncancellable. io_uring can cancel
852  * these operations if they haven’t yet been started. If they have been started, cancellations on
853  * these will fail. Network IO will generally be waiting interruptibly, and can hence be cancelled
854  * at any time. The completion event for this request will have a result of 0 if done successfully,
855  * `-EALREADY` if the operation is already in progress, and `-ENOENT` if the original request
856  * specified cannot be found. For cancellation requests that return `-EALREADY`, io_uring may or may
857  * not cause this request to be stopped sooner. For blocking IO, the original request will complete
858  * as it originally would have. For IO that is cancellable, it will terminate sooner if at all
859  * possible.
860  *
861  * Params:
862  *      entry = `SubmissionEntry` to prepare
863  *      userData = `user_data` field of the request that should be cancelled
864  *
865  * Note: Available from Linux 5.5
866  */
867 ref SubmissionEntry prepCancel(D)(return ref SubmissionEntry entry, ref D userData, uint flags = 0) @trusted
868 {
869     entry.prepRW(Operation.ASYNC_CANCEL, -1, cast(void*)&userData);
870     entry.cancel_flags = flags;
871     return entry;
872 }
873 
874 /**
875  * Prepares linked timeout operation.
876  *
 * This request must be linked with another request through `IOSQE_IO_LINK`.
878  * Unlike `IORING_OP_TIMEOUT`, `IORING_OP_LINK_TIMEOUT` acts on the linked request, not the completion
879  * queue. The format of the command is otherwise like `IORING_OP_TIMEOUT`, except there's no
880  * completion event count as it's tied to a specific request. If used, the timeout specified in the
881  * command will cancel the linked command, unless the linked command completes before the
 * timeout. The timeout will complete with `-ETIME` if the timer expired and cancellation of the
 * linked request was attempted, or `-ECANCELED` if the timer got cancelled because the linked
 * request completed on its own.
885  *
886  * Note: Available from Linux 5.5
887  *
888  * Params:
889  *      entry = `SubmissionEntry` to prepare
890  *      time = time specification
891  *      flags = define if it's a relative or absolute time
892  */
893 ref SubmissionEntry prepLinkTimeout(return ref SubmissionEntry entry, ref KernelTimespec time, TimeoutFlags flags = TimeoutFlags.REL) @trusted
894 {
895     entry.prepRW(Operation.LINK_TIMEOUT, -1, cast(void*)&time, 1, 0);
896     entry.timeout_flags = flags;
897     return entry;
898 }
899 
900 /**
901  * Note: Available from Linux 5.5
902  */
903 ref SubmissionEntry prepConnect(ADDR)(return ref SubmissionEntry entry, int fd, ref const(ADDR) addr) @trusted
904 {
905     return entry.prepRW(Operation.CONNECT, fd, cast(void*)&addr, 0, ADDR.sizeof);
906 }
907 
908 /**
909  * Note: Available from Linux 5.6
910  */
911 ref SubmissionEntry prepFilesUpdate(return ref SubmissionEntry entry, int[] fds, int offset) @safe
912 {
913     return entry.prepRW(Operation.FILES_UPDATE, -1, cast(void*)&fds[0], cast(uint)fds.length, offset);
914 }
915 
916 /**
917  * Note: Available from Linux 5.6
918  */
919 ref SubmissionEntry prepFallocate(return ref SubmissionEntry entry, int fd, int mode, long offset, long len) @trusted
920 {
    // note: as in liburing, the length is passed through the `addr` field
    return entry.prepRW(Operation.FALLOCATE, fd, cast(void*)len, mode, offset);
922 }
923 
924 /**
925  * Note: Available from Linux 5.6
926  */
927 ref SubmissionEntry prepOpenat(return ref SubmissionEntry entry, int fd, const char* path, int flags, uint mode) @trusted
928 {
929     entry.prepRW(Operation.OPENAT, fd, cast(void*)path, mode, 0);
930     entry.open_flags = flags;
931     return entry;
932 }
933 
934 /**
935  * Note: Available from Linux 5.6
936  */
937 ref SubmissionEntry prepClose(return ref SubmissionEntry entry, int fd) @safe
938 {
939     return entry.prepRW(Operation.CLOSE, fd);
940 }
941 
942 /**
943  * Note: Available from Linux 5.6
944  */
945 ref SubmissionEntry prepRead(return ref SubmissionEntry entry, int fd, ubyte[] buffer, long offset) @safe
946 {
947     return entry.prepRW(Operation.READ, fd, cast(void*)&buffer[0], cast(uint)buffer.length, offset);
948 }
949 
950 /**
951  * Note: Available from Linux 5.6
952  */
953 ref SubmissionEntry prepWrite(return ref SubmissionEntry entry, int fd, const(ubyte)[] buffer, long offset) @trusted
954 {
955     return entry.prepRW(Operation.WRITE, fd, cast(void*)&buffer[0], cast(uint)buffer.length, offset);
956 }
957 
958 /**
959  * Note: Available from Linux 5.6
960  */
961 ref SubmissionEntry prepStatx(Statx)(return ref SubmissionEntry entry, int fd, const char* path, int flags, uint mask, ref Statx statxbuf) @trusted
962 {
963     entry.prepRW(Operation.STATX, fd, cast(void*)path, mask, cast(ulong)(cast(void*)&statxbuf));
964     entry.statx_flags = flags;
965     return entry;
966 }
967 
968 /**
969  * Note: Available from Linux 5.6
970  */
971 ref SubmissionEntry prepFadvise(return ref SubmissionEntry entry, int fd, long offset, uint len, int advice) @safe
972 {
973     entry.prepRW(Operation.FADVISE, fd, null, len, offset);
974     entry.fadvise_advice = advice;
975     return entry;
976 }
977 
978 /**
979  * Note: Available from Linux 5.6
980  */
981 ref SubmissionEntry prepMadvise(return ref SubmissionEntry entry, const(ubyte)[] block, int advice) @trusted
982 {
983     entry.prepRW(Operation.MADVISE, -1, cast(void*)&block[0], cast(uint)block.length, 0);
984     entry.fadvise_advice = advice;
985     return entry;
986 }
987 
988 /**
989  * Note: Available from Linux 5.6
990  */
991 ref SubmissionEntry prepSend(return ref SubmissionEntry entry,
992     int sockfd, const(ubyte)[] buf, MsgFlags flags = MsgFlags.NONE) @trusted
993 {
994     entry.prepRW(Operation.SEND, sockfd, cast(void*)&buf[0], cast(uint)buf.length, 0);
995     entry.msg_flags = flags;
996     return entry;
997 }
998 
999 /**
1000  * Note: Available from Linux 5.6
1001  */
1002 ref SubmissionEntry prepRecv(return ref SubmissionEntry entry,
1003     int sockfd, ubyte[] buf, MsgFlags flags = MsgFlags.NONE) @trusted
1004 {
1005     entry.prepRW(Operation.RECV, sockfd, cast(void*)&buf[0], cast(uint)buf.length, 0);
1006     entry.msg_flags = flags;
1007     return entry;
1008 }
1009 
1010 /**
 * Variant that uses a registered buffer group.
1012  *
1013  * Note: Available from Linux 5.6
1014  */
1015 ref SubmissionEntry prepRecv(return ref SubmissionEntry entry,
1016     int sockfd, ushort gid, uint len, MsgFlags flags = MsgFlags.NONE) @safe
1017 {
1018     entry.prepRW(Operation.RECV, sockfd, null, len, 0);
1019     entry.msg_flags = flags;
1020     entry.buf_group = gid;
1021     entry.flags |= SubmissionEntryFlags.BUFFER_SELECT;
1022     return entry;
1023 }
1024 
1025 /**
1026  * Note: Available from Linux 5.6
1027  */
1028 ref SubmissionEntry prepOpenat2(return ref SubmissionEntry entry, int fd, const char *path, ref OpenHow how) @trusted
1029 {
1030     return entry.prepRW(Operation.OPENAT2, fd, cast(void*)path, cast(uint)OpenHow.sizeof, cast(ulong)(cast(void*)&how));
1031 }
1032 
1033 /**
1034  * Note: Available from Linux 5.6
1035  */
1036 ref SubmissionEntry prepEpollCtl(return ref SubmissionEntry entry, int epfd, int fd, int op, ref epoll_event ev) @trusted
1037 {
1038     return entry.prepRW(Operation.EPOLL_CTL, epfd, cast(void*)&ev, op, fd);
1039 }
1040 
1041 /**
1042  * Note: Available from Linux 5.7
1043  */
1044 ref SubmissionEntry prepSplice(return ref SubmissionEntry entry,
1045     int fd_in, ulong off_in,
1046     int fd_out, ulong off_out,
1047     uint nbytes, uint splice_flags) @safe
1048 {
1049     entry.prepRW(Operation.SPLICE, fd_out, null, nbytes, off_out);
1050     entry.splice_off_in = off_in;
1051     entry.splice_fd_in = fd_in;
1052     entry.splice_flags = splice_flags;
1053     return entry;
1054 }
1055 
1056 /**
 * Note: Available from Linux 5.7
 *
 * Note: The kernel expects the provided buffers to form one contiguous memory block of
 * `len * buf.length` bytes starting at `buf[0][0]` (buffer `i` is taken to be at `addr + i*len`).
 *
1059  * Params:
1060  *    entry = `SubmissionEntry` to prepare
1061  *    buf   = buffers to provide
1062  *    len   = length of each buffer to add
1063  *    bgid  = buffers group id
1064  *    bid   = starting buffer id
1065  */
1066 ref SubmissionEntry prepProvideBuffers(return ref SubmissionEntry entry, ubyte[][] buf, uint len, ushort bgid, int bid) @safe
1067 {
1068     assert(buf.length < int.max, "Too many buffers");
1069     entry.prepRW(Operation.PROVIDE_BUFFERS, cast(int)buf.length, cast(void*)&buf[0][0], len, bid);
1070     entry.buf_group = bgid;
1071     return entry;
1072 }
1073 
1074 /// ditto
1075 ref SubmissionEntry prepProvideBuffers(size_t M, size_t N)(return ref SubmissionEntry entry, ref ubyte[M][N] buf, ushort bgid, int bid) @safe
1076 {
1077     static assert(N < int.max, "Too many buffers");
1078     static assert(M < uint.max, "Buffer too large");
1079     entry.prepRW(Operation.PROVIDE_BUFFERS, cast(int)N, cast(void*)&buf[0][0], cast(uint)M, bid);
1080     entry.buf_group = bgid;
1081     return entry;
1082 }
1083 
1084 /// ditto
1085 ref SubmissionEntry prepProvideBuffer(size_t N)(return ref SubmissionEntry entry, ref ubyte[N] buf, ushort bgid, int bid) @safe
1086 {
1087     static assert(N < uint.max, "Buffer too large");
1088     entry.prepRW(Operation.PROVIDE_BUFFERS, 1, cast(void*)&buf[0], cast(uint)N, bid);
1089     entry.buf_group = bgid;
1090     return entry;
1091 }
1092 
1093 /// ditto
1094 ref SubmissionEntry prepProvideBuffer(return ref SubmissionEntry entry, ref ubyte[] buf, ushort bgid, int bid) @safe
1095 {
1096     assert(buf.length < uint.max, "Buffer too large");
1097     entry.prepRW(Operation.PROVIDE_BUFFERS, 1, cast(void*)&buf[0], cast(uint)buf.length, bid);
1098     entry.buf_group = bgid;
1099     return entry;
1100 }
1101 
1102 /**
1103  * Note: Available from Linux 5.7
1104  */
1105 ref SubmissionEntry prepRemoveBuffers(return ref SubmissionEntry entry, int nr, ushort bgid) @safe
1106 {
1107     entry.prepRW(Operation.REMOVE_BUFFERS, nr);
1108     entry.buf_group = bgid;
1109     return entry;
1110 }
1111 
1112 /**
1113  * Note: Available from Linux 5.8
1114  */
1115 ref SubmissionEntry prepTee(return ref SubmissionEntry entry, int fd_in, int fd_out, uint nbytes, uint flags) @safe
1116 {
1117     entry.prepRW(Operation.TEE, fd_out, null, nbytes, 0);
1118     entry.splice_off_in = 0;
1119     entry.splice_fd_in = fd_in;
1120     entry.splice_flags = flags;
1121     return entry;
1122 }
1123 
1124 private:
1125 
1126 // uring cleanup
1127 void dispose(ref Uring uring) @trusted
1128 {
1129     if (uring.payload is null) return;
1130     // debug printf("uring(%d): dispose(%d)\n", uring.payload.fd, uring.payload.refs);
1131     if (--uring.payload.refs == 0)
1132     {
1133         import std.traits : hasElaborateDestructor;
1134         // debug printf("uring(%d): free\n", uring.payload.fd);
1135         static if (hasElaborateDestructor!UringDesc)
1136             destroy(*uring.payload); // call possible destructors
1137         free(cast(void*)uring.payload);
1138     }
1139     uring.payload = null;
1140 }
1141 
1142 // system fields descriptor
1143 struct UringDesc
1144 {
1145     nothrow @nogc:
1146 
1147     int fd;
1148     size_t refs;
1149     SetupParameters params;
1150     SubmissionQueue sq;
1151     CompletionQueue cq;
1152 
1153     iovec[] regBuffers;
1154 
1155     ~this() @trusted
1156     {
1157         if (regBuffers) free(cast(void*)&regBuffers[0]);
1158         if (sq.ring) munmap(sq.ring, sq.ringSize);
1159         if (sq.sqes) munmap(cast(void*)&sq.sqes[0], sq.sqes.length * SubmissionEntry.sizeof);
1160         if (cq.ring && cq.ring != sq.ring) munmap(cq.ring, cq.ringSize);
1161         close(fd);
1162     }
1163 
1164     private auto mapRings() @trusted
1165     {
1166         sq.ringSize = params.sq_off.array + params.sq_entries * uint.sizeof;
1167         cq.ringSize = params.cq_off.cqes + params.cq_entries * CompletionEntry.sizeof;
1168 
1169         if (params.features & SetupFeatures.SINGLE_MMAP)
1170         {
1171             if (cq.ringSize > sq.ringSize) sq.ringSize = cq.ringSize;
1172             cq.ringSize = sq.ringSize;
1173         }
1174 
1175         sq.ring = mmap(null, sq.ringSize,
1176             PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
1177             fd, SetupParameters.SUBMISSION_QUEUE_RING_OFFSET
1178         );
1179 
1180         if (sq.ring == MAP_FAILED)
1181         {
1182             sq.ring = null;
1183             return -errno;
1184         }
1185 
1186         if (params.features & SetupFeatures.SINGLE_MMAP)
1187             cq.ring = sq.ring;
1188         else
1189         {
1190             cq.ring = mmap(null, cq.ringSize,
1191                 PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
1192                 fd, SetupParameters.COMPLETION_QUEUE_RING_OFFSET
1193             );
1194 
1195             if (cq.ring == MAP_FAILED)
1196             {
1197                 cq.ring = null;
1198                 return -errno; // cleanup is done in struct destructors
1199             }
1200         }
1201 
1202         uint entries    = *cast(uint*)(sq.ring + params.sq_off.ring_entries);
1203         sq.khead        = cast(uint*)(sq.ring + params.sq_off.head);
1204         sq.ktail        = cast(uint*)(sq.ring + params.sq_off.tail);
1205         sq.localTail    = *sq.ktail;
1206         sq.ringMask     = *cast(uint*)(sq.ring + params.sq_off.ring_mask);
1207         sq.kflags       = cast(uint*)(sq.ring + params.sq_off.flags);
1208         sq.kdropped     = cast(uint*)(sq.ring + params.sq_off.dropped);
1209 
1210         // Indirection array of indexes to the sqes array (head and tail are pointing to this array).
1211         // As we don't need some fancy mappings, just initialize it with constant indexes and forget about it.
1212         // That way, head and tail are actually indexes to our sqes array.
1213         foreach (i; 0..entries)
1214         {
1215             *((cast(uint*)(sq.ring + params.sq_off.array)) + i) = i;
1216         }
1217 
1218         auto psqes = mmap(
1219             null, entries * SubmissionEntry.sizeof,
1220             PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
1221             fd, SetupParameters.SUBMISSION_QUEUE_ENTRIES_OFFSET
1222         );
1223 
1224         if (psqes == MAP_FAILED) return -errno;
1225         sq.sqes = (cast(SubmissionEntry*)psqes)[0..entries];
1226 
1227         entries = *cast(uint*)(cq.ring + params.cq_off.ring_entries);
1228         cq.khead        = cast(uint*)(cq.ring + params.cq_off.head);
1229         cq.localHead    = *cq.khead;
1230         cq.ktail        = cast(uint*)(cq.ring + params.cq_off.tail);
1231         cq.ringMask     = *cast(uint*)(cq.ring + params.cq_off.ring_mask);
1232         cq.koverflow    = cast(uint*)(cq.ring + params.cq_off.overflow);
1233         cq.cqes         = (cast(CompletionEntry*)(cq.ring + params.cq_off.cqes))[0..entries];
1234         cq.kflags       = cast(uint*)(cq.ring + params.cq_off.flags);
1235         return 0;
1236     }
1237 }
1238 
/// Wrapper for the `SubmissionEntry` queue
1240 struct SubmissionQueue
1241 {
1242     nothrow @nogc:
1243 
1244     // mmaped fields
1245     uint* khead; // controlled by kernel
1246     uint* ktail; // controlled by us
1247     uint* kflags; // controlled by kernel (ie IORING_SQ_NEED_WAKEUP)
1248     uint* kdropped; // counter of invalid submissions (out of bound index)
1249     uint ringMask; // constant mask used to determine array index from head/tail
1250 
1251     // mmap details (for cleanup)
1252     void* ring; // pointer to the mmaped region
1253     size_t ringSize; // size of mmaped memory block
1254 
1255     // mmapped list of entries (fixed length)
1256     SubmissionEntry[] sqes;
1257 
1258     uint localTail; // used for batch submission
1259 
1260     uint head() const @safe pure { return atomicLoad!(MemoryOrder.acq)(*khead); }
1261     uint tail() const @safe pure { return localTail; }
1262 
1263     void flushTail() @safe pure
1264     {
1265         pragma(inline, true);
1266         // debug printf("SQ updating tail: %d\n", localTail);
1267         atomicStore!(MemoryOrder.rel)(*ktail, localTail);
1268     }
1269 
1270     SubmissionQueueFlags flags() const @safe pure
1271     {
1272         return cast(SubmissionQueueFlags)atomicLoad!(MemoryOrder.raw)(*kflags);
1273     }
1274 
1275     bool full() const @safe pure { return sqes.length == length; }
1276 
1277     size_t length() const @safe pure { return tail - head; }
1278 
1279     size_t capacity() const @safe pure { return sqes.length - length; }
1280 
1281     ref SubmissionEntry next()() @safe pure return
1282     {
1283         return sqes[localTail++ & ringMask];
1284     }
1285 
1286     void put()(auto ref SubmissionEntry entry) @safe pure
1287     {
1288         assert(!full, "SumbissionQueue is full");
1289         sqes[localTail++ & ringMask] = entry;
1290     }
1291 
1292     void put(OP)(auto ref OP op)
1293         if (!is(OP == SubmissionEntry))
1294     {
1295         assert(!full, "SumbissionQueue is full");
1296         sqes[localTail++ & ringMask].fill(op);
1297     }
1298 
1299     private void putWith(alias FN, ARGS...)(auto ref ARGS args)
1300     {
1301         import std.traits : Parameters, ParameterStorageClass, ParameterStorageClassTuple;
1302 
1303         static assert(
1304             Parameters!FN.length >= 1
1305             && is(Parameters!FN[0] == SubmissionEntry)
1306             && ParameterStorageClassTuple!FN[0] == ParameterStorageClass.ref_,
1307             "Alias function must accept at least `ref SubmissionEntry`");
1308 
1309         static assert(
1310             is(typeof(FN(sqes[localTail & ringMask], args))),
1311             "Provided function is not callable with " ~ (Parameters!((ref SubmissionEntry e, ARGS args) {})).stringof);
1312 
1313         assert(!full, "SumbissionQueue is full");
1314         FN(sqes[localTail++ & ringMask], args);
1315     }
1316 
1317     uint dropped() const @safe pure { return atomicLoad!(MemoryOrder.raw)(*kdropped); }
1318 }
1319 
1320 struct CompletionQueue
1321 {
1322     nothrow @nogc:
1323 
1324     // mmaped fields
1325     uint* khead; // controlled by us (increment after entry at head was read)
1326     uint* ktail; // updated by kernel
1327     uint* koverflow;
1328     uint* kflags;
1329     CompletionEntry[] cqes; // array of entries (fixed length)
1330 
1331     uint ringMask; // constant mask used to determine array index from head/tail
1332 
1333     // mmap details (for cleanup)
1334     void* ring;
1335     size_t ringSize;
1336 
1337     uint localHead; // used for bulk reading
1338 
1339     uint head() const @safe pure { return localHead; }
1340     uint tail() const @safe pure { return atomicLoad!(MemoryOrder.acq)(*ktail); }
1341 
1342     void flushHead() @safe pure
1343     {
1344         pragma(inline, true);
1345         // debug printf("CQ updating head: %d\n", localHead);
1346         atomicStore!(MemoryOrder.rel)(*khead, localHead);
1347     }
1348 
1349     bool empty() const @safe pure { return head == tail; }
1350 
1351     ref CompletionEntry front() @safe pure return
1352     {
1353         assert(!empty, "CompletionQueue is empty");
1354         return cqes[localHead & ringMask];
1355     }
1356 
1357     void popFront() @safe pure
1358     {
1359         pragma(inline);
1360         assert(!empty, "CompletionQueue is empty");
1361         localHead++;
1362         flushHead();
1363     }
1364 
1365     size_t length() const @safe pure { return tail - localHead; }
1366 
1367     uint overflow() const @safe pure { return atomicLoad!(MemoryOrder.raw)(*koverflow); }
1368 
1369     /// Runtime CQ flags - written by the application, shouldn't be modified by the kernel.
1370     void flags(CQRingFlags flags) @safe pure { atomicStore!(MemoryOrder.raw)(*kflags, flags); }
1371 }
1372 
1373 // just a helper to use atomicStore more easily with older compilers
1374 void atomicStore(MemoryOrder ms, T, V)(ref T val, V newVal) @trusted
1375 {
1376     pragma(inline, true);
1377     import core.atomic : store = atomicStore;
1378     static if (__VERSION__ >= 2089) store!ms(val, newVal);
1379     else store!ms(*(cast(shared T*)&val), newVal);
1380 }
1381 
1382 // just a helper to use atomicLoad more easily with older compilers
1383 T atomicLoad(MemoryOrder ms, T)(ref const T val) @trusted
1384 {
1385     pragma(inline, true);
1386     import core.atomic : load = atomicLoad;
1387     static if (__VERSION__ >= 2089) return load!ms(val);
1388     else return load!ms(*(cast(const shared T*)&val));
1389 }
1390 
1391 version (assert)
1392 {
1393     import std.range.primitives : ElementType, isInputRange, isOutputRange;
1394     static assert(isInputRange!Uring && is(ElementType!Uring == CompletionEntry));
1395     static assert(isOutputRange!(Uring, SubmissionEntry));
1396 }