From bb2577d18f5b7fd2e0582de1acd29825c3005276 Mon Sep 17 00:00:00 2001 From: md_5 Date: Sun, 23 Jun 2013 16:32:51 +1000 Subject: [PATCH] Netty diff --git a/pom.xml b/pom.xml index 8c9f66b..f1a4d4c 100644 --- a/pom.xml +++ b/pom.xml @@ -132,6 +132,16 @@ trove4j 3.0.2 + + io.netty + netty-all + 4.0.0.CR7 + + + org.javassist + javassist + 3.18.0-GA + diff --git a/src/main/java/net/minecraft/server/DedicatedServer.java b/src/main/java/net/minecraft/server/DedicatedServer.java index 59444cb..9e6e318 100644 --- a/src/main/java/net/minecraft/server/DedicatedServer.java +++ b/src/main/java/net/minecraft/server/DedicatedServer.java @@ -97,10 +97,12 @@ public class DedicatedServer extends MinecraftServer implements IMinecraftServer this.getLogger().info("Generating keypair"); this.a(MinecraftEncryption.b()); - this.getLogger().info("Starting Minecraft server on " + (this.getServerIp().length() == 0 ? "*" : this.getServerIp()) + ":" + this.G()); + // Spigot start + // this.getLogger().info("Starting Minecraft server on " + (this.getServerIp().length() == 0 ? "*" : this.getServerIp()) + ":" + this.G()); try { - this.r = new DedicatedServerConnection(this, inetaddress, this.G()); + this.r = new org.spigotmc.MultiplexingServerConnection(this); + // Spigot end } catch (Throwable ioexception) { // CraftBukkit - IOException -> Throwable this.getLogger().warning("**** FAILED TO BIND TO PORT!"); this.getLogger().warning("The exception was: {0}", new Object[] { ioexception.toString()}); diff --git a/src/main/java/net/minecraft/server/DedicatedServerConnectionThread.java b/src/main/java/net/minecraft/server/DedicatedServerConnectionThread.java index ef7e10d..e25819d 100644 --- a/src/main/java/net/minecraft/server/DedicatedServerConnectionThread.java +++ b/src/main/java/net/minecraft/server/DedicatedServerConnectionThread.java @@ -66,23 +66,19 @@ public class DedicatedServerConnectionThread extends Thread { socket.close(); continue; } + // CraftBukkit end - connectionThrottle = ((MinecraftServer) this.e.d()).server.getConnectionThrottle(); - - synchronized (this.b) { - if (this.b.containsKey(address) && !"127.0.0.1".equals(address.getHostAddress()) && currentTime - ((Long) this.b.get(address)).longValue() < connectionThrottle) { - this.b.put(address, Long.valueOf(currentTime)); - socket.close(); - continue; - } - - this.b.put(address, Long.valueOf(currentTime)); + // Spigot Start + if ( ( (org.spigotmc.MultiplexingServerConnection) MinecraftServer.getServer().ae() ).throttle( address ) ) + { + socket.close(); + continue; } - // CraftBukkit end + // Spigot end PendingConnection pendingconnection = new PendingConnection(this.e.d(), socket, "Connection #" + this.c++); - this.a(pendingconnection); + ((org.spigotmc.MultiplexingServerConnection) this.e.d().ae()).register(pendingconnection); // Spigot } catch (IOException ioexception) { this.e.d().getLogger().warning("DSCT: " + ioexception.getMessage()); // CraftBukkit } diff --git a/src/main/java/net/minecraft/server/INetworkManager.java b/src/main/java/net/minecraft/server/INetworkManager.java new file mode 100644 index 0000000..6fcc5d7 --- /dev/null +++ b/src/main/java/net/minecraft/server/INetworkManager.java @@ -0,0 +1,26 @@ +package net.minecraft.server; + +import java.net.SocketAddress; + +public interface INetworkManager { + + void a(Connection connection); + + void queue(Packet packet); + + void a(); + + void b(); + + SocketAddress getSocketAddress(); + + void d(); + + int e(); + + void a(String s, Object... aobject); + + java.net.Socket getSocket(); // Spigot + + void setSocketAddress(java.net.SocketAddress address); // Spigot +} diff --git a/src/main/java/net/minecraft/server/NetworkManager.java b/src/main/java/net/minecraft/server/NetworkManager.java index 1862863..5a24f2a 100644 --- a/src/main/java/net/minecraft/server/NetworkManager.java +++ b/src/main/java/net/minecraft/server/NetworkManager.java @@ -24,7 +24,7 @@ public class NetworkManager implements INetworkManager { private final Object h = new Object(); private final IConsoleLogManager i; public Socket socket; // CraftBukkit - private -> public - private final SocketAddress k; + private SocketAddress k; // Spigot - remove final private volatile DataInputStream input; private volatile DataOutputStream output; private volatile boolean n = true; @@ -369,4 +369,6 @@ public class NetworkManager implements INetworkManager { static Thread h(NetworkManager networkmanager) { return networkmanager.u; } + + public void setSocketAddress(SocketAddress address) { k = address; } // Spigot } diff --git a/src/main/java/net/minecraft/server/Packet51MapChunk.java b/src/main/java/net/minecraft/server/Packet51MapChunk.java index efe102e..e488fa8 100644 --- a/src/main/java/net/minecraft/server/Packet51MapChunk.java +++ b/src/main/java/net/minecraft/server/Packet51MapChunk.java @@ -42,7 +42,7 @@ public class Packet51MapChunk extends Packet { this.b = chunk.z; this.e = flag; ChunkMap chunkmap = a(chunk, flag, i); - Deflater deflater = new Deflater(-1); + Deflater deflater = new Deflater(4); this.d = chunkmap.c; this.c = chunkmap.b; diff --git a/src/main/java/net/minecraft/server/Packet56MapChunkBulk.java b/src/main/java/net/minecraft/server/Packet56MapChunkBulk.java index a2cd9b0..f586415 100644 --- a/src/main/java/net/minecraft/server/Packet56MapChunkBulk.java +++ b/src/main/java/net/minecraft/server/Packet56MapChunkBulk.java @@ -24,7 +24,7 @@ public class Packet56MapChunkBulk extends Packet { @Override protected Deflater initialValue() { // Don't use higher compression level, slows things down too much - return new Deflater(6); + return new Deflater(4); // Spigot - use lower compression level still } }; // CraftBukkit end diff --git a/src/main/java/net/minecraft/server/PendingConnection.java b/src/main/java/net/minecraft/server/PendingConnection.java index 17cfacc..a945892 100644 --- a/src/main/java/net/minecraft/server/PendingConnection.java +++ b/src/main/java/net/minecraft/server/PendingConnection.java @@ -17,7 +17,7 @@ public class PendingConnection extends Connection { private static Random random = new Random(); private byte[] d; private final MinecraftServer server; - public final NetworkManager networkManager; + public final INetworkManager networkManager; public boolean b = false; private int f = 0; private String g = null; @@ -27,10 +27,15 @@ public class PendingConnection extends Connection { private SecretKey k = null; public String hostname = ""; // CraftBukkit - add field + public PendingConnection(MinecraftServer minecraftserver, org.spigotmc.netty.NettyNetworkManager networkManager) { + this.server = minecraftserver; + this.networkManager = networkManager; + } + public PendingConnection(MinecraftServer minecraftserver, Socket socket, String s) throws java.io.IOException { // CraftBukkit - throws IOException this.server = minecraftserver; this.networkManager = new NetworkManager(minecraftserver.getLogger(), socket, s, this, minecraftserver.F().getPrivate()); - this.networkManager.e = 0; + // this.networkManager.e = 0; } // CraftBukkit start @@ -146,7 +151,7 @@ public class PendingConnection extends Connection { // CraftBukkit org.bukkit.event.server.ServerListPingEvent pingEvent = org.bukkit.craftbukkit.event.CraftEventFactory.callServerListPingEvent(this.server.server, getSocket().getInetAddress(), this.server.getMotd(), playerlist.getPlayerCount(), playerlist.getMaxPlayers()); - if (packet254getinfo.a == 1) { + if (true) { // CraftBukkit start - Fix decompile issues, don't create a list from an array Object[] list = new Object[] { 1, 61, this.server.getVersion(), pingEvent.getMotd(), playerlist.getPlayerCount(), pingEvent.getMaxPlayers() }; @@ -173,9 +178,11 @@ public class PendingConnection extends Connection { this.networkManager.queue(new Packet255KickDisconnect(s)); this.networkManager.d(); - if (inetaddress != null && this.server.ae() instanceof DedicatedServerConnection) { - ((DedicatedServerConnection) this.server.ae()).a(inetaddress); + // Spigot start + if (inetaddress != null) { + ((org.spigotmc.MultiplexingServerConnection) this.server.ae()).unThrottle(inetaddress); } + // Spigot end this.b = true; } catch (Exception exception) { diff --git a/src/main/java/org/bukkit/craftbukkit/CraftServer.java b/src/main/java/org/bukkit/craftbukkit/CraftServer.java index 6e6fe1c..68694de 100644 --- a/src/main/java/org/bukkit/craftbukkit/CraftServer.java +++ b/src/main/java/org/bukkit/craftbukkit/CraftServer.java @@ -1369,4 +1369,20 @@ public final class CraftServer implements Server { public CraftScoreboardManager getScoreboardManager() { return scoreboardManager; } + + // Spigot start + @SuppressWarnings("unchecked") + public java.util.Collection getSecondaryHosts() { + java.util.Collection ret = new java.util.HashSet(); + List listeners = configuration.getList("listeners"); + if (listeners != null) { + for (Object o : listeners) { + + Map sect = (Map) o; + ret.add(new java.net.InetSocketAddress((String) sect.get("address"), (Integer) sect.get("port"))); + } + } + return ret; + } + // Spigot end } diff --git a/src/main/java/org/spigotmc/MultiplexingServerConnection.java b/src/main/java/org/spigotmc/MultiplexingServerConnection.java new file mode 100644 index 0000000..abe5e0c --- /dev/null +++ b/src/main/java/org/spigotmc/MultiplexingServerConnection.java @@ -0,0 +1,126 @@ +package org.spigotmc; + +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.logging.Level; +import net.minecraft.server.DedicatedServerConnection; +import net.minecraft.server.MinecraftServer; +import net.minecraft.server.PendingConnection; +import net.minecraft.server.ServerConnection; +import org.bukkit.Bukkit; +import org.spigotmc.netty.NettyServerConnection; + +public class MultiplexingServerConnection extends ServerConnection +{ + + private final Collection children = new HashSet(); + private final List pending = Collections.synchronizedList( new ArrayList() ); + private final HashMap throttle = new HashMap(); + + public MultiplexingServerConnection(MinecraftServer ms) throws Throwable + { + super( ms ); + + for ( SpigotConfig.Listener listener : SpigotConfig.listeners ) + { + // Calculate address, can't use isEmpty due to Java 5 + InetAddress socketAddress = ( listener.host.length() == 0 ) ? null : InetAddress.getByName( listener.host ); + // Say hello to the log + d().getLogger().info( "Starting listener #" + children.size() + " on " + ( socketAddress == null ? "*" : listener.host ) + ":" + listener.port ); + // Start connection: Netty / non Netty + ServerConnection l = ( listener.netty ) ? new NettyServerConnection( d(), socketAddress, listener.port ) : new DedicatedServerConnection( d(), socketAddress, listener.port ); + // Register with other connections + children.add( l ); + } + } + + /** + * close. + */ + @Override + public void a() + { + for ( ServerConnection child : children ) + { + child.a(); + } + } + + /** + * Pulse. This method pulses all connections causing them to update. It is + * called from the main server thread a few times a tick. + */ + @Override + public void b() + { + super.b(); // pulse PlayerConnections + for ( int i = 0; i < pending.size(); ++i ) + { + PendingConnection connection = pending.get( i ); + + try + { + connection.c(); + } catch ( Exception ex ) + { + connection.disconnect( "Internal server error" ); + Bukkit.getServer().getLogger().log( Level.WARNING, "Failed to handle packet: " + ex, ex ); + } + + if ( connection.b ) + { + pending.remove( i-- ); + } + } + } + + /** + * Remove the user from connection throttle. This should fix the server ping + * bugs. + * + * @param address the address to remove + */ + public void unThrottle(InetAddress address) + { + if ( address != null ) + { + synchronized ( throttle ) + { + throttle.remove( address ); + } + } + } + + /** + * Add a connection to the throttle list. + * + * @param address + * @return Whether they must be disconnected + */ + public boolean throttle(InetAddress address) + { + long currentTime = System.currentTimeMillis(); + synchronized ( throttle ) + { + Long value = throttle.get( address ); + if ( value != null && !address.isLoopbackAddress() && currentTime - value < d().server.getConnectionThrottle() ) + { + throttle.put( address, currentTime ); + return true; + } + + throttle.put( address, currentTime ); + } + return false; + } + + public void register(PendingConnection conn) + { + pending.add( conn ); + } +} diff --git a/src/main/java/org/spigotmc/SpigotConfig.java b/src/main/java/org/spigotmc/SpigotConfig.java index a0a7790..8b7e48e 100644 --- a/src/main/java/org/spigotmc/SpigotConfig.java +++ b/src/main/java/org/spigotmc/SpigotConfig.java @@ -6,6 +6,8 @@ import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.lang.reflect.Modifier; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -148,4 +150,47 @@ public class SpigotConfig commands.put( "restart", new RestartCommand( "restart" ) ); WatchdogThread.doStart( timeoutTime, restartOnCrash ); } + + public static class Listener + { + + public String host; + public int port; + public boolean netty; + public long connectionThrottle; + + public Listener(String host, int port, boolean netty, long connectionThrottle) + { + this.host = host; + this.port = port; + this.netty = netty; + this.connectionThrottle = connectionThrottle; + } + } + public static List listeners = new ArrayList(); + public static int nettyThreads; + private static void listeners() + { + Map def = new HashMap(); + def.put( "host", "default" ); + def.put( "port", "default" ); + def.put( "netty", true ); + // def.put( "throttle", "default" ); + + config.addDefault( "listeners", Collections.singletonList( def ) ); + for ( Map info : (List>) config.getList( "listeners" ) ) + { + String host = (String) info.get( "host" ); + if ( "default".equals( host ) ) + { + host = Bukkit.getIp(); + } + int port = ( info.get( "port" ) instanceof Integer ) ? (Integer) info.get( "port" ) : Bukkit.getPort(); + boolean netty = (Boolean) info.get( "netty" ); + // long connectionThrottle = ( info.get( "throttle" ) instanceof Number ) ? ( (Number) info.get( "throttle" ) ).longValue() : Bukkit.getConnectionThrottle(); + listeners.add( new Listener( host, port, netty, Bukkit.getConnectionThrottle() ) ); + } + + nettyThreads = getInt( "settings.netty-threads", 3 ); + } } diff --git a/src/main/java/org/spigotmc/netty/CipherBase.java b/src/main/java/org/spigotmc/netty/CipherBase.java new file mode 100644 index 0000000..c4306f7 --- /dev/null +++ b/src/main/java/org/spigotmc/netty/CipherBase.java @@ -0,0 +1,73 @@ +package org.spigotmc.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import javax.crypto.Cipher; +import javax.crypto.ShortBufferException; + +/** + * Class to expose an + * {@link #cipher(io.netty.buffer.ByteBuf, io.netty.buffer.ByteBuf)} method to + * aid in the efficient passing of ByteBuffers through a cipher. + */ +class CipherBase +{ + + private final Cipher cipher; + private ThreadLocal heapInLocal = new EmptyByteThreadLocal(); + private ThreadLocal heapOutLocal = new EmptyByteThreadLocal(); + + private static class EmptyByteThreadLocal extends ThreadLocal + { + + @Override + protected byte[] initialValue() + { + return new byte[ 0 ]; + } + } + + protected CipherBase(Cipher cipher) + { + this.cipher = cipher; + } + + private byte[] bufToByte(ByteBuf in) + { + byte[] heapIn = heapInLocal.get(); + int readableBytes = in.readableBytes(); + if ( heapIn.length < readableBytes ) + { + heapIn = new byte[ readableBytes ]; + heapInLocal.set( heapIn ); + } + in.readBytes( heapIn, 0, readableBytes ); + return heapIn; + } + + protected ByteBuf cipher(ChannelHandlerContext ctx, ByteBuf in) throws ShortBufferException + { + int readableBytes = in.readableBytes(); + byte[] heapIn = bufToByte( in ); + + ByteBuf heapOut = ctx.alloc().heapBuffer( cipher.getOutputSize( readableBytes ) ); + heapOut.writerIndex( cipher.update( heapIn, 0, readableBytes, heapOut.array(), heapOut.arrayOffset() ) ); + + return heapOut; + } + + protected void cipher(ByteBuf in, ByteBuf out) throws ShortBufferException + { + int readableBytes = in.readableBytes(); + byte[] heapIn = bufToByte( in ); + + byte[] heapOut = heapOutLocal.get(); + int outputSize = cipher.getOutputSize( readableBytes ); + if ( heapOut.length < outputSize ) + { + heapOut = new byte[ outputSize ]; + heapOutLocal.set( heapOut ); + } + out.writeBytes( heapOut, 0, cipher.update( heapIn, 0, readableBytes, heapOut ) ); + } +} diff --git a/src/main/java/org/spigotmc/netty/CipherDecoder.java b/src/main/java/org/spigotmc/netty/CipherDecoder.java new file mode 100644 index 0000000..a1094d2 --- /dev/null +++ b/src/main/java/org/spigotmc/netty/CipherDecoder.java @@ -0,0 +1,24 @@ +package org.spigotmc.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.MessageList; +import io.netty.handler.codec.MessageToMessageDecoder; +import javax.crypto.Cipher; + +public class CipherDecoder extends MessageToMessageDecoder +{ + + private final CipherBase cipher; + + public CipherDecoder(Cipher cipher) + { + this.cipher = new CipherBase( cipher ); + } + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf msg, MessageList out) throws Exception + { + out.add( cipher.cipher( ctx, msg ) ); + } +} diff --git a/src/main/java/org/spigotmc/netty/CipherEncoder.java b/src/main/java/org/spigotmc/netty/CipherEncoder.java new file mode 100644 index 0000000..2eb1dcb --- /dev/null +++ b/src/main/java/org/spigotmc/netty/CipherEncoder.java @@ -0,0 +1,23 @@ +package org.spigotmc.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToByteEncoder; +import javax.crypto.Cipher; + +public class CipherEncoder extends MessageToByteEncoder +{ + + private final CipherBase cipher; + + public CipherEncoder(Cipher cipher) + { + this.cipher = new CipherBase( cipher ); + } + + @Override + protected void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception + { + cipher.cipher( in, out ); + } +} diff --git a/src/main/java/org/spigotmc/netty/NettyNetworkManager.java b/src/main/java/org/spigotmc/netty/NettyNetworkManager.java new file mode 100644 index 0000000..313e3ea --- /dev/null +++ b/src/main/java/org/spigotmc/netty/NettyNetworkManager.java @@ -0,0 +1,316 @@ +package org.spigotmc.netty; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelPromise; +import io.netty.channel.MessageList; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import java.net.Socket; +import java.net.SocketAddress; +import java.security.PrivateKey; +import java.util.AbstractList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import javax.crypto.Cipher; +import javax.crypto.SecretKey; +import net.minecraft.server.Connection; +import net.minecraft.server.INetworkManager; +import net.minecraft.server.MinecraftServer; +import net.minecraft.server.Packet; +import net.minecraft.server.Packet252KeyResponse; +import net.minecraft.server.Packet255KickDisconnect; +import net.minecraft.server.PendingConnection; +import net.minecraft.server.PlayerConnection; +import org.spigotmc.MultiplexingServerConnection; + +/** + * This class forms the basis of the Netty integration. It implements + * {@link INetworkManager} and handles all events and inbound messages provided + * by the upstream Netty process. + */ +public class NettyNetworkManager extends ChannelInboundHandlerAdapter implements INetworkManager +{ + + private static final ExecutorService threadPool = Executors.newCachedThreadPool( new ThreadFactoryBuilder().setNameFormat( "Async Packet Handler - %1$d" ).build() ); + private static final MinecraftServer server = MinecraftServer.getServer(); + private static final PrivateKey key = server.F().getPrivate(); + private static final MultiplexingServerConnection serverConnection = (MultiplexingServerConnection) server.ae(); + /*========================================================================*/ + private final Queue syncPackets = new ConcurrentLinkedQueue(); + private final List highPriorityQueue = new AbstractList() + { + @Override + public void add(int index, Packet element) + { + // NOP + } + + @Override + public Packet get(int index) + { + throw new UnsupportedOperationException(); + } + + @Override + public int size() + { + return 0; + } + }; + private volatile boolean connected; + private Channel channel; + private SocketAddress address; + Connection connection; + private SecretKey secret; + private String dcReason; + private Object[] dcArgs; + private Socket socketAdaptor; + private long writtenBytes; + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception + { + // Channel and address groundwork first + channel = ctx.channel(); + address = channel.remoteAddress(); + // Then the socket adaptor + socketAdaptor = NettySocketAdaptor.adapt( (SocketChannel) channel ); + // Followed by their first handler + connection = new PendingConnection( server, this ); + // Finally register the connection + connected = true; + serverConnection.register( (PendingConnection) connection ); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception + { + a( "disconnect.endOfStream", new Object[ 0 ] ); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception + { + // TODO: Remove this once we are more stable + // Bukkit.getServer().getLogger().severe("======================= Start Netty Debug Log ======================="); + // Bukkit.getServer().getLogger().log(Level.SEVERE, "Error caught whilst handling " + channel, cause); + // Bukkit.getServer().getLogger().severe("======================= End Netty Debug Log ======================="); + // Disconnect with generic reason + exception + a( "disconnect.genericReason", new Object[] + { + "Internal exception: " + cause + } ); + cause.printStackTrace(); + } + + @Override + public void messageReceived(ChannelHandlerContext ctx, MessageList msgs) throws Exception + { + MessageList packets = msgs.cast(); + for ( final Packet msg : packets ) + { + if ( connected ) + { + if ( msg instanceof Packet252KeyResponse ) + { + secret = ( (Packet252KeyResponse) msg ).a( key ); + Cipher decrypt = NettyServerConnection.getCipher( Cipher.DECRYPT_MODE, secret ); + channel.pipeline().addBefore( "decoder", "decrypt", new CipherDecoder( decrypt ) ); + } + + if ( msg.a_() ) + { + threadPool.submit( new Runnable() + { + public void run() + { + Packet packet = PacketListener.callReceived( NettyNetworkManager.this, connection, msg ); + if ( packet != null ) + { + packet.handle( connection ); + } + } + } ); + } else + { + syncPackets.add( msg ); + } + } + } + } + + public Socket getSocket() + { + return socketAdaptor; + } + + /** + * setHandler. Set the {@link NetHandler} used to process received packets. + * + * @param nh the new {@link NetHandler} instance + */ + public void a(Connection nh) + { + connection = nh; + } + + /** + * queue. Queue a packet for sending, or in this case send it to be write it + * straight to the channel. + * + * @param packet the packet to queue + */ + public void queue(final Packet packet) + { + // Only send if channel is still connected + if ( connected ) + { + if ( channel.eventLoop().inEventLoop() ) + { + queue0( packet ); + } else + { + channel.eventLoop().execute( new Runnable() + { + public void run() + { + queue0( packet ); + } + } ); + } + } + } + + private void queue0(Packet packet) + { + // Process packet via handler + packet = PacketListener.callQueued( this, connection, packet ); + // If handler indicates packet send + if ( packet != null ) + { + highPriorityQueue.add( packet ); + + ChannelPromise promise = channel.newPromise(); + if ( packet instanceof Packet255KickDisconnect ) + { + channel.pipeline().get( OutboundManager.class ).lastFlush = 0; + } + + channel.write( packet, promise ); + if ( packet instanceof Packet252KeyResponse ) + { + Cipher encrypt = NettyServerConnection.getCipher( Cipher.ENCRYPT_MODE, secret ); + channel.pipeline().addBefore( "decoder", "encrypt", new CipherEncoder( encrypt ) ); + } + } + } + + /** + * wakeThreads. In Vanilla this method will interrupt the network read and + * write threads, thus waking them. + */ + public void a() + { + } + + /** + * processPackets. Remove up to 1000 packets from the queue and process + * them. This method should only be called from the main server thread. + */ + public void b() + { + for ( int i = 1000; !syncPackets.isEmpty() && i >= 0; i-- ) + { + if ( connection instanceof PendingConnection ? ( (PendingConnection) connection ).b : ( (PlayerConnection) connection ).disconnected ) + { + syncPackets.clear(); + break; + } + + Packet packet = PacketListener.callReceived( this, connection, syncPackets.poll() ); + if ( packet != null ) + { + packet.handle( connection ); + } + } + + // Disconnect via the handler - this performs all plugin related cleanup + logging + if ( !connected && ( dcReason != null || dcArgs != null ) ) + { + connection.a( dcReason, dcArgs ); + } + } + + /** + * getSocketAddress. Return the remote address of the connected user. It is + * important that this method returns a value even after disconnect. + * + * @return the remote address of this connection + */ + public SocketAddress getSocketAddress() + { + return address; + } + + public void setSocketAddress(SocketAddress address) + { + this.address = address; + } + + /** + * close. Close and release all resources associated with this connection. + */ + public void d() + { + if ( connected ) + { + connected = false; + channel.close(); + } + } + + /** + * queueSize. Return the number of packets in the low priority queue. In a + * NIO environment this will always be 0. + * + * @return the size of the packet send queue + */ + public int e() + { + return 0; + } + + /** + * networkShutdown. Shuts down this connection, storing the reason and + * parameters, used to notify the current {@link Connection}. + * + * @param reason the main disconnect reason + * @param arguments additional disconnect arguments, for example, the + * exception which triggered the disconnect. + */ + public void a(String reason, Object... arguments) + { + if ( connected ) + { + dcReason = reason; + dcArgs = arguments; + d(); + } + } + + public long getWrittenBytes() + { + return writtenBytes; + } + + public void addWrittenBytes(int written) + { + writtenBytes += written; + } +} diff --git a/src/main/java/org/spigotmc/netty/NettyServerConnection.java b/src/main/java/org/spigotmc/netty/NettyServerConnection.java new file mode 100644 index 0000000..9eecd59 --- /dev/null +++ b/src/main/java/org/spigotmc/netty/NettyServerConnection.java @@ -0,0 +1,106 @@ +package org.spigotmc.netty; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelException; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.timeout.ReadTimeoutHandler; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.security.GeneralSecurityException; +import java.security.Key; +import javax.crypto.Cipher; +import javax.crypto.spec.IvParameterSpec; +import net.minecraft.server.MinecraftServer; +import net.minecraft.server.ServerConnection; +import org.spigotmc.MultiplexingServerConnection; +import org.spigotmc.SpigotConfig; + +/** + * This is the NettyServerConnection class. It implements + * {@link ServerConnection} and is the main interface between the Minecraft + * server and this NIO implementation. It handles starting, stopping and + * processing the Netty backend. + */ +public class NettyServerConnection extends ServerConnection +{ + + private final ChannelFuture socket; + private static EventLoopGroup group; + + public NettyServerConnection(final MinecraftServer ms, InetAddress host, int port) + { + super( ms ); + if ( group == null ) + { + group = new NioEventLoopGroup( SpigotConfig.nettyThreads, new ThreadFactoryBuilder().setNameFormat( "Netty IO Thread - %1$d" ).build() ); + } + + socket = new ServerBootstrap().channel( NioServerSocketChannel.class ).childHandler( new ChannelInitializer() + { + @Override + public void initChannel(Channel ch) throws Exception + { + // Check the throttle + if ( ( (MultiplexingServerConnection) ms.ae() ).throttle( ( (InetSocketAddress) ch.remoteAddress() ).getAddress() ) ) + { + ch.close(); + return; + } + // Set IP_TOS + try + { + ch.config().setOption( ChannelOption.IP_TOS, 0x18 ); + } catch ( ChannelException ex ) + { + // IP_TOS is not supported (Windows XP / Windows Server 2003) + } + + NettyNetworkManager networkManager = new NettyNetworkManager(); + ch.pipeline() + .addLast( "flusher", new OutboundManager( networkManager ) ) + .addLast( "timer", new ReadTimeoutHandler( 30 ) ) + .addLast( "decoder", new PacketDecoder() ) + .addLast( "encoder", new PacketEncoder( networkManager ) ) + .addLast( "manager", networkManager ); + } + } ).childOption( ChannelOption.TCP_NODELAY, false ).group( group ).localAddress( host, port ).bind().syncUninterruptibly(); + } + + /** + * Shutdown. This method is called when the server is shutting down and the + * server socket and all clients should be terminated with no further + * action. + */ + @Override + public void a() + { + socket.channel().close().syncUninterruptibly(); + } + + /** + * Return a Minecraft compatible cipher instance from the specified key. + * + * @param opMode the mode to initialize the cipher in + * @param key to use as the initial vector + * @return the initialized cipher + */ + public static Cipher getCipher(int opMode, Key key) + { + try + { + Cipher cip = Cipher.getInstance( "AES/CFB8/NoPadding" ); + cip.init( opMode, key, new IvParameterSpec( key.getEncoded() ) ); + return cip; + } catch ( GeneralSecurityException ex ) + { + throw new RuntimeException( ex ); + } + } +} diff --git a/src/main/java/org/spigotmc/netty/NettySocketAdaptor.java b/src/main/java/org/spigotmc/netty/NettySocketAdaptor.java new file mode 100644 index 0000000..5da8a59 --- /dev/null +++ b/src/main/java/org/spigotmc/netty/NettySocketAdaptor.java @@ -0,0 +1,294 @@ +package org.spigotmc.netty; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelOption; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetAddress; +import java.net.Socket; +import java.net.SocketAddress; +import java.net.SocketException; +import java.nio.channels.SocketChannel; + +/** + * This class wraps a Netty {@link Channel} in a {@link Socket}. It overrides + * all methods in {@link Socket} to ensure that calls are not mistakingly made + * to the unsupported super socket. All operations that can be sanely applied to + * a {@link Channel} are implemented here. Those which cannot will throw an + * {@link UnsupportedOperationException}. + */ +public class NettySocketAdaptor extends Socket +{ + + private final io.netty.channel.socket.SocketChannel ch; + + private NettySocketAdaptor(io.netty.channel.socket.SocketChannel ch) + { + this.ch = ch; + } + + public static NettySocketAdaptor adapt(io.netty.channel.socket.SocketChannel ch) + { + return new NettySocketAdaptor( ch ); + } + + @Override + public void bind(SocketAddress bindpoint) throws IOException + { + ch.bind( bindpoint ).syncUninterruptibly(); + } + + @Override + public synchronized void close() throws IOException + { + ch.close().syncUninterruptibly(); + } + + @Override + public void connect(SocketAddress endpoint) throws IOException + { + ch.connect( endpoint ).syncUninterruptibly(); + } + + @Override + public void connect(SocketAddress endpoint, int timeout) throws IOException + { + ch.config().setConnectTimeoutMillis( timeout ); + ch.connect( endpoint ).syncUninterruptibly(); + } + + @Override + public boolean equals(Object obj) + { + return obj instanceof NettySocketAdaptor && ch.equals( ( (NettySocketAdaptor) obj ).ch ); + } + + @Override + public SocketChannel getChannel() + { + throw new UnsupportedOperationException( "Operation not supported on Channel wrapper." ); + } + + @Override + public InetAddress getInetAddress() + { + return ch.remoteAddress().getAddress(); + } + + @Override + public InputStream getInputStream() throws IOException + { + throw new UnsupportedOperationException( "Operation not supported on Channel wrapper." ); + } + + @Override + public boolean getKeepAlive() throws SocketException + { + return ch.config().getOption( ChannelOption.SO_KEEPALIVE ); + } + + @Override + public InetAddress getLocalAddress() + { + return ch.localAddress().getAddress(); + } + + @Override + public int getLocalPort() + { + return ch.localAddress().getPort(); + } + + @Override + public SocketAddress getLocalSocketAddress() + { + return ch.localAddress(); + } + + @Override + public boolean getOOBInline() throws SocketException + { + throw new UnsupportedOperationException( "Operation not supported on Channel wrapper." ); + } + + @Override + public OutputStream getOutputStream() throws IOException + { + throw new UnsupportedOperationException( "Operation not supported on Channel wrapper." ); + } + + @Override + public int getPort() + { + return ch.remoteAddress().getPort(); + } + + @Override + public synchronized int getReceiveBufferSize() throws SocketException + { + return ch.config().getOption( ChannelOption.SO_RCVBUF ); + } + + @Override + public SocketAddress getRemoteSocketAddress() + { + return ch.remoteAddress(); + } + + @Override + public boolean getReuseAddress() throws SocketException + { + return ch.config().getOption( ChannelOption.SO_REUSEADDR ); + } + + @Override + public synchronized int getSendBufferSize() throws SocketException + { + return ch.config().getOption( ChannelOption.SO_SNDBUF ); + } + + @Override + public int getSoLinger() throws SocketException + { + return ch.config().getOption( ChannelOption.SO_LINGER ); + } + + @Override + public synchronized int getSoTimeout() throws SocketException + { + throw new UnsupportedOperationException( "Operation not supported on Channel wrapper." ); + } + + @Override + public boolean getTcpNoDelay() throws SocketException + { + return ch.config().getOption( ChannelOption.TCP_NODELAY ); + } + + @Override + public int getTrafficClass() throws SocketException + { + return ch.config().getOption( ChannelOption.IP_TOS ); + } + + @Override + public int hashCode() + { + return ch.hashCode(); + } + + @Override + public boolean isBound() + { + return ch.localAddress() != null; + } + + @Override + public boolean isClosed() + { + return !ch.isOpen(); + } + + @Override + public boolean isConnected() + { + return ch.isActive(); + } + + @Override + public boolean isInputShutdown() + { + return ch.isInputShutdown(); + } + + @Override + public boolean isOutputShutdown() + { + return ch.isOutputShutdown(); + } + + @Override + public void sendUrgentData(int data) throws IOException + { + throw new UnsupportedOperationException( "Operation not supported on Channel wrapper." ); + } + + @Override + public void setKeepAlive(boolean on) throws SocketException + { + ch.config().setOption( ChannelOption.SO_KEEPALIVE, on ); + } + + @Override + public void setOOBInline(boolean on) throws SocketException + { + throw new UnsupportedOperationException( "Operation not supported on Channel wrapper." ); + } + + @Override + public void setPerformancePreferences(int connectionTime, int latency, int bandwidth) + { + throw new UnsupportedOperationException( "Operation not supported on Channel wrapper." ); + } + + @Override + public synchronized void setReceiveBufferSize(int size) throws SocketException + { + ch.config().setOption( ChannelOption.SO_RCVBUF, size ); + } + + @Override + public void setReuseAddress(boolean on) throws SocketException + { + ch.config().setOption( ChannelOption.SO_REUSEADDR, on ); + } + + @Override + public synchronized void setSendBufferSize(int size) throws SocketException + { + ch.config().setOption( ChannelOption.SO_SNDBUF, size ); + } + + @Override + public void setSoLinger(boolean on, int linger) throws SocketException + { + ch.config().setOption( ChannelOption.SO_LINGER, linger ); + } + + @Override + public synchronized void setSoTimeout(int timeout) throws SocketException + { + throw new UnsupportedOperationException( "Operation not supported on Channel wrapper." ); + } + + @Override + public void setTcpNoDelay(boolean on) throws SocketException + { + ch.config().setOption( ChannelOption.TCP_NODELAY, on ); + } + + @Override + public void setTrafficClass(int tc) throws SocketException + { + ch.config().setOption( ChannelOption.IP_TOS, tc ); + } + + @Override + public void shutdownInput() throws IOException + { + throw new UnsupportedOperationException( "Operation not supported on Channel wrapper." ); + } + + @Override + public void shutdownOutput() throws IOException + { + ch.shutdownOutput().syncUninterruptibly(); + } + + @Override + public String toString() + { + return ch.toString(); + } +} diff --git a/src/main/java/org/spigotmc/netty/OutboundManager.java b/src/main/java/org/spigotmc/netty/OutboundManager.java new file mode 100644 index 0000000..728f260 --- /dev/null +++ b/src/main/java/org/spigotmc/netty/OutboundManager.java @@ -0,0 +1,34 @@ +package org.spigotmc.netty; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.ChannelPromise; +import io.netty.channel.MessageList; +import net.minecraft.server.PendingConnection; + +class OutboundManager extends ChannelOutboundHandlerAdapter +{ + + private static final int FLUSH_TIME = 1; + /*========================================================================*/ + public long lastFlush; + private final NettyNetworkManager manager; + private final MessageList pending = MessageList.newInstance(); + + OutboundManager(NettyNetworkManager manager) + { + this.manager = manager; + } + + @Override + public void write(ChannelHandlerContext ctx, MessageList msgs, ChannelPromise promise) throws Exception + { + pending.add( msgs ); + if ( manager.connection instanceof PendingConnection || System.currentTimeMillis() - lastFlush > FLUSH_TIME ) + { + lastFlush = System.currentTimeMillis(); + ctx.write( pending.copy() ); + pending.clear(); + } + } +} diff --git a/src/main/java/org/spigotmc/netty/PacketDecoder.java b/src/main/java/org/spigotmc/netty/PacketDecoder.java new file mode 100644 index 0000000..3adc8d6 --- /dev/null +++ b/src/main/java/org/spigotmc/netty/PacketDecoder.java @@ -0,0 +1,68 @@ +package org.spigotmc.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufInputStream; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.MessageList; +import io.netty.handler.codec.ReplayingDecoder; +import java.io.DataInputStream; +import java.io.EOFException; +import java.io.IOException; +import net.minecraft.server.MinecraftServer; +import net.minecraft.server.Packet; + +/** + * Packet decoding class backed by a reusable {@link DataInputStream} which + * backs the input {@link ByteBuf}. Reads an unsigned byte packet header and + * then decodes the packet accordingly. + */ +public class PacketDecoder extends ReplayingDecoder +{ + + private DataInputStream input; + private Packet packet; + + public PacketDecoder() + { + super( ReadState.HEADER ); + } + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, MessageList out) throws Exception + { + if ( input == null ) + { + input = new DataInputStream( new ByteBufInputStream( in ) ); + } + + while ( true ) + { + switch ( state() ) + { + case HEADER: + short packetId = in.readUnsignedByte(); + packet = Packet.a( MinecraftServer.getServer().getLogger(), packetId ); + if ( packet == null ) + { + throw new IOException( "Bad packet id " + packetId ); + } + checkpoint( ReadState.DATA ); + case DATA: + try + { + packet.a( input ); + } catch ( EOFException ex ) + { + return; + } + + checkpoint( ReadState.HEADER ); + out.add( packet ); + packet = null; + break; + default: + throw new IllegalStateException(); + } + } + } +} diff --git a/src/main/java/org/spigotmc/netty/PacketEncoder.java b/src/main/java/org/spigotmc/netty/PacketEncoder.java new file mode 100644 index 0000000..e6a45d3 --- /dev/null +++ b/src/main/java/org/spigotmc/netty/PacketEncoder.java @@ -0,0 +1,55 @@ +package org.spigotmc.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufOutputStream; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToByteEncoder; +import java.io.DataOutputStream; +import net.minecraft.server.Packet; + +/** + * Netty encoder which takes a packet and encodes it, and adds a byte packet id + * header. + */ +public class PacketEncoder extends MessageToByteEncoder +{ + + private ByteBuf outBuf; + private DataOutputStream dataOut; + private final NettyNetworkManager networkManager; + + public PacketEncoder(NettyNetworkManager networkManager) + { + this.networkManager = networkManager; + } + + @Override + public void encode(ChannelHandlerContext ctx, Packet msg, ByteBuf out) throws Exception + { + if ( outBuf == null ) + { + outBuf = ctx.alloc().buffer(); + } + if ( dataOut == null ) + { + dataOut = new DataOutputStream( new ByteBufOutputStream( outBuf ) ); + } + + out.writeByte( msg.n() ); + msg.a( dataOut ); + + networkManager.addWrittenBytes( outBuf.readableBytes() ); + out.writeBytes( outBuf ); + outBuf.discardSomeReadBytes(); + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception + { + if ( outBuf != null ) + { + outBuf.release(); + outBuf = null; + } + } +} diff --git a/src/main/java/org/spigotmc/netty/PacketListener.java b/src/main/java/org/spigotmc/netty/PacketListener.java new file mode 100644 index 0000000..965ba12 --- /dev/null +++ b/src/main/java/org/spigotmc/netty/PacketListener.java @@ -0,0 +1,112 @@ +package org.spigotmc.netty; + +import com.google.common.base.Preconditions; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.logging.Level; +import net.minecraft.server.Connection; +import net.minecraft.server.INetworkManager; +import net.minecraft.server.Packet; +import org.bukkit.Bukkit; +import org.bukkit.plugin.Plugin; + +/** + * This class is used for plugins that wish to register to listen to incoming + * and outgoing packets. To use this class, simply create a new instance, + * override the methods you wish to use, and call + * {@link #register(org.spigotmc.netty.PacketListener, org.bukkit.plugin.Plugin)}. + */ +public class PacketListener +{ + + /** + * A mapping of all registered listeners and their owning plugins. + */ + private static final Map listeners = new HashMap(); + /** + * A baked list of all listeners, for efficiency sake. + */ + private static PacketListener[] baked = new PacketListener[ 0 ]; + + /** + * Used to register a handler for receiving notifications of packet + * activity. + * + * @param listener the listener to register + * @param plugin the plugin owning this listener + */ + public static synchronized void register(PacketListener listener, Plugin plugin) + { + Preconditions.checkNotNull( listener, "listener" ); + Preconditions.checkNotNull( plugin, "plugin" ); + Preconditions.checkState( !listeners.containsKey( listener ), "listener already registered" ); + + int size = listeners.size(); + Preconditions.checkState( baked.length == size ); + listeners.put( listener, plugin ); + baked = Arrays.copyOf( baked, size + 1 ); + baked[size] = listener; + } + + static Packet callReceived(INetworkManager networkManager, Connection connection, Packet packet) + { + for ( PacketListener listener : baked ) + { + try + { + packet = listener.packetReceived( networkManager, connection, packet ); + } catch ( Throwable t ) + { + Bukkit.getServer().getLogger().log( Level.SEVERE, "Error whilst firing receive hook for packet", t ); + } + } + return packet; + } + + static Packet callQueued(INetworkManager networkManager, Connection connection, Packet packet) + { + for ( PacketListener listener : baked ) + { + try + { + packet = listener.packetQueued( networkManager, connection, packet ); + } catch ( Throwable t ) + { + Bukkit.getServer().getLogger().log( Level.SEVERE, "Error whilst firing queued hook for packet", t ); + } + } + return packet; + } + + /** + * Called when a packet has been received and is about to be handled by the + * current {@link Connection}. The returned packet will be the packet passed + * on for handling, or in the case of null being returned, not handled at + * all. + * + * @param networkManager the NetworkManager receiving the packet + * @param connection the connection which will handle the packet + * @param packet the received packet + * @return the packet to be handled, or null to cancel + */ + public Packet packetReceived(INetworkManager networkManager, Connection connection, Packet packet) + { + return packet; + } + + /** + * Called when a packet is queued to be sent. The returned packet will be + * the packet sent. In the case of null being returned, the packet will not + * be sent. + * + * @param networkManager the NetworkManager which will send the packet + * @param connection the connection which queued the packet + * @param packet the queue packet + * @return the packet to be sent, or null if the packet will not be sent. + */ + public Packet packetQueued(INetworkManager networkManager, Connection connection, Packet packet) + { + return packet; + } +} diff --git a/src/main/java/org/spigotmc/netty/ReadState.java b/src/main/java/org/spigotmc/netty/ReadState.java new file mode 100644 index 0000000..d3a9cab --- /dev/null +++ b/src/main/java/org/spigotmc/netty/ReadState.java @@ -0,0 +1,17 @@ +package org.spigotmc.netty; + +/** + * Stores the state of the packet currently being read. + */ +public enum ReadState +{ + + /** + * Indicates the byte representing the ID has been read. + */ + HEADER, + /** + * Shows the packet body is being read. + */ + DATA; +} -- 1.8.1.2