From 769b779562d48c9c3c0e0ddb770d1652a2ed0dca Mon Sep 17 00:00:00 2001 From: Janick Reynders Date: Sun, 15 Jan 2023 19:40:18 +0100 Subject: [PATCH] Replace synchronized blocks with a ReentrantLock. The synchronized blocks were removed and replaced by explicit lock.lock() and lock.unlock() calls in try/finally blocks. This prevents the carrier thread (OS thread) from being pinned when the driver runs on virtual threads, which were introduced as a preview feature in JDK 19. The code inside the replaced synchronized blocks in ConnectionImpl and ServerPreparedQuery performs I/O; the blocks in NativeSession do not, but they were replaced as well because ServerPreparedQuery synchronized on NativeSession objects, so both classes must keep sharing the same lock. https://openjdk.org/jeps/425 `There are two scenarios in which a virtual thread cannot be unmounted during blocking operations because it is pinned to its carrier: When it executes code inside a synchronized block or method, or When it executes a native method or a foreign function.` See also https://bugs.mysql.com/bug.php?id=109346 --- .../java/com/mysql/cj/NativeSession.java | 34 +++++++++++++++---- .../com/mysql/cj/ServerPreparedQuery.java | 23 ++++++++++--- .../com/mysql/cj/jdbc/ConnectionImpl.java | 13 +++++-- 3 files changed, 58 insertions(+), 12 deletions(-) diff --git a/src/main/core-impl/java/com/mysql/cj/NativeSession.java b/src/main/core-impl/java/com/mysql/cj/NativeSession.java index bf2c7b4d6..87a9c36f8 100644 --- a/src/main/core-impl/java/com/mysql/cj/NativeSession.java +++ b/src/main/core-impl/java/com/mysql/cj/NativeSession.java @@ -43,6 +43,7 @@ import java.util.Properties; import java.util.Timer; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Supplier; import com.mysql.cj.conf.HostInfo; @@ -103,6 +104,8 @@ public class NativeSession extends CoreSession implements Serializable { private transient Timer cancelTimer; + private final ReentrantLock lock = new ReentrantLock(); + public NativeSession(HostInfo hostInfo, PropertySet propSet) 
{ super(hostInfo, propSet); } @@ -151,11 +154,14 @@ public void quit() { } } - synchronized (this) { + lock.lock(); + try { if (this.cancelTimer != null) { this.cancelTimer.cancel(); this.cancelTimer = null; } + } finally { + lock.unlock(); } this.isClosed = true; super.quit(); @@ -177,11 +183,14 @@ public void forceClose() { } //this.protocol = null; // TODO actually we shouldn't remove protocol instance because some it's methods can be called after closing socket } - synchronized (this) { + lock.lock(); + try { if (this.cancelTimer != null) { this.cancelTimer.cancel(); this.cancelTimer = null; } + } finally { + lock.unlock(); } this.isClosed = true; super.forceClose(); @@ -825,10 +834,23 @@ public String getIdentifierQuoteString() { return this.protocol != null && this.protocol.getServerSession().useAnsiQuotedIdentifiers() ? "\"" : "`"; } - public synchronized Timer getCancelTimer() { - if (this.cancelTimer == null) { - this.cancelTimer = new Timer("MySQL Statement Cancellation Timer", Boolean.TRUE); + public Timer getCancelTimer() { + lock.lock(); + try { + if (this.cancelTimer == null) { + this.cancelTimer = new Timer("MySQL Statement Cancellation Timer", Boolean.TRUE); + } + return this.cancelTimer; + } finally { + lock.unlock(); } - return this.cancelTimer; + } + + public void lock() { + lock.lock(); + } + + public void unlock() { + lock.unlock(); } } diff --git a/src/main/core-impl/java/com/mysql/cj/ServerPreparedQuery.java b/src/main/core-impl/java/com/mysql/cj/ServerPreparedQuery.java index 8f711c441..ea8f659b8 100644 --- a/src/main/core-impl/java/com/mysql/cj/ServerPreparedQuery.java +++ b/src/main/core-impl/java/com/mysql/cj/ServerPreparedQuery.java @@ -34,6 +34,7 @@ import java.io.InputStream; import java.io.Reader; import java.sql.Clob; +import java.util.concurrent.locks.ReentrantLock; import com.mysql.cj.conf.PropertyKey; import com.mysql.cj.conf.RuntimeProperty; @@ -93,6 +94,8 @@ public class ServerPreparedQuery extends ClientPreparedQuery { 
protected NativeMessageBuilder commandBuilder = null; + private final ReentrantLock lock = new ReentrantLock(); + public static ServerPreparedQuery getInstance(NativeSession sess) { if (sess.getPropertySet().getBooleanProperty(PropertyKey.autoGenerateTestcaseScript).getValue()) { return new ServerPreparedQueryTestcaseGenerator(sess); @@ -122,7 +125,8 @@ protected ServerPreparedQuery(NativeSession sess) { public void serverPrepare(String sql) throws IOException { this.session.checkClosed(); - synchronized (this.session) { + this.session.lock(); + try { long begin = this.profileSQL ? System.currentTimeMillis() : 0; NativePacketPayload prepareResultPacket = this.session.getProtocol() @@ -162,6 +166,8 @@ public void serverPrepare(String sql) throws IOException { if (fieldCount > 0) { this.resultFields = this.session.getProtocol().read(ColumnDefinition.class, new ColumnDefinitionFactory(fieldCount, null)); } + } finally { + this.session.unlock(); } } @@ -397,7 +403,8 @@ public T readExecuteResult(NativePacketPayload resultPacke * */ private void serverLongData(int parameterIndex, BindValue binding) { - synchronized (this) { + lock.lock(); + try { NativePacketPayload packet = this.session.getSharedSendPacket(); Object value = binding.getValue(); if (value instanceof byte[]) { @@ -429,6 +436,8 @@ private void serverLongData(int parameterIndex, BindValue binding) { throw ExceptionFactory.createException(WrongArgumentException.class, Messages.getString("ServerPreparedStatement.18") + value.getClass().getName() + "'", this.session.getExceptionInterceptor()); } + } finally { + lock.unlock(); } } @@ -472,7 +481,8 @@ private void storeStreamOrReader(int parameterIndex, NativePacketPayload packet, char[] cBuf = null; String clobEncoding = null; - synchronized (this.session) { + this.session.lock(); + try { if (isStream) { bBuf = new byte[BLOB_STREAM_READ_BUF_SIZE]; } else { @@ -544,6 +554,8 @@ private void storeStreamOrReader(int parameterIndex, NativePacketPayload packet, } } } 
+ } finally { + this.session.unlock(); } } @@ -566,13 +578,16 @@ public void clearParameters(boolean clearServerParameters) { public void serverResetStatement() { this.session.checkClosed(); - synchronized (this.session) { + this.session.lock(); + try { try { this.session.getProtocol().sendCommand(this.commandBuilder.buildComStmtReset(this.session.getSharedSendPacket(), this.serverStatementId), false, 0); } finally { this.session.clearInputStream(); } + } finally { + this.session.unlock(); } } diff --git a/src/main/user-impl/java/com/mysql/cj/jdbc/ConnectionImpl.java b/src/main/user-impl/java/com/mysql/cj/jdbc/ConnectionImpl.java index 2ac7357fb..c81d23fa8 100644 --- a/src/main/user-impl/java/com/mysql/cj/jdbc/ConnectionImpl.java +++ b/src/main/user-impl/java/com/mysql/cj/jdbc/ConnectionImpl.java @@ -54,6 +54,7 @@ import java.util.Stack; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; +import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; import com.mysql.cj.CacheAdapter; @@ -296,6 +297,7 @@ private static boolean nullSafeCompare(String s1, String s2) { private final CopyOnWriteArrayList openStatements = new CopyOnWriteArrayList<>(); private LRUCache parsedCallableStatementCache; + private final ReentrantLock parsedCallableStatementCacheLock = new ReentrantLock(); /** The password we used */ private String password = null; @@ -320,6 +322,7 @@ private static boolean nullSafeCompare(String s1, String s2) { private LRUCache serverSideStatementCheckCache; private LRUCache serverSideStatementCache; + private ReentrantLock serverSideStatementCacheLock = new ReentrantLock(); private HostInfo origHostInfo; @@ -1520,7 +1523,8 @@ public java.sql.CallableStatement prepareCall(String sql, int resultSetType, int cStmt = parseCallableStatement(sql); } else { - synchronized (this.parsedCallableStatementCache) { + this.parsedCallableStatementCacheLock.lock(); + try { CompoundCacheKey key = new 
CompoundCacheKey(getDatabase(), sql); CallableStatement.CallableStatementParamInfo cachedParamInfo = this.parsedCallableStatementCache.get(key); @@ -1536,6 +1540,8 @@ public java.sql.CallableStatement prepareCall(String sql, int resultSetType, int this.parsedCallableStatementCache.put(key, cachedParamInfo); } + } finally { + this.parsedCallableStatementCacheLock.unlock(); } } @@ -1592,7 +1598,8 @@ public java.sql.PreparedStatement prepareStatement(String sql, int resultSetType if (this.useServerPrepStmts.getValue() && canServerPrepare) { if (this.cachePrepStmts.getValue()) { - synchronized (this.serverSideStatementCache) { + this.serverSideStatementCacheLock.lock(); + try { pStmt = this.serverSideStatementCache.remove(new CompoundCacheKey(this.database, sql)); if (pStmt != null) { @@ -1623,6 +1630,8 @@ public java.sql.PreparedStatement prepareStatement(String sql, int resultSetType } } } + } finally { + this.serverSideStatementCacheLock.unlock(); } } else { try {