• Archive by category "JAVA语言实践"
  • (Page 5)

Blog Archives

Java现实WebSocket

无所不能的Java系列文章,涵盖了Java的思想,应用开发,设计模式,程序架构等,通过我的经验去诠释Java的强大。

说起Java,真的有点不知道从何说起。Java是一门全领域发展的语言,从基础的来讲有4大块,Java语法,JDK,JVM,第三方类库。官方又以面向不同应用的角度,又把JDK分为JavaME,JavaSE,JavaEE三个部分。Java可以做客户端界面,可以做中间件,可以做手机系统,可以做应用,可以做工具,可以做游戏,可以做算法…,Java几乎无所不能。

在Java的世界里,Java就是一切。

关于作者

转载请注明出处:
http://blog.fens.me/java-websocket-intro/

java-websocket

前言

伴随着HTML5技术的新起,WebSocket 作为一种浏览器与服务器的核心通信技术,被嵌入到了浏览器的内核中。WebSocket 的出现使得浏览器提供对 Socket 的支持成为可能,从而在浏览器和服务器之间提供了一个基于 TCP 连接的双向通道。

所有新的技术都会第一时间在Java社区,出现对应的开源项目!WebSocket也被实现在多种Java的开源库中。WebSocket实现列表:https://java.net/projects/websocket-spec/pages/WebSocketAPIs/text。

今天就让我们用Java来解密一下WebSocket的服务器端和客户端 实现。

目录

  1. 服务器端实现(Tomcat)
  2. 客户端实现(Java-WebSocket)
  3. 客户端实现(Javascript原生API)

1. 服务器端实现(Tomcat)

用Java实现的websocket,在Server端是通过Tomcat内嵌支持的,我们需要开发一个继承WebSocketServlet 的servlet就可以了,与普通的HttpServlet没有太大区别。

1). JAVA环境:

  • Java: jdk 1.6.0_45, Server VM 64bit
  • Maven: 3.0.5
  • Tomcat: 7.0.39.0

~ D:\workspace\java>java -version
java version "1.6.0_45"
Java(TM) SE Runtime Environment (build 1.6.0_45-b06)
Java HotSpot(TM) 64-Bit Server VM (build 20.45-b01, mixed mode)

~ D:\workspace\java>mvn -version
Apache Maven 3.0.5 (r01de14724cdef164cd33c7c8c2fe155faf9602da; 2013-02-19 21:51:28+0800)
Maven home: D:\toolkit\maven3\bin\..
Java version: 1.6.0_45, vendor: Sun Microsystems Inc.
Java home: D:\toolkit\java\jdk6\jre
Default locale: zh_CN, platform encoding: GBK
OS name: "windows 7", version: "6.1", arch: "amd64", family: "windows"

~ D:\toolkit\tomcat7\bin>catalina.bat version
Using CATALINA_BASE:   "D:\toolkit\tomcat7"
Using CATALINA_HOME:   "D:\toolkit\tomcat7"
Using CATALINA_TMPDIR: "D:\toolkit\tomcat7\temp"
Using JRE_HOME:        "D:\toolkit\java\jdk6"
Using CLASSPATH:       "D:\toolkit\tomcat7\bin\bootstrap.jar;D:\toolkit\tomcat7\bin\tomcat-juli.jar"
Server version: Apache Tomcat/7.0.39
Server built:   Mar 22 2013 12:37:24
Server number:  7.0.39.0
OS Name:        Windows 7
OS Version:     6.1
Architecture:   amd64
JVM Version:    1.6.0_45-b06
JVM Vendor:     Sun Microsystems Inc.

2). maven构建一个简单的webapp项目。


~ D:\workspace\java>mvn archetype:generate -DgroupId=org.conan.websocket -DartifactId=websocketServer -DarchetypeArtifactId=maven-archetype-webapp

[INFO] ----------------------------------------------------------------------------
[INFO] Using following parameters for creating project from Old (1.x) Archetype: maven-archetype-webapp:1.0
[INFO] ----------------------------------------------------------------------------
[INFO] Parameter: groupId, Value: org.conan.websocket
[INFO] Parameter: packageName, Value: org.conan.websocket
[INFO] Parameter: package, Value: org.conan.websocket
[INFO] Parameter: artifactId, Value: websocketServer
[INFO] Parameter: basedir, Value: D:\workspace\java
[INFO] Parameter: version, Value: 1.0-SNAPSHOT
[INFO] project created from Old (1.x) Archetype in dir: D:\workspace\java\websocketServer
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 1:42.200s
[INFO] Finished at: Tue Aug 20 13:57:05 CST 2013
[INFO] Final Memory: 9M/179M
[INFO] ------------------------------------------------------------------------

3). 配置项目目录


~ D:\workspace\java>cd websocketServer
~ D:\workspace\java\websocketServer>mkdir src\main\java
~ D:\workspace\java\websocketServer>rm src\main\webapp\index.jsp

导入到Eclipse的项目截图

ws1

4). 编辑pom.xml配置文件,增加tomcat的依赖


~ vi pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.conan.websocket</groupId>
<artifactId>websocketServer</artifactId>
<packaging>war</packaging>
<version>1.0-SNAPSHOT</version>
<name>websocketServer Maven Webapp</name>
<url>http://maven.apache.org</url>
<dependencies>
<dependency>
<groupId>org.apache.tomcat</groupId>
<artifactId>tomcat-catalina</artifactId>
<version>7.0.27</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.tomcat</groupId>
<artifactId>tomcat-coyote</artifactId>
<version>7.0.39</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<finalName>websocketServer</finalName>
</build>
</project>

下载并安装类库

~ D:\workspace\java\websocketServer>mvn clean install

5). 创建DemoServlet,服务器端运行类


~ vi src/main/java/org/conan/websocket/DemoServlet.java

package org.conan.websocket;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.util.ArrayList;

import javax.servlet.http.HttpServletRequest;

import org.apache.catalina.websocket.MessageInbound;
import org.apache.catalina.websocket.StreamInbound;
import org.apache.catalina.websocket.WebSocketServlet;
import org.apache.catalina.websocket.WsOutbound;

public class DemoServlet extends WebSocketServlet {

    private static final long serialVersionUID = -4853540828121130946L;
    private static ArrayList mmiList = new ArrayList();

    @Override
    protected StreamInbound createWebSocketInbound(String str, HttpServletRequest request) {
        return new MyMessageInbound();
    }

    private class MyMessageInbound extends MessageInbound {
        WsOutbound myoutbound;

