From 0000000000000000000000000000000000000000 Mon Sep 17 00:00:00 2001
From: Aikar <aikar@aikar.co>
Date: Wed, 6 May 2020 04:53:35 -0400
Subject: [PATCH] Optimize Network Manager and add advanced packet support

Adds the ability for one packet to bundle other packets that follow it.
Adds the ability for a packet to delay sending further packets until a state is ready.

Removes synchronization from sending packets.
Removes processing of the packet queue off of the main thread
  - for the few cases where it is still allowed, ordering is not necessary, nor
    should it even be happening concurrently in the first place (handshaking/login/status)

Ensures packets sent asynchronously are dispatched on main thread

This helps ensure safety for ProtocolLib, as packet listeners
commonly access world state. This will allow you to schedule
a packet to be sent async, but it'll be dispatched sync for packet
listeners to process.

This should solve some deadlock risks

Part of this commit was authored by: Spottedleaf

diff --git a/src/main/java/net/minecraft/server/NetworkManager.java b/src/main/java/net/minecraft/server/NetworkManager.java
index b1dededc15cce686ead74a99bee64c89ac1de22c..94c25c542dd18396fa792af944794bdb2436d2fd 100644
--- a/src/main/java/net/minecraft/server/NetworkManager.java
+++ b/src/main/java/net/minecraft/server/NetworkManager.java
@@ -64,6 +64,10 @@ public class NetworkManager extends SimpleChannelInboundHandler<Packet<?>> {
     public int protocolVersion;
     public java.net.InetSocketAddress virtualHost;
     private static boolean enableExplicitFlush = Boolean.getBoolean("paper.explicit-flush");
+    // Optimize network
+    volatile boolean isPending = true; // cleared by ServerConnection once the manager leaves its pending queue; volatile because o() reads it when not on the main thread
+    boolean queueImmunity = false; // while true, canSendImmediate accepts every packet (PlayerList sets it around the login packet burst)
+    EnumProtocol protocol; // last protocol passed to setProtocol; canSendImmediate only allows queueing once this is PLAY
     // Paper end
 
     public NetworkManager(EnumProtocolDirection enumprotocoldirection) {
@@ -87,6 +91,7 @@ public class NetworkManager extends SimpleChannelInboundHandler<Packet<?>> {
     }
 
     public void setProtocol(EnumProtocol enumprotocol) {
+        protocol = enumprotocol; // Paper
         this.channel.attr(NetworkManager.c).set(enumprotocol);
         this.channel.config().setAutoRead(true);
         NetworkManager.LOGGER.debug("Enabled auto read");
@@ -158,19 +163,82 @@ public class NetworkManager extends SimpleChannelInboundHandler<Packet<?>> {
         NetworkManager.LOGGER.debug("Set listener of {} to {}", this, packetlistener);
         this.packetListener = packetlistener;
     }
+    // Paper start
+    private EntityPlayer getPlayer() { // player behind this connection, or null when the listener is not a PlayerConnection
+        if (packetListener instanceof PlayerConnection) {
+            return ((PlayerConnection) packetListener).player;
+        } else {
+            return null; // listener is unset or is not a PlayerConnection
+        }
+    }
+    private static class InnerUtil { // Attempt to hide these methods from ProtocolLib so it doesn't accidentally pick them up.
+        private static java.util.List<Packet> buildExtraPackets(Packet packet) { // flattens the packet's extra packets (not the packet itself); null when it has none
+            java.util.List<Packet> extra = packet.getExtraPackets();
+            if (extra == null || extra.isEmpty()) {
+                return null;
+            }
+            java.util.List<Packet> ret = new java.util.ArrayList<>(1 + extra.size());
+            buildExtraPackets0(extra, ret);
+            return ret;
+        }
+
+        private static void buildExtraPackets0(java.util.List<Packet> extraPackets, java.util.List<Packet> into) { // recursive helper: appends each extra, immediately followed by that extra's own extras
+            for (Packet extra : extraPackets) {
+                into.add(extra);
+                java.util.List<Packet> extraExtra = extra.getExtraPackets();
+                if (extraExtra != null && !extraExtra.isEmpty()) {
+                    buildExtraPackets0(extraExtra, into);
+                }
+            }
+        }
+        // Paper start
+        private static boolean canSendImmediate(NetworkManager networkManager, Packet<?> packet) { // true when the packet may be dispatched without going through the packet queue
+            return networkManager.isPending || networkManager.protocol != EnumProtocol.PLAY || networkManager.queueImmunity || // connection-level exemptions
+                packet instanceof PacketPlayOutKeepAlive || // packet types that are always exempt
+                packet instanceof PacketPlayOutChat ||
+                packet instanceof PacketPlayOutTabComplete;
+        }
+        // Paper end
+    }
+    // Paper end
 
     public void sendPacket(Packet<?> packet) {
         this.sendPacket(packet, (GenericFutureListener) null);
     }
 
     public void sendPacket(Packet<?> packet, @Nullable GenericFutureListener<? extends Future<? super Void>> genericfuturelistener) {
-        if (this.isConnected()) {
-            this.o();
-            this.b(packet, genericfuturelistener);
-        } else {
-            this.packetQueue.add(new NetworkManager.QueuedPacket(packet, genericfuturelistener));
+        // Paper start - handle oversized packets better
+        boolean connected = this.isConnected();
+        if (!connected && !preparing) {
+            return; // Do nothing - neither connected nor still preparing, so the packet is dropped
+        }
+        packet.onPacketDispatch(getPlayer()); // notify the packet before it is dispatched or queued
+        if (connected && (InnerUtil.canSendImmediate(this, packet) || ( // fast path: exempt from queueing, or on main with nothing that forces queueing
+            MCUtil.isMainThread() && packet.isReady() && this.packetQueue.isEmpty() &&
+            (packet.getExtraPackets() == null || packet.getExtraPackets().isEmpty())
+        ))) {
+            this.dispatchPacket(packet, genericfuturelistener);
+            return;
         }
+        // write the packets to the queue, then flush - antixray hooks there already
+        java.util.List<Packet> extraPackets = InnerUtil.buildExtraPackets(packet);
+        boolean hasExtraPackets = extraPackets != null && !extraPackets.isEmpty();
+        if (!hasExtraPackets) {
+            this.packetQueue.add(new NetworkManager.QueuedPacket(packet, genericfuturelistener));
+        } else {
+            java.util.List<NetworkManager.QueuedPacket> packets = new java.util.ArrayList<>(1 + extraPackets.size());
+            packets.add(new NetworkManager.QueuedPacket(packet, null)); // delay the future listener until the end of the extra packets
 
+            for (int i = 0, len = extraPackets.size(); i < len;) {
+                Packet extra = extraPackets.get(i);
+                boolean end = ++i == len; // also advances the loop index; true on the last extra packet
+                packets.add(new NetworkManager.QueuedPacket(extra, end ? genericfuturelistener : null)); // append listener to the end
+            }
+
+            this.packetQueue.addAll(packets); // atomic
+        }
+        this.sendPacketQueue(); // drains the queue now if this thread is allowed to; otherwise the packets stay queued for a later flush
+        // Paper end
     }
 
     private void dispatchPacket(Packet<?> packet, @Nullable GenericFutureListener<? extends Future<? super Void>> genericFutureListener) { this.b(packet, genericFutureListener); } // Paper - OBFHELPER
@@ -194,6 +262,11 @@ public class NetworkManager extends SimpleChannelInboundHandler<Packet<?>> {
             if (genericfuturelistener != null) {
                 channelfuture.addListener(genericfuturelistener);
             }
+            // Paper start
+            if (packet.hasFinishListener()) {
+                channelfuture.addListener((ChannelFutureListener) channelFuture -> packet.onPacketDispatchFinish(getPlayer(), channelFuture));
+            }
+            // Paper end
 
             channelfuture.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
         } else {
@@ -207,6 +280,11 @@ public class NetworkManager extends SimpleChannelInboundHandler<Packet<?>> {
                 if (genericfuturelistener != null) {
                     channelfuture1.addListener(genericfuturelistener);
                 }
+                // Paper start
+                if (packet.hasFinishListener()) {
+                    channelfuture1.addListener((ChannelFutureListener) channelFuture -> packet.onPacketDispatchFinish(getPlayer(), channelFuture));
+                }
+                // Paper end
 
                 channelfuture1.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
             });
@@ -214,21 +292,46 @@ public class NetworkManager extends SimpleChannelInboundHandler<Packet<?>> {
 
     }
 
-    private void sendPacketQueue() { this.o(); } // Paper - OBFHELPER
-    private void o() {
-        if (this.channel != null && this.channel.isOpen()) {
-            Queue queue = this.packetQueue;
-
+    // Paper start - rewrite this to be safer: the queue is only drained on the main thread, or under the packetQueue lock while still pending
+    private boolean sendPacketQueue() { return this.o(); } // OBFHELPER // void -> boolean
+    private boolean o() { // void -> boolean
+        if (!isConnected()) {
+            return true; // not connected: the queue is not processed at all
+        }
+        if (MCUtil.isMainThread()) {
+            return processQueue(); // main thread drains without taking the lock
+        } else if (isPending) {
+            // Should only happen during login/status stages
             synchronized (this.packetQueue) {
-                NetworkManager.QueuedPacket networkmanager_queuedpacket;
-
-                while ((networkmanager_queuedpacket = (NetworkManager.QueuedPacket) this.packetQueue.poll()) != null) {
-                    this.b(networkmanager_queuedpacket.a, networkmanager_queuedpacket.b);
-                }
+                return this.processQueue();
+            }
+        }
+        return false; // off main and no longer pending: leave the queue untouched
+    }
+    private boolean processQueue() { // dispatches queued packets in order; false when it stopped at a packet that is not ready
+        if (this.packetQueue.isEmpty()) return true;
+        // If we are on main, we are safe here in that nothing else should be processing queue off main anymore
+        // But if we are not on main due to login/status, the parent is synchronized on packetQueue
+        java.util.Iterator<QueuedPacket> iterator = this.packetQueue.iterator();
+        while (iterator.hasNext()) {
+            NetworkManager.QueuedPacket queued = iterator.next(); // poll -> iterate: the entry is only removed below once its packet is ready
+
+            // Fix NPE (Spigot bug caused by handleDisconnection())
+            if (queued == null) {
+                return true;
+            }
 
+            Packet<?> packet = queued.getPacket();
+            if (!packet.isReady()) {
+                return false; // stop here; this packet and everything behind it stay queued
+            } else {
+                iterator.remove();
+                this.dispatchPacket(packet, queued.getGenericFutureListener());
             }
         }
+        return true; // reached the end of the queue
     }
+    // Paper end
 
     public void a() {
         this.o();
@@ -257,9 +360,21 @@ public class NetworkManager extends SimpleChannelInboundHandler<Packet<?>> {
         return this.socketAddress;
     }
 
+    // Paper start
+    public void clearPacketQueue() { // runs the finish hook of every still-queued packet that has one, then empties the queue
+        EntityPlayer player = getPlayer();
+        packetQueue.forEach(queuedPacket -> {
+            Packet<?> packet = queuedPacket.getPacket();
+            if (packet.hasFinishListener()) {
+                packet.onPacketDispatchFinish(player, null); // null future: this packet was never dispatched
+            }
+        });
+        packetQueue.clear();
+    } // Paper end
     public void close(IChatBaseComponent ichatbasecomponent) {
         // Spigot Start
         this.preparing = false;
+        clearPacketQueue(); // Paper
         // Spigot End
         if (this.channel.isOpen()) {
             this.channel.close(); // We can't wait as this may be called from an event loop.
@@ -335,7 +450,7 @@ public class NetworkManager extends SimpleChannelInboundHandler<Packet<?>> {
                 } else if (this.i() != null) {
                     this.i().a(new ChatMessage("multiplayer.disconnect.generic", new Object[0]));
                 }
-                this.packetQueue.clear(); // Free up packet queue.
+                clearPacketQueue(); // Paper
                 // Paper start - Add PlayerConnectionCloseEvent
                 final PacketListener packetListener = this.i();
                 if (packetListener instanceof PlayerConnection) {
diff --git a/src/main/java/net/minecraft/server/Packet.java b/src/main/java/net/minecraft/server/Packet.java
index 2d8e6a2f4a0c3c5d74a647d7164b0028781d3bf5..c5edf8c4b01cc7ddac06797133e6fd13ec5b6592 100644
--- a/src/main/java/net/minecraft/server/Packet.java
+++ b/src/main/java/net/minecraft/server/Packet.java
@@ -11,6 +11,20 @@ public interface Packet<T extends PacketListener> {
     void a(T t0);
 
     // Paper start
+
+    /** Invoked by NetworkManager.sendPacket before this packet is dispatched or queued.
+     * @param player Null if not at PLAY stage yet
+     */
+    default void onPacketDispatch(@javax.annotation.Nullable EntityPlayer player) {}
+
+    /** Invoked when this packet's channel future completes, or when it is cleared from the queue unsent; only if hasFinishListener() is true.
+     * @param player Null if not at PLAY stage yet
+     * @param future Can be null if packet was cancelled
+     */
+    default void onPacketDispatchFinish(@javax.annotation.Nullable EntityPlayer player, @javax.annotation.Nullable io.netty.channel.ChannelFuture future) {}
+    default boolean hasFinishListener() { return false; } // return true to have onPacketDispatchFinish invoked for this packet
+    default boolean isReady() { return true; } // while false, NetworkManager keeps this packet, and everything queued behind it, in the queue
+    default java.util.List<Packet> getExtraPackets() { return null; } // packets queued directly after this one; null or empty for none
     default boolean packetTooLarge(NetworkManager manager) {
         return false;
     }
diff --git a/src/main/java/net/minecraft/server/PlayerList.java b/src/main/java/net/minecraft/server/PlayerList.java
index 3cf4f3b88157e3c174146e5ee30052686b57c768..53bd078594797f0e879305c14fd0731f6d348e42 100644
--- a/src/main/java/net/minecraft/server/PlayerList.java
+++ b/src/main/java/net/minecraft/server/PlayerList.java
@@ -143,6 +143,7 @@ public abstract class PlayerList {
 
         // CraftBukkit - getType()
         // Spigot - view distance
+        networkmanager.queueImmunity = true; // Paper
         playerconnection.sendPacket(new PacketPlayOutLogin(entityplayer.getId(), entityplayer.playerInteractManager.getGameMode(), WorldData.c(worlddata.getSeed()), worlddata.isHardcore(), worldserver.worldProvider.getDimensionManager().getType(), this.getMaxPlayers(), worlddata.getType(), worldserver.spigotConfig.viewDistance, flag1, !flag));
         entityplayer.getBukkitEntity().sendSupportedChannels(); // CraftBukkit
         playerconnection.sendPacket(new PacketPlayOutCustomPayload(PacketPlayOutCustomPayload.a, (new PacketDataSerializer(Unpooled.buffer())).a(this.getServer().getServerModName())));
@@ -152,6 +153,7 @@ public abstract class PlayerList {
         playerconnection.sendPacket(new PacketPlayOutRecipeUpdate(this.server.getCraftingManager().b()));
         playerconnection.sendPacket(new PacketPlayOutTags(this.server.getTagRegistry()));
         playerconnection.sendPacket(new PacketPlayOutEntityStatus(entityplayer, (byte) (worldserver.getGameRules().getBoolean(GameRules.REDUCED_DEBUG_INFO) ? 22 : 23))); // Paper - fix this rule not being initialized on the client
+        networkmanager.queueImmunity = false; // Paper
         this.d(entityplayer);
         entityplayer.getStatisticManager().c();
         entityplayer.B().a(entityplayer);
diff --git a/src/main/java/net/minecraft/server/ServerConnection.java b/src/main/java/net/minecraft/server/ServerConnection.java
index 37a22ba6f7a2ac54759428d23d5ea9787bb557f7..06cd29bb9a5d6b67f896c129662c3f493238c758 100644
--- a/src/main/java/net/minecraft/server/ServerConnection.java
+++ b/src/main/java/net/minecraft/server/ServerConnection.java
@@ -45,6 +45,7 @@ public class ServerConnection {
         NetworkManager manager = null;
         while ((manager = pending.poll()) != null) {
             connectedChannels.add(manager);
+            manager.isPending = false;
         }
     }
     // Paper end