Trying the Apache Kafka samples

First I read the article below.
http://qiita.com/shimashima/items/06ccf4859620d2440267

I extracted kafka_2.11-0.9.0.0 into my home directory (/home/idempiere/kafka_2.11-0.9.0.0) and ran the following commands from that directory.

./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties  
./bin/kafka-server-start.sh -daemon config/server.properties  

I wanted to watch the Kafka log, so I removed -daemon from the second command.
./bin/kafka-server-start.sh config/server.properties

Installed the Python kafka module with pip (the kafka-python package).
I tried the Python Kafka client from the article, but it did not work properly on Windows 10.

import kafka

#kafka_client = kafka.SimpleClient('localhost:9092')  
kafka_client = kafka.SimpleClient('192.168.5.200:9092')
Traceback (most recent call last):
  File "C:\Python2710\lib\site-packages\IPython\core\interactiveshell.py", line 3066, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-4-558e71361105>", line 1, in <module>
    kafka_client = kafka.SimpleClient('localhost:9092')
  File "C:\Python2710\lib\site-packages\kafka\client.py", line 53, in __init__
    self.load_metadata_for_topics()  # bootstrap with all metadata
  File "C:\Python2710\lib\site-packages\kafka\client.py", line 492, in load_metadata_for_topics
    resp = self.send_metadata_request(topics)
  File "C:\Python2710\lib\site-packages\kafka\client.py", line 554, in send_metadata_request
    return self._send_broker_unaware_request(payloads, encoder, decoder)
  File "C:\Python2710\lib\site-packages\kafka\client.py", line 165, in _send_broker_unaware_request
    raise KafkaUnavailableError('All servers failed to process request: %s' % hosts)
KafkaUnavailableError: All servers failed to process request: [('localhost', 9092)]
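
KafkaUnavailableError only says that no broker answered, so it can help to first check whether a plain TCP connection to the broker port works at all. The following is a minimal sketch using only the standard library; broker_reachable is a hypothetical helper name, not part of the kafka module.

```python
import socket


def broker_reachable(host, port, timeout=3.0):
    """Return True if a plain TCP connection to host:port can be opened.

    Hypothetical helper for troubleshooting; it does not speak the Kafka
    protocol, it only checks that something is listening on the port.
    """
    try:
        sock = socket.create_connection((host, port), timeout=timeout)
    except (socket.error, socket.timeout):
        return False
    sock.close()
    return True
```

For example, broker_reachable('192.168.5.200', 9092) returning True would suggest that the network path is fine and the problem is in the client library rather than in the broker or a firewall.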

Python client

I ran it in PyCharm's debug mode and tried patching the Python kafka module a little, but it still did not work.
It may simply not work on Windows, so I decided to try the Java Kafka producer/consumer sample code instead.

Reference: python - non-blocking socket,error is always - Stack Overflow

The error is EWOULDBLOCK: at line 86 of conn.py, ret is 10035 (WSAEWOULDBLOCK on Windows).

Index: ../../../../Python2710/Lib/site-packages/kafka/conn.py
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- ../../../../Python2710/Lib/site-packages/kafka/conn.py  (revision )
+++ ../../../../Python2710/Lib/site-packages/kafka/conn.py  (revision )
@@ -83,12 +83,15 @@
                                   self.config['send_buffer_bytes'])
             self._sock.setblocking(False)
             try:
+                while True:
-                ret = self._sock.connect_ex((self.host, self.port))
+                    ret = self._sock.connect_ex((self.host, self.port))
+                    if ret != errno.EWOULDBLOCK:
+                        break
             except socket.error as ret:
                 pass
             self.last_attempt = time.time()
 
-            if not ret or ret is errno.EISCONN:
+            if not ret or ret is errno.EISCONN or ret is errno.WSAEISCONN:
                 self.state = ConnectionStates.CONNECTED
             elif ret in (errno.EINPROGRESS, errno.EALREADY):
                 self.state = ConnectionStates.CONNECTING
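
The patch above retries connect_ex() in a loop while it returns EWOULDBLOCK. Another common way to handle a non-blocking connect is to treat that return value as "connection in progress" and wait with select() until the socket reports a result. The sketch below illustrates that general technique with the standard library only. It is not kafka-python's actual code, and connect_nonblocking and its timeout parameter are hypothetical names.

```python
import errno
import select
import socket

# Return values of connect_ex() that mean "started, not finished yet".
# POSIX usually returns EINPROGRESS; Windows returns 10035 (WSAEWOULDBLOCK).
# The literal 10035 is listed explicitly as an assumption-free safeguard.
IN_PROGRESS = {errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK, 10035}


def connect_nonblocking(host, port, timeout=5.0):
    """Connect a non-blocking TCP socket and wait for the result (sketch)."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setblocking(False)
    ret = sock.connect_ex((host, port))
    if ret != 0 and ret not in IN_PROGRESS:
        sock.close()
        raise OSError(ret, "connect failed")

    # Wait for the outcome instead of calling connect_ex() in a tight loop.
    # Success shows up as "writable"; on Windows a failure shows up in the
    # exceptional set, so the socket is passed in both lists.
    _, writable, failed = select.select([], [sock], [sock], timeout)
    if not writable and not failed:
        sock.close()
        raise OSError(errno.ETIMEDOUT, "connect timed out")

    # SO_ERROR holds the final status of the connection attempt (0 = success).
    err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
    if err != 0:
        sock.close()
        raise OSError(err, "connect failed")
    return sock
```

Waiting in select() avoids spinning the CPU while the connection is being established, and checking SO_ERROR afterwards distinguishes a completed connection from a refused one.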

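Errors related to the Kafka log files

Apparently something like cygwin is needed to run the Kafka server on Windows.
That was too much trouble, so I gave up on running it on Windows.
The broker could not open its log files; the message アクセスが拒否されました。 in the output below means "Access is denied."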

java.io.FileNotFoundException: \server.log (アクセスが拒否されました。)
        at java.io.FileOutputStream.open(Native Method)
        at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
        at java.io.FileOutputStream.<init>(FileOutputStream.java:142)
        at org.apache.log4j.FileAppender.setFile(FileAppender.java:294)
        at org.apache.log4j.FileAppender.activateOptions(FileAppender.java:165)
        at org.apache.log4j.DailyRollingFileAppender.activateOptions(DailyRollingFileAppender.java:223)
        at org.apache.log4j.config.PropertySetter.activate(PropertySetter.java:307)
java.io.FileNotFoundException: \controller.log (アクセスが拒否されました。)
        at java.io.FileOutputStream.open0(Native Method)
        at java.io.FileOutputStream.open(Unknown Source)


[2016-02-21 21:08:27,693] INFO Got user-level KeeperException when processing sessionid:0x15303b923ef0000 type:create cxid:0xe0 zxid:0x27 txntype:-1 reqpath:n/a Error Path:/brokers/topics/summary-markers/partitions/0 Error:KeeperErrorCode = NoNode for /brokers/topics/summary-markers/partitions/0 (org.apache.zookeeper.server.PrepRequestProcessor)
[2016-02-21 21:08:27,696] INFO Got user-level KeeperException when processing sessionid:0x15303b923ef0000 type:create cxid:0xe1 zxid:0x28 txntype:-1 reqpath:n/a Error Path:/brokers/topics/summary-markers/partitions Error:KeeperErrorCode = NoNode for /brokers/topics/summary-markers/partitions (org.apache.zookeeper.server.PrepRequestProcessor)

Java Kafka producer/consumer sample code

I decided to try the code from this article.
Getting Started with Sample Programs for Apache Kafka 0.9 | MapR

git clone https://github.com/mapr-demos/kafka-sample-programs.git

I copied it into my Eclipse workspace and ran the Maven commands.
Reference: mavenメモ1 - kubotti’s memo

It looks like I was able to exchange messages with Kafka running on CentOS 6 in VirtualBox.

In both of these files:

kafka-sample-programs/src/main/resources/consumer.props
kafka-sample-programs/src/main/resources/producer.props

I changed

bootstrap.servers=localhost:9092

to

bootstrap.servers=192.168.5.200:9092
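
The same change to bootstrap.servers can be scripted instead of edited by hand. This is a minimal sketch that works on the text of a .props file; set_bootstrap_servers is a hypothetical helper, and the group.id line in the usage example is made up for illustration.

```python
def set_bootstrap_servers(props_text, servers):
    """Return props_text with the value of bootstrap.servers replaced.

    Hypothetical helper: lines are matched on the key before the first '=',
    so commented-out entries such as '#bootstrap.servers=...' are left alone.
    """
    lines = []
    for line in props_text.splitlines():
        key = line.split("=", 1)[0].strip()
        if key == "bootstrap.servers":
            line = "bootstrap.servers=" + servers
        lines.append(line)
    return "\n".join(lines) + "\n"


# Usage with an in-memory example (group.id=test is illustrative only).
example = "bootstrap.servers=localhost:9092\ngroup.id=test\n"
updated = set_bootstrap_servers(example, "192.168.5.200:9092")
```

To apply it to consumer.props and producer.props, read each file, pass its contents through the function, and write the result back.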

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Sent msg number 0
Sent msg number 1000
Sent msg number 2000
Sent msg number 3000
Sent msg number 4000
Sent msg number 5000
Sent msg number 6000
Sent msg number 7000

...

Sent msg number 992000
Sent msg number 993000
Sent msg number 994000
Sent msg number 995000
Sent msg number 996000
Sent msg number 997000
Sent msg number 998000
Sent msg number 999000
1000 messages received in period, latency(min, max, avg, 99%) = 38400, 38655, 38528.0, 38655 (ms)
1822238 messages received overall, latency(min, max, avg, 99%) = 38400, 82431, 59220.9, 82431 (ms)
1000 messages received in period, latency(min, max, avg, 99%) = 38400, 38655, 38528.0, 38655 (ms)
1823238 messages received overall, latency(min, max, avg, 99%) = 38400, 82431, 59209.6, 82431 (ms)
Got 27956 records after 0 timeouts
1000 messages received in period, latency(min, max, avg, 99%) = 38400, 38655, 38528.0, 38655 (ms)
1824238 messages received overall, latency(min, max, avg, 99%) = 38400, 82431, 59198.2, 82431 (ms)
1000 messages received in period, latency(min, max, avg, 99%) = 38400, 38655, 38528.0, 38655 (ms)

...

1000 messages received in period, latency(min, max, avg, 99%) = 38400, 38655, 38528.0, 38655 (ms)
1849238 messages received overall, latency(min, max, avg, 99%) = 38400, 82431, 58918.8, 82431 (ms)
1000 messages received in period, latency(min, max, avg, 99%) = 38144, 38655, 38445.1, 38655 (ms)
1850238 messages received overall, latency(min, max, avg, 99%) = 38144, 82431, 58907.7, 82431 (ms)