        @Override
        public void onOpen(WsOutbound outbound) {
            try {
                System.out.println("Open Client.");
                this.myoutbound = outbound;
                mmiList.add(this);
                outbound.writeTextMessage(CharBuffer.wrap("Hello!"));
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void onClose(int status) {
            System.out.println("Close Client.");
            mmiList.remove(this);
        }

        @Override
        public void onTextMessage(CharBuffer cb) throws IOException {
            System.out.println("Accept Message : " + cb);
            for (MyMessageInbound mmib : mmiList) {
                CharBuffer buffer = CharBuffer.wrap(cb);
                mmib.myoutbound.writeTextMessage(buffer);
                mmib.myoutbound.flush();
            }
        }

        @Override
        public void onBinaryMessage(ByteBuffer bb) throws IOException {
        }
    }

}

6). 修改web.xml文件


~ vi src/main/webapp/WEB-INF/web.xml

<?xml version="1.0" encoding="UTF-8"?>
<web-app version="2.5" xmlns="http://java.sun.com/xml/ns/javaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd">
<display-name>Archetype Created Web Application</display-name>
<servlet>
<servlet-name>wsServlet</servlet-name>
<servlet-class>org.conan.websocket.DemoServlet</servlet-class>
</servlet>
<servlet-mapping>
<servlet-name>wsServlet</servlet-name>
<url-pattern>/wsServlet</url-pattern>
</servlet-mapping>
</web-app>

7). 编译,打包,部署到tomcat


~ D:\workspace\java\websocketServer>mvn clean install
~ D:\workspace\java\websocketServer>cp target\websocketServer.war D:\toolkit\tomcat7\webapps

启动tomcat


~ D:\toolkit\tomcat7>bin\catalina.bat run
Using CATALINA_BASE:   "D:\toolkit\tomcat7"
Using CATALINA_HOME:   "D:\toolkit\tomcat7"
Using CATALINA_TMPDIR: "D:\toolkit\tomcat7\temp"
Using JRE_HOME:        "D:\toolkit\java\jdk6"
Using CLASSPATH:       "D:\toolkit\tomcat7\bin\bootstrap.jar;D:\toolkit\tomcat7\bin\tomcat-juli.jar"
2013-8-20 14:43:29 org.apache.catalina.core.AprLifecycleListener init
信息: The APR based Apache Tomcat Native library which allows optimal performance in production environments was not fou
nd on the java.library.path: D:\toolkit\java\jdk6\bin;C:\Windows\Sun\Java\bin;C:\Windows\system32;C:\Windows;D:\toolkit\
Rtools\bin;D:\toolkit\Rtools\gcc-4.6.3\bin;C:\Program Files (x86)\Common Files\NetSarang;C:\Windows\system32;C:\Windows;
C:\Windows\System32\Wbem;C:\Windows\System32\WindowsPowerShell\v1.0\;D:\toolkit\Git\cmd;D:\toolkit\Git\bin;C:\Program Fi
les (x86)\Microsoft SQL Server\100\Tools\Binn\;C:\Program Files\Microsoft SQL Server\100\Tools\Binn\;C:\Program Files\Mi
crosoft SQL Server\100\DTS\Binn\;c:\Program Files (x86)\Common Files\Ulead Systems\MPEG;C:\Program Files (x86)\QuickTime
\QTSystem\;D:\toolkit\MiKTex\miktex\bin\x64\;D:\toolkit\sshclient;D:\toolkit\ant19\bin;D:\toolkit\eclipse;D:\toolkit\gra
dle15\bin;D:\toolkit\java\jdk6\bin;D:\toolkit\maven3\bin;D:\toolkit\mysql56\bin;D:\toolkit\python27;D:\toolkit\putty;C:\
Program Files\R\R-3.0.1\bin\x64;D:\toolkit\mongodb243\bin;D:\toolkit\php54;D:\toolkit\nginx140;D:\toolkit\nodejs;D:\tool
kit\npm12\bin;D:\toolkit\java\jdk6\jre\bin\server;.
2013-8-20 14:43:30 org.apache.coyote.AbstractProtocol init
信息: Initializing ProtocolHandler ["http-bio-8080"]
2013-8-20 14:43:30 org.apache.coyote.AbstractProtocol init
信息: Initializing ProtocolHandler ["ajp-bio-8009"]
2013-8-20 14:43:30 org.apache.catalina.startup.Catalina load
信息: Initialization processed in 1409 ms
2013-8-20 14:43:30 org.apache.catalina.core.StandardService startInternal
信息: Starting service Catalina
2013-8-20 14:43:30 org.apache.catalina.core.StandardEngine startInternal
信息: Starting Servlet Engine: Apache Tomcat/7.0.39
2013-8-20 14:43:30 org.apache.catalina.startup.HostConfig deployWAR
信息: Deploying web application archive D:\toolkit\tomcat7\webapps\websocketServer.war
2013-8-20 14:43:30 org.apache.catalina.startup.HostConfig deployDirectory
信息: Deploying web application directory D:\toolkit\tomcat7\webapps\docs
2013-8-20 14:43:30 org.apache.catalina.startup.HostConfig deployDirectory
信息: Deploying web application directory D:\toolkit\tomcat7\webapps\examples
2013-8-20 14:43:31 org.apache.catalina.startup.HostConfig deployDirectory
信息: Deploying web application directory D:\toolkit\tomcat7\webapps\host-manager
2013-8-20 14:43:31 org.apache.catalina.startup.HostConfig deployDirectory
信息: Deploying web application directory D:\toolkit\tomcat7\webapps\manager
2013-8-20 14:43:31 org.apache.catalina.startup.HostConfig deployDirectory
信息: Deploying web application directory D:\toolkit\tomcat7\webapps\ROOT
2013-8-20 14:43:31 org.apache.coyote.AbstractProtocol start
信息: Starting ProtocolHandler ["http-bio-8080"]
2013-8-20 14:43:31 org.apache.coyote.AbstractProtocol start
信息: Starting ProtocolHandler ["ajp-bio-8009"]
2013-8-20 14:43:31 org.apache.catalina.startup.Catalina start
信息: Server startup in 996 ms

websocket的服务地址:
ws://localhost:8080/websocketServer/wsServlet

2. 客户端实现(Java-WebSocket)

通过Java实现websocket的客户端,这里将介绍的是”Java-WebSocket”。另外,我发现Java7已经原生支持了websocket, “JSR 365, Java API for WebSocket” (看来要开始学学java7和java8了,我在java6的时代停滞3-4年了。)

现在我们使用“Java-WebSocket”

1). 修改pom.xml文件,增加jetty websocket依赖库


~ vi pom.xml
<dependency>
<groupId>org.java-websocket</groupId>
<artifactId>Java-WebSocket</artifactId>
<version>1.3.0</version>
</dependency>

下载依赖库

~ D:\workspace\java\websocketServer>mvn clean install

2). 新建文件,ChatClient.java


~ vi src/main/java/org/conan/websocket/ChatClient.java

package org.conan.websocket;

import java.awt.Container;
import java.awt.GridLayout;
import java.awt.event.ActionEvent;
import java.awt.event.ActionListener;
import java.awt.event.WindowEvent;
import java.net.URI;
import java.net.URISyntaxException;

import javax.swing.JButton;
import javax.swing.JComboBox;
import javax.swing.JFrame;
import javax.swing.JScrollPane;
import javax.swing.JTextArea;
import javax.swing.JTextField;

