Implement AsyncFile methods directly

This commit is contained in:
htrefil 2020-09-17 17:33:24 +02:00
parent 3e2117670b
commit 2861740760
2 changed files with 78 additions and 40 deletions

View file

@ -1,11 +1,16 @@
use libc::c_int;
use mio::event::Evented; use mio::event::Evented;
use mio::unix::EventedFd; use mio::unix::EventedFd;
use mio::{Poll, PollOpt, Ready, Token}; use mio::{Poll, PollOpt, Ready, Token};
use std::convert::AsRef; use std::convert::AsRef;
use std::convert::TryInto;
use std::ffi::CString;
use std::fs::{File, OpenOptions}; use std::fs::{File, OpenOptions};
use std::io::ErrorKind;
use std::io::{Error, Read, Write}; use std::io::{Error, Read, Write};
use std::os::unix::ffi::OsStringExt;
use std::os::unix::io::{AsRawFd, RawFd}; use std::os::unix::io::{AsRawFd, RawFd};
use std::path::Path; use std::path::{Path, PathBuf};
use std::pin::Pin; use std::pin::Pin;
use std::task::{self, Context}; use std::task::{self, Context};
use tokio::io::{AsyncRead, AsyncWrite, PollEvented}; use tokio::io::{AsyncRead, AsyncWrite, PollEvented};
@ -16,24 +21,28 @@ pub struct AsyncFile {
impl AsyncFile { impl AsyncFile {
pub async fn open(path: impl AsRef<Path>, mode: OpenMode) -> Result<Self, Error> { pub async fn open(path: impl AsRef<Path>, mode: OpenMode) -> Result<Self, Error> {
let mut options = OpenOptions::new();
match mode {
OpenMode::Read => options.read(true),
OpenMode::Write => options.write(true),
OpenMode::ReadWrite => options.read(true).write(true),
};
let path = path.as_ref().to_owned(); let path = path.as_ref().to_owned();
let inner = Inner(tokio::task::spawn_blocking(move || options.open(path)).await??);
Ok(Self { tokio::task::spawn_blocking(move || Self::open_sync(path, mode)).await?
file: PollEvented::new(inner)?,
})
} }
}
impl AsRawFd for AsyncFile { fn open_sync(path: PathBuf, mode: OpenMode) -> Result<Self, Error> {
fn as_raw_fd(&self) -> RawFd { let path =
self.file.get_ref().0.as_raw_fd() CString::new(path.into_os_string().into_vec()).map_err(|_| ErrorKind::InvalidInput)?;
let flags = match mode {
OpenMode::Read => libc::O_RDONLY,
OpenMode::Write => libc::O_WRONLY,
OpenMode::ReadWrite => libc::O_RDWR,
};
let fd = unsafe { libc::open(path.as_ptr(), flags | libc::O_NONBLOCK) };
if fd == -1 {
return Err(Error::last_os_error());
}
Ok(AsyncFile {
file: PollEvented::new(Inner { fd })?,
})
} }
} }
@ -71,7 +80,51 @@ impl AsyncWrite for AsyncFile {
} }
} }
struct Inner(File); #[derive(Clone, Copy, Debug)]
pub enum OpenMode {
Read,
Write,
ReadWrite,
}
struct Inner {
fd: c_int,
}
impl Read for Inner {
fn read(&mut self, buffer: &mut [u8]) -> Result<usize, Error> {
let size = buffer
.len()
.try_into()
.map_err(|_| ErrorKind::InvalidInput)?;
let read = unsafe { libc::read(self.fd, buffer.as_mut_ptr() as *mut _, size) };
if read == -1 {
return Err(Error::last_os_error());
}
Ok(read.try_into().unwrap())
}
}
impl Write for Inner {
fn write(&mut self, data: &[u8]) -> Result<usize, Error> {
let size = data.len().try_into().map_err(|_| ErrorKind::InvalidInput)?;
let written = unsafe { libc::write(self.fd, data.as_ptr() as *mut _, size) };
if written == -1 {
return Err(Error::last_os_error());
}
Ok(written.try_into().unwrap())
}
fn flush(&mut self) -> Result<(), Error> {
if unsafe { libc::fsync(self.fd) == -1 } {
return Err(Error::last_os_error());
}
Ok(())
}
}
impl Evented for Inner { impl Evented for Inner {
fn register( fn register(
@ -81,7 +134,7 @@ impl Evented for Inner {
interest: Ready, interest: Ready,
opts: PollOpt, opts: PollOpt,
) -> Result<(), Error> { ) -> Result<(), Error> {
EventedFd(&self.0.as_raw_fd()).register(poll, token, interest, opts) EventedFd(&self.fd).register(poll, token, interest, opts)
} }
fn reregister( fn reregister(
@ -91,33 +144,18 @@ impl Evented for Inner {
interest: Ready, interest: Ready,
opts: PollOpt, opts: PollOpt,
) -> Result<(), Error> { ) -> Result<(), Error> {
EventedFd(&self.0.as_raw_fd()).reregister(poll, token, interest, opts) EventedFd(&self.fd).reregister(poll, token, interest, opts)
} }
fn deregister(&self, poll: &Poll) -> Result<(), Error> { fn deregister(&self, poll: &Poll) -> Result<(), Error> {
EventedFd(&self.0.as_raw_fd()).deregister(poll) EventedFd(&self.fd).deregister(poll)
} }
} }
impl Read for Inner { impl Drop for Inner {
fn read(&mut self, buffer: &mut [u8]) -> Result<usize, Error> { fn drop(&mut self) {
self.0.read(buffer) unsafe {
libc::close(self.fd);
}
} }
} }
impl Write for Inner {
fn write(&mut self, data: &[u8]) -> Result<usize, Error> {
self.0.write(data)
}
fn flush(&mut self) -> Result<(), Error> {
self.0.flush()
}
}
#[derive(Clone, Copy, Debug)]
pub enum OpenMode {
Read,
Write,
ReadWrite,
}