Skip to content

SPARK-2201 Improve FlumeInputDStream's stability and make it scalable#1310

Closed
joyyoj wants to merge 3 commits into
apache:masterfrom
joyyoj:master
Closed

SPARK-2201 Improve FlumeInputDStream's stability and make it scalable#1310
joyyoj wants to merge 3 commits into
apache:masterfrom
joyyoj:master

Conversation

@joyyoj

@joyyoj joyyoj commented Jul 6, 2014

Copy link
Copy Markdown
Contributor

No description provided.

@AmplabJenkins

Copy link
Copy Markdown

Can one of the admins verify this patch?

@tdas

tdas commented Jul 30, 2014

Copy link
Copy Markdown
Contributor

Hey @joyyoj
This is a very interesting patch, and can be very useful! But its a little hard to understand the architecture from the code. Could you provide us with a simple design doc that explains whats the architecture, and how each class and architectural components are related?

PS: Apologies for the not having commented on this earlier. Fell through the cracks I guess.

@joyyoj

joyyoj commented Jul 31, 2014

Copy link
Copy Markdown
Contributor Author

@tdas, Thanks for noticing the PR. I’m pleased to share my design idea. I'll update it this weekend.

@tdas

tdas commented Jul 31, 2014

Copy link
Copy Markdown
Contributor

@harishreedharan Can you take a look? This looks really interesting for Flume.

@harishreedharan

Copy link
Copy Markdown
Contributor

Hmm, I don't see any code. Shows +0, -0 lines. Something went wrong in the last merge?

@tdas

tdas commented Jul 31, 2014

Copy link
Copy Markdown
Contributor

@joyyoj Something went wrong in your last merge. Its an empty patch now!

@joyyoj

joyyoj commented Aug 1, 2014

Copy link
Copy Markdown
Contributor Author

Sorry, I'll soon send a PR.
The problem of the original implementation is that the config(host:port) is static and allows only one host:port. Once host or port changed, the flume agent should be restarted to reload the conf.
To solve it, one solution is to set a virtual address instead of a real address in the flume conf. Meanwhile, a address router was introduced that can tell us all the real addresses are bound to a virtual address and notify such events that a real address is added to or removed from the virtual address.
I found the router can be easily implemented by the zookeeper. In such scenario:

  1. A spark receiver selects a free port and creates a tmp node with the path /path/to/logicalhost/host:port to zookeeper when started.
    If three receivers started, three nodes (host1:port1, host2:port2, host3:port3) will be created under /path/to/logicalhost;
  2. On the side of flume agent, the flume sink gets the children nodes (host1:port1, host2:port2, host3:port3) from /path/to/logicalhost and buffers them into a ClientPool.
    When append called, it selects a client from ClientPool in a round-robin manner and call client.append to send events.
  3. If any receiver crashed/started, the tmp zk node will be removed/added, and then ClientPool will remove/add the client from the buffer since it watched those zk children events.
    In my implementation:
    LogicalHostRouter is the implementation of the address router. You know, the spark or flume should not know the existence of zk.
    The ZkProxy is an encapsulation of the zk curator client.

@harishreedharan

Copy link
Copy Markdown
Contributor

@joyyoj Thanks for the explanation. This makes quite a lot of sense. I recently added a new Dstream + an associated Flume sink to fix the issue of receivers being hard-coded on the Flume config. Basically solves the same issue, by telling the Spark receiver where the Flume agents are running. So even if the executors die, they can come back and simply poll the same Flume agents for data. In my experience, the hosts on which the agents are running rarely change - so this solution works nicely. PR #807 - let me know what you think.

@joyyoj

joyyoj commented Aug 3, 2014

Copy link
Copy Markdown
Contributor Author

@harishreedharan The time I am confronted with this problem, PR #807 is not merged to trunk. I think PR #807 is another solution to solve the same problem and quiet good.
I think it is still valuable to solve it by introducing a host router level since this problem seems a common issue, and a host router level can be reused.
PR #1755 resubmitted
How do you think of it?

@joyyoj

joyyoj commented Aug 3, 2014

Copy link
Copy Markdown
Contributor Author

To PR #807, if some flume agent crashed and restarted from another host, spark should be restarted to reload conf ?

@harishreedharan

Copy link
Copy Markdown
Contributor

@joyyoj I will take a look at it in the next couple days. As far as #807 is concerned - yes, if the flume agent's location changes, the config needs to change. In my experience (I work for a company that has a large number of Flume customers), Flume agents are usually deployed on specific nodes and if they crash - they are restarted on the same node - since Flume has no concept of workers (every agent is a worker), so that was not a concern in my design.

The ZK-based config seems interesting. I will take a look at it soon. Thanks!

@SparkQA

SparkQA commented Sep 5, 2014

Copy link
Copy Markdown

Can one of the admins verify this patch?

@harishreedharan

Copy link
Copy Markdown
Contributor

I still don't see any code. Did a merge fail somewhere?

@JoshRosen

Copy link
Copy Markdown
Contributor

Hi @joyyoj,

Since this pull request doesn't show any code / changes, do you mind closing it? Feel free to update / re-open if you have code that you'd like us to review. Thanks!

@AmplabJenkins

Copy link
Copy Markdown

Can one of the admins verify this patch?

@pwendell

Copy link
Copy Markdown
Contributor

Let's close this issue

@asfgit asfgit closed this in 047ff57 Nov 29, 2014
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

7 participants