import org.java_websocket.WebSocketImpl;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.drafts.Draft;
import org.java_websocket.drafts.Draft_10;
import org.java_websocket.drafts.Draft_17;
import org.java_websocket.drafts.Draft_75;
import org.java_websocket.drafts.Draft_76;
import org.java_websocket.handshake.ServerHandshake;

public class ChatClient extends JFrame implements ActionListener {
    private static final long serialVersionUID = -6056260699202978657L;

    private final JTextField uriField;
    private final JButton connect;
    private final JButton close;
    private final JTextArea ta;
    private final JTextField chatField;
    private final JComboBox draft;
    private WebSocketClient cc;

    public ChatClient( String defaultlocation ) {
        super( "WebSocket Chat Client" );
        Container c = getContentPane();
        GridLayout layout = new GridLayout();
        layout.setColumns( 1 );
        layout.setRows( 6 );
        c.setLayout( layout );

        Draft[] drafts = { new Draft_17(), new Draft_10(), new Draft_76(), new Draft_75() };
        draft = new JComboBox( drafts );
        c.add( draft );

        uriField = new JTextField();
        uriField.setText( defaultlocation );
        c.add( uriField );

        connect = new JButton( "Connect" );
        connect.addActionListener( this );
        c.add( connect );

        close = new JButton( "Close" );
        close.addActionListener( this );
        close.setEnabled( false );
        c.add( close );

        JScrollPane scroll = new JScrollPane();
        ta = new JTextArea();
        scroll.setViewportView( ta );
        c.add( scroll );

        chatField = new JTextField();
        chatField.setText( "" );
        chatField.addActionListener( this );
        c.add( chatField );

        java.awt.Dimension d = new java.awt.Dimension( 300, 400 );
        setPreferredSize( d );
        setSize( d );

        addWindowListener( new java.awt.event.WindowAdapter() {
            @Override
            public void windowClosing( WindowEvent e ) {
                if( cc != null ) {
                    cc.close();
                }
                dispose();
            }
        } );

        setLocationRelativeTo( null );
        setVisible( true );
    }

    public void actionPerformed( ActionEvent e ) {

        if( e.getSource() == chatField ) {
            if( cc != null ) {
                cc.send( chatField.getText() );
                chatField.setText( "" );
                chatField.requestFocus();
            }

        } else if( e.getSource() == connect ) {
            try {
                // cc = new ChatClient(new URI(uriField.getText()), area, ( Draft ) draft.getSelectedItem() );
                cc = new WebSocketClient( new URI( uriField.getText() ), (Draft) draft.getSelectedItem() ) {

                    @Override
                    public void onMessage( String message ) {
                        ta.append( "got: " + message + "\n" );
                        ta.setCaretPosition( ta.getDocument().getLength() );
                    }

                    @Override
                    public void onOpen( ServerHandshake handshake ) {
                        ta.append( "You are connected to ChatServer: " + getURI() + "\n" );
                        ta.setCaretPosition( ta.getDocument().getLength() );
                    }

                    @Override
                    public void onClose( int code, String reason, boolean remote ) {
                        ta.append( "You have been disconnected from: " + getURI() + "; Code: " + code + " " + reason + "\n" );
                        ta.setCaretPosition( ta.getDocument().getLength() );
                        connect.setEnabled( true );
                        uriField.setEditable( true );
                        draft.setEditable( true );
                        close.setEnabled( false );
                    }

                    @Override
                    public void onError( Exception ex ) {
                        ta.append( "Exception occured ...\n" + ex + "\n" );
                        ta.setCaretPosition( ta.getDocument().getLength() );
                        ex.printStackTrace();
                        connect.setEnabled( true );
                        uriField.setEditable( true );
                        draft.setEditable( true );
                        close.setEnabled( false );
                    }
                };

                close.setEnabled( true );
                connect.setEnabled( false );
                uriField.setEditable( false );
                draft.setEditable( false );
                cc.connect();
            } catch ( URISyntaxException ex ) {
                ta.append( uriField.getText() + " is not a valid WebSocket URI\n" );
            }
        } else if( e.getSource() == close ) {
            cc.close();
        }
    }

    public static void main( String[] args ) {
        WebSocketImpl.DEBUG = true;
        String location;
        if( args.length != 0 ) {
            location = args[ 0 ];
            System.out.println( "Default server url specified: \'" + location + "\'" );
        } else {
            location = "ws://localhost:8887";
            System.out.println( "Default server url not specified: defaulting to \'" + location + "\'" );
        }
        new ChatClient( location );
    }
}

运行程序:
这里会启动一个Java的GUI界面。输入websocket服务的地址:ws://localhost:8080/websocketServer/wsServlet
ws6

查看Java客户端和HTML客户端的对话,在Java客户端中,输入”你好,小朋友”。
ws4

我们发现在html的客户端中,同样出现的”你好,小朋友”的消息记录。
ws5

这样,我们就在Java6的环境中,实现了Java WebSocket的客户端程序。

3. 客户端实现(Javascript原生API)

编写一个纯HTML的网页,通过浏览器原生的websocketAPI实现对websocket的服务的调用。

~ vi D:\workspace\javascript\tomcatClient.html

<!DOCTYPE html>
<html>
<head>
<meta charset=UTF-8>
<title>Tomcat WebSocket Chat</title>
<script>
var ws = new WebSocket("ws://localhost:8080/websocketServer/wsServlet");
ws.onopen = function(){
};
ws.onmessage = function(message){
document.getElementById("chatlog").textContent += message.data + "\n";
};
function postToServer(){
ws.send(document.getElementById("msg").value);
document.getElementById("msg").value = "";
}
function closeConnect(){
ws.close();
}
</script>
</head>
<body>
<textarea id="chatlog" readonly></textarea><br/>
<input id="msg" type="text" />
<button type="submit" id="sendButton" onClick="postToServer()">Send!</button>
<button type="submit" id="sendButton" onClick="closeConnect()">End</button>
</body>
</html>

通过浏览器刚刚编写的文件:file:///D:/workspace/javascript/tomcatClient.html

打开两个窗口:
ws2

在右边窗口输入”我是BBB”,然后点击send。左边,右这,和后台日志,同时增加了”我是BBB”。
ws3

原来在浏览器上面,实现聊天功能是如此地简单!!

转载请注明出处:
http://blog.fens.me/nodejs-websocket-intro/

打赏作者

ZooKeeper实现分布式FIFO队列

让Hadoop跑在云端系列文章,介绍了如何整合虚拟化和Hadoop,让Hadoop集群跑在VPS虚拟主机上,通过云向用户提供存储和计算的服务。

现在硬件越来越便宜,一台非品牌服务器,2颗24核CPU,配48G内存,2T的硬盘,已经降到2万块人民币以下了。这种配置如果简单地放几个web应用,显然是奢侈的浪费。就算是用来实现单节点的hadoop,对计算资源浪费也是非常高的。对于这么高性能的计算机,如何有效利用计算资源,就成为成本控制的一项重要议题了。

