ARROW-11887: [C++] Add asynchronous read to streaming CSV reader #9644
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -104,15 +104,22 @@ class ARROW_EXPORT Executor { | |
| template <typename T> | ||
| Future<T> Transfer(Future<T> future) { | ||
| auto transferred = Future<T>::Make(); | ||
| future.AddCallback([this, transferred](const Result<T>& result) mutable { | ||
| auto callback = [this, transferred](const Result<T>& result) mutable { | ||
| auto spawn_status = Spawn([transferred, result]() mutable { | ||
| transferred.MarkFinished(std::move(result)); | ||
| }); | ||
| if (!spawn_status.ok()) { | ||
| transferred.MarkFinished(spawn_status); | ||
| } | ||
| }); | ||
| return transferred; | ||
| }; | ||
| auto callback_factory = [&callback]() { return callback; }; | ||
| if (future.TryAddCallback(callback_factory)) { | ||
| return transferred; | ||
| } | ||
| // If the future is already finished and we aren't going to force spawn a thread | ||
| // then we don't need to add another layer of callback and can return the original | ||
| // future | ||
|
Member
Doesn't this change the semantics? I may want
Member
Author
In this case the future is already finished, so there is nothing we can do. The old implementation suffered from the same problem. So far this isn't a problem: the only place we call transfer is from the CPU thread pool, to take something off the I/O thread pool. If the future is finished, the callback will run synchronously on the CPU thread pool anyway. The only place it could really be an issue, I suppose, is if you were trying to transfer it onto a different thread pool than the calling thread pool. That being said, I could revert all the changes to this method. They were left over from an earlier misunderstanding and they don't change the behavior of
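To make the already-finished case concrete, here is a minimal single-threaded sketch. `ToyExecutor`, `ToyFuture`, and `WhereContinuationRuns` are hypothetical stand-ins written for this note, not Arrow's `Executor` or `Future<T>`, and the finished check replaces the `TryAddCallback` call from the diff because the toy has no races. It only illustrates that a callback added to a finished future runs on the caller, so `Transfer` has nothing left to move.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Toy stand-in (assumption): queued tasks run only when Drain() is called.
struct ToyExecutor {
  std::queue<std::function<void()>> tasks;
  bool running = false;  // true while a task executes "on the pool"
  void Spawn(std::function<void()> task) { tasks.push(std::move(task)); }
  void Drain() {
    while (!tasks.empty()) {
      auto task = std::move(tasks.front());
      tasks.pop();
      running = true;
      task();
      running = false;
    }
  }
};

// Toy stand-in (assumption): a minimal int-valued future with shared state.
struct ToyFuture {
  struct State {
    bool finished = false;
    int value = 0;
    std::vector<std::function<void(int)>> callbacks;
  };
  std::shared_ptr<State> state = std::make_shared<State>();

  bool is_finished() const { return state->finished; }
  void MarkFinished(int v) {
    state->finished = true;
    state->value = v;
    auto callbacks = std::move(state->callbacks);
    state->callbacks.clear();
    for (auto& cb : callbacks) cb(v);
  }
  // Runs the callback immediately, on the caller, if already finished.
  void AddCallback(std::function<void(int)> cb) {
    if (state->finished) {
      cb(state->value);
      return;
    }
    state->callbacks.push_back(std::move(cb));
  }
};

// Same shape as the new Transfer: hand back the original future when it
// is already finished, otherwise finish a new future from an executor task.
ToyFuture Transfer(ToyExecutor& executor, ToyFuture future) {
  if (future.is_finished()) return future;
  ToyFuture transferred;
  future.AddCallback([&executor, transferred](int v) mutable {
    executor.Spawn([transferred, v]() mutable { transferred.MarkFinished(v); });
  });
  return transferred;
}

// Reports whether the continuation ran inside an executor task or on the caller.
std::string WhereContinuationRuns(bool finish_before_transfer) {
  ToyExecutor executor;
  ToyFuture source;
  std::string where = "never";
  if (finish_before_transfer) source.MarkFinished(42);
  ToyFuture transferred = Transfer(executor, source);
  transferred.AddCallback(
      [&](int) { where = executor.running ? "executor" : "caller"; });
  if (!finish_before_transfer) source.MarkFinished(42);
  executor.Drain();
  return where;
}
```

Calling `WhereContinuationRuns(true)` exercises the finished-before-transfer path discussed above, and `WhereContinuationRuns(false)` exercises the ordinary path.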
Member
Are you sure it does? That said, I do think it's a problem that this doesn't reliably transfer the callbacks to the thread pool. Perhaps this isn't the right API after all: instead of
Member
Author
I agree with the new API. C# has that API as well (Task::ContinueWith can take a scheduler), so there is some precedent. I'll add a follow-up PR. I don't believe it is necessary for the current work, though. You are correct: the old method could possibly spawn a new thread pool task in some situations where this one doesn't (e.g. if the callback is added very quickly after calling transfer, before the thread pool has a chance to schedule the "mark finished" task). I don't really see any reason this is desirable, as it can only mean more thread pool tasks without any benefit. So I will leave this as is for now.
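A rough sketch of what a scheduler-taking continuation could look like, in the spirit of the `ContinueWith` comparison above. The name `AddCallbackOn` and both toy types are hypothetical and chosen only for illustration; nothing here is the API of the follow-up PR. The point is that the continuation is always wrapped in an executor task, whether or not the future has already finished.

```cpp
#include <cassert>
#include <functional>
#include <queue>
#include <utility>
#include <vector>

// Toy stand-in (assumption): queued tasks run only when Drain() is called.
struct ToyExecutor {
  std::queue<std::function<void()>> tasks;
  bool running = false;  // true while a task executes "on the pool"
  void Spawn(std::function<void()> task) { tasks.push(std::move(task)); }
  void Drain() {
    while (!tasks.empty()) {
      auto task = std::move(tasks.front());
      tasks.pop();
      running = true;
      task();
      running = false;
    }
  }
};

// Toy stand-in (assumption): a minimal int-valued future, used by reference.
struct ToyFuture {
  bool finished = false;
  int value = 0;
  std::vector<std::function<void(int)>> callbacks;

  void MarkFinished(int v) {
    finished = true;
    value = v;
    auto pending = std::move(callbacks);
    callbacks.clear();
    for (auto& cb : pending) cb(v);
  }
  void AddCallback(std::function<void(int)> cb) {
    if (finished) {
      cb(value);
      return;
    }
    callbacks.push_back(std::move(cb));
  }
  // Hypothetical API: the continuation always runs as an executor task,
  // even when the future finished before the callback was added.
  void AddCallbackOn(ToyExecutor& executor, std::function<void(int)> cb) {
    AddCallback([&executor, cb = std::move(cb)](int v) {
      executor.Spawn([cb, v] { cb(v); });
    });
  }
};

// Returns true if the continuation ran inside an executor task.
bool ContinuationRanOnExecutor(bool finish_first) {
  ToyExecutor executor;
  ToyFuture future;
  bool ran_on_executor = false;
  if (finish_first) future.MarkFinished(7);
  future.AddCallbackOn(executor,
                       [&](int) { ran_on_executor = executor.running; });
  if (!finish_first) future.MarkFinished(7);
  executor.Drain();
  return ran_on_executor;
}
```

The trade-off raised in the thread still applies: a design like this spends one executor task per continuation, including in cases where running inline would have been harmless.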
Member
Sounds good.
|
||
| return future; | ||
| } | ||
|
|
||
| // Submit a callable and arguments for execution. Return a future that | ||
|
|
||
It looks like across CSV, Parquet, and Feather, we now have two distinct approaches to async reading: here we add a method to asynchronously read the next batch, while in Parquet/Feather we add a method to convert a reader to a generator of batches. We should probably pick one for consistency's sake.
It was already different. The streaming CSV reader has a "read batch" method and it was used to create an iterator of batches. I think keeping the generator logic out of the readers would be ideal.
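As a sketch of that separation, the reader below exposes only a "read next batch" method, and the generator is a free function layered on top of it. Everything here is a hypothetical toy (`ToyStreamingReader`, `MakeBatchGenerator`, `CollectAll`, and `Batch` as a vector of ints), and it is synchronous for brevity; in the asynchronous version discussed in this PR each call would yield a future instead of a value.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <optional>
#include <utility>
#include <vector>

using Batch = std::vector<int>;  // placeholder for a record batch (assumption)

// Hypothetical reader: knows how to read the next batch and nothing else.
class ToyStreamingReader {
 public:
  explicit ToyStreamingReader(std::vector<Batch> batches)
      : batches_(std::move(batches)) {}

  // Returns the next batch, or std::nullopt at end of stream.
  std::optional<Batch> ReadNext() {
    if (next_ >= batches_.size()) return std::nullopt;
    return batches_[next_++];
  }

 private:
  std::vector<Batch> batches_;
  std::size_t next_ = 0;
};

// A generator is just a callable that yields the next item on each call.
using BatchGenerator = std::function<std::optional<Batch>()>;

// The generator logic lives outside the reader and only uses ReadNext().
BatchGenerator MakeBatchGenerator(std::shared_ptr<ToyStreamingReader> reader) {
  return [reader]() { return reader->ReadNext(); };
}

// Drains a generator until it signals end of stream.
std::vector<Batch> CollectAll(const BatchGenerator& generator) {
  std::vector<Batch> out;
  while (std::optional<Batch> batch = generator()) {
    out.push_back(std::move(*batch));
  }
  return out;
}
```

With this layering, a reader that only offers "read next batch" and a consumer that wants a generator can coexist without the reader knowing about generators at all.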