Trying the Apache Kafka samples
First, I read the article below.
http://qiita.com/shimashima/items/06ccf4859620d2440267
/home/idempiere/kafka_2.11-0.9.0.0
I extracted kafka_2.11-0.9.0.0 into my home directory and ran the following commands.
./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
./bin/kafka-server-start.sh -daemon config/server.properties
I wanted to watch the Kafka log, so I dropped -daemon from the second command.
./bin/kafka-server-start.sh config/server.properties
I installed the Python kafka module with pip.
I tried the Python Kafka client from the article, but it did not work on Windows 10.
import kafka
#kafka_client = kafka.SimpleClient('localhost:9092')
kafka_client = kafka.SimpleClient('192.168.5.200:9092')
Traceback (most recent call last):
  File "C:\Python2710\lib\site-packages\IPython\core\interactiveshell.py", line 3066, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-4-558e71361105>", line 1, in <module>
    kafka_client = kafka.SimpleClient('localhost:9092')
  File "C:\Python2710\lib\site-packages\kafka\client.py", line 53, in __init__
    self.load_metadata_for_topics()  # bootstrap with all metadata
  File "C:\Python2710\lib\site-packages\kafka\client.py", line 492, in load_metadata_for_topics
    resp = self.send_metadata_request(topics)
  File "C:\Python2710\lib\site-packages\kafka\client.py", line 554, in send_metadata_request
    return self._send_broker_unaware_request(payloads, encoder, decoder)
  File "C:\Python2710\lib\site-packages\kafka\client.py", line 165, in _send_broker_unaware_request
    raise KafkaUnavailableError('All servers failed to process request: %s' % hosts)
KafkaUnavailableError: All servers failed to process request: [('localhost', 9092)]
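KafkaUnavailableError only says that no broker answered, so it helps to check plain TCP reachability before suspecting the client library. The following is a minimal sketch in Python 3 using only the standard library; `broker_reachable` is a hypothetical helper name, not part of the kafka module.

```python
import socket


def broker_reachable(host, port, timeout=3.0):
    """Return True if a plain TCP connection to host:port succeeds.

    Hypothetical helper for illustration only; it does not speak the
    Kafka protocol, it only checks that something is listening.
    """
    try:
        # create_connection resolves the host and performs a blocking connect.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts and name resolution errors.
        return False


if __name__ == "__main__":
    # Address taken from the notes above; adjust for your environment.
    print(broker_reachable("192.168.5.200", 9092))
```

If this returns True while the client still fails, the problem is more likely in the client's connection handling than in the network.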
Python client
I ran it in PyCharm's debug mode and tried patching the Python kafka module a little, but it still did not work.
Maybe it just does not work on Windows. I decided to try the Java Kafka producer/consumer sample code instead.
python - non-blocking socket,error is always - Stack Overflow
EWOULDBLOCK
conn.py, line 86: ret was 10035 (WSAEWOULDBLOCK, the Winsock counterpart of EWOULDBLOCK).
Index: ../../../../Python2710/Lib/site-packages/kafka/conn.py
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- ../../../../Python2710/Lib/site-packages/kafka/conn.py (revision )
+++ ../../../../Python2710/Lib/site-packages/kafka/conn.py (revision )
@@ -83,12 +83,15 @@
                                       self.config['send_buffer_bytes'])
             self._sock.setblocking(False)
             try:
+                while True:
-                ret = self._sock.connect_ex((self.host, self.port))
+                    ret = self._sock.connect_ex((self.host, self.port))
+                    if ret != errno.EWOULDBLOCK:
+                        break
             except socket.error as ret:
                 pass
             self.last_attempt = time.time()
 
-            if not ret or ret is errno.EISCONN:
+            if not ret or ret is errno.EISCONN or ret is errno.WSAEISCONN:
                 self.state = ConnectionStates.CONNECTED
             elif ret in (errno.EINPROGRESS, errno.EALREADY):
                 self.state = ConnectionStates.CONNECTING
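The patch above retries connect_ex in a tight loop until the result is no longer EWOULDBLOCK. A common alternative for non-blocking sockets is to start the connect once and wait for completion with select(). The sketch below is in Python 3 with the standard library only. `connect_nonblocking` and `IN_PROGRESS` are hypothetical names, and this is not how the kafka module itself is written.

```python
import errno
import select
import socket

# Return codes meaning "connect started but not finished yet".
# errno.WSAEWOULDBLOCK (10035) exists only on Windows, hence getattr.
IN_PROGRESS = {
    errno.EINPROGRESS,
    errno.EALREADY,
    errno.EWOULDBLOCK,
    getattr(errno, "WSAEWOULDBLOCK", errno.EWOULDBLOCK),
}


def connect_nonblocking(host, port, timeout=5.0):
    """Hypothetical helper: non-blocking connect that waits via select()."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setblocking(False)
    ret = sock.connect_ex((host, port))
    if ret == 0:
        return sock  # connected immediately
    if ret not in IN_PROGRESS:
        sock.close()
        raise OSError(ret, errno.errorcode.get(ret, "unknown"))
    # Writable means the connect finished; Windows reports a failed
    # connect through the "exceptional" set, so watch that as well.
    _, writable, errored = select.select([], [sock], [sock], timeout)
    if not writable and not errored:
        sock.close()
        raise socket.timeout("connect timed out")
    # SO_ERROR holds the final result of the asynchronous connect.
    err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
    if err != 0:
        sock.close()
        raise OSError(err, errno.errorcode.get(err, "unknown"))
    return sock
```

Waiting in select() avoids burning CPU while the handshake completes, and treating the Windows-specific code as "in progress" is the point the 10035 return value hints at.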
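Kafka log file errors
Apparently you need something like Cygwin to run the Kafka server on Windows.
That was too much hassle, so I gave up on running it on Windows.
(The parenthesized message in the traces below is the Japanese-locale Windows text for "Access is denied.")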
java.io.FileNotFoundException: \server.log (アクセスが拒否されました。)
    at java.io.FileOutputStream.open(Native Method)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:142)
    at org.apache.log4j.FileAppender.setFile(FileAppender.java:294)
    at org.apache.log4j.FileAppender.activateOptions(FileAppender.java:165)
    at org.apache.log4j.DailyRollingFileAppender.activateOptions(DailyRollingFileAppender.java:223)
    at org.apache.log4j.config.PropertySetter.activate(PropertySetter.java:307)
java.io.FileNotFoundException: \controller.log (アクセスが拒否されました。)
    at java.io.FileOutputStream.open0(Native Method)
    at java.io.FileOutputStream.open(Unknown Source)
[2016-02-21 21:08:27,693] INFO Got user-level KeeperException when processing sessionid:0x15303b923ef0000 type:create cxid:0xe0 zxid:0x27 txntype:-1 reqpath:n/a Error Path:/brokers/topics/summary-markers/partitions/0 Error:KeeperErrorCode = NoNode for /brokers/topics/summary-markers/partitions/0 (org.apache.zookeeper.server.PrepRequestProcessor)
[2016-02-21 21:08:27,696] INFO Got user-level KeeperException when processing sessionid:0x15303b923ef0000 type:create cxid:0xe1 zxid:0x28 txntype:-1 reqpath:n/a Error Path:/brokers/topics/summary-markers/partitions Error:KeeperErrorCode = NoNode for /brokers/topics/summary-markers/partitions (org.apache.zookeeper.server.PrepRequestProcessor)
Java Kafka producer/consumer sample code
I decided to try the code from this article.
Getting Started with Sample Programs for Apache Kafka 0.9 | MapR
git clone https://github.com/mapr-demos/kafka-sample-programs.git
I copied it into my Eclipse workspace and ran the Maven command.
mavenメモ1 (Maven memo 1) - kubotti’s memo
It looks like I was able to exchange messages with Kafka running on CentOS 6 in VirtualBox.
In
kafka-sample-programs/src/main/resources/consumer.props
kafka-sample-programs/src/main/resources/producer.props
I changed
bootstrap.servers=localhost:9092
to
bootstrap.servers=192.168.5.200:9092
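The same edit has to be made in both props files, so it can be scripted. Here is a minimal sketch in Python 3; `set_bootstrap_servers` is a hypothetical helper, and it assumes the files are simple key=value lines as in the sample repository.

```python
def set_bootstrap_servers(props_text, servers):
    """Return props_text with bootstrap.servers set to `servers`.

    Hypothetical helper: handles plain key=value lines only and appends
    the entry if it is missing. Commented-out lines are left untouched.
    """
    out = []
    replaced = False
    for line in props_text.splitlines():
        key = line.split("=", 1)[0].strip()
        if key == "bootstrap.servers":
            out.append("bootstrap.servers=" + servers)
            replaced = True
        else:
            out.append(line)
    if not replaced:
        out.append("bootstrap.servers=" + servers)
    return "\n".join(out) + "\n"


if __name__ == "__main__":
    # Paths as listed above, relative to the directory holding the clone.
    for name in ("consumer.props", "producer.props"):
        path = "kafka-sample-programs/src/main/resources/" + name
        with open(path) as f:
            text = f.read()
        with open(path, "w") as f:
            f.write(set_bootstrap_servers(text, "192.168.5.200:9092"))
```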
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Sent msg number 0
Sent msg number 1000
Sent msg number 2000
Sent msg number 3000
Sent msg number 4000
Sent msg number 5000
Sent msg number 6000
Sent msg number 7000
...
Sent msg number 992000
Sent msg number 993000
Sent msg number 994000
Sent msg number 995000
Sent msg number 996000
Sent msg number 997000
Sent msg number 998000
Sent msg number 999000
1000 messages received in period, latency(min, max, avg, 99%) = 38400, 38655, 38528.0, 38655 (ms)
1822238 messages received overall, latency(min, max, avg, 99%) = 38400, 82431, 59220.9, 82431 (ms)
1000 messages received in period, latency(min, max, avg, 99%) = 38400, 38655, 38528.0, 38655 (ms)
1823238 messages received overall, latency(min, max, avg, 99%) = 38400, 82431, 59209.6, 82431 (ms)
Got 27956 records after 0 timeouts
1000 messages received in period, latency(min, max, avg, 99%) = 38400, 38655, 38528.0, 38655 (ms)
1824238 messages received overall, latency(min, max, avg, 99%) = 38400, 82431, 59198.2, 82431 (ms)
1000 messages received in period, latency(min, max, avg, 99%) = 38400, 38655, 38528.0, 38655 (ms)
...
1000 messages received in period, latency(min, max, avg, 99%) = 38400, 38655, 38528.0, 38655 (ms)
1849238 messages received overall, latency(min, max, avg, 99%) = 38400, 82431, 58918.8, 82431 (ms)
1000 messages received in period, latency(min, max, avg, 99%) = 38144, 38655, 38445.1, 38655 (ms)
1850238 messages received overall, latency(min, max, avg, 99%) = 38144, 82431, 58907.7, 82431 (ms)
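To compare runs, the consumer's latency lines can be turned into numbers. The sketch below is in Python 3; the regular expression is written against the exact line format shown above, and `parse_latency` and the dictionary keys are hypothetical names chosen for illustration.

```python
import re

# Matches lines such as:
# "1000 messages received in period, latency(min, max, avg, 99%) = 38400, 38655, 38528.0, 38655 (ms)"
LINE_RE = re.compile(
    r"(\d+) messages received (in period|overall), "
    r"latency\(min, max, avg, 99%\) = "
    r"(\d+), (\d+), ([\d.]+), (\d+) \(ms\)"
)


def parse_latency(line):
    """Parse one latency line into a dict, or return None if it does not match."""
    m = LINE_RE.search(line)
    if m is None:
        return None
    count, scope, lo, hi, avg, p99 = m.groups()
    return {
        "count": int(count),
        "scope": scope,  # "in period" or "overall"
        "min_ms": int(lo),
        "max_ms": int(hi),
        "avg_ms": float(avg),
        "p99_ms": int(p99),
    }
```

Lines that are not latency reports, such as "Got 27956 records after 0 timeouts", return None and can simply be skipped.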