通过虚拟化技术,我们可以将一台服务器,拆分成12台VPS,每台2核CPU,4G内存,40G硬盘,并且支持资源重新分配。多么伟大的技术啊!现在我们有了12个节点的hadoop集群, 让Hadoop跑在云端,让世界加速。

关于作者:

转载请注明出处:
http://blog.fens.me/zookeeper-queue-fifo

zookeeper-fifo-queue

前言

ZooKeeper是一个强大的分布式协作系统,用ZooKeeper可以方便地实现先进先出(FIFO)队列。给“队列”的技术现实多一种选择,标准化我们的程序结构。另一篇,分步式同步队列实现,请参考:ZooKeeper实现分布式队列Queue

关于ZooKeeper的基本使用,请参考:ZooKeeper伪分步式集群安装及使用

目录

  1. 分布式先进先出(FIFO)队列
  2. 设计思路
  3. 程序实现

1. 分布式先进先出(FIFO)队列

在计算机科学中,消息队列(Message queue)是一种进程间通信或同一进程的不同线程间的通信方式。消息队列提供了异步的通信协议,消息的发送者和接收者不需要同时与消息队列互交。消息会保存在队列中,直到接收者取回它。

先进先出(FIFO)队列,是消息队列最基本的一种实现形式,先发出的先消费。

2. 设计思路

实现的思路也非常简单,在/queue-fifo的目录下创建 SEQUENTIAL 类型的子目录 /x(i),这样就能保证所有成员加入队列时都是有编号的,出队列时通过 getChildren( ) 方法可以返回当前所有的队列中的元素,然后消费其中最小的一个,这样就能保证FIFO。

zookeeper-queue-fifo

应用实例
zookeeper-queue-fifo-app
图标解释

  1. app1,app2,app3是3个独立的业务系统
  2. zk1,zk2,zk3是ZooKeeper集群的3个连接点
  3. /queue-fifo,是znode的队列,按顺序存储数据
  4. /queue-fifo/x1,是znode队列中,1号排对者,由app1提交
  5. /queue-fifo/x2,是znode队列中,2号排对者,由app2提交
  6. app3是消费者,通过zk3连接到znode队列中,找到/queue-fifo中顺序最少的节点消费,删除消费后的节点(红色线表示)

注:

  • 1). app1可以通过zk2提交,app2也可通过zk3提交
  • 2). app1可以提交3次请求,生成x1,x2,x3多个节点
  • 3). app1可以作为消费者,消费队列数据

3. 程序实现

1). 单节点模拟实验

模拟app1,通过zk1,生产2个节点,然后再消费3个节点。


    public static void doOne() throws Exception {
        String host1 = "192.168.1.201:2181";
        ZooKeeper zk = connection(host1);
        initQueue(zk);
        produce(zk, 1);
        produce(zk, 2);
        cosume(zk);
        cosume(zk);
        cosume(zk);
        zk.close();
    }

创建一个与服务器的连接


    public static ZooKeeper connection(String host) throws IOException {
        ZooKeeper zk = new ZooKeeper(host, 60000, null);
        return zk;
    }

出始化队列


    public static ZooKeeper connection(String host) throws IOException {
        return new ZooKeeper(host, 60000, new Watcher() {
            public void process(WatchedEvent event) {
            }
        });
    }

生产者


    public static void produce(ZooKeeper zk, int x) throws KeeperException, InterruptedException {
        System.out.println("create /queue-fifo/x" + x + " x" + x);
        zk.create("/queue-fifo/x" + x, ("x" + x).getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    }

消费者


    public static void cosume(ZooKeeper zk) throws KeeperException, InterruptedException {
        List list = zk.getChildren("/queue-fifo", true);
        if (list.size() > 0) {
            long min = Long.MAX_VALUE;
            for (String num : list) {
                if (min > Long.parseLong(num.substring(1))) {
                    min = Long.parseLong(num.substring(1));
                }
            }
            System.out.println("delete /queue/x" + min);
            zk.delete("/queue-fifo/x" + min, 0);
        } else {
            System.out.println("No node to cosume");
        }
    }

启动main函数


    public static void main(String[] args) throws Exception {
            doOne();
    }

运行结果:


/queue-fifo is exist!
create /queue-fifo/x1 x1
create /queue-fifo/x2 x2
delete /queue/x10000000032
delete /queue/x20000000033
No node to cosume

完全符合我的们预期,由于produce时,我们创建的节点模式是EPHEMERAL_SEQUENTIAL,所以系统会在x(i)(n),随机生成n=0000000032,输出为x10000000032。

接下来我们看分布式环境。

2). 分布式模拟实验
app1通过zk1生产x1, app2通过zk2生产x2, app3通过zk3消费3个节点


public static void doAction(int client) throws Exception {
        String host1 = "192.168.1.201:2181";
        String host2 = "192.168.1.201:2182";
        String host3 = "192.168.1.201:2183";

        ZooKeeper zk = null;
        switch (client) {
        case 1:
            zk = connection(host1);
            initQueue(zk);
            produce(zk, 1);
            break;
        case 2:
            zk = connection(host2);
            initQueue(zk);
            produce(zk, 2);
            break;
        case 3:
            zk = connection(host3);
            initQueue(zk);
            cosume(zk);
            cosume(zk);
            cosume(zk);
            break;
        }
    }

启动main函数


    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            doOne();
        } else {
            doAction(Integer.parseInt(args[0]));
        }
    }

程序启动方法,分3次启动,命令行传不同的参数,分别是1,2,3
run1

run1: 执行app1–>zk1


#日志输出
/queue-fifo is exist!
create /queue-fifo/x1 x1

run2: 执行app2–>zk2


#日志输出
/queue-fifo is exist!
create /queue-fifo/x2 x2

run3: 执行app3–>zk3


#日志输出
/queue-fifo is exist!
delete /queue/x10000000034
delete /queue/x20000000035
No node to cosume

我们完成分布式队列的实验,由于时间仓促。文字说明及代码难免有一些问题,请发现问题的同学帮忙指正。

下面贴一下完整的代码:


package org.conan.zookeeper.demo;

