//built using:
//       gcc bug20540.c -o bug20540 -L/home/sbester/servers/5.0-bk/lib -I/home/sbester/servers/5.0-bk/include  -lmysqlclient_r -lz -lpthread 
//run using:
//       ./bug20540 200 300 10 1 "call p1()" 192.168.250.3 3306 test root
//The example above tries to maintain 200 concurrent threads for 300 seconds, with each thread running the specified SQL 10 times.




#include <errno.h>
#include <limits.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#include <pthread.h>
#include <semaphore.h>
#include <sys/resource.h>
#include <unistd.h>

#include "mysql.h"

char query[4096]={0};
char db[128]={0};
char host[128]={0};
char user[128]={0};
char pass[128]={0};
int port=3306;
int thrd=0;
int numb=0;
int leng=0;
int mssleep=0;

sem_t hSemaphore;
int dwWaitResult; 
int num_threads=0;
pthread_mutex_t num_threads_mutex = PTHREAD_MUTEX_INITIALIZER;
//sem_t num_threads_mutex;
time_t timestart=0;


void thread_counter(signed int diff)
{
	//sem_wait(&num_threads_mutex);
	pthread_mutex_lock(&num_threads_mutex);
	num_threads+=diff;
	pthread_mutex_unlock(&num_threads_mutex);
	//sem_post(&num_threads_mutex);
}

void* thread_function(void* arg)
{
   
    int retval=0;
	MYSQL *m;
	MYSQL_RES *r;
	MYSQL_ROW w;
	int res=0,i=0,f=0,numfields;
	mysql_thread_init();
	m=mysql_init(NULL);
	thread_counter(1);
	if(!mysql_real_connect(m, host, user, pass,NULL, port, NULL, CLIENT_FOUND_ROWS|CLIENT_MULTI_STATEMENTS))
	{
		printf("cannot connect (%d - %s)\n",mysql_errno(m),mysql_error(m));
		thread_counter(-1);
		pthread_exit(NULL);
	}
	mysql_select_db(m,db);
	for(i=0;i<numb;i++)
	{
		res=mysql_query(m,query);
		if(res!=0)
		{
			printf("Query failed (%d) - %s: %s\n",mysql_errno(m),mysql_error(m),query);
			usleep(1000 * 500);//sleep in order to not be flooded with errors
			continue;
		}
		do
		{
			r= mysql_store_result(m);
			if (r)
			{
				numfields=mysql_num_fields(r);
				while (w= mysql_fetch_row(r))
				{
					//for(f=0;f<numfields;f++)
					//{
					//	printf("field %03d: %s\n",f,w[f]);
					//}
				}
				mysql_free_result(r);
			}
			else //no rows returned. was it a select?
			{
				if(mysql_field_count(m)>0)
				{
					printf("No results for %s.  (%d) - %s\n",query,mysql_errno(m),mysql_error(m));
				}
			}
		} while (!mysql_next_result(m));
		//usleep(1000*mssleep);//sleep inbetween repetitions.
	}
	mysql_close(m);
	thread_counter(-1);
	mysql_thread_end();
	
	sem_post(&hSemaphore);
	pthread_exit(NULL);
}


void help()
{
	printf("Query load tester for MySQL - Written by Shane Bester (MySQL inc.)\n\n");
	printf("Usage: Run as follows:\n");
	printf("test1.exe [num threads] [run for x seconds] [loops per thread] [ms to sleep between queries] [\"query\"] [host] [port] [database] [user] <password>\n");
	printf("example:\n");
	printf("test1.exe 50 300 5 1 \"call p1()\" 127.0.0.1 3306 test root 12345\n");
	printf("The above will run 50 concurrent threads, The test will last 300 seconds.\n");
	printf("Enjoy!\n");
	exit(1);
}

int main(int argc, const char* argv[])
{
	int nRetCode = 0,j=0,res=0;
	int a_threadId;
	unsigned long i=0;
	unsigned long oldi=0;
	pthread_t a_thread;

	if(argc!=10 && argc!=11)
			help();

	timestart=time(NULL);;
	thrd=atoi(argv[1]);
	leng=atoi(argv[2]);
	numb=atoi(argv[3]);
	mssleep=atoi(argv[4]);
	strncpy(query,argv[5],sizeof(query));
	strncpy(host,argv[6],sizeof(host));
	port=atoi(argv[7]);
	strncpy(db,argv[8],sizeof(db));
	strncpy(user,argv[9],sizeof(user));
	if(argc==11)
		strncpy(pass,argv[10],sizeof(pass));


	
    i = sem_init(&hSemaphore,0,thrd);
	if (i != 0)
    {
        perror("Semaphore initialization failed for hSemaphore");
        exit(1);
    }

	MYSQL *m;
	printf("Testing DB connection before spawning threads....\n");
	m=mysql_init(NULL);
	if(!mysql_real_connect(m, host, user, pass,NULL, port, NULL, CLIENT_FOUND_ROWS|CLIENT_MULTI_STATEMENTS))
	{
		printf("cannot connect (%d - %s)\n",mysql_errno(m),mysql_error(m));
		perror("Please fix connectivity problems :-0\n");
		exit(1);
	}
	printf("Connected....\n");

	if (!mysql_thread_safe())
		fprintf(stderr, "Thread Safe OFF\n");
	else
		fprintf(stderr, "Thread Safe ON\n");

	if(mysql_select_db(m,db))
	{
		printf("unable to select '%s' db. Does it exist and user have permissions?\n",db);
		mysql_close(m);
		exit(1);
	}
	mysql_close(m);
	
	printf("Spawning %d new threads\n",thrd);
	
	for(i=0;(time(NULL)-timestart) < leng;)
	{
		if(i>oldi)
			printf("Current Threads: %u     Total Created: %d     \r",num_threads,i);
		usleep(1000 * 1);//fire threads off at a rate of one per 10 milliseconds.
		//must be quick enough otherwise you'll never get up to thread_count
		sem_wait(&hSemaphore);
		oldi=i;
		i++;
		res = pthread_create(&a_thread, NULL, thread_function, (void*)NULL);
		if(res!=0)
		{
			printf("Thread creation failed (errno: %d)\n",res);
			printf("Better check ulimit settings.  Perhaps lower stack size!!\n");
			exit(1);	
		}
		pthread_detach(a_thread);
	}
		
	for(j=0;j<600;j++)
	{
		printf("\nWaiting for %d threads to die now...\n",num_threads);
		if(num_threads<1)
		{
			goto done;
		}
		usleep(1000 * 500);
	}
done:
	printf("\n\nFinished. A total of %u threads got launched\n",i);
	printf("Start time: %u \n",timestart);
	printf("End time:   %u \n",time(NULL));
	printf("Time taken: %u \n",time(NULL)-timestart);

	sem_destroy(&hSemaphore);
	//sem_destroy(&num_threads_mutex);
	
	return nRetCode;
}
