From 351703d6ec9c5164cd59f0c9251173518592009e Mon Sep 17 00:00:00 2001 From: Zhao Junwang Date: Sat, 21 Nov 2020 20:29:57 +0800 Subject: [PATCH] Use mmap for binlog The main point of this patch is to use mmap to speed up binlog writing. The change is fairly easy thanks to the existing stream hierarchy: I just needed to add a new class that extends `Truncatable_ostream` (see `MMAP_ostream` for details) and keep the underlying mmap state in `struct MMAP_INFO`. There are a few things worth pointing out. 1. When it is opened, the mmapped file is extended to max_binlog_size (plus binlog_mmap_extra_map_size), so `ls` does not show the real size of the written events while the file is still being written. 2. When a command reads the binlog file that is currently being written (for example `show binlog events in ''`, which uses Binlog_event_data_istream::read_event_header to read the file), it would keep reading '0's after the last event in that file, because the file was extended to max_binlog_size. I added a special condition to solve this problem: if (m_event_length == 0 && m_cur_binlog) return m_error->set_type(Binlog_read_error::READ_EOF); 3. The relay log has not been tested. 4.
during the recovery process, IO_CACHE_ostream is still used to handle an
existing binlog.

Signed-off-by: Zhao Junwang
---
 include/my_sys.h     | 29 +++++++++++++++
 mysys/my_mmap.cc     | 40 ++++++++++++++++++++
 sql/basic_ostream.cc | 87 ++++++++++++++++++++++++++++++++++++++++++++
 sql/basic_ostream.h  | 52 ++++++++++++++++++++++++++
 sql/binlog.cc        | 33 ++++++++++++++---
 sql/binlog.h         |  4 --
 sql/binlog_reader.cc |  7 +++-
 sql/binlog_reader.h  | 10 +++--
 sql/mysqld.cc        | 15 ++++++++
 sql/mysqld.h         |  2 +
 sql/sys_vars.cc      | 12 ++++++
 11 files changed, 276 insertions(+), 15 deletions(-)

diff --git a/include/my_sys.h b/include/my_sys.h
index 284960137dd..979c0d492ff 100644
--- a/include/my_sys.h
+++ b/include/my_sys.h
@@ -541,6 +541,35 @@ inline size_t my_b_bytes_in_cache(const IO_CACHE *info) {
   return *info->current_end - *info->current_pos;
 }
 
