diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/ProxyTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/ProxyTest.java new file mode 100644 index 00000000000..f5c4ab27cdf --- /dev/null +++ b/interop-testing/src/test/java/io/grpc/testing/integration/ProxyTest.java @@ -0,0 +1,251 @@ +/* + * Copyright 2016, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc.testing.integration; + +import static org.junit.Assert.assertEquals; + +import org.junit.AfterClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.UnknownHostException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +@RunWith(JUnit4.class) +public class ProxyTest { + + private int serverPort = 5001; + private int proxyPort = 5050; + private String loopBack = "127.0.0.1"; + private static ThreadPoolExecutor executor = + new ThreadPoolExecutor(1, 4, 1, TimeUnit.SECONDS, new LinkedBlockingQueue()); + + @AfterClass + public static void stopExecutor() { + executor.shutdown(); + } + + @Test + public void smallLatency() + throws UnknownHostException, IOException, InterruptedException, ExecutionException { + Server server = new Server(); + Thread serverThread = new Thread(server); + serverThread.start(); + + int latency = (int) TimeUnit.MILLISECONDS.toNanos(10); + TrafficControlProxy p = new TrafficControlProxy(1024 * 1024, latency, TimeUnit.NANOSECONDS); + startProxy(p).get(); + Socket client = new Socket(loopBack, proxyPort); + client.setReuseAddress(true); + DataOutputStream clientOut = new DataOutputStream(client.getOutputStream()); + DataInputStream clientIn = new DataInputStream(client.getInputStream()); + byte[] message = new byte[1]; + + // test + long start = System.nanoTime(); + clientOut.write(message, 0, 1); + clientIn.read(message); + long stop = System.nanoTime(); + + p.shutDown(); + server.shutDown(); + client.close(); + + long rtt = (stop - start); + assertEquals(latency, rtt, latency); + } + + @Test + public void bigLatency() + throws UnknownHostException, IOException, InterruptedException, ExecutionException { + Server server = new Server(); + Thread serverThread = new Thread(server); + serverThread.start(); + + int latency = (int) TimeUnit.MILLISECONDS.toNanos(250); + TrafficControlProxy p = new TrafficControlProxy(1024 * 1024, latency, TimeUnit.NANOSECONDS); + startProxy(p).get(); + Socket client = new Socket(loopBack, proxyPort); + DataOutputStream clientOut = new DataOutputStream(client.getOutputStream()); + DataInputStream clientIn = new DataInputStream(client.getInputStream()); + byte[] message = new byte[1]; + + // test + long start = System.nanoTime(); + clientOut.write(message, 0, 1); + clientIn.read(message); + long stop = System.nanoTime(); + + p.shutDown(); + server.shutDown(); + client.close(); + + long rtt = (stop - start); + assertEquals(latency, rtt, latency); + } + + @Test + public void smallBandwidth() + throws UnknownHostException, IOException, InterruptedException, ExecutionException { + Server server = new Server(); + server.setMode("stream"); + (new Thread(server)).start(); + assertEquals(server.mode(), "stream"); + + int bandwidth = 64 * 1024; + TrafficControlProxy p = new TrafficControlProxy(bandwidth, 200, TimeUnit.MILLISECONDS); + startProxy(p).get(); + Socket client = new Socket(loopBack, proxyPort); + DataOutputStream clientOut = new DataOutputStream(client.getOutputStream()); + DataInputStream clientIn = new DataInputStream(client.getInputStream()); + + clientOut.write(new byte[1]); + clientIn.readFully(new byte[100 * 1024]); + long start = System.nanoTime(); + clientIn.readFully(new byte[5 * bandwidth]); + long stop = System.nanoTime(); + + p.shutDown(); + server.shutDown(); + client.close(); + + long bandUsed = ((5 * bandwidth) / ((stop - start) / TimeUnit.SECONDS.toNanos(1))); + assertEquals(bandwidth, bandUsed, .5 * bandwidth); + } + + @Test + public void largeBandwidth() + throws UnknownHostException, IOException, InterruptedException, ExecutionException { + Server server = new Server(); + server.setMode("stream"); + (new Thread(server)).start(); + assertEquals(server.mode(), "stream"); + int bandwidth = 10 * 1024 * 1024; + TrafficControlProxy p = new TrafficControlProxy(bandwidth, 200, TimeUnit.MILLISECONDS); + startProxy(p).get(); + Socket client = new Socket(loopBack, proxyPort); + DataOutputStream clientOut = new DataOutputStream(client.getOutputStream()); + DataInputStream clientIn = new DataInputStream(client.getInputStream()); + + clientOut.write(new byte[1]); + clientIn.readFully(new byte[100 * 1024]); + long start = System.nanoTime(); + clientIn.readFully(new byte[5 * bandwidth]); + long stop = System.nanoTime(); + + p.shutDown(); + server.shutDown(); + client.close(); + + long bandUsed = ((5 * bandwidth) / ((stop - start) / TimeUnit.SECONDS.toNanos(1))); + assertEquals(bandwidth, bandUsed, .5 * bandwidth); + } + + private Future startProxy(final TrafficControlProxy p) { + return executor.submit(new Runnable() { + @Override + public void run() { + try { + p.start(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }); + } + + // server with echo and streaming modes + private class Server implements Runnable { + private ServerSocket server; + private Socket rcv; + private boolean shutDown; + private String mode = "echo"; + + public void setMode(String mode) { + this.mode = mode; + } + + public String mode() { + return mode; + } + + public void shutDown() { + try { + rcv.close(); + server.close(); + shutDown = true; + } catch (IOException e) { + shutDown = true; + } + } + + @Override + public void run() { + try { + server = new ServerSocket(serverPort); + rcv = server.accept(); + DataInputStream serverIn = new DataInputStream(rcv.getInputStream()); + DataOutputStream serverOut = new DataOutputStream(rcv.getOutputStream()); + byte[] response = new byte[1024]; + if (mode.equals("echo")) { + while (!shutDown) { + int readable = serverIn.read(response); + serverOut.write(response, 0, readable); + } + } else if (mode.equals("stream")) { + serverIn.read(response); + byte[] message = new byte[16 * 1024]; + while (!shutDown) { + serverOut.write(message, 0, message.length); + } + serverIn.close(); + serverOut.close(); + rcv.close(); + } else { + System.out.println("Unknown mode: use 'echo' or 'stream'"); + } + } catch (IOException e) { + e.printStackTrace(); + } + } + } +} diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/TrafficControlProxy.java b/interop-testing/src/test/java/io/grpc/testing/integration/TrafficControlProxy.java new file mode 100644 index 00000000000..4e6a7b11b40 --- /dev/null +++ b/interop-testing/src/test/java/io/grpc/testing/integration/TrafficControlProxy.java @@ -0,0 +1,266 @@ +/* + * Copyright 2016, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc.testing.integration; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.Delayed; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; + +public final class TrafficControlProxy { + + private static final int DEFAULT_BAND_BPS = 1024 * 1024; + private static final int DEFAULT_DELAY_NANOS = 200 * 1000 * 1000; + private static final Logger logger = Logger.getLogger(TrafficControlProxy.class.getName()); + + // TODO: make host and ports arguments + private String localhost = "127.0.0.1"; + private int proxyPort = 5050; + private int serverPort = 5001; + private int queueLength; + private int chunkSize; + private int bandwidth; + private long latency; + private volatile boolean shutDown; + private ServerSocket clientAcceptor; + private Socket serverSock; + private Socket clientSock; + private final ThreadPoolExecutor executor = + new ThreadPoolExecutor(5, 10, 1, TimeUnit.SECONDS, new LinkedBlockingQueue()); + + /** + * Returns a new TrafficControlProxy with default bandwidth and latency. + */ + public TrafficControlProxy() { + this(DEFAULT_BAND_BPS, DEFAULT_DELAY_NANOS, TimeUnit.NANOSECONDS); + } + + /** + * Returns a new TrafficControlProxy with bandwidth set to targetBPS, and latency set to + * targetLatency in latencyUnits. + */ + public TrafficControlProxy(int targetBps, int targetLatency, TimeUnit latencyUnits) { + checkArgument(targetBps > 0); + checkArgument(targetLatency > 0); + bandwidth = targetBps; + // divide by 2 because latency is applied in both directions + latency = latencyUnits.toNanos(targetLatency) / 2; + queueLength = (int) Math.max(bandwidth * latency / TimeUnit.SECONDS.toNanos(1), 1); + chunkSize = Math.max(1, queueLength); + } + + /** + * Starts a new thread that waits for client and server and start reader/writer threads. + */ + public void start() throws IOException { + // ClientAcceptor uses a ServerSocket server so that the client can connect to the proxy as it + // normally would a server. serverSock then connects the server using a regular Socket as a + // client normally would. + logger.info("Starting new proxy on port " + proxyPort + " with Queue Length " + queueLength); + clientAcceptor = new ServerSocket(); + clientAcceptor.setReuseAddress(true); + clientAcceptor.bind(new InetSocketAddress(localhost, proxyPort)); + executor.submit(new Runnable() { + @Override + public void run() { + try { + clientSock = clientAcceptor.accept(); + serverSock = new Socket(); + serverSock.connect(new InetSocketAddress(localhost, serverPort)); + startWorkers(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + }); + } + + /** Interrupt all workers and close sockets. */ + public void shutDown() throws IOException { + // TODO: Handle case where a socket fails to close, therefore blocking the others from closing + logger.info("Proxy shutting down... "); + shutDown = true; + executor.shutdown(); + clientAcceptor.close(); + clientSock.close(); + serverSock.close(); + logger.info("Shutdown Complete"); + } + + private void startWorkers() throws IOException { + DataInputStream clientIn = new DataInputStream(clientSock.getInputStream()); + DataOutputStream clientOut = new DataOutputStream(serverSock.getOutputStream()); + DataInputStream serverIn = new DataInputStream(serverSock.getInputStream()); + DataOutputStream serverOut = new DataOutputStream(clientSock.getOutputStream()); + + MessageQueue clientPipe = new MessageQueue(clientIn, clientOut); + MessageQueue serverPipe = new MessageQueue(serverIn, serverOut); + + executor.submit(new Thread(new Reader(clientPipe))); + executor.submit(new Thread(new Writer(clientPipe))); + executor.submit(new Thread(new Reader(serverPipe))); + executor.submit(new Thread(new Writer(serverPipe))); + } + + private final class Reader implements Runnable { + + private final MessageQueue queue; + + Reader(MessageQueue queue) { + this.queue = queue; + } + + @Override + public void run() { + while (!shutDown) { + try { + queue.readIn(); + } catch (IOException e) { + shutDown = true; + } catch (InterruptedException e) { + shutDown = true; + } + } + } + + } + + private final class Writer implements Runnable { + + private final MessageQueue queue; + + Writer(MessageQueue queue) { + this.queue = queue; + } + + @Override + public void run() { + while (!shutDown) { + try { + queue.writeOut(); + } catch (IOException e) { + shutDown = true; + } catch (InterruptedException e) { + shutDown = true; + } + } + } + } + + /** + * A Delay Queue that counts by number of bytes instead of the number of elements. + */ + private class MessageQueue { + DataInputStream inStream; + DataOutputStream outStream; + int bytesQueued; + BlockingQueue queue = new DelayQueue(); + + MessageQueue(DataInputStream inputStream, DataOutputStream outputStream) { + inStream = inputStream; + outStream = outputStream; + } + + /** + * Take a message off the queue and write it to an endpoint. Blocks until a message becomes + * available. + */ + void writeOut() throws InterruptedException, IOException { + Message next = queue.take(); + outStream.write(next.message, 0, next.messageLength); + incrementBytes(-next.messageLength); + } + + /** + * Read bytes from an endpoint and add them as a message to the queue. Blocks if the queue is + * full. + */ + void readIn() throws InterruptedException, IOException { + byte[] request = new byte[getNextChunk()]; + int readableBytes = inStream.read(request); + long sendTime = System.nanoTime() + latency; + queue.put(new Message(sendTime, request, readableBytes)); + incrementBytes(readableBytes); + } + + /** + * Block until space on the queue becomes available. Returns how many bytes can be read on to + * the queue + */ + synchronized int getNextChunk() throws InterruptedException { + while (bytesQueued == queueLength) { + wait(); + } + return Math.max(0, Math.min(chunkSize, queueLength - bytesQueued)); + } + + synchronized void incrementBytes(int delta) { + bytesQueued += delta; + if (bytesQueued < queueLength) { + notifyAll(); + } + } + } + + private static class Message implements Delayed { + long sendTime; + byte[] message; + int messageLength; + + Message(long sendTime, byte[] message, int messageLength) { + this.sendTime = sendTime; + this.message = message; + this.messageLength = messageLength; + } + + @Override + public int compareTo(Delayed o) { + return ((Long) sendTime).compareTo(((Message) o).sendTime); + } + + @Override + public long getDelay(TimeUnit unit) { + return unit.convert(sendTime - System.nanoTime(), TimeUnit.NANOSECONDS); + } + } +}