From 269e85a22cf5e613a785d0ff783dace5f836c09d Mon Sep 17 00:00:00 2001 From: Stuart Lang Date: Tue, 18 Jul 2023 14:07:59 +0100 Subject: [PATCH 1/3] Try to re-add some synchronization --- MySQL.Data/src/CharSetMap.cs | 58 +++-- MySQL.Data/src/Driver.cs | 13 + .../MySqlPromotableTransaction.cs | 59 +++-- MySQL.Data/src/MySqlCommand.cs | 236 ++++++++++-------- MySQL.Data/src/MySqlConnection.cs | 22 +- MySQL.Data/src/MySqlPool.cs | 158 +++++++++--- MySQL.Data/src/MySqlPoolManager.cs | 103 +++++--- MySQL.Data/src/Releaser.cs | 19 ++ .../src/Replication/ReplicationManager.cs | 87 ++++--- MySQL.Data/src/common/Ssl.cs | 33 ++- 10 files changed, 525 insertions(+), 263 deletions(-) create mode 100644 MySQL.Data/src/Releaser.cs diff --git a/MySQL.Data/src/CharSetMap.cs b/MySQL.Data/src/CharSetMap.cs index 553ca4160..85ce16a7a 100644 --- a/MySQL.Data/src/CharSetMap.cs +++ b/MySQL.Data/src/CharSetMap.cs @@ -42,13 +42,12 @@ internal class CharSetMap private static Dictionary _defaultCollations; private static Dictionary _maxLengths; private static Dictionary _mapping; - private static readonly object LockObject; + private static SemaphoreSlim _semaphoreSlim = new SemaphoreSlim(1, 1); // we use a static constructor here since we only want to init // the mapping once static CharSetMap() { - LockObject = new Object(); InitializeMapping(); } @@ -57,8 +56,8 @@ public static CharacterSet GetCharacterSet(string charSetName) if (charSetName == null) throw new ArgumentNullException("CharSetName is null"); CharacterSet cs = null; - if (_mapping.ContainsKey(charSetName)) - cs = _mapping[charSetName]; + if (_mapping.TryGetValue(charSetName, out var charset)) + cs = charset; if (cs == null) throw new NotSupportedException("Character set '" + charSetName + "' is not supported by .Net Framework."); @@ -181,27 +180,50 @@ internal static async Task InitCollectionsAsync(MySqlConnection connection, bool internal static async Task GetDefaultCollationAsync(string charset, MySqlConnection connection, bool execAsync, CancellationToken cancellationToken = default) { - SemaphoreSlim semaphoreSlim = new SemaphoreSlim(1); - semaphoreSlim.Wait(); - - if (_defaultCollations == null) - await InitCollectionsAsync(connection, execAsync, cancellationToken).ConfigureAwait(false); + if (execAsync) + { + await _semaphoreSlim.WaitAsync(cancellationToken); + } + else + { + _semaphoreSlim.Wait(cancellationToken); + } - semaphoreSlim.Release(); - return !_defaultCollations.ContainsKey(charset) ? null : _defaultCollations[charset]; + try + { + if (_defaultCollations == null) + await InitCollectionsAsync(connection, execAsync, cancellationToken).ConfigureAwait(false); + } + finally + { + _semaphoreSlim.Release(); + } + + return !_defaultCollations.TryGetValue(charset, out string collation) ? null : collation; } internal static async Task GetMaxLengthAsync(string charset, MySqlConnection connection, bool execAsync, CancellationToken cancellationToken = default) { - SemaphoreSlim semaphoreSlim = new SemaphoreSlim(1); - semaphoreSlim.Wait(); - - if (_maxLengths == null) - await InitCollectionsAsync(connection, execAsync, cancellationToken).ConfigureAwait(false); + if (execAsync) + { + await _semaphoreSlim.WaitAsync(cancellationToken); + } + else + { + _semaphoreSlim.Wait(cancellationToken); + } - semaphoreSlim.Release(); + try + { + if (_maxLengths == null) + await InitCollectionsAsync(connection, execAsync, cancellationToken).ConfigureAwait(false); + } + finally + { + _semaphoreSlim.Release(); + } - return !_maxLengths.ContainsKey(charset) ? 1 : _maxLengths[charset]; + return !_maxLengths.TryGetValue(charset, out int maxLength) ? 1 : maxLength; } } diff --git a/MySQL.Data/src/Driver.cs b/MySQL.Data/src/Driver.cs index 6e96c9f23..b29ebb914 100644 --- a/MySQL.Data/src/Driver.cs +++ b/MySQL.Data/src/Driver.cs @@ -54,6 +54,7 @@ internal class Driver : IDisposable protected IDriver handler; internal MySqlDataReader reader; private bool disposed; + private SemaphoreSlim semaphoreSlim = new SemaphoreSlim(1, 1); /// /// For pooled connections, time when the driver was @@ -481,6 +482,18 @@ public virtual async Task ExecuteStatementAsync(MySqlPacket packetToExecute, boo await handler.ExecuteStatementAsync(packetToExecute, execAsync).ConfigureAwait(false); } + public async Task LockAsync() + { + await semaphoreSlim.WaitAsync(); + return new Releaser(semaphoreSlim); + } + + public Releaser Lock() + { + semaphoreSlim.Wait(); + return new Releaser(semaphoreSlim); + } + public virtual async Task CloseStatementAsync(int id, bool execAsync) { diff --git a/MySQL.Data/src/Framework/netstandard2_0/MySqlPromotableTransaction.cs b/MySQL.Data/src/Framework/netstandard2_0/MySqlPromotableTransaction.cs index cbafb651b..89e067f81 100644 --- a/MySQL.Data/src/Framework/netstandard2_0/MySqlPromotableTransaction.cs +++ b/MySQL.Data/src/Framework/netstandard2_0/MySqlPromotableTransaction.cs @@ -32,6 +32,7 @@ using System.Collections.Generic; using System.Data; using System.Data.Common; +using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; @@ -60,32 +61,56 @@ public async Task RollbackAsync(SinglePhaseEnlistment singlePhaseEnlistment, boo // prevent commands in main thread to run concurrently Driver driver = connection.driver; - SemaphoreSlim semaphoreSlim = new(1); - semaphoreSlim.Wait(); - - rollbackThreadId = Thread.CurrentThread.ManagedThreadId; - while (connection.Reader != null) + Releaser releaser; + if (execAsync) { - // wait for reader to finish. Maybe we should not wait - // forever and cancel it after some time? - System.Threading.Thread.Sleep(100); + releaser = await driver.LockAsync().ConfigureAwait(false); } - simpleTransaction.Rollback(); - singlePhaseEnlistment.Aborted(); - DriverTransactionManager.RemoveDriverInTransaction(baseTransaction); + else + { + releaser = driver.Lock(); + } + + using (releaser) + { + rollbackThreadId = Thread.CurrentThread.ManagedThreadId; + while (connection.Reader != null) + { + // wait for reader to finish. Maybe we should not wait + // forever and cancel it after some time? + System.Threading.Thread.Sleep(100); + } + + if (execAsync) + { + await simpleTransaction.RollbackAsync().ConfigureAwait(false); + } + else + { + simpleTransaction.Rollback(); + } - driver.currentTransaction = null; + singlePhaseEnlistment.Aborted(); + DriverTransactionManager.RemoveDriverInTransaction(baseTransaction); - if (connection.State == ConnectionState.Closed) - await connection.CloseFullyAsync(execAsync).ConfigureAwait(false); - rollbackThreadId = 0; + driver.currentTransaction = null; - semaphoreSlim.Release(); + if (connection.State == ConnectionState.Closed) + await connection.CloseFullyAsync(execAsync).ConfigureAwait(false); + rollbackThreadId = 0; + } } public async Task SinglePhaseCommitAsync(SinglePhaseEnlistment singlePhaseEnlistment, bool execAsync) { - simpleTransaction.Commit(); + if (execAsync) + { + await simpleTransaction.CommitAsync().ConfigureAwait(false); + } + else + { + simpleTransaction.Commit(); + } singlePhaseEnlistment.Committed(); DriverTransactionManager.RemoveDriverInTransaction(baseTransaction); connection.driver.currentTransaction = null; diff --git a/MySQL.Data/src/MySqlCommand.cs b/MySQL.Data/src/MySqlCommand.cs index 489e0b38f..71ebe0d4a 100644 --- a/MySQL.Data/src/MySqlCommand.cs +++ b/MySQL.Data/src/MySqlCommand.cs @@ -658,12 +658,14 @@ internal async Task ResetSqlSelectLimitAsync(bool execAsync) protected override async Task ExecuteDbDataReaderAsync(CommandBehavior behavior, CancellationToken cancellationToken) => await ExecuteReaderAsync(behavior, true, cancellationToken).ConfigureAwait(false); - internal async Task ExecuteReaderAsync(CommandBehavior behavior, bool execAsync, CancellationToken cancellationToken = default) + internal async Task ExecuteReaderAsync(CommandBehavior behavior, bool execAsync, + CancellationToken cancellationToken = default) { // give our interceptors a shot at it first MySqlDataReader interceptedReader = null; - if (connection?.commandInterceptor != null && connection.commandInterceptor.ExecuteReader(CommandText, behavior, ref interceptedReader)) + if (connection?.commandInterceptor != null && + connection.commandInterceptor.ExecuteReader(CommandText, behavior, ref interceptedReader)) return interceptedReader; // interceptors didn't handle this so we fall through @@ -679,140 +681,154 @@ internal async Task ExecuteReaderAsync(CommandBehavior behavior // Load balancing getting a new connection if (connection.hasBeenOpen && !driver.HasStatus(ServerStatusFlags.InTransaction)) - await ReplicationManager.GetNewConnectionAsync(connection.Settings.Server, !IsReadOnlyCommand(sql), connection, execAsync, cancellationToken).ConfigureAwait(false); + await ReplicationManager.GetNewConnectionAsync(connection.Settings.Server, !IsReadOnlyCommand(sql), connection, + execAsync, cancellationToken).ConfigureAwait(false); - SemaphoreSlim semaphoreSlim = new(1); - semaphoreSlim.Wait(); - - // We have to recheck that there is no reader, after we got the lock - if (connection.Reader != null) - Throw(new MySqlException(Resources.DataReaderOpen)); - - System.Transactions.Transaction curTrans = System.Transactions.Transaction.Current; - - if (curTrans != null) + Releaser releaser; + if (execAsync) + { + releaser = await driver.LockAsync().ConfigureAwait(false); + } + else + { + releaser = driver.Lock(); + } + + using (releaser) { - bool inRollback = false; - //TODO: ADD support for 452 and 46X - if (driver.currentTransaction != null) - inRollback = driver.currentTransaction.InRollback; + // We have to recheck that there is no reader, after we got the lock + if (connection.Reader != null) + Throw(new MySqlException(Resources.DataReaderOpen)); + + System.Transactions.Transaction curTrans = System.Transactions.Transaction.Current; - if (!inRollback) + if (curTrans != null) { - System.Transactions.TransactionStatus status = System.Transactions.TransactionStatus.InDoubt; - try + bool inRollback = false; + //TODO: ADD support for 452 and 46X + if (driver.currentTransaction != null) + inRollback = driver.currentTransaction.InRollback; + + if (!inRollback) { - // in some cases (during state transitions) this throws - // an exception. Ignore exceptions, we're only interested - // whether transaction was aborted or not. - status = curTrans.TransactionInformation.Status; - } - catch (System.Transactions.TransactionException) { } + System.Transactions.TransactionStatus status = System.Transactions.TransactionStatus.InDoubt; + try + { + // in some cases (during state transitions) this throws + // an exception. Ignore exceptions, we're only interested + // whether transaction was aborted or not. + status = curTrans.TransactionInformation.Status; + } + catch (System.Transactions.TransactionException) + { + } - if (status == System.Transactions.TransactionStatus.Aborted) - Throw(new System.Transactions.TransactionAbortedException()); + if (status == System.Transactions.TransactionStatus.Aborted) + Throw(new System.Transactions.TransactionAbortedException()); + } } - } - - commandTimer = new CommandTimer(connection, CommandTimeout); - LastInsertedId = -1; - if (CommandType == CommandType.TableDirect) - sql = "SELECT * FROM " + sql; + commandTimer = new CommandTimer(connection, CommandTimeout); + LastInsertedId = -1; - // if we are on a replicated connection, we are only allow readonly statements - if (connection.Settings.Replication && !InternallyCreated) - EnsureCommandIsReadOnly(sql); - - if (statement == null || !statement.IsPrepared) - { - if (CommandType == CommandType.StoredProcedure) - statement = new StoredProcedure(this, sql); - else - statement = new PreparableStatement(this, sql); - } + if (CommandType == CommandType.TableDirect) + sql = "SELECT * FROM " + sql; - // stored procs are the only statement type that need do anything during resolve - statement.Resolve(false); + // if we are on a replicated connection, we are only allow readonly statements + if (connection.Settings.Replication && !InternallyCreated) + EnsureCommandIsReadOnly(sql); - // Now that we have completed our resolve step, we can handle our - // command behaviors - await HandleCommandBehaviorsAsync(execAsync, behavior).ConfigureAwait(false); + if (statement == null || !statement.IsPrepared) + { + if (CommandType == CommandType.StoredProcedure) + statement = new StoredProcedure(this, sql); + else + statement = new PreparableStatement(this, sql); + } - try - { - MySqlDataReader reader = new MySqlDataReader(this, statement, behavior); - connection.Reader = reader; - Canceled = false; - // execute the statement - await statement.ExecuteAsync(execAsync).ConfigureAwait(false); - // wait for data to return - await reader.NextResultAsync(execAsync, cancellationToken).ConfigureAwait(false); - success = true; - return reader; - } - catch (TimeoutException tex) - { - await connection.HandleTimeoutOrThreadAbortAsync(tex, execAsync).ConfigureAwait(false); - throw; //unreached - } - catch (ThreadAbortException taex) - { - await connection.HandleTimeoutOrThreadAbortAsync(taex, execAsync).ConfigureAwait(false); - throw; - } - catch (IOException ioex) - { - await connection.AbortAsync(execAsync).ConfigureAwait(false); // Closes connection without returning it to the pool - throw new MySqlException(Resources.FatalErrorDuringExecute, ioex); - } - catch (MySqlException ex) - { + // stored procs are the only statement type that need do anything during resolve + statement.Resolve(false); - if (ex.InnerException is TimeoutException) - throw; // already handled + // Now that we have completed our resolve step, we can handle our + // command behaviors + await HandleCommandBehaviorsAsync(execAsync, behavior).ConfigureAwait(false); try { - await ResetReaderAsync(execAsync).ConfigureAwait(false); - await ResetSqlSelectLimitAsync(execAsync).ConfigureAwait(false); + MySqlDataReader reader = new MySqlDataReader(this, statement, behavior); + connection.Reader = reader; + Canceled = false; + // execute the statement + await statement.ExecuteAsync(execAsync).ConfigureAwait(false); + // wait for data to return + await reader.NextResultAsync(execAsync, cancellationToken).ConfigureAwait(false); + success = true; + return reader; } - catch (Exception) + catch (TimeoutException tex) { - // Reset SqlLimit did not work, connection is hosed. - await Connection.AbortAsync(execAsync).ConfigureAwait(false); - throw new MySqlException(ex.Message, true, ex); + await connection.HandleTimeoutOrThreadAbortAsync(tex, execAsync).ConfigureAwait(false); + throw; //unreached } - - // if we caught an exception because of a cancel, then just return null - if (ex.IsQueryAborted) - return null; - if (ex.IsFatal) - await Connection.CloseAsync(execAsync).ConfigureAwait(false); - if (ex.Number == 0) - throw new MySqlException(Resources.FatalErrorDuringExecute, ex); - throw; - } - finally - { - if (connection != null) + catch (ThreadAbortException taex) + { + await connection.HandleTimeoutOrThreadAbortAsync(taex, execAsync).ConfigureAwait(false); + throw; + } + catch (IOException ioex) { - if (connection.Reader == null) + await connection.AbortAsync(execAsync) + .ConfigureAwait(false); // Closes connection without returning it to the pool + throw new MySqlException(Resources.FatalErrorDuringExecute, ioex); + } + catch (MySqlException ex) + { + + if (ex.InnerException is TimeoutException) + throw; // already handled + + try { - // Something went seriously wrong, and reader would not - // be able to clear timeout on closing. - // So we clear timeout here. - ClearCommandTimer(); + await ResetReaderAsync(execAsync).ConfigureAwait(false); + await ResetSqlSelectLimitAsync(execAsync).ConfigureAwait(false); } - if (!success) + catch (Exception) { - // ExecuteReader failed.Close Reader and set to null to - // prevent subsequent errors with DataReaderOpen - await ResetReaderAsync(execAsync).ConfigureAwait(false); + // Reset SqlLimit did not work, connection is hosed. + await Connection.AbortAsync(execAsync).ConfigureAwait(false); + throw new MySqlException(ex.Message, true, ex); } + + // if we caught an exception because of a cancel, then just return null + if (ex.IsQueryAborted) + return null; + if (ex.IsFatal) + await Connection.CloseAsync(execAsync).ConfigureAwait(false); + if (ex.Number == 0) + throw new MySqlException(Resources.FatalErrorDuringExecute, ex); + throw; } + finally + { + if (connection != null) + { + if (connection.Reader == null) + { + // Something went seriously wrong, and reader would not + // be able to clear timeout on closing. + // So we clear timeout here. + ClearCommandTimer(); + } - semaphoreSlim.Release(); + if (!success) + { + // ExecuteReader failed.Close Reader and set to null to + // prevent subsequent errors with DataReaderOpen + await ResetReaderAsync(execAsync).ConfigureAwait(false); + } + } + + } } } diff --git a/MySQL.Data/src/MySqlConnection.cs b/MySQL.Data/src/MySqlConnection.cs index 54b9e5ae6..1bc147291 100644 --- a/MySQL.Data/src/MySqlConnection.cs +++ b/MySQL.Data/src/MySqlConnection.cs @@ -534,14 +534,22 @@ internal async Task ChangeDatabaseAsync(string databaseName, bool execAsync, Can // This semaphore prevents promotable transaction rollback to run // in parallel - SemaphoreSlim semaphoreSlim = new SemaphoreSlim(1); - - semaphoreSlim.Wait(); - // We use default command timeout for SetDatabase - using (new CommandTimer(this, (int)Settings.DefaultCommandTimeout)) - await driver.SetDatabaseAsync(databaseName, execAsync).ConfigureAwait(false); + Releaser releaser; + if (execAsync) + { + releaser = await driver.LockAsync().ConfigureAwait(false); + } + else + { + releaser = driver.Lock(); + } - semaphoreSlim.Release(); + using (releaser) + { + // We use default command timeout for SetDatabase + using (new CommandTimer(this, (int)Settings.DefaultCommandTimeout)) + await driver.SetDatabaseAsync(databaseName, execAsync).ConfigureAwait(false); + } _database = databaseName; } diff --git a/MySQL.Data/src/MySqlPool.cs b/MySQL.Data/src/MySqlPool.cs index a92dc0625..a9dd7030c 100644 --- a/MySQL.Data/src/MySqlPool.cs +++ b/MySQL.Data/src/MySqlPool.cs @@ -49,7 +49,9 @@ internal sealed class MySqlPool private readonly AutoResetEvent _autoEvent; private int _available; // Object used to lock the list of host obtained from DNS SRV lookup. - private readonly object _dnsSrvLock = new object(); + private readonly SemaphoreSlim _inUsePoolSemaphore = new SemaphoreSlim(1, 1); + private readonly SemaphoreSlim _idlePoolSemaphore = new SemaphoreSlim(1, 1); + private readonly SemaphoreSlim _dnsSrvSemaphore = new SemaphoreSlim(1, 1); private void EnqueueIdle(Driver driver) { @@ -121,7 +123,16 @@ private async Task GetPooledConnectionAsync(bool execAsync, Cancellation // if we don't have an idle connection but we have room for a new // one, then create it here. - lock ((_idlePool as ICollection).SyncRoot) + if (execAsync) + { + await _idlePoolSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + } + else + { + _idlePoolSemaphore.Wait(cancellationToken); + } + + try { if (HasIdleConnections) { @@ -129,6 +140,10 @@ private async Task GetPooledConnectionAsync(bool execAsync, Cancellation _idlePool.RemoveLast(); } } + finally + { + _idlePoolSemaphore.Release(); + } // Obey the connection timeout if (driver != null) @@ -164,10 +179,25 @@ private async Task GetPooledConnectionAsync(bool execAsync, Cancellation driver = await CreateNewPooledConnectionAsync(execAsync, cancellationToken).ConfigureAwait(false); Debug.Assert(driver != null); - lock ((_inUsePool as ICollection).SyncRoot) + + if (execAsync) + { + await _inUsePoolSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + } + else + { + _inUsePoolSemaphore.Wait(cancellationToken); + } + + try { _inUsePool.Add(driver); } + finally + { + _inUsePoolSemaphore.Release(); + } + return driver; } @@ -185,11 +215,24 @@ private async Task CreateNewPooledConnectionAsync(bool execAsync, Cancel public async Task ReleaseConnectionAsync(Driver driver, bool execAsync) { - lock ((_inUsePool as ICollection).SyncRoot) + if (execAsync) + { + await _inUsePoolSemaphore.WaitAsync().ConfigureAwait(false); + } + else + { + _inUsePoolSemaphore.Wait(); + } + + try { if (_inUsePool.Contains(driver)) _inUsePool.Remove(driver); } + finally + { + _inUsePoolSemaphore.Release(); + } if (driver.ConnectionLifetimeExpired() || BeingCleared) { @@ -198,33 +241,57 @@ public async Task ReleaseConnectionAsync(Driver driver, bool execAsync) } else { - lock ((_idlePool as ICollection).SyncRoot) + if (execAsync) + { + await _idlePoolSemaphore.WaitAsync().ConfigureAwait(false); + } + else + { + _idlePoolSemaphore.Wait(); + } + + try { EnqueueIdle(driver); } + finally + { + _idlePoolSemaphore.Release(); + } } - SemaphoreSlim semaphoreSlim = new SemaphoreSlim(1); - semaphoreSlim.Wait(); - - if (driver.Settings.DnsSrv) + if (execAsync) { - var dnsSrvRecords = DnsSrv.GetDnsSrvRecords(DnsSrv.ServiceName); - FailoverManager.SetHostList(dnsSrvRecords.ConvertAll(r => new FailoverServer(r.Target, r.Port, null)), - FailoverMethod.Sequential); + await _dnsSrvSemaphore.WaitAsync().ConfigureAwait(false); + } + else + { + _dnsSrvSemaphore.Wait(); + } - foreach (var idleConnection in _idlePool) + try + { + if (driver.Settings.DnsSrv) { - string idleServer = idleConnection.Settings.Server; - if (!FailoverManager.FailoverGroup.Hosts.Exists(h => h.Host == idleServer) && !idleConnection.IsInActiveUse) + var dnsSrvRecords = DnsSrv.GetDnsSrvRecords(DnsSrv.ServiceName); + FailoverManager.SetHostList(dnsSrvRecords.ConvertAll(r => new FailoverServer(r.Target, r.Port, null)), + FailoverMethod.Sequential); + + foreach (var idleConnection in _idlePool) { - await idleConnection.CloseAsync(execAsync).ConfigureAwait(false); + string idleServer = idleConnection.Settings.Server; + if (!FailoverManager.FailoverGroup.Hosts.Exists(h => h.Host == idleServer) && !idleConnection.IsInActiveUse) + { + await idleConnection.CloseAsync(execAsync).ConfigureAwait(false); + } } } } - - semaphoreSlim.Release(); - + finally + { + _dnsSrvSemaphore.Release(); + } + Interlocked.Increment(ref _available); _autoEvent.Set(); } @@ -238,7 +305,8 @@ public async Task ReleaseConnectionAsync(Driver driver, bool execAsync) /// public void RemoveConnection(Driver driver) { - lock ((_inUsePool as ICollection).SyncRoot) + _inUsePoolSemaphore.Wait(); + try { if (_inUsePool.Contains(driver)) { @@ -247,6 +315,10 @@ public void RemoveConnection(Driver driver) _autoEvent.Set(); } } + finally + { + _inUsePoolSemaphore.Release(); + } // if we are being cleared and we are out of connections then have // the manager destroy us. @@ -301,21 +373,33 @@ public async Task GetConnectionAsync(bool execAsync, CancellationToken c /// internal async Task ClearAsync(bool execAsync) { - SemaphoreSlim semaphoreSlim = new SemaphoreSlim(1); - semaphoreSlim.Wait(); + if (execAsync) + { + await _idlePoolSemaphore.WaitAsync().ConfigureAwait(false); + } + else + { + _idlePoolSemaphore.Wait(); + } - // first, mark ourselves as being cleared - BeingCleared = true; + try + { + // first, mark ourselves as being cleared + BeingCleared = true; - // then we remove all connections sitting in the idle pool - while (_idlePool.Count > 0) + // then we remove all connections sitting in the idle pool + while (_idlePool.Count > 0) + { + Driver d = _idlePool.Last.Value; + await d.CloseAsync(execAsync).ConfigureAwait(false); + _idlePool.RemoveLast(); + } + } + finally { - Driver d = _idlePool.Last.Value; - await d.CloseAsync(execAsync).ConfigureAwait(false); - _idlePool.RemoveLast(); + _idlePoolSemaphore.Release(); } - semaphoreSlim.Release(); // there is nothing left to do here. Now we just wait for all // in use connections to be returned to the pool. When they are // they will be closed. When the last one is closed, the pool will @@ -338,23 +422,29 @@ internal List RemoveOldIdleConnections() var connectionsToClose = new List(); DateTime now = DateTime.Now; - lock ((_idlePool as ICollection).SyncRoot) + _idlePoolSemaphore.Wait(); + + try { while (_idlePool.Count > _minSize) { - var iddleConnection = _idlePool.First.Value; - DateTime expirationTime = iddleConnection.IdleSince.Add( + var idleConnection = _idlePool.First.Value; + DateTime expirationTime = idleConnection.IdleSince.Add( new TimeSpan(0, 0, MySqlPoolManager.maxConnectionIdleTime)); if (expirationTime.CompareTo(now) < 0) { - connectionsToClose.Add(iddleConnection); + connectionsToClose.Add(idleConnection); _idlePool.RemoveFirst(); } else break; } } + finally + { + _idlePoolSemaphore.Release(); + } return connectionsToClose; } } diff --git a/MySQL.Data/src/MySqlPoolManager.cs b/MySQL.Data/src/MySqlPoolManager.cs index d903f8813..26a137fc7 100644 --- a/MySQL.Data/src/MySqlPoolManager.cs +++ b/MySQL.Data/src/MySqlPoolManager.cs @@ -48,6 +48,7 @@ internal class MySqlPoolManager { private static readonly Dictionary Pools = new Dictionary(); private static readonly List ClearingPools = new List(); + private static readonly SemaphoreSlim PoolsSemaphore = new SemaphoreSlim(1, 1); internal const int DEMOTED_TIMEOUT = 120000; #region Properties @@ -138,21 +139,34 @@ public static async Task GetPoolAsync(MySqlConnectionStringBuilder se { string text = GetKey(settings); - SemaphoreSlim semaphoreSlim = new SemaphoreSlim(1); - semaphoreSlim.Wait(CancellationToken.None); - MySqlPool pool; - Pools.TryGetValue(text, out pool); - - if (pool == null) + if (execAsync) { - pool = await MySqlPool.CreateMySqlPoolAsync(settings, execAsync, cancellationToken).ConfigureAwait(false); - Pools.Add(text, pool); + await PoolsSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); } else - pool.Settings = settings; + { + PoolsSemaphore.Wait(cancellationToken); + } - semaphoreSlim.Release(); - return pool; + try + { + MySqlPool pool; + Pools.TryGetValue(text, out pool); + + if (pool == null) + { + pool = await MySqlPool.CreateMySqlPoolAsync(settings, execAsync, cancellationToken).ConfigureAwait(false); + Pools.Add(text, pool); + } + else + pool.Settings = settings; + + return pool; + } + finally + { + PoolsSemaphore.Release(); + } } public static void RemoveConnection(Driver driver) @@ -193,39 +207,62 @@ public static async Task ClearPoolAsync(MySqlConnectionStringBuilder settings, b private static async Task ClearPoolByTextAsync(string key, bool execAsync) { - SemaphoreSlim semaphoreSlim = new SemaphoreSlim(1); - semaphoreSlim.Wait(); - - // if pools doesn't have it, then this pool must already have been cleared - if (!Pools.ContainsKey(key)) return; + if (execAsync) + { + await PoolsSemaphore.WaitAsync().ConfigureAwait(false); + } + else + { + PoolsSemaphore.Wait(); + } - // add the pool to our list of pools being cleared - MySqlPool pool = (Pools[key] as MySqlPool); - ClearingPools.Add(pool); + try + { + // if pools doesn't have it, then this pool must already have been cleared + if (!Pools.ContainsKey(key)) return; - // now tell the pool to clear itself - await pool.ClearAsync(execAsync).ConfigureAwait(false); + // add the pool to our list of pools being cleared + MySqlPool pool = (Pools[key] as MySqlPool); + ClearingPools.Add(pool); - // and then remove the pool from the active pools list - Pools.Remove(key); + // now tell the pool to clear itself + await pool.ClearAsync(execAsync).ConfigureAwait(false); - semaphoreSlim.Release(); + // and then remove the pool from the active pools list + Pools.Remove(key); + } + finally + { + PoolsSemaphore.Release(); + } } public static async Task ClearAllPoolsAsync(bool execAsync) { - SemaphoreSlim semaphoreSlim = new SemaphoreSlim(1); - semaphoreSlim.Wait(); + if (execAsync) + { + await PoolsSemaphore.WaitAsync().ConfigureAwait(false); + } + else + { + PoolsSemaphore.Wait(); + } - // Create separate keys list. - List keys = new List(Pools.Count); - keys.AddRange(Pools.Keys); + try + { + // Create separate keys list. + List keys = new List(Pools.Count); + keys.AddRange(Pools.Keys); - // Remove all pools by key. - foreach (string key in keys) - await ClearPoolByTextAsync(key, execAsync).ConfigureAwait(false); + // Remove all pools by key. + foreach (string key in keys) + await ClearPoolByTextAsync(key, execAsync).ConfigureAwait(false); - semaphoreSlim.Release(); + } + finally + { + PoolsSemaphore.Release(); + } if (DemotedServersTimer != null) { diff --git a/MySQL.Data/src/Releaser.cs b/MySQL.Data/src/Releaser.cs new file mode 100644 index 000000000..cc5beaf2f --- /dev/null +++ b/MySQL.Data/src/Releaser.cs @@ -0,0 +1,19 @@ +using System; +using System.Threading; + +namespace MySql.Data.MySqlClient; + +internal class Releaser : IDisposable +{ + private readonly SemaphoreSlim semaphoreSlim; + + public Releaser(SemaphoreSlim semaphoreSlim) + { + this.semaphoreSlim = semaphoreSlim; + } + + public void Dispose() + { + semaphoreSlim.Release(); + } +} \ No newline at end of file diff --git a/MySQL.Data/src/Replication/ReplicationManager.cs b/MySQL.Data/src/Replication/ReplicationManager.cs index 4c43f44fc..06ab80c90 100644 --- a/MySQL.Data/src/Replication/ReplicationManager.cs +++ b/MySQL.Data/src/Replication/ReplicationManager.cs @@ -41,7 +41,7 @@ namespace MySql.Data.MySqlClient.Replication internal static class ReplicationManager { private static List groups = new List(); - private static Object thisLock = new Object(); + private static SemaphoreSlim semaphoreSlim = new SemaphoreSlim(1, 1); //private static Dictionary selectors = new Dictionary(); static ReplicationManager() @@ -147,55 +147,70 @@ internal static async Task GetNewConnectionAsync(string groupName, bool source, { do { - SemaphoreSlim semaphoreSlim = new SemaphoreSlim(1); - semaphoreSlim.Wait(); - if (!IsReplicationGroup(groupName)) return; - - ReplicationServerGroup group = GetGroup(groupName); - ReplicationServer server = group.GetServer(source, connection.Settings); - - if (server == null) - throw new MySqlException(Resources.Replication_NoAvailableServer); + if (execAsync) + { + await semaphoreSlim.WaitAsync(cancellationToken); + } + else + { + semaphoreSlim.Wait(cancellationToken); + } try { - bool isNewServer = false; - if (connection.driver == null || !connection.driver.IsOpen) - { - isNewServer = true; - } - else + if (!IsReplicationGroup(groupName)) return; + + ReplicationServerGroup group = GetGroup(groupName); + ReplicationServer server = group.GetServer(source, connection.Settings); + + if (server == null) + throw new MySqlException(Resources.Replication_NoAvailableServer); + + try { - MySqlConnectionStringBuilder msb = new MySqlConnectionStringBuilder(server.ConnectionString); - if (!msb.Equals(connection.driver.Settings)) + bool isNewServer = false; + if (connection.driver == null || !connection.driver.IsOpen) { isNewServer = true; } + else + { + MySqlConnectionStringBuilder msb = new MySqlConnectionStringBuilder(server.ConnectionString); + if (!msb.Equals(connection.driver.Settings)) + { + isNewServer = true; + } + } + + if (isNewServer) + { + Driver driver = await Driver.CreateAsync(new MySqlConnectionStringBuilder(server.ConnectionString), + execAsync, cancellationToken).ConfigureAwait(false); + connection.driver = driver; + } + + return; } - if (isNewServer) + catch (MySqlException ex) { - Driver driver = await Driver.CreateAsync(new MySqlConnectionStringBuilder(server.ConnectionString), execAsync, cancellationToken).ConfigureAwait(false); - connection.driver = driver; + connection.driver = null; + server.IsAvailable = false; + MySqlTrace.LogError(ex.Number, ex.ToString()); + if (ex.Number == 1042) + { + // retry to open a failed connection and update its status + group.HandleFailover(server, ex); + } + else + throw; } - return; } - catch (MySqlException ex) + finally { - connection.driver = null; - server.IsAvailable = false; - MySqlTrace.LogError(ex.Number, ex.ToString()); - if (ex.Number == 1042) - { - // retry to open a failed connection and update its status - group.HandleFailover(server, ex); - } - else - throw; + semaphoreSlim.Release(); } - - semaphoreSlim.Release(); - + } while (true); } } diff --git a/MySQL.Data/src/common/Ssl.cs b/MySQL.Data/src/common/Ssl.cs index 6d6841631..f72dededf 100644 --- a/MySQL.Data/src/common/Ssl.cs +++ b/MySQL.Data/src/common/Ssl.cs @@ -64,7 +64,7 @@ internal class Ssl private static SslProtocols[] tlsProtocols = new SslProtocols[] { SslProtocols.Tls12 }; private static Dictionary tlsConnectionRef = new Dictionary(); private static Dictionary tlsRetry = new Dictionary(); - private static Object thisLock = new Object(); + private static SemaphoreSlim semaphoreSlim = new SemaphoreSlim(1, 1); #endregion @@ -209,21 +209,38 @@ private X509CertificateCollection GetPFXClientCertificates() tlsProtocols = listProtocols.ToArray(); } - if (tlsConnectionRef.ContainsKey(connectionId)) + if (execAsync) { - tlsProtocol = tlsConnectionRef[connectionId]; + await semaphoreSlim.WaitAsync(cancellationToken); } else { - if (!tlsRetry.ContainsKey(connectionId)) + semaphoreSlim.Wait(cancellationToken); + } + + try + { + if (tlsConnectionRef.TryGetValue(connectionId, out var protocol)) { - tlsRetry[connectionId] = 0; + tlsProtocol = protocol; } - for (int i = tlsRetry[connectionId]; i < tlsProtocols.Length; i++) + else { - tlsProtocol |= tlsProtocols[i]; + if (!tlsRetry.ContainsKey(connectionId)) + { + tlsRetry[connectionId] = 0; + } + for (int i = tlsRetry[connectionId]; i < tlsProtocols.Length; i++) + { + tlsProtocol |= tlsProtocols[i]; + } } } + finally + { + semaphoreSlim.Release(); + } + try { tlsProtocol = (tlsProtocol == SslProtocols.None) ? SslProtocols.Tls12 : tlsProtocol; @@ -236,7 +253,7 @@ private X509CertificateCollection GetPFXClientCertificates() else { using (cancellationToken.Register(() => throw new AggregateException($"Authentication to host '{_settings.Server}' failed.", new IOException()))) - sslStream.AuthenticateAsClientAsync(_settings.Server, certs, tlsProtocol, false).GetAwaiter().GetResult(); + sslStream.AuthenticateAsClient(_settings.Server, certs, tlsProtocol, false); } tlsConnectionRef[connectionId] = tlsProtocol; From 3cadbc09895140342a4a3a81f40e8aaa140730dc Mon Sep 17 00:00:00 2001 From: Stuart Lang Date: Tue, 18 Jul 2023 17:53:04 +0100 Subject: [PATCH 2/3] Add missing .ConfigureAwait(false) --- MySQL.Data/src/Authentication/MySQLAuthenticationPlugin.cs | 2 +- MySQL.Data/src/Driver.cs | 2 +- MySQL.Data/src/MySqlConnection.cs | 4 ++-- MySQL.Data/src/MySqlPoolManager.cs | 2 +- MySQL.Data/src/NativeDriver.cs | 4 ++-- MySQL.Data/src/Replication/ReplicationManager.cs | 2 +- MySQL.Data/src/X/XDevAPI/Common/BaseStatement.cs | 2 +- MySQL.Data/src/common/Ssl.cs | 2 +- 8 files changed, 10 insertions(+), 10 deletions(-) diff --git a/MySQL.Data/src/Authentication/MySQLAuthenticationPlugin.cs b/MySQL.Data/src/Authentication/MySQLAuthenticationPlugin.cs index a0362b6ed..b8a12ea8f 100644 --- a/MySQL.Data/src/Authentication/MySQLAuthenticationPlugin.cs +++ b/MySQL.Data/src/Authentication/MySQLAuthenticationPlugin.cs @@ -171,7 +171,7 @@ internal async Task AuthenticateAsync(bool reset, bool execAsync) await packet.WriteStringAsync(PluginName, execAsync).ConfigureAwait(false); await _driver.SetConnectAttrsAsync(execAsync).ConfigureAwait(false); - await _driver.SendPacketAsync(packet, execAsync); + await _driver.SendPacketAsync(packet, execAsync).ConfigureAwait(false); // Read server response. packet = await ReadPacketAsync(execAsync).ConfigureAwait(false); diff --git a/MySQL.Data/src/Driver.cs b/MySQL.Data/src/Driver.cs index b29ebb914..0e0198458 100644 --- a/MySQL.Data/src/Driver.cs +++ b/MySQL.Data/src/Driver.cs @@ -484,7 +484,7 @@ public virtual async Task ExecuteStatementAsync(MySqlPacket packetToExecute, boo public async Task LockAsync() { - await semaphoreSlim.WaitAsync(); + await semaphoreSlim.WaitAsync().ConfigureAwait(false); return new Releaser(semaphoreSlim); } diff --git a/MySQL.Data/src/MySqlConnection.cs b/MySQL.Data/src/MySqlConnection.cs index 1bc147291..097ac6fc2 100644 --- a/MySQL.Data/src/MySqlConnection.cs +++ b/MySQL.Data/src/MySqlConnection.cs @@ -643,7 +643,7 @@ internal async Task OpenAsync(bool execAsync, CancellationToken cancellationToke } else if (FailoverManager.FailoverGroup != null && !Settings.Pooling) { - string connectionString = await FailoverManager.AttemptConnectionAsync(this, Settings.ConnectionString, execAsync, cancellationToken); + string connectionString = await FailoverManager.AttemptConnectionAsync(this, Settings.ConnectionString, execAsync, cancellationToken).ConfigureAwait(false); currentSettings.ConnectionString = connectionString; } @@ -651,7 +651,7 @@ internal async Task OpenAsync(bool execAsync, CancellationToken cancellationToke { if (FailoverManager.FailoverGroup != null) { - string connectionString = await FailoverManager.AttemptConnectionAsync(this, Settings.ConnectionString, execAsync, cancellationToken, true); + string connectionString = await FailoverManager.AttemptConnectionAsync(this, Settings.ConnectionString, execAsync, cancellationToken, true).ConfigureAwait(false); currentSettings.ConnectionString = connectionString; } diff --git a/MySQL.Data/src/MySqlPoolManager.cs b/MySQL.Data/src/MySqlPoolManager.cs index 26a137fc7..9b170be03 100644 --- a/MySQL.Data/src/MySqlPoolManager.cs +++ b/MySQL.Data/src/MySqlPoolManager.cs @@ -184,7 +184,7 @@ public static async Task ReleaseConnectionAsync(Driver driver, bool execAsync) MySqlPool pool = driver.Pool; - await pool?.ReleaseConnectionAsync(driver, execAsync); + await pool?.ReleaseConnectionAsync(driver, execAsync).ConfigureAwait(false); } public static async Task ClearPoolAsync(MySqlConnectionStringBuilder settings, bool execAsync) diff --git a/MySQL.Data/src/NativeDriver.cs b/MySQL.Data/src/NativeDriver.cs index c7f518b19..dbd9abf32 100644 --- a/MySQL.Data/src/NativeDriver.cs +++ b/MySQL.Data/src/NativeDriver.cs @@ -117,7 +117,7 @@ private async Task HandleExceptionAsync(MySqlException ex, bool execAsync) internal async Task SendPacketAsync(MySqlPacket p, bool execAsync) { - await stream.SendPacketAsync(p, execAsync); + await stream.SendPacketAsync(p, execAsync).ConfigureAwait(false); } internal async Task SendEmptyPacketAsync(bool execAsync) @@ -421,7 +421,7 @@ public async Task AuthenticateAsync(string authMethod, bool reset, bool execAsyn authPlugin = await MySqlAuthenticationPlugin.GetPluginAsync(authMethod, this, encryptionSeed, execAsync).ConfigureAwait(false); } - await authPlugin.AuthenticateAsync(reset, execAsync); + await authPlugin.AuthenticateAsync(reset, execAsync).ConfigureAwait(false); } #endregion diff --git a/MySQL.Data/src/Replication/ReplicationManager.cs b/MySQL.Data/src/Replication/ReplicationManager.cs index 06ab80c90..b6ba69001 100644 --- a/MySQL.Data/src/Replication/ReplicationManager.cs +++ b/MySQL.Data/src/Replication/ReplicationManager.cs @@ -150,7 +150,7 @@ internal static async Task GetNewConnectionAsync(string groupName, bool source, if (!IsReplicationGroup(groupName)) return; if (execAsync) { - await semaphoreSlim.WaitAsync(cancellationToken); + await semaphoreSlim.WaitAsync(cancellationToken).ConfigureAwait(false); } else { diff --git a/MySQL.Data/src/X/XDevAPI/Common/BaseStatement.cs b/MySQL.Data/src/X/XDevAPI/Common/BaseStatement.cs index f28f811b9..19dd9017c 100644 --- a/MySQL.Data/src/X/XDevAPI/Common/BaseStatement.cs +++ b/MySQL.Data/src/X/XDevAPI/Common/BaseStatement.cs @@ -89,7 +89,7 @@ public async Task ExecuteAsync() }, CancellationToken.None, TaskCreationOptions.None, - Session._scheduler); + Session._scheduler).ConfigureAwait(false); } /// diff --git a/MySQL.Data/src/common/Ssl.cs b/MySQL.Data/src/common/Ssl.cs index f72dededf..21d144b90 100644 --- a/MySQL.Data/src/common/Ssl.cs +++ b/MySQL.Data/src/common/Ssl.cs @@ -211,7 +211,7 @@ private X509CertificateCollection GetPFXClientCertificates() if (execAsync) { - await semaphoreSlim.WaitAsync(cancellationToken); + await semaphoreSlim.WaitAsync(cancellationToken).ConfigureAwait(false); } else { From 8fc6046bc45f8f8f945d417dc2f2201eaa63680b Mon Sep 17 00:00:00 2001 From: Stuart Lang Date: Tue, 18 Jul 2023 18:00:58 +0100 Subject: [PATCH 3/3] Simplify driver lock code --- MySQL.Data/src/Driver.cs | 18 +++++++++--------- .../MySqlPromotableTransaction.cs | 12 +----------- MySQL.Data/src/MySqlCommand.cs | 12 +----------- MySQL.Data/src/MySqlConnection.cs | 11 +---------- 4 files changed, 12 insertions(+), 41 deletions(-) diff --git a/MySQL.Data/src/Driver.cs b/MySQL.Data/src/Driver.cs index 0e0198458..99661b683 100644 --- a/MySQL.Data/src/Driver.cs +++ b/MySQL.Data/src/Driver.cs @@ -482,19 +482,19 @@ public virtual async Task ExecuteStatementAsync(MySqlPacket packetToExecute, boo await handler.ExecuteStatementAsync(packetToExecute, execAsync).ConfigureAwait(false); } - public async Task LockAsync() + public async Task LockAsync(bool execAsync = true, CancellationToken cancellationToken = default) { - await semaphoreSlim.WaitAsync().ConfigureAwait(false); - return new Releaser(semaphoreSlim); - } - - public Releaser Lock() - { - semaphoreSlim.Wait(); + if (execAsync) + { + await semaphoreSlim.WaitAsync(cancellationToken).ConfigureAwait(false); + } + else + { + semaphoreSlim.Wait(cancellationToken); + } return new Releaser(semaphoreSlim); } - public virtual async Task CloseStatementAsync(int id, bool execAsync) { await handler.CloseStatementAsync(id, execAsync).ConfigureAwait(false); diff --git a/MySQL.Data/src/Framework/netstandard2_0/MySqlPromotableTransaction.cs b/MySQL.Data/src/Framework/netstandard2_0/MySqlPromotableTransaction.cs index 89e067f81..ed30d3ba8 100644 --- a/MySQL.Data/src/Framework/netstandard2_0/MySqlPromotableTransaction.cs +++ b/MySQL.Data/src/Framework/netstandard2_0/MySqlPromotableTransaction.cs @@ -61,17 +61,7 @@ public async Task RollbackAsync(SinglePhaseEnlistment singlePhaseEnlistment, boo // prevent commands in main thread to run concurrently Driver driver = connection.driver; - Releaser releaser; - if (execAsync) - { - releaser = await driver.LockAsync().ConfigureAwait(false); - } - else - { - releaser = driver.Lock(); - } - - using (releaser) + using (await driver.LockAsync(execAsync).ConfigureAwait(false)) { rollbackThreadId = Thread.CurrentThread.ManagedThreadId; while (connection.Reader != null) diff --git a/MySQL.Data/src/MySqlCommand.cs b/MySQL.Data/src/MySqlCommand.cs index 71ebe0d4a..585393993 100644 --- a/MySQL.Data/src/MySqlCommand.cs +++ b/MySQL.Data/src/MySqlCommand.cs @@ -684,17 +684,7 @@ internal async Task ResetSqlSelectLimitAsync(bool execAsync) await ReplicationManager.GetNewConnectionAsync(connection.Settings.Server, !IsReadOnlyCommand(sql), connection, execAsync, cancellationToken).ConfigureAwait(false); - Releaser releaser; - if (execAsync) - { - releaser = await driver.LockAsync().ConfigureAwait(false); - } - else - { - releaser = driver.Lock(); - } - - using (releaser) + using (await driver.LockAsync(execAsync, cancellationToken).ConfigureAwait(false)) { // We have to recheck that there is no reader, after we got the lock if (connection.Reader != null) diff --git a/MySQL.Data/src/MySqlConnection.cs b/MySQL.Data/src/MySqlConnection.cs index 097ac6fc2..c3b36c9ce 100644 --- a/MySQL.Data/src/MySqlConnection.cs +++ b/MySQL.Data/src/MySqlConnection.cs @@ -534,17 +534,8 @@ internal async Task ChangeDatabaseAsync(string databaseName, bool execAsync, Can // This semaphore prevents promotable transaction rollback to run // in parallel - Releaser releaser; - if (execAsync) - { - releaser = await driver.LockAsync().ConfigureAwait(false); - } - else - { - releaser = driver.Lock(); - } - using (releaser) + using (await driver.LockAsync(execAsync, cancellationToken).ConfigureAwait(false)) { // We use default command timeout for SetDatabase using (new CommandTimer(this, (int)Settings.DefaultCommandTimeout))