import java.io.IOException;
import java.util.List;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class FIFOZooKeeper {

    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            doOne();
        } else {
            doAction(Integer.parseInt(args[0]));
        }
    }

    public static void doOne() throws Exception {
        String host1 = "192.168.1.201:2181";
        ZooKeeper zk = connection(host1);
        initQueue(zk);
        produce(zk, 1);
        produce(zk, 2);
        cosume(zk);
        cosume(zk);
        cosume(zk);
        zk.close();
    }

    public static void doAction(int client) throws Exception {
        String host1 = "192.168.1.201:2181";
        String host2 = "192.168.1.201:2182";
        String host3 = "192.168.1.201:2183";

        ZooKeeper zk = null;
        switch (client) {
        case 1:
            zk = connection(host1);
            initQueue(zk);
            produce(zk, 1);
            break;
        case 2:
            zk = connection(host2);
            initQueue(zk);
            produce(zk, 2);
            break;
        case 3:
            zk = connection(host3);
            initQueue(zk);
            cosume(zk);
            cosume(zk);
            cosume(zk);
            break;
        }
    }

    // 创建一个与服务器的连接
    public static ZooKeeper connection(String host) throws IOException {
        return new ZooKeeper(host, 60000, new Watcher() {
            public void process(WatchedEvent event) {
            }
        });
    }

    public static void initQueue(ZooKeeper zk) throws KeeperException, InterruptedException {
        if (zk.exists("/queue-fifo", false) == null) {
            System.out.println("create /queue-fifo task-queue-fifo");
            zk.create("/queue-fifo", "task-queue-fifo".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            System.out.println("/queue-fifo is exist!");
        }
    }

    public static void produce(ZooKeeper zk, int x) throws KeeperException, InterruptedException {
        System.out.println("create /queue-fifo/x" + x + " x" + x);
        zk.create("/queue-fifo/x" + x, ("x" + x).getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    }

    public static void cosume(ZooKeeper zk) throws KeeperException, InterruptedException {
        List list = zk.getChildren("/queue-fifo", true);
        if (list.size() > 0) {
            long min = Long.MAX_VALUE;
            for (String num : list) {
                if (min > Long.parseLong(num.substring(1))) {
                    min = Long.parseLong(num.substring(1));
                }
            }
            System.out.println("delete /queue/x" + min);
            zk.delete("/queue-fifo/x" + min, 0);
        } else {
            System.out.println("No node to cosume");
        }
    }

}

转载请注明出处:
http://blog.fens.me/zookeeper-queue-fifo

打赏作者

ZooKeeper实现分布式队列Queue

让Hadoop跑在云端系列文章,介绍了如何整合虚拟化和Hadoop,让Hadoop集群跑在VPS虚拟主机上,通过云向用户提供存储和计算的服务。

现在硬件越来越便宜,一台非品牌服务器,2颗24核CPU,配48G内存,2T的硬盘,已经降到2万块人民币以下了。这种配置如果简单地放几个web应用,显然是奢侈的浪费。就算是用来实现单节点的hadoop,对计算资源浪费也是非常高的。对于这么高性能的计算机,如何有效利用计算资源,就成为成本控制的一项重要议题了。

通过虚拟化技术,我们可以将一台服务器,拆分成12台VPS,每台2核CPU,4G内存,40G硬盘,并且支持资源重新分配。多么伟大的技术啊!现在我们有了12个节点的hadoop集群, 让Hadoop跑在云端,让世界加速。

关于作者:

转载请注明出处:
http://blog.fens.me/zookeeper-queue
zookeeper-queue

前言
ZooKeeper是一个分步式的协作系统,何为协作,ZooKeeper价值又有何体现。通过这篇文章的分布式队列的案例,你将了解到ZooKeeper的强大。关于ZooKeeper的基本使用,请参考:ZooKeeper伪分步式集群安装及使用

目录

  1. 分布式队列
  2. 设计思路
  3. 程序实现

1. 分布式队列

队列有很多种产品,大都是消息系统所实现的,像ActiveMQ,JBossMQ,RabbitMQ,IBM-MQ等。分步式队列产品并不太多,像Beanstalkd。

本文实现的分布式对列,是基于ZooKeeper现实的一种同步的分步式队列,当一个队列的成员都聚齐时,这个队列才可用,否则一直等待所有成员到达。

2. 设计思路

创建一个父目录 /queue,每个成员都监控(Watch)标志位目录/queue/start 是否存在,然后每个成员都加入这个队列,加入队列的方式就是创建 /queue/x(i)的临时目录节点,然后每个成员获取 /queue 目录的所有目录节点,也就是 x(i)。判断 i 的值是否已经是成员的个数,如果小于成员个数等待 /queue/start 的出现,如果已经相等就创建 /queue/start。

产品流程图
zookeeper-queue

应用实例
zookeeper-queue-dataprocess

图标解释

  1. app1,app2,app3,app4是4个独立的业务系统
  2. zk1,zk2,zk3是ZooKeeper集群的3个连接点
  3. /queue,是znode的队列,假设队列长度为3
  4. /queue/x1,是znode队列中,1号排对者,由app1提交,同步请求,app1挂载等待
  5. /queue/x2,是znode队列中,2号排对者,由app2提交,同步请求,app2挂起等待
  6. /queue/x3,是znode队列中,3号排对者,由app3提交,同步请求,app3挂起等待
  7. /queue/start,当znode队列中满了,触发创建开始节点
  8. 当/qeueu/start被创建后,app4被启动,所有zk的连接通知同步程序(红色线),队列已完成,所有程序结束

注:

  • 1). 创建/queue/x1,/queue/x2,/queue/x3没有前后顺序,提交后程序就同步挂起。
  • 2). app1可以通过zk2提交,app2也可通过zk3提交
  • 3). app1可以提交3次请求,生成x1,x2,x3使用队列充满
  • 4). /queue/start被创建后,zk1会监听到这个事件,再告诉app1,队列已完成!

3. 程序实现

1). 单节点模拟实验

模拟app1,通过zk1,提交3个请求


   public static void doOne() throws Exception {
        String host1 = "192.168.1.201:2181";
        ZooKeeper zk = connection(host1);
        initQueue(zk);
        joinQueue(zk, 1);
        joinQueue(zk, 2);
        joinQueue(zk, 3);
        zk.close();
    }

创建一个与服务器的连接


    public static ZooKeeper connection(String host) throws IOException {
        ZooKeeper zk = new ZooKeeper(host, 60000, new Watcher() {
            // 监听/queue/start创建的事件
            public void process(WatchedEvent event) {
                if (event.getPath() != null && event.getPath().equals("/queue/start") && event.getType() == Event.EventType.NodeCreated) {
                    System.out.println("Queue has Completed.Finish testing!!!");
                }
            }
        });
        return zk;
    }

