Dramatically reduce the idle network activity by collecting and framing packets for a max of 10 milliseconds. This reduces bandwidth consumption as much as possible whilst not generating network garbage and other nasties.

This commit is contained in:
md_5 2013-04-20 19:41:41 +10:00
parent 713fe4fb2d
commit 95cd0996c0
1 changed files with 86 additions and 116 deletions

View File

@ -1,4 +1,4 @@
From efe9a9aecb849b6886372c7d9445cd79dd706687 Mon Sep 17 00:00:00 2001
From c9f5ce7c8d93b8ae0346eaad87e2d272b0f2a462 Mon Sep 17 00:00:00 2001
From: md_5 <md_5@live.com.au>
Date: Fri, 19 Apr 2013 17:44:39 +1000
Subject: [PATCH] Netty
@ -417,10 +417,10 @@ index 0000000..c8ea80a
+}
diff --git a/src/main/java/org/spigotmc/netty/CipherCodec.java b/src/main/java/org/spigotmc/netty/CipherCodec.java
new file mode 100644
index 0000000..2dbbf6c
index 0000000..5e3a5f9
--- /dev/null
+++ b/src/main/java/org/spigotmc/netty/CipherCodec.java
@@ -0,0 +1,67 @@
@@ -0,0 +1,59 @@
+package org.spigotmc.netty;
+
+import io.netty.buffer.ByteBuf;
@ -428,7 +428,6 @@ index 0000000..2dbbf6c
+import io.netty.handler.codec.ByteToByteCodec;
+import javax.crypto.Cipher;
+import javax.crypto.ShortBufferException;
+import net.minecraft.server.Packet252KeyResponse;
+
+/**
+ * This class is a complete solution for encrypting and decoding bytes in a
@ -439,7 +438,6 @@ index 0000000..2dbbf6c
+
+ private Cipher encrypt;
+ private Cipher decrypt;
+ private Packet252KeyResponse responsePacket;
+ private ThreadLocal<byte[]> heapInLocal = new EmptyByteThreadLocal();
+ private ThreadLocal<byte[]> heapOutLocal = new EmptyByteThreadLocal();
+
@ -451,15 +449,9 @@ index 0000000..2dbbf6c
+ }
+ }
+
+ public CipherCodec(Cipher encrypt, Cipher decrypt, Packet252KeyResponse responsePacket) {
+ public CipherCodec(Cipher encrypt, Cipher decrypt) {
+ this.encrypt = encrypt;
+ this.decrypt = decrypt;
+ this.responsePacket = responsePacket;
+ }
+
+ @Override
+ public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
+ ctx.channel().write(responsePacket);
+ }
+
+ @Override
@ -490,27 +482,34 @@ index 0000000..2dbbf6c
+}
diff --git a/src/main/java/org/spigotmc/netty/NettyNetworkManager.java b/src/main/java/org/spigotmc/netty/NettyNetworkManager.java
new file mode 100644
index 0000000..0e1b1fd
index 0000000..2036b8e
--- /dev/null
+++ b/src/main/java/org/spigotmc/netty/NettyNetworkManager.java
@@ -0,0 +1,253 @@
@@ -0,0 +1,298 @@
+package org.spigotmc.netty;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufOutputStream;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundMessageHandlerAdapter;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.util.concurrent.ScheduledFuture;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.security.PrivateKey;
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+import javax.crypto.Cipher;
+import javax.crypto.SecretKey;
+import net.minecraft.server.Connection;
@ -535,22 +534,10 @@ index 0000000..0e1b1fd
+ private static final MultiplexingServerConnection serverConnection = (MultiplexingServerConnection) server.ae();
+ /*========================================================================*/
+ private final Queue<Packet> syncPackets = new ConcurrentLinkedQueue<Packet>();
+ private final List<Packet> highPriorityQueue = new AbstractList<Packet>() {
+ @Override
+ public void add(int index, Packet element) {
+ // NOP
+ }
+
+ @Override
+ public Packet get(int index) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int size() {
+ return 0;
+ }
+ };
+ private final List<Packet> highPriorityQueue = new ArrayList<Packet>();
+ private final ReentrantLock writeLock = new ReentrantLock();
+ private Runnable packetDispatcher;
+ private ScheduledFuture<?> scheduledTask;
+ private volatile boolean connected;
+ private Channel channel;
+ private SocketAddress address;
@ -569,6 +556,7 @@ index 0000000..0e1b1fd
+ // Check the throttle
+ if (serverConnection.throttle(((InetSocketAddress) channel.remoteAddress()).getAddress())) {
+ channel.close();
+ return;
+ }
+ // Then the socket adaptor
+ socketAdaptor = NettySocketAdaptor.adapt((SocketChannel) channel);
@ -577,10 +565,55 @@ index 0000000..0e1b1fd
+ // Finally register the connection
+ connected = true;
+ serverConnection.register((PendingConnection) connection);
+ // And register our send dispatcher
+ packetDispatcher = new Runnable() {
+ public void run() {
+ // Ensure exclusive access to the queue
+ if (!writeLock.isHeldByCurrentThread()) {
+ writeLock.lock();
+ }
+ try {
+ if (highPriorityQueue.size() == 0) {
+ return;
+ }
+ // Try and get a bearing on the size
+ int estimatedSize = 0;
+ for (Packet packet : highPriorityQueue) {
+ estimatedSize += packet.a();
+ }
+ // Allocate a buffer
+ ByteBuf buf = channel.alloc().directBuffer(estimatedSize);
+ // And an outputstream to this buffer
+ DataOutputStream out = new DataOutputStream(new ByteBufOutputStream(buf));
+ // Loop through all packets
+ for (Packet packet : highPriorityQueue) {
+ // Write packet ID
+ buf.writeByte(packet.n());
+ try {
+ // Write actual packet
+ packet.a(out);
+ } catch (IOException ex) {
+ // Catch exception in case it ever happens (should never)
+ a("disconnect.genericReason", new Object[]{"Exception writing packet: " + ex});
+ }
+ }
+ // Clear existing packets so we can unlock our lock
+ highPriorityQueue.clear();
+ // Send the whole buffer down
+ writtenBytes += buf.readableBytes();
+ channel.write(buf);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+ };
+ scheduledTask = ctx.executor().scheduleWithFixedDelay(packetDispatcher, 10, 10, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ // Cleanup the timer task
+ scheduledTask.cancel(false);
+ a("disconnect.endOfStream", new Object[0]);
+ }
+
@ -642,18 +675,24 @@ index 0000000..0e1b1fd
+ packet = PacketListener.callQueued(this, connection, packet);
+ // If handler indicates packet send
+ if (packet != null) {
+ highPriorityQueue.add(packet);
+
+ // If needed, check and prepare encryption phase
+ // We don't send the packet here as it is sent just before the cipher handler has been added to ensure we can safeguard from any race conditions
+ // Which are caused by the slow first initialization of the cipher SPI
+ if (packet instanceof Packet252KeyResponse) {
+ Cipher encrypt = NettyServerConnection.getCipher(Cipher.ENCRYPT_MODE, secret);
+ Cipher decrypt = NettyServerConnection.getCipher(Cipher.DECRYPT_MODE, secret);
+ CipherCodec codec = new CipherCodec(encrypt, decrypt, (Packet252KeyResponse) packet);
+ channel.pipeline().addBefore("decoder", "cipher", codec);
+ } else {
+ channel.write(packet);
+ // Aquire lock
+ writeLock.lock();
+ try {
+ highPriorityQueue.add(packet);
+ // If needed, check and prepare encryption phase
+ if (packet instanceof Packet252KeyResponse) {
+ Cipher encrypt = NettyServerConnection.getCipher(Cipher.ENCRYPT_MODE, secret);
+ Cipher decrypt = NettyServerConnection.getCipher(Cipher.DECRYPT_MODE, secret);
+ CipherCodec codec = new CipherCodec(encrypt, decrypt);
+ // Flush send queue
+ packetDispatcher.run();
+ channel.pipeline().addBefore("decoder", "cipher", codec);
+ }
+ } finally {
+ // If we still have a lock, we need to get ri
+ if (writeLock.isHeldByCurrentThread()) {
+ writeLock.unlock();
+ }
+ }
+ }
+ }
@ -709,6 +748,8 @@ index 0000000..0e1b1fd
+ public void d() {
+ if (connected) {
+ connected = false;
+ // Send all pending packets
+ packetDispatcher.run();
+ channel.close();
+ }
+ }
@ -742,17 +783,13 @@ index 0000000..0e1b1fd
+ public long getWrittenBytes() {
+ return writtenBytes;
+ }
+
+ public void addWrittenBytes(int written) {
+ writtenBytes += written;
+ }
+}
diff --git a/src/main/java/org/spigotmc/netty/NettyServerConnection.java b/src/main/java/org/spigotmc/netty/NettyServerConnection.java
new file mode 100644
index 0000000..e5d24f7
index 0000000..9ad9c52
--- /dev/null
+++ b/src/main/java/org/spigotmc/netty/NettyServerConnection.java
@@ -0,0 +1,90 @@
@@ -0,0 +1,79 @@
+package org.spigotmc.netty;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
@ -768,17 +805,10 @@ index 0000000..e5d24f7
+import java.net.InetAddress;
+import java.security.GeneralSecurityException;
+import java.security.Key;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.logging.Level;
+import javax.crypto.Cipher;
+import javax.crypto.spec.IvParameterSpec;
+import net.minecraft.server.MinecraftServer;
+import net.minecraft.server.PendingConnection;
+import net.minecraft.server.ServerConnection;
+import org.bukkit.Bukkit;
+
+/**
+ * This is the NettyServerConnection class. It implements
@ -790,8 +820,6 @@ index 0000000..e5d24f7
+
+ private final ChannelFuture socket;
+
+
+
+ public NettyServerConnection(MinecraftServer ms, InetAddress host, int port) {
+ super(ms);
+ int threads = Integer.getInteger("org.spigotmc.netty.threads", 3);
@ -808,14 +836,12 @@ index 0000000..e5d24f7
+ ch.pipeline()
+ .addLast("timer", new ReadTimeoutHandler(30))
+ .addLast("decoder", new PacketDecoder())
+ .addLast("encoder", new PacketEncoder(networkManager))
+ .addLast("manager", networkManager);
+ }
+ }).group(new NioEventLoopGroup(threads, new ThreadFactoryBuilder().setNameFormat("Netty IO Thread - %1$d").build())).localAddress(host, port).bind();
+ MinecraftServer.getServer().getLogger().info("Using Netty NIO with " + threads + " threads for network connections.");
+ }
+
+
+ /**
+ * Shutdown. This method is called when the server is shutting down and the
+ * server socket and all clients should be terminated with no further
@ -1167,62 +1193,6 @@ index 0000000..65074d2
+ packet = null;
+ }
+}
diff --git a/src/main/java/org/spigotmc/netty/PacketEncoder.java b/src/main/java/org/spigotmc/netty/PacketEncoder.java
new file mode 100644
index 0000000..c8832d6
--- /dev/null
+++ b/src/main/java/org/spigotmc/netty/PacketEncoder.java
@@ -0,0 +1,50 @@
+package org.spigotmc.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufOutputStream;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
+import java.io.DataOutputStream;
+import net.minecraft.server.Packet;
+
+/**
+ * Netty encoder which takes a packet and encodes it, and adds a byte packet id
+ * header.
+ */
+public class PacketEncoder extends MessageToByteEncoder<Packet> {
+
+ private ByteBuf outBuf;
+ private DataOutputStream dataOut;
+ private final NettyNetworkManager networkManager;
+
+ public PacketEncoder(NettyNetworkManager networkManager) {
+ this.networkManager = networkManager;
+ }
+
+ @Override
+ public void encode(ChannelHandlerContext ctx, Packet msg, ByteBuf out) throws Exception {
+ if (outBuf == null) {
+ outBuf = ctx.alloc().directBuffer();
+ }
+ if (dataOut == null) {
+ dataOut = new DataOutputStream(new ByteBufOutputStream(outBuf));
+ }
+
+ out.writeByte(msg.n());
+ msg.a(dataOut);
+
+ networkManager.addWrittenBytes(outBuf.readableBytes());
+ out.writeBytes(outBuf);
+ out.discardSomeReadBytes();
+ }
+
+ @Override
+ public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
+ super.freeOutboundBuffer(ctx);
+ if (outBuf != null) {
+ outBuf.release();
+ outBuf = null;
+ }
+ dataOut = null;
+ }
+}
diff --git a/src/main/java/org/spigotmc/netty/PacketListener.java b/src/main/java/org/spigotmc/netty/PacketListener.java
new file mode 100644
index 0000000..8e3b932