/**
* <strong>游戏服务器</strong>
* <p>
* 本服务器不和具体的业务和数据相关,仅仅实现一个通用的,标准的游戏TCP/IP Server
* </p>
*
* <p>
* 主要实现如下功能:
* 1) NIO Socket通信
* 2) 内存数据库管理
* 3) 消息管理
* 4) 数据库连接池
* 5) 启动加载管理
* 6) 游戏组件管理
* 7) 自定义异常
* 8) 数据包管理
* 9) 管理控制台: 使用Web方式实现
* 10) 性能测试包
* 11) 日志管理
* 12) 配置管理
* </p>
*
* <p>
* 系统配置文件为: config.xml
* 资源配置文件为: information.xml
* </p>
*
* @author: 汉娱网络.技术部.yangxf
* @date: 2007-12-24
* @version: 1.1
**/
package com.handjoys;
import java.util.Timer;
import java.util.Map;
import java.util.HashMap;
import java.util.Queue;
import java.util.Set;
import java.util.Vector;
import java.util.List;
import java.util.LinkedList;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Enumeration;
import java.util.zip.Deflater;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.net.NetworkInterface;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.BufferUnderflowException;
import java.nio.channels.SocketChannel;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.Selector;
import java.nio.channels.SelectionKey;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.NotYetConnectedException;
import java.nio.channels.AsynchronousCloseException;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
import com.handjoys.socket.SelectorFactory;
import com.handjoys.socket.DirectBufferFactory;
import com.handjoys.socket.ConnectionCleanerTask;
import com.handjoys.socket.GSession;
import com.handjoys.socket.RServerFactory;
import com.handjoys.socket.GBBServer;
import com.handjoys.conf.ConfigReader;
import com.handjoys.conf.ConfigParam;
import com.handjoys.logger.FileLogger;
import com.handjoys.startup.Startup;
import com.handjoys.startup.ShutdownManager;
import com.handjoys.packet.MessagePacket;
import com.handjoys.packet.MessageParser;
import com.handjoys.gcm.GCManager;
import com.handjoys.console.GameState;
import com.handjoys.console.MovableObject;
public class GameServer extends Thread {
//private static final int HEADER_MAX_BYTES=6;
// Connection-cleaner interval, in seconds.
private static final int CONNECTION_CLEANER_INTERVAL=5*60;
// Initial capacity and load factor of the per-session message-queue map.
private static final int SESSION_INITIALCAPACITY=100000;
private static final float SESSION_LOADFACTOR=0.5f;
// Singleton instance, set by main() or lazily by getInstance().
private static GameServer instance;
private GCManager gcm;
private ServerSocketChannel sSockChan;
private LinkedList<Startup> startups;
// Handles incoming connection requests.
private SelectionKey acceptKey;
private Selector acceptSelector;
// Handles incoming data.
private Selector readSelector;
private long serverStartTime=0;
// Shutdown flag polled by the accept loop in run().
// volatile: the flag is public and is only read by the server thread, so any
// writer is another thread and its write must become visible to the loop.
public volatile boolean IS_SHUTTING_DOWN=false;
// Queue of accepted SocketChannel connections.
private Queue<SocketChannel> clients;
// Queue of sessions that have messages waiting to be processed.
private LinkedBlockingQueue<GSession> workersQueue;
// Received messages, keyed by session id. Note: every session keeps its own internal message queue.
// Advantage: fewer thread switches, since several messages of one session can be handled in one go (under high concurrency).
// Note: operations on these LinkedLists need to be synchronized.
private Map<String, LinkedList<String>> channelEvents;
private int receiveBufferSize;
//private int maxConnected;
// Buffer used for read I/O.
private ByteBuffer readBuffer;
private CharsetDecoder utfDecoder;
private Charset utfEncoder;
//private Deflater zlibCompresser;
//
private EventReader eventReader;
private EventWriter eventWriter;
private Timer connectionCleanerTimer;
// Scratch byte arrays.
private byte[] receiveBytes;
private byte[] sendBytes;
// Cache of incomplete (fragmented) messages, keyed by the channel they arrived on.
private HashMap<SocketChannel, StringBuffer> invalidXml;
/**
 * Builds the server: allocates the queues and buffers, prints the banner,
 * system information and configuration, runs the configured start-up
 * modules, initialises the game component manager and the game state, and
 * registers the JVM shutdown hook. The listening socket is not opened here.
 */
public GameServer() {
    super("GameServer");

    // Collections shared between the accept, read and write paths.
    startups=new LinkedList<Startup>();
    clients=new ConcurrentLinkedQueue<SocketChannel>();
    workersQueue=new LinkedBlockingQueue<GSession>();
    channelEvents=new ConcurrentHashMap<String, LinkedList<String>>(SESSION_INITIALCAPACITY, SESSION_LOADFACTOR, 2);

    // Buffers are all sized from the configured receive-buffer size.
    final Integer configuredBufferSize=(Integer)ConfigReader.getParam(ConfigParam.RECEIVEBUFFERSIZE);
    receiveBufferSize=configuredBufferSize.intValue();
    readBuffer=ByteBuffer.allocateDirect(receiveBufferSize);
    receiveBytes=new byte[receiveBufferSize];
    sendBytes=new byte[receiveBufferSize];

    // One UTF-8 charset serves as the encoder and supplies the decoder.
    final Charset utf8=Charset.forName("UTF-8");
    utfDecoder=utf8.newDecoder();
    utfEncoder=utf8;

    invalidXml=new HashMap<SocketChannel, StringBuffer>();

    // Print the banner, system information and server configuration.
    welcomeMessage();
    showSystemInfo();
    ConfigReader.print();

    // Load the start-up classes.
    loadStartUp();

    // Load the game component manager.
    gcm=new GCManager();
    gcm.init();

    // Initialise the game state.
    GameState.init();

    // Register the shutdown callback thread.
    Runtime.getRuntime().addShutdownHook(new ShutdownManager());
}
/**
 * Program entry point: creates the server, publishes it as the singleton
 * instance and starts its thread.
 *
 * @param args command-line arguments (not used)
 */
public static void main(String[] args) {
    final GameServer server = new GameServer();
    instance = server;
    server.start();
}
/**
 * Thread body: opens the listening socket, initialises the broadcaster, the
 * remote-server factory and the GBB server, creates the event reader and
 * writer, and then keeps accepting connections until IS_SHUTTING_DOWN is set.
 */
@Override
public void run() {
    // Listen on the configured port.
    initSocketServer();

    Broadcaster.init(this);
    RServerFactory.init(this);
    GBBServer.init(this);

    // Create the event reader and writer, then initialise the writer.
    eventReader=new EventReader(this);
    eventWriter=new EventWriter(this);
    eventWriter.initWriter();

    //connectionCleanerTimer = new Timer(true);
    //connectionCleanerTimer.schedule(new ConnectionCleanerTask(), CONNECTION_CLEANER_INTERVAL * 1000, CONNECTION_CLEANER_INTERVAL * 1000);

    while(!IS_SHUTTING_DOWN) {
        acceptNewConnections();
        pauseBetweenAccepts();
    }
}

/**
 * Sleeps 20 ms between accept rounds. Connection management is not the main
 * job of GameServer (it is handled by IMServer), so the accept loop is throttled.
 */
private void pauseBetweenAccepts() {
    try {
        Thread.sleep(20L);
    }catch(InterruptedException ie){
        // An interrupt arrived while sleeping; it is only logged, no special handling.
        FileLogger.error(Thread.currentThread().getName() + " InterruptedException during Accept task");
    }
}
/**
 * Blocks until the accept selector reports a pending connection, then accepts
 * it, switches it to non-blocking mode with TCP_NODELAY, registers it with the
 * read selector for OP_READ and records it in the client queue.
 * <p>
 * A client whose set-up fails is closed instead of being left half
 * initialised, and does not prevent the remaining ready keys from being
 * handled. Failures of the selector itself are logged and the method returns.
 */
private void acceptNewConnections() {
    try{
        acceptSelector.select();
        Set<SelectionKey> readyKeys = acceptSelector.selectedKeys();
        for(Iterator<SelectionKey> i = readyKeys.iterator(); i.hasNext(); ){
            SelectionKey key = i.next();
            i.remove();
            SocketChannel clientChannel = ((ServerSocketChannel)key.channel()).accept();
            if(clientChannel == null) {
                // The server channel is non-blocking: accept() returns null when
                // no connection is pending any more.
                continue;
            }
            /*
            // Check whether the maximum number of connections has been reached
            // maximum connections = MaxRoom * MaxRoomUser
            if(maxConnected <= clients.size()) {
                FileLogger.info("refuse connection: clients size=" + clients.size());
                if(clientChannel.isConnected())
                    clientChannel.close();
                continue;
            }
            */
            try{
                clientChannel.configureBlocking(false);
                // Disable Nagle's algorithm to minimise message latency.
                clientChannel.socket().setTcpNoDelay(true);
                clientChannel.register(readSelector, SelectionKey.OP_READ, null);
                clients.add(clientChannel);
                FileLogger.info("accept new connection:" + clientChannel.socket().toString());
            }catch(Exception e){
                FileLogger.error("Generic Exception in acceptNewConnections():" + e);
                e.printStackTrace();
                // The channel never became a usable client; release its socket.
                try{
                    clientChannel.close();
                }catch(IOException closeError){
                    FileLogger.error("acceptNewConnections(): failed to close rejected channel:" + closeError);
                }
            }
        }
    }catch(Exception e){
        FileLogger.error("Generic Exception in acceptNewConnections():" + e);
        e.printStackTrace();
    }
}
/**
 * Polls the read selector once, without blocking, and queues every complete
 * message received on the ready channels.
 * <p>
 * Incoming bytes are decoded as UTF-8 and split on "\n". A piece that does not
 * both start with "&lt;msg" and end with "&lt;/msg&gt;" is treated as a
 * fragment: it is buffered per channel in invalidXml and dispatched once the
 * pieces received so far form a complete message. Each complete message is
 * appended to the queue of the session named by its s='...' attribute, and a
 * GSession is added to workersQueue so that a writer picks the session up.
 * <p>
 * A channel that reports end-of-stream or fails with an unexpected exception
 * is passed to lostConnection, and any fragment buffered for it is discarded.
 */
public void readIncomingMessages() {
    try{
        readSelector.selectNow();
        Set<SelectionKey> readyKeys = readSelector.selectedKeys();
        if(readyKeys.isEmpty())
            return;
        for(Iterator<SelectionKey> i = readyKeys.iterator(); i.hasNext();) {
            SelectionKey key = i.next();
            i.remove();
            // Since the ready operations are cumulative,
            // need to check readiness for each operation
            if(!key.isValid()) {
                key.cancel();
                FileLogger.debug( " readIncomingMessages key.isValid lostConnection");
                continue;
            }
            SocketChannel channel = (SocketChannel)key.channel();
            try{
                readBuffer.clear();
                int nbytes = channel.read(readBuffer);
                if(nbytes == -1) {
                    // After the client closes its socket the read reports end-of-stream (-1).
                    FileLogger.debug("readIncomingMessages channel.read(readBuffer) = -1L lostConnection");
                    // Drop any half-received message so the map does not keep the dead channel alive.
                    invalidXml.remove(channel);
                    lostConnection(channel);
                    continue;
                }
                if(nbytes <= 0) {
                    continue;
                }
                // Strip the terminating "\n" (Flash socket protocol), but only when the
                // last byte really is one: a read that stops in the middle of a message
                // must keep its final byte or the reassembled message is corrupted.
                if(readBuffer.get(nbytes - 1) == '\n') {
                    readBuffer.position(nbytes - 1);
                }
                readBuffer.flip();
                String msg=utfDecoder.decode(readBuffer).toString();
                FileLogger.debug(channel.socket().getRemoteSocketAddress() + ": readIncomingMessages=" + msg);
                // One read may contain several messages, separated by "\n",
                // and the first or last piece may be only part of a message.
                String[] moreMsg=msg.split("\n");
                for(int x=0; x < moreMsg.length; x++) {
                    String oneMsg=moreMsg[x];
                    if(oneMsg.equals(""))
                        continue;
                    if(!isCompleteMessage(oneMsg)) {
                        StringBuffer pending=invalidXml.get(channel);
                        if(pending == null) {
                            // First fragment seen on this channel: start buffering.
                            invalidXml.put(channel, new StringBuffer(oneMsg));
                            continue;
                        }
                        // Continue an earlier fragment and test the JOINED text
                        // (testing the new piece alone could never succeed).
                        pending.append(oneMsg);
                        String joined=pending.toString();
                        if(!isCompleteMessage(joined)) {
                            continue;
                        }
                        oneMsg=joined;
                        invalidXml.remove(channel);
                    }
                    // Locate the session id in the s='...' attribute; messages without one are ignored.
                    int sAttr_start=oneMsg.indexOf("s='");
                    if(sAttr_start == -1)
                        continue;
                    sAttr_start += 3;
                    int sAttr_end=oneMsg.indexOf('\'', sAttr_start);
                    if(sAttr_end == -1)
                        continue;
                    String sessionID=oneMsg.substring(sAttr_start, sAttr_end);
                    // Append to the session's queue, creating the queue on first use.
                    LinkedList<String> q=channelEvents.get(sessionID);
                    if(q == null) {
                        q=new LinkedList<String>();
                        q.add(oneMsg);
                        channelEvents.put(sessionID, q);
                    }else {
                        synchronized(q) {
                            q.add(oneMsg);
                        }
                    }
                    // Tell the event-writer side, through workersQueue, to process this session.
                    workersQueue.add(new GSession(sessionID, channel));
                    FileLogger.debug("sessionList size=" + q.size());
                }
            }catch(NotYetConnectedException nye) {
                // If this channel is not yet connected
                if(!key.isValid()) {
                    key.cancel();
                }
                FileLogger.debug("readIncomingMessages NotYetConnectedException");
                nye.printStackTrace();
            }catch(AsynchronousCloseException ace) {
                // If another thread closes this channel while the read operation is in progress
                if(!key.isValid()) {
                    key.cancel();
                }
                FileLogger.debug("readIncomingMessages AsynchronousCloseException");
                ace.printStackTrace();
                // Per the original author, this happens when the event-writer thread closed the
                // channel and already ran lostConnection, so it is not repeated here.
            }catch(IllegalStateException ise) {
                // If a decoding operation is already in progress
                if(!key.isValid()) {
                    key.cancel();
                }
                FileLogger.debug("readIncomingMessages IllegalStateException");
                ise.printStackTrace();
            }catch(Exception e) {
                if(!key.isValid()) {
                    key.cancel();
                }
                FileLogger.debug("readIncomingMessages Exception");
                e.printStackTrace();
                invalidXml.remove(channel);
                lostConnection(channel);
            }
        }
    }catch(ClosedSelectorException cse) {
        // If this selector is closed
        FileLogger.error( "readIncomingMessages ClosedSelectorException: ");
        cse.printStackTrace();
        try{
            // Open a fresh selector.
            readSelector = Selector.open();
        }catch(IOException ie) {
            FileLogger.error( "readIncomingMessages readSelector = Selector.open() IOException ");
            ie.printStackTrace();
        }
    }catch(IOException ioe) {
        FileLogger.error( "readIncomingMessages I/O problems while reading socket > ");
        ioe.printStackTrace();
    }
}

/** Tells whether the text starts with "&lt;msg" and ends with "&lt;/msg&gt;". */
private static boolean isCompleteMessage(String xml) {
    return xml.startsWith("<msg") && xml.endsWith("</msg>");
}
/**
 * Blocks until a session is available on workersQueue, then drains that
 * session's message queue while holding the queue's monitor.
 * <p>
 * A message that starts with Broadcaster.BROADSYMBOL is sent as-is after its
 * first two characters are dropped. Any other message is parsed and, when the
 * packet is valid, dispatched through GCManager.executeLocal; a null or empty
 * reply is skipped, otherwise the reply plus "\n" is sent. A reply without an
 * s='...' attribute gets the session id inserted before its first "&gt;".
 * <p>
 * When the socket accepts no bytes, the method waits up to 500 ms on a pooled
 * selector for OP_WRITE and stops writing the current message after more than
 * five unsuccessful attempts. The pooled buffer and selector are always
 * returned in the finally block. Returns without doing anything when the wait
 * on workersQueue is interrupted or no queue exists for the session.
 */
public void writeOutgoingMessage() {
    SelectionKey key = null;
    Selector writeSelector = null;
    int attempts = 0;
    final long writeTimeout=500;
    // Fetch the next session to process; this blocks until one is available.
    GSession gSession=null;
    try {
        gSession=workersQueue.take();
    }catch(InterruptedException ie) {
        return;
    }
    if(gSession == null)
        return;
    String sessionID=gSession.sessionID;
    SocketChannel socketChannel=gSession.channel;
    ByteBuffer buffer=DirectBufferFactory.getBuffer();
    if(buffer == null) {
        // The pool handed out nothing: allocate one in this thread (just in case).
        buffer=ByteBuffer.allocateDirect(receiveBufferSize);
    }
    try {
        LinkedList<String> q=channelEvents.get(sessionID);
        if(q == null) {
            // No queue was ever created for this session, so there is nothing to send.
            // (Synchronizing on the null reference used to raise a NullPointerException here.)
            return;
        }
        synchronized(q) {
            while(!q.isEmpty()) {
                String msg=q.remove();
                String ret=null;
                if(!msg.startsWith(Broadcaster.BROADSYMBOL)){
                    MessagePacket packet=MessageParser.parseMessgae(msg);
                    if(packet.isValid()) {
                        // Invoke the component selected by the lookup table.
                        ret=GCManager.executeLocal(packet, gSession);
                    }
                    // Invalid message, or nothing to answer.
                    if(ret == null) {
                        continue;
                        //ret="<msg t='sls' s='" + packet.getSession() + "'><body action='0.0' r='0'></body></msg>";
                    }
                    if(ret.equals(""))
                        continue;
                    ret += "\n";
                }else {
                    // Broadcast message: send it directly.
                    ret= msg.substring(2,msg.length());
                }
                System.out.println("ret=" + ret);
                // Add the session id when the outgoing message does not carry one.
                if(ret.indexOf("s='") == -1) {
                    int spos=ret.indexOf(">");
                    // Without a '>' there is no tag to extend; send the text unchanged
                    // instead of failing on substring(0, -1).
                    if(spos != -1) {
                        ret=ret.substring(0, spos) + " s='" + sessionID + "'" + ret.substring(spos);
                    }
                }
                FileLogger.debug("socketChannel.write = " + ret);
                ByteBuffer encoded=utfEncoder.encode(ret);
                ByteBuffer out;
                if(encoded.remaining() <= buffer.capacity()) {
                    buffer.clear();
                    buffer.put(encoded);
                    buffer.flip();
                    out=buffer;
                }else {
                    // Larger than the pooled buffer: write the encoded bytes directly
                    // rather than overflowing the pooled buffer.
                    out=encoded;
                }
                /*
                // Compress the bytes
                zlibCompresser.reset();
                zlibCompresser.setInput(ret.getBytes("UTF-8"));
                zlibCompresser.finish();
                int zlibLen=zlibCompresser.deflate(sendBytes);
                FileLogger.debug("compressedDataLength=" + ret.length() + "/" + zlibLen);
                // Each message starts with the 6-digit length of the compressed payload
                String strLen=zlibLen + "";
                for(int len=strLen.length(); len<HEADER_MAX_BYTES; len++) {
                    strLen="0" + strLen;
                }
                buffer.clear();
                buffer.put(strLen.getBytes());
                buffer.put(sendBytes, 0, zlibLen);
                buffer.flip();
                */
                while (out.hasRemaining()) {
                    int len = socketChannel.write(out);
                    FileLogger.debug("socketChannel.write(buffer) = " + len);
                    attempts++;
                    if (len < 0){
                        FileLogger.debug(" socketChannel.write(buffer) Exception lostConnection");
                        lostConnection(socketChannel);
                        // The channel has been closed; further writes can only fail.
                        return;
                    }
                    if (len == 0) {
                        // The client is not ready to receive data.
                        if (writeSelector == null){
                            writeSelector = SelectorFactory.getSelector();
                            if (writeSelector == null){
                                // Continue using the main one
                                continue;
                            }
                        }
                        key = socketChannel.register(writeSelector, SelectionKey.OP_WRITE);
                        if (writeSelector.select(writeTimeout) == 0) {
                            if (attempts > 5) {
                                FileLogger.debug( " attempts > 5 lostConnection" );
                                // The write failed; give up on this message for now.
                                // lostConnection(socketChannel);
                                break;
                            }
                        } else {
                            attempts--;
                        }
                    } else {
                        attempts = 0;
                    }
                }
            }
        }
    }catch(Exception e) {
        FileLogger.debug(" writeOutgoingMessage Exception lostConnection");
        e.printStackTrace();
        // lostConnection(socketChannel);
    } finally {
        if (key != null) {
            key.cancel();
            key = null;
        }
        if(buffer != null)
            DirectBufferFactory.returnBuffer(buffer);
        if (writeSelector != null) {
            try {
                // Let the selector process the cancelled key before it goes back to the pool.
                writeSelector.selectNow();
                SelectorFactory.returnSelector(writeSelector);
            }catch(Exception ee) {
                ee.printStackTrace();
            }
        }
    }
}
/**
 * Releases everything this server holds for a channel: removes it from
 * RServerFactory, closes it when it is still open, and removes it from the
 * client queue. Does nothing when {@code sc} is null.
 *
 * @param sc the channel whose connection was lost; may be null
 */
public void lostConnection(SocketChannel sc) {
    if(sc == null) {
        return;
    }
    // In case the lost connection belongs to a remote server.
    RServerFactory.removeRServer(sc);
    // Close on isOpen() rather than isConnected(): a channel that is open but
    // not connected must be closed as well, otherwise its socket is leaked.
    if(sc.isOpen()) {
        FileLogger.info(sc + " closed ");
        try{
            sc.close();
        }catch(Exception e) {
            e.printStackTrace();
        }
    }
    clients.remove(sc);
}
/**
 * Returns the singleton server, creating it on first use.
 * <p>
 * The method is synchronized so that two threads asking for the instance at
 * the same time cannot both see {@code null} and construct two servers.
 *
 * @return the shared GameServer instance, never null
 */
public static synchronized GameServer getInstance() {
    if(instance == null){
        instance=new GameServer();
    }
    return instance;
}
/** Prints the start-up banner to standard output, one line at a time. */
private void welcomeMessage() {
    final String[] banner = {
        "|------------------------------------------------------------|",
        "| Handjoys Game Server |",
        "| Multiplayer Socket Server |",
        "| version 1.1 |",
        "| --- |",
        "| (c) 2007 - 2009 handjoys() |",
        "| www.handjoys.com |",
        "|------------------------------------------------------------|\n"
    };
    for(String line : banner) {
        System.out.println(line);
    }
}
/**
 * Logs the processor count, the maximum VM memory (in units of 1,000,000
 * bytes), a fixed list of system properties, and every network interface with
 * its addresses. Terminates the JVM with status 1 when the interfaces cannot
 * be enumerated.
 */
private void showSystemInfo() {
    final String[] propertyNames = {
        "os.name",
        "os.arch",
        "os.version",
        "java.version",
        "java.vendor",
        "java.vendor.url",
        "java.vm.specification.version",
        "java.vm.version",
        "java.vm.vendor",
        "java.vm.name"
    };

    FileLogger.info("--- [ System Info ] ---\n");
    final Runtime runtime = Runtime.getRuntime();
    FileLogger.info("System CPU(s): " + runtime.availableProcessors());
    final long maxMemoryMb = runtime.maxMemory() / 1000000L;
    FileLogger.info("VM Max memory: " + maxMemoryMb + " MB");
    for(String name : propertyNames) {
        FileLogger.info(name + ": " + System.getProperty(name));
    }

    FileLogger.info("\n\n--- [ Network Cards ] ---\n");
    try {
        Enumeration<NetworkInterface> cards = NetworkInterface.getNetworkInterfaces();
        while(cards.hasMoreElements()) {
            NetworkInterface card = cards.nextElement();
            FileLogger.info("Card:" + card.getDisplayName());
            Enumeration<InetAddress> addresses = card.getInetAddresses();
            while(addresses.hasMoreElements()) {
                InetAddress address = addresses.nextElement();
                FileLogger.info(" -> " + address.getHostAddress());
            }
        }
    }catch(Exception se){
        FileLogger.error("Failed discovering network cards!");
        System.exit(1);
    }
}
/**
 * Instantiates every class named in the MODULES configuration parameter,
 * calls start() on it and remembers it in {@code startups}. If a class cannot
 * be found, instantiated or accessed, the failure is logged and the JVM exits
 * with status 1.
 */
private void loadStartUp() {
    Vector moduleNames=(Vector)ConfigReader.getParam(ConfigParam.MODULES);
    FileLogger.info("\n\n--- [ StartUp Object Init ] ---\n");
    try{
        int index=0;
        while(index < moduleNames.size()) {
            String className=(String)moduleNames.elementAt(index);
            Startup module=(Startup)Class.forName(className).newInstance();
            module.start();
            startups.add(module);
            index++;
        }
    }catch(ClassNotFoundException cfe) {
        abortStartup(cfe);
    }catch(InstantiationException ie) {
        abortStartup(ie);
    }catch(IllegalAccessException iae) {
        abortStartup(iae);
    }
}

/** Logs a start-up class loading failure, prints its stack trace and exits with status 1. */
private void abortStartup(Exception cause) {
    FileLogger.error("Failed load Startup class!");
    cause.printStackTrace();
    System.exit(1);
}
/**
 * Opens the non-blocking server socket on the configured address and port
 * (SERVERIP, SERVERPORT, BACKLOG), opens the read and accept selectors and
 * registers the server channel for OP_ACCEPT.
 * <p>
 * Any failure is fatal: it is logged together with the exception that caused
 * it and the JVM exits with status 1.
 */
public void initSocketServer() {
    String hostName=(String)ConfigReader.getParam(ConfigParam.SERVERIP);
    int port=((Integer)(ConfigReader.getParam(ConfigParam.SERVERPORT))).intValue();
    int backlog=((Integer)(ConfigReader.getParam(ConfigParam.BACKLOG))).intValue();
    try {
        sSockChan = ServerSocketChannel.open();
        sSockChan.configureBlocking(false);
        InetAddress addr = InetAddress.getByName(hostName);
        // Set the receive buffer size before binding.
        sSockChan.socket().setReceiveBufferSize(receiveBufferSize);
        sSockChan.socket().setReuseAddress(true);
        sSockChan.socket().bind(new InetSocketAddress(addr, port), backlog);
        readSelector = Selector.open();
        acceptSelector = Selector.open();
        acceptKey = sSockChan.register(acceptSelector, SelectionKey.OP_ACCEPT);
        FileLogger.info("\n\n--- [ Server Starting ] ---\n");
        FileLogger.info("Server address: " + addr.getHostAddress());
        FileLogger.info("Server port : " + port + "\n");
        serverStartTime = System.currentTimeMillis();
    }catch(Exception e) {
        FileLogger.error("\n\n[ --> FATAL ERROR <-- ]: Error initializing server.\n\nCheck if server address and port are properly configured.\n");
        // Report the actual cause; without it the fatal message cannot be diagnosed.
        FileLogger.error("Exception caught in " + Thread.currentThread().getName() + ": " + e);
        e.printStackTrace();
        System.exit(1);
    }
}
/**
 * Queues a broadcast message, terminated with "\n", for the given session and
 * adds the session to workersQueue. When the session has no message queue the
 * message is not stored, but the session is still added to workersQueue.
 *
 * @param msg     the message text, without the trailing newline
 * @param session the session the message is addressed to
 */
public void addBroadEvent(String msg, GSession session) {
    final LinkedList<String> sessionQueue=channelEvents.get(session.sessionID);
    if(sessionQueue != null) {
        // NOTE(review): this add is not guarded by synchronized(queue), unlike the
        // adds in readIncomingMessages and the drain in writeOutgoingMessage -
        // confirm whether that is intentional before relying on it.
        final String line=msg + "\n";
        sessionQueue.add(line);
    }
    // Signal, through workersQueue, that this session has work pending.
    workersQueue.add(session);
}
/**
 * Adopts the session's channel as a client: adds it to the client queue,
 * makes sure the session has a message queue, switches the channel to
 * non-blocking mode with TCP_NODELAY and registers it with the read selector.
 * A failure during that channel set-up is only printed. Finally, when
 * {@code msg} is not null, it is passed to Broadcaster.write for the session.
 *
 * @param msg     optional message to write to the session; may be null
 * @param session the session whose channel is adopted
 */
public void initRServer(String msg, GSession session) {
    final SocketChannel channel=session.channel;
    clients.add(channel);

    // Create the session's message queue on first use.
    if(channelEvents.get(session.sessionID) == null) {
        channelEvents.put(session.sessionID, new LinkedList<String>());
    }

    try {
        channel.configureBlocking(false);
        // Disable Nagle's algorithm to minimise message latency.
        channel.socket().setTcpNoDelay(true);
        // Register for read events.
        channel.register(readSelector, SelectionKey.OP_READ, null);
    }catch(Exception e) {
        e.printStackTrace();
    }

    if(msg == null) {
        return;
    }
    Broadcaster.write(msg, session);
}
/**
 * Returns the bytes between the buffer's current position and its limit,
 * decoded as UTF-8. The buffer passed in is read through a duplicate, so its
 * own position and limit are left untouched.
 * <p>
 * The bytes are copied into an array sized to the remaining content, so a
 * buffer holding more than receiveBufferSize bytes no longer overruns the
 * shared scratch array.
 *
 * @param readBuffer the buffer to render
 * @return the remaining content as text; empty when nothing remains
 * @throws UnsupportedEncodingException if the "utf-8" charset is unavailable
 */
public String printBuffer(ByteBuffer readBuffer) throws UnsupportedEncodingException {
    ByteBuffer view=readBuffer.duplicate();
    byte[] bytes=new byte[view.remaining()];
    view.get(bytes);
    return new String(bytes, 0, bytes.length, "utf-8");
}
/**
 * Returns the live queue of client channels held by this server (the same
 * object the server adds to and removes from, not a copy).
 *
 * @return the client channel queue
 */
public Queue getChannels() {
    return this.clients;
}
// World-server link: notify the other world servers that this player's pet has logged in.
// Currently a no-op: the method body is empty.
public void acceptNewPlayer(Long playerID, SocketChannel channel) {
}
}