对于提交事件这一端规则仍然被保留。应用程序更新 tail 同时内核消费 head,一个重要的不同点是 CQ ring 直接索引 CQEs 的共享内存,提交端在它们中有一个 indirection 的 array,因此提交端的 ring buffer 是通过 index 直接访问 array。
一个例子如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13
structio_uring_sqe *sqe; unsigned tail, index; tail = sqring→tail; index = tail & (*sqring→ring_mask); sqe = &sqring→sqes[index]; /* this call fills in the sqe entries for this IO */ init_io(sqe); /* fill the sqe index into the SQ ring array */ sqring→array[index] = index; tail++; write_barrier(); sqring→tail = tail; write_barrier();
完成事件可能以任何顺序达到,请求的顺序和完成的顺序没有任何联系,SQ ring 和 CQ ring 独立地运行。然而,一个完成的事件将总是与一个请求的事件相适配。因此,一个完成的事件将总和一个特定的提交请求相联系。
structio_sqring_offsets { __u32 head; /* offset of ring head */ __u32 tail; /* offset of ring tail */ __u32 ring_mask; /* ring mask value */ __u32 ring_entries; /* entries in ring */ __u32 flags; /* ring flags */ __u32 dropped; /* number of sqes not submitted */ __u32 array; /* sqe index array / __u32 resv1; __u64 resv2; };
为了获取这段内存,应用程序必须使用 mmap 通过 io_uring 的文件描述符和与 SQ ring 相关联的内存偏移量,io_uring 的 API 定义了如下的 mmap 偏移量从而能被应用程序所使用:
structio_uring_sqesqe; structio_uring_cqecqe; /* get an sqe and fill in a READV operation */ sqe = io_uring_get_sqe(&ring); io_uring_prep_readv(sqe, fd, &iovec, 1, offset); /* tell the kernel we have an sqe ready for consumption */ io_uring_submit(&ring); /* wait for the sqe to complete */ io_uring_wait_cqe(&ring, &cqe); /* read and process cqe event */ app_handle_cqe(cqe); io_uring_cqe_seen(&ring, cqe);
for i in0..sockets.len() { if sockets[i] == fd { sockets.remove(i); } }
unsafe { libc::close(fd); } }else { // 读取成功,此时的结果表明读取的字节数 let len = ret asusize; // 获取用来获取 read 的缓冲区 let buf = &buf_alloc[buf_index];
let socket_len = sockets.len(); token_alloc.remove(token_index); for i in0..socket_len { // 新建write_token并将其传输给所有正在连接的socket let write_token = Token::Write { fd: sockets[i], buf_index, len, offset: 0 };
let write_token_index = token_alloc.insert(write_token);
// 注册 write 事件,实际上是注册 send syscall 的事件 let write_e = opcode::Send::new(types::Fd(sockets[i]), buf.as_ptr(), len as _) .build() .user_data(write_token_index as _); unsafe { if sq.push(&write_e).is_err() { backlog.push_back(write_e); } } }
} }
Token::Write { fd, buf_index, offset, len } => { // write(send) 事件返回,此时的结果是写字节数 let write_len = ret asusize;
// 如果写偏移量的写数据的字节数大于等于要写的长度, // 此时表明已经写完,则开始注册等待事件继续轮询socket是否传输信息 let entry = if offset + write_len >= len { bufpool.push(buf_index);
*token = Token::Poll { fd };
opcode::PollAdd::new(types::Fd(fd), libc::POLLIN as _) .build() .user_data(token_index as _) }else { // 如果没写完的话则更新参数重新写 // 将写偏移量加上写字节数 let offset = offset + write_len; // 将要写的数据长度减去偏移量 let len = len - offset; // 通过偏移量获取缓冲区的指针 let buf = &buf_alloc[buf_index][offset..];
*token = Token::Write { fd, buf_index, offset, len };
opcode::Write::new(types::Fd(fd), buf.as_ptr(), len as _) .build() .user_data(token_index as _) };