Skip to content

Amp\disperse() function#460

Open
rela589n wants to merge 1 commit into
amphp:3.xfrom
rela589n:feature-disperse-function
Open

Amp\disperse() function#460
rela589n wants to merge 1 commit into
amphp:3.xfrom
rela589n:feature-disperse-function

Conversation

@rela589n
Copy link
Copy Markdown

In this pull request, I want to introduce a disperse($closures) function.

It is created to asynchronously execute an array of closures when the client needs all results.

$results = \Amp\disperse([
    fn () => $httpClient->request(new Request('https://www.google.com', 'HEAD')),
    fn () => $httpClient->request(new Request('https://www.bing.com', 'HEAD')),
]);

Initially, I considered placing this function in Amp\Future namespace, but later I opted in favor of Amp, since this function doesn't have Futures in the signature.

Comment thread src/functions.php Outdated
@xpader
Copy link
Copy Markdown
Contributor

xpader commented Mar 27, 2026

What is that??? await(iterable $futures) race order version ???
I don't think we need add this to amp library.

@rela589n rela589n force-pushed the feature-disperse-function branch from 4270738 to 3a32eb7 Compare March 27, 2026 06:54
@rela589n rela589n force-pushed the feature-disperse-function branch from 3a32eb7 to 85eb948 Compare March 27, 2026 06:55
@rela589n
Copy link
Copy Markdown
Author

@xpader , if you don't find this useful at all, you are the contributor, and I'm a layman - it's in your authority to close.

@rela589n
Copy link
Copy Markdown
Author

rela589n commented Apr 6, 2026

Maybe this's not the best name for such a function.
Maybe naming it some other way would bring clarity?
What's on your mind?

