Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace the current file IO solution with one that uses roc buffers #282

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

180 changes: 133 additions & 47 deletions crates/roc_host/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
#![allow(non_snake_case)]
#![allow(improper_ctypes)]
use core::ffi::c_void;
use roc_std::{RocBox, RocList, RocResult, RocStr, ReadOnlyRocList, ReadOnlyRocStr};
use core::panic;
use roc_std::{ReadOnlyRocList, ReadOnlyRocStr, RocBox, RocList, RocRefcounted, RocResult, RocStr};
use roc_std_heap::ThreadSafeRefcountedResourceHeap;
use std::borrow::{Borrow, Cow};
use std::ffi::OsStr;
Expand All @@ -29,10 +30,10 @@ thread_local! {
.unwrap();
}

static ARGS : OnceLock<ReadOnlyRocList<ReadOnlyRocStr>> = OnceLock::new();
static ARGS: OnceLock<ReadOnlyRocList<ReadOnlyRocStr>> = OnceLock::new();

fn file_heap() -> &'static ThreadSafeRefcountedResourceHeap<BufReader<File>> {
static FILE_HEAP: OnceLock<ThreadSafeRefcountedResourceHeap<BufReader<File>>> = OnceLock::new();
fn file_heap() -> &'static ThreadSafeRefcountedResourceHeap<File> {
static FILE_HEAP: OnceLock<ThreadSafeRefcountedResourceHeap<File>> = OnceLock::new();
FILE_HEAP.get_or_init(|| {
let DEFAULT_MAX_FILES = 65536;
let max_files = env::var("ROC_BASIC_CLI_MAX_FILES")
Expand Down Expand Up @@ -303,6 +304,7 @@ pub fn init() {
roc_fx_fileReadBytes as _,
roc_fx_fileReader as _,
roc_fx_fileReadLine as _,
roc_fx_fileReadByteBuf as _,
roc_fx_fileDelete as _,
roc_fx_cwd as _,
roc_fx_posixTime as _,
Expand Down Expand Up @@ -337,7 +339,8 @@ pub fn init() {

#[no_mangle]
pub extern "C" fn rust_main(args: ReadOnlyRocList<ReadOnlyRocStr>) -> i32 {
ARGS.set(args).unwrap_or_else(|_| panic!("only one thread running, must be able to set args"));
ARGS.set(args)
.unwrap_or_else(|_| panic!("only one thread running, must be able to set args"));
init();

extern "C" {
Expand Down Expand Up @@ -375,7 +378,9 @@ pub extern "C" fn roc_fx_envDict() -> RocList<(RocStr, RocStr)> {
#[no_mangle]
pub extern "C" fn roc_fx_args() -> ReadOnlyRocList<ReadOnlyRocStr> {
// Note: the clone here is no-op since the refcount is readonly. Just goes from &RocList to RocList.
ARGS.get().expect("args was set during init and must be here").clone()
ARGS.get()
.expect("args was set during init and must be here")
.clone()
}

#[no_mangle]
Expand Down Expand Up @@ -571,34 +576,43 @@ fn path_from_roc_path(bytes: &RocList<u8>) -> Cow<'_, Path> {

#[no_mangle]
pub extern "C" fn roc_fx_fileReadBytes(roc_path: &RocList<u8>) -> RocResult<RocList<u8>, RocStr> {
// TODO: write our own duplicate of `read_to_end` that directly fills a `RocList<u8>`.
// This adds an extra O(n) copy.
let mut bytes = Vec::new();

match File::open(path_from_roc_path(roc_path)) {
Ok(mut file) => match file.read_to_end(&mut bytes) {
Ok(_bytes_read) => RocResult::ok(RocList::from(bytes.as_slice())),
Err(err) => RocResult::err(toRocReadError(err)),
},
Ok(mut file) => {
let size = file
.metadata()
.map(|m| m.len())
.expect("TODO: make robust: file has not size?");
let mut buf_list = RocList::with_capacity(size as usize);
let buf_slice: &mut [u8] = unsafe {
std::slice::from_raw_parts_mut(buf_list.as_mut_ptr(), buf_list.capacity())
};

match file.read_exact(buf_slice) {
Ok(()) => {
let out_list = unsafe {
RocList::from_raw_parts(
buf_list.as_mut_ptr(),
buf_list.capacity(),
buf_list.capacity(),
)
};
std::mem::forget(buf_list);

RocResult::ok(out_list)
}
Err(err) => RocResult::err(toRocReadError(err)),
}
}
Err(err) => RocResult::err(toRocReadError(err)),
}
}

#[no_mangle]
pub extern "C" fn roc_fx_fileReader(
roc_path: &RocList<u8>,
size: u64,
) -> RocResult<RocBox<()>, RocStr> {
pub extern "C" fn roc_fx_fileReader(roc_path: &RocList<u8>) -> RocResult<RocBox<()>, RocStr> {
match File::open(path_from_roc_path(roc_path)) {
Ok(file) => {
let buf_reader = if size > 0 {
BufReader::with_capacity(size as usize, file)
} else {
BufReader::new(file)
};

let heap = file_heap();
let alloc_result = heap.alloc_for(buf_reader);
let alloc_result = heap.alloc_for(file);
match alloc_result {
Ok(out) => RocResult::ok(out),
Err(err) => RocResult::err(toRocReadError(err)),
Expand All @@ -609,47 +623,119 @@ pub extern "C" fn roc_fx_fileReader(
}

#[no_mangle]
pub extern "C" fn roc_fx_fileReadLine(data: RocBox<()>) -> RocResult<RocList<u8>, RocStr> {
let buf_reader: &mut BufReader<File> = ThreadSafeRefcountedResourceHeap::box_to_resource(data);
pub extern "C" fn roc_fx_fileReadLine(
data: RocBox<()>,
//TODO: this would allow the internal buffer to get much much bigger, is this acceptable? SHould we maybe include a warning about that
buffer: RocList<u8>,
) -> RocResult<RocList<u8>, RocStr> {
let file: &mut File = ThreadSafeRefcountedResourceHeap::box_to_resource(data);

let mut buffer = RocList::empty();
match read_until(buf_reader, b'\n', &mut buffer) {
Ok(..) => {
let buffer = if buffer.is_unique() {
buffer
} else {
RocList::with_capacity(8000)
};
match read_until(file, b'\n', buffer) {
Ok(mut buffer) => {
buffer.inc();
// Note: this returns an empty list when no bytes were read, e.g. End Of File
RocResult::ok(buffer)
}
Err(err) => RocResult::err(err.to_string().as_str().into()),
}
}
// We should be able to ask the user to "return" their buffer. So that if they do they get the same buffer back and we don't have to re-allocate. Should be a nice optimization.
// TODO: If the capacity is larger but the len isn't right we should be able to extend the len to match. I don't have access to a function that does that though
#[no_mangle]
pub extern "C" fn roc_fx_fileReadByteBuf(
reader: RocBox<()>,
buf: &mut RocList<u8>,
) -> RocResult<RocList<u8>, RocStr> {
let file: &mut File = ThreadSafeRefcountedResourceHeap::box_to_resource(reader);

let canUseInternal = buf.is_unique();

if canUseInternal {
unsafe {
//This ensures we always expand the buffer to the full capacity of the list
let buf_slice: &mut [u8] =
std::slice::from_raw_parts_mut(buf.as_mut_ptr(), buf.capacity());
loop {
let read = match file.read(buf_slice) {
Ok(n) => n,
Err(ref e) if matches!(e.kind(), ErrorKind::Interrupted) => continue,
Err(e) => return RocResult::err(e.to_string().as_str().into()),
};
let mut roc_list = RocList::from_raw_parts(buf.as_mut_ptr(), read, buf.capacity());
roc_list.inc();

return RocResult::ok(roc_list);
}
}
} else {
// return RocResult::err("not unique".into());
unsafe {
//Make a new list
let mut list = RocList::with_capacity(buf.capacity());
//get a slice to the full memmory of the list
let slice: &mut [u8] =
std::slice::from_raw_parts_mut(list.as_mut_ptr(), list.capacity());
loop {
let read = match file.read(slice) {
Ok(n) => n,
Err(ref e) if matches!(e.kind(), ErrorKind::Interrupted) => continue,
Err(e) => return RocResult::err(e.to_string().as_str().into()),
};
//update the length based on amount read
let roc_list = RocList::from_raw_parts(list.as_mut_ptr(), read, list.capacity());
std::mem::forget(list);
return RocResult::ok(roc_list);
}
}
}
}

fn read_until<R: BufRead + ?Sized>(
/// Reads until the provided delim expanding the roc buffer as it goes. Returns a new reference to the same roc buffer but with a length exactly as long as the
fn read_until<R: Read + ?Sized>(
r: &mut R,
delim: u8,
buf: &mut RocList<u8>,
) -> io::Result<usize> {
mut buf: RocList<u8>,
) -> io::Result<RocList<u8>> {
let mut read = 0;
let og_capacity = buf.capacity();
loop {
let (done, used) = {
let available = match r.fill_buf() {
//get a slice between the end of the last read and the end of the buffer
let buf_slice: &mut [u8] = unsafe {
std::slice::from_raw_parts_mut(buf.as_mut_ptr().add(read), buf.capacity() - read)
};
let this_read = match r.read(buf_slice) {
Ok(n) => n,
Err(ref e) if matches!(e.kind(), ErrorKind::Interrupted) => continue,
Err(e) => return Err(e),
};
match memchr::memchr(delim, available) {
Some(i) => {
buf.extend_from_slice(&available[..=i]);
(true, i + 1)
}
None => {
buf.extend_from_slice(available);
(false, available.len())
//if we read 0 bytes we are done because that's EOF
if this_read == 0 {
(true, 0)
} else {
let readSlice: &[u8] = &buf_slice[..this_read];
match memchr::memchr(delim, readSlice) {
Some(i) => (true, i + 1),
None => (false, this_read),
}
}
};
r.consume(used);
read += used;
if done || used == 0 {
return Ok(read);
let out = unsafe { RocList::from_raw_parts(buf.as_mut_ptr(), read, buf.capacity()) };
//Don't drop the buffer because we are returning it
std::mem::forget(buf);
return Ok(out);
}

// Ensure we have enough capacity for the next read
if buf.capacity() < read + og_capacity {
buf.reserve(og_capacity);
}
}
}
Expand Down Expand Up @@ -1043,9 +1129,9 @@ pub extern "C" fn roc_fx_tcpReadUntil(
let stream: &mut BufReader<TcpStream> =
ThreadSafeRefcountedResourceHeap::box_to_resource(stream);

let mut buffer = RocList::empty();
match read_until(stream, byte, &mut buffer) {
Ok(_) => RocResult::ok(buffer),
let buffer = RocList::with_capacity(8000);
match read_until(stream, byte, buffer) {
Ok(buffer) => RocResult::ok(buffer),
Err(err) => RocResult::err(to_tcp_stream_err(err)),
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/roc_host_bin/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use roc_std::{RocList, RocStr, ReadOnlyRocList, ReadOnlyRocStr};
use roc_std::{ReadOnlyRocList, ReadOnlyRocStr, RocList, RocStr};
use std::borrow::Borrow;

fn main() {
Expand Down
9 changes: 7 additions & 2 deletions crates/roc_host_lib/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
use roc_std::{RocList, RocStr, ReadOnlyRocList, ReadOnlyRocStr};
use roc_std::{ReadOnlyRocList, ReadOnlyRocStr, RocList, RocStr};
use std::borrow::Borrow;

/// # Safety
/// This function is the entry point for the program, it will be linked by roc using the legacy linker
/// to produce the final executable.
///
/// Note we use argc and argv to pass arguments to the program instead of std::env::args().
#[no_mangle]
pub unsafe extern "C" fn main(argc: usize, argv: *const *const i8) -> i32 {
let args = std::slice::from_raw_parts(argv, argc);

let mut args: RocList<ReadOnlyRocStr> = args
.into_iter()
.iter()
.map(|&c_ptr| {
let c_str = std::ffi::CStr::from_ptr(c_ptr);
let roc_str = RocStr::from(c_str.to_string_lossy().borrow());
Expand Down
Loading
Loading