出始化队列


    public static void initQueue(ZooKeeper zk) throws KeeperException, InterruptedException {
        System.out.println("WATCH => /queue/start");
        zk.exists("/queue/start", true);

        if (zk.exists("/queue", false) == null) {
            System.out.println("create /queue task-queue");
            zk.create("/queue", "task-queue".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            System.out.println("/queue is exist!");
        }
    }

增加队列节点


    public static void joinQueue(ZooKeeper zk, int x) throws KeeperException, InterruptedException {
        System.out.println("create /queue/x" + x + " x" + x);
        zk.create("/queue/x" + x, ("x" + x).getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        isCompleted(zk);
    }

检查队列是否完整


    public static void isCompleted(ZooKeeper zk) throws KeeperException, InterruptedException {
        int size = 3;
        int length = zk.getChildren("/queue", true).size();

        System.out.println("Queue Complete:" + length + "/" + size);
        if (length >= size) {
            System.out.println("create /queue/start start");
            zk.create("/queue/start", "start".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        } 
    }

启动函数main


public static void main(String[] args) throws Exception {
    doOne();
}

运行结果:


WATCH => /queue/start
/queue is exist!
create /queue/x1 x1
Queue Complete:1/3
create /queue/x2 x2
Queue Complete:2/3
create /queue/x3 x3
Queue Complete:3/3
create /queue/start start
Queue has Completed.Finish testing!!!

完全符合我的们预期。接下来我们看分布式环境

2). 分布式模拟实验

模拟app1通过zk1提交x1,app2通过zk2提交x2,app3通过zk3提交x3


    public static void doAction(int client) throws Exception {
        String host1 = "192.168.1.201:2181";
        String host2 = "192.168.1.201:2182";
        String host3 = "192.168.1.201:2183";

        ZooKeeper zk = null;
        switch (client) {
        case 1:
            zk = connection(host1);
            initQueue(zk);
            joinQueue(zk, 1);
            break;
        case 2:
            zk = connection(host2);
            initQueue(zk);
            joinQueue(zk, 2);
            break;
        case 3:
            zk = connection(host3);
            initQueue(zk);
            joinQueue(zk, 3);
            break;
        }
    }

注:

  • 1). 为了简单起见,我们没有增加复杂的多线程控制的机制。
  • 2). 没有调用zk.close()方法,也就是说,app1执行完单独的提交,app1就结束了,但zk1还存在着,所以/queue/x1存在于队列。
  • 3). 程序启动方法,分3次启动,命令行传不同的参数,分别是1,2,3

zk-run1

执行app1–>zk1


#日志输出
WATCH => /queue/start
/queue is exist!
create /queue/x1 x1
Queue Complete:1/3

#zookeeper控制台
[zk: 192.168.1.201:2181(CONNECTED) 4] ls /queue
[x10000000011]

执行app2–>zk2


#日志输出
WATCH => /queue/start
/queue is exist!
create /queue/x2 x2
Queue Complete:2/3

#zookeeper控制台
[zk: 192.168.1.201:2181(CONNECTED) 5] ls /queue
[x20000000012, x10000000011]

执行app3–>zk3


#日志输出
WATCH => /queue/start
/queue is exist!
create /queue/x3 x3
Queue Complete:3/3
create /queue/start start
Queue has Completed.Finish testing!!!

#zookeeper控制台
[zk: 192.168.1.201:2181(CONNECTED) 6] ls /queue
[x30000000016, x10000000014, start, x20000000015]

/queue/stats被建立,打印出“Queue has Completed.Finish testing!!!”,代表调用app4完成!

我们完成分布式队列的实验,由于时间仓促。文字说明及代码难免有一些问题,请发现问题的同学帮忙指正。

下面贴一下完整的代码:


package org.conan.zookeeper.demo;

import java.io.IOException;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;

