This application demonstrates how to configure WSO2 Integrator: SI to send sweet production events via Kafka transport in JSON format.
- Set up Kafka as follows:
- Create a folder called
kafkaand another folder calledkafka-osgi. - Copy the following files from
{KafkaHome}/libsto the kafka folder you just created:- kafka_2.11-0.10.0.0.jar
- kafka-clients-0.10.0.0.jar
- metrics-core-2.2.0.jar
- scala-library-2.11.8.jar
- zkclient-0.8.jar
- zookeeper-3.4.6.jar
- Copy these same files to the
<SI_HOME>/samples/sample-clients/libfolder. - Navigate to
<SI_HOME>/binand issue the following command:- For Linux/macOS:
./jartobundle.sh <path/kafka> <path/kafka-osgi>
- For Windows:
./jartobundle.bat <path/kafka> <path/kafka-osgi>
- INFO: Created the OSGi bundle <kafka-lib-name>.jar for JAR file <absolute_path>/kafka/<kafka-lib-name>.jar - For Linux/macOS:
- Copy the OSGi-converted kafka libs from the
kafka-osgifolder to<SI_HOME>/lib.
- Create a folder called
- Save this sample.
- If there is no syntax error, the following message is shown on the console:
Siddhi App PublishKafkaInJsonFormat successfully deployed.
-
Navigate to
{KafkaHome}and start the zookeeper node using following command.bin/zookeeper-server-start.sh config/zookeeper.properties
-
Navigate to
{KafkaHome}and start the kafka server node using following command.bin/kafka-server-start.sh config/server.properties
-
Navigate to
<SI_HOME>/samples/sample-clients/kafka-consumerand run theantcommand without arguments. -
Start the Siddhi application by clicking on 'Run'.
-
If the Siddhi application starts successfully, the following messages are shown on the console:
- PublishKafkaInJsonFormat.siddhi - Started Successfully! - Kafka version : 0.10.0.0 - Kafka commitId : 23c69d62a0cabf06 - Kafka producer created.
Send events through one or more of the following methods.
-
Open the event simulator by clicking on the second icon or pressing Ctrl+Shift+I.
-
In the Single Simulation tab of the panel, specify the values as follows:
Siddhi App Name:PublishKafkaInJsonFormatStream Name:SweetProductionStream
-
In the batchNumber and lowTotal fields, enter the following values and then click Send to send the event.
batchNumber: 1 lowTotal: 50.50 -
Send some more events.
-
Open a new terminal and issue the following command:
curl -X POST -d '{"streamName": "SweetProductionStream", "siddhiAppName": "PublishKafkaInJsonFormat","data": [1, 50.50]}' http://localhost:9390/simulation/single -H 'content-type: text/plain'
-
If there is no error, the following messages are shown on the terminal:
{"status":"OK","message":"Single Event simulation started successfully"}
-
Install Postman from https://www.postman.com/downloads/.
-
Launch the Postman application.
-
Make a 'Post' request to the
http://localhost:9390/simulation/singleendpoint. Set theContent-Typetotext/plainand set the request body in text as follows:{"streamName": "SweetProductionStream", "siddhiAppName": "PublishKafkaInJsonFormat","data": [1, 50.50]} -
Click 'send'. If there is no error, the following messages are shown on the console:
"status": "OK", "message": "Single Event simulation started successfully"
See the output on the terminal of <SI_HOME>/samples/sample-clients/kafka-consumer:
[java] [org.wso2.si.sample.kafka.consumer.KafkaReceiver] : Event received in Kafka Event Adaptor: {"event":{"name":"chocolate cake","amount":50.50}}, offSet: 0, key: null, topic: kafka_result_topic, partition: 0
[java] [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] : Committed offset 1 for partition kafka_result_topic-0
If the message "'Kafka' sink at 'LowProductionAlertStream' has successfully connected to http://localhost:9092' does not appear, it could be that port 9092 defined in the Siddhi application is already being used by a different program. To resolve this issue, do the following,
- Stop this Siddhi application (Click 'Run' on menu bar -> 'Stop').
- In this Siddhi application's source configuration, change port 9092 to an unused port.
- Start the application and check whether the specified messages appear on the console.
@App:name("PublishKafkaInJsonFormat")
@App:description('Send events via Kafka transport using JSON format')
define stream SweetProductionStream (batchNumber long, lowTotal double);
@sink(type='kafka',
topic='kafka_result_topic',
bootstrap.servers='localhost:9092',
@map(type='json'))
define stream LowProductionAlertStream (batchNumber long, lowTotal double);
@info(name='EventsPassthroughQuery')
from SweetProductionStream
select *
insert into LowProductionAlertStream;