+/* This struct is used for mmap for binlog */
+struct MMAP_INFO {
+  // end position of file (highest offset written or seeked to)
+  my_off_t end_pos_of_file{0};
+  // mmap return address
+  uchar *addr{nullptr};
+  // next write position
+  uchar *write_pos{nullptr};
+  // next sync position, when sync is called, it will sync from position
+  // sync_pos, and the sync length will be (write_pos - sync_pos)
+  uchar *sync_pos{nullptr};
+  // non-inclusive boundary of mmap end pos
+  uchar *mmap_end{nullptr};
+
+  // file descriptor
+  File file{-1};
+  // instrumented file key
+  PSI_file_key file_key{PSI_NOT_INSTRUMENTED};
+
+  int error{0};
+  // mmap length, 0 while nothing is mapped
+  size_t mmap_length{0};
+};
+
+// implemented in mysys/my_mmap.cc
+int init_mmap_info(MMAP_INFO *info, File file, size_t mmap_length,
+                   my_off_t seek_offset = 0);
+int end_mmap_info(MMAP_INFO *info);
+
 typedef uint32 ha_checksum;
 
 /*
diff --git a/mysys/my_mmap.cc b/mysys/my_mmap.cc
index eec03df244f..c4331cd945e 100644
--- a/mysys/my_mmap.cc
+++ b/mysys/my_mmap.cc
@@ -104,3 +104,79 @@ int my_msync(int fd, void *addr, size_t len, int flags) {
 #else
 #error "no mmap!"
 #endif
+
+/**
+  Map the first mmap_length bytes of an open file for writing and place the
+  write and sync pointers at seek_offset.
+
+  The caller is expected to have sized the file to at least mmap_length bytes
+  (MMAP_ostream::open() does so with my_chsize()). On failure *info is left
+  untouched and the file stays owned by the caller.
+
+  @retval 0 Success
+  @retval 1 Error
+*/
+int init_mmap_info(MMAP_INFO *info, File file, size_t mmap_length,
+                   my_off_t seek_offset) {
+  DBUG_TRACE;
+  DBUG_ASSERT(file >= 0);
+  if (seek_offset > mmap_length) {
+    DBUG_PRINT("mysys", ("seek offset is beyond the mmap length"));
+    return 1;
+  }
+
+  // my_mmap() rather than mmap(): plain mmap() does not exist on Windows.
+  void *addr = my_mmap(nullptr, mmap_length, PROT_READ | PROT_WRITE,
+                       MAP_SHARED, file, 0);
+  if (addr == MAP_FAILED) {
+    DBUG_PRINT("mysys", ("mmap failed when init mmap info"));
+    return 1;
+  }
+
+  info->file = file;
+  info->addr = static_cast<uchar *>(addr);
+  info->mmap_length = mmap_length;
+  info->mmap_end = info->addr + mmap_length;
+
+  // The whole file is mapped from offset 0; start at seek_offset.
+  info->end_pos_of_file = seek_offset;
+  info->write_pos = info->addr + seek_offset;
+  info->sync_pos = info->addr + seek_offset;
+
+  DBUG_PRINT("info", ("init_mmap_info: addr = %p, mmap_length = %lu",
+                      info->addr, static_cast<ulong>(mmap_length)));
+
+  return 0;
+}
+
+/**
+  Sync the bytes written since the last sync and unmap the area set up by
+  init_mmap_info(). The file itself is not closed. Does nothing if no area
+  is mapped.
+
+  @retval 0 Success
+  @retval 1 Error; the area is unmapped even when syncing failed
+*/
+int end_mmap_info(MMAP_INFO *info) {
+  DBUG_TRACE;
+  DBUG_PRINT("enter", ("mmap info: %p", info));
+  if (info->addr == nullptr) return 0;
+
+  int ret = 0;
+  if (info->write_pos > info->sync_pos) {
+    ret = my_msync(info->file, info->sync_pos,
+                   static_cast<size_t>(info->write_pos - info->sync_pos),
+                   MS_SYNC);
+    info->sync_pos = info->write_pos;
+  }
+
+  // Unmap even if the sync failed, otherwise the mapping would leak.
+  if (my_munmap(info->addr, info->mmap_length) != 0) ret = 1;
+
+  info->addr = nullptr;
+  info->write_pos = nullptr;
+  info->sync_pos = nullptr;
+  info->mmap_end = nullptr;
+  info->mmap_length = 0;
+  return ret != 0 ? 1 : 0;
+}
diff --git a/sql/basic_ostream.cc b/sql/basic_ostream.cc
index 6e0235fa1ba..8de18499022 100644
--- a/sql/basic_ostream.cc
+++ b/sql/basic_ostream.cc
@@ -88,6 +88,157 @@ bool IO_CACHE_ostream::sync() {
   return mysql_file_sync(m_io_cache.file, MYF(MY_WME)) != 0;
 }
 
