iDempiere + Kafka memo

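An attempt at implementing a Kafka consumer as an iDempiere process.

I wrote the receiving side (consumer) of Kafka's asynchronous messaging (message queue)
inside an iDempiere process.
I only copied the Kafka sample code, so it does nothing useful in itself.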

Reference URL:
[http://wiki.idempiere.org/en/Developing_Plug-Ins-Process]

Developing plug-ins without affecting the trunk - iDempiere en

Creating a new iDempiere plug-in

Eclipse menu > File > New > Other... > Plug-in Project

Project name:
jp.adempiere.testkafka

Select the target:
an OSGi framework: Equinox

Uncheck
Generate an activator...

Uncheck
Create plug-in using one of the templates

Click Finish.
The skeleton of the test plug-in is now created.

Creating the iDempiere process

New Package:jp.adempiere.testkafka.process
Name:MyProcess

package jp.adempiere.testkafka.process;

import org.compiere.process.SvrProcess;

public class MyProcess extends SvrProcess{
    
    @Override
    protected void prepare() {
        
    }

    @Override
    protected String doIt() throws Exception {
        return null;
    }
}

Registering the plug-in's dependencies (?)

In Package Explorer, click
MANIFEST.MF
Open the Extensions tab and click
Add

Uncheck
Show only extension points from the required plug-ins

Under Extension Details, enter the following for both ID and Name:
jp.adempiere.testkafka.process.MyProcess

On the Extensions tab, right-click
org.adempiere.base.Process
and choose New > process.
Click the Browse... button to the right of
class*:
Clear the text in
Select entries:
and MyProcess appears in the list; select it.
Save with Ctrl + S.

Registering in iDempiere

[http://wiki.idempiere.org/en/Developing_Plug-Ins-Process#Using_Extension_Points]

Start iDempiere.
Log in as SuperUser.
Open the Report & Process window and create a new entry.

Open System Admin > General Rules > System Rules > Menu
Window
create a new entry.

Error 1

2 28, 2016 7:16:00 午後 org.adempiere.base.DefaultProcessFactory newProcessInstance
警告: jp.adempiere.process.MyProcess
java.lang.ClassNotFoundException: jp.adempiere.process.MyProcess cannot be found by org.adempiere.base_3.1.0.qualifier
    at org.eclipse.osgi.internal.loader.BundleLoader.findClassInternal(BundleLoader.java:432)
    at org.eclipse.osgi.internal.loader.BundleLoader.findClass(BundleLoader.java:345)
    at org.eclipse.osgi.internal.loader.BundleLoader.findClass(BundleLoader.java:337)
    at org.eclipse.osgi.internal.loader.ModuleClassLoader.loadClass(ModuleClassLoader.java:160)
    at java.lang.ClassLoader.loadClass(Unknown Source)
    at org.adempiere.base.DefaultProcessFactory.newProcessInstance(DefaultProcessFactory.java:64)
    at org.adempiere.base.Core.getProcess(Core.java:127)
    at org.adempiere.util.ProcessUtil.startJavaProcess(ProcessUtil.java:159)
    at org.compiere.apps.AbstractProcessCtl.startProcess(AbstractProcessCtl.java:466)
    at org.compiere.apps.AbstractProcessCtl.run(AbstractProcessCtl.java:234)
    at org.adempiere.webui.apps.WProcessCtl.process(WProcessCtl.java:197)
    at org.adempiere.webui.apps.AbstractProcessDialog$ProcessDialogRunnable.doRun(AbstractProcessDialog.java:1054)
    at org.adempiere.util.ContextRunnable.run(ContextRunnable.java:38)
    at org.adempiere.webui.apps.DesktopRunnable.run(DesktopRunnable.java:40)
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.util.concurrent.FutureTask.run(Unknown Source)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)

I had simply gotten the package name wrong.
The class was actually created as jp.adempiere.testkafka.process.MyProcess,
not jp.adempiere.process.MyProcess.
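
The failure in Error 1 is what a class loader reports for any fully qualified name that does not resolve. Here is a minimal sketch in plain Java, without OSGi or iDempiere; the class `ClassNameCheck` and the helper `canLoad` are hypothetical names used only for illustration. It shows that one missing package segment is enough to cause a ClassNotFoundException:

```java
// Hypothetical helper, not part of iDempiere: checks whether a class name resolves.
class ClassNameCheck {

    /** Returns true if the given fully qualified class name can be loaded. */
    static boolean canLoad(String className) {
        try {
            Class.forName(className);
            return true;
        } catch (ClassNotFoundException e) {
            // Same exception type as in the log: the name did not resolve.
            return false;
        }
    }

    public static void main(String[] args) {
        // A correct fully qualified name resolves.
        System.out.println(canLoad("java.util.Properties"));
        // The mistyped name from Error 1 does not exist on this classpath.
        System.out.println(canLoad("jp.adempiere.process.MyProcess"));
    }
}
```

A check like this only covers the plain classpath. In iDempiere the lookup goes through the bundle class loaders, as the stack trace shows, so the class name entered in the process record must also match the class registered in the plug-in.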

Activator settings

[http://wiki.idempiere.org/en/Developing_Plug-Ins-Get_your_Plug-In_running]

Click
MANIFEST.MF
On the Dependencies tab, add
org.adempiere.plugin.utils

On the Overview tab, type
org.adempiere.plugin.utils.
into the Activator field, and AdempiereActivator
is offered as a completion.
Select it and save.

In the Plug-ins tab of Eclipse's Run Configurations, register
jp.adempiere.testkafka

External JARs

Maven - How to resolve dependencies in OSGi - taketoncheir.log

C:\kafka\kafka_2.11-0.9.0.0\libs\jackson-databind-2.5.4.jar

External JAR notes (Buckminster?)

I ran a full-text search for bsh-engine.jar, which is used by
org.adempiere.base
and found it mentioned in files such as

org.adempiere.base/copyjars.xml
idempiere\migration\reduce_repository\exclude_file_map.txt

The string copyjars.xml in turn appears in
org.adempiere.base/buckminster.cspex

Error 2

重大: com/fasterxml/jackson/annotation/JsonAutoDetect
java.lang.ClassNotFoundException: com.fasterxml.jackson.annotation.JsonAutoDetect cannot be found by jp.adempiere.testkafka_1.0.0.qualifier
    at org.eclipse.osgi.internal.loader.BundleLoader.findClassInternal(BundleLoader.java:432)
    at org.eclipse.osgi.internal.loader.BundleLoader.findClass(BundleLoader.java:345)
    at org.eclipse.osgi.internal.loader.BundleLoader.findClass(BundleLoader.java:337)
    at org.eclipse.osgi.internal.loader.ModuleClassLoader.loadClass(ModuleClassLoader.java:160)
    at java.lang.ClassLoader.loadClass(Unknown Source)
    at com.fasterxml.jackson.databind.introspect.VisibilityChecker$Std.<clinit>(VisibilityChecker.java:170)
    at com.fasterxml.jackson.databind.ObjectMapper.<clinit>(ObjectMapper.java:206)
    at jp.adempiere.testkafka.process.MyProcess.doIt(MyProcess.java:29)
    at org.compiere.process.SvrProcess.process(SvrProcess.java:198)
    at org.compiere.process.SvrProcess.startProcess(SvrProcess.java:144)
    at org.adempiere.util.ProcessUtil.startJavaProcess(ProcessUtil.java:172)
    at org.compiere.apps.AbstractProcessCtl.startProcess(AbstractProcessCtl.java:466)
    at org.compiere.apps.AbstractProcessCtl.run(AbstractProcessCtl.java:234)
    at org.adempiere.webui.apps.WProcessCtl.process(WProcessCtl.java:197)
    at org.adempiere.webui.apps.AbstractProcessDialog$ProcessDialogRunnable.doRun(AbstractProcessDialog.java:1054)
    at org.adempiere.util.ContextRunnable.run(ContextRunnable.java:38)
    at org.adempiere.webui.apps.DesktopRunnable.run(DesktopRunnable.java:40)
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.util.concurrent.FutureTask.run(Unknown Source)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)

After I added jackson-annotations-2.5.0.jar to the Build Path
and entered jackson-annotations-2.5.0.jar under Classpath on the Runtime tab of MANIFEST.MF, the error changed.

Error 3

重大: resource consumer.props not found.
java.lang.IllegalArgumentException: resource consumer.props not found.
    at com.google.common.base.Preconditions.checkArgument(Preconditions.java:146)
    at com.google.common.io.Resources.getResource(Resources.java:197)
    at jp.adempiere.testkafka.process.MyProcess.doIt(MyProcess.java:35)
    at org.compiere.process.SvrProcess.process(SvrProcess.java:198)
    at org.compiere.process.SvrProcess.startProcess(SvrProcess.java:144)
    at org.adempiere.util.ProcessUtil.startJavaProcess(ProcessUtil.java:172)
    at org.compiere.apps.AbstractProcessCtl.startProcess(AbstractProcessCtl.java:466)
    at org.compiere.apps.AbstractProcessCtl.run(AbstractProcessCtl.java:234)
    at org.adempiere.webui.apps.WProcessCtl.process(WProcessCtl.java:197)
    at org.adempiere.webui.apps.AbstractProcessDialog$ProcessDialogRunnable.doRun(AbstractProcessDialog.java:1054)
    at org.adempiere.util.ContextRunnable.run(ContextRunnable.java:38)
    at org.adempiere.webui.apps.DesktopRunnable.run(DesktopRunnable.java:40)
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.util.concurrent.FutureTask.run(Unknown Source)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)

I created
src/resources/consumer.props
and added resources to the Classpath on the Runtime tab of MANIFEST.MF (Add...), after which the error changed.
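
Error 3 comes down to a classpath lookup that finds nothing. As a rough sketch of the same check using only the JDK (the class `ResourceCheck`, the method `loadFromClasspath` and the file name `no-such-file.props` are hypothetical, and Guava's own lookup may differ in detail), a missing resource shows up as a null URL before any stream is opened:

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.URL;
import java.util.Properties;

// Hypothetical helper for illustration; not taken from the Kafka sample.
class ResourceCheck {

    /** Loads a properties file from the classpath, failing early if it is absent. */
    static Properties loadFromClasspath(String name) {
        // getResource returns null when the name is not on this loader's classpath.
        URL url = ResourceCheck.class.getClassLoader().getResource(name);
        if (url == null) {
            throw new IllegalArgumentException("resource " + name + " not found on the classpath");
        }
        Properties properties = new Properties();
        try (InputStream in = url.openStream()) {
            properties.load(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return properties;
    }

    public static void main(String[] args) {
        try {
            loadFromClasspath("no-such-file.props");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In a plug-in, "on the classpath" means the entries listed for the bundle, which is presumably why adding the resources folder on the Runtime tab made the lookup succeed.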

Error 4

org/slf4j/LoggerFactory
java.lang.ClassNotFoundException: org.slf4j.LoggerFactory cannot be found by jp.adempiere.testkafka_1.0.0.qualifier

I think I fixed this by adding
jp.adempiere.testkafka\slf4j-api-1.7.6.jar

Kafka MyProcess.java

I copied the Kafka sample code that I tried in "Notes on when the Kafka sample (www.mapr.com) stopped working - kubotti’s memo" and turned it into an iDempiere process.

package jp.adempiere.testkafka.process;

import org.compiere.process.SvrProcess;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.io.Resources;
import org.HdrHistogram.Histogram;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Properties;
import java.util.Random;

public class MyProcess extends SvrProcess{
    
    @Override
    protected void prepare() {
        
    }

    @Override
    protected String doIt() throws Exception {
        // set up house-keeping
        ObjectMapper mapper = new ObjectMapper();
        Histogram stats = new Histogram(1, 10000000, 2);
        Histogram global = new Histogram(1, 10000000, 2);

        // and the consumer
        KafkaConsumer<String, String> consumer;
        try (InputStream props = Resources.getResource("consumer.props").openStream()) {
            Properties properties = new Properties();
            properties.load(props);
            if (properties.getProperty("group.id") == null) {
                properties.setProperty("group.id", "group-" + new Random().nextInt(100000));
            }
            consumer = new KafkaConsumer<>(properties);
        }
        //consumer.subscribe(Arrays.asList("fast-messages", "summary-markers"));
        consumer.subscribe(Arrays.asList("fast-messages", "summary-markers", "test01-messages"));
        
        int timeouts = 0;
        // give up after 50 consecutive empty polls (50 x 200 ms, about 10 seconds of silence)
        while (timeouts < 50) {
            // read records with a short timeout. If we time out, we don't really care.
            ConsumerRecords<String, String> records = consumer.poll(200);
            if (records.count() == 0) {
                timeouts++;
            } else {
                System.out.printf("Got %d records after %d timeouts\n", records.count(), timeouts);
                timeouts = 0;
            }
            for (ConsumerRecord<String, String> record : records) {
                switch (record.topic()) {
                    case "fast-messages":
                        // the send time is encoded inside the message
                        JsonNode msg = mapper.readTree(record.value());
                        switch (msg.get("type").asText()) {
                            case "test":
                                long latency = (long) ((System.nanoTime() * 1e-9 - msg.get("t").asDouble()) * 1000);
                                stats.recordValue(latency);
                                global.recordValue(latency);
                                break;
                            case "marker":
                                // whenever we get a marker message, we should dump out the stats
                                // note that the number of fast messages won't necessarily be quite constant
                                System.out.printf("%d messages received in period, latency(min, max, avg, 99%%) = %d, %d, %.1f, %d (ms)\n",
                                        stats.getTotalCount(),
                                        stats.getValueAtPercentile(0), stats.getValueAtPercentile(100),
                                        stats.getMean(), stats.getValueAtPercentile(99));
                                System.out.printf("%d messages received overall, latency(min, max, avg, 99%%) = %d, %d, %.1f, %d (ms)\n",
                                        global.getTotalCount(),
                                        global.getValueAtPercentile(0), global.getValueAtPercentile(100),
                                        global.getMean(), global.getValueAtPercentile(99));

                                stats.reset();
                                break;
                            default:
                                throw new IllegalArgumentException("Illegal message type: " + msg.get("type"));
                        }
                        break;
                    case "summary-markers":
                        break;
                    case "test01-messages":
                        System.out.printf("kubotti \n");
                        //System.exit(0);
                        break;
                    default:
                        throw new IllegalStateException("Shouldn't be possible to get message on topic " + record.topic());
                }
            }
        }
        // release the consumer's network resources before returning
        consumer.close();
        System.out.println("kafka consumer processed.");
        return "processed";
    }
}
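
The loop in doIt() stops once 50 polls in a row return no records, and any non-empty poll resets the counter. The sketch below isolates that stopping rule without Kafka. The class `IdlePollLoop`, the method `drain` and the list of batch sizes are hypothetical stand-ins for `consumer.poll(200)`:

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for the poll loop; batch sizes replace real ConsumerRecords.
class IdlePollLoop {

    /**
     * Reads batch sizes until maxEmptyPolls consecutive empty batches are seen
     * (or the source is exhausted) and returns the total number of records read.
     */
    static int drain(Iterator<Integer> batchSizes, int maxEmptyPolls) {
        int total = 0;
        int timeouts = 0;
        while (timeouts < maxEmptyPolls && batchSizes.hasNext()) {
            int count = batchSizes.next();
            if (count == 0) {
                timeouts++;        // one more empty poll in a row
            } else {
                total += count;
                timeouts = 0;      // any data resets the idle counter
            }
        }
        return total;
    }

    public static void main(String[] args) {
        List<Integer> polls = Arrays.asList(3, 0, 0, 5, 0, 0, 0, 7);
        // Three empty polls in a row end the loop before the final batch is read.
        System.out.println(drain(polls.iterator(), 3)); // prints 8
    }
}
```

With a limit of 3 the last batch of 7 is never read, so a process built this way ends after a quiet period rather than at a fixed message count.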

Trying it out

On the VirtualBox guest OS,
CentOS6 (IP:192.168.5.200),
start ZooKeeper and Kafka.

On the host OS,
Windows10 (IP:192.168.5.1),
start the Eclipse for iDempiere and
the Eclipse for the MapR Sample Apache Kafka.

Run the Kafka producer from Eclipse, as described in
"Notes on when the Kafka sample (www.mapr.com) stopped working - kubotti’s memo".

Start the iDempiere web server from Eclipse and run the iDempiere Kafka consumer process created above.

The output of the consumer process appeared in the Eclipse Console window.

osgi> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
kafka consumer processed.
Got 5010 records after 0 timeouts
1 messages received in period, latency(min, max, avg, 99%) = 35840, 36095, 35968.0, 36095 (ms)
1 messages received overall, latency(min, max, avg, 99%) = 35840, 36095, 35968.0, 36095 (ms)
1000 messages received in period, latency(min, max, avg, 99%) = 35584, 36095, 35831.8, 36095 (ms)
1001 messages received overall, latency(min, max, avg, 99%) = 35584, 36095, 35831.9, 36095 (ms)
1000 messages received in period, latency(min, max, avg, 99%) = 35584, 35839, 35712.0, 35839 (ms)
2001 messages received overall, latency(min, max, avg, 99%) = 35584, 36095, 35772.0, 36095 (ms)
1000 messages received in period, latency(min, max, avg, 99%) = 35584, 35839, 35712.0, 35839 (ms)
3001 messages received overall, latency(min, max, avg, 99%) = 35584, 36095, 35752.0, 36095 (ms)
1000 messages received in period, latency(min, max, avg, 99%) = 35584, 35839, 35712.0, 35839 (ms)
4001 messages received overall, latency(min, max, avg, 99%) = 35584, 36095, 35742.0, 36095 (ms)
kafka consumer processed.

Goal achieved, for now.