public class QueueZooKeeper {

    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            doOne();
        } else {
            doAction(Integer.parseInt(args[0]));
        }
    }

    public static void doOne() throws Exception {
        String host1 = "192.168.1.201:2181";
        ZooKeeper zk = connection(host1);
        initQueue(zk);
        joinQueue(zk, 1);
        joinQueue(zk, 2);
        joinQueue(zk, 3);
        zk.close();
    }

    public static void doAction(int client) throws Exception {
        String host1 = "192.168.1.201:2181";
        String host2 = "192.168.1.201:2182";
        String host3 = "192.168.1.201:2183";

        ZooKeeper zk = null;
        switch (client) {
        case 1:
            zk = connection(host1);
            initQueue(zk);
            joinQueue(zk, 1);
            break;
        case 2:
            zk = connection(host2);
            initQueue(zk);
            joinQueue(zk, 2);
            break;
        case 3:
            zk = connection(host3);
            initQueue(zk);
            joinQueue(zk, 3);
            break;
        }
    }

    // 创建一个与服务器的连接
    public static ZooKeeper connection(String host) throws IOException {
        ZooKeeper zk = new ZooKeeper(host, 60000, new Watcher() {
            // 监控所有被触发的事件
            public void process(WatchedEvent event) {
                if (event.getType() == Event.EventType.NodeCreated && event.getPath().equals("/queue/start")) {
                    System.out.println("Queue has Completed.Finish testing!!!");
                }
            }
        });
        return zk;
    }

    public static void initQueue(ZooKeeper zk) throws KeeperException, InterruptedException {
        System.out.println("WATCH => /queue/start");
        zk.exists("/queue/start", true);

        if (zk.exists("/queue", false) == null) {
            System.out.println("create /queue task-queue");
            zk.create("/queue", "task-queue".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            System.out.println("/queue is exist!");
        }
    }

    public static void joinQueue(ZooKeeper zk, int x) throws KeeperException, InterruptedException {
        System.out.println("create /queue/x" + x + " x" + x);
        zk.create("/queue/x" + x, ("x" + x).getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        isCompleted(zk);
    }

    public static void isCompleted(ZooKeeper zk) throws KeeperException, InterruptedException {
        int size = 3;
        int length = zk.getChildren("/queue", true).size();

        System.out.println("Queue Complete:" + length + "/" + size);
        if (length >= size) {
            System.out.println("create /queue/start start");
            zk.create("/queue/start", "start".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        } 
    }

}

转载请注明出处:
http://blog.fens.me/zookeeper-queue

打赏作者

解惑rJava R与Java的高速通道

R的极客理想系列文章,涵盖了R的思想,使用,工具,创新等的一系列要点,以我个人的学习和体验去诠释R的强大。

R语言作为统计学一门语言,一直在小众领域闪耀着光芒。直到大数据的爆发,R语言变成了一门炙手可热的数据分析的利器。随着越来越多的工程背景的人的加入,R语言的社区在迅速扩大成长。现在已不仅仅是统计领域,教育,银行,电商,互联网….都在使用R语言。

要成为有理想的极客,我们不能停留在语法上,要掌握牢固的数学,概率,统计知识,同时还要有创新精神,把R语言发挥到各个领域。让我们一起动起来吧,开始R的极客理想。

关于作者:

  • 张丹(Conan), 程序员Java,R,PHP,Javascript
  • weibo:@Conan_Z
  • blog: http://blog.fens.me
  • email: [email protected]

转载请注明出处:
http://blog.fens.me/r-rjava-java
rjava

前言
Java语言在工业界长期处于霸主地位,Java语法、JVM、JDK、Java开源库,在近10年得到了爆发式的发展,几乎覆盖了应用开发的所有领域。伴随着Java的全领域发展,问题也随之而来了。语法越来越复杂,近似的项目越来越多,学好Java变得很难。对于没有IT背景的统计人员,学用Java更是难于上青天。

R一直是统计圈内处于佼佼者的语言,语法简单,学习曲线不太长也不太陡。如果能结合Java的通用性和R的专业性,碰撞出的火花,将会缤纷绚烂。

本文将介绍R与Java连接的高速通道,rJava通信方案。另外一篇文章介绍的Rserve通信方案,请参考: Rserve与Java的跨平台通信

目录

  1. rJava介绍
  2. rJava安装
  3. rJava实现R调用Java
  4. rJava(JRI)实现Java调用R (win7)
  5. rJava(JRI)实现Java调用R (Ubuntu)

1. rJava介绍

rJava是一个R语言和Java语言的通信接口,通过底层JNI实现调用,允许在R中直接调用Java的对象和方法。

rJava还提供了Java调用R的功能,是通过JRI(Java/R Interface)实现的。JRI现在已经被嵌入到rJava的包中,我们也可以单独试用这个功能。现在rJava包,已经成为很多基于Java开发R包的基础功能组件。

正式由于rJava是底层接口,并使用JNI作为接口调用,所以效率非常高。在JRI的方案中,JVM直接通过内存直接加载RVM,调用过程性能几乎无损耗,因此是非常高效连接通道,是R和Java通信的首选开发包。

2. rJava安装

系统环境:

  • Linux Ubuntu 12.04.2 LTS 64bit server
  • R version 3.0.1 64bit
  • Java (Oracle SUN) 1.6.0_29 64bit Server VM

~ uname -a
Linux conan 3.5.0-23-generic #35~precise1-Ubuntu SMP Fri Jan 25 17:13:26 UTC 2013 x86_64 x86_64 x86_64 GNU/Linux

~ cat /etc/issue
Ubuntu 12.04.2 LTS \n \l

~ R --version
R version 3.0.1 (2013-05-16) -- "Good Sport"
Copyright (C) 2013 The R Foundation for Statistical Computing
Platform: x86_64-pc-linux-gnu (64-bit)

R is free software and comes with ABSOLUTELY NO WARRANTY.
You are welcome to redistribute it under the terms of the
GNU General Public License versions 2 or 3.
For more information about these matters see
http://www.gnu.org/licenses/.

~ java -version
java version "1.6.0_29"
Java(TM) SE Runtime Environment (build 1.6.0_29-b11)
Java HotSpot(TM) 64-Bit Server VM (build 20.4-b02, mixed mode)

rJava安装


#配置rJava环境
~ sudo R CMD javareconf

#启动R
~ sudo R
> install.packages("rJava")
installing via 'install.libs.R' to /usr/local/lib/R/site-library/rJava
** R
** inst
** preparing package for lazy loading
** help
*** installing help indices
** building package indices
** testing if installed package can be loaded
* DONE (rJava)

The downloaded source packages are in
        ‘/tmp/RtmpiZyCE7/downloaded_packages’

3. rJava实现R调用Java

在R环境中,使用rJava包编程


#加载rJava包
> library(rJava)
> search()
 [1] ".GlobalEnv"        "package:rJava"     "package:stats"
 [4] "package:graphics"  "package:grDevices" "package:utils"
 [7] "package:datasets"  "package:methods"   "Autoloads"
[10] "package:base"

#启动JVM
> .jinit()

#声明并赋值到字符串
> s <- .jnew("java/lang/String", "Hello World!")
> s
[1] "Java-Object{Hello World!}"

#查看字符串长度
> .jcall(s,"I","length")
[1] 12

#索引World的位置
> .jcall(s,"I","indexOf","World")
[1] 6

#查看concat的方法声明
> .jmethods(s,"concat")
[1] "public java.lang.String java.lang.String.concat(java.lang.String)"

#使用concat方法连接字符串
> .jcall(s,"Ljava/lang/String;","concat",s)
[1] "Hello World!Hello World!"

#打印字符串对象
> print(s)
[1] "Java-Object{Hello World!}"

#打印字符串值
> .jstrVal(s)
[1] "Hello World!"

rJava优化过的方法调用,用$来调用方法


#同.jcall(s,"I","length")
> s$length()
[1] 12

#同.jcall(s,"I","indexOf","World")
> s$indexOf("World")
[1] 6

4. rJava(JRI)实现Java调用R (win7)

在win7中安装rJava

系统环境:

  • win7 64bit 旗舰版
  • R 3.0.1
  • Java 1.6.0_45

设置环境变量


PATH: C:\Program Files\R\R-3.0.1\bin\x64;D:\toolkit\java\jdk6\bin;;D:\toolkit\java\jdk6\jre\bin\server
JAVA_HOME: D:\toolkit\java\jdk6
CLASSPATH: C:\Program Files\R\R-3.0.1\library\rJava\jri

在R中安装rJava


> install.packages("rJava")

#加载rJava
> library(rJava)
> .jinit()

#R调用Java变量测试
> s <- .jnew("java/lang/String", "Hello World!")
> s
[1] "Java-Object{Hello World!}"

启动Eclipse编写程序
rjava2


package org.conan.r.rjava;

import org.rosuda.JRI.Rengine;

public class DemoRJava {

    public static void main(String[] args) {
        DemoRJava demo = new DemoRJava();
        demo.callRJava();
    }

    public void callRJava() {
        Rengine re = new Rengine(new String[] { "--vanilla" }, false, null);
        if (!re.waitForR()) {
            System.out.println("Cannot load R");
            return;
        }
        
        //打印变量
        String version = re.eval("R.version.string").asString();
        System.out.println(version);

        //循环打印数组
        double[] arr = re.eval("rnorm(10)").asDoubleArray();
        for (double a : arr) {
            System.out.print(a + ",");
        }
        re.end();
    }
}

在Eclipse启动设置VM参数:

-Djava.library.path="C:\Program Files\R\R-3.0.1\library\rJava\jri\x64"

rjava

运行结果:


R version 3.0.1 (2013-05-16)
0.04051018703700011,-0.3321596519938258,0.45642459001166913,-1.1907153494936031,1.5872266854172385,1.3639721994863943,-0.6309712627586983,-1.5226698569087498,-1.0416402147174952,0.4864034017637044,

打包DemoRJava.jar
在Eclipse中完成打包,上传到linux环境,继续测试。

5. rJava(JRI)实现Java调用R (Ubuntu)

新建目录DemoRJava,上传DemoRJava.jar到DemoRJava


~ mkdir /home/conan/R/DemoRJava
~ cd /home/conan/R/DemoRJava
~ ls -l
-rw-r--r-- 1 conan conan 1328 Aug  8  2013 DemoRJava.jar

运行Jar包


~ export R_HOME=/usr/lib/R
~ java -Djava.library.path=/usr/local/lib/R/site-library/rJava/jri -cp /usr/local/lib/R/site-library/rJava/jri/JRI.jar:/home/conan/R/DemoRJava/DemoRJava.jar org.conan.r.rjava.DemoRJava

运行结果


R version 3.0.1 (2013-05-16)
0.6374494596732511,1.3413824702002808,0.04573045670001342,-0.6885617932810327,0.14970067632722675,-0.3989493870007832,-0.6148250252955993,0.40132038323714453,-0.5385260423222166,0.3459850956295771,

我们完成了,R和Java的互调。包括了R通过rJava调用Java,Java通过JRI调用R。并演示了win和linux中的使用方法。

转载请注明出处:
http://blog.fens.me/r-rjava-java

打赏作者

Rserve与Java的跨平台通信

R的极客理想系列文章,涵盖了R的思想,使用,工具,创新等的一系列要点,以我个人的学习和体验去诠释R的强大。

R语言作为统计学一门语言,一直在小众领域闪耀着光芒。直到大数据的爆发,R语言变成了一门炙手可热的数据分析的利器。随着越来越多的工程背景的人的加入,R语言的社区在迅速扩大成长。现在已不仅仅是统计领域,教育,银行,电商,互联网….都在使用R语言。

要成为有理想的极客,我们不能停留在语法上,要掌握牢固的数学,概率,统计知识,同时还要有创新精神,把R语言发挥到各个领域。让我们一起动起来吧,开始R的极客理想。

关于作者:

  • 张丹(Conan), 程序员Java,R,PHP,Javascript
  • weibo:@Conan_Z
  • blog: http://blog.fens.me
  • email: [email protected]

转载请注明出处:
http://blog.fens.me/r-rserve-java/

rserve-java

前言

现在主流的异构跨平台通信组件Apache Thrift已经火遍大江南北,支持15种编程语言,但是到目前为止还没有加入R语言。要让R实现跨平台的通信,就只能从R的社区中找方案,像rJava,RCpp,rpy都是2种语言结合的方案,这些方案类似地会把R引擎加载到其他的语言内存环境。优点是高效,缺点是紧耦合,扩展受限,接口程序无法重用。

Rserve给了我们一种新的选择,抽象R语言网络接口,基于TCP/IP协议实现与多语言之间的通信。让我们体验一下Rserve与Java的跨平台通信。

目录

  1. Rserve介绍
  2. Rserve安装
  3. Java远程连接Rserve

1. Rserve介绍

Rserve是一个基于TCP/IP协议的,允许R语言与其他语言通信的C/S结构的程序,支持C/C++,Java,PHP,Python,Ruby,Nodejs等。 Rserve提供远程连接,认证,文件传输等功能。我们可以设计R做为后台服务,处理统计建模,数据分析,绘图等的任务。

2. Rserve安装

系统环境:
Linux Ubuntu 12.04.2 LTS 64bit server
R 3.0.1 64bit


~ uname -a
Linux conan 3.5.0-23-generic #35~precise1-Ubuntu SMP Fri Jan 25 17:13:26 UTC 2013 x86_64 x86_64 x86_64 GNU/Linux

~ cat /etc/issue
Ubuntu 12.04.2 LTS \n \l

~ R --version
R version 3.0.1 (2013-05-16) -- "Good Sport"
Copyright (C) 2013 The R Foundation for Statistical Computing
Platform: x86_64-pc-linux-gnu (64-bit)

R is free software and comes with ABSOLUTELY NO WARRANTY.
You are welcome to redistribute it under the terms of the
GNU General Public License versions 2 or 3.
For more information about these matters see
http://www.gnu.org/licenses/.

Rserve安装


#建议使用root权限安装
~ sudo R

> install.packages("Rserve")
installing via 'install.libs.R' to /usr/local/lib/R/site-library/Rserve
** R
** inst
** preparing package for lazy loading
** help
*** installing help indices
** building package indices
** testing if installed package can be loaded
* DONE (Rserve)

启动Rserve


~ R CMD Rserve

R version 3.0.1 (2013-05-16) -- "Good Sport"
Copyright (C) 2013 The R Foundation for Statistical Computing
Platform: x86_64-pc-linux-gnu (64-bit)

R is free software and comes with ABSOLUTELY NO WARRANTY.
You are welcome to redistribute it under certain conditions.
Type 'license()' or 'licence()' for distribution details.

  Natural language support but running in an English locale

R is a collaborative project with many contributors.
Type 'contributors()' for more information and
'citation()' on how to cite R or R packages in publications.

Type 'demo()' for some demos, 'help()' for on-line help, or
'help.start()' for an HTML browser interface to help.
Type 'q()' to quit R.

Rserv started in daemon mode.

#查看进程
~ ps -aux|grep Rserve
conan     7142  0.0  1.2 116296 25240 ?        Ss   09:13   0:00 /usr/lib/R/bin/Rserve

#查看端口
~ netstat -nltp|grep Rserve
tcp        0      0 127.0.0.1:6311          0.0.0.0:*               LISTEN      7142/Rserve

这时Rserve已经启动,端口是6311。接下来,我们来简单地用一下。

Java远程连接Rserve

1). 远程连接Rserve
刚刚启动时,使用的本地模式,如果想运程连接需要增加参数 –RS-enable-remote


#杀掉刚才的Rserve守护进程
~ kill -9 7142

#打开远程模式重新启动
~ R CMD Rserve --RS-enable-remote

#查看端口
~ netstat -nltp|grep Rserve
tcp        0      0 0.0.0.0:6311            0.0.0.0:*               LISTEN      7173/Rserve

0 0.0.0.0:6311,表示不限IP访问了。

2). 下载Java客户端JAR包
下载Java客户端JAR包:http://www.rforge.net/Rserve/files/

  • REngine.jar
  • RserveEngine.jar

