diff --git a/include/my_sys.h b/include/my_sys.h index bc815af..4b30cf8 100644 --- a/include/my_sys.h +++ b/include/my_sys.h @@ -17,6 +17,7 @@ #define _my_sys_h #include "my_global.h" /* C_MODE_START, C_MODE_END */ +#include C_MODE_START @@ -856,12 +857,15 @@ static inline char *safe_strdup_root(MEM_ROOT *root, const char *str) } extern char *strmake_root(MEM_ROOT *root,const char *str,size_t len); extern void *memdup_root(MEM_ROOT *root,const void *str, size_t len); -extern my_bool my_compress(uchar *, size_t *, size_t *, uint level); +extern my_bool my_compress(NET *, uchar *, + size_t *, size_t *, uint level); extern void set_memroot_max_capacity(MEM_ROOT *mem_root, size_t size); extern void set_memroot_error_reporting(MEM_ROOT *mem_root, my_bool report_error); -extern my_bool my_uncompress(uchar *, size_t , size_t *); -extern uchar *my_compress_alloc(const uchar *packet, size_t *len, +extern my_bool my_uncompress(NET *, uchar *, + size_t , size_t *); +extern uchar *my_compress_alloc(NET *net, + const uchar *packet, size_t *len, size_t *complen, uint level); extern int packfrm(uchar *, size_t, uchar **, size_t *); extern int unpackfrm(uchar **, size_t *, const uchar *); diff --git a/include/mysql.h b/include/mysql.h index 116f075..604b684 100644 --- a/include/mysql.h +++ b/include/mysql.h @@ -187,7 +187,8 @@ enum mysql_option MYSQL_OPT_READ_TIMEOUT_MS, MYSQL_OPT_WRITE_TIMEOUT_MS, MYSQL_OPT_SSL_SESSION, - MYSQL_OPT_SSL_CONTEXT + MYSQL_OPT_SSL_CONTEXT, + MYSQL_OPT_COMP_LIB }; /** diff --git a/include/mysql.h.pp b/include/mysql.h.pp index 8d82b3b..6bbc0be 100644 --- a/include/mysql.h.pp +++ b/include/mysql.h.pp @@ -74,6 +74,12 @@ struct st_vio; typedef struct st_vio Vio; struct ssl_st; typedef struct ssl_st SSL; +enum mysql_compression_lib { + MYSQL_COMPRESSION_ZLIB, + MYSQL_COMPRESSION_ZSTD +}; +typedef struct ZSTD_CCtx_s ZSTD_CCtx; +typedef struct ZSTD_DCtx_s ZSTD_DCtx; typedef struct st_net { Vio *vio; unsigned char *buff,*buff_end,*write_pos,*read_pos; @@ -91,6 +97,9 @@ typedef struct st_net { my_bool unused2; my_bool compress; my_bool unused3; + enum mysql_compression_lib comp_lib; + ZSTD_CCtx *cctx; + ZSTD_DCtx *dctx; unsigned char *unused; unsigned int last_errno; unsigned char error; @@ -383,7 +392,8 @@ enum mysql_option MYSQL_OPT_READ_TIMEOUT_MS, MYSQL_OPT_WRITE_TIMEOUT_MS, MYSQL_OPT_SSL_SESSION, - MYSQL_OPT_SSL_CONTEXT + MYSQL_OPT_SSL_CONTEXT, + MYSQL_OPT_COMP_LIB }; struct st_mysql_options_extention; struct st_mysql_options { diff --git a/include/mysql_com.h b/include/mysql_com.h index 5dc5e53..740d6a4 100644 --- a/include/mysql_com.h +++ b/include/mysql_com.h @@ -429,6 +429,14 @@ typedef struct ssl_st SSL; #define NET_HEADER_SIZE 4 /* standard header size */ #define COMP_HEADER_SIZE 3 /* compression header extra size */ +enum mysql_compression_lib { + MYSQL_COMPRESSION_ZLIB, + MYSQL_COMPRESSION_ZSTD +}; + +typedef struct ZSTD_CCtx_s ZSTD_CCtx; +typedef struct ZSTD_DCtx_s ZSTD_DCtx; + typedef struct st_net { #if !defined(CHECK_EMBEDDED_DIFFERENCES) || !defined(EMBEDDED_LIBRARY) Vio *vio; @@ -452,6 +460,9 @@ typedef struct st_net { my_bool unused2; /* Please remove with the next incompatible ABI change */ my_bool compress; my_bool unused3; /* Please remove with the next incompatible ABI change. */ + enum mysql_compression_lib comp_lib; + ZSTD_CCtx *cctx; + ZSTD_DCtx *dctx; /* Pointer to query object in query cache, do not equal NULL (0) for queries in cache that have not stored its results yet diff --git a/include/sql_common.h b/include/sql_common.h index 0d5c317..72f4a0d 100644 --- a/include/sql_common.h +++ b/include/sql_common.h @@ -136,6 +136,7 @@ void mysql_client_plugin_deinit(); struct st_mysql_client_plugin; extern struct st_mysql_client_plugin *mysql_client_builtins[]; uchar * send_client_connect_attrs(MYSQL *mysql, uchar *buf); +enum mysql_compression_lib get_client_compression_enum(const char* comp_lib); extern my_bool libmysql_cleartext_plugin_enabled; #ifdef __cplusplus diff --git a/libmysql/CMakeLists.txt b/libmysql/CMakeLists.txt index 127970d..e21a94c 100644 --- a/libmysql/CMakeLists.txt +++ b/libmysql/CMakeLists.txt @@ -24,6 +24,12 @@ INCLUDE_DIRECTORIES( ${ZLIB_INCLUDE_DIR}) ADD_DEFINITIONS(${SSL_DEFINES}) +IF(NOT "$ENV{WITH_ZSTD}" STREQUAL "") + SET(ZSTD_LIBRARY $ENV{WITH_ZSTD}/libzstd_pic.a) + ADD_DEFINITIONS(-DZSTD) + ADD_DEFINITIONS(-DHAVE_ZSTD_COMPRESS) +ENDIF() + SET(CLIENT_API_FUNCTIONS get_tty_password handle_options @@ -178,7 +184,7 @@ ELSE() SET(STATIC_MERGE_LIBS clientlib dbug strings vio mysys mysys_ssl ${LIBDL}) ENDIF() -SET(LIBS ${STATIC_MERGE_LIBS} ${ZLIB_LIBRARY} ${SSL_LIBRARIES}) +SET(LIBS ${STATIC_MERGE_LIBS} ${ZLIB_LIBRARY} ${ZSTD_LIBRARY} ${SSL_LIBRARIES}) # # On Windows platform client library includes the client-side @@ -192,7 +198,7 @@ ENDIF() # Merge several convenience libraries into one big fbmysqlclient MERGE_LIBRARIES(fbmysqlclient STATIC ${STATIC_MERGE_LIBS} COMPONENT Development) -TARGET_LINK_LIBRARIES(fbmysqlclient ${ZLIB_LIBRARY} ${SSL_LIBRARIES}) +TARGET_LINK_LIBRARIES(fbmysqlclient ${ZLIB_LIBRARY} ${ZSTD_LIBRARY} ${SSL_LIBRARIES}) # Visual Studio users need debug static library for debug projects IF(MSVC) diff --git a/mysql-test/suite/rpl/r/rpl_test_zstd.result b/mysql-test/suite/rpl/r/rpl_test_zstd.result new file mode 100644 index 0000000..b39a41d --- /dev/null +++ b/mysql-test/suite/rpl/r/rpl_test_zstd.result @@ -0,0 +1,44 @@ +include/master-slave.inc +Warnings: +Note #### Sending passwords in plain text without SSL/TLS is extremely insecure. +Note #### Storing MySQL user name or password information in the master info repository is not secure and is therefore not recommended. Please consider using the USER and PASSWORD connection options for START SLAVE; see the 'START SLAVE Syntax' in the MySQL Manual for more information. +[connection master] +set @save_comp_lib = @@global.slave_compression_lib; +set @save_slave_compress = @@global.slave_compressed_protocol; +set global slave_compressed_protocol=on; +include/stop_slave_io.inc +include/start_slave_io.inc +CREATE TABLE t1(c VARCHAR(1000)); +INSERT INTO t1 (c) VALUES( +'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' +'bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb' +'ccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc' +'ddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddd' +'eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee' +); +include/sync_slave_sql_with_master.inc +set global slave_compression_lib=zstd; +include/stop_slave_io.inc +include/start_slave_io.inc +CREATE TABLE t2(c VARCHAR(1000)); +INSERT INTO t2 (c) VALUES( +'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' +'bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb' +'ccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc' +'ddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddd' +'eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee' +); +include/sync_slave_sql_with_master.inc +select c from t1; +c +aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbcccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccdddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee +select c from t2; +c +aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbcccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccdddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee +compression_changed +1 +drop table t1; +drop table t2; +set @@global.slave_compression_lib = @save_comp_lib; +set @@global.slave_compressed_protocol = @save_slave_compress; +include/rpl_end.inc diff --git a/mysql-test/suite/rpl/t/rpl_test_zstd.test b/mysql-test/suite/rpl/t/rpl_test_zstd.test new file mode 100644 index 0000000..d41157d --- /dev/null +++ b/mysql-test/suite/rpl/t/rpl_test_zstd.test @@ -0,0 +1,69 @@ +--source include/have_compress.inc +--source include/master-slave.inc + +connection slave; +set @save_comp_lib = @@global.slave_compression_lib; +set @save_slave_compress = @@global.slave_compressed_protocol; + +set global slave_compressed_protocol=on; + +--source include/stop_slave_io.inc +--source include/start_slave_io.inc + +--let $zlib_before = query_get_value(show global status where variable_name='bytes_received', Value, 1) + +connection master; + +CREATE TABLE t1(c VARCHAR(1000)); +# Insert large string to ensure replication needs to compress this +INSERT INTO t1 (c) VALUES( +'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' +'bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb' +'ccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc' +'ddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddd' +'eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee' +); + +--source include/sync_slave_sql_with_master.inc + +connection slave; +--let $zlib_after = query_get_value(show global status where variable_name='bytes_received', Value, 1) + +set global slave_compression_lib=zstd; +--source include/stop_slave_io.inc +--source include/start_slave_io.inc + +--let $zstd_before = query_get_value(show global status where variable_name='bytes_received', Value, 1) + +connection master; + +CREATE TABLE t2(c VARCHAR(1000)); +# Insert large string to ensure replication needs to compress this +INSERT INTO t2 (c) VALUES( +'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' +'bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb' +'ccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc' +'ddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddd' +'eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee' +); + +--source include/sync_slave_sql_with_master.inc + +connection slave; +--let $zstd_after = query_get_value(show global status where variable_name='bytes_received', Value, 1) +select c from t1; +select c from t2; + +--disable_query_log +--eval select ($zlib_after - $zlib_before) <> ($zstd_after - $zstd_before) as "compression_changed" +--enable_query_log + +connection master; +drop table t1; +drop table t2; + +connection slave; +set @@global.slave_compression_lib = @save_comp_lib; +set @@global.slave_compressed_protocol = @save_slave_compress; + +--source include/rpl_end.inc diff --git a/mysql-test/suite/sys_vars/r/slave_compression_lib_basic.result b/mysql-test/suite/sys_vars/r/slave_compression_lib_basic.result new file mode 100644 index 0000000..23416a5 --- /dev/null +++ b/mysql-test/suite/sys_vars/r/slave_compression_lib_basic.result @@ -0,0 +1,26 @@ +SET @start_global_value = @@global.slave_compression_lib; +SELECT @start_global_value; +@start_global_value +zlib +SET @@global.slave_compression_lib='zlib'; +SELECT @@global.slave_compression_lib; +@@global.slave_compression_lib +zlib +SET @@global.slave_compression_lib='zstd'; +SELECT @@global.slave_compression_lib; +@@global.slave_compression_lib +zstd +SET @@global.slave_compression_lib=DEFAULT; +SELECT @@global.slave_compression_lib; +@@global.slave_compression_lib +zlib +SET @@global.slave_compression_lib='voodoo_compression'; +ERROR 42000: Variable 'slave_compression_lib' can't be set to the value of 'voodoo_compression' +SELECT @@global.slave_compression_lib; +@@global.slave_compression_lib +zlib +SET @new = @@global.slave_compression_lib; +SELECT @new; +@new +zlib +SET @@global.slave_compression_lib = @start_global_value; diff --git a/mysql-test/suite/sys_vars/r/zstd_net_compression_level_basic.result b/mysql-test/suite/sys_vars/r/zstd_net_compression_level_basic.result new file mode 100644 index 0000000..95a64c8 --- /dev/null +++ b/mysql-test/suite/sys_vars/r/zstd_net_compression_level_basic.result @@ -0,0 +1,16 @@ +SET @orig = @@global.zstd_net_compression_level; +SELECT @orig; +@orig +3 +SET @@global.zstd_net_compression_level = 2; +SET @new = @@global.zstd_net_compression_level; +SELECT @new; +@new +2 +SET @@global.zstd_net_compression_level = 50; +Warnings: +Warning 1292 Truncated incorrect zstd_net_compression_level value: '50' +SELECT @@global.zstd_net_compression_level; +@@global.zstd_net_compression_level +22 +SET @@global.zstd_net_compression_level = @orig; diff --git a/mysql-test/suite/sys_vars/t/slave_compression_lib_basic.test b/mysql-test/suite/sys_vars/t/slave_compression_lib_basic.test new file mode 100644 index 0000000..b3a1b10 --- /dev/null +++ b/mysql-test/suite/sys_vars/t/slave_compression_lib_basic.test @@ -0,0 +1,33 @@ +############## mysql-test\t\slave_compression_lib_basic.test ######## +# # +# Variable Name: slave_compression_lib # +# Scope: GLOBAL # +# Access Type: Dynamic # +# Data Type: enum # +# Default Value: 'zlib' # +# Values: zlib, zstd # +# Descriptrion: Which compression algorithm to use for replication # +# # +############################################################################### +--source include/load_sysvars.inc + +SET @start_global_value = @@global.slave_compression_lib; +SELECT @start_global_value; + +SET @@global.slave_compression_lib='zlib'; +SELECT @@global.slave_compression_lib; + +SET @@global.slave_compression_lib='zstd'; +SELECT @@global.slave_compression_lib; + +SET @@global.slave_compression_lib=DEFAULT; +SELECT @@global.slave_compression_lib; + +--error 1231 +SET @@global.slave_compression_lib='voodoo_compression'; +SELECT @@global.slave_compression_lib; + +SET @new = @@global.slave_compression_lib; +SELECT @new; + +SET @@global.slave_compression_lib = @start_global_value; diff --git a/mysql-test/suite/sys_vars/t/zstd_net_compression_level_basic.test b/mysql-test/suite/sys_vars/t/zstd_net_compression_level_basic.test new file mode 100644 index 0000000..486de2f --- /dev/null +++ b/mysql-test/suite/sys_vars/t/zstd_net_compression_level_basic.test @@ -0,0 +1,24 @@ +############## mysql-test\t\zstd_net_compression_level_basic.test ######## +# # +# Variable Name: zstd_net_compression_level # +# Scope: GLOBAL # +# Access Type: Dynamic # +# Data Type: int # +# Default Value: 3 # +# Range: 0-22 # +# Description: Level of compression to use for zstd operations # +# # +############################################################################### + +SET @orig = @@global.zstd_net_compression_level; +SELECT @orig; + +SET @@global.zstd_net_compression_level = 2; + +SET @new = @@global.zstd_net_compression_level; +SELECT @new; + +SET @@global.zstd_net_compression_level = 50; +SELECT @@global.zstd_net_compression_level; + +SET @@global.zstd_net_compression_level = @orig; diff --git a/mysys/CMakeLists.txt b/mysys/CMakeLists.txt index 8f0bd25..c2155b5 100644 --- a/mysys/CMakeLists.txt +++ b/mysys/CMakeLists.txt @@ -70,8 +70,14 @@ IF(HAVE_MLOCK) SET(MYSYS_SOURCES ${MYSYS_SOURCES} my_lockmem.c) ENDIF() +IF(NOT "$ENV{WITH_ZSTD}" STREQUAL "") + SET(ZSTD_LIBRARY $ENV{WITH_ZSTD}/libzstd_pic.a) + ADD_DEFINITIONS(-DZSTD) + ADD_DEFINITIONS(-DHAVE_ZSTD_COMPRESS) +ENDIF() + ADD_CONVENIENCE_LIBRARY(mysys ${MYSYS_SOURCES}) -TARGET_LINK_LIBRARIES(mysys dbug strings ${ZLIB_LIBRARY} +TARGET_LINK_LIBRARIES(mysys dbug strings ${ZLIB_LIBRARY} ${ZSTD_LIBRARY} ${LIBNSL} ${LIBM} ${LIBRT} ${LIBEXECINFO}) DTRACE_INSTRUMENT(mysys) diff --git a/mysys/my_compress.c b/mysys/my_compress.c index 6e77d2a..ad04fd3 100644 --- a/mysys/my_compress.c +++ b/mysys/my_compress.c @@ -23,11 +23,23 @@ #endif #include +#ifdef HAVE_ZSTD_COMPRESS +#include +#include +#endif + +#ifdef MYSQL_SERVER +extern uint zstd_net_compression_level; +#else /* MYSQL_SERVER */ +#define zstd_net_compression_level 3 +#endif // MYSQL_SERVER + /* This replaces the packet with a compressed packet SYNOPSIS my_compress() + net Contains options like compression lib and other contexts packet Data to compress. This is is replaced with the compressed data. len Length of data to compress at 'packet' complen out: 0 if packet was not compressed @@ -35,11 +47,28 @@ RETURN 1 error. 'len' is not changed' 0 ok. In this case 'len' contains the size of the compressed packet + + The compression library can be modified using the compression_lib connection + attribute. If a client requests a library that is not supported by the server, + the server will default to zlib. If the initial client packet is not + compressed then the client will be able to seamlessly fall back to zlib as + well. Otherwise the client will receive an error. + + The slave thread can change the compression library it uses for replication + through the 'slave_compression_lib' global variable. The slave IO thread + needs to be restarted for the change to take effect. The compression level + for zstd can be modified at runtime using 'zstd_net_compression_level'. + Changes to compression level will take place immediately. This will not + affect connections using zlib compression. */ -my_bool my_compress(uchar *packet, size_t *len, size_t *complen, uint level) +my_bool my_compress(NET *net, uchar *packet, + size_t *len, size_t *complen, uint level) { DBUG_ENTER("my_compress"); + DBUG_ASSERT(packet != NULL); + DBUG_ASSERT(len != NULL); + DBUG_ASSERT(complen != NULL); if (*len < MIN_COMPRESS_LENGTH) { *complen=0; @@ -47,7 +76,7 @@ my_bool my_compress(uchar *packet, size_t *len, size_t *complen, uint level) } else { - uchar *compbuf=my_compress_alloc(packet,len,complen, level); + uchar *compbuf=my_compress_alloc(net, packet, len, complen, level); if (!compbuf) DBUG_RETURN(*complen ? 0 : 1); memcpy(packet,compbuf,*len); @@ -56,10 +85,95 @@ my_bool my_compress(uchar *packet, size_t *len, size_t *complen, uint level) DBUG_RETURN(0); } +#ifdef HAVE_ZSTD_COMPRESS +uchar *zstd_compress_alloc(NET *net, const uchar *packet, size_t *len, + size_t *complen, uint level __attribute__((unused))) +{ + DBUG_ASSERT(net != NULL); + if (!net->cctx) { + if (!(net->cctx = ZSTD_createCCtx())) { + return NULL; + } + } + size_t zstd_len = ZSTD_compressBound(*len); + void *compbuf; + size_t zstd_res; + + if (!(compbuf = my_malloc(zstd_len, MYF(MY_WME)))) { + return NULL; + } + + zstd_res = ZSTD_compressCCtx(net->cctx, compbuf, zstd_len, + (const void *)packet, *len, + zstd_net_compression_level); + if (ZSTD_isError(zstd_res)) { + DBUG_PRINT("error", ("Can't compress zstd packet, error: %zd, %s", + zstd_res, ZSTD_getErrorName(zstd_res))); + my_free(compbuf); + return NULL; + } + + if (zstd_res > *len) { + *complen = 0; + my_free(compbuf); + DBUG_PRINT("note", + ("Packet got longer on zstd compression; Not compressed")); + return NULL; + } + + *complen = *len; + *len = zstd_res; + return compbuf; +} + +// Returns 0 on success +my_bool zstd_uncompress(NET *net, uchar *packet, size_t len, size_t *complen) +{ + DBUG_ASSERT(net != NULL); + size_t zstd_res; + void *compbuf; -uchar *my_compress_alloc(const uchar *packet, size_t *len, + if (!net->dctx) { + if (!(net->dctx = ZSTD_createDCtx())) { + return TRUE; + } + } + + if (!(compbuf = my_malloc(*complen, MYF(MY_WME)))) { + return TRUE; + } + + zstd_res = ZSTD_decompressDCtx(net->dctx, compbuf, *complen, + (const void *)packet, len); + + if (ZSTD_isError(zstd_res) || zstd_res != *complen) { + DBUG_PRINT("error", ("Can't uncompress zstd packet, error: %zd, %s", + zstd_res, ZSTD_getErrorName(zstd_res))); + my_free(compbuf); + return TRUE; + } + + memcpy(packet, compbuf, *complen); + my_free(compbuf); + return FALSE; +} +#endif + +uchar *my_compress_alloc(NET *net, + const uchar *packet, size_t *len, size_t *complen, uint level) { +#ifdef HAVE_ZSTD_COMPRESS + enum mysql_compression_lib comp_lib = net ? net->comp_lib + : MYSQL_COMPRESSION_ZLIB; + + if (comp_lib == MYSQL_COMPRESSION_ZSTD) { + return zstd_compress_alloc(net, packet, len, complen, level); + } +#else + (void)net; +#endif + uchar *compbuf; uLongf tmp_complen; int res; @@ -108,13 +222,24 @@ uchar *my_compress_alloc(const uchar *packet, size_t *len, real data. */ -my_bool my_uncompress(uchar *packet, size_t len, size_t *complen) +my_bool my_uncompress(NET *net, uchar *packet, + size_t len, size_t *complen) { uLongf tmp_complen; DBUG_ENTER("my_uncompress"); if (*complen) /* If compressed */ { +#ifdef HAVE_ZSTD_COMPRESS + enum mysql_compression_lib comp_lib = net ? net->comp_lib + : MYSQL_COMPRESSION_ZLIB; + if (comp_lib == MYSQL_COMPRESSION_ZSTD) { + DBUG_RETURN(zstd_uncompress(net, packet, len, complen)); + } +#else + (void)net; +#endif + uchar *compbuf= (uchar *) my_malloc(*complen,MYF(MY_WME)); int error; if (!compbuf) @@ -180,7 +305,8 @@ int packfrm(uchar *data, size_t len, error= 1; org_len= len; - if (my_compress((uchar*)data, &org_len, &comp_len, Z_DEFAULT_COMPRESSION)) + if (my_compress(NULL, (uchar*)data, &org_len, &comp_len, + Z_DEFAULT_COMPRESSION)) goto err; DBUG_PRINT("info", ("org_len: %lu comp_len: %lu", (ulong) org_len, @@ -250,7 +376,7 @@ int unpackfrm(uchar **unpack_data, size_t *unpack_len, DBUG_RETURN(2); memcpy(data, pack_data + BLOB_HEADER, complen); - if (my_uncompress(data, complen, &orglen)) + if (my_uncompress(NULL, data, complen, &orglen)) { my_free(data); DBUG_RETURN(3); diff --git a/sql-common/client.c b/sql-common/client.c index 3d1ccb5..1d1bd80 100644 --- a/sql-common/client.c +++ b/sql-common/client.c @@ -4927,6 +4927,18 @@ csm_begin_connect(mysql_csm_context *ctx) { DBUG_RETURN(STATE_MACHINE_CONTINUE); } +const char* mysql_compression_lib_names[] = {"zlib", "zstd", NullS}; +enum mysql_compression_lib get_client_compression_enum(const char* comp_lib) { + unsigned i = 0; + while(mysql_compression_lib_names[i]) { + if (strcmp(mysql_compression_lib_names[i], comp_lib) == 0) { + return i; + } + i++; + } + return MYSQL_COMPRESSION_ZLIB; +} + /* Complete the connection itself, setting options on the now-connected socket. */ static mysql_state_machine_status @@ -4988,6 +5000,15 @@ csm_complete_connect(mysql_csm_context *ctx) DBUG_RETURN(STATE_MACHINE_FAILED); } vio_keepalive(net->vio,TRUE); + LEX_STRING *comp_lib = (LEX_STRING *) + my_hash_search(&mysql->options.extension->connection_attributes, + (const void *)"compression_lib", strlen("compression_lib")); + if (comp_lib) { + LEX_STRING *val = comp_lib + 1; + if (val->str) { + net->comp_lib = get_client_compression_enum(val->str); + } + } /* If user set read_timeout, let it override the default */ if (timeout_is_nonzero(mysql->options.read_timeout)) @@ -6318,6 +6339,15 @@ mysql_options(MYSQL *mysql,enum mysql_option option, const void *arg) mysql->options.write_timeout = timeout_from_millis((*(uint*) arg)); fixup_zero_timeout(&mysql->options.write_timeout); break; + case MYSQL_OPT_COMP_LIB: + mysql_options(mysql, MYSQL_OPT_CONNECT_ATTR_DELETE, "compression_lib"); + const char *lib_name = "zlib"; + if ((enum mysql_compression_lib)arg == MYSQL_COMPRESSION_ZSTD) { + lib_name = "zstd"; + } + mysql_options4(mysql, MYSQL_OPT_CONNECT_ATTR_ADD, + "compression_lib", lib_name); + // Intentional fall through to enable compression case MYSQL_OPT_COMPRESS: mysql->options.compress= 1; /* Remember for connect */ mysql->options.client_flag|= CLIENT_COMPRESS; diff --git a/sql/mysqld.cc b/sql/mysqld.cc index d06014a..f3334a8 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -508,6 +508,7 @@ my_bool opt_skip_slave_start = 0; ///< If set, slave is not autostarted my_bool opt_reckless_slave = 0; my_bool opt_enable_named_pipe= 0; my_bool opt_local_infile, opt_slave_compressed_protocol; +ulong opt_slave_compression_lib; my_bool opt_safe_user_create = 0; my_bool opt_show_slave_auth_info; my_bool opt_log_slave_updates= 0; @@ -579,6 +580,7 @@ ulonglong histogram_binlog_group_commit_values[NUMBER_OF_COUNTER_HISTOGRAM_BINS]; uint net_compression_level = 6; +uint zstd_net_compression_level = 3; #if defined(ENABLED_DEBUG_SYNC) MYSQL_PLUGIN_IMPORT uint opt_debug_sync_timeout= 0; diff --git a/sql/mysqld.h b/sql/mysqld.h index 6257684..ab4ab48 100644 --- a/sql/mysqld.h +++ b/sql/mysqld.h @@ -267,6 +267,8 @@ extern CHARSET_INFO *error_message_charset_info; extern CHARSET_INFO *character_set_filesystem; +extern const char* mysql_compression_lib_names[3]; + extern MY_BITMAP temp_pool; extern bool opt_large_files, server_id_supplied; extern bool opt_update_log, opt_bin_log, opt_error_log; @@ -295,6 +297,7 @@ extern ulong opt_srv_fatal_semaphore_timeout; extern my_bool opt_safe_user_create; extern my_bool opt_safe_show_db, opt_local_infile, opt_myisam_use_mmap; extern my_bool opt_slave_compressed_protocol, use_temp_pool; +extern ulong opt_slave_compression_lib; extern ulong slave_exec_mode_options; extern ulong slave_use_idempotent_for_recovery_options; extern ulong slave_run_triggers_for_rbr; @@ -374,6 +377,7 @@ extern my_bool opt_log_slow_extra; extern ulonglong binlog_fsync_count; extern uint net_compression_level; +extern uint zstd_net_compression_level; extern ulong relay_io_connected; diff --git a/sql/net_serv.cc b/sql/net_serv.cc index 73f1b4e..e19e48a 100644 --- a/sql/net_serv.cc +++ b/sql/net_serv.cc @@ -49,6 +49,10 @@ #include +#ifdef HAVE_ZSTD_COMPRESS +#include +#endif + using std::min; using std::max; @@ -116,6 +120,11 @@ my_bool my_net_init(NET *net, Vio* vio) net->write_pos=net->read_pos = net->buff; net->last_error[0]=0; net->compress=0; net->reading_or_writing=0; + net->comp_lib = MYSQL_COMPRESSION_ZLIB; +#ifdef HAVE_ZSTD_COMPRESS + net->cctx = NULL; + net->dctx = NULL; +#endif net->where_b = net->remain_in_buf=0; net->last_errno=0; net->unused= 0; @@ -147,6 +156,12 @@ void net_end(NET *net) #ifdef HAVE_COMPRESS reset_packet_write_state(net); #endif +#ifdef HAVE_ZSTD_COMPRESS + if (net->cctx) ZSTD_freeCCtx(net->cctx); + if (net->dctx) ZSTD_freeDCtx(net->dctx); + net->cctx = NULL; + net->dctx = NULL; +#endif my_free(net->buff); net->buff=0; #ifdef HAVE_OPENSSL @@ -946,7 +961,8 @@ compress_packet(NET *net, const uchar *packet, size_t *length) memcpy(compr_packet + header_length, packet, *length); /* Compress the encapsulated packet. */ - if (my_compress(compr_packet + header_length, length, &compr_length, + if (my_compress(net, compr_packet + header_length, + length, &compr_length, net_compression_level)) { /* @@ -1441,7 +1457,7 @@ end: if (net->compress) { *complen = net->async_packet_uncompressed_length; - if (my_uncompress(net->buff + net->where_b, + if (my_uncompress(net, net->buff + net->where_b, net->async_packet_length, complen)) { net->error = 2; // caller will close socket net->last_errno = ER_NET_UNCOMPRESS_ERROR; @@ -1826,7 +1842,7 @@ my_net_read(NET *net) MYSQL_NET_READ_DONE(1, 0); return packet_error; } - if (my_uncompress(net->buff + net->where_b, packet_len, + if (my_uncompress(net, net->buff + net->where_b, packet_len, &complen)) { net->error= 2; /* caller will close socket */ diff --git a/sql/rpl_slave.cc b/sql/rpl_slave.cc index a078093..9d294e7 100644 --- a/sql/rpl_slave.cc +++ b/sql/rpl_slave.cc @@ -7820,8 +7820,10 @@ static int connect_to_master(THD* thd, MYSQL* mysql, Master_info* mi, mi->events_until_exit = disconnect_slave_event_count; #endif ulong client_flag= CLIENT_REMEMBER_OPTIONS; - if (opt_slave_compressed_protocol) + if (opt_slave_compressed_protocol) { client_flag|= CLIENT_COMPRESS; /* We will use compression */ + mysql_options(mysql, MYSQL_OPT_COMP_LIB, (void *)opt_slave_compression_lib); + } mysql_options(mysql, MYSQL_OPT_CONNECT_TIMEOUT, (char *) &slave_net_timeout); mysql_options(mysql, MYSQL_OPT_READ_TIMEOUT, (char *) &slave_net_timeout); diff --git a/sql/sql_acl.cc b/sql/sql_acl.cc index d02f2cb..5c58913 100644 --- a/sql/sql_acl.cc +++ b/sql/sql_acl.cc @@ -9930,6 +9930,20 @@ read_client_connect_attrs(char **ptr, size_t *max_bytes_available, DBUG_RETURN(0); } +// We don't need to worry about obtaining a lock here because the connection +// attributes are written in only one location, and that is during +// initialization. Since this is also the writer thread, we are sure the map +// is safe to read +static void parse_compression_connect_attr(NET *net, THD *thd) { + auto it = thd->connection_attrs_map.find("compression_lib"); + if (it != thd->connection_attrs_map.end()) { + net->comp_lib = get_client_compression_enum(it->second.c_str()); + } else { + // When no lib is specified, default to zlib for backwards compatibility + net->comp_lib = MYSQL_COMPRESSION_ZLIB; + } +} + #endif @@ -10564,6 +10578,8 @@ skip_to_ssl: mpvio->charset_adapter->charset())) return packet_error; + parse_compression_connect_attr(net, current_thd); + char db_buff[NAME_LEN + 1]; // buffer to store db in utf8 char user_buff[USERNAME_LENGTH + 1]; // buffer to store user in utf8 uint dummy_errors; diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index 56c0ba7..8366e23 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -3338,6 +3338,19 @@ static Sys_var_uint Sys_net_compression_level( GLOBAL_VAR(net_compression_level), CMD_LINE(REQUIRED_ARG), VALID_RANGE(0, 9), DEFAULT(6), BLOCK_SIZE(1)); +static Sys_var_enum slave_compression_lib_enum( + "slave_compression_lib", "Compression library for replication stream", + GLOBAL_VAR(opt_slave_compression_lib), + CMD_LINE(OPT_ARG), mysql_compression_lib_names, + DEFAULT(MYSQL_COMPRESSION_ZLIB), NO_MUTEX_GUARD, NOT_IN_BINLOG); + +static Sys_var_uint Sys_zstd_net_compression_level( + "zstd_net_compression_level", + "Compression level for compressed protocol when zstd library is" + " selected. Valid values 0-22.", + GLOBAL_VAR(zstd_net_compression_level), CMD_LINE(OPT_ARG), + VALID_RANGE(0, 22), DEFAULT(3), BLOCK_SIZE(1)); + static Sys_var_ulong Sys_sort_buffer( "sort_buffer_size", "Each thread that needs to do a sort allocates a buffer of this size", diff --git a/libmysqld/CMakeLists.txt b/libmysqld/CMakeLists.txt index 31bdaa3..52e7e0b 100644 --- a/libmysqld/CMakeLists.txt +++ b/libmysqld/CMakeLists.txt @@ -29,6 +29,11 @@ INCLUDE_DIRECTORIES( ${CMAKE_SOURCE_DIR}/sql/backup ) +IF(NOT "$ENV{WITH_ZSTD}" STREQUAL "") + ADD_DEFINITIONS(-DZSTD) + ADD_DEFINITIONS(-DHAVE_ZSTD_COMPRESS) +ENDIF() + SET(GEN_SOURCES ${CMAKE_BINARY_DIR}/sql/sql_yacc.h ${CMAKE_BINARY_DIR}/sql/sql_yacc.cc diff --git a/sql/CMakeLists.txt b/sql/CMakeLists.txt index 0baee44..0101ea1 100644 --- a/sql/CMakeLists.txt +++ b/sql/CMakeLists.txt @@ -22,6 +22,11 @@ INCLUDE_DIRECTORIES( ${CMAKE_BINARY_DIR}/sql ) +IF(NOT "$ENV{WITH_ZSTD}" STREQUAL "") + ADD_DEFINITIONS(-DZSTD) + ADD_DEFINITIONS(-DHAVE_ZSTD_COMPRESS) +ENDIF() + SET(GEN_SOURCES ${CMAKE_CURRENT_BINARY_DIR}/sql_yacc.h ${CMAKE_CURRENT_BINARY_DIR}/sql_yacc.cc