From 32eea56a11e2a337609eacf7e002ed8c871bf833 Mon Sep 17 00:00:00 2001 From: WilliamLee Date: Thu, 28 Jan 2021 10:18:11 +0800 Subject: [PATCH] add track session state change --- .../mysql/cj/conf/PropertyDefinitions.java | 4 + .../java/com/mysql/cj/conf/PropertyKey.java | 1 + .../a/NativeAuthenticationProvider.java | 22 ++--- .../mysql/cj/protocol/a/NativeConstants.java | 2 + .../cj/protocol/a/result/NativeResultset.java | 6 ++ .../mysql/cj/protocol/a/result/OkPacket.java | 89 ++++++++++++++++++- 6 files changed, 105 insertions(+), 19 deletions(-) diff --git a/src/main/core-api/java/com/mysql/cj/conf/PropertyDefinitions.java b/src/main/core-api/java/com/mysql/cj/conf/PropertyDefinitions.java index cde9f48aa..ff2878cee 100644 --- a/src/main/core-api/java/com/mysql/cj/conf/PropertyDefinitions.java +++ b/src/main/core-api/java/com/mysql/cj/conf/PropertyDefinitions.java @@ -260,6 +260,10 @@ new StringPropertyDefinition(PropertyKey.sessionVariables, DEFAULT_VALUE_NULL_STRING, RUNTIME_MODIFIABLE, Messages.getString("ConnectionProperties.sessionVariables"), "3.1.8", CATEGORY_SESSION, Integer.MAX_VALUE), + //todo sinceVersion=8.0.24??? + new BooleanPropertyDefinition(PropertyKey.sessionTrack, DEFAULT_VALUE_FALSE, RUNTIME_MODIFIABLE, + Messages.getString("ConnectionProperties.sessionTrack"), "8.0.24", CATEGORY_SESSION, Integer.MIN_VALUE), + // // CATEGORY_NETWORK // diff --git a/src/main/core-api/java/com/mysql/cj/conf/PropertyKey.java b/src/main/core-api/java/com/mysql/cj/conf/PropertyKey.java index 13dc30f86..bf7ba06bd 100644 --- a/src/main/core-api/java/com/mysql/cj/conf/PropertyKey.java +++ b/src/main/core-api/java/com/mysql/cj/conf/PropertyKey.java @@ -205,6 +205,7 @@ serverConfigCacheFactory("serverConfigCacheFactory", true), // serverRSAPublicKeyFile("serverRSAPublicKeyFile", true), // sessionVariables("sessionVariables", true), // + sessionTrack("sessionTrack", true), // slowQueryThresholdMillis("slowQueryThresholdMillis", true), // slowQueryThresholdNanos("slowQueryThresholdNanos", true), // socketFactory("socketFactory", true), // diff --git a/src/main/protocol-impl/java/com/mysql/cj/protocol/a/NativeAuthenticationProvider.java b/src/main/protocol-impl/java/com/mysql/cj/protocol/a/NativeAuthenticationProvider.java index 69d588cf7..5254d2b7d 100644 --- a/src/main/protocol-impl/java/com/mysql/cj/protocol/a/NativeAuthenticationProvider.java +++ b/src/main/protocol-impl/java/com/mysql/cj/protocol/a/NativeAuthenticationProvider.java @@ -29,13 +29,6 @@ package com.mysql.cj.protocol.a; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - import com.mysql.cj.Constants; import com.mysql.cj.Messages; import com.mysql.cj.conf.PropertyDefinitions.SslMode; @@ -52,15 +45,12 @@ import com.mysql.cj.protocol.a.NativeConstants.IntegerDataType; import com.mysql.cj.protocol.a.NativeConstants.StringLengthDataType; import com.mysql.cj.protocol.a.NativeConstants.StringSelfDataType; -import com.mysql.cj.protocol.a.authentication.AuthenticationLdapSaslClientPlugin; -import com.mysql.cj.protocol.a.authentication.CachingSha2PasswordPlugin; -import com.mysql.cj.protocol.a.authentication.MysqlClearPasswordPlugin; -import com.mysql.cj.protocol.a.authentication.MysqlNativePasswordPlugin; -import com.mysql.cj.protocol.a.authentication.MysqlOldPasswordPlugin; -import com.mysql.cj.protocol.a.authentication.Sha256PasswordPlugin; +import com.mysql.cj.protocol.a.authentication.*; import com.mysql.cj.protocol.a.result.OkPacket; import com.mysql.cj.util.StringUtils; +import java.util.*; + public class NativeAuthenticationProvider implements AuthenticationProvider { protected static final int AUTH_411_OVERHEAD = 33; @@ -155,10 +145,8 @@ public void connect(ServerSession sessState, String user, String password, Strin : (capabilityFlags & NativeServerSession.CLIENT_CONNECT_ATTRS)) | (this.propertySet.getEnumProperty(PropertyKey.sslMode).getValue() != SslMode.DISABLED ? (capabilityFlags & NativeServerSession.CLIENT_SSL) - : 0); - - // TODO MYSQLCONNJ-437 - // clientParam |= (capabilityFlags & NativeServerSession.CLIENT_SESSION_TRACK); + : 0) + | (this.propertySet.getBooleanProperty(PropertyKey.sessionTrack).getValue() ? (capabilityFlags & NativeServerSession.CLIENT_SESSION_TRACK) : 0); sessState.setClientParam(clientParam); diff --git a/src/main/protocol-impl/java/com/mysql/cj/protocol/a/NativeConstants.java b/src/main/protocol-impl/java/com/mysql/cj/protocol/a/NativeConstants.java index a30e15de6..1e1ad360c 100644 --- a/src/main/protocol-impl/java/com/mysql/cj/protocol/a/NativeConstants.java +++ b/src/main/protocol-impl/java/com/mysql/cj/protocol/a/NativeConstants.java @@ -96,6 +96,8 @@ public static final int COM_DAEMON = 29; public static final int COM_BINLOG_DUMP_GTID = 30; public static final int COM_RESET_CONNECTION = 31; + public static final long SERVER_SESSION_STATE_CHANGED = 1 << 14; + /** * Used to indicate that the server sent no field-level character set information, so the driver should use the connection-level character encoding instead. diff --git a/src/main/protocol-impl/java/com/mysql/cj/protocol/a/result/NativeResultset.java b/src/main/protocol-impl/java/com/mysql/cj/protocol/a/result/NativeResultset.java index e258cbe6f..2d5be1d00 100644 --- a/src/main/protocol-impl/java/com/mysql/cj/protocol/a/result/NativeResultset.java +++ b/src/main/protocol-impl/java/com/mysql/cj/protocol/a/result/NativeResultset.java @@ -30,6 +30,8 @@ package com.mysql.cj.protocol.a.result; import java.util.HashMap; +import java.util.List; +import java.util.Map; import com.mysql.cj.protocol.ColumnDefinition; import com.mysql.cj.protocol.Resultset; @@ -62,6 +64,9 @@ */ protected String serverInfo = null; + /** track session state change info */ + protected Map> sessionStateChangeMap; + /** Pointer to current row data */ protected Row thisRow = null; // Values for current row @@ -78,6 +83,7 @@ public NativeResultset(OkPacket ok) { this.updateCount = ok.getUpdateCount(); this.updateId = ok.getUpdateID(); this.serverInfo = ok.getInfo(); + this.sessionStateChangeMap = ok.getSessionStateChangeMap(); this.columnDefinition = new DefaultColumnDefinition(new Field[0]); } diff --git a/src/main/protocol-impl/java/com/mysql/cj/protocol/a/result/OkPacket.java b/src/main/protocol-impl/java/com/mysql/cj/protocol/a/result/OkPacket.java index 87d292ccc..333bcb3d3 100644 --- a/src/main/protocol-impl/java/com/mysql/cj/protocol/a/result/OkPacket.java +++ b/src/main/protocol-impl/java/com/mysql/cj/protocol/a/result/OkPacket.java @@ -31,16 +31,22 @@ import com.mysql.cj.protocol.ProtocolEntity; import com.mysql.cj.protocol.a.NativeConstants.IntegerDataType; +import com.mysql.cj.protocol.a.NativeConstants.StringLengthDataType; import com.mysql.cj.protocol.a.NativeConstants.StringSelfDataType; import com.mysql.cj.protocol.a.NativePacketPayload; -public class OkPacket implements ProtocolEntity { +import java.util.*; + +import static com.mysql.cj.protocol.a.NativeConstants.SERVER_SESSION_STATE_CHANGED; +//Types of State Change Information +public class OkPacket implements ProtocolEntity { private long updateCount = -1; private long updateID = -1; private int statusFlags = 0; private int warningCount = 0; private String info = null; + private Map> sessionStateChangeMap = new HashMap<>(7); public OkPacket() { } @@ -56,9 +62,58 @@ public static OkPacket parse(NativePacketPayload buf, String errorMessageEncodin ok.setStatusFlags((int) buf.readInteger(IntegerDataType.INT2)); ok.setWarningCount((int) buf.readInteger(IntegerDataType.INT2)); ok.setInfo(buf.readString(StringSelfDataType.STRING_TERM, errorMessageEncoding)); // info + + // read session state changes info + if ((ok.getStatusFlags() & SERVER_SESSION_STATE_CHANGED) > 0) { + int totalLen = (int) buf.readInteger(IntegerDataType.INT_LENENC); + int start = buf.getPosition(); + int end = start + totalLen; + while (totalLen > 0 && end > start) { + int type = (int) buf.readInteger(IntegerDataType.INT1); + SessionStateType typeEnum = SessionStateType.valueOf(type); + if (typeEnum == null) { + break; + } + switch (typeEnum) { + case SESSION_TRACK_SYSTEM_VARIABLES: + // just skip one position + buf.readInteger(IntegerDataType.INT1); + ok.addChanges(type, buf.readString(StringSelfDataType.STRING_LENENC, errorMessageEncoding)); + ok.addChanges(type, buf.readString(StringSelfDataType.STRING_LENENC, errorMessageEncoding)); + break; + case SESSION_TRACK_TRANSACTION_CHARACTERISTICS: + case SESSION_TRACK_TRANSACTION_STATE: + case SESSION_TRACK_SCHEMA: + // just skip one position + buf.readInteger(IntegerDataType.INT1); + ok.addChanges(type, buf.readString(StringSelfDataType.STRING_LENENC, errorMessageEncoding)); + break; + case SESSION_TRACK_STATE_CHANGE: + int len = (int) buf.readInteger(IntegerDataType.INT1); + if (len == 1) { + ok.addChanges(type, buf.readString(StringLengthDataType.STRING_FIXED, errorMessageEncoding, len)); + } + break; + case SESSION_TRACK_GTID: + // just skip two position + buf.readInteger(IntegerDataType.INT2); + ok.addChanges(type, buf.readString(StringSelfDataType.STRING_LENENC, errorMessageEncoding)); + break; + default: + //nothing to do + } + start = buf.getPosition(); + } + } return ok; } + private void addChanges(int type, String msg) { + Map> sessionStateChangeMap = this.getSessionStateChangeMap(); + List msgList = sessionStateChangeMap.computeIfAbsent(type, ArrayList::new); + msgList.add(msg); + } + public long getUpdateCount() { return this.updateCount; } @@ -98,4 +153,34 @@ public int getWarningCount() { public void setWarningCount(int warningCount) { this.warningCount = warningCount; } -} + + public Map> getSessionStateChangeMap() { + return sessionStateChangeMap; + } + + public void setSessionStateChangeMap(Map> sessionStateChangeMap) { + this.sessionStateChangeMap = sessionStateChangeMap; + } + + public enum SessionStateType { + SESSION_TRACK_SYSTEM_VARIABLES(0x00), + SESSION_TRACK_SCHEMA(0x01), + SESSION_TRACK_STATE_CHANGE(0x02), + SESSION_TRACK_GTID(0x03), + SESSION_TRACK_TRANSACTION_CHARACTERISTICS(0x04), + SESSION_TRACK_TRANSACTION_STATE(0x05); + + int value; + + SessionStateType(int value) { + this.value = value; + } + + public static SessionStateType valueOf(int type) { + return Arrays.stream(SessionStateType.values()) + .filter(t -> Objects.equals(t.value, type)) + .findFirst() + .orElse(null); + } + } +} \ No newline at end of file