Description:
While testing the performance of MGR (MySQL Group Replication), I found an opportunity for a performance optimization in the applier pipeline.
How to repeat:
applier.cc(apply_data_packet)
while ((payload != payload_end) && !error) {
uint event_len = uint4korr(((uchar *)payload) + EVENT_LEN_OFFSET);
Data_packet *new_packet = new Data_packet(payload, event_len);
payload = payload + event_len;
Pipeline_event *pevent = new Pipeline_event(new_packet, fde_evt, cache);
error = inject_event_into_pipeline(pevent, cont);
delete pevent;
DBUG_EXECUTE_IF("stop_applier_channel_after_reading_write_rows_log_event", {
if (payload[EVENT_TYPE_OFFSET] == binary_log::WRITE_ROWS_EVENT) {
error = 1;
}
});
Suggested fix:
The new_packet object can be reused instead of being allocated for every event, which improves performance under high concurrency.
Below is my modification:
applier.h:
/**
  Apply the events contained in a data packet.

  @param data_packet  packet whose payload holds the events to apply
  @param fde_evt      format description event handed to each pipeline event
  @param cache        IO cache handed to each pipeline event
  @param cont         continuation passed to inject_event_into_pipeline
  @param new_packet   packet allocated by the caller and reused as the
                      buffer for every event

  @return 0 when every event was injected without error, non-zero otherwise
*/
int apply_data_packet(Data_packet *data_packet,
Format_description_log_event *fde_evt,
IO_CACHE *cache,
Continuation *cont,
Data_packet *new_packet);
pipeline_interfaces.h:
1) class Data_packet: add the following functions and member variable
/**
  Create an empty, reusable packet with a pre-allocated payload buffer.

  The packet holds no data until set_payload() is called, so the member
  len starts at 0.

  @param len  initial capacity of the payload buffer, in bytes
*/
Data_packet(ulong len)
    : Packet(DATA_PACKET_TYPE), payload(NULL), len(0), max_len(0)
{
  payload= (uchar*)my_malloc(
               PSI_NOT_INSTRUMENTED,
               len, MYF(0));
  /*
    Record the capacity only when the buffer really exists; with max_len
    left at 0 the first set_payload() call allocates again.
  */
  if (payload != NULL)
    max_len= len;
}
/**
  Copy an event into this packet, growing the buffer when it is too small.

  If the buffer cannot be grown, the packet is left empty (payload NULL,
  len 0, max_len 0) instead of copying through a NULL pointer.

  @param data  bytes to copy into the packet
  @param len   number of bytes to copy
*/
void set_payload(const uchar *data, ulong len)
{
  if (max_len < len)
  {
    /* The old contents are not needed, so free and allocate afresh. */
    my_free(payload);
    /* Allocate twice the requested size to leave room for larger events. */
    payload= (uchar*)my_malloc(
                 PSI_NOT_INSTRUMENTED,
                 len * 2, MYF(0));
    if (payload == NULL)
    {
      /* Keep the bookkeeping consistent with the missing buffer. */
      max_len= 0;
      this->len= 0;
      return;
    }
    max_len= len * 2;
  }
  this->len= len;
  /* Nothing to copy for an empty payload; payload may be NULL here. */
  if (len != 0)
    memcpy(payload, data, len);
}
/** Allocated size of the payload buffer, in bytes; may be larger than len. */
ulong max_len;
2) class Pipeline_event: the destructor no longer releases the packet member
/*if (packet != NULL)
{
delete packet;
}*/
3) reset_pipeline_event function: no longer deletes the packet member
if (packet != NULL)
{
//delete packet; /* purecov: inspected */
packet = NULL; /* purecov: inspected */
}
4) convert_packet_to_log_event function: no longer deletes the packet member
//delete packet;
packet= NULL;
applier.cc
5) apply_data_packet function
/**
  Split a data packet into its individual events and inject each one into
  the applier pipeline.

  Every event is copied into new_packet, which is reused across iterations
  instead of allocating a fresh Data_packet per event.

  @param data_packet  packet whose payload holds the concatenated events
  @param fde_evt      format description event handed to each pipeline event
  @param cache        IO cache handed to each pipeline event
  @param cont         continuation passed to inject_event_into_pipeline
  @param new_packet   reusable packet that receives each event's bytes

  @return 0 when every event was injected without error, non-zero when the
          queue status check fails, an event length is inconsistent with
          the payload, or the pipeline reports an error
*/
int Applier_module::apply_data_packet(Data_packet *data_packet,
                                      Format_description_log_event *fde_evt,
                                      IO_CACHE *cache, Continuation *cont,
                                      Data_packet *new_packet) {
  int error = 0;
  uchar *payload = data_packet->payload;
  uchar *payload_end = data_packet->payload + data_packet->len;

  if (check_single_primary_queue_status()) return 1; /* purecov: inspected */

  /* Bytes an event needs so that its 4-byte length field can be read. */
  const ulong min_event_len = static_cast<ulong>(EVENT_LEN_OFFSET) + 4;

  while ((payload != payload_end) && !error) {
    const ulong remaining = static_cast<ulong>(payload_end - payload);

    /* Do not read the length field beyond the end of the payload. */
    if (remaining < min_event_len) return 1;

    uint event_len = uint4korr(((uchar *)payload) + EVENT_LEN_OFFSET);

    /*
      Reject lengths that cannot be right: one shorter than the bytes just
      read would stall or misalign the loop, one longer than what is left
      would copy past payload_end.
    */
    if (event_len < min_event_len || event_len > remaining) return 1;

    /* Reuse the caller's packet rather than allocating one per event. */
    new_packet->set_payload(payload, event_len);
    payload = payload + event_len;

    Pipeline_event *pevent = new Pipeline_event(new_packet, fde_evt, cache);
    error = inject_event_into_pipeline(pevent, cont);
    delete pevent;

    /* Peek at the next event's type only when that byte is in the payload. */
    DBUG_EXECUTE_IF("stop_applier_channel_after_reading_write_rows_log_event", {
      if ((payload_end - payload) > EVENT_TYPE_OFFSET &&
          payload[EVENT_TYPE_OFFSET] == binary_log::WRITE_ROWS_EVENT) {
        error = 1;
      }
    });
  }

  return error;
}
6) applier_thread_handle function
Packet *packet = NULL;
new_packet = new Data_packet(4096);
...
case DATA_PACKET_TYPE:
packet_application_error =
apply_data_packet((Data_packet *)packet, fde_evt, cache, cont, new_packet);