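iDempiere + Kafka notes
Implementing (or at least attempting) a Kafka Consumer as an iDempiere process.
I tried writing the receiving side (Consumer) of Kafka's asynchronous messaging (message queue)
inside an iDempiere process.
I only copied Kafka sample code, so it is of no practical use, but...
Reference URL: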
[http://wiki.idempiere.org/en/Developing_Plug-Ins-Process]
Developing plug-ins without affecting the trunk - iDempiere en
Creating a new iDempiere plug-in
Eclipse menu > File > New > Other... > Plug-in Project
Project name:
jp.adempiere.testkafka
Select the following:
an OSGi framework: Equinox
Uncheck "Generate an activator...".
Uncheck "Create plug-in using one of the templates".
Click Finish.
The skeleton of the test plug-in is now created.
Creating the iDempiere process
New
Package:jp.adempiere.testkafka.process
Name:MyProcess
package jp.adempiere.testkafka.process;

import org.compiere.process.SvrProcess;

public class MyProcess extends SvrProcess {

    @Override
    protected void prepare() {
    }

    @Override
    protected String doIt() throws Exception {
        return null;
    }
}
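To make the roles of prepare() and doIt() in the skeleton above easier to see, here is a minimal sketch that runs without iDempiere. ProcessStub is a hypothetical stand-in written for this note, not the real org.compiere.process.SvrProcess; the only thing assumed from the real class is that prepare() runs before doIt() and that the string returned by doIt() becomes the result message.

```java
class LifecycleSketch {

    // Hypothetical stand-in for SvrProcess so the sketch compiles on its own.
    // It is NOT the iDempiere class; only the call order is mimicked.
    abstract static class ProcessStub {
        protected abstract void prepare();

        protected abstract String doIt() throws Exception;

        // Assumed lifecycle: prepare() first, then doIt(); an exception means failure.
        final String run() {
            try {
                prepare();
                return "OK: " + doIt();
            } catch (Exception e) {
                return "ERROR: " + e.getMessage();
            }
        }
    }

    static class HelloProcess extends ProcessStub {
        private String greeting;

        @Override
        protected void prepare() {
            // A real process would read its parameters here.
            greeting = "hello";
        }

        @Override
        protected String doIt() {
            // The returned string is what the caller sees as the result.
            return greeting + " from doIt";
        }
    }

    public static void main(String[] args) {
        System.out.println(new HelloProcess().run());
    }
}
```

The generated skeleton returns null from doIt(); returning a short message, as the Kafka version later in these notes does, is probably easier to follow when testing.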
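Registering the plug-in extension (dependencies?)
In Package Explorer, click MANIFEST.MF.
Click Extensions.
Click Add.
Uncheck "Show only extension points from the required plug-ins"
(and pick the org.adempiere.base.Process extension point).
In Extension Details, enter the following for both ID and Name:
jp.adempiere.testkafka.process.MyProcess
On the Extensions tab, right-click
org.adempiere.base.Process
and choose New > process.
Press the Browse... button to the right of
class*:
Clear the "Select entries:" field; MyProcess is then listed, so select it.
Save with Ctrl + S.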
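For reference, the steps above should leave an entry roughly like the following in the plug-in's plugin.xml. This is a sketch of what the Extensions editor usually generates for these values, not a copy of the file from the original project, so the header lines and attribute order may differ.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<?eclipse version="3.4"?>
<plugin>
   <extension
         id="jp.adempiere.testkafka.process.MyProcess"
         name="jp.adempiere.testkafka.process.MyProcess"
         point="org.adempiere.base.Process">
      <process
            class="jp.adempiere.testkafka.process.MyProcess">
      </process>
   </extension>
</plugin>
```

Checking this file by hand is a quick way to confirm that the id and the class attribute both carry the full package name.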
Registering in iDempiere
[http://wiki.idempiere.org/en/Developing_Plug-Ins-Process#Using_Extension_Points]
Start iDempiere.
Log in as SuperUser.
Open the Report & Process window and create a new entry.
Open the System Admin > General Rules > System Rules > Menu window and create a new entry.
Error 1
2 28, 2016 7:16:00 午後 org.adempiere.base.DefaultProcessFactory newProcessInstance
警告: jp.adempiere.process.MyProcess
java.lang.ClassNotFoundException: jp.adempiere.process.MyProcess cannot be found by org.adempiere.base_3.1.0.qualifier
    at org.eclipse.osgi.internal.loader.BundleLoader.findClassInternal(BundleLoader.java:432)
    at org.eclipse.osgi.internal.loader.BundleLoader.findClass(BundleLoader.java:345)
    at org.eclipse.osgi.internal.loader.BundleLoader.findClass(BundleLoader.java:337)
    at org.eclipse.osgi.internal.loader.ModuleClassLoader.loadClass(ModuleClassLoader.java:160)
    at java.lang.ClassLoader.loadClass(Unknown Source)
    at org.adempiere.base.DefaultProcessFactory.newProcessInstance(DefaultProcessFactory.java:64)
    at org.adempiere.base.Core.getProcess(Core.java:127)
    at org.adempiere.util.ProcessUtil.startJavaProcess(ProcessUtil.java:159)
    at org.compiere.apps.AbstractProcessCtl.startProcess(AbstractProcessCtl.java:466)
    at org.compiere.apps.AbstractProcessCtl.run(AbstractProcessCtl.java:234)
    at org.adempiere.webui.apps.WProcessCtl.process(WProcessCtl.java:197)
    at org.adempiere.webui.apps.AbstractProcessDialog$ProcessDialogRunnable.doRun(AbstractProcessDialog.java:1054)
    at org.adempiere.util.ContextRunnable.run(ContextRunnable.java:38)
    at org.adempiere.webui.apps.DesktopRunnable.run(DesktopRunnable.java:40)
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.util.concurrent.FutureTask.run(Unknown Source)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
I had simply got the package name wrong.
The class name I registered in iDempiere was
jp.adempiere.process.MyProcess
but the class had actually been created as
jp.adempiere.testkafka.process.MyProcess
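A typo like this can be caught before opening iDempiere by checking whether the registered name resolves at all. The following is a minimal sketch using only the JDK; ClassNameCheck and isLoadable are names made up for this note. Under OSGi every bundle has its own class loader, so the check only tells you what the calling class's loader can see, not what org.adempiere.base can see.

```java
class ClassNameCheck {

    /** Returns true if the fully qualified class name resolves from this class's loader. */
    static boolean isLoadable(String className) {
        try {
            // initialize=false: resolve the class only, do not run static initializers
            Class.forName(className, false, ClassNameCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // A JDK class is always visible.
        System.out.println(isLoadable("java.util.Properties"));
        // The mistyped name from Error 1 (the "testkafka" package segment is missing).
        System.out.println(isLoadable("jp.adempiere.process.MyProcess"));
    }
}
```

Run outside the plug-in, both the wrong and the right MyProcess names fail to load, so the check is only meaningful when called from code inside the jp.adempiere.testkafka bundle.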
Activator settings
[http://wiki.idempiere.org/en/Developing_Plug-Ins-Get_your_Plug-In_running]
Click MANIFEST.MF.
On the Dependencies tab, add
org.adempiere.plugin.utils
On the Overview tab, type (in the Activator field)
org.adempiere.plugin.utils.
and
AdempiereActivator
shows up as a suggestion.
Select it and save.
In the Plug-ins tab of Eclipse's Run Configurations, register
jp.adempiere.testkafka
External JARs
Reference: Maven - OSGiにおける依存関係の解決方法 - taketoncheir.log (on resolving dependencies in OSGi)
C:\kafka\kafka_2.11-0.9.0.0\libs\jackson-databind-2.5.4.jar
External JAR notes (Buckminster?)
A full-text search for bsh-engine.jar, which
org.adempiere.base
uses, showed that it is listed in files such as
org.adempiere.base/copyjars.xml and idempiere\migration\reduce_repository\exclude_file_map.txt
The string copyjars.xml, in turn, appears in
org.adempiere.base/buckminster.cspex
Error 2
重大: com/fasterxml/jackson/annotation/JsonAutoDetect
java.lang.ClassNotFoundException: com.fasterxml.jackson.annotation.JsonAutoDetect cannot be found by jp.adempiere.testkafka_1.0.0.qualifier
    at org.eclipse.osgi.internal.loader.BundleLoader.findClassInternal(BundleLoader.java:432)
    at org.eclipse.osgi.internal.loader.BundleLoader.findClass(BundleLoader.java:345)
    at org.eclipse.osgi.internal.loader.BundleLoader.findClass(BundleLoader.java:337)
    at org.eclipse.osgi.internal.loader.ModuleClassLoader.loadClass(ModuleClassLoader.java:160)
    at java.lang.ClassLoader.loadClass(Unknown Source)
    at com.fasterxml.jackson.databind.introspect.VisibilityChecker$Std.<clinit>(VisibilityChecker.java:170)
    at com.fasterxml.jackson.databind.ObjectMapper.<clinit>(ObjectMapper.java:206)
    at jp.adempiere.testkafka.process.MyProcess.doIt(MyProcess.java:29)
    at org.compiere.process.SvrProcess.process(SvrProcess.java:198)
    at org.compiere.process.SvrProcess.startProcess(SvrProcess.java:144)
    at org.adempiere.util.ProcessUtil.startJavaProcess(ProcessUtil.java:172)
    at org.compiere.apps.AbstractProcessCtl.startProcess(AbstractProcessCtl.java:466)
    at org.compiere.apps.AbstractProcessCtl.run(AbstractProcessCtl.java:234)
    at org.adempiere.webui.apps.WProcessCtl.process(WProcessCtl.java:197)
    at org.adempiere.webui.apps.AbstractProcessDialog$ProcessDialogRunnable.doRun(AbstractProcessDialog.java:1054)
    at org.adempiere.util.ContextRunnable.run(ContextRunnable.java:38)
    at org.adempiere.webui.apps.DesktopRunnable.run(DesktopRunnable.java:40)
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.util.concurrent.FutureTask.run(Unknown Source)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
I added jackson-annotations-2.5.0.jar to the Build Path and also entered
jackson-annotations-2.5.0.jar under Classpath on the Runtime tab of MANIFEST.MF. After that the error changed.
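The Classpath list on the Runtime tab is stored as the Bundle-ClassPath header of MANIFEST.MF, and that header decides which JARs the bundle's class loader can see at run time (the Build Path alone only affects compilation). The sketch below reads such a header with the JDK's java.util.jar.Manifest. The manifest text is an assumed example of what the file might look like after the edit, not the actual file from this project.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.jar.Manifest;

class BundleClassPathSketch {

    // Assumed manifest content (illustrative only). Continuation lines start with one space.
    static final String MANIFEST_TEXT =
            "Manifest-Version: 1.0\n"
          + "Bundle-SymbolicName: jp.adempiere.testkafka;singleton:=true\n"
          + "Bundle-ClassPath: .,\n"
          + " jackson-databind-2.5.4.jar,\n"
          + " jackson-annotations-2.5.0.jar\n";

    /** Returns the entries of the Bundle-ClassPath header, or an empty list if it is absent. */
    static List<String> classPathEntries(String manifestText) throws IOException {
        Manifest mf = new Manifest(
                new ByteArrayInputStream(manifestText.getBytes(StandardCharsets.UTF_8)));
        String header = mf.getMainAttributes().getValue("Bundle-ClassPath");
        List<String> entries = new ArrayList<>();
        if (header != null) {
            for (String entry : header.split(",")) {
                entries.add(entry.trim());
            }
        }
        return entries;
    }

    public static void main(String[] args) throws IOException {
        for (String entry : classPathEntries(MANIFEST_TEXT)) {
            System.out.println(entry);
        }
    }
}
```

If a JAR that the code needs at run time is missing from this list, the result is the kind of ClassNotFoundException "cannot be found by jp.adempiere.testkafka_..." shown in Error 2.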
Error 3
重大: resource consumer.props not found.
java.lang.IllegalArgumentException: resource consumer.props not found.
    at com.google.common.base.Preconditions.checkArgument(Preconditions.java:146)
    at com.google.common.io.Resources.getResource(Resources.java:197)
    at jp.adempiere.testkafka.process.MyProcess.doIt(MyProcess.java:35)
    at org.compiere.process.SvrProcess.process(SvrProcess.java:198)
    at org.compiere.process.SvrProcess.startProcess(SvrProcess.java:144)
    at org.adempiere.util.ProcessUtil.startJavaProcess(ProcessUtil.java:172)
    at org.compiere.apps.AbstractProcessCtl.startProcess(AbstractProcessCtl.java:466)
    at org.compiere.apps.AbstractProcessCtl.run(AbstractProcessCtl.java:234)
    at org.adempiere.webui.apps.WProcessCtl.process(WProcessCtl.java:197)
    at org.adempiere.webui.apps.AbstractProcessDialog$ProcessDialogRunnable.doRun(AbstractProcessDialog.java:1054)
    at org.adempiere.util.ContextRunnable.run(ContextRunnable.java:38)
    at org.adempiere.webui.apps.DesktopRunnable.run(DesktopRunnable.java:40)
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.util.concurrent.FutureTask.run(Unknown Source)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
I created
src/resources/consumer.props
and used Add... under Classpath on the Runtime tab of MANIFEST.MF to add the resources folder. After that the error changed.
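Guava's Resources.getResource(String) looks the file up through the thread context class loader when one is set, which under OSGi is not necessarily the plug-in's own loader. An alternative that might be more predictable is to ask the class loader of a class inside the plug-in directly. The sketch below does that with the JDK only; ResourceLoadSketch and load are names made up for this note, and in the real process the anchor class would be MyProcess.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Properties;

class ResourceLoadSketch {

    /**
     * Loads a properties file through the class loader of the given anchor class.
     * Throws IllegalArgumentException with a descriptive message if it is not found.
     */
    static Properties load(Class<?> anchor, String resourceName) {
        try (InputStream in = anchor.getClassLoader().getResourceAsStream(resourceName)) {
            if (in == null) {
                throw new IllegalArgumentException("resource " + resourceName
                        + " not found by the class loader of " + anchor.getName());
            }
            Properties props = new Properties();
            props.load(in);
            return props;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        try {
            Properties props = load(ResourceLoadSketch.class, "consumer.props");
            System.out.println("loaded " + props.size() + " properties");
        } catch (IllegalArgumentException e) {
            // Expected when consumer.props is not on the class path.
            System.out.println(e.getMessage());
        }
    }
}
```

Either way, the folder that holds consumer.props still has to be part of the bundle's class path, which is what the Runtime > Classpath step above takes care of.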
Error 4
org/slf4j/LoggerFactory java.lang.ClassNotFoundException: org.slf4j.LoggerFactory cannot be found by jp.adempiere.testkafka_1.0.0.qualifier
I think I fixed this by adding
jp.adempiere.testkafka\slf4j-api-1.7.6.jar
(not sure any more).
Kafka MyProcess.java
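Copy the Kafka sample code I tried in "Kafkaサンプル(www.mapr.com)が動かなくなった時のメモ - kubotti’s memo" (my notes from when the www.mapr.com Kafka sample stopped working) and turn it into an iDempiere process.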
package jp.adempiere.testkafka.process;

import org.compiere.process.SvrProcess;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.io.Resources;
import org.HdrHistogram.Histogram;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.io.InputStream;
import java.util.Arrays;
import java.util.Properties;
import java.util.Random;

public class MyProcess extends SvrProcess {

    @Override
    protected void prepare() {
    }

    @Override
    protected String doIt() throws Exception {
        // set up house-keeping
        ObjectMapper mapper = new ObjectMapper();
        Histogram stats = new Histogram(1, 10000000, 2);
        Histogram global = new Histogram(1, 10000000, 2);

        // and the consumer
        KafkaConsumer<String, String> consumer;
        try (InputStream props = Resources.getResource("consumer.props").openStream()) {
            Properties properties = new Properties();
            properties.load(props);
            if (properties.getProperty("group.id") == null) {
                properties.setProperty("group.id", "group-" + new Random().nextInt(100000));
            }
            consumer = new KafkaConsumer<>(properties);
        }

        try {
            //consumer.subscribe(Arrays.asList("fast-messages", "summary-markers"));
            consumer.subscribe(Arrays.asList("fast-messages", "summary-markers", "test01-messages"));
            int timeouts = 0;
            // unlike the original sample, stop after 50 consecutive empty polls
            while (timeouts < 50) {
                // read records with a short timeout. If we time out, we don't really care.
                ConsumerRecords<String, String> records = consumer.poll(200);
                if (records.count() == 0) {
                    timeouts++;
                } else {
                    System.out.printf("Got %d records after %d timeouts\n", records.count(), timeouts);
                    timeouts = 0;
                }
                for (ConsumerRecord<String, String> record : records) {
                    switch (record.topic()) {
                        case "fast-messages":
                            // the send time is encoded inside the message
                            JsonNode msg = mapper.readTree(record.value());
                            switch (msg.get("type").asText()) {
                                case "test":
                                    long latency = (long) ((System.nanoTime() * 1e-9 - msg.get("t").asDouble()) * 1000);
                                    stats.recordValue(latency);
                                    global.recordValue(latency);
                                    break;
                                case "marker":
                                    // whenever we get a marker message, we should dump out the stats
                                    // note that the number of fast messages won't necessarily be quite constant
                                    System.out.printf("%d messages received in period, latency(min, max, avg, 99%%) = %d, %d, %.1f, %d (ms)\n",
                                            stats.getTotalCount(),
                                            stats.getValueAtPercentile(0), stats.getValueAtPercentile(100),
                                            stats.getMean(), stats.getValueAtPercentile(99));
                                    System.out.printf("%d messages received overall, latency(min, max, avg, 99%%) = %d, %d, %.1f, %d (ms)\n",
                                            global.getTotalCount(),
                                            global.getValueAtPercentile(0), global.getValueAtPercentile(100),
                                            global.getMean(), global.getValueAtPercentile(99));
                                    stats.reset();
                                    break;
                                default:
                                    throw new IllegalArgumentException("Illegal message type: " + msg.get("type"));
                            }
                            break;
                        case "summary-markers":
                            break;
                        case "test01-messages":
                            System.out.printf("kubotti \n");
                            //System.exit(0);
                            break;
                        default:
                            throw new IllegalStateException("Shouldn't be possible to get message on topic " + record.topic());
                    }
                }
            }
        } finally {
            // release the consumer's network resources even if processing fails
            consumer.close();
        }
        System.out.println("kafka consumer processed.");
        return "processed";
    }
}
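The only real change from the sample is how the loop ends: the process returns once 50 polls in a row come back empty, and with a 200 ms poll timeout that is about 10 seconds of silence (50 x 200 ms). The sketch below isolates just that counting logic without Kafka. The batch sizes are made-up numbers that stand in for records.count(), and PollLoopSketch and consume are names invented for this note.

```java
import java.util.Arrays;
import java.util.Iterator;

class PollLoopSketch {

    /**
     * Sums batch sizes until maxEmptyPolls consecutive empty batches are seen
     * (or the simulated input runs out). A non-empty batch resets the counter.
     */
    static int consume(Iterator<Integer> batchSizes, int maxEmptyPolls) {
        int timeouts = 0;
        int total = 0;
        while (timeouts < maxEmptyPolls && batchSizes.hasNext()) {
            int count = batchSizes.next(); // stands in for consumer.poll(200).count()
            if (count == 0) {
                timeouts++;
            } else {
                timeouts = 0;
                total += count;
            }
        }
        return total;
    }

    public static void main(String[] args) {
        // Two consecutive empty polls end the loop, so the final batch of 5 is never read.
        int total = consume(Arrays.asList(3, 0, 2, 0, 0, 5).iterator(), 2);
        System.out.println("records consumed: " + total);
    }
}
```

Because a single non-empty poll resets the counter, a slow but steady producer keeps the process running indefinitely, which may or may not be what you want for something started from the iDempiere UI.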
Trying it out
On the VirtualBox guest OS,
CentOS6 (IP: 192.168.5.200),
start ZooKeeper and Kafka.
On the host OS,
Windows10 (IP: 192.168.5.1),
launch the Eclipse used for iDempiere and
the Eclipse used for the MapR Sample Apache Kafka project.
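Since the broker runs on the guest and the consumer runs inside iDempiere on the host, the consumer settings have to point at the guest's address. The consumer.props used here is not shown in these notes, so the following is only a sketch of the kind of settings it would need, written as Java so that the group.id fallback from MyProcess can be shown alongside. Port 9092 is Kafka's default and is an assumption, as are the String deserializers (chosen to match KafkaConsumer<String, String>).

```java
import java.util.Properties;
import java.util.Random;

class ConsumerConfigSketch {

    /** Builds consumer settings for a broker at the given host:port (illustrative values). */
    static Properties consumerProperties(String bootstrapServers) {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", bootstrapServers);
        p.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        p.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Same fallback as in MyProcess: generate a group id when none is configured.
        if (p.getProperty("group.id") == null) {
            p.setProperty("group.id", "group-" + new Random().nextInt(100000));
        }
        return p;
    }

    public static void main(String[] args) {
        // 192.168.5.200 is the CentOS6 guest from these notes; 9092 is assumed.
        Properties p = consumerProperties("192.168.5.200:9092");
        for (String name : p.stringPropertyNames()) {
            System.out.println(name + "=" + p.getProperty(name));
        }
    }
}
```

A random group.id means every run joins as a new consumer group, so where reading starts depends on the consumer's offset reset setting rather than on offsets saved by an earlier run.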
Following the article
"Kafkaサンプル(www.mapr.com)が動かなくなった時のメモ - kubotti’s memo",
run the Kafka Producer from Eclipse.
Start the iDempiere web server from Eclipse and run the iDempiere Kafka Consumer process created above.
The output of the Consumer process appeared in Eclipse's Console window.
osgi> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
kafka consumer processed.
Got 5010 records after 0 timeouts
1 messages received in period, latency(min, max, avg, 99%) = 35840, 36095, 35968.0, 36095 (ms)
1 messages received overall, latency(min, max, avg, 99%) = 35840, 36095, 35968.0, 36095 (ms)
1000 messages received in period, latency(min, max, avg, 99%) = 35584, 36095, 35831.8, 36095 (ms)
1001 messages received overall, latency(min, max, avg, 99%) = 35584, 36095, 35831.9, 36095 (ms)
1000 messages received in period, latency(min, max, avg, 99%) = 35584, 35839, 35712.0, 35839 (ms)
2001 messages received overall, latency(min, max, avg, 99%) = 35584, 36095, 35772.0, 36095 (ms)
1000 messages received in period, latency(min, max, avg, 99%) = 35584, 35839, 35712.0, 35839 (ms)
3001 messages received overall, latency(min, max, avg, 99%) = 35584, 36095, 35752.0, 36095 (ms)
1000 messages received in period, latency(min, max, avg, 99%) = 35584, 35839, 35712.0, 35839 (ms)
4001 messages received overall, latency(min, max, avg, 99%) = 35584, 36095, 35742.0, 36095 (ms)
kafka consumer processed.
Goal achieved, for now.