From 818878650e65d3ab246d8cd1b5d479537e419e66 Mon Sep 17 00:00:00 2001 From: bdeneuter Date: Fri, 24 Mar 2023 14:31:09 +0100 Subject: [PATCH] Replace synchronized with ReentrantLock Replace connectionMutex and synchronized blocks with a ReentrantLock. The synchronized blocks were removed and replaced by `lock.lock()` and `lock.unlock()`. This avoids that the carrier thread (OS thread) is pinned when running on virtual threads which were introduced as a preview feature in JDK 19. The JEP that is being prepared for JDK 21: https://openjdk.org/jeps/8303683 `There are two scenarios in which a virtual thread cannot be unmounted during blocking operations because it is pinned to its carrier: When it executes code inside a synchronized block or method, or When it executes a native method or a foreign function.` --- .../com/mysql/cj/CacheAdapterFactory.java | 4 +- .../java/com/mysql/cj/MysqlConnection.java | 3 +- .../com/mysql/cj/PerConnectionLRUFactory.java | 32 +- .../core-api/java/com/mysql/cj/Query.java | 3 +- .../core-api/java/com/mysql/cj/Session.java | 3 + .../mysql/cj/protocol/ResultsetRowsOwner.java | 4 +- .../com/mysql/cj/util/EscapeTokenizer.java | 171 ++--- .../util/PerVmServerConfigCacheFactory.java | 3 +- .../java/com/mysql/cj/util/TimeUtil.java | 7 +- .../java/com/mysql/cj/AbstractQuery.java | 17 +- .../com/mysql/cj/CancelQueryTaskImpl.java | 5 +- .../java/com/mysql/cj/CoreSession.java | 12 +- .../java/com/mysql/cj/NativeSession.java | 33 +- .../com/mysql/cj/ServerPreparedQuery.java | 20 +- .../cj/protocol/NamedPipeSocketFactory.java | 13 +- .../cj/protocol/ReadAheadInputStream.java | 60 +- .../mysql/cj/result/SqlDateValueFactory.java | 9 +- .../mysql/cj/result/SqlTimeValueFactory.java | 14 +- .../cj/result/SqlTimestampValueFactory.java | 28 +- .../cj/protocol/a/NativeMessageBuilder.java | 7 +- .../cj/protocol/a/result/NativeResultset.java | 33 +- .../a/result/ResultsetRowsCursor.java | 7 +- .../a/result/ResultsetRowsStreaming.java | 10 +- .../cj/protocol/x/SyncMessageReader.java | 35 +- .../cj/protocol/x/SyncMessageSender.java | 13 +- .../com/mysql/cj/jdbc/JdbcConnection.java | 3 + .../java/com/mysql/cj/jdbc/Blob.java | 282 +++++--- .../com/mysql/cj/jdbc/CallableStatement.java | 479 ++++++++++--- .../cj/jdbc/CallableStatementWrapper.java | 5 +- .../mysql/cj/jdbc/ClientInfoProviderSP.java | 137 ++-- .../cj/jdbc/ClientPreparedStatement.java | 627 +++++++++++------ .../com/mysql/cj/jdbc/ConnectionGroup.java | 83 ++- .../com/mysql/cj/jdbc/ConnectionImpl.java | 403 ++++++++--- .../com/mysql/cj/jdbc/ConnectionWrapper.java | 37 +- .../com/mysql/cj/jdbc/DatabaseMetaData.java | 8 +- .../jdbc/DatabaseMetaDataUsingInfoSchema.java | 11 +- .../mysql/cj/jdbc/JdbcPropertySetImpl.java | 3 + .../jdbc/MysqlConnectionPoolDataSource.java | 26 +- .../mysql/cj/jdbc/MysqlPooledConnection.java | 154 +++-- .../com/mysql/cj/jdbc/MysqlXAConnection.java | 11 +- .../cj/jdbc/PreparedStatementWrapper.java | 34 +- .../cj/jdbc/ServerPreparedStatement.java | 176 +++-- .../java/com/mysql/cj/jdbc/StatementImpl.java | 518 +++++++++----- .../com/mysql/cj/jdbc/StatementWrapper.java | 7 +- .../cj/jdbc/SuspendableXAConnection.java | 79 ++- .../cj/jdbc/ha/FailoverConnectionProxy.java | 283 +++++--- .../jdbc/ha/LoadBalancedConnectionProxy.java | 639 +++++++++++------- .../cj/jdbc/ha/MultiHostConnectionProxy.java | 50 +- .../cj/jdbc/ha/MultiHostMySQLConnection.java | 19 +- .../jdbc/ha/ReplicationConnectionGroup.java | 1 - .../jdbc/ha/ReplicationConnectionProxy.java | 450 +++++++----- .../jdbc/ha/ReplicationMySQLConnection.java | 113 ++-- .../mysql/cj/jdbc/result/ResultSetImpl.java | 276 ++++++-- .../cj/jdbc/result/UpdatableResultSet.java | 201 ++++-- .../com/mysql/cj/jdbc/util/BaseBugReport.java | 49 +- .../java/com/mysql/cj/xdevapi/ClientImpl.java | 43 +- .../regression/ConnectionRegressionTest.java | 2 + .../regression/DataSourceRegressionTest.java | 2 +- .../PooledConnectionRegressionTest.java | 1 - .../regression/StatementRegressionTest.java | 3 +- 60 files changed, 3953 insertions(+), 1808 deletions(-) diff --git a/src/main/core-api/java/com/mysql/cj/CacheAdapterFactory.java b/src/main/core-api/java/com/mysql/cj/CacheAdapterFactory.java index 46048525b..e4762cbbe 100644 --- a/src/main/core-api/java/com/mysql/cj/CacheAdapterFactory.java +++ b/src/main/core-api/java/com/mysql/cj/CacheAdapterFactory.java @@ -29,8 +29,10 @@ package com.mysql.cj; +import java.util.concurrent.locks.ReentrantLock; + public interface CacheAdapterFactory { - CacheAdapter getInstance(Object syncMutex, String url, int cacheMaxSize, int maxKeySize); + CacheAdapter getInstance(ReentrantLock syncMutex, String url, int cacheMaxSize, int maxKeySize); } diff --git a/src/main/core-api/java/com/mysql/cj/MysqlConnection.java b/src/main/core-api/java/com/mysql/cj/MysqlConnection.java index 5b1dfe190..a680821ab 100644 --- a/src/main/core-api/java/com/mysql/cj/MysqlConnection.java +++ b/src/main/core-api/java/com/mysql/cj/MysqlConnection.java @@ -30,6 +30,7 @@ package com.mysql.cj; import java.util.Properties; +import java.util.concurrent.locks.ReentrantLock; import com.mysql.cj.conf.PropertySet; import com.mysql.cj.exceptions.ExceptionInterceptor; @@ -56,7 +57,7 @@ public interface MysqlConnection { */ Properties getProperties(); - Object getConnectionMutex(); + ReentrantLock getConnectionMutex(); Session getSession(); diff --git a/src/main/core-api/java/com/mysql/cj/PerConnectionLRUFactory.java b/src/main/core-api/java/com/mysql/cj/PerConnectionLRUFactory.java index 1f36d1805..9bd0fcff6 100644 --- a/src/main/core-api/java/com/mysql/cj/PerConnectionLRUFactory.java +++ b/src/main/core-api/java/com/mysql/cj/PerConnectionLRUFactory.java @@ -30,12 +30,13 @@ package com.mysql.cj; import java.util.Set; +import java.util.concurrent.locks.ReentrantLock; import com.mysql.cj.util.LRUCache; public class PerConnectionLRUFactory implements CacheAdapterFactory { - public CacheAdapter getInstance(Object syncMutex, String url, int cacheMaxSize, int maxKeySize) { + public CacheAdapter getInstance(ReentrantLock syncMutex, String url, int cacheMaxSize, int maxKeySize) { return new PerConnectionLRU(syncMutex, cacheMaxSize, maxKeySize); } @@ -43,9 +44,9 @@ public CacheAdapter getInstance(Object syncMutex, String url, class PerConnectionLRU implements CacheAdapter { private final int cacheSqlLimit; private final LRUCache cache; - private final Object syncMutex; + private final ReentrantLock syncMutex; - protected PerConnectionLRU(Object syncMutex, int cacheMaxSize, int maxKeySize) { + protected PerConnectionLRU(ReentrantLock syncMutex, int cacheMaxSize, int maxKeySize) { final int cacheSize = cacheMaxSize; this.cacheSqlLimit = maxKeySize; this.cache = new LRUCache<>(cacheSize); @@ -57,8 +58,11 @@ public QueryInfo get(String key) { return null; } - synchronized (this.syncMutex) { + this.syncMutex.lock(); + try { return this.cache.get(key); + } finally { + this.syncMutex.unlock(); } } @@ -67,29 +71,41 @@ public void put(String key, QueryInfo value) { return; } - synchronized (this.syncMutex) { + this.syncMutex.lock(); + try { this.cache.put(key, value); + } finally { + this.syncMutex.unlock(); } } public void invalidate(String key) { - synchronized (this.syncMutex) { + this.syncMutex.lock(); + try { this.cache.remove(key); + } finally { + this.syncMutex.unlock(); } } public void invalidateAll(Set keys) { - synchronized (this.syncMutex) { + this.syncMutex.lock(); + try { for (String key : keys) { this.cache.remove(key); } + } finally { + this.syncMutex.unlock(); } } public void invalidateAll() { - synchronized (this.syncMutex) { + this.syncMutex.lock(); + try { this.cache.clear(); + } finally { + this.syncMutex.unlock(); } } } diff --git a/src/main/core-api/java/com/mysql/cj/Query.java b/src/main/core-api/java/com/mysql/cj/Query.java index 6e1f1a1e8..c912e6860 100644 --- a/src/main/core-api/java/com/mysql/cj/Query.java +++ b/src/main/core-api/java/com/mysql/cj/Query.java @@ -31,6 +31,7 @@ import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantLock; import com.mysql.cj.protocol.Message; import com.mysql.cj.protocol.ProtocolEntityFactory; @@ -57,7 +58,7 @@ public enum CancelStatus { Session getSession(); - Object getCancelTimeoutMutex(); + ReentrantLock getCancelTimeoutMutex(); void resetCancelledState(); diff --git a/src/main/core-api/java/com/mysql/cj/Session.java b/src/main/core-api/java/com/mysql/cj/Session.java index 09c522c57..d775d6367 100644 --- a/src/main/core-api/java/com/mysql/cj/Session.java +++ b/src/main/core-api/java/com/mysql/cj/Session.java @@ -31,6 +31,7 @@ import java.net.SocketAddress; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Collector; @@ -56,6 +57,8 @@ */ public interface Session { + ReentrantLock getSessionMutex(); + PropertySet getPropertySet(); MessageBuilder getMessageBuilder(); diff --git a/src/main/core-api/java/com/mysql/cj/protocol/ResultsetRowsOwner.java b/src/main/core-api/java/com/mysql/cj/protocol/ResultsetRowsOwner.java index 3cd462839..c715dd5e7 100644 --- a/src/main/core-api/java/com/mysql/cj/protocol/ResultsetRowsOwner.java +++ b/src/main/core-api/java/com/mysql/cj/protocol/ResultsetRowsOwner.java @@ -33,6 +33,8 @@ import com.mysql.cj.Query; import com.mysql.cj.Session; +import java.util.concurrent.locks.ReentrantLock; + public interface ResultsetRowsOwner { void closeOwner(boolean calledExplicitly); @@ -41,7 +43,7 @@ public interface ResultsetRowsOwner { Session getSession(); - Object getSyncMutex(); + ReentrantLock getSyncMutex(); /** * StackTrace generated where ResultSet was created... used when profiling diff --git a/src/main/core-api/java/com/mysql/cj/util/EscapeTokenizer.java b/src/main/core-api/java/com/mysql/cj/util/EscapeTokenizer.java index 48af210b2..306d43bbe 100644 --- a/src/main/core-api/java/com/mysql/cj/util/EscapeTokenizer.java +++ b/src/main/core-api/java/com/mysql/cj/util/EscapeTokenizer.java @@ -29,6 +29,8 @@ package com.mysql.cj.util; +import java.util.concurrent.locks.ReentrantLock; + /** * EscapeTokenizer breaks up an SQL statement into SQL and escape code parts. */ @@ -52,6 +54,7 @@ public class EscapeTokenizer { private int bracesLevel = 0; private boolean inQuotes = false; private char quoteChar = 0; + private final ReentrantLock objectLock = new ReentrantLock(); /** * Creates a new EscapeTokenizer object. @@ -70,8 +73,13 @@ public EscapeTokenizer(String source) { * * @return if this tokenizer has more tokens available */ - public synchronized boolean hasMoreTokens() { - return (this.pos < this.sourceLength); + public boolean hasMoreTokens() { + objectLock.lock(); + try { + return (this.pos < this.sourceLength); + } finally { + objectLock.unlock(); + } } /** @@ -79,103 +87,108 @@ public synchronized boolean hasMoreTokens() { * * @return the next token. */ - public synchronized String nextToken() { - StringBuilder tokenBuf = new StringBuilder(); - boolean backslashEscape = false; - - if (this.emittingEscapeCode) { - // Previous token ended at the beginning of an escape code, so this token must start with '{' - tokenBuf.append("{"); - this.emittingEscapeCode = false; - } - - for (; this.pos < this.sourceLength; this.pos++) { - char c = this.source.charAt(this.pos); - - // process escape char: (\) - if (c == CHR_ESCAPE) { - tokenBuf.append(c); - backslashEscape = !backslashEscape; - continue; + public String nextToken() { + objectLock.lock(); + try { + StringBuilder tokenBuf = new StringBuilder(); + boolean backslashEscape = false; + + if (this.emittingEscapeCode) { + // Previous token ended at the beginning of an escape code, so this token must start with '{' + tokenBuf.append("{"); + this.emittingEscapeCode = false; } - // process quotes: ('|") - if ((c == CHR_SGL_QUOTE || c == CHR_DBL_QUOTE) && !backslashEscape) { - tokenBuf.append(c); - if (this.inQuotes) { - if (c == this.quoteChar) { - // look ahead for doubled quote - if ((this.pos + 1 < this.sourceLength) && (this.source.charAt(this.pos + 1) == this.quoteChar)) { - tokenBuf.append(c); - this.pos++; // consume following char '\'' or '"' - } else { - this.inQuotes = false; - } - } - } else { - this.inQuotes = true; - this.quoteChar = c; - } - continue; - } + for (; this.pos < this.sourceLength; this.pos++) { + char c = this.source.charAt(this.pos); - // process new line: (\n|\r) - if ((c == CHR_LF) || (c == CHR_CR)) { - tokenBuf.append(c); - backslashEscape = false; - continue; - } + // process escape char: (\) + if (c == CHR_ESCAPE) { + tokenBuf.append(c); + backslashEscape = !backslashEscape; + continue; + } - if (!this.inQuotes && !backslashEscape) { - // process comments: (--) - if (c == CHR_COMMENT) { + // process quotes: ('|") + if ((c == CHR_SGL_QUOTE || c == CHR_DBL_QUOTE) && !backslashEscape) { tokenBuf.append(c); - // look ahead for double hyphen - if ((this.pos + 1 < this.sourceLength) && (this.source.charAt(this.pos + 1) == CHR_COMMENT)) { - // consume following chars until new line or end of string - while (++this.pos < this.sourceLength && c != CHR_LF && c != CHR_CR) { - c = this.source.charAt(this.pos); - tokenBuf.append(c); + if (this.inQuotes) { + if (c == this.quoteChar) { + // look ahead for doubled quote + if ((this.pos + 1 < this.sourceLength) && (this.source.charAt(this.pos + 1) == this.quoteChar)) { + tokenBuf.append(c); + this.pos++; // consume following char '\'' or '"' + } else { + this.inQuotes = false; + } } - this.pos--; + } else { + this.inQuotes = true; + this.quoteChar = c; } continue; } - // process begin token: ({) - if (c == CHR_BEGIN_TOKEN) { - this.bracesLevel++; - if (this.bracesLevel == 1) { - this.emittingEscapeCode = true; - this.pos++; // consume char '{' before returning - return tokenBuf.toString(); - } + // process new line: (\n|\r) + if ((c == CHR_LF) || (c == CHR_CR)) { tokenBuf.append(c); + backslashEscape = false; continue; } - // process end token: (}) - if (c == CHR_END_TOKEN) { - tokenBuf.append(c); - this.bracesLevel--; - if (this.bracesLevel == 0) { - this.pos++; // consume char '}' before returning - return tokenBuf.toString(); + if (!this.inQuotes && !backslashEscape) { + // process comments: (--) + if (c == CHR_COMMENT) { + tokenBuf.append(c); + // look ahead for double hyphen + if ((this.pos + 1 < this.sourceLength) && (this.source.charAt(this.pos + 1) == CHR_COMMENT)) { + // consume following chars until new line or end of string + while (++this.pos < this.sourceLength && c != CHR_LF && c != CHR_CR) { + c = this.source.charAt(this.pos); + tokenBuf.append(c); + } + this.pos--; + } + continue; + } + + // process begin token: ({) + if (c == CHR_BEGIN_TOKEN) { + this.bracesLevel++; + if (this.bracesLevel == 1) { + this.emittingEscapeCode = true; + this.pos++; // consume char '{' before returning + return tokenBuf.toString(); + } + tokenBuf.append(c); + continue; } - continue; - } - // detect variable usage: (@) - if (c == CHR_VARIABLE) { - this.sawVariableUse = true; + // process end token: (}) + if (c == CHR_END_TOKEN) { + tokenBuf.append(c); + this.bracesLevel--; + if (this.bracesLevel == 0) { + this.pos++; // consume char '}' before returning + return tokenBuf.toString(); + } + continue; + } + + // detect variable usage: (@) + if (c == CHR_VARIABLE) { + this.sawVariableUse = true; + } } + + tokenBuf.append(c); + backslashEscape = false; } - tokenBuf.append(c); - backslashEscape = false; + return tokenBuf.toString(); + } finally { + objectLock.unlock(); } - - return tokenBuf.toString(); } /** diff --git a/src/main/core-api/java/com/mysql/cj/util/PerVmServerConfigCacheFactory.java b/src/main/core-api/java/com/mysql/cj/util/PerVmServerConfigCacheFactory.java index 241daf0dd..50b3c558b 100644 --- a/src/main/core-api/java/com/mysql/cj/util/PerVmServerConfigCacheFactory.java +++ b/src/main/core-api/java/com/mysql/cj/util/PerVmServerConfigCacheFactory.java @@ -32,6 +32,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; import com.mysql.cj.CacheAdapter; import com.mysql.cj.CacheAdapterFactory; @@ -64,7 +65,7 @@ public void invalidateAll() { } }; - public CacheAdapter> getInstance(Object syncMutex, String url, int cacheMaxSize, int maxKeySize) { + public CacheAdapter> getInstance(ReentrantLock syncMutex, String url, int cacheMaxSize, int maxKeySize) { return serverConfigCache; } } diff --git a/src/main/core-api/java/com/mysql/cj/util/TimeUtil.java b/src/main/core-api/java/com/mysql/cj/util/TimeUtil.java index 352994ab5..54a0bce05 100644 --- a/src/main/core-api/java/com/mysql/cj/util/TimeUtil.java +++ b/src/main/core-api/java/com/mysql/cj/util/TimeUtil.java @@ -46,6 +46,7 @@ import java.util.Locale; import java.util.Properties; import java.util.TimeZone; +import java.util.concurrent.locks.ReentrantLock; import java.util.regex.Pattern; import com.mysql.cj.Messages; @@ -60,6 +61,7 @@ */ public class TimeUtil { static final TimeZone GMT_TIMEZONE = TimeZone.getTimeZone("GMT"); + static final ReentrantLock OBJECT_LOCK = new ReentrantLock(); public static final LocalDate DEFAULT_DATE = LocalDate.of(1970, 1, 1); public static final LocalTime DEFAULT_TIME = LocalTime.of(0, 0); @@ -164,10 +166,13 @@ public static String getCanonicalTimeZone(String timezoneStr, ExceptionIntercept } } - synchronized (TimeUtil.class) { + OBJECT_LOCK.lock(); + try { if (timeZoneMappings == null) { loadTimeZoneMappings(exceptionInterceptor); } + } finally { + OBJECT_LOCK.unlock(); } String canonicalTz; diff --git a/src/main/core-impl/java/com/mysql/cj/AbstractQuery.java b/src/main/core-impl/java/com/mysql/cj/AbstractQuery.java index b4505ec76..2b768bb61 100644 --- a/src/main/core-impl/java/com/mysql/cj/AbstractQuery.java +++ b/src/main/core-impl/java/com/mysql/cj/AbstractQuery.java @@ -33,6 +33,7 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantLock; import com.mysql.cj.conf.PropertyKey; import com.mysql.cj.conf.RuntimeProperty; @@ -61,7 +62,7 @@ public abstract class AbstractQuery implements Query { protected String charEncoding = null; /** Mutex to prevent race between returning query results and noticing that query has been timed-out or cancelled. */ - protected Object cancelTimeoutMutex = new Object(); + protected ReentrantLock cancelTimeoutMutex = new ReentrantLock(); private CancelStatus cancelStatus = CancelStatus.NOT_CANCELED; @@ -92,6 +93,8 @@ public abstract class AbstractQuery implements Query { /** Query attributes bindings */ protected QueryAttributesBindings queryAttributesBindings; + protected final ReentrantLock objectLock = new ReentrantLock(); + public AbstractQuery(NativeSession sess) { statementCounter++; this.session = sess; @@ -122,18 +125,24 @@ public void setExecuteTime(long executeTime) { @Override public void checkCancelTimeout() { - synchronized (this.cancelTimeoutMutex) { + this.cancelTimeoutMutex.lock(); + try { if (this.cancelStatus != CancelStatus.NOT_CANCELED) { CJException cause = this.cancelStatus == CancelStatus.CANCELED_BY_TIMEOUT ? new CJTimeoutException() : new OperationCancelledException(); resetCancelledState(); throw cause; } + } finally { + this.cancelTimeoutMutex.unlock(); } } public void resetCancelledState() { - synchronized (this.cancelTimeoutMutex) { + this.cancelTimeoutMutex.lock(); + try { this.cancelStatus = CancelStatus.NOT_CANCELED; + } finally { + this.cancelTimeoutMutex.unlock(); } } @@ -149,7 +158,7 @@ public NativeSession getSession() { } @Override - public Object getCancelTimeoutMutex() { + public ReentrantLock getCancelTimeoutMutex() { return this.cancelTimeoutMutex; } diff --git a/src/main/core-impl/java/com/mysql/cj/CancelQueryTaskImpl.java b/src/main/core-impl/java/com/mysql/cj/CancelQueryTaskImpl.java index 24c5e6ecc..b75de908c 100644 --- a/src/main/core-impl/java/com/mysql/cj/CancelQueryTaskImpl.java +++ b/src/main/core-impl/java/com/mysql/cj/CancelQueryTaskImpl.java @@ -84,7 +84,8 @@ public void run() { localQueryToCancel.setCancelStatus(CancelStatus.CANCELED_BY_TIMEOUT); session.invokeCleanupListeners(new OperationCancelledException(Messages.getString("Statement.ConnectionKilledDueToTimeout"))); } else { - synchronized (localQueryToCancel.getCancelTimeoutMutex()) { + localQueryToCancel.getCancelTimeoutMutex().lock(); + try { long origConnId = session.getThreadId(); HostInfo hostInfo = session.getHostInfo(); String database = hostInfo.getDatabase(); @@ -112,6 +113,8 @@ public void transactionBegun() { } } localQueryToCancel.setCancelStatus(CancelStatus.CANCELED_BY_TIMEOUT); + } finally { + localQueryToCancel.getCancelTimeoutMutex().unlock(); } } // } catch (NullPointerException npe) { diff --git a/src/main/core-impl/java/com/mysql/cj/CoreSession.java b/src/main/core-impl/java/com/mysql/cj/CoreSession.java index 15934eae9..e90e60be7 100644 --- a/src/main/core-impl/java/com/mysql/cj/CoreSession.java +++ b/src/main/core-impl/java/com/mysql/cj/CoreSession.java @@ -30,6 +30,7 @@ package com.mysql.cj; import java.net.SocketAddress; +import java.util.concurrent.locks.ReentrantLock; import com.mysql.cj.conf.HostInfo; import com.mysql.cj.conf.PropertyKey; @@ -78,6 +79,7 @@ public abstract class CoreSession implements Session { /** The event sink to use for profiling */ private ProfilerEventHandler eventSink; + private final ReentrantLock sessionMutex = new ReentrantLock(); public CoreSession(HostInfo hostInfo, PropertySet propSet) { this.connectionCreationTimeMillis = System.currentTimeMillis(); @@ -95,6 +97,11 @@ public CoreSession(HostInfo hostInfo, PropertySet propSet) { this.log = LogFactory.getLogger(getPropertySet().getStringProperty(PropertyKey.logger).getStringValue(), Log.LOGGER_INSTANCE_NAME); } + @Override + public ReentrantLock getSessionMutex() { + return sessionMutex; + } + @Override public void changeUser(String user, String password, String database) { // reset maxRows to default value @@ -171,7 +178,8 @@ public boolean isSetNeededForAutoCommitMode(boolean autoCommitFlag) { @Override public ProfilerEventHandler getProfilerEventHandler() { if (this.eventSink == null) { - synchronized (this) { + this.sessionMutex.lock(); + try { if (this.eventSink == null) { // check again to ensure that other thread didn't set it already this.eventSink = (ProfilerEventHandler) Util.getInstance( this.propertySet.getStringProperty(PropertyKey.profilerEventHandler).getStringValue(), new Class[0], new Object[0], @@ -179,6 +187,8 @@ public ProfilerEventHandler getProfilerEventHandler() { this.eventSink.init(this.log); } + } finally { + this.sessionMutex.unlock(); } } return this.eventSink; diff --git a/src/main/core-impl/java/com/mysql/cj/NativeSession.java b/src/main/core-impl/java/com/mysql/cj/NativeSession.java index bf2c7b4d6..5d1c73b8f 100644 --- a/src/main/core-impl/java/com/mysql/cj/NativeSession.java +++ b/src/main/core-impl/java/com/mysql/cj/NativeSession.java @@ -43,6 +43,7 @@ import java.util.Properties; import java.util.Timer; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Supplier; import com.mysql.cj.conf.HostInfo; @@ -151,11 +152,14 @@ public void quit() { } } - synchronized (this) { + this.getSessionMutex().lock(); + try { if (this.cancelTimer != null) { this.cancelTimer.cancel(); this.cancelTimer = null; } + } finally { + this.getSessionMutex().unlock(); } this.isClosed = true; super.quit(); @@ -177,11 +181,14 @@ public void forceClose() { } //this.protocol = null; // TODO actually we shouldn't remove protocol instance because some it's methods can be called after closing socket } - synchronized (this) { + this.getSessionMutex().lock(); + try { if (this.cancelTimer != null) { this.cancelTimer.cancel(); this.cancelTimer = null; } + } finally { + this.getSessionMutex().unlock(); } this.isClosed = true; super.forceClose(); @@ -317,8 +324,9 @@ public void setLocalInfileInputStream(InputStream stream) { this.protocol.setLocalInfileInputStream(stream); } - private void createConfigCacheIfNeeded(Object syncMutex) { - synchronized (syncMutex) { + private void createConfigCacheIfNeeded(ReentrantLock syncMutex) { + syncMutex.lock(); + try { if (this.serverConfigCache != null) { return; } @@ -364,6 +372,8 @@ public Exception interceptException(Exception sqlEx) { new Object[] { getPropertySet().getStringProperty(PropertyKey.queryInfoCacheFactory).getValue(), PropertyKey.queryInfoCacheFactory }), e, getExceptionInterceptor()); } + } finally { + syncMutex.unlock(); } } @@ -379,7 +389,7 @@ public Exception interceptException(Exception sqlEx) { * @param version * driver version string */ - public void loadServerVariables(Object syncMutex, String version) { + public void loadServerVariables(ReentrantLock syncMutex, String version) { if (this.cacheServerConfiguration.getValue()) { createConfigCacheIfNeeded(syncMutex); @@ -825,10 +835,15 @@ public String getIdentifierQuoteString() { return this.protocol != null && this.protocol.getServerSession().useAnsiQuotedIdentifiers() ? "\"" : "`"; } - public synchronized Timer getCancelTimer() { - if (this.cancelTimer == null) { - this.cancelTimer = new Timer("MySQL Statement Cancellation Timer", Boolean.TRUE); + public Timer getCancelTimer() { + this.getSessionMutex().lock(); + try { + if (this.cancelTimer == null) { + this.cancelTimer = new Timer("MySQL Statement Cancellation Timer", Boolean.TRUE); + } + return this.cancelTimer; + } finally { + this.getSessionMutex().unlock(); } - return this.cancelTimer; } } diff --git a/src/main/core-impl/java/com/mysql/cj/ServerPreparedQuery.java b/src/main/core-impl/java/com/mysql/cj/ServerPreparedQuery.java index 20b32ea0d..9c824f1eb 100644 --- a/src/main/core-impl/java/com/mysql/cj/ServerPreparedQuery.java +++ b/src/main/core-impl/java/com/mysql/cj/ServerPreparedQuery.java @@ -122,7 +122,8 @@ protected ServerPreparedQuery(NativeSession sess) { public void serverPrepare(String sql) throws IOException { this.session.checkClosed(); - synchronized (this.session) { + this.session.getSessionMutex().lock(); + try { long begin = this.profileSQL ? System.currentTimeMillis() : 0; NativePacketPayload prepareResultPacket = this.session.getProtocol() @@ -164,6 +165,8 @@ public void serverPrepare(String sql) throws IOException { if (checkEOF && this.session.getProtocol().probeMessage(null).isEOFPacket()) { // Skip the following EOF packet. this.session.getProtocol().skipPacket(); } + } finally { + this.session.getSessionMutex().unlock(); } } @@ -399,7 +402,8 @@ public T readExecuteResult(NativePacketPayload resultPacke * */ private void serverLongData(int parameterIndex, BindValue binding) { - synchronized (this) { + this.objectLock.lock(); + try { NativePacketPayload packet = this.session.getSharedSendPacket(); Object value = binding.getValue(); if (value instanceof byte[]) { @@ -431,6 +435,8 @@ private void serverLongData(int parameterIndex, BindValue binding) { throw ExceptionFactory.createException(WrongArgumentException.class, Messages.getString("ServerPreparedStatement.18") + value.getClass().getName() + "'", this.session.getExceptionInterceptor()); } + } finally { + this.objectLock.unlock(); } } @@ -474,7 +480,8 @@ private void storeStreamOrReader(int parameterIndex, NativePacketPayload packet, char[] cBuf = null; String clobEncoding = null; - synchronized (this.session) { + this.session.getSessionMutex().lock(); + try { if (isStream) { bBuf = new byte[BLOB_STREAM_READ_BUF_SIZE]; } else { @@ -546,6 +553,8 @@ private void storeStreamOrReader(int parameterIndex, NativePacketPayload packet, } } } + } finally { + this.session.getSessionMutex().unlock(); } } @@ -568,13 +577,16 @@ public void clearParameters(boolean clearServerParameters) { public void serverResetStatement() { this.session.checkClosed(); - synchronized (this.session) { + this.session.getSessionMutex().lock(); + try { try { this.session.getProtocol().sendCommand(this.commandBuilder.buildComStmtReset(this.session.getSharedSendPacket(), this.serverStatementId), false, 0); } finally { this.session.clearInputStream(); } + } finally { + this.session.getSessionMutex().unlock(); } } diff --git a/src/main/core-impl/java/com/mysql/cj/protocol/NamedPipeSocketFactory.java b/src/main/core-impl/java/com/mysql/cj/protocol/NamedPipeSocketFactory.java index 1c5f077ac..563ebd843 100644 --- a/src/main/core-impl/java/com/mysql/cj/protocol/NamedPipeSocketFactory.java +++ b/src/main/core-impl/java/com/mysql/cj/protocol/NamedPipeSocketFactory.java @@ -38,6 +38,7 @@ import java.net.Socket; import java.net.SocketException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; import com.mysql.cj.Messages; import com.mysql.cj.Session; @@ -59,6 +60,7 @@ class NamedPipeSocket extends Socket { private boolean isClosed = false; private RandomAccessFile namedPipeFile; + private final ReentrantLock objectLock = new ReentrantLock(); NamedPipeSocket(String filePath, int timeout) throws IOException { if ((filePath == null) || (filePath.length() == 0)) { @@ -92,9 +94,14 @@ class NamedPipeSocket extends Socket { * @see java.net.Socket#close() */ @Override - public synchronized void close() throws IOException { - this.namedPipeFile.close(); - this.isClosed = true; + public void close() throws IOException { + objectLock.lock(); + try { + this.namedPipeFile.close(); + this.isClosed = true; + } finally { + objectLock.unlock(); + } } /** diff --git a/src/main/core-impl/java/com/mysql/cj/protocol/ReadAheadInputStream.java b/src/main/core-impl/java/com/mysql/cj/protocol/ReadAheadInputStream.java index 4c939f6a9..0d409de38 100644 --- a/src/main/core-impl/java/com/mysql/cj/protocol/ReadAheadInputStream.java +++ b/src/main/core-impl/java/com/mysql/cj/protocol/ReadAheadInputStream.java @@ -32,6 +32,7 @@ import java.io.IOException; import java.io.InputStream; import java.util.Arrays; +import java.util.concurrent.locks.ReentrantLock; import com.mysql.cj.log.Log; @@ -54,6 +55,8 @@ public class ReadAheadInputStream extends InputStream { protected Log log; + private final ReentrantLock objectLock = new ReentrantLock(); + private void fill(int readAtLeastTheseManyBytes) throws IOException { checkClosed(); @@ -166,42 +169,47 @@ private int readFromUnderlyingStreamIfNecessary(byte[] b, int off, int len) thro } @Override - public synchronized int read(byte b[], int off, int len) throws IOException { - checkClosed(); // Check for closed stream - if ((off | len | (off + len) | (b.length - (off + len))) < 0) { - throw new IndexOutOfBoundsException(); - } else if (len == 0) { - return 0; - } + public int read(byte b[], int off, int len) throws IOException { + objectLock.lock(); + try { + checkClosed(); // Check for closed stream + if ((off | len | (off + len) | (b.length - (off + len))) < 0) { + throw new IndexOutOfBoundsException(); + } else if (len == 0) { + return 0; + } + + int totalBytesRead = 0; - int totalBytesRead = 0; + while (true) { + int bytesReadThisRound = readFromUnderlyingStreamIfNecessary(b, off + totalBytesRead, len - totalBytesRead); - while (true) { - int bytesReadThisRound = readFromUnderlyingStreamIfNecessary(b, off + totalBytesRead, len - totalBytesRead); + // end-of-stream? + if (bytesReadThisRound <= 0) { + if (totalBytesRead == 0) { + totalBytesRead = bytesReadThisRound; + } - // end-of-stream? - if (bytesReadThisRound <= 0) { - if (totalBytesRead == 0) { - totalBytesRead = bytesReadThisRound; + break; } - break; - } + totalBytesRead += bytesReadThisRound; - totalBytesRead += bytesReadThisRound; + // Read _at_least_ enough bytes + if (totalBytesRead >= len) { + break; + } - // Read _at_least_ enough bytes - if (totalBytesRead >= len) { - break; + // Nothing to read? + if (this.underlyingStream.available() <= 0) { + break; + } } - // Nothing to read? - if (this.underlyingStream.available() <= 0) { - break; - } + return totalBytesRead; + } finally { + objectLock.unlock(); } - - return totalBytesRead; } @Override diff --git a/src/main/core-impl/java/com/mysql/cj/result/SqlDateValueFactory.java b/src/main/core-impl/java/com/mysql/cj/result/SqlDateValueFactory.java index 1d6b68a8a..d447408e5 100644 --- a/src/main/core-impl/java/com/mysql/cj/result/SqlDateValueFactory.java +++ b/src/main/core-impl/java/com/mysql/cj/result/SqlDateValueFactory.java @@ -33,6 +33,7 @@ import java.util.Calendar; import java.util.Locale; import java.util.TimeZone; +import java.util.concurrent.locks.ReentrantLock; import com.mysql.cj.Messages; import com.mysql.cj.WarningListener; @@ -51,7 +52,8 @@ public class SqlDateValueFactory extends AbstractDateTimeValueFactory { private WarningListener warningListener; // cached per instance to avoid re-creation on every create*() call - private Calendar cal; + private final Calendar cal; + private final ReentrantLock calLock = new ReentrantLock(); public SqlDateValueFactory(PropertySet pset, Calendar calendar, TimeZone tz) { super(pset); @@ -72,7 +74,8 @@ public SqlDateValueFactory(PropertySet pset, Calendar calendar, TimeZone tz, War @Override public Date localCreateFromDate(InternalDate idate) { - synchronized (this.cal) { + this.calLock.lock(); + try { try { if (idate.isZero()) { throw new DataReadException(Messages.getString("ResultSet.InvalidZeroDate")); @@ -85,6 +88,8 @@ public Date localCreateFromDate(InternalDate idate) { } catch (IllegalArgumentException e) { throw ExceptionFactory.createException(WrongArgumentException.class, e.getMessage(), e); } + } finally { + calLock.unlock(); } } diff --git a/src/main/core-impl/java/com/mysql/cj/result/SqlTimeValueFactory.java b/src/main/core-impl/java/com/mysql/cj/result/SqlTimeValueFactory.java index a4d37e99f..48bc68f33 100644 --- a/src/main/core-impl/java/com/mysql/cj/result/SqlTimeValueFactory.java +++ b/src/main/core-impl/java/com/mysql/cj/result/SqlTimeValueFactory.java @@ -33,6 +33,7 @@ import java.util.Calendar; import java.util.Locale; import java.util.TimeZone; +import java.util.concurrent.locks.ReentrantLock; import com.mysql.cj.Messages; import com.mysql.cj.WarningListener; @@ -51,7 +52,8 @@ public class SqlTimeValueFactory extends AbstractDateTimeValueFactory