diff --git a/pom.xml b/pom.xml index ede0210..007532b 100644 --- a/pom.xml +++ b/pom.xml @@ -38,10 +38,10 @@ maven-compiler-plugin - 2.3.2 + 3.3 - 1.5 - 1.5 + 1.8 + 1.8 @@ -91,27 +91,18 @@ - - - repository.jboss.org - https://repository.jboss.org/nexus/content/groups/public/ - - false - - - org.msgpack msgpack - 0.6.6 + 0.6.12 compile - org.jboss.netty - netty - 3.2.1.Final + io.netty + netty-all + 4.0.28.Final javax.servlet diff --git a/src/main/java/org/msgpack/rpc/Request.java b/src/main/java/org/msgpack/rpc/Request.java index a0007e1..f82d7f1 100644 --- a/src/main/java/org/msgpack/rpc/Request.java +++ b/src/main/java/org/msgpack/rpc/Request.java @@ -15,15 +15,32 @@ // See the License for the specific language governing permissions and // limitations under the License. // +/* +* Copyright (C) 2014-2015 Information Analysis Laboratory, NICT +* +* RaSC is free software: you can redistribute it and/or modify it +* under the terms of the GNU Lesser General Public License as published by +* the Free Software Foundation, either version 2.1 of the License, or (at +* your option) any later version. +* +* RaSC is distributed in the hope that it will be useful, but +* WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +* General Public License for more details. +* +* You should have received a copy of the GNU Lesser General Public License +* along with this program. If not, see . +*/ + package org.msgpack.rpc; -import org.msgpack.type.Value; import org.msgpack.rpc.message.ResponseMessage; import org.msgpack.rpc.transport.MessageSendable; +import org.msgpack.type.Value; public class Request implements Callback { - private MessageSendable channel; // TODO #SF synchronized? - private int msgid; + protected MessageSendable channel; // TODO #SF synchronized? + protected int msgid; private String method; private Value args; diff --git a/src/main/java/org/msgpack/rpc/Server.java b/src/main/java/org/msgpack/rpc/Server.java index ba027d5..bf178dc 100644 --- a/src/main/java/org/msgpack/rpc/Server.java +++ b/src/main/java/org/msgpack/rpc/Server.java @@ -15,27 +15,41 @@ // See the License for the specific language governing permissions and // limitations under the License. // +/* +* Copyright (C) 2014-2015 Information Analysis Laboratory, NICT +* +* RaSC is free software: you can redistribute it and/or modify it +* under the terms of the GNU Lesser General Public License as published by +* the Free Software Foundation, either version 2.1 of the License, or (at +* your option) any later version. +* +* RaSC is distributed in the hope that it will be useful, but +* WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +* General Public License for more details. +* +* You should have received a copy of the GNU Lesser General Public License +* along with this program. If not, see . +*/ + package org.msgpack.rpc; import java.io.IOException; import java.net.InetSocketAddress; import java.net.UnknownHostException; +import org.msgpack.rpc.address.IPAddress; import org.msgpack.rpc.builder.DefaultDispatcherBuilder; import org.msgpack.rpc.builder.DispatcherBuilder; -import org.msgpack.rpc.reflect.Reflect; -import org.msgpack.type.NilValue; -import org.msgpack.type.Value; -import org.msgpack.rpc.address.IPAddress; -import org.msgpack.rpc.dispatcher.Dispatcher; -import org.msgpack.rpc.dispatcher.MethodDispatcher; import org.msgpack.rpc.config.ClientConfig; import org.msgpack.rpc.config.ServerConfig; import org.msgpack.rpc.config.TcpServerConfig; -import org.msgpack.rpc.transport.ServerTransport; -import org.msgpack.rpc.transport.MessageSendable; -import org.msgpack.rpc.loop.EventLoop; +import org.msgpack.rpc.dispatcher.Dispatcher; import org.msgpack.rpc.error.RPCError; +import org.msgpack.rpc.loop.EventLoop; +import org.msgpack.rpc.transport.MessageSendable; +import org.msgpack.rpc.transport.ServerTransport; +import org.msgpack.type.Value; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,7 +57,7 @@ public class Server extends SessionPool { private final static Logger logger = LoggerFactory.getLogger(Server.class); - private Dispatcher dp; + protected Dispatcher dp; private ServerTransport stran; private DispatcherBuilder dispatcherBuilder = new DefaultDispatcherBuilder(); diff --git a/src/main/java/org/msgpack/rpc/Session.java b/src/main/java/org/msgpack/rpc/Session.java index b3cf986..959a5c4 100644 --- a/src/main/java/org/msgpack/rpc/Session.java +++ b/src/main/java/org/msgpack/rpc/Session.java @@ -15,35 +15,53 @@ // See the License for the specific language governing permissions and // limitations under the License. // +/* +* Copyright (C) 2014-2015 Information Analysis Laboratory, NICT +* +* RaSC is free software: you can redistribute it and/or modify it +* under the terms of the GNU Lesser General Public License as published by +* the Free Software Foundation, either version 2.1 of the License, or (at +* your option) any later version. +* +* RaSC is distributed in the hope that it will be useful, but +* WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +* General Public License for more details. +* +* You should have received a copy of the GNU Lesser General Public License +* along with this program. If not, see . +*/ + package org.msgpack.rpc; -import java.util.List; import java.util.ArrayList; -import java.util.Map; import java.util.HashMap; import java.util.Iterator; +import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; + import org.msgpack.rpc.address.Address; -import org.msgpack.rpc.message.RequestMessage; +import org.msgpack.rpc.config.ClientConfig; +import org.msgpack.rpc.loop.EventLoop; import org.msgpack.rpc.message.NotifyMessage; +import org.msgpack.rpc.message.RequestMessage; import org.msgpack.rpc.reflect.Reflect; import org.msgpack.rpc.transport.ClientTransport; -import org.msgpack.rpc.config.ClientConfig; -import org.msgpack.rpc.loop.EventLoop; import org.msgpack.type.Value; import org.msgpack.type.ValueFactory; public class Session { protected Address address; protected EventLoop loop; - private ClientTransport transport; + protected ClientTransport transport; private Reflect reflect; private int requestTimeout; - private AtomicInteger seqid = new AtomicInteger(0); // FIXME rand()? - private Map reqtable = new HashMap(); + protected AtomicInteger seqid = new AtomicInteger(0); // FIXME rand()? + protected Map reqtable = new HashMap(); Session(Address address, ClientConfig config, EventLoop loop) { this(address,config,loop,new Reflect(loop.getMessagePack())); @@ -141,16 +159,14 @@ void closeSession() { } public void transportConnectFailed() { // FIXME error rseult - /* synchronized(reqtable) { for(Map.Entry pair : reqtable.entrySet()) { // FIXME FutureImpl f = pair.getValue(); - f.setResult(null,null); + f.setResult(null,ValueFactory.createRawValue("Fail to connect")); } reqtable.clear(); } - */ } public void onResponse(int msgid, Value result, Value error) { @@ -183,4 +199,15 @@ void stepTimeout() { f.setResult(null, ValueFactory.createRawValue("timedout")); } } + + public void transportError(String msg){ + synchronized(reqtable) { + for(Map.Entry pair : reqtable.entrySet()) { + // FIXME + FutureImpl f = pair.getValue(); + f.setResult(null,ValueFactory.createRawValue(msg)); + } + reqtable.clear(); + } + } } diff --git a/src/main/java/org/msgpack/rpc/config/ClientConfig.java b/src/main/java/org/msgpack/rpc/config/ClientConfig.java index 1df431f..695e1d3 100644 --- a/src/main/java/org/msgpack/rpc/config/ClientConfig.java +++ b/src/main/java/org/msgpack/rpc/config/ClientConfig.java @@ -15,13 +15,32 @@ // See the License for the specific language governing permissions and // limitations under the License. // +/* +* Copyright (C) 2014-2015 Information Analysis Laboratory, NICT +* +* RaSC is free software: you can redistribute it and/or modify it +* under the terms of the GNU Lesser General Public License as published by +* the Free Software Foundation, either version 2.1 of the License, or (at +* your option) any later version. +* +* RaSC is distributed in the hope that it will be useful, but +* WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +* General Public License for more details. +* +* You should have received a copy of the GNU Lesser General Public License +* along with this program. If not, see . +*/ + package org.msgpack.rpc.config; -import java.util.HashMap; +import io.netty.channel.ChannelOption; + import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; public abstract class ClientConfig { - private Map options = new HashMap(); + private Map,Object> options = new ConcurrentHashMap<>(); protected int requestTimeout = 30; // FIXME default timeout time public void setRequestTimeout(int sec) { @@ -32,15 +51,16 @@ public int getRequestTimeout() { return this.requestTimeout; } - public Object getOption(String key) { - return options.get(key); + @SuppressWarnings("unchecked") + public T getOption(ChannelOption key) { + return (T)options.get(key); } - public Map getOptions() { + public Map,Object> getOptions() { return options; } - public void setOption(String key, Object value) { + public void setOption(ChannelOption key, T value) { options.put(key, value); } } diff --git a/src/main/java/org/msgpack/rpc/extension/socks/SocksProxyHandler.java b/src/main/java/org/msgpack/rpc/extension/socks/SocksProxyHandler.java new file mode 100644 index 0000000..af8a426 --- /dev/null +++ b/src/main/java/org/msgpack/rpc/extension/socks/SocksProxyHandler.java @@ -0,0 +1,110 @@ +/* +* Copyright (C) 2014-2015 Information Analysis Laboratory, NICT +* +* RaSC is free software: you can redistribute it and/or modify it +* under the terms of the GNU Lesser General Public License as published by +* the Free Software Foundation, either version 2.1 of the License, or (at +* your option) any later version. +* +* RaSC is distributed in the hope that it will be useful, but +* WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +* General Public License for more details. +* +* You should have received a copy of the GNU Lesser General Public License +* along with this program. If not, see . +*/ + +package org.msgpack.rpc.extension.socks; + +import io.netty.channel.ChannelException; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.socks.SocksAddressType; +import io.netty.handler.codec.socks.SocksAuthScheme; +import io.netty.handler.codec.socks.SocksCmdRequest; +import io.netty.handler.codec.socks.SocksCmdResponse; +import io.netty.handler.codec.socks.SocksCmdResponseDecoder; +import io.netty.handler.codec.socks.SocksCmdStatus; +import io.netty.handler.codec.socks.SocksCmdType; +import io.netty.handler.codec.socks.SocksInitRequest; +import io.netty.handler.codec.socks.SocksResponse; +import io.netty.handler.codec.socks.SocksResponseType; + +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; + +import org.msgpack.rpc.Session; +import org.msgpack.rpc.address.IPAddress; +import org.msgpack.rpc.loop.netty.NettyTcpClientTransport; + +/** + * SocksProxy用Handler. + * Socks5/認証無し専用 + * @author kishimoto + * + */ +@ChannelHandler.Sharable +public class SocksProxyHandler extends SimpleChannelInboundHandler { + + private Session session = null; + private NettyTcpClientTransport clientTransport = null; + + public SocksProxyHandler() { + super(); + } + + public SocksProxyHandler(Session session,NettyTcpClientTransport clientTransport) { + super(); + this.session = session; + this.clientTransport = clientTransport; + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, SocksResponse response) throws Exception { + + SocksResponseType rt = response.responseType(); + + switch (rt) { + + case INIT: + ctx.pipeline().addFirst("socks-cmd-decoder", new SocksCmdResponseDecoder()); + InetSocketAddress addr = ((IPAddress) session.getAddress()).getInetSocketAddress(); + SocksCmdRequest cmdSocks = new SocksCmdRequest(SocksCmdType.CONNECT, SocksAddressType.DOMAIN, addr.getHostName(), addr.getPort()); + ctx.writeAndFlush(cmdSocks); + break; + + case AUTH: + break; + case CMD: + SocksCmdResponse scr = (SocksCmdResponse) response; + ctx.pipeline().remove(this); + ctx.pipeline().remove("socks-encode"); + if (scr.cmdStatus() != SocksCmdStatus.SUCCESS) { + throw new ChannelException("Socks faild."); + } + clientTransport.onSocksConnected(ctx.channel()); + break; + case UNKNOWN: + default: + throw new ChannelException("No support Socks Command."); + } + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + List lstAuth = new ArrayList<>(); + lstAuth.add(SocksAuthScheme.NO_AUTH); + SocksInitRequest si = new SocksInitRequest(lstAuth); + ctx.writeAndFlush(si); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + cause.printStackTrace(); + clientTransport.onError(ctx.channel(), cause.toString()); + } + +} diff --git a/src/main/java/org/msgpack/rpc/loop/EventLoop.java b/src/main/java/org/msgpack/rpc/loop/EventLoop.java index 7a500b7..1d8fc16 100644 --- a/src/main/java/org/msgpack/rpc/loop/EventLoop.java +++ b/src/main/java/org/msgpack/rpc/loop/EventLoop.java @@ -15,24 +15,41 @@ // See the License for the specific language governing permissions and // limitations under the License. // +/* +* Copyright (C) 2014-2015 Information Analysis Laboratory, NICT +* +* RaSC is free software: you can redistribute it and/or modify it +* under the terms of the GNU Lesser General Public License as published by +* the Free Software Foundation, either version 2.1 of the License, or (at +* your option) any later version. +* +* RaSC is distributed in the hope that it will be useful, but +* WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +* General Public License for more details. +* +* You should have received a copy of the GNU Lesser General Public License +* along with this program. If not, see . +*/ + package org.msgpack.rpc.loop; import java.io.IOException; -import java.util.concurrent.Executors; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.msgpack.MessagePack; -import org.msgpack.rpc.Session; import org.msgpack.rpc.Server; -import org.msgpack.rpc.transport.ClientTransport; -import org.msgpack.rpc.transport.ServerTransport; +import org.msgpack.rpc.Session; import org.msgpack.rpc.config.ClientConfig; import org.msgpack.rpc.config.ServerConfig; import org.msgpack.rpc.config.TcpClientConfig; import org.msgpack.rpc.config.TcpServerConfig; import org.msgpack.rpc.loop.netty.NettyEventLoopFactory; +import org.msgpack.rpc.transport.ClientTransport; +import org.msgpack.rpc.transport.ServerTransport; public abstract class EventLoop { static private EventLoopFactory loopFactory; @@ -119,9 +136,9 @@ public ScheduledExecutorService getScheduledExecutor() { } public void shutdown() { - scheduledExecutor.shutdown(); - ioExecutor.shutdown(); - workerExecutor.shutdown(); + scheduledExecutor.shutdownNow(); + ioExecutor.shutdownNow(); + workerExecutor.shutdownNow(); } public void join() throws InterruptedException { diff --git a/src/main/java/org/msgpack/rpc/loop/netty/ChannelAdaptor.java b/src/main/java/org/msgpack/rpc/loop/netty/ChannelAdaptor.java index 41f2c5a..35b7934 100644 --- a/src/main/java/org/msgpack/rpc/loop/netty/ChannelAdaptor.java +++ b/src/main/java/org/msgpack/rpc/loop/netty/ChannelAdaptor.java @@ -15,24 +15,57 @@ // See the License for the specific language governing permissions and // limitations under the License. // +/* +* Copyright (C) 2014-2015 Information Analysis Laboratory, NICT +* +* RaSC is free software: you can redistribute it and/or modify it +* under the terms of the GNU Lesser General Public License as published by +* the Free Software Foundation, either version 2.1 of the License, or (at +* your option) any later version. +* +* RaSC is distributed in the hope that it will be useful, but +* WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +* General Public License for more details. +* +* You should have received a copy of the GNU Lesser General Public License +* along with this program. If not, see . +*/ + package org.msgpack.rpc.loop.netty; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.Channels; +import io.netty.channel.ChannelHandlerContext; + +import java.util.concurrent.atomic.AtomicLong; + import org.msgpack.rpc.transport.ClientTransport; class ChannelAdaptor implements ClientTransport { - private Channel channel; + private static final int FLUSH_SPAN_MS = 100; //FLUSH_SPAN_MS 以下の物は、Streamingでも一気に返す。 + private ChannelHandlerContext ctx; + private AtomicLong timeLastFlush = new AtomicLong(System.currentTimeMillis() + FLUSH_SPAN_MS); + + ChannelAdaptor(ChannelHandlerContext ctx) { + this.ctx = ctx; + } - ChannelAdaptor(Channel channel) { - this.channel = channel; - } + @Override + public void sendMessage(Object msg) { + ctx.writeAndFlush(msg); + } - public void sendMessage(Object msg) { - Channels.write(channel, msg); - } + @Override + public void close() { + ctx.flush(); + ctx.close(); + } - public void close() { - channel.close(); - } + @Override + public void sendDataDelay(Object obj) { + ctx.write(obj); + if(timeLastFlush.get() < System.currentTimeMillis()){ + ctx.flush(); + timeLastFlush.set((System.currentTimeMillis() + FLUSH_SPAN_MS)); + } + } } diff --git a/src/main/java/org/msgpack/rpc/loop/netty/MessageHandler.java b/src/main/java/org/msgpack/rpc/loop/netty/MessageHandler.java index 5141b5c..179e5e7 100644 --- a/src/main/java/org/msgpack/rpc/loop/netty/MessageHandler.java +++ b/src/main/java/org/msgpack/rpc/loop/netty/MessageHandler.java @@ -15,39 +15,65 @@ // See the License for the specific language governing permissions and // limitations under the License. // +/* +* Copyright (C) 2014-2015 Information Analysis Laboratory, NICT +* +* RaSC is free software: you can redistribute it and/or modify it +* under the terms of the GNU Lesser General Public License as published by +* the Free Software Foundation, either version 2.1 of the License, or (at +* your option) any later version. +* +* RaSC is distributed in the hope that it will be useful, but +* WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +* General Public License for more details. +* +* You should have received a copy of the GNU Lesser General Public License +* along with this program. If not, see . +*/ + package org.msgpack.rpc.loop.netty; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.ChannelStateEvent; -import org.jboss.netty.channel.MessageEvent; -import org.jboss.netty.channel.SimpleChannelUpstreamHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; + +import java.util.Objects; + import org.msgpack.rpc.transport.RpcMessageHandler; import org.msgpack.type.Value; -class MessageHandler extends SimpleChannelUpstreamHandler { +class MessageHandler extends SimpleChannelInboundHandler { private RpcMessageHandler handler; private ChannelAdaptor adaptor; + private NettyTcpClientTransport clientTransport = null; MessageHandler(RpcMessageHandler handler) { this.handler = handler; } - @Override - public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) - throws Exception { - this.adaptor = new ChannelAdaptor(e.getChannel()); - ctx.sendUpstream(e); - } + public MessageHandler(RpcMessageHandler handler,NettyTcpClientTransport clientTransport) { + this.handler = handler; + this.clientTransport = clientTransport; - @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { - Object m = e.getMessage(); - if (!(m instanceof Value)) { - ctx.sendUpstream(e); - return; - } + } - Value msg = (Value) m; + @Override + protected void channelRead0(ChannelHandlerContext ctx, Value msg) throws Exception { handler.handleMessage(adaptor, msg); - } + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + this.adaptor = new ChannelAdaptor(ctx); + clientTransport.onConnected(ctx.channel()); + } + + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + if(Objects.nonNull(clientTransport)){ + clientTransport.onError(ctx.channel(), cause.getMessage()); + } + } + } diff --git a/src/main/java/org/msgpack/rpc/loop/netty/MessagePackDecoder.java b/src/main/java/org/msgpack/rpc/loop/netty/MessagePackDecoder.java index f977328..ab0559c 100644 --- a/src/main/java/org/msgpack/rpc/loop/netty/MessagePackDecoder.java +++ b/src/main/java/org/msgpack/rpc/loop/netty/MessagePackDecoder.java @@ -15,17 +15,36 @@ // See the License for the specific language governing permissions and // limitations under the License. // +/* +* Copyright (C) 2014-2015 Information Analysis Laboratory, NICT +* +* RaSC is free software: you can redistribute it and/or modify it +* under the terms of the GNU Lesser General Public License as published by +* the Free Software Foundation, either version 2.1 of the License, or (at +* your option) any later version. +* +* RaSC is distributed in the hope that it will be useful, but +* WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +* General Public License for more details. +* +* You should have received a copy of the GNU Lesser General Public License +* along with this program. If not, see . +*/ + package org.msgpack.rpc.loop.netty; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; + import java.nio.ByteBuffer; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.handler.codec.oneone.OneToOneDecoder; +import java.util.List; + import org.msgpack.MessagePack; import org.msgpack.type.Value; -public class MessagePackDecoder extends OneToOneDecoder { +public class MessagePackDecoder extends ByteToMessageDecoder { MessagePack messagePack; @@ -34,18 +53,15 @@ public MessagePackDecoder(MessagePack messagePack) { this.messagePack = messagePack; } - @Override - protected Object decode(ChannelHandlerContext ctx, Channel channel, - Object msg) throws Exception { - if (!(msg instanceof ChannelBuffer)) { - return msg; - } - ChannelBuffer source = (ChannelBuffer) msg; + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List result) throws Exception { + // TODO 自動生成されたメソッド・スタブ - ByteBuffer buffer = source.toByteBuffer(); + + ByteBuffer buffer = msg.nioBuffer(); if (!buffer.hasRemaining()) { - return null; + return; } byte[] bytes = buffer.array(); // FIXME buffer must has array @@ -53,12 +69,6 @@ protected Object decode(ChannelHandlerContext ctx, Channel channel, int length = buffer.arrayOffset() + buffer.limit(); Value v = messagePack.read(bytes, offset, length); - return v; - - // TODO MessagePack.unpack() - /* - * Unpacker pac = new Unpacker(); pac.wrap(bytes, offset, length); - * return pac.unpackObject(); - */ - } + result.add(v); + } } diff --git a/src/main/java/org/msgpack/rpc/loop/netty/MessagePackEncoder.java b/src/main/java/org/msgpack/rpc/loop/netty/MessagePackEncoder.java index 2a8cc27..c0ec8eb 100644 --- a/src/main/java/org/msgpack/rpc/loop/netty/MessagePackEncoder.java +++ b/src/main/java/org/msgpack/rpc/loop/netty/MessagePackEncoder.java @@ -15,17 +15,31 @@ // See the License for the specific language governing permissions and // limitations under the License. // +/* +* Copyright (C) 2014-2015 Information Analysis Laboratory, NICT +* +* RaSC is free software: you can redistribute it and/or modify it +* under the terms of the GNU Lesser General Public License as published by +* the Free Software Foundation, either version 2.1 of the License, or (at +* your option) any later version. +* +* RaSC is distributed in the hope that it will be useful, but +* WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +* General Public License for more details. +* +* You should have received a copy of the GNU Lesser General Public License +* along with this program. If not, see . +*/ + package org.msgpack.rpc.loop.netty; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.buffer.ChannelBufferOutputStream; -import org.jboss.netty.buffer.ChannelBuffers; -import org.jboss.netty.handler.codec.oneone.OneToOneEncoder; +import io.netty.buffer.ByteBuf; +import io.netty.handler.codec.MessageToByteEncoder; + import org.msgpack.MessagePack; -public class MessagePackEncoder extends OneToOneEncoder { +public class MessagePackEncoder extends MessageToByteEncoder { private final int estimatedLength; private MessagePack messagePack; @@ -39,21 +53,11 @@ public MessagePackEncoder(int estimatedLength, MessagePack messagePack) { this.messagePack = messagePack; } - @Override - protected Object encode(ChannelHandlerContext ctx, Channel channel, - Object msg) throws Exception { - if (msg instanceof ChannelBuffer) { - return msg; - } + @Override + protected void encode(io.netty.channel.ChannelHandlerContext ctx, Object msg, ByteBuf paramByteBuf) throws Exception { - ChannelBufferOutputStream out = new ChannelBufferOutputStream( - ChannelBuffers.dynamicBuffer(estimatedLength, ctx.getChannel() - .getConfig().getBufferFactory())); + byte[] buff = messagePack.write(msg); + paramByteBuf.writeBytes(buff); - // MessagePack.pack(out, msg); - messagePack.write(out, msg); - - ChannelBuffer result = out.buffer(); - return result; - } + } } diff --git a/src/main/java/org/msgpack/rpc/loop/netty/MessagePackStreamDecoder.java b/src/main/java/org/msgpack/rpc/loop/netty/MessagePackStreamDecoder.java index edfdb7e..baf12d5 100644 --- a/src/main/java/org/msgpack/rpc/loop/netty/MessagePackStreamDecoder.java +++ b/src/main/java/org/msgpack/rpc/loop/netty/MessagePackStreamDecoder.java @@ -15,53 +15,63 @@ // See the License for the specific language governing permissions and // limitations under the License. // +/* +* Copyright (C) 2014-2015 Information Analysis Laboratory, NICT +* +* RaSC is free software: you can redistribute it and/or modify it +* under the terms of the GNU Lesser General Public License as published by +* the Free Software Foundation, either version 2.1 of the License, or (at +* your option) any later version. +* +* RaSC is distributed in the hope that it will be useful, but +* WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +* General Public License for more details. +* +* You should have received a copy of the GNU Lesser General Public License +* along with this program. If not, see . +*/ + package org.msgpack.rpc.loop.netty; -import java.io.ByteArrayInputStream; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; + import java.io.EOFException; import java.nio.ByteBuffer; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.handler.codec.frame.FrameDecoder; +import java.util.List; + import org.msgpack.MessagePack; import org.msgpack.type.Value; import org.msgpack.unpacker.Unpacker; -public class MessagePackStreamDecoder extends FrameDecoder { - protected MessagePack msgpack; +public class MessagePackStreamDecoder extends ByteToMessageDecoder { + protected MessagePack msgpack; + + public MessagePackStreamDecoder(MessagePack msgpack) { + super(); + this.msgpack = msgpack; + } - public MessagePackStreamDecoder(MessagePack msgpack) { - super(); - this.msgpack = msgpack; - } + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List paramList) throws Exception { - @Override - protected Object decode(ChannelHandlerContext ctx, Channel channel, - ChannelBuffer source) throws Exception { - // TODO #MN will modify the body with MessagePackBufferUnpacker. - ByteBuffer buffer = source.toByteBuffer(); - if (!buffer.hasRemaining()) { - return null; - } - source.markReaderIndex(); + if (msg.isReadable()) { + ByteBuffer buffer = msg.nioBuffer(); + Unpacker unpacker = msgpack.createBufferUnpacker(buffer); + int lastPos = 0; + try { + while (buffer.position() < buffer.limit()) { + Value v = unpacker.readValue(); + paramList.add(v); + lastPos = buffer.position(); + } + msg.skipBytes(lastPos); + } catch (EOFException e) { + msg.skipBytes(lastPos); + } + } + } - byte[] bytes = buffer.array(); // FIXME buffer must has array - int offset = buffer.arrayOffset() + buffer.position(); - int length = buffer.arrayOffset() + buffer.limit(); - ByteArrayInputStream stream = new ByteArrayInputStream(bytes, offset, - length); - int startAvailable = stream.available(); - try{ - Unpacker unpacker = msgpack.createUnpacker(stream); - Value v = unpacker.readValue(); - source.skipBytes(startAvailable - stream.available()); - return v; - }catch( EOFException e ){ - // not enough buffers. - // So retry reading - source.resetReaderIndex(); - return null; - } - } } diff --git a/src/main/java/org/msgpack/rpc/loop/netty/NettyEventLoop.java b/src/main/java/org/msgpack/rpc/loop/netty/NettyEventLoop.java index cf71a25..8c42e67 100644 --- a/src/main/java/org/msgpack/rpc/loop/netty/NettyEventLoop.java +++ b/src/main/java/org/msgpack/rpc/loop/netty/NettyEventLoop.java @@ -15,58 +15,57 @@ // See the License for the specific language governing permissions and // limitations under the License. // +/* +* Copyright (C) 2014-2015 Information Analysis Laboratory, NICT +* +* RaSC is free software: you can redistribute it and/or modify it +* under the terms of the GNU Lesser General Public License as published by +* the Free Software Foundation, either version 2.1 of the License, or (at +* your option) any later version. +* +* RaSC is distributed in the hope that it will be useful, but +* WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +* General Public License for more details. +* +* You should have received a copy of the GNU Lesser General Public License +* along with this program. If not, see . +*/ + package org.msgpack.rpc.loop.netty; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; -import org.jboss.netty.channel.socket.ClientSocketChannelFactory; -import org.jboss.netty.channel.socket.ServerSocketChannelFactory; -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; -import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; + import org.msgpack.MessagePack; -import org.msgpack.rpc.Session; import org.msgpack.rpc.Server; +import org.msgpack.rpc.Session; +import org.msgpack.rpc.config.TcpClientConfig; +import org.msgpack.rpc.config.TcpServerConfig; import org.msgpack.rpc.loop.EventLoop; -import org.msgpack.rpc.transport.ServerTransport; import org.msgpack.rpc.transport.ClientTransport; -import org.msgpack.rpc.config.TcpServerConfig; -import org.msgpack.rpc.config.TcpClientConfig; +import org.msgpack.rpc.transport.RpcMessageHandler; +import org.msgpack.rpc.transport.ServerTransport; public class NettyEventLoop extends EventLoop { - public NettyEventLoop(ExecutorService workerExecutor, - ExecutorService ioExecutor, - ScheduledExecutorService scheduledExecutor, MessagePack messagePack) { - super(workerExecutor, ioExecutor, scheduledExecutor, messagePack); - } - private ClientSocketChannelFactory clientFactory = null; - private ServerSocketChannelFactory serverFactory = null; + private Class handler = RpcMessageHandler.class; - public synchronized ClientSocketChannelFactory getClientFactory() { - if (clientFactory == null) { - clientFactory = new NioClientSocketChannelFactory(getIoExecutor(), - getWorkerExecutor()); // TODO: workerCount - } - return clientFactory; - } + public NettyEventLoop(ExecutorService workerExecutor, ExecutorService ioExecutor, ScheduledExecutorService scheduledExecutor, MessagePack messagePack) { + super(workerExecutor, ioExecutor, scheduledExecutor, messagePack); + } - public synchronized ServerSocketChannelFactory getServerFactory() { - if (serverFactory == null) { - serverFactory = new NioServerSocketChannelFactory(getIoExecutor(), - getWorkerExecutor()); // TODO: workerCount - // messages will be dispatched to worker thread on server. - // see useThread(true) in NettyTcpClientTransport(). - } - return serverFactory; - } + public NettyEventLoop(ExecutorService workerExecutor, ExecutorService ioExecutor, + ScheduledExecutorService scheduledExecutor, MessagePack messagePack, Class handlerClass) { + this(workerExecutor, ioExecutor, scheduledExecutor, messagePack); + this.handler = handlerClass; + } - protected ClientTransport openTcpTransport(TcpClientConfig config, - Session session) { - return new NettyTcpClientTransport(config, session, this); - } + protected ClientTransport openTcpTransport(TcpClientConfig config, Session session) { + return new NettyTcpClientTransport(config, session, this, handler); + } - protected ServerTransport listenTcpTransport(TcpServerConfig config, - Server server) { - return new NettyTcpServerTransport(config, server, this); - } + protected ServerTransport listenTcpTransport(TcpServerConfig config, Server server) { + return new NettyTcpServerTransport(config, server, this, handler); + } } diff --git a/src/main/java/org/msgpack/rpc/loop/netty/NettyEventLoopFactory.java b/src/main/java/org/msgpack/rpc/loop/netty/NettyEventLoopFactory.java index 1e715aa..218d3a0 100644 --- a/src/main/java/org/msgpack/rpc/loop/netty/NettyEventLoopFactory.java +++ b/src/main/java/org/msgpack/rpc/loop/netty/NettyEventLoopFactory.java @@ -15,6 +15,23 @@ // See the License for the specific language governing permissions and // limitations under the License. // +/* +* Copyright (C) 2014-2015 Information Analysis Laboratory, NICT +* +* RaSC is free software: you can redistribute it and/or modify it +* under the terms of the GNU Lesser General Public License as published by +* the Free Software Foundation, either version 2.1 of the License, or (at +* your option) any later version. +* +* RaSC is distributed in the hope that it will be useful, but +* WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +* General Public License for more details. +* +* You should have received a copy of the GNU Lesser General Public License +* along with this program. If not, see . +*/ + package org.msgpack.rpc.loop.netty; import java.util.concurrent.ExecutorService; @@ -25,13 +42,10 @@ import org.msgpack.rpc.loop.EventLoopFactory; public class NettyEventLoopFactory implements EventLoopFactory { - public NettyEventLoopFactory() { - } + public NettyEventLoopFactory() { + } - public EventLoop make(ExecutorService workerExecutor, - ExecutorService ioExecutor, - ScheduledExecutorService scheduledExecutor, MessagePack messagePack) { - return new NettyEventLoop(workerExecutor, ioExecutor, - scheduledExecutor, messagePack); - } + public EventLoop make(ExecutorService workerExecutor, ExecutorService ioExecutor, ScheduledExecutorService scheduledExecutor, MessagePack messagePack) { + return new NettyEventLoop(workerExecutor, ioExecutor, scheduledExecutor, messagePack); + } } diff --git a/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpClientTransport.java b/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpClientTransport.java index b9115fd..e7accb2 100644 --- a/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpClientTransport.java +++ b/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpClientTransport.java @@ -15,103 +15,181 @@ // See the License for the specific language governing permissions and // limitations under the License. // + +/* +* Copyright (C) 2014-2015 Information Analysis Laboratory, NICT +* +* RaSC is free software: you can redistribute it and/or modify it +* under the terms of the GNU Lesser General Public License as published by +* the Free Software Foundation, either version 2.1 of the License, or (at +* your option) any later version. +* +* RaSC is distributed in the hope that it will be useful, but +* WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +* General Public License for more details. +* +* You should have received a copy of the GNU Lesser General Public License +* along with this program. If not, see . +*/ + package org.msgpack.rpc.loop.netty; -import java.util.Map; +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBufOutputStream; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.socks.SocksInitResponseDecoder; +import io.netty.handler.codec.socks.SocksMessageEncoder; +import io.netty.util.internal.logging.InternalLogger; +import io.netty.util.internal.logging.InternalLoggerFactory; + +import java.net.SocketAddress; +import java.net.UnknownHostException; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.Channels; -import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.channel.ChannelFutureListener; -import org.jboss.netty.buffer.ChannelBuffers; -import org.jboss.netty.buffer.ChannelBufferOutputStream; -import org.jboss.netty.buffer.HeapChannelBufferFactory; -import org.jboss.netty.bootstrap.ClientBootstrap; import org.msgpack.rpc.Session; +import org.msgpack.rpc.address.IPAddress; import org.msgpack.rpc.config.TcpClientConfig; -import org.msgpack.rpc.transport.RpcMessageHandler; +import org.msgpack.rpc.extension.socks.SocksProxyHandler; import org.msgpack.rpc.transport.PooledStreamClientTransport; +import org.msgpack.rpc.transport.RpcMessageHandler; + +public class NettyTcpClientTransport extends PooledStreamClientTransport { + private static final InternalLogger LOG = InternalLoggerFactory.getInstance(NettyTcpClientTransport.class); + private final Bootstrap bootstrap; + private final EventLoopGroup group = new NioEventLoopGroup(maxChannel); + private RpcMessageHandler handler = null; + private NettyEventLoop eventLoop = null; + + NettyTcpClientTransport(TcpClientConfig config, Session session, NettyEventLoop loop, Class rpcHandlerClass) { + super(config, session); + + try { + handler = rpcHandlerClass.getConstructor(Session.class).newInstance(session); + } catch (Exception e) { + throw new RuntimeException(e); + } + + // handler = new RpcMessageHandlerEx(session); + eventLoop = loop; + bootstrap = new Bootstrap().group(group); + bootstrap.channel(NioSocketChannel.class); + final NettyTcpClientTransport trans = this; + bootstrap.handler(new ChannelInitializer() { + + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline p = ch.pipeline(); + if (isSocks) { + p.addFirst("socks-handler", new SocksProxyHandler(session, trans)); + p.addFirst("socks-encode", new SocksMessageEncoder()); + p.addFirst("socks-decode", new SocksInitResponseDecoder()); + } + p.addLast("msgpack-decode-stream", new MessagePackStreamDecoder(eventLoop.getMessagePack())); + p.addLast("msgpack-encode", new MessagePackEncoder(eventLoop.getMessagePack())); + p.addLast("message", new MessageHandler(handler, trans)); + } + + }); + bootstrap.option(ChannelOption.TCP_NODELAY, true); + } + + @Override + protected ByteBufOutputStream newPendingBuffer() { + return new ByteBufOutputStream(Unpooled.buffer()); + } + + @Override + protected void sendMessageChannel(Channel c, Object msg) { + c.writeAndFlush(msg); + } + + @Override + protected void closeChannel(Channel c) { + c.close(); + } + + @Override + protected void resetPendingBuffer(ByteBufOutputStream b) { + b.buffer().clear(); + + } + + @Override + protected void flushPendingBuffer(ByteBufOutputStream b, Channel c) { + c.writeAndFlush(b.buffer()); + b.buffer().clear(); + + } + + @Override + protected void closePendingBuffer(ByteBufOutputStream b) { + b.buffer().clear(); + + } + + @Override + public void close() { + super.close(); + group.shutdownGracefully(); + } + + @Override + protected void startConnection() { + + try { + SocketAddress addr = (isSocks) ? + new IPAddress(System.getProperty("socksProxyHost", "localhost"), Integer.valueOf(System.getProperty("socksProxyPort", "0"))).getSocketAddress() : + session.getAddress().getSocketAddress(); + + ChannelFuture f = bootstrap.connect(addr); + + f.addListener((ChannelFutureListener) future -> { + + if (!future.isSuccess()) { + onConnectFailed(future.channel(), future.cause()); + return; + } + + Channel c = f.channel(); + + c.closeFuture().addListener((ChannelFutureListener) closeFt -> { + Channel ch = closeFt.channel(); + onClosed(ch); + }); + + // MessageChannel mc = (MessageChannel) f.channel(); + + if (isSocks) { + // mc.setUseSocksProxy(isSocks); + LOG.debug("--- useSocksProxy ----"); + LOG.debug(System.getProperty("socksProxyHost")); + LOG.debug(System.getProperty("socksProxyPort")); + + } + }); + + } catch (NumberFormatException e) { + // TODO 自動生成された catch ブロック + e.printStackTrace(); + } catch (UnknownHostException e) { + // TODO 自動生成された catch ブロック + e.printStackTrace(); + } + } + + public void onSocksConnected(Channel c) { + ChannelPipeline p = c.pipeline(); + p.fireChannelActive(); + } -class NettyTcpClientTransport extends PooledStreamClientTransport { - private static final String TCP_NO_DELAY = "tcpNoDelay"; - - private final ClientBootstrap bootstrap; - - NettyTcpClientTransport(TcpClientConfig config, Session session, - NettyEventLoop loop) { - // TODO check session.getAddress() instanceof IPAddress - super(config, session); - - RpcMessageHandler handler = new RpcMessageHandler(session); - - bootstrap = new ClientBootstrap(loop.getClientFactory()); - bootstrap.setPipelineFactory(new StreamPipelineFactory(loop.getMessagePack(), handler)); - Map options = config.getOptions(); - setIfNotPresent(options, TCP_NO_DELAY, Boolean.TRUE, bootstrap); - bootstrap.setOptions(options); - } - - private final ChannelFutureListener connectListener = new ChannelFutureListener() { - public void operationComplete(ChannelFuture future) throws Exception { - if (!future.isSuccess()) { - onConnectFailed(future.getChannel(), future.getCause()); - return; - } - Channel c = future.getChannel(); - c.getCloseFuture().addListener(closeListener); - onConnected(c); - } - }; - - private final ChannelFutureListener closeListener = new ChannelFutureListener() { - public void operationComplete(ChannelFuture future) throws Exception { - Channel c = future.getChannel(); - onClosed(c); - } - }; - - @Override - protected void startConnection() { - ChannelFuture f = bootstrap.connect(session.getAddress().getSocketAddress()); - f.addListener(connectListener); - } - - @Override - protected ChannelBufferOutputStream newPendingBuffer() { - return new ChannelBufferOutputStream( - ChannelBuffers.dynamicBuffer(HeapChannelBufferFactory.getInstance())); - } - - @Override - protected void resetPendingBuffer(ChannelBufferOutputStream b) { - b.buffer().clear(); - } - - @Override - protected void flushPendingBuffer(ChannelBufferOutputStream b, Channel c) { - Channels.write(c, b.buffer()); - b.buffer().clear(); - } - - @Override - protected void closePendingBuffer(ChannelBufferOutputStream b) { - b.buffer().clear(); - } - - @Override - protected void sendMessageChannel(Channel c, Object msg) { - Channels.write(c, msg); - } - - @Override - protected void closeChannel(Channel c) { - c.close(); - } - - private static void setIfNotPresent(Map options, - String key, Object value, ClientBootstrap bootstrap) { - if (!options.containsKey(key)) { - bootstrap.setOption(key, value); - } - } } diff --git a/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpServerTransport.java b/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpServerTransport.java index cab6cfe..9dcbd48 100644 --- a/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpServerTransport.java +++ b/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpServerTransport.java @@ -15,49 +15,86 @@ // See the License for the specific language governing permissions and // limitations under the License. // +/* +* Copyright (C) 2014-2015 Information Analysis Laboratory, NICT +* +* RaSC is free software: you can redistribute it and/or modify it +* under the terms of the GNU Lesser General Public License as published by +* the Free Software Foundation, either version 2.1 of the License, or (at +* your option) any later version. +* +* RaSC is distributed in the hope that it will be useful, but +* WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +* General Public License for more details. +* +* You should have received a copy of the GNU Lesser General Public License +* along with this program. If not, see . +*/ + package org.msgpack.rpc.loop.netty; -import java.util.Map; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.bootstrap.ServerBootstrap; import org.msgpack.rpc.Server; +import org.msgpack.rpc.address.Address; import org.msgpack.rpc.config.TcpServerConfig; import org.msgpack.rpc.transport.RpcMessageHandler; import org.msgpack.rpc.transport.ServerTransport; -import org.msgpack.rpc.address.Address; class NettyTcpServerTransport implements ServerTransport { - private Channel listenChannel; - private final static String CHILD_TCP_NODELAY = "child.tcpNoDelay"; - private final static String REUSE_ADDRESS = "reuseAddress"; - - NettyTcpServerTransport(TcpServerConfig config, Server server, NettyEventLoop loop) { - if (server == null) { - throw new IllegalArgumentException("Server must not be null"); - } - - Address address = config.getListenAddress(); - RpcMessageHandler handler = new RpcMessageHandler(server); - handler.useThread(true); - - ServerBootstrap bootstrap = new ServerBootstrap(loop.getServerFactory()); - bootstrap.setPipelineFactory(new StreamPipelineFactory(loop.getMessagePack(), handler)); - final Map options = config.getOptions(); - setIfNotPresent(options, CHILD_TCP_NODELAY, Boolean.TRUE, bootstrap); - setIfNotPresent(options, REUSE_ADDRESS, Boolean.TRUE, bootstrap); - bootstrap.setOptions(options); - this.listenChannel = bootstrap.bind(address.getSocketAddress()); - } - - public void close() { - listenChannel.close(); - } - - private static void setIfNotPresent(Map options, - String key, Object value, ServerBootstrap bootstrap) { - if (!options.containsKey(key)) { - bootstrap.setOption(key, value); - } - } + private ChannelFuture future = null; + private final EventLoopGroup bossGroup = new NioEventLoopGroup(2); + private final EventLoopGroup workerGroup = new NioEventLoopGroup(8); + + NettyTcpServerTransport(TcpServerConfig config, Server server, NettyEventLoop loop, Class rpcHandlerClass) { + if (server == null) { + throw new IllegalArgumentException("Server must not be null"); + } + + Address address = config.getListenAddress(); + + try { + RpcMessageHandler handler = rpcHandlerClass.getConstructor(Server.class).newInstance(server); + handler.useThread(true); + + ServerBootstrap bootstrap = new ServerBootstrap().group(bossGroup, workerGroup).channel(NioServerSocketChannel.class); + + bootstrap.childHandler(new ChannelInitializer() { + + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline p = ch.pipeline(); + p.addLast("msgpack-decode-stream", new MessagePackStreamDecoder(loop.getMessagePack())); + p.addLast("msgpack-encode", new MessagePackEncoder(loop.getMessagePack())); + p.addLast("message", new MessageHandler(handler)); + } + + }); + bootstrap.childOption(ChannelOption.TCP_NODELAY, true); + bootstrap.option(ChannelOption.SO_REUSEADDR, true); + bootstrap.option(ChannelOption.TCP_NODELAY, true); + bootstrap.localAddress(address.getSocketAddress()); + future = bootstrap.bind(); + } catch (Exception e) { + throw new RuntimeException(e); + } + + // RpcMessageHandler handler = new RpcMessageHandlerEx(server); + + } + + public void close() { + future.channel().close(); + workerGroup.shutdownGracefully(); + bossGroup.shutdownGracefully(); + } } diff --git a/src/main/java/org/msgpack/rpc/loop/netty/StreamPipelineFactory.java b/src/main/java/org/msgpack/rpc/loop/netty/StreamPipelineFactory.java deleted file mode 100644 index 64a1cae..0000000 --- a/src/main/java/org/msgpack/rpc/loop/netty/StreamPipelineFactory.java +++ /dev/null @@ -1,42 +0,0 @@ -// -// MessagePack-RPC for Java -// -// Copyright (C) 2010 FURUHASHI Sadayuki -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// -package org.msgpack.rpc.loop.netty; - -import org.jboss.netty.channel.Channels; -import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.ChannelPipelineFactory; -import org.msgpack.MessagePack; -import org.msgpack.rpc.transport.RpcMessageHandler; - -class StreamPipelineFactory implements ChannelPipelineFactory { - private RpcMessageHandler handler; - private MessagePack messagePack; - - StreamPipelineFactory(MessagePack messagePack, RpcMessageHandler handler) { - this.handler = handler; - this.messagePack = messagePack; - } - - public ChannelPipeline getPipeline() throws Exception { - ChannelPipeline p = Channels.pipeline(); - p.addLast("msgpack-decode-stream", new MessagePackStreamDecoder(messagePack)); - p.addLast("msgpack-encode", new MessagePackEncoder(messagePack)); - p.addLast("message", new MessageHandler(handler)); - return p; - } -} diff --git a/src/main/java/org/msgpack/rpc/reflect/ReflectionInvokerBuilder.java b/src/main/java/org/msgpack/rpc/reflect/ReflectionInvokerBuilder.java index 2c403f2..cb153f5 100644 --- a/src/main/java/org/msgpack/rpc/reflect/ReflectionInvokerBuilder.java +++ b/src/main/java/org/msgpack/rpc/reflect/ReflectionInvokerBuilder.java @@ -15,6 +15,23 @@ // See the License for the specific language governing permissions and // limitations under the License. // +/* +* Copyright (C) 2014-2015 Information Analysis Laboratory, NICT +* +* RaSC is free software: you can redistribute it and/or modify it +* under the terms of the GNU Lesser General Public License as published by +* the Free Software Foundation, either version 2.1 of the License, or (at +* your option) any later version. +* +* RaSC is distributed in the hope that it will be useful, but +* WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +* General Public License for more details. +* +* You should have received a copy of the GNU Lesser General Public License +* along with this program. If not, see . +*/ + package org.msgpack.rpc.reflect; import java.io.IOException; @@ -138,7 +155,7 @@ public void convert(Object[] params, Value obj) throws MessageTypeException { } } - private static class ReflectionInvoker implements Invoker { + public static class ReflectionInvoker implements Invoker { protected Method method; protected int parameterLength; protected ReflectionArgumentEntry[] entries; @@ -255,6 +272,10 @@ public void invoke(Object target, Request request) throws Exception { // TODO exception } + + public final Method getMethod() { + return method; + } } public Invoker buildInvoker(Method targetMethod, ArgumentEntry[] entries, boolean async) { diff --git a/src/main/java/org/msgpack/rpc/transport/ClientTransport.java b/src/main/java/org/msgpack/rpc/transport/ClientTransport.java index f37abcf..263005b 100644 --- a/src/main/java/org/msgpack/rpc/transport/ClientTransport.java +++ b/src/main/java/org/msgpack/rpc/transport/ClientTransport.java @@ -15,12 +15,28 @@ // See the License for the specific language governing permissions and // limitations under the License. // +/* +* Copyright (C) 2014-2015 Information Analysis Laboratory, NICT +* +* RaSC is free software: you can redistribute it and/or modify it +* under the terms of the GNU Lesser General Public License as published by +* the Free Software Foundation, either version 2.1 of the License, or (at +* your option) any later version. +* +* RaSC is distributed in the hope that it will be useful, but +* WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +* General Public License for more details. +* +* You should have received a copy of the GNU Lesser General Public License +* along with this program. If not, see . +*/ + package org.msgpack.rpc.transport; import java.io.Closeable; public interface ClientTransport extends Closeable, MessageSendable { - public void sendMessage(Object obj); public void close(); } diff --git a/src/main/java/org/msgpack/rpc/transport/MessageSendable.java b/src/main/java/org/msgpack/rpc/transport/MessageSendable.java index c09f23b..fbbd2dc 100644 --- a/src/main/java/org/msgpack/rpc/transport/MessageSendable.java +++ b/src/main/java/org/msgpack/rpc/transport/MessageSendable.java @@ -15,8 +15,26 @@ // See the License for the specific language governing permissions and // limitations under the License. // +/* +* Copyright (C) 2014-2015 Information Analysis Laboratory, NICT +* +* RaSC is free software: you can redistribute it and/or modify it +* under the terms of the GNU Lesser General Public License as published by +* the Free Software Foundation, either version 2.1 of the License, or (at +* your option) any later version. +* +* RaSC is distributed in the hope that it will be useful, but +* WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +* General Public License for more details. +* +* You should have received a copy of the GNU Lesser General Public License +* along with this program. If not, see . +*/ + package org.msgpack.rpc.transport; public interface MessageSendable { public void sendMessage(Object obj); + public void sendDataDelay(Object obj); } diff --git a/src/main/java/org/msgpack/rpc/transport/PooledStreamClientTransport.java b/src/main/java/org/msgpack/rpc/transport/PooledStreamClientTransport.java index 6a3e029..4770fcb 100644 --- a/src/main/java/org/msgpack/rpc/transport/PooledStreamClientTransport.java +++ b/src/main/java/org/msgpack/rpc/transport/PooledStreamClientTransport.java @@ -15,165 +15,183 @@ // See the License for the specific language governing permissions and // limitations under the License. // +/* +* Copyright (C) 2014-2015 Information Analysis Laboratory, NICT +* +* RaSC is free software: you can redistribute it and/or modify it +* under the terms of the GNU Lesser General Public License as published by +* the Free Software Foundation, either version 2.1 of the License, or (at +* your option) any later version. +* +* RaSC is distributed in the hope that it will be useful, but +* WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +* General Public License for more details. +* +* You should have received a copy of the GNU Lesser General Public License +* along with this program. If not, see . +*/ + package org.msgpack.rpc.transport; -import java.io.IOException; +import io.netty.util.internal.logging.InternalLogger; +import io.netty.util.internal.logging.InternalLoggerFactory; + import java.io.OutputStream; -import java.util.ArrayList; import java.util.List; +import java.util.Objects; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicInteger; -import org.jboss.netty.logging.InternalLogger; -import org.jboss.netty.logging.InternalLoggerFactory; +import org.msgpack.MessagePack; import org.msgpack.rpc.Session; import org.msgpack.rpc.config.StreamClientConfig; -import org.msgpack.MessagePack; public abstract class PooledStreamClientTransport implements ClientTransport { - private static final InternalLogger LOG = - InternalLoggerFactory.getInstance(PooledStreamClientTransport.class); - - private final Object lock = new Object(); - private final List pool = new ArrayList(); - private final List errorChannelPool = new ArrayList(); - private int reconnectionLimit; - private int connecting = 0; - - protected final Session session; - protected final StreamClientConfig config; - protected final MessagePack messagePack; - - public PooledStreamClientTransport(StreamClientConfig config, - Session session) { - this.session = session; - this.config = config; - this.reconnectionLimit = config.getReconnectionLimit(); - this.messagePack = session.getEventLoop().getMessagePack(); - } - - protected Session getSession() { - return session; - } - - protected StreamClientConfig getConfig() { - return config; - } - - public void sendMessage(Object msg) { - synchronized (lock) { - if (connecting == -1) { - return; - } // already closed - if (pool.isEmpty()) { - if (connecting == 0) { - connecting++; - startConnection(); - } - if (pool.isEmpty()) { // may be already connected - try { - messagePack.write(getPendingBuffer(), msg); - } catch (IOException e) { - // FIXME - } - return; - } - } - // FIXME pseudo connection load balancing - Channel c = pool.get(0); - sendMessageChannel(c, msg); - } - } - - public void close() { - synchronized (lock) { - LOG.info("Close all channels"); - if (pendingBuffer != null) { - closePendingBuffer(pendingBuffer); - pendingBuffer = null; - } - connecting = -1; - for (Channel c : pool) { - closeChannel(c); - } - for (Channel c : errorChannelPool) { - closeChannel(c); - } - pool.clear(); - errorChannelPool.clear(); - } - } - - public void onConnected(Channel c) { - synchronized (lock) { - if (connecting == -1) { - closeChannel(c); - return; - } // already closed - LOG.debug("Success to connect new channel " + c); - pool.add(c); - connecting = 0; - if (pendingBuffer != null) { - flushPendingBuffer(pendingBuffer, c); - } - } - } - - public void onConnectFailed(Channel c, Throwable cause) { - synchronized (lock) { - if (connecting == -1) { - return; - } // already closed - if (connecting < reconnectionLimit) { - LOG.info(String.format("Reconnect %s(retry:%s)", c, - connecting + 1), cause); - connecting++; - if (pool.remove(c)) {// remove error channel - errorChannelPool.add(c); - } - startConnection(); - } else { - LOG.error(String.format( - "Fail to connect %s(tried %s times)", c, - reconnectionLimit), cause); - connecting = 0; - if (pendingBuffer != null) { - resetPendingBuffer(pendingBuffer); - } - session.transportConnectFailed(); - } - } - } - - public void onClosed(Channel c) { - synchronized (lock) { - if (connecting == -1) { - return; - } // already closed - LOG.info(String.format("Close channel %s", c)); - pool.remove(c); - errorChannelPool.remove(c); - } - } - - private PendingBuffer pendingBuffer = null; - - protected PendingBuffer getPendingBuffer() { - if (pendingBuffer == null) { - pendingBuffer = newPendingBuffer(); - } - return pendingBuffer; - } - - protected abstract PendingBuffer newPendingBuffer(); - - protected abstract void resetPendingBuffer(PendingBuffer b); - - protected abstract void flushPendingBuffer(PendingBuffer b, Channel c); - - protected abstract void closePendingBuffer(PendingBuffer b); - - protected abstract void startConnection(); - - protected abstract void sendMessageChannel(Channel c, Object msg); - - protected abstract void closeChannel(Channel c); + private static final InternalLogger LOG = InternalLoggerFactory.getInstance(PooledStreamClientTransport.class); + private static final int MAX_CHANNEL = 2; + + private final List pool = new CopyOnWriteArrayList<>(); + private final List errorChannelPool = new CopyOnWriteArrayList<>(); + private final AtomicInteger connecting = new AtomicInteger(); + private final Queue queMessage = new ConcurrentLinkedQueue<>(); + private final AtomicInteger assignIndex = new AtomicInteger(); + protected final Session session; + protected final StreamClientConfig config; + protected final MessagePack messagePack; + protected final int maxChannel = Objects.isNull(System.getProperty("maxChannel")) ? MAX_CHANNEL : Integer.valueOf(System.getProperty("maxChannel")); + protected final boolean isSocks = (Objects.nonNull(System.getProperty("socksProxyHost")) && Objects.nonNull(System.getProperty("socksProxyPort"))); + + public PooledStreamClientTransport(StreamClientConfig config, Session session) { + this.session = session; + this.config = config; + this.messagePack = session.getEventLoop().getMessagePack(); + } + + protected Session getSession() { + return session; + } + + protected StreamClientConfig getConfig() { + return config; + } + + public synchronized void sendMessage(Object msg) { + if (connecting.get() == -1) { + return; + } // already closed + + if (connecting.get() <= 0) { + if (connecting.get() < maxChannel) { + connecting.incrementAndGet(); + startConnection(); + queMessage.offer(msg); + } else { + queMessage.offer(msg); + } + } else { + if (connecting.get() >= maxChannel) { + if (!pool.isEmpty()) { + int index = assignIndex.getAndIncrement() % pool.size(); + if (index >= Integer.MAX_VALUE) { + assignIndex.set(0); + } + Channel c = pool.get(index); + sendMessageChannel(c, msg); + flushMessageQue(c); + } else { + queMessage.offer(msg); + } + } else { + connecting.incrementAndGet(); + startConnection(); + queMessage.offer(msg); + } + } + } + + public void close() { + LOG.debug("Close all channels"); + connecting.set(-1); + for (Channel c : pool) { + closeChannel(c); + } + for (Channel c : errorChannelPool) { + closeChannel(c); + } + pool.clear(); + errorChannelPool.clear(); + queMessage.clear(); + } + + public void onConnected(Channel c) { + if (connecting.get() == -1) { + closeChannel(c); + return; + } // already closed + + LOG.debug("Success to connect new channel " + c); + LOG.debug(Objects.toString(c)); + pool.add(c); + flushMessageQue(c); + } + + public void onConnectFailed(Channel c, Throwable cause) { + if (connecting.get() == -1) { + return; + } // already closed + LOG.error(String.format("Fail to connect %s", c), cause); + connecting.set(pool.size()); + session.transportConnectFailed(); + closeChannel(c); + } + + public void onClosed(Channel c) { + if (connecting.get() == -1) { + return; + } // already closed + LOG.debug(String.format("Close channel %s", c)); + pool.remove(c); + errorChannelPool.remove(c); + connecting.set(pool.size()); + } + + @Override + public void sendDataDelay(Object obj) { + + } + + public void onError(Channel c, String msg) { + if (connecting.get() == -1) { + return; + } // already closed + LOG.info(String.format("Error channel %s", c)); + closeChannel(c); + session.transportError(msg); + + } + + protected synchronized void flushMessageQue(Channel c) { + if (!queMessage.isEmpty()) { + queMessage.forEach(m -> sendMessageChannel(c, m)); + queMessage.clear(); + } + } + + protected abstract PendingBuffer newPendingBuffer(); + + protected abstract void resetPendingBuffer(PendingBuffer b); + + protected abstract void flushPendingBuffer(PendingBuffer b, Channel c); + + protected abstract void closePendingBuffer(PendingBuffer b); + + protected abstract void sendMessageChannel(Channel c, Object msg); + + protected abstract void closeChannel(Channel c); + + protected abstract void startConnection(); + } diff --git a/src/test/java/org/msgpack/rpc/reflect/AnnotationTest.java b/src/test/java/org/msgpack/rpc/reflect/AnnotationTest.java index 3a8665f..a9cd4d9 100644 --- a/src/test/java/org/msgpack/rpc/reflect/AnnotationTest.java +++ b/src/test/java/org/msgpack/rpc/reflect/AnnotationTest.java @@ -15,18 +15,29 @@ // See the License for the specific language governing permissions and // limitations under the License. // +/* +* Copyright (C) 2014-2015 Information Analysis Laboratory, NICT +* +* RaSC is free software: you can redistribute it and/or modify it +* under the terms of the GNU Lesser General Public License as published by +* the Free Software Foundation, either version 2.1 of the License, or (at +* your option) any later version. +* +* RaSC is distributed in the hope that it will be useful, but +* WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +* General Public License for more details. +* +* You should have received a copy of the GNU Lesser General Public License +* along with this program. If not, see . +*/ + package org.msgpack.rpc.reflect; -import org.msgpack.*; -import org.msgpack.annotation.*; -import org.msgpack.rpc.*; -import org.msgpack.rpc.dispatcher.*; -import org.msgpack.rpc.config.*; -import org.msgpack.rpc.loop.*; -import org.msgpack.rpc.loop.netty.*; -import java.util.*; -import junit.framework.*; import org.junit.Test; +import org.msgpack.annotation.Ignore; +import org.msgpack.annotation.Index; +import org.msgpack.annotation.Optional; public class AnnotationTest extends ReflectTest { public static interface IgnoreTest { @@ -101,7 +112,7 @@ public void testOmitClientIgnoreServer() throws Exception { OmitTest c = context.getClient().proxy(OmitTest.class); try { String result; - + result = c.m01("a0"); assertEquals(""+"a0"+null, result); @@ -119,7 +130,7 @@ public void testOmitClientIgnoreServer2() throws Exception { OmitTest c = context.getClient().proxy(OmitTest.class); try { String result; - + result = c.m01("a0"); assertEquals(""+"a0"+null, result);