/**
 * Simple idiomatic dlang wrapper around linux io_uring
 * (see: https://kernel.dk/io_uring.pdf) asynchronous API.
 */
module during;

version (linux) {}
else static assert(0, "io_uring is available on linux only");

public import during.io_uring;
import during.openat2;

import core.atomic : MemoryOrder;
debug import core.stdc.stdio;
import core.stdc.stdlib;
import core.sys.linux.epoll;
import core.sys.linux.errno;
import core.sys.linux.sys.mman;
import core.sys.linux.unistd;
import core.sys.posix.signal;
import core.sys.posix.sys.socket;
import core.sys.posix.sys.uio;
import std.algorithm.comparison : among;

nothrow @nogc:

/**
 * Sets up a new instance of io_uring in the provided `Uring` structure.
 *
 * Params:
 *   uring = `Uring` structure to be initialized (must not be initialized already)
 *   entries = number of entries to initialize the uring with
 *   flags = `SetupFlags` to use for the initialization
 *
 * Returns: On success it returns 0, `-errno` otherwise.
 */
int setup(ref Uring uring, uint entries = 128, SetupFlags flags = SetupFlags.NONE)
{
    assert(uring.payload is null, "Uring is already initialized");
    uring.payload = cast(UringDesc*)calloc(1, UringDesc.sizeof);
    if (uring.payload is null) return -errno;

    uring.payload.params.flags = flags;
    uring.payload.refs = 1;
    auto r = io_uring_setup(entries, uring.payload.params);
    if (r < 0) return -errno;

    uring.payload.fd = r;

    if (uring.payload.mapRings() < 0)
    {
        dispose(uring);
        return -errno;
    }

    // debug printf("uring(%d): setup\n", uring.payload.fd);

    return 0;
}
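
// A hedged usage sketch (not part of the original API surface): initialize a
// ring, check the 0/-errno contract, and rely on the destructor for cleanup.
// Assumes a kernel with io_uring support (Linux 5.1+).
unittest
{
    Uring io;
    immutable r = io.setup(16);
    assert(r == 0, "io_uring setup failed");
    assert(io.empty);          // no completions yet
    assert(io.capacity == 16); // whole submission queue is available
    // `io` unmaps the rings and closes the fd when leaving the scope
}
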
/**
 * Main entry point to work with io_uring.
 *
 * It hides `SubmissionQueue` and `CompletionQueue` behind a standard range interface.
 * We put in `SubmissionEntry` entries and take out `CompletionEntry` entries.
 *
 * Use the predefined `prepXX` methods to fill the required fields of a `SubmissionEntry`
 * before `put` or during `putWith`.
 *
 * Note: `prepXX` functions don't touch the previous entry state, they just fill in the
 * operation properties. For a less error prone interface, the entry is cleared
 * automatically when it's prepared using `putWith`. So when working with your own
 * `SubmissionEntry` (outside the submission queue) that is later added to the queue
 * using `put`, be sure to clear it if it's reused for multiple operations.
 */
struct Uring
{
    nothrow @nogc:

    private UringDesc* payload;
    private void checkInitialized() const @safe pure
    {
        assert(payload !is null, "Uring hasn't been initialized yet");
    }

    /// Copy constructor
    this(ref return scope Uring rhs) @safe pure
    {
        assert(rhs.payload !is null, "rhs payload is null");
        // debug printf("uring(%d): copy\n", rhs.payload.fd);
        this.payload = rhs.payload;
        this.payload.refs++;
    }

    /// Destructor
    ~this() @safe
    {
        dispose(this);
    }

    /// Native io_uring file descriptor
    int fd() const @safe pure
    {
        checkInitialized();
        return payload.fd;
    }

    /// io_uring parameters
    SetupParameters params() const @safe pure return
    {
        checkInitialized();
        return payload.params;
    }

    /// Check if there is some `CompletionEntry` to process.
    bool empty() const @safe pure
    {
        checkInitialized();
        return payload.cq.empty;
    }

    /// Check if there is space for another `SubmissionEntry` to submit.
    bool full() const @safe pure
    {
        checkInitialized();
        return payload.sq.full;
    }

    /// Available space in the submission queue before it becomes full
    size_t capacity() const @safe pure
    {
        checkInitialized();
        return payload.sq.capacity;
    }

    /// Number of entries in the completion queue
    size_t length() const @safe pure
    {
        checkInitialized();
        return payload.cq.length;
    }

    /// Get the first `CompletionEntry` from the cq ring
    ref CompletionEntry front() @safe pure return
    {
        checkInitialized();
        return payload.cq.front;
    }

    /// Move to the next `CompletionEntry`
    void popFront() @safe pure
    {
        checkInitialized();
        return payload.cq.popFront;
    }

    /**
     * Adds a new entry to the `SubmissionQueue`.
     *
     * Note that this just adds the entry to the queue and doesn't advance the tail
     * marker the kernel sees; that happens during `submit()`, which is also needed
     * to actually hand the new entries over to the kernel.
     *
     * Params:
     *   FN = function to fill the next entry in the queue by `ref` (should be faster).
     *        It is expected to be in the form of `void function(ARGS)(ref SubmissionEntry, auto ref ARGS)`.
     *        Note that in this case the queue entry is cleared before the function is called.
     *   entry = custom built `SubmissionEntry` to be posted as is.
     *        Note that in this case it is copied whole over the one in the `SubmissionQueue`.
     *   args = optional arguments passed to the function
     *
     * Returns: reference to the `Uring` structure so it's possible to chain multiple commands.
     */
    ref Uring put()(auto ref SubmissionEntry entry) @safe pure return
    {
        checkInitialized();
        payload.sq.put(entry);
        return this;
    }

    /// ditto
    ref Uring putWith(alias FN, ARGS...)(auto ref ARGS args) return
    {
        import std.functional : forward;
        checkInitialized();
        payload.sq.putWith!FN(forward!args);
        return this;
    }

    /**
     * Similar to `put(SubmissionEntry)`, but here we can provide a custom type (args) whose
     * fields are filled into the next `SubmissionEntry` in the queue.
     *
     * Fields in the provided type must use the same names as in `SubmissionEntry` to be
     * automagically copied.
     *
     * Params:
     *   op = custom operation definition
     *
     * Returns: reference to the `Uring` structure so it's possible to chain multiple commands.
     */
    ref Uring put(OP)(auto ref OP op) return
        if (!is(OP == SubmissionEntry))
    {
        checkInitialized();
        payload.sq.put(op);
        return this;
    }

    /**
     * Advances the userspace submission queue and returns the last `SubmissionEntry`.
     */
    ref SubmissionEntry next()() @safe pure
    {
        checkInitialized();
        return payload.sq.next();
    }
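
    // Hedged sketch of the three enqueueing styles above; `MyOp` is a made-up
    // type for illustration whose field names mirror `SubmissionEntry`. On a
    // freshly set-up ring the mmapped entries start zeroed, so `put(op)` not
    // clearing the entry first is not a problem here.
    unittest
    {
        static struct MyOp
        {
            Operation opcode = Operation.NOP;
            ulong user_data;
        }

        Uring io;
        assert(io.setup() == 0);

        // 1) putWith: the entry is cleared first, then prepared in place
        io.putWith!((ref SubmissionEntry e) => e.prepNop());

        // 2) put of a manually built entry (copied whole into the queue)
        SubmissionEntry entry;
        entry.prepNop();
        entry.user_data = 1;
        io.put(entry);

        // 3) put of a custom op definition (fields are matched by name)
        io.put(MyOp(Operation.NOP, 2));

        assert(io.submit(3) == 3); // submit all three and wait for them
        foreach (_; 0 .. 3) io.popFront();
        assert(io.empty);
    }
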
    /**
     * If the completion queue is full, new events may be dropped.
     * This value records the number of dropped events.
     */
    uint overflow() const @safe pure
    {
        checkInitialized();
        return payload.cq.overflow;
    }

    /// Counter of invalid submissions (out-of-bound index in submission array)
    uint dropped() const @safe pure
    {
        checkInitialized();
        return payload.sq.dropped;
    }

    /**
     * Submits queued `SubmissionEntry` entries to be processed by the kernel.
     *
     * Params:
     *   want = number of `CompletionEntries` to wait for.
     *          If 0, this just submits queued entries and returns.
     *          If > 0, it blocks until at least the wanted number of entries were completed.
     *   sig = See io_uring_enter(2) man page
     *
     * Returns: Number of submitted entries on success, `-errno` on error
     */
    int submit(uint want = 0, const sigset_t* sig = null) @trusted
    {
        checkInitialized();

        auto len = cast(int)payload.sq.length;
        if (len > 0) // anything to submit?
        {
            EnterFlags flags;
            if (want > 0) flags |= EnterFlags.GETEVENTS;

            payload.sq.flushTail(); // advance queue index

            if (payload.params.flags & SetupFlags.SQPOLL)
            {
                if (payload.sq.flags & SubmissionQueueFlags.NEED_WAKEUP)
                    flags |= EnterFlags.SQ_WAKEUP;
                else if (want == 0) return len; // fast poll
            }
            auto r = io_uring_enter(payload.fd, len, want, flags, sig);
            if (r < 0) return -errno;
            return r;
        }
        else if (want > 0) return wait(want); // just a simple wait
        return 0;
    }

    /**
     * Similar to `submit`, but this method just waits for the required number
     * of `CompletionEntries`.
     *
     * Returns: `0` on success, `-errno` on error
     */
    int wait(uint want = 1, const sigset_t* sig = null) @trusted
    {
        pragma(inline);
        checkInitialized();
        assert(want > 0, "Invalid want value");

        if (payload.cq.length >= want) return 0; // we don't need to syscall

        auto r = io_uring_enter(payload.fd, 0, want, EnterFlags.GETEVENTS, sig);
        if (r < 0) return -errno;
        return 0;
    }
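
    // End-to-end sketch of the submit/wait split: enter the kernel without
    // waiting, then block separately until both completions arrive. A NOP
    // completes with `res == 0`.
    unittest
    {
        Uring io;
        assert(io.setup() == 0);

        io.putWith!((ref SubmissionEntry e) => e.prepNop())
          .putWith!((ref SubmissionEntry e) => e.prepNop());

        assert(io.submit(0) == 2); // submit only, don't wait
        assert(io.wait(2) == 0);   // block until both are completed
        assert(io.length == 2);

        while (!io.empty)
        {
            assert(io.front.res == 0);
            io.popFront();
        }
    }
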
    /**
     * Registers buffers to be mapped into the kernel for faster buffered operations.
     *
     * To use the buffers, the application must use the fixed variants of the operations,
     * `READ_FIXED` or `WRITE_FIXED`, in the `SubmissionEntry`, with the used `buf_index`
     * set in the entry's extra data.
     *
     * An application can increase or decrease the size or number of registered buffers by
     * first unregistering the existing buffers, and then issuing a new call to
     * `io_uring_register()` with the new buffers.
     *
     * Params:
     *   buffers = buffers to be registered
     *
     * Returns: On success, returns 0. On error, `-errno` is returned.
     */
    int registerBuffers(T)(T buffers)
        if (is(T == ubyte[]) || is(T == ubyte[][])) // TODO: something else?
    {
        checkInitialized();
        assert(buffers.length, "Empty buffer");

        if (payload.regBuffers !is null)
            return -EBUSY; // buffers were already registered

        static if (is(T == ubyte[]))
        {
            auto p = malloc(iovec.sizeof);
            if (p is null) return -errno;
            payload.regBuffers = (cast(iovec*)p)[0..1];
            payload.regBuffers[0].iov_base = cast(void*)&buffers[0];
            payload.regBuffers[0].iov_len = buffers.length;
        }
        else static if (is(T == ubyte[][]))
        {
            auto p = malloc(buffers.length * iovec.sizeof);
            if (p is null) return -errno;
            payload.regBuffers = (cast(iovec*)p)[0..buffers.length];

            foreach (i, b; buffers)
            {
                assert(b.length, "Empty buffer");
                payload.regBuffers[i].iov_base = cast(void*)&b[0];
                payload.regBuffers[i].iov_len = b.length;
            }
        }

        auto r = io_uring_register(
            payload.fd,
            RegisterOpCode.REGISTER_BUFFERS,
            cast(const(void)*)&payload.regBuffers[0],
            cast(uint)payload.regBuffers.length
        );

        if (r < 0) return -errno;
        return 0;
    }

    /**
     * Releases all previously registered buffers associated with the `io_uring` instance.
     *
     * An application need not unregister buffers explicitly before shutting down the
     * io_uring instance.
     *
     * Returns: On success, returns 0. On error, `-errno` is returned.
     */
    int unregisterBuffers() @trusted
    {
        checkInitialized();

        if (payload.regBuffers is null)
            return -ENXIO; // no buffers were registered

        free(cast(void*)&payload.regBuffers[0]);
        payload.regBuffers = null;

        auto r = io_uring_register(payload.fd, RegisterOpCode.UNREGISTER_BUFFERS, null, 0);
        if (r < 0) return -errno;
        return 0;
    }
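
    // Hedged sketch of fixed buffers (kernel must support REGISTER_BUFFERS):
    // register a single buffer and read into it with `READ_FIXED`, referencing
    // the registration via `buf_index` 0. `/dev/zero` serves as an always
    // readable file.
    unittest
    {
        import core.sys.posix.fcntl : open, O_RDONLY;

        Uring io;
        assert(io.setup() == 0);

        ubyte[64] data;
        assert(io.registerBuffers(data[]) == 0);

        immutable fd = open("/dev/zero", O_RDONLY);
        assert(fd >= 0);
        scope (exit) close(fd);

        io.putWith!((ref SubmissionEntry e, int f, ubyte[] b)
            => e.prepReadFixed(f, 0, b, 0))(fd, data[]);

        assert(io.submit(1) == 1);
        assert(io.front.res == data.length); // bytes read
        io.popFront();
        assert(io.unregisterBuffers() == 0);
    }
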
    /**
     * Register files for I/O.
     *
     * To make use of the registered files, the `IOSQE_FIXED_FILE` flag must be set in the
     * flags member of the `SubmissionEntry`, and the `fd` member is set to the index of
     * the file in the file descriptor array.
     *
     * Files are automatically unregistered when the `io_uring` instance is torn down. An
     * application need only unregister if it wishes to register a new set of fds.
     *
     * Use `-1` as a file descriptor to mark it as reserved in the array.
     *
     * Params: fds = array of file descriptors to be registered
     *
     * Returns: On success, returns 0. On error, `-errno` is returned.
     */
    int registerFiles(const(int)[] fds)
    {
        checkInitialized();
        assert(fds.length, "No file descriptors provided");
        assert(fds.length < uint.max, "Too many file descriptors");

        // arg contains a pointer to an array of nr_args file descriptors (signed 32 bit integers)
        auto r = io_uring_register(payload.fd, RegisterOpCode.REGISTER_FILES, &fds[0], cast(uint)fds.length);
        if (r < 0) return -errno;
        return 0;
    }

    /**
     * Register an update for an existing file set. The updates will start at `off` in the
     * original array.
     *
     * Use `-1` as a file descriptor to mark it as reserved in the array.
     *
     * Params:
     *   off = offset into the original registered files to be updated
     *   fds = array of file descriptors to update with
     *
     * Returns: On success, returns 0. On error, `-errno` is returned.
     */
    int registerFilesUpdate(uint off, const(int)[] fds) @trusted
    {
        struct Update
        {
            uint offset;
            uint _resv;
            ulong pfds;
        }

        static assert(Update.sizeof == 16);

        checkInitialized();
        assert(fds.length, "No file descriptors provided to update");
        assert(fds.length < uint.max, "Too many file descriptors");

        Update u = { offset: off, pfds: cast(ulong)&fds[0] };
        auto r = io_uring_register(
            payload.fd,
            RegisterOpCode.REGISTER_FILES_UPDATE,
            &u, cast(uint)fds.length);
        if (r < 0) return -errno;
        return 0;
    }

    /**
     * All previously registered files associated with the `io_uring` instance will be
     * unregistered.
     *
     * Files are automatically unregistered when the `io_uring` instance is torn down. An
     * application need only unregister if it wishes to register a new set of fds.
     *
     * Returns: On success, returns 0. On error, `-errno` is returned.
     */
    int unregisterFiles() @trusted
    {
        checkInitialized();
        auto r = io_uring_register(payload.fd, RegisterOpCode.UNREGISTER_FILES, null, 0);
        if (r < 0) return -errno;
        return 0;
    }

    /**
     * Registers an event file descriptor that will be used as a notification mechanism on
     * completion queue changes.
     *
     * Params: eventFD = event file descriptor to be notified about changes
     *
     * Returns: On success, returns 0. On error, `-errno` is returned.
     */
    int registerEventFD(int eventFD) @trusted
    {
        checkInitialized();
        auto r = io_uring_register(payload.fd, RegisterOpCode.REGISTER_EVENTFD, &eventFD, 1);
        if (r < 0) return -errno;
        return 0;
    }

    /**
     * Unregister a previously registered notification event file descriptor.
     *
     * Returns: On success, returns 0. On error, `-errno` is returned.
     */
    int unregisterEventFD() @trusted
    {
        checkInitialized();
        auto r = io_uring_register(payload.fd, RegisterOpCode.UNREGISTER_EVENTFD, null, 0);
        if (r < 0) return -errno;
        return 0;
    }
}
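
// Hedged sketch of completion notifications through an eventfd; assumes
// druntime's `core.sys.linux.sys.eventfd` and kernel support for
// REGISTER_EVENTFD. The kernel signals the eventfd whenever a completion is
// posted, so an external event loop can watch that fd instead of the ring.
unittest
{
    import core.sys.linux.sys.eventfd : eventfd;
    import core.sys.posix.unistd : read;

    Uring io;
    assert(io.setup() == 0);

    immutable efd = eventfd(0, 0);
    assert(efd >= 0);
    scope (exit) close(efd);
    assert(io.registerEventFD(efd) == 0);

    io.putWith!((ref SubmissionEntry e) => e.prepNop());
    assert(io.submit(1) == 1);

    ulong counter;
    assert(read(efd, &counter, counter.sizeof) == counter.sizeof);
    assert(counter == 1); // one completion was signalled
    assert(io.unregisterEventFD() == 0);
    io.popFront();
}
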
/**
 * Uses a custom operation definition to fill fields of a `SubmissionEntry`.
 * Can be used in cases when the builtin `prep*` functions aren't enough.
 *
 * Custom definition fields must correspond to fields of `SubmissionEntry` for this to work.
 *
 * Note: This doesn't touch the previous state of the entry, it just fills the corresponding
 * fields. So it might be needed to call `clear` first on the entry (depends on usage).
 *
 * Params:
 *   entry = entry to set parameters to
 *   op = operation to fill the entry with (can be a custom type)
 *
 * Returns: the prepared entry
 */
ref SubmissionEntry fill(E)(return ref SubmissionEntry entry, auto ref E op)
{
    pragma(inline);
    import std.traits : hasMember, FieldNameTuple;

    // fill entry from the provided operation fields (they must have the same names as in SubmissionEntry)
    foreach (m; FieldNameTuple!E)
    {
        static assert(hasMember!(SubmissionEntry, m), "unknown member: " ~ E.stringof ~ "." ~ m);
        __traits(getMember, entry, m) = __traits(getMember, op, m);
    }

    return entry;
}

/**
 * Template function to help set the `SubmissionEntry` `user_data` field.
 *
 * Params:
 *   entry = `SubmissionEntry` to prepare
 *   data = data to set on the `SubmissionEntry`
 *
 * Note: The data is passed by ref and must outlive the whole operation.
 */
ref SubmissionEntry setUserData(D)(return ref SubmissionEntry entry, ref D data) @trusted
{
    pragma(inline);
    entry.user_data = cast(ulong)(cast(void*)&data);
    return entry;
}

/**
 * Template function to help set the `SubmissionEntry` `user_data` field. This differs from
 * `setUserData` in that it emplaces the provided data directly into the SQE `user_data`
 * field, not a pointer to the data.
 *
 * Because of that, the data must have exactly `ulong.sizeof`.
 */
ref SubmissionEntry setUserDataRaw(D)(return ref SubmissionEntry entry, auto ref D data) @trusted
    if (D.sizeof == ulong.sizeof)
{
    pragma(inline);
    entry.user_data = *(cast(ulong*)(cast(void*)&data));
    return entry;
}

/**
 * Helper function to retrieve data set directly in the `CompletionEntry` `user_data`
 * (set by `setUserDataRaw`).
 */
D userDataAs(D)(ref CompletionEntry entry) @trusted
    if (D.sizeof == ulong.sizeof)
{
    pragma(inline);
    return *(cast(D*)(cast(void*)&entry.user_data));
}
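
// Sketch of the user-data helpers: a small POD is stored inline in the SQE
// `user_data` by `setUserDataRaw` and recovered from the completion with
// `userDataAs`. `Ctx` is illustrative; any 8-byte POD works.
unittest
{
    static struct Ctx { int id; float weight; }
    static assert(Ctx.sizeof == ulong.sizeof);

    Uring io;
    assert(io.setup() == 0);

    io.putWith!((ref SubmissionEntry e) {
        e.prepNop();
        e.setUserDataRaw(Ctx(42, 1.5f));
    });
    assert(io.submit(1) == 1);

    auto ctx = io.front.userDataAs!Ctx;
    assert(ctx.id == 42 && ctx.weight == 1.5f);
    io.popFront();
}
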
ref SubmissionEntry prepRW(return ref SubmissionEntry entry, Operation op,
    int fd = -1, const void* addr = null, uint len = 0, ulong offset = 0) @safe
{
    pragma(inline);
    entry.opcode = op;
    entry.fd = fd;
    entry.off = offset;
    entry.flags = SubmissionEntryFlags.NONE;
    entry.ioprio = 0;
    entry.addr = cast(ulong)addr;
    entry.len = len;
    entry.rw_flags = ReadWriteFlags.NONE;
    entry.user_data = 0;
    entry.__pad2[0] = entry.__pad2[1] = entry.__pad2[2] = 0;
    return entry;
}

/**
 * Prepares a `nop` operation.
 *
 * Params:
 *   entry = `SubmissionEntry` to prepare
 */
ref SubmissionEntry prepNop(return ref SubmissionEntry entry) @safe
{
    entry.prepRW(Operation.NOP);
    return entry;
}

/**
 * Prepares a `readv` operation.
 *
 * Params:
 *   entry = `SubmissionEntry` to prepare
 *   fd = file descriptor of the file we are operating on
 *   buffer = iovec buffers to be used by the operation
 *   offset = offset
 */
ref SubmissionEntry prepReadv(V)(return ref SubmissionEntry entry, int fd, ref const V buffer, long offset) @trusted
    if (is(V == iovec[]) || is(V == iovec))
{
    static if (is(V == iovec[]))
    {
        assert(buffer.length, "Empty buffer");
        assert(buffer.length < uint.max, "Too many iovec buffers");
        return entry.prepRW(Operation.READV, fd, cast(void*)&buffer[0], cast(uint)buffer.length, offset);
    }
    else return entry.prepRW(Operation.READV, fd, cast(void*)&buffer, 1, offset);
}

/**
 * Prepares a `writev` operation.
 *
 * Params:
 *   entry = `SubmissionEntry` to prepare
 *   fd = file descriptor of the file we are operating on
 *   buffer = iovec buffers to be used by the operation
 *   offset = offset
 */
ref SubmissionEntry prepWritev(V)(return ref SubmissionEntry entry, int fd, ref const V buffer, long offset) @trusted
    if (is(V == iovec[]) || is(V == iovec))
{
    static if (is(V == iovec[]))
    {
        assert(buffer.length, "Empty buffer");
        assert(buffer.length < uint.max, "Too many iovec buffers");
        return entry.prepRW(Operation.WRITEV, fd, cast(void*)&buffer[0], cast(uint)buffer.length, offset);
    }
    else return entry.prepRW(Operation.WRITEV, fd, cast(void*)&buffer, 1, offset);
}
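
// Minimal `readv` sketch: scatter one read from `/dev/zero` into two iovec
// buffers (POSIX `iovec` from `core.sys.posix.sys.uio`, already imported above).
unittest
{
    import core.sys.posix.fcntl : open, O_RDONLY;

    Uring io;
    assert(io.setup() == 0);

    immutable fd = open("/dev/zero", O_RDONLY);
    assert(fd >= 0);
    scope (exit) close(fd);

    ubyte[16] a, b;
    iovec[2] vecs = [
        iovec(cast(void*)&a[0], a.length),
        iovec(cast(void*)&b[0], b.length)
    ];
    iovec[] slice = vecs[];

    io.putWith!((ref SubmissionEntry e, int f, ref iovec[] v)
        => e.prepReadv(f, v, 0))(fd, slice);

    assert(io.submit(1) == 1);
    assert(io.front.res == a.length + b.length); // 32 zero bytes read
    io.popFront();
}
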
/**
 * Prepares a `read_fixed` operation.
 *
 * Params:
 *   entry = `SubmissionEntry` to prepare
 *   fd = file descriptor of the file we are operating on
 *   offset = offset
 *   buffer = slice of a preregistered buffer
 *   bufferIndex = index into the preregistered buffers array the buffer belongs to
 */
ref SubmissionEntry prepReadFixed(return ref SubmissionEntry entry, int fd, long offset, ubyte[] buffer, ushort bufferIndex) @safe
{
    assert(buffer.length, "Empty buffer");
    assert(buffer.length < uint.max, "Buffer too large");
    entry.prepRW(Operation.READ_FIXED, fd, cast(void*)&buffer[0], cast(uint)buffer.length, offset);
    entry.buf_index = bufferIndex;
    return entry;
}

/**
 * Prepares a `write_fixed` operation.
 *
 * Params:
 *   entry = `SubmissionEntry` to prepare
 *   fd = file descriptor of the file we are operating on
 *   offset = offset
 *   buffer = slice of a preregistered buffer
 *   bufferIndex = index into the preregistered buffers array the buffer belongs to
 */
ref SubmissionEntry prepWriteFixed(return ref SubmissionEntry entry, int fd, long offset, ubyte[] buffer, ushort bufferIndex) @safe
{
    assert(buffer.length, "Empty buffer");
    assert(buffer.length < uint.max, "Buffer too large");
    entry.prepRW(Operation.WRITE_FIXED, fd, cast(void*)&buffer[0], cast(uint)buffer.length, offset);
    entry.buf_index = bufferIndex;
    return entry;
}

/**
 * Prepares a `recvmsg(2)` operation.
 *
 * Params:
 *   entry = `SubmissionEntry` to prepare
 *   fd = file descriptor of the file we are operating on
 *   msg = message to operate with
 *   flags = `recvmsg` operation flags
 *
 * Note: Available from Linux 5.3
 *
 * See_Also: `recvmsg(2)` man page for details.
 */
ref SubmissionEntry prepRecvMsg(return ref SubmissionEntry entry, int fd, ref msghdr msg, MsgFlags flags = MsgFlags.NONE) @trusted
{
    entry.prepRW(Operation.RECVMSG, fd, cast(void*)&msg, 1, 0);
    entry.msg_flags = flags;
    return entry;
}

/**
 * Prepares a `sendmsg(2)` operation.
 *
 * Params:
 *   entry = `SubmissionEntry` to prepare
 *   fd = file descriptor of the file we are operating on
 *   msg = message to operate with
 *   flags = `sendmsg` operation flags
 *
 * Note: Available from Linux 5.3
 *
 * See_Also: `sendmsg(2)` man page for details.
 */
ref SubmissionEntry prepSendMsg(return ref SubmissionEntry entry, int fd, ref msghdr msg, MsgFlags flags = MsgFlags.NONE) @trusted
{
    entry.prepRW(Operation.SENDMSG, fd, cast(void*)&msg, 1, 0);
    entry.msg_flags = flags;
    return entry;
}

/**
 * Prepares an `fsync` operation.
 *
 * Params:
 *   entry = `SubmissionEntry` to prepare
 *   fd = file descriptor of a file to call `fsync` on
 *   flags = `fsync` operation flags
 */
ref SubmissionEntry prepFsync(return ref SubmissionEntry entry, int fd, FsyncFlags flags = FsyncFlags.NORMAL) @safe
{
    entry.prepRW(Operation.FSYNC, fd);
    entry.fsync_flags = flags;
    return entry;
}
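
// Hedged SENDMSG/RECVMSG sketch over a Unix socketpair (`socketpair` and
// `msghdr` come from `core.sys.posix.sys.socket`, already imported above):
// one entry sends five bytes, the other receives them.
unittest
{
    Uring io;
    assert(io.setup() == 0);

    int[2] socks;
    assert(socketpair(AF_UNIX, SOCK_STREAM, 0, socks) == 0);
    scope (exit) { close(socks[0]); close(socks[1]); }

    ubyte[5] txbuf = ['h', 'e', 'l', 'l', 'o'];
    ubyte[5] rxbuf;
    iovec txv = iovec(cast(void*)&txbuf[0], txbuf.length);
    iovec rxv = iovec(cast(void*)&rxbuf[0], rxbuf.length);
    msghdr tx, rx;
    tx.msg_iov = &txv; tx.msg_iovlen = 1;
    rx.msg_iov = &rxv; rx.msg_iovlen = 1;

    io.putWith!((ref SubmissionEntry e, int s, ref msghdr m) => e.prepSendMsg(s, m))(socks[0], tx);
    io.putWith!((ref SubmissionEntry e, int s, ref msghdr m) => e.prepRecvMsg(s, m))(socks[1], rx);

    assert(io.submit(2) == 2);
    while (!io.empty)
    {
        assert(io.front.res == txbuf.length); // bytes sent / received
        io.popFront();
    }
    assert(rxbuf == txbuf);
}
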
/**
 * Poll the fd specified in the submission queue entry for the events specified in the
 * `poll_events` field. Unlike poll or epoll without `EPOLLONESHOT`, this interface always
 * works in one-shot mode. That is, once the poll operation is completed, it will have to
 * be resubmitted.
 *
 * Params:
 *   entry = `SubmissionEntry` to prepare
 *   fd = file descriptor to poll
 *   events = events to poll on the FD
 */
ref SubmissionEntry prepPollAdd(return ref SubmissionEntry entry, int fd, PollEvents events) @safe
{
    import std.system : endian, Endian;

    entry.prepRW(Operation.POLL_ADD, fd);
    static if (endian == Endian.bigEndian)
        entry.poll_events32 = (events & 0x0000ffffUL) << 16 | (events & 0xffff0000) >> 16;
    else
        entry.poll_events32 = events;
    return entry;
}

/**
 * Remove an existing poll request. If found, the `res` field of the `CompletionEntry`
 * will contain `0`. If not found, `res` will contain `-ENOENT`.
 *
 * Params:
 *   entry = `SubmissionEntry` to prepare
 *   userData = data with the previously issued poll operation
 */
ref SubmissionEntry prepPollRemove(D)(return ref SubmissionEntry entry, ref D userData) @trusted
{
    return entry.prepRW(Operation.POLL_REMOVE, -1, cast(void*)&userData);
}

/**
 * Prepares a `sync_file_range(2)` operation.
 *
 * Syncs a file segment with disk, permitting fine control when synchronizing the open
 * file referred to by the file descriptor fd with disk.
 *
 * If `len` is 0, then all bytes from `offset` through to the end of file are synchronized.
 *
 * Params:
 *   entry = `SubmissionEntry` to prepare
 *   fd = the file descriptor to sync
 *   offset = the starting byte of the file range to be synchronized
 *   len = the length of the range to be synchronized, in bytes
 *   flags = the flags for the command
 *
 * See_Also: `sync_file_range(2)` for the general description of the related system call.
 *
 * Note: Available from Linux 5.2
 */
ref SubmissionEntry prepSyncFileRange(return ref SubmissionEntry entry, int fd, ulong offset, uint len,
    SyncFileRangeFlags flags = SyncFileRangeFlags.WRITE_AND_WAIT) @safe
{
    entry.prepRW(Operation.SYNC_FILE_RANGE, fd, null, len, offset);
    entry.sync_range_flags = flags;
    return entry;
}

/**
 * This command will register a timeout operation.
 *
 * A timeout will trigger a wakeup event on the completion ring for anyone waiting for
 * events. A timeout condition is met when either the specified timeout expires, or the
 * specified number of events have completed. Either condition will trigger the event.
 * The request will complete with `-ETIME` if the timeout got completed through expiration
 * of the timer, or `0` if the timeout got completed through requests completing on their
 * own. If the timeout was cancelled before it expired, the request will complete with
 * `-ECANCELED`.
 *
 * Applications may delete existing timeouts before they occur with the `TIMEOUT_REMOVE`
 * operation.
 *
 * Params:
 *   entry = `SubmissionEntry` to prepare
 *   time = reference to a `KernelTimespec` data structure
 *   count = completion event count
 *   flags = define if it's a relative or absolute time
 *
 * Note: Available from Linux 5.4
 */
ref SubmissionEntry prepTimeout(return ref SubmissionEntry entry, ref KernelTimespec time,
    ulong count = 0, TimeoutFlags flags = TimeoutFlags.REL) @trusted
{
    entry.prepRW(Operation.TIMEOUT, -1, cast(void*)&time, 1, count);
    entry.timeout_flags = flags;
    return entry;
}
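
// Hedged timeout sketch: a relative 1ms timeout completes with `-ETIME` when
// it expires (assuming `KernelTimespec` mirrors the kernel's __kernel_timespec
// with `tv_sec`/`tv_nsec` fields; available from Linux 5.4).
unittest
{
    Uring io;
    assert(io.setup() == 0);

    KernelTimespec ts;
    ts.tv_nsec = 1_000_000; // 1ms, relative

    io.putWith!((ref SubmissionEntry e, ref KernelTimespec t) => e.prepTimeout(t))(ts);
    assert(io.submit(1) == 1);
    assert(io.front.res == -ETIME); // expired; no other completions raced it
    io.popFront();
}
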
/**
 * Prepares an operation to remove an existing timeout registered using the `TIMEOUT`
 * operation.
 *
 * Attempt to remove an existing timeout operation. If the specified timeout request is
 * found and cancelled successfully, this request will terminate with a result value of
 * `-ECANCELED`. If the timeout request was found but expiration was already in progress,
 * this request will terminate with a result value of `-EALREADY`. If the timeout request
 * wasn't found, the request will terminate with a result value of `-ENOENT`.
 *
 * Params:
 *   entry = `SubmissionEntry` to prepare
 *   userData = user data provided with the previously issued timeout operation
 *
 * Note: Available from Linux 5.5
 */
ref SubmissionEntry prepTimeoutRemove(D)(return ref SubmissionEntry entry, ref D userData) @trusted
{
    return entry.prepRW(Operation.TIMEOUT_REMOVE, -1, cast(void*)&userData);
}

/**
 * Prepares an `accept4(2)` operation.
 *
 * See_Also: `accept4(2)` for the general description of the related system call.
 *
 * Params:
 *   entry = `SubmissionEntry` to prepare
 *   fd = socket file descriptor
 *   addr = reference to one of the sockaddr structures to be filled with the accepted client address
 *   addrlen = reference to the addrlen field that will be filled with the accepted client address length
 *   flags = `accept4` operation flags
 *
 * Note: Available from Linux 5.5
 */
ref SubmissionEntry prepAccept(ADDR)(return ref SubmissionEntry entry, int fd, ref ADDR addr, ref socklen_t addrlen,
    AcceptFlags flags = AcceptFlags.NONE) @trusted
{
    entry.prepRW(Operation.ACCEPT, fd, cast(void*)&addr, 0, cast(ulong)(cast(void*)&addrlen));
    entry.accept_flags = flags;
    return entry;
}

/**
 * Prepares an operation that cancels existing async work.
 *
 * This works with any read/write request, accept, send/recvmsg, etc. There's an important
 * distinction to make here between the different kinds of commands. A read/write on a
 * regular file will generally be waiting for IO completion in an uninterruptible state.
 * This means it'll ignore any signals or attempts to cancel it, as these operations are
 * uncancellable. io_uring can cancel these operations if they haven't yet been started.
 * If they have been started, cancellations on these will fail. Network IO will generally
 * be waiting interruptibly, and can hence be cancelled at any time. The completion event
 * for this request will have a result of 0 if done successfully, `-EALREADY` if the
 * operation is already in progress, and `-ENOENT` if the original request specified
 * cannot be found. For cancellation requests that return `-EALREADY`, io_uring may or may
 * not cause this request to be stopped sooner. For blocking IO, the original request will
 * complete as it originally would have. For IO that is cancellable, it will terminate
 * sooner if at all possible.
 *
 * Params:
 *   entry = `SubmissionEntry` to prepare
 *   userData = `user_data` field of the request that should be cancelled
 *   flags = cancel operation flags
 *
 * Note: Available from Linux 5.5
 */
ref SubmissionEntry prepCancel(D)(return ref SubmissionEntry entry, ref D userData, uint flags = 0) @trusted
{
    entry.prepRW(Operation.ASYNC_CANCEL, -1, cast(void*)&userData);
    entry.cancel_flags = flags;
    return entry;
}

/**
 * Prepares a linked timeout operation.
 *
 * This request must be linked with another request through `IOSQE_IO_LINK`. Unlike
 * `IORING_OP_TIMEOUT`, `IORING_OP_LINK_TIMEOUT` acts on the linked request, not the
 * completion queue. The format of the command is otherwise like `IORING_OP_TIMEOUT`,
 * except there's no completion event count, as it's tied to a specific request. If used,
 * the timeout specified in the command will cancel the linked command, unless the linked
 * command completes before the timeout. The timeout will complete with `-ETIME` if the
 * timer expired and cancellation of the linked request was attempted, or `-ECANCELED` if
 * the timer got cancelled because of completion of the linked request.
 *
 * Note: Available from Linux 5.5
 *
 * Params:
 *   entry = `SubmissionEntry` to prepare
 *   time = time specification
 *   flags = define if it's a relative or absolute time
 */
ref SubmissionEntry prepLinkTimeout(return ref SubmissionEntry entry, ref KernelTimespec time, TimeoutFlags flags = TimeoutFlags.REL) @trusted
{
    entry.prepRW(Operation.LINK_TIMEOUT, -1, cast(void*)&time, 1, 0);
    entry.timeout_flags = flags;
    return entry;
}
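
// Hedged linked-timeout sketch (Linux 5.6+ because of `prepRead`): a read on
// an idle pipe is linked via `SubmissionEntryFlags.IO_LINK` (the IOSQE_IO_LINK
// flag) to a LINK_TIMEOUT entry, which cancels the read when 10ms pass with no
// data. The read then completes with an error (typically `-ECANCELED`) and the
// timeout with `-ETIME`.
unittest
{
    import core.sys.posix.unistd : pipe;

    Uring io;
    assert(io.setup() == 0);

    int[2] fds;
    assert(pipe(fds) == 0);
    scope (exit) { close(fds[0]); close(fds[1]); }

    ubyte[16] buf;
    KernelTimespec ts;
    ts.tv_nsec = 10_000_000; // 10ms, relative

    io.putWith!((ref SubmissionEntry e, int f, ubyte[] b) {
        e.prepRead(f, b, 0);
        e.flags |= SubmissionEntryFlags.IO_LINK; // link to the following entry
    })(fds[0], buf[]);
    io.putWith!((ref SubmissionEntry e, ref KernelTimespec t) => e.prepLinkTimeout(t))(ts);

    assert(io.submit(2) == 2);
    foreach (_; 0 .. 2)
    {
        assert(!io.empty);
        io.popFront();
    }
}
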
/**
 * Note: Available from Linux 5.5
 */
ref SubmissionEntry prepConnect(ADDR)(return ref SubmissionEntry entry, int fd, ref const(ADDR) addr) @trusted
{
    return entry.prepRW(Operation.CONNECT, fd, cast(void*)&addr, 0, ADDR.sizeof);
}

/**
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepFilesUpdate(return ref SubmissionEntry entry, int[] fds, int offset) @safe
{
    return entry.prepRW(Operation.FILES_UPDATE, -1, cast(void*)&fds[0], cast(uint)fds.length, offset);
}

/**
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepFallocate(return ref SubmissionEntry entry, int fd, int mode, long offset, long len) @trusted
{
    return entry.prepRW(Operation.FALLOCATE, fd, cast(void*)len, mode, offset);
}

/**
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepOpenat(return ref SubmissionEntry entry, int fd, const char* path, int flags, uint mode) @trusted
{
    entry.prepRW(Operation.OPENAT, fd, cast(void*)path, mode, 0);
    entry.open_flags = flags;
    return entry;
}

/**
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepClose(return ref SubmissionEntry entry, int fd) @safe
{
    return entry.prepRW(Operation.CLOSE, fd);
}

/**
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepRead(return ref SubmissionEntry entry, int fd, ubyte[] buffer, long offset) @safe
{
    return entry.prepRW(Operation.READ, fd, cast(void*)&buffer[0], cast(uint)buffer.length, offset);
}

/**
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepWrite(return ref SubmissionEntry entry, int fd, const(ubyte)[] buffer, long offset) @trusted
{
    return entry.prepRW(Operation.WRITE, fd, cast(void*)&buffer[0], cast(uint)buffer.length, offset);
}

/**
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepStatx(Statx)(return ref SubmissionEntry entry, int fd, const char* path, int flags, uint mask, ref Statx statxbuf) @trusted
{
    entry.prepRW(Operation.STATX, fd, cast(void*)path, mask, cast(ulong)(cast(void*)&statxbuf));
    entry.statx_flags = flags;
    return entry;
}

/**
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepFadvise(return ref SubmissionEntry entry, int fd, long offset, uint len, int advice) @safe
{
    entry.prepRW(Operation.FADVISE, fd, null, len, offset);
    entry.fadvise_advice = advice;
    return entry;
}

/**
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepMadvise(return ref SubmissionEntry entry, const(ubyte)[] block, int advice) @trusted
{
    entry.prepRW(Operation.MADVISE, -1, cast(void*)&block[0], cast(uint)block.length, 0);
    entry.fadvise_advice = advice;
    return entry;
}

/**
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepSend(return ref SubmissionEntry entry,
    int sockfd, const(ubyte)[] buf, MsgFlags flags = MsgFlags.NONE) @trusted
{
    entry.prepRW(Operation.SEND, sockfd, cast(void*)&buf[0], cast(uint)buf.length, 0);
    entry.msg_flags = flags;
    return entry;
}

/**
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepRecv(return ref SubmissionEntry entry,
    int sockfd, ubyte[] buf, MsgFlags flags = MsgFlags.NONE) @trusted
{
    entry.prepRW(Operation.RECV, sockfd, cast(void*)&buf[0], cast(uint)buf.length, 0);
    entry.msg_flags = flags;
    return entry;
}
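
// Plain (non-vectored) READ/CLOSE sketch, Linux 5.6+: read a block from
// `/dev/zero` with `prepRead`, then close the fd through the ring with
// `prepClose`.
unittest
{
    import core.sys.posix.fcntl : open, O_RDONLY;

    Uring io;
    assert(io.setup() == 0);

    immutable fd = open("/dev/zero", O_RDONLY);
    assert(fd >= 0);

    ubyte[32] buf;
    io.putWith!((ref SubmissionEntry e, int f, ubyte[] b) => e.prepRead(f, b, 0))(fd, buf[]);
    assert(io.submit(1) == 1);
    assert(io.front.res == buf.length);
    io.popFront();

    io.putWith!((ref SubmissionEntry e, int f) => e.prepClose(f))(fd);
    assert(io.submit(1) == 1);
    assert(io.front.res == 0);
    io.popFront();
}
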
/**
 * Variant that uses a registered buffers group.
 *
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepRecv(return ref SubmissionEntry entry,
    int sockfd, ushort gid, uint len, MsgFlags flags = MsgFlags.NONE) @safe
{
    entry.prepRW(Operation.RECV, sockfd, null, len, 0);
    entry.msg_flags = flags;
    entry.buf_group = gid;
    entry.flags |= SubmissionEntryFlags.BUFFER_SELECT;
    return entry;
}

/**
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepOpenat2(return ref SubmissionEntry entry, int fd, const char* path, ref OpenHow how) @trusted
{
    return entry.prepRW(Operation.OPENAT2, fd, cast(void*)path, cast(uint)OpenHow.sizeof, cast(ulong)(cast(void*)&how));
}

/**
 * Note: Available from Linux 5.6
 */
ref SubmissionEntry prepEpollCtl(return ref SubmissionEntry entry, int epfd, int fd, int op, ref epoll_event ev) @trusted
{
    return entry.prepRW(Operation.EPOLL_CTL, epfd, cast(void*)&ev, op, fd);
}

/**
 * Note: Available from Linux 5.7
 */
ref SubmissionEntry prepSplice(return ref SubmissionEntry entry,
    int fd_in, ulong off_in,
    int fd_out, ulong off_out,
    uint nbytes, uint splice_flags) @safe
{
    entry.prepRW(Operation.SPLICE, fd_out, null, nbytes, off_out);
    entry.splice_off_in = off_in;
    entry.splice_fd_in = fd_in;
    entry.splice_flags = splice_flags;
    return entry;
}

/**
 * Note: Available from Linux 5.7
 *
 * Params:
 *   entry = `SubmissionEntry` to prepare
 *   buf = buffers to provide
 *   len = length of each buffer to add
 *   bgid = buffers group id
 *   bid = starting buffer id
 */
ref SubmissionEntry prepProvideBuffers(return ref SubmissionEntry entry, ubyte[][] buf, uint len, ushort bgid, int bid) @safe
{
    assert(buf.length < int.max, "Too many buffers");
    entry.prepRW(Operation.PROVIDE_BUFFERS, cast(int)buf.length, cast(void*)&buf[0][0], len, bid);
    entry.buf_group = bgid;
    return entry;
}

/// ditto
ref SubmissionEntry prepProvideBuffers(size_t M, size_t N)(return ref SubmissionEntry entry, ref ubyte[M][N] buf, ushort bgid, int bid) @safe
{
    static assert(N < int.max, "Too many buffers");
    static assert(M < uint.max, "Buffer too large");
    entry.prepRW(Operation.PROVIDE_BUFFERS, cast(int)N, cast(void*)&buf[0][0], cast(uint)M, bid);
    entry.buf_group = bgid;
    return entry;
}

/// ditto
ref SubmissionEntry prepProvideBuffer(size_t N)(return ref SubmissionEntry entry, ref ubyte[N] buf, ushort bgid, int bid) @safe
{
    static assert(N < uint.max, "Buffer too large");
    entry.prepRW(Operation.PROVIDE_BUFFERS, 1, cast(void*)&buf[0], cast(uint)N, bid);
    entry.buf_group = bgid;
    return entry;
}

/// ditto
ref SubmissionEntry prepProvideBuffer(return ref SubmissionEntry entry, ref ubyte[] buf, ushort bgid, int bid) @safe
{
    assert(buf.length < uint.max, "Buffer too large");
    entry.prepRW(Operation.PROVIDE_BUFFERS, 1, cast(void*)&buf[0], cast(uint)buf.length, bid);
    entry.buf_group = bgid;
    return entry;
}

/**
 * Note: Available from Linux 5.7
 */
ref SubmissionEntry prepRemoveBuffers(return ref SubmissionEntry entry, int nr, ushort bgid) @safe
{
    entry.prepRW(Operation.REMOVE_BUFFERS, nr);
    entry.buf_group = bgid;
    return entry;
}

/**
 * Note: Available from Linux 5.8
 */
ref SubmissionEntry prepTee(return ref SubmissionEntry entry, int fd_in, int fd_out, uint nbytes, uint flags) @safe
{
    entry.prepRW(Operation.TEE, fd_out, null, nbytes, 0);
    entry.splice_off_in = 0;
    entry.splice_fd_in = fd_in;
    entry.splice_flags = flags;
    return entry;
}
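
// Hedged sketch of the kernel-managed buffer pool (Linux 5.7+): provide four
// 64-byte buffers under group id 1, then remove them again. Only errors are
// asserted on, since the exact `res` of PROVIDE/REMOVE_BUFFERS differs across
// kernel versions.
unittest
{
    Uring io;
    assert(io.setup() == 0);

    static ubyte[64][4] pool;
    io.putWith!((ref SubmissionEntry e) => e.prepProvideBuffers(pool, 1, 0));
    assert(io.submit(1) == 1);
    assert(io.front.res >= 0);
    io.popFront();

    io.putWith!((ref SubmissionEntry e) => e.prepRemoveBuffers(4, 1));
    assert(io.submit(1) == 1);
    assert(io.front.res >= 0);
    io.popFront();
}
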
private:

// uring cleanup
void dispose(ref Uring uring) @trusted
{
    if (uring.payload is null) return;
    // debug printf("uring(%d): dispose(%d)\n", uring.payload.fd, uring.payload.refs);
    if (--uring.payload.refs == 0)
    {
        import std.traits : hasElaborateDestructor;
        // debug printf("uring(%d): free\n", uring.payload.fd);
        static if (hasElaborateDestructor!UringDesc)
            destroy(*uring.payload); // call possible destructors
        free(cast(void*)uring.payload);
    }
    uring.payload = null;
}

// system fields descriptor
struct UringDesc
{
    nothrow @nogc:

    int fd;
    size_t refs;
    SetupParameters params;
    SubmissionQueue sq;
    CompletionQueue cq;

    iovec[] regBuffers;

    ~this() @trusted
    {
        if (regBuffers) free(cast(void*)&regBuffers[0]);
        if (sq.ring) munmap(sq.ring, sq.ringSize);
        if (sq.sqes) munmap(cast(void*)&sq.sqes[0], sq.sqes.length * SubmissionEntry.sizeof);
        if (cq.ring && cq.ring != sq.ring) munmap(cq.ring, cq.ringSize);
        close(fd);
    }

    private auto mapRings() @trusted
    {
        sq.ringSize = params.sq_off.array + params.sq_entries * uint.sizeof;
        cq.ringSize = params.cq_off.cqes + params.cq_entries * CompletionEntry.sizeof;

        if (params.features & SetupFeatures.SINGLE_MMAP)
        {
            if (cq.ringSize > sq.ringSize) sq.ringSize = cq.ringSize;
            cq.ringSize = sq.ringSize;
        }

        sq.ring = mmap(null, sq.ringSize,
            PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
            fd, SetupParameters.SUBMISSION_QUEUE_RING_OFFSET
        );

        if (sq.ring == MAP_FAILED)
        {
            sq.ring = null;
            return -errno;
        }

        if (params.features & SetupFeatures.SINGLE_MMAP)
            cq.ring = sq.ring;
        else
        {
            cq.ring = mmap(null, cq.ringSize,
                PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
                fd, SetupParameters.COMPLETION_QUEUE_RING_OFFSET
            );

            if (cq.ring == MAP_FAILED)
            {
                cq.ring = null;
                return -errno; // cleanup is done in struct destructors
            }
        }

        uint entries = *cast(uint*)(sq.ring + params.sq_off.ring_entries);
        sq.khead = cast(uint*)(sq.ring + params.sq_off.head);
        sq.ktail = cast(uint*)(sq.ring + params.sq_off.tail);
        sq.localTail = *sq.ktail;
        sq.ringMask = *cast(uint*)(sq.ring + params.sq_off.ring_mask);
        sq.kflags = cast(uint*)(sq.ring + params.sq_off.flags);
        sq.kdropped = cast(uint*)(sq.ring + params.sq_off.dropped);

        // Indirection array of indexes into the sqes array (head and tail are pointing to this array).
        // As we don't need any fancy mappings, just initialize it with constant indexes and forget about it.
        // That way, head and tail are actually indexes into our sqes array.
        foreach (i; 0..entries)
        {
            *((cast(uint*)(sq.ring + params.sq_off.array)) + i) = i;
        }

        auto psqes = mmap(
            null, entries * SubmissionEntry.sizeof,
            PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
            fd, SetupParameters.SUBMISSION_QUEUE_ENTRIES_OFFSET
        );

        if (psqes == MAP_FAILED) return -errno;
        sq.sqes = (cast(SubmissionEntry*)psqes)[0..entries];

        entries = *cast(uint*)(cq.ring + params.cq_off.ring_entries);
        cq.khead = cast(uint*)(cq.ring + params.cq_off.head);
        cq.localHead = *cq.khead;
        cq.ktail = cast(uint*)(cq.ring + params.cq_off.tail);
        cq.ringMask = *cast(uint*)(cq.ring + params.cq_off.ring_mask);
        cq.koverflow = cast(uint*)(cq.ring + params.cq_off.overflow);
        cq.cqes = (cast(CompletionEntry*)(cq.ring + params.cq_off.cqes))[0..entries];
        cq.kflags = cast(uint*)(cq.ring + params.cq_off.flags);
        return 0;
    }
}

/// Wrapper for the `SubmissionEntry` queue
struct SubmissionQueue
{
    nothrow @nogc:

    // mmaped fields
    uint* khead; // controlled by kernel
    uint* ktail; // controlled by us
    uint* kflags; // controlled by kernel (i.e. IORING_SQ_NEED_WAKEUP)
    uint* kdropped; // counter of invalid submissions (out of bound index)
    uint ringMask; // constant mask used to determine array index from head/tail

    // mmap details (for cleanup)
    void* ring; // pointer to the mmaped region
    size_t ringSize; // size of mmaped memory block

    // mmapped list of entries (fixed length)
    SubmissionEntry[] sqes;

    uint localTail; // used for batch submission

    uint head() const @safe pure { return atomicLoad!(MemoryOrder.acq)(*khead); }
    uint tail() const @safe pure { return localTail; }

    void flushTail() @safe pure
    {
        pragma(inline, true);
        // debug printf("SQ updating tail: %d\n", localTail);
        atomicStore!(MemoryOrder.rel)(*ktail, localTail);
    }

    SubmissionQueueFlags flags() const @safe pure
    {
        return cast(SubmissionQueueFlags)atomicLoad!(MemoryOrder.raw)(*kflags);
    }

    bool full() const @safe pure { return sqes.length == length; }

    size_t length() const @safe pure { return tail - head; }

    size_t capacity() const @safe pure { return sqes.length - length; }

    ref SubmissionEntry next()() @safe pure return
    {
        return sqes[localTail++ & ringMask];
    }

    void put()(auto ref SubmissionEntry entry) @safe pure
    {
        assert(!full, "SubmissionQueue is full");
        sqes[localTail++ & ringMask] = entry;
    }

    void put(OP)(auto ref OP op)
        if (!is(OP == SubmissionEntry))
    {
        assert(!full, "SubmissionQueue is full");
        sqes[localTail++ & ringMask].fill(op);
    }

    private void putWith(alias FN, ARGS...)(auto ref ARGS args)
    {
        import std.traits : Parameters, ParameterStorageClass, ParameterStorageClassTuple;

        static assert(
            Parameters!FN.length >= 1
            && is(Parameters!FN[0] == SubmissionEntry)
            && ParameterStorageClassTuple!FN[0] == ParameterStorageClass.ref_,
            "Alias function must accept at least `ref SubmissionEntry`");

        static assert(
            is(typeof(FN(sqes[localTail & ringMask], args))),
            "Provided function is not callable with " ~ (Parameters!((ref SubmissionEntry e, ARGS args) {})).stringof);

        assert(!full, "SubmissionQueue is full");
        sqes[localTail & ringMask].clear(); // clear the entry first, as documented on `Uring.putWith`
        FN(sqes[localTail++ & ringMask], args);
    }

    uint dropped() const @safe pure { return atomicLoad!(MemoryOrder.raw)(*kdropped); }
}

struct CompletionQueue
{
    nothrow @nogc:

    // mmaped fields
    uint* khead; // controlled by us (increment after the entry at head was read)
    uint* ktail; // updated by kernel
    uint* koverflow;
    uint* kflags;
    CompletionEntry[] cqes; // array of entries (fixed length)

    uint ringMask; // constant mask used to determine array index from head/tail

    // mmap details (for cleanup)
    void* ring;
    size_t ringSize;

    uint localHead; // used for bulk reading

    uint head() const @safe pure { return localHead; }
    uint tail() const @safe pure { return atomicLoad!(MemoryOrder.acq)(*ktail); }

    void flushHead() @safe pure
    {
        pragma(inline, true);
        // debug printf("CQ updating head: %d\n", localHead);
        atomicStore!(MemoryOrder.rel)(*khead, localHead);
    }

    bool empty() const @safe pure { return head == tail; }

    ref CompletionEntry front() @safe pure return
    {
        assert(!empty, "CompletionQueue is empty");
        return cqes[localHead & ringMask];
    }

    void popFront() @safe pure
    {
        pragma(inline);
        assert(!empty, "CompletionQueue is empty");
        localHead++;
        flushHead();
    }

    size_t length() const @safe pure { return tail - localHead; }

    uint overflow() const @safe pure { return atomicLoad!(MemoryOrder.raw)(*koverflow); }

    /// Runtime CQ flags - written by the application, shouldn't be modified by the kernel.
    void flags(CQRingFlags flags) @safe pure { atomicStore!(MemoryOrder.raw)(*kflags, flags); }
}

// just a helper to use atomicStore more easily with older compilers
void atomicStore(MemoryOrder ms, T, V)(ref T val, V newVal) @trusted
{
    pragma(inline, true);
    import core.atomic : store = atomicStore;
    static if (__VERSION__ >= 2089) store!ms(val, newVal);
    else store!ms(*(cast(shared T*)&val), newVal);
}

// just a helper to use atomicLoad more easily with older compilers
T atomicLoad(MemoryOrder ms, T)(ref const T val) @trusted
{
    pragma(inline, true);
    import core.atomic : load = atomicLoad;
    static if (__VERSION__ >= 2089) return load!ms(val);
    else return load!ms(*(cast(const shared T*)&val));
}

version (assert)
{
    import std.range.primitives : ElementType, isInputRange, isOutputRange;
    static assert(isInputRange!Uring && is(ElementType!Uring == CompletionEntry));
    static assert(isOutputRange!(Uring, SubmissionEntry));
}
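
// Final sketch tying the range interface together (mirrors the static asserts
// above): `Uring` is an input range of `CompletionEntry` and an output range
// of `SubmissionEntry`, so generic range code applies directly.
unittest
{
    import std.range : put;

    Uring io;
    assert(io.setup() == 0);

    SubmissionEntry entry;
    entry.prepNop();
    put(io, entry); // std.range.put dispatches to Uring.put

    assert(io.submit(1) == 1);
    foreach (ref cqe; io) // drains via empty/front/popFront
        assert(cqe.res == 0);
    assert(io.empty);
}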