@rela589n rela589n requested review from kelunik and xpader April 9, 2026 16:29
webpatser added a commit to webpatser/fledge-fiber that referenced this pull request Apr 10, 2026
Security:
- Fix HTTP/2 ping flood on active streams (amphp/http-server#386)

Bug fixes:
- Use VarString for string params in binary protocol (amphp/mysql#142)
- Decode BIT columns as int instead of string (amphp/mysql#138)
- Close connections on pool destruct (amphp/http-client#396)
- Fix duplicate keys in byte-stream split() (amphp/byte-stream#113)
- Fix Closure type annotation for static analysis (amphp/amp#451)
- Safely handle DisposedException on unsubscribe (amphp/redis#100)

Features:
- Add TLS support for Redis connections (amphp/redis#98)
- Add disperse() for concurrent closure execution (amphp/amp#460)
@rela589n
Copy link
Copy Markdown
Author

Hi, @webpatser, do you think it's a good thing to include?

@rela589n
Copy link
Copy Markdown
Author

Hi, guys
Can we move toward some logical conclusion about this?

@rela589n
Copy link
Copy Markdown
Author

Hi, @xpader !
How is it going?

@xpader
Copy link
Copy Markdown
Contributor

xpader commented Apr 26, 2026

@rela589n I don't thing we shoud add this to amp library, you can use it yourself.

@rela589n
Copy link
Copy Markdown
Author

I hope you're doing well.

Of course, I could have added this to my repo; I would not have opened the PR then.

Yet, I think you wouldn't want to bother yourself with async() and await() functions when you just want to run something "in parallel".

For example, consider a cache warmup scenario with disperse:

disperse($cacheWarmupFunctions);

Without disperse we'd have to wrap those into async/await:

$futures = array_map(async(...), $cacheWarmupFunctions);
awaitAll($futures);

Which obscures the high picture.

I don't think you'd want to write this every time if you can avoid writing this all the time.

@webpatser
Copy link
Copy Markdown

Hey @rela589n, sorry for the slow turnaround, your ping slipped past me. Cheers @xpader for chiming in.

My honest take: I don't think disperse() earns a spot in core. The wrapper is two lines today:

Future\await(array_map(async(...), $closures));

That already reads cleanly once async()/await() are familiar, and a third name for the same thing grows the API surface without unlocking new behavior. The naming is also tricky; "disperse" didn't immediately read as "run all in parallel and collect results" to me. awaitAll or parallel are closer, but both come with their own history.

The cache-warmup example is fair. That's a great fit for a small helper in app code or a userland package, where you can name it whatever fits your team.

Not a hard no. If the maintainers want it in, I won't push back. But my vote leans toward closing and keeping the helper in user land.

@kelunik
Copy link
Copy Markdown
Member

kelunik commented May 3, 2026

I think there's value in having such a function, you don't want to write that array_map repeatedly. I'm however not sure on the name:

Maybe this's not the best name for such a function.
Maybe naming it some other way would bring clarity?

There likely is a better name. And we should be really sure it's a good name if we add it to the core library, because we can't just release another major version of it if we're wrong.

I'm totally happy to have this sit around for a while, if someone comes good with naming suggestions and it's still good after a while, we can proceed and merge this.

@rela589n
Copy link
Copy Markdown
Author

rela589n commented May 3, 2026

What do you think about "scatter"?

It means: (of a group of people or animals) separate and move off quickly in different directions.

@kelunik
Copy link
Copy Markdown
Member

kelunik commented May 3, 2026

I think it needs to somehow cover the dispersion and then collecting the results again, thought about forkJoin (but don't like it because fork usually refers to processes) and runConcurrently so far.

@rela589n
Copy link
Copy Markdown
Author

rela589n commented May 4, 2026

If we consider forkJoin, at first, I didn't really think of "fork" as of a verb, but of a noun.

cover the dispersion and then collecting the results again

TBH, it reminds me of Scatter-Gather (Not that I ever used it, but it clicked in me) enterprise pattern:

broadcasts a message to multiple recipients and re-aggregates the responses back into a single message.

In our case, it is:

$results = \Amp\scatterGather([
    fn () => $httpClient->request(new Request('https://www.google.com', 'HEAD')),
    fn () => $httpClient->request(new Request('https://www.bing.com', 'HEAD')),
]);

@webpatser
Copy link
Copy Markdown

Hey @rela589n, @kelunik, on naming.

scatterGather reads as enterprise service bus to me. Accurate to the pattern, but it sets the wrong expectation for a small concurrent-execute helper.

My vote is runConcurrently. Says what it does, no metaphor. Amp\runConcurrently([...]) reads fine at the call site. If the length grates, concurrent as a shorter alternative.

@rela589n
Copy link
Copy Markdown
Author

rela589n commented May 5, 2026

IMO concurrent is better compared to runConcurrently

$results = \Amp\concurrent([
    fn () => $httpClient->request(new Request('https://www.google.com', 'HEAD')),
    fn () => $httpClient->request(new Request('https://www.bing.com', 'HEAD')),
]);

@rela589n
Copy link
Copy Markdown
Author

rela589n commented May 8, 2026

Another option I think of is reduce with the idea that we accept the array of nullary closures, and evaluate them:

$results = \Amp\reduce([
    fn () => $httpClient->request(new Request('https://www.google.com', 'HEAD')),
    fn () => $httpClient->request(new Request('https://www.bing.com', 'HEAD')),
]);

Or apply:

$results = \Amp\apply([
    fn () => $httpClient->request(new Request('https://www.google.com', 'HEAD')),
    fn () => $httpClient->request(new Request('https://www.bing.com', 'HEAD')),
]);

Or, talking about evaluation, I would even think about collapse (or something like this):

$results = \Amp\collapse([
    fn () => $httpClient->request(new Request('https://www.google.com', 'HEAD')),
    fn () => $httpClient->request(new Request('https://www.bing.com', 'HEAD')),
]);

In such a case, the concurrency is the internal detail. We know that Amp is all about concurrency in the first place, and collapse would mean to collapse (or splash, squash) concurrency.

@xpader
Copy link
Copy Markdown
Contributor

xpader commented May 8, 2026

Why are we only talk about name? I still don't get why just not useawait([Futures..]), what is your use scenario?

@webpatser
Copy link
Copy Markdown

@xpader The use case is when you start from closures rather than Futures. Cache warmup, fan-out HTTP heads, batch job dispatch. Today that's:

Future\await(array_map(async(...), $closures))

The helper collapses the array_map(async(...)) step. That's the whole delta.

For the record I'm not sold it earns a core slot either, said as much earlier in the thread. But the "why not just await([Futures..])" framing misses that you don't have Futures yet, you have callables.

@xpader
Copy link
Copy Markdown
Contributor

xpader commented May 11, 2026

@webpatser I always thought there were more complicated scenarios that I couldn’t understand. So it's just turn closures into Futures and await them? Just because don't want write that code like array_map()? It's mean name like awaitClosures??

@rela589n
Copy link
Copy Markdown
Author

@xpader , do you want me to close it up and shut down?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Development

Successfully merging this pull request may close these issues.

4 participants