Description:
While testing the performance of MGR, I found an optimization opportunity in the applier pipeline.
How to repeat:
applier.cc(apply_data_packet)
while ((payload != payload_end) && !error) {
uint event_len = uint4korr(((uchar *)payload) + EVENT_LEN_OFFSET);
Data_packet *new_packet = new Data_packet(payload, event_len);
payload = payload + event_len;
Pipeline_event *pevent = new Pipeline_event(new_packet, fde_evt, cache);
error = inject_event_into_pipeline(pevent, cont);
delete pevent;
DBUG_EXECUTE_IF("stop_applier_channel_after_reading_write_rows_log_event", {
if (payload[EVENT_TYPE_OFFSET] == binary_log::WRITE_ROWS_EVENT) {
error = 1;
}
});
Suggested fix:
The new_packet object can be reused instead of being allocated for every event, which improves performance under high concurrency.
Below is my modification:
applier.h:
/**
  Splits the payload of a data packet into its individual events and injects
  each one into the pipeline.

  @param[in] data_packet  packet whose payload holds the events back to back
  @param[in] fde_evt      format description event passed to every
                          Pipeline_event that is created
  @param[in] cache        IO_CACHE passed to every Pipeline_event that is
                          created
  @param[in] cont         continuation object passed to the pipeline injection
  @param[in] new_packet   scratch packet supplied by the caller; each event is
                          copied into it with set_payload() instead of
                          allocating a new Data_packet per event

  @return 0 on success, non-zero on error
*/
int apply_data_packet(Data_packet *data_packet,
Format_description_log_event *fde_evt,
IO_CACHE *cache,
Continuation *cont,
Data_packet *new_packet);
pipeline_interfaces.h:
1) class Data_packet: add the following functions and member variable
Data_packet(ulong len)
: Packet(DATA_PACKET_TYPE), payload(NULL), max_len(len)
{
payload= (uchar*)my_malloc(
PSI_NOT_INSTRUMENTED,
len, MYF(0));
}
void set_payload(const uchar *data, ulong len)
{
if (max_len < len )
{
my_free(payload);
payload= (uchar*)my_malloc(
PSI_NOT_INSTRUMENTED,
len * 2, MYF(0));
max_len = len * 2;
}
this->len = len;
memcpy(payload, data, len);
}
/* Capacity, in bytes, of the buffer that payload points to. */
ulong max_len;
2) The Pipeline_event destructor no longer releases the packet member (so any code that relied on Pipeline_event to free its packet must now free it explicitly)
/*if (packet != NULL)
{
delete packet;
}*/
3) The reset_pipeline_event function no longer deletes the packet member
if (packet != NULL)
{
//delete packet; /* purecov: inspected */
packet = NULL; /* purecov: inspected */
}
4) The convert_packet_to_log_event function no longer deletes the packet member
//delete packet;
packet= NULL;
applier.cc
5) apply_data_packet function
/**
  Splits the payload of a data packet into its individual events and injects
  each one into the pipeline, reusing a single scratch packet for all events.

  @param[in] data_packet  packet whose payload holds the events back to back
  @param[in] fde_evt      format description event passed to each
                          Pipeline_event
  @param[in] cache        IO_CACHE passed to each Pipeline_event
  @param[in] cont         continuation object passed to the pipeline injection
  @param[in] new_packet   scratch packet supplied by the caller; every event
                          is copied into it before being injected

  @return 0 on success, non-zero if the single-primary queue check fails, if
          the payload is malformed, if the event could not be copied, or if
          the pipeline reports an error
*/
int Applier_module::apply_data_packet(Data_packet *data_packet,
                                      Format_description_log_event *fde_evt,
                                      IO_CACHE *cache, Continuation *cont,
                                      Data_packet *new_packet) {
  int error = 0;
  uchar *payload = data_packet->payload;
  uchar *payload_end = data_packet->payload + data_packet->len;

  /* The event length field is 4 bytes wide and starts at EVENT_LEN_OFFSET. */
  const size_t min_event_bytes = static_cast<size_t>(EVENT_LEN_OFFSET) + 4;

  if (check_single_primary_queue_status()) return 1; /* purecov: inspected */

  while ((payload != payload_end) && !error) {
    const size_t remaining = static_cast<size_t>(payload_end - payload);

    /* Not enough bytes left to even read the length field: malformed. */
    if (remaining < min_event_bytes) {
      error = 1;
      break;
    }

    uint event_len = uint4korr(((uchar *)payload) + EVENT_LEN_OFFSET);

    /*
      A length shorter than the bytes needed to hold the length field would
      stall the loop or make no sense; a length larger than what is left
      would read past the buffer and step over payload_end, so the loop
      condition would never become false.
    */
    if (event_len < min_event_bytes || event_len > remaining) {
      error = 1;
      break;
    }

    new_packet->set_payload(payload, event_len);

    /*
      set_payload() has no return value; verify the packet now holds the
      whole event before handing it to the pipeline.
    */
    if (new_packet->payload == NULL || new_packet->len != event_len) {
      error = 1;
      break;
    }

    payload = payload + event_len;

    Pipeline_event *pevent = new Pipeline_event(new_packet, fde_evt, cache);
    error = inject_event_into_pipeline(pevent, cont);
    delete pevent;

    DBUG_EXECUTE_IF("stop_applier_channel_after_reading_write_rows_log_event", {
      /* Only peek at the next event's type if that byte is inside the buffer. */
      if (static_cast<size_t>(payload_end - payload) >
              static_cast<size_t>(EVENT_TYPE_OFFSET) &&
          payload[EVENT_TYPE_OFFSET] == binary_log::WRITE_ROWS_EVENT) {
        error = 1;
      }
    });
  }

  return error;
}
6) applier_thread_handle function
Packet *packet = NULL;
new_packet = new Data_packet(4096);
...
case DATA_PACKET_TYPE:
packet_application_error =
apply_data_packet((Data_packet *)packet, fde_evt, cache, cont, new_packet);
Description: When I test performance of MGR, i found there is a performance optimization point on pipeline. How to repeat: applier.cc(apply_data_packet) while ((payload != payload_end) && !error) { uint event_len = uint4korr(((uchar *)payload) + EVENT_LEN_OFFSET); Data_packet *new_packet = new Data_packet(payload, event_len); payload = payload + event_len; Pipeline_event *pevent = new Pipeline_event(new_packet, fde_evt, cache); error = inject_event_into_pipeline(pevent, cont); delete pevent; DBUG_EXECUTE_IF("stop_applier_channel_after_reading_write_rows_log_event", { if (payload[EVENT_TYPE_OFFSET] == binary_log::WRITE_ROWS_EVENT) { error = 1; } }); Suggested fix: new_packet object can be reuse, which can improve performance when high concurrency. Below is my modification: applier.h: int apply_data_packet(Data_packet *data_packet, Format_description_log_event *fde_evt, IO_CACHE *cache, Continuation *cont, Data_packet *new_packet); pipeline_interfaces.h: 1) class Data_packet add functions and variable Data_packet(ulong len) : Packet(DATA_PACKET_TYPE), payload(NULL), max_len(len) { payload= (uchar*)my_malloc( PSI_NOT_INSTRUMENTED, len, MYF(0)); } void set_payload(const uchar *data, ulong len) { if (max_len < len ) { my_free(payload); payload= (uchar*)my_malloc( PSI_NOT_INSTRUMENTED, len * 2, MYF(0)); max_len = len * 2; } this->len = len; memcpy(payload, data, len); } ulong max_len; 2) class Pipeline_event destructor no longer release variable of packet /*if (packet != NULL) { delete packet; }*/ 3) reset_pipeline_event function no longer delete variable of packet if (packet != NULL) { //delete packet; /* purecov: inspected */ packet = NULL; /* purecov: inspected */ } 4) convert_packet_to_log_event function no longer delete variable of packet //delete packet; packet= NULL; applier.cc 5) apply_data_packet function int Applier_module::apply_data_packet(Data_packet *data_packet, Format_description_log_event *fde_evt, IO_CACHE *cache, Continuation *cont, Data_packet 
*new_packet) { int error = 0; uchar *payload = data_packet->payload; uchar *payload_end = data_packet->payload + data_packet->len; if (check_single_primary_queue_status()) return 1; /* purecov: inspected */ while ((payload != payload_end) && !error) { uint event_len = uint4korr(((uchar *)payload) + EVENT_LEN_OFFSET); //Data_packet *new_packet = new Data_packet(payload, event_len); new_packet->set_payload(payload, event_len); payload = payload + event_len; Pipeline_event *pevent = new Pipeline_event(new_packet, fde_evt, cache); error = inject_event_into_pipeline(pevent, cont); delete pevent; DBUG_EXECUTE_IF("stop_applier_channel_after_reading_write_rows_log_event", { if (payload[EVENT_TYPE_OFFSET] == binary_log::WRITE_ROWS_EVENT) { error = 1; } }); } return error; } 6) applier_thread_handle function Packet *packet = NULL; new_packet = new Data_packet(4096); ... case DATA_PACKET_TYPE: packet_application_error = apply_data_packet((Data_packet *)packet, fde_evt, cache, cont, new_packet);