+/**
+  Grow the file and its mapping so that at least min_length bytes are mapped.
+
+  The larger view is created before the old one is released, so on failure
+  the old mapping is still valid. The mapping is MAP_SHARED, so bytes already
+  stored through the old view are part of the file and are not lost.
+
+  @retval false Success
+  @retval true  Error
+*/
+static bool grow_mmap_area(MMAP_INFO *info, size_t min_length) {
+  // Grow geometrically so a run of large events does not remap on each write.
+  size_t new_length = info->mmap_length * 2;
+  if (new_length < min_length) new_length = min_length;
+
+  // The file must cover the whole mapping: touching a mapped page that lies
+  // beyond EOF raises SIGBUS.
+  if (my_chsize(info->file, new_length, 0, MYF(MY_WME))) return true;
+
+  void *addr = my_mmap(nullptr, new_length, PROT_READ | PROT_WRITE,
+                       MAP_SHARED, info->file, 0);
+  if (addr == MAP_FAILED) return true;
+
+  const size_t write_off = static_cast<size_t>(info->write_pos - info->addr);
+  const size_t sync_off = static_cast<size_t>(info->sync_pos - info->addr);
+  (void)my_munmap(info->addr, info->mmap_length);
+
+  info->addr = static_cast<uchar *>(addr);
+  info->mmap_length = new_length;
+  info->mmap_end = info->addr + new_length;
+  info->write_pos = info->addr + write_off;
+  info->sync_pos = info->addr + sync_off;
+  return false;
+}
+
+MMAP_ostream::MMAP_ostream() {}
+
+// close() does nothing on a stream that is not open, so this is safe both
+// when open() failed and when the owner already closed the stream.
+MMAP_ostream::~MMAP_ostream() { close(); }
+
+bool MMAP_ostream::open(PSI_file_key log_file_key, const char *file_name,
+                        ulong mmap_length) {
+  DBUG_ASSERT(m_mmap_info.file < 0);
+  File file = mysql_file_open(log_file_key, file_name, O_CREAT | O_RDWR,
+                              MYF(MY_WME));
+  if (file < 0) return true;
+
+  // Extend the file to the mapped length before mapping it (see the SIGBUS
+  // note in grow_mmap_area()).
+  if (my_chsize(file, mmap_length, 0, MYF(MY_WME)) ||
+      init_mmap_info(&m_mmap_info, file, mmap_length, 0)) {
+    mysql_file_close(file, MYF(0));
+    // Back to the "not open" state, so that close() has nothing to release.
+    m_mmap_info = MMAP_INFO();
+    return true;
+  }
+
+  return false;
+}
+
+bool MMAP_ostream::close() {
+  // Not open (open() failed, or close() already ran): nothing to release.
+  if (m_mmap_info.file < 0) return false;
+
+  DBUG_ASSERT(m_mmap_info.end_pos_of_file >=
+              static_cast<my_off_t>(m_mmap_info.write_pos - m_mmap_info.addr));
+  bool error = end_mmap_info(&m_mmap_info) != 0;
+  // open() extended the file to the mapped length; shrink it back to
+  // end_pos_of_file so no trailing zeros are left behind.
+  if (my_chsize(m_mmap_info.file, m_mmap_info.end_pos_of_file, 0,
+                MYF(MY_WME)))
+    error = true;
+  if (mysql_file_close(m_mmap_info.file, MYF(MY_WME))) error = true;
+
+  m_mmap_info = MMAP_INFO();
+  return error;
+}
+
+bool MMAP_ostream::seek(my_off_t offset) {
+  DBUG_ASSERT(m_mmap_info.addr != nullptr);
+  // The end of the mapping is a valid position; anything past it is not.
+  if (offset > m_mmap_info.mmap_length) return true;
+
+  m_mmap_info.write_pos = m_mmap_info.addr + offset;
+  m_mmap_info.sync_pos = m_mmap_info.write_pos;
+  if (m_mmap_info.end_pos_of_file < offset)
+    m_mmap_info.end_pos_of_file = offset;
+
+  return false;
+}
+
+bool MMAP_ostream::write(const unsigned char *buffer, my_off_t length) {
+  DBUG_ASSERT(m_mmap_info.addr != nullptr);
+  const my_off_t write_off =
+      static_cast<my_off_t>(m_mmap_info.write_pos - m_mmap_info.addr);
+  const my_off_t end_off = write_off + length;
+
+  // The last event of a file may overshoot max_binlog_size by more than the
+  // extra mapped area (e.g. a big transaction). Grow the mapping instead of
+  // storing past its end; filling the mapping exactly is fine.
+  if (end_off > m_mmap_info.mmap_length &&
+      grow_mmap_area(&m_mmap_info, static_cast<size_t>(end_off)))
+    return true;
+
+  memcpy(m_mmap_info.write_pos, buffer, static_cast<size_t>(length));
+  m_mmap_info.write_pos += length;
+  if (end_off > m_mmap_info.end_pos_of_file)
+    m_mmap_info.end_pos_of_file = end_off;
+
+  return false;
+}
+
+bool MMAP_ostream::truncate(my_off_t offset) {
+  DBUG_ASSERT(m_mmap_info.addr != nullptr);
+  // Do not shrink the file under the live mapping: later stores into pages
+  // beyond the new EOF would raise SIGBUS. Zero the discarded tail in place
+  // instead (readers of the active mmap binlog stop at a zero event length);
+  // close() shrinks the file to end_pos_of_file.
+  if (offset < m_mmap_info.end_pos_of_file) {
+    memset(m_mmap_info.addr + offset, 0,
+           static_cast<size_t>(m_mmap_info.end_pos_of_file - offset));
+    m_mmap_info.end_pos_of_file = offset;
+  }
+
+  // Continue writing at the truncation point if we were past it. Unsynced
+  // bytes below that point stay inside the sync range.
+  const my_off_t write_off =
+      static_cast<my_off_t>(m_mmap_info.write_pos - m_mmap_info.addr);
+  if (write_off > offset) {
+    m_mmap_info.write_pos = m_mmap_info.addr + offset;
+    if (m_mmap_info.sync_pos > m_mmap_info.write_pos)
+      m_mmap_info.sync_pos = m_mmap_info.write_pos;
+  }
+
+  return false;
+}
+
+bool MMAP_ostream::sync() {
+  DBUG_ASSERT(m_mmap_info.addr != nullptr);
+  // Report failures; sync_pos is kept so the same range is synced next time.
+  if (my_msync(m_mmap_info.file, m_mmap_info.sync_pos,
+               static_cast<size_t>(m_mmap_info.write_pos -
+                                   m_mmap_info.sync_pos),
+               MS_SYNC) != 0)
+    return true;
+  m_mmap_info.sync_pos = m_mmap_info.write_pos;
+
+  return false;
+}
+
 Compressed_ostream::Compressed_ostream() : m_compressor(nullptr) {}
 
 Compressed_ostream::~Compressed_ostream() {}