3). 创建Java工程
在Eclipse中新建Java工程,并加载JAR包环境中。
rserve1

4). Java编程实现


package org.conan.r.rserve;

import org.rosuda.REngine.REXP;
import org.rosuda.REngine.REXPMismatchException;
import org.rosuda.REngine.Rserve.RConnection;
import org.rosuda.REngine.Rserve.RserveException;

public class Demo1 {

    public static void main(String[] args) throws RserveException, REXPMismatchException {
        Demo1 demo = new Demo1();
        demo.callRserve();
    }

    public void callRserve() throws RserveException, REXPMismatchException {
        RConnection c = new RConnection("192.168.1.201");
        REXP x = c.eval("R.version.string");
        System.out.println(x.asString());//打印变量x

        double[] arr = c.eval("rnorm(10)").asDoubles();
        for (double a : arr) {//循环打印变量arr
            System.out.print(a + ",");
        }
    }
}

5). 运行结果


R version 3.0.1 (2013-05-16)
1.7695224124757984,-0.29753038160770323,0.26596993631142246,1.4027325257239547,-0.30663565983302676,-0.17594309812158912,0.10071253841443684,0.9365455161259986,0.11272119436439701,0.5766373030674361,

通过Rserve非常简单地实现了,Java和R的通信。
解决了通信的问题,我们就可以发挥想象,把R更广泛的用起来。

接下来,会讲到如何设计Java和R互相调用的软件架构。敬请关注….

转载请注明出处:
http://blog.fens.me/r-rserve-java/

打赏作者