Skip to content
This repository was archived by the owner on Jan 26, 2022. It is now read-only.

Commit c960ca8

Browse files
authored
Merge pull request #12 from jipengzhu/master
update code for kafka-python 1.3.4
2 parents 5d31120 + ffb4817 commit c960ca8

File tree

1 file changed

+8
-8
lines changed

1 file changed

+8
-8
lines changed

stormkafkamon/processor.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ def emit(self, record):
1414
import struct
1515
import socket
1616
from collections import namedtuple
17-
from kafka.client import KafkaClient
17+
from kafka.client import SimpleClient
1818
from kafka.common import OffsetRequestPayload
1919

2020

@@ -62,15 +62,15 @@ def process(spouts):
6262
for s in spouts:
6363
for p in s.partitions:
6464
try:
65-
k = KafkaClient(p['broker']['host'], p['broker']['port'])
65+
k = SimpleClient([p['broker']['host'] + ":" + str(p['broker']['port'])])
6666
except socket.gaierror as e:
67-
raise ProcessorError('Failed to contact Kafka broker %s (%s)' %
68-
(p['broker']['host'], str(e)))
69-
earliest_off = OffsetRequest(p['topic'], p['partition'], -2, 1)
70-
latest_off = OffsetRequest(p['topic'], p['partition'], -1, 1)
67+
raise ProcessorError('Failed to contact Kafka broker %s (%s)' % (p['broker']['host'], str(e)))
7168

72-
earliest = k.get_offsets(earliest_off)[0]
73-
latest = k.get_offsets(latest_off)[0]
69+
earliest_off = OffsetRequestPayload(p['topic'], p['partition'], -2, 1)
70+
latest_off = OffsetRequestPayload(p['topic'], p['partition'], -1, 1)
71+
72+
earliest = k.send_offset_request([earliest_off])[0].offsets[0]
73+
latest = k.send_offset_request([latest_off])[0].offsets[0]
7474
current = p['offset']
7575

7676
brokers.append(p['broker']['host'])

0 commit comments

Comments
 (0)