diff --git a/sql/basic_ostream.h b/sql/basic_ostream.h
index 2b8e41e108c..97a6c89f175 100644
--- a/sql/basic_ostream.h
+++ b/sql/basic_ostream.h
@@ -148,6 +148,63 @@ class IO_CACHE_ostream : public Truncatable_ostream {
   IO_CACHE m_io_cache;
 };
 
+/**
+  An output stream based on mmap.
+
+  open() extends the file to the mapped length and maps it; close() unmaps
+  it and shrinks the file back to the end of the written data.
+*/
+class MMAP_ostream : public Truncatable_ostream {
+ public:
+  MMAP_ostream();
+  MMAP_ostream(const MMAP_ostream &) = delete;
+  MMAP_ostream &operator=(const MMAP_ostream &) = delete;
+  ~MMAP_ostream() override;
+
+  /**
+    Open stream. It opens related file and initializes MMAP_INFO.
+
+    @param[in] log_file_key The PSI_file_key for this stream
+    @param[in] file_name The file that will be opened
+    @param[in] mmap_length Length used for mmap; the file is extended to it
+    @retval false Success
+    @retval true Error
+  */
+  bool open(PSI_file_key log_file_key, const char *file_name,
+            ulong mmap_length);
+
+  /**
+    Close stream. It deinitializes MMAP_INFO and closes the file it opened.
+
+    @retval false Success
+    @retval true Error
+  */
+  bool close();
+
+  bool write(const unsigned char *buffer, my_off_t length) override;
+  bool seek(my_off_t offset) override;
+  bool truncate(my_off_t offset) override;
+
+  /**
+    Data is stored straight into the mapping; there is no user space
+    buffer, so there is nothing to flush.
+
+    @retval false Always
+  */
+  bool flush() override { return false; }
+
+  /**
+    Sync changes made to the mmap area back to disk.
+
+    @retval false Success
+    @retval true Error
+  */
+  bool sync() override;
+
+ private:
+  MMAP_INFO m_mmap_info;
+};
+
 /**
   A basic output stream based on StringBuffer class. It has a stack buffer of
   size BUFFER_SIZE. It will allocate memory to create a heap buffer if
diff --git a/sql/binlog.cc b/sql/binlog.cc
index 980efc4c986..687ab87194b 100644
--- a/sql/binlog.cc
+++ b/sql/binlog.cc
@@ -287,10 +287,18 @@ class MYSQL_BIN_LOG::Binlog_ofile : public Basic_ostream {
     }
 #endif
 
-    std::unique_ptr<IO_CACHE_ostream> file_ostream(new IO_CACHE_ostream);
-    if (file_ostream->open(log_file_key, binlog_name, flags)) return true;
-
-    m_pipeline_head = std::move(file_ostream);
+    // Existing binlogs (e.g. during recovery) always go through IO_CACHE.
+    if (existing || !opt_binlog_use_mmap) {
+      std::unique_ptr<IO_CACHE_ostream> file_ostream(new IO_CACHE_ostream);
+      if (file_ostream->open(log_file_key, binlog_name, flags)) return true;
+      m_pipeline_head = std::move(file_ostream);
+    } else {
+      std::unique_ptr<MMAP_ostream> file_ostream(new MMAP_ostream);
+      if (file_ostream->open(log_file_key, binlog_name,
+                             m_max_size + opt_binlog_mmap_extra_map_size))
+        return true;
+      m_pipeline_head = std::move(file_ostream);
+    }
 
     /* Setup encryption for new files if needed */
     if (!existing && rpl_encryption.is_enabled()) {
@@ -465,12 +473,17 @@ class MYSQL_BIN_LOG::Binlog_ofile : public Basic_ostream {
     Set that the log file is encrypted.
   */
   void set_encrypted() { m_encrypted = true; }
+  /**
+    Set max binlog size before rotation; open() sizes the mmap area from it.
+  */
+  void set_max_size(ulong max_size) { m_max_size = max_size; }
 
  private:
   my_off_t m_position = 0;
   int m_encrypted_header_size = 0;
   std::unique_ptr<Truncatable_ostream> m_pipeline_head;
   bool m_encrypted = false;
+  ulong m_max_size = 0;
 };
 
 /**
@@ -3273,6 +3286,7 @@ bool show_binlog_events(THD *thd, MYSQL_BIN_LOG *binary_log) {
     char search_file_name[FN_REFLEN], *name;
     const char *log_file_name = lex_mi->log_file_name;
     Log_event *ev = nullptr;
+    bool cur_binlog = false;  // reading the active file of an mmap binlog
 
     unit->set_limit(thd, thd->lex->current_select());
     limit_start = unit->offset_limit_cnt;
@@ -3291,6 +3305,11 @@ bool show_binlog_events(THD *thd, MYSQL_BIN_LOG *binary_log) {
       goto err;
     }
 
+    // The active file of an mmap binlog is zero-filled past its last event.
+    // Compare the file actually opened (linfo), not the lookup key 'name'.
+    cur_binlog =
+        opt_binlog_use_mmap && binary_log->is_active(linfo.log_file_name);
+
     mysql_mutex_lock(&thd->LOCK_thd_data);
     thd->current_linfo = &linfo;
     mysql_mutex_unlock(&thd->LOCK_thd_data);
@@ -3298,7 +3317,8 @@ bool show_binlog_events(THD *thd, MYSQL_BIN_LOG *binary_log) {
     BINLOG_FILE_READER binlog_file_reader(
         opt_master_verify_checksum,
         std::max(thd->variables.max_allowed_packet,
-                 binlog_row_event_max_size + MAX_LOG_EVENT_HEADER));
+                 binlog_row_event_max_size + MAX_LOG_EVENT_HEADER),
+        cur_binlog);
 
     if (binlog_file_reader.open(linfo.log_file_name, pos)) {
       errmsg = binlog_file_reader.get_error_str();
@@ -4811,6 +4831,8 @@ bool MYSQL_BIN_LOG::open_binlog(
 
   write_error = false;
 
+  // Binlog_ofile::open() uses it to size the mmap area.
+  m_binlog_file->set_max_size(max_size_arg);
   /* open the main log file */
   if (open(m_key_file_log, log_name, new_name, new_index_number)) {
     close_purge_index_file();
diff --git a/sql/binlog.h b/sql/binlog.h
index fb90982ceee..57f64b394c6 100644
--- a/sql/binlog.h
+++ b/sql/binlog.h
@@ -131,10 +131,6 @@ struct LOG_INFO {
   }
 };
 
-/*
-  TODO use mmap instead of IO_CACHE for binlog
-  (mmap+fsync is two times faster than write+fsync)
-*/
 class MYSQL_BIN_LOG : public TC_LOG {
  public:
   class Binlog_ofile;
diff --git a/sql/binlog_reader.cc b/sql/binlog_reader.cc
index 9dfc5f89408..dbb57cb9bb2 100644
--- a/sql/binlog_reader.cc
+++ b/sql/binlog_reader.cc
@@ -60,8 +60,9 @@ static void debug_corrupt_event(unsigned char *buffer, unsigned int event_len) {
 
 Binlog_event_data_istream::Binlog_event_data_istream(
     Binlog_read_error *error, Basic_istream *istream,
-    unsigned int max_event_size)
-    : m_error(error), m_istream(istream), m_max_event_size(max_event_size) {}
+    unsigned int max_event_size, bool cur_binlog)
+    : m_error(error), m_istream(istream), m_max_event_size(max_event_size),
+      m_cur_binlog(cur_binlog) {}
 
 bool Binlog_event_data_istream::read_event_header() {
   return read_fixed_length(
@@ -98,6 +99,9 @@ bool Binlog_event_data_istream::fill_event_data(
 
 bool Binlog_event_data_istream::check_event_header() {
   m_event_length = uint4korr(m_header + EVENT_LEN_OFFSET);
+  // The file an mmap binlog is writing is zero-filled past its last event.
+  if (m_event_length == 0 && m_cur_binlog)
+    return m_error->set_type(Binlog_read_error::READ_EOF);
   if (m_event_length < LOG_EVENT_MINIMAL_HEADER_LEN)
     return m_error->set_type(Binlog_read_error::BOGUS);
   if (m_event_length > m_max_event_size)
diff --git a/sql/binlog_reader.h b/sql/binlog_reader.h
index 43b240d15e6..4da62dd9e2d 100644
--- a/sql/binlog_reader.h
+++ b/sql/binlog_reader.h
@@ -57,7 +57,8 @@ class Default_binlog_event_allocator {
 class Binlog_event_data_istream {
  public:
   Binlog_event_data_istream(Binlog_read_error *error, Basic_istream *istream,
-                            unsigned int max_event_size);
+                            unsigned int max_event_size,
+                            bool cur_binlog = false);
   Binlog_event_data_istream() = delete;
   Binlog_event_data_istream(const Binlog_event_data_istream &) = delete;
   Binlog_event_data_istream &operator=(const Binlog_event_data_istream &) =
@@ -153,6 +154,9 @@ class Binlog_event_data_istream {
   Basic_istream *m_istream = nullptr;
   unsigned int m_max_event_size;
   unsigned int m_event_length = 0;
+  // True when reading the file an mmap binlog is currently writing: it is
+  // zero-filled past its last event, so a zero event length means EOF.
+  bool m_cur_binlog = false;
 
   /**
     Fill the event data into the given buffer and verify checksum if
@@ -249,9 +253,10 @@ class Basic_binlog_file_reader {
   typedef EVENT_OBJECT_ISTREAM Event_object_istream;
 
   Basic_binlog_file_reader(bool verify_checksum,
-                           unsigned int max_event_size = UINT_MAX)
+                           unsigned int max_event_size = UINT_MAX,
+                           bool cur_binlog = false)
       : m_ifile(&m_error),
-        m_data_istream(&m_error, &m_ifile, max_event_size),
+        m_data_istream(&m_error, &m_ifile, max_event_size, cur_binlog),
         m_object_istream(&m_error, &m_data_istream),
         m_fde(BINLOG_VERSION, ::server_version),
         m_verify_checksum(verify_checksum) {}
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index 519926135ed..e180d6cb3b5 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -1110,6 +1110,8 @@ LEX_STRING opt_mandatory_roles;
 bool opt_mandatory_roles_cache = false;
 bool opt_always_activate_granted_roles = false;
 bool opt_bin_log;
+bool opt_binlog_use_mmap = false;
+ulong opt_binlog_mmap_extra_map_size;
 bool opt_general_log, opt_slow_log, opt_general_log_raw;
 ulonglong log_output_options;
 bool opt_log_queries_not_using_indexes = false;
@@ -5622,6 +5624,9 @@ static int init_server_components() {
   if (opt_log_slave_updates && !opt_bin_log) {
     LogErr(WARNING_LEVEL, ER_NEED_LOG_BIN, "--log-slave-updates");
   }
+  if (opt_binlog_use_mmap && !opt_bin_log) {
+    LogErr(WARNING_LEVEL, ER_NEED_LOG_BIN, "--binlog-use-mmap");
+  }
   if (binlog_format_used && !opt_bin_log)
     LogErr(WARNING_LEVEL, ER_NEED_LOG_BIN, "--binlog-format");
 
@@ -8217,6 +8222,10 @@ struct my_option my_long_options[] = {
      "binary logging, use the --skip-log-bin or --disable-log-bin option.",
      &opt_bin_logname, &opt_bin_logname, nullptr, GET_STR_ALLOC, OPT_ARG, 0,
      0, 0, nullptr, 0, nullptr},
+    // binlog-mmap-extra-map-size is registered by its Sys_var (CMD_LINE).
+    {"binlog-use-mmap", 0, "Use mmap for binlog.", &opt_binlog_use_mmap,
+     &opt_binlog_use_mmap, nullptr, GET_BOOL, OPT_ARG, 0, 0, 0, nullptr, 0,
+     nullptr},
     {"log-bin-index", 0, "File that holds the names for binary log files.",
      &opt_binlog_index_name, &opt_binlog_index_name, nullptr, GET_STR,
      REQUIRED_ARG, 0, 0, 0, nullptr, 0, nullptr},
diff --git a/sql/mysqld.h b/sql/mysqld.h
index 0db3f7235d5..bf96fefd4a4 100644
--- a/sql/mysqld.h
+++ b/sql/mysqld.h
@@ -148,6 +148,8 @@ enum_server_operational_state get_server_state();
 
 extern bool opt_large_files, server_id_supplied;
 extern bool opt_bin_log;
+extern bool opt_binlog_use_mmap;
+extern ulong opt_binlog_mmap_extra_map_size;
 extern bool opt_log_slave_updates;
 extern bool opt_log_unsafe_statements;
 extern bool opt_general_log, opt_slow_log, opt_general_log_raw;
diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc
index 549c360b320..d4d5bfea469 100644
--- a/sql/sys_vars.cc
+++ b/sql/sys_vars.cc
@@ -2261,6 +2261,18 @@ static Sys_var_bool Sys_log_bin("log_bin", "Whether the binary log is enabled",
                                 READ_ONLY NON_PERSIST GLOBAL_VAR(opt_bin_log),
                                 NO_CMD_LINE, DEFAULT(true));
 
+static Sys_var_bool Sys_binlog_use_mmap(
+    "binlog_use_mmap", "Whether use mmap for binlog",
+    READ_ONLY NON_PERSIST GLOBAL_VAR(opt_binlog_use_mmap), NO_CMD_LINE,
+    DEFAULT(false));
+
+static Sys_var_ulong Sys_binlog_mmap_extra_map_size(
+    "binlog_mmap_extra_map_size",
+    "Extra map area to avoid sig_bus when using binlog_use_mmap.",
+    READ_ONLY NON_PERSIST GLOBAL_VAR(opt_binlog_mmap_extra_map_size),
+    CMD_LINE(REQUIRED_ARG), VALID_RANGE(1024, 1024 * 1024 * 1024),
+    DEFAULT(4 * 1024 * 1024), BLOCK_SIZE(1));
+
 static bool transaction_write_set_check(sys_var *self, THD *thd, set_var *var) {
   if (check_session_admin(self, thd, var)) return true;
   // Can't change the algorithm when group replication is enabled.