Bug #96985 MySQL group replication pipeline memory reuse
Submitted: 24 Sep 2019 7:17 Modified: 24 Oct 2019 7:06
Reporter: lieyia --
Status: Verified
Category:MySQL Server: Replication Severity:S5 (Performance)
Version:5.7.22;8.0.11 OS:Any
Assigned to: CPU Architecture:Any
Tags: pipeline; memory

[24 Sep 2019 7:17] lieyia --
Description:
    While testing the performance of MySQL Group Replication (MGR), I found a possible performance optimization in the applier pipeline.

How to repeat:
applier.cc (apply_data_packet):
while ((payload != payload_end) && !error) {
    uint event_len = uint4korr(((uchar *)payload) + EVENT_LEN_OFFSET);

    // A new Data_packet is allocated here for every event in the packet.
    Data_packet *new_packet = new Data_packet(payload, event_len);
    payload = payload + event_len;

    Pipeline_event *pevent = new Pipeline_event(new_packet, fde_evt, cache);
    error = inject_event_into_pipeline(pevent, cont);

    delete pevent;
    DBUG_EXECUTE_IF("stop_applier_channel_after_reading_write_rows_log_event", {
      if (payload[EVENT_TYPE_OFFSET] == binary_log::WRITE_ROWS_EVENT) {
        error = 1;
      }
    });
}

Suggested fix:
  The new_packet object can be reused instead of being allocated for every event, which can improve performance under high concurrency.
  Below is my modification:

applier.h:
  int apply_data_packet(Data_packet *data_packet,
                        Format_description_log_event *fde_evt,
                        IO_CACHE *cache,
                        Continuation *cont,
                        Data_packet *new_packet);

pipeline_interfaces.h:
1) Add the following constructor, member function, and member variable to class Data_packet:
  /* Allocates a reusable buffer of len bytes; max_len tracks its capacity. */
  Data_packet(ulong len)
      : Packet(DATA_PACKET_TYPE), payload(NULL), len(0), max_len(len)
  {
    payload= (uchar*)my_malloc(PSI_NOT_INSTRUMENTED, len, MYF(0));
  }

  /* Copies data into the buffer, growing it to 2 * len only when it is too small. */
  void set_payload(const uchar *data, ulong len)
  {
    if (max_len < len)
    {
      my_free(payload);
      payload= (uchar*)my_malloc(PSI_NOT_INSTRUMENTED, len * 2, MYF(0));
      max_len= len * 2;
    }
    this->len= len;
    memcpy(payload, data, len);
  }

  ulong max_len;

2) The class Pipeline_event destructor no longer releases the packet member:
  /*if (packet != NULL)
    {
      delete packet;
    }*/

3) The reset_pipeline_event function no longer deletes the packet member:
   if (packet != NULL)
    {
      //delete packet; /* purecov: inspected */
      packet = NULL; /* purecov: inspected */
    }

4) The convert_packet_to_log_event function no longer deletes the packet member:
  //delete packet;
    packet= NULL;

applier.cc:
5) The apply_data_packet function takes the reusable packet as an extra parameter:
 int Applier_module::apply_data_packet(Data_packet *data_packet,
                                      Format_description_log_event *fde_evt,
                                      IO_CACHE *cache, Continuation *cont, 
                                      Data_packet *new_packet) {
  int error = 0;
  uchar *payload = data_packet->payload;
  uchar *payload_end = data_packet->payload + data_packet->len;

  if (check_single_primary_queue_status()) return 1; /* purecov: inspected */

  while ((payload != payload_end) && !error) {
    uint event_len = uint4korr(((uchar *)payload) + EVENT_LEN_OFFSET);

    // Data_packet *new_packet = new Data_packet(payload, event_len);
    new_packet->set_payload(payload, event_len);
    payload = payload + event_len;

    Pipeline_event *pevent = new Pipeline_event(new_packet, fde_evt, cache);
    error = inject_event_into_pipeline(pevent, cont);

    delete pevent;
    DBUG_EXECUTE_IF("stop_applier_channel_after_reading_write_rows_log_event", {
      if (payload[EVENT_TYPE_OFFSET] == binary_log::WRITE_ROWS_EVENT) {
        error = 1;
      }
    });
  }

  return error;
}

6) The applier_thread_handle function allocates the reusable packet once and passes it to apply_data_packet:
    Packet *packet = NULL;
    new_packet = new Data_packet(4096);
    ...
    case DATA_PACKET_TYPE:
        packet_application_error =
            apply_data_packet((Data_packet *)packet, fde_evt, cache, cont, new_packet);
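
The reuse pattern in steps 1) to 6) can be sketched outside the server as a minimal, self-contained example. This is only an illustration under stated assumptions, not the actual MySQL code: ReusablePacket, read_le32 and apply_all are hypothetical names, std::malloc stands in for my_malloc, the consume callback stands in for inject_event_into_pipeline, and the event layout is simplified so that the 4-byte little-endian length sits at offset 0 rather than at EVENT_LEN_OFFSET.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <functional>
#include <new>

// Hypothetical stand-in for Data_packet: one buffer that grows on demand.
class ReusablePacket {
 public:
  explicit ReusablePacket(std::size_t capacity)
      : capacity_(capacity > 0 ? capacity : 1) {
    payload_ = static_cast<unsigned char *>(std::malloc(capacity_));
    if (payload_ == nullptr) throw std::bad_alloc();
  }
  ~ReusablePacket() { std::free(payload_); }  // freed once, by the owner
  ReusablePacket(const ReusablePacket &) = delete;
  ReusablePacket &operator=(const ReusablePacket &) = delete;

  // Copy data into the buffer; reallocate (to 2 * len) only when it is too small.
  void set_payload(const unsigned char *data, std::size_t len) {
    if (capacity_ < len) {
      auto *bigger = static_cast<unsigned char *>(std::malloc(len * 2));
      if (bigger == nullptr) throw std::bad_alloc();
      std::free(payload_);
      payload_ = bigger;
      capacity_ = len * 2;
    }
    len_ = len;
    if (len > 0) std::memcpy(payload_, data, len);
  }

  const unsigned char *payload() const { return payload_; }
  std::size_t length() const { return len_; }
  std::size_t capacity() const { return capacity_; }

 private:
  unsigned char *payload_ = nullptr;
  std::size_t len_ = 0;
  std::size_t capacity_;
};

// Decode a 4-byte little-endian length (the job uint4korr does in the server).
inline std::uint32_t read_le32(const unsigned char *p) {
  return static_cast<std::uint32_t>(p[0]) |
         (static_cast<std::uint32_t>(p[1]) << 8) |
         (static_cast<std::uint32_t>(p[2]) << 16) |
         (static_cast<std::uint32_t>(p[3]) << 24);
}

// Split buf into events and pass each one to consume through the same
// scratch packet. Returns 0 on success, 1 on a malformed buffer, or the
// first nonzero value returned by consume.
int apply_all(const unsigned char *buf, std::size_t size,
              ReusablePacket &scratch,
              const std::function<int(const ReusablePacket &)> &consume) {
  const std::size_t kLenPrefix = 4;  // assumed layout: length at offset 0
  std::size_t pos = 0;
  while (pos != size) {
    if (size - pos < kLenPrefix) return 1;  // truncated length field
    const std::size_t event_len = read_le32(buf + pos);
    if (event_len < kLenPrefix || event_len > size - pos) return 1;
    scratch.set_payload(buf + pos, event_len);  // allocates only on growth
    pos += event_len;
    if (int error = consume(scratch)) return error;
  }
  return 0;
}
```

The caller creates one ReusablePacket (for example with a 4096-byte capacity, as in step 6) before the loop and lets it be destroyed when the applier thread ends. The sketch assumes that the consumer never keeps a pointer to the payload after it returns, because the next event overwrites the same buffer; whether every pipeline handler satisfies this would need to be checked in the real code.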
[24 Sep 2019 7:21] lieyia --
Data_packet *new_packet = NULL;
[24 Oct 2019 3:51] WANG GUANGYOU
How much of a performance gain did you get? Could you share some data?
[24 Oct 2019 7:06] MySQL Verification Team
Hi,

In order to submit contributions you must first sign the Oracle Contribution Agreement (OCA).
For additional information please check http://www.oracle.com/technetwork/community/oca-486395.html.
If you have any questions, please contact the MySQL community team. 
https://dev.mysql.com/community/