diff -ur mysql-5.1.23-ndb-6.3.7-telco-orig/storage/ndb/include/transporter/TransporterRegistry.hpp mysql-5.1.23-ndb-6.3.7-telco-poll/storage/ndb/include/transporter/TransporterRegistry.hpp --- mysql-5.1.23-ndb-6.3.7-telco-orig/storage/ndb/include/transporter/TransporterRegistry.hpp 2007-12-19 18:52:46.000000000 +0100 +++ mysql-5.1.23-ndb-6.3.7-telco-poll/storage/ndb/include/transporter/TransporterRegistry.hpp 2008-01-29 16:11:44.000000000 +0100 @@ -339,11 +339,28 @@ */ void removeTransporter(NodeId nodeId); + /** * Used in polling if exists TCP_Transporter */ +#ifdef HAVE_POLL + + /** + * Gets the value of the revents field of the pollfd structure assigned to a socket descriptor. + * + * @param socketfd socket descriptor. + * + * @return revents, or 0 if no pollfd structure is found for the given socket descriptor + */ + int pollFdRevents(int socketfd); + + struct pollfd * pollFds; + // number of filled-in structures in pollFds + int pollFdsCount; +#else // #ifdef HAVE_POLL int tcpReadSelectReply; fd_set tcpReadset; +#endif // #ifdef HAVE_POLL Uint32 poll_TCP(Uint32 timeOutMillis); Uint32 poll_SCI(Uint32 timeOutMillis); diff -ur mysql-5.1.23-ndb-6.3.7-telco-orig/storage/ndb/src/common/transporter/TCP_Transporter.cpp mysql-5.1.23-ndb-6.3.7-telco-poll/storage/ndb/src/common/transporter/TCP_Transporter.cpp --- mysql-5.1.23-ndb-6.3.7-telco-orig/storage/ndb/src/common/transporter/TCP_Transporter.cpp 2007-12-19 18:52:46.000000000 +0100 +++ mysql-5.1.23-ndb-6.3.7-telco-poll/storage/ndb/src/common/transporter/TCP_Transporter.cpp 2008-01-29 17:33:47.000000000 +0100 @@ -256,16 +256,27 @@ bool TCP_Transporter::sendIsPossible(struct timeval * timeout) { if(theSocket != NDB_INVALID_SOCKET){ + +#ifdef HAVE_POLL + struct pollfd ufds; + ufds.fd = theSocket; + ufds.events= POLLOUT; + ufds.revents = 0; + + int pollReply = poll(&ufds, 1, timeout->tv_sec * 1000 + timeout->tv_usec / 1000); + + // TODO should we handle POLLERR | POLLHUP ? 
+ return (pollReply > 0) && (ufds.revents & POLLOUT); +#else fd_set writeset; FD_ZERO(&writeset); FD_SET(theSocket, &writeset); int selectReply = select(theSocket + 1, NULL, &writeset, NULL, timeout); - if ((selectReply > 0) && FD_ISSET(theSocket, &writeset)) - return true; - else - return false; + return (selectReply > 0) && FD_ISSET(theSocket, &writeset); +#endif + } return false; } diff -ur mysql-5.1.23-ndb-6.3.7-telco-orig/storage/ndb/src/common/transporter/Transporter.cpp mysql-5.1.23-ndb-6.3.7-telco-poll/storage/ndb/src/common/transporter/Transporter.cpp --- mysql-5.1.23-ndb-6.3.7-telco-orig/storage/ndb/src/common/transporter/Transporter.cpp 2007-12-19 18:53:11.000000000 +0100 +++ mysql-5.1.23-ndb-6.3.7-telco-poll/storage/ndb/src/common/transporter/Transporter.cpp 2008-01-29 16:27:57.000000000 +0100 @@ -152,6 +152,7 @@ return connect_client(sockfd); } + bool Transporter::connect_client(NDB_SOCKET_TYPE sockfd) { diff -ur mysql-5.1.23-ndb-6.3.7-telco-orig/storage/ndb/src/common/transporter/TransporterRegistry.cpp mysql-5.1.23-ndb-6.3.7-telco-poll/storage/ndb/src/common/transporter/TransporterRegistry.cpp --- mysql-5.1.23-ndb-6.3.7-telco-orig/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2007-12-19 18:52:59.000000000 +0100 +++ mysql-5.1.23-ndb-6.3.7-telco-poll/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2008-01-29 16:09:54.000000000 +0100 @@ -93,6 +93,11 @@ theTransporters = new Transporter * [maxTransporters]; performStates = new PerformState [maxTransporters]; ioStates = new IOState [maxTransporters]; + +#ifdef HAVE_POLL + pollFds = new struct pollfd [maxTransporters]; pollFdsCount = 0; /* no entries filled in until poll_TCP runs */ +#endif + // Initialize member variables nTransporters = 0; @@ -149,6 +154,10 @@ delete[] performStates; delete[] ioStates; +#ifdef HAVE_POLL + delete[] pollFds; +#endif + if (m_mgm_handle) ndb_mgm_destroy_handle(&m_mgm_handle); @@ -246,12 +255,21 @@ // wait for socket close for 1 second to let message arrive at client { - fd_set a_set; - FD_ZERO(&a_set); - 
FD_SET(sockfd, &a_set); - struct timeval timeout; - timeout.tv_sec = 1; timeout.tv_usec = 0; - select(sockfd+1, &a_set, 0, 0, &timeout); +#ifdef HAVE_POLL + struct pollfd ufds; + ufds.fd = sockfd; + ufds.events= POLLIN | POLLPRI; + poll(&ufds, 1, 1000); + +#else // HAVE_POLL + fd_set a_set; + FD_ZERO(&a_set); + FD_SET(sockfd, &a_set); + struct timeval timeout; + timeout.tv_sec = 1; timeout.tv_usec = 0; + select(sockfd+1, &a_set, 0, 0, &timeout); +#endif // HAVE_POLL + } DBUG_RETURN(false); } @@ -685,9 +703,14 @@ { retVal |= poll_TCP(timeOutMillis); } + +#ifndef HAVE_POLL + // not sure what this member variable is for; as far as I can see it could easily be declared local in poll_TCP [lukasz osipiuk] else tcpReadSelectReply = 0; #endif + +#endif #ifdef NDB_SCI_TRANSPORTER if(nSCITransporters > 0) retVal |= poll_SCI(timeOutMillis); @@ -748,16 +771,22 @@ bool hasdata = false; if (false && nTCPTransporters == 0) { +#ifndef HAVE_POLL tcpReadSelectReply = 0; +#endif return 0; } +#ifdef HAVE_POLL + pollFdsCount = 0; +#else NDB_SOCKET_TYPE maxSocketValue = -1; // Needed for TCP/IP connections // The read- and writeset are used by select FD_ZERO(&tcpReadset); +#endif // Prepare for sending and receiving for (int i = 0; i < nTCPTransporters; i++) { @@ -768,18 +797,33 @@ if (is_connected(node_id) && t->isConnected()) { const NDB_SOCKET_TYPE socket = t->getSocket(); + + +#ifdef HAVE_POLL + pollFds[pollFdsCount].fd = socket; + pollFds[pollFdsCount].events = POLLIN | POLLPRI; + pollFds[pollFdsCount].revents = 0; + ++pollFdsCount; +#else // Find the highest socket value. It will be used by select if (socket > maxSocketValue) - maxSocketValue = socket; - + maxSocketValue = socket; // Put the connected transporters in the socket read-set FD_SET(socket, &tcpReadset); +#endif + } hasdata |= t->hasReceiveData(); } timeOutMillis = hasdata ? 
0 : timeOutMillis; +// TODO error control should be added here + +#ifdef HAVE_POLL + int pollReply = poll(pollFds, pollFdsCount, timeOutMillis); +#else +// TODO maxSocketValue should be tested for >= FD_SETSIZE struct timeval timeout; timeout.tv_sec = timeOutMillis / 1000; timeout.tv_usec = (timeOutMillis % 1000) * 1000; @@ -790,6 +834,7 @@ tcpReadSelectReply = select(maxSocketValue, &tcpReadset, 0, 0, &timeout); if(false && tcpReadSelectReply == -1 && errno == EINTR) g_eventLogger.info("woke-up by signal"); +#endif #ifdef NDB_WIN32 if(tcpReadSelectReply == SOCKET_ERROR) @@ -798,11 +843,30 @@ } #endif +#ifdef HAVE_POLL + return pollReply || hasdata; +#else return tcpReadSelectReply || hasdata; +#endif + + } #endif +#ifdef HAVE_POLL +int +TransporterRegistry::pollFdRevents(int socketfd) { + for (int i = 0; i < pollFdsCount; ++i) { + if (pollFds[i].fd == socketfd) { + return pollFds[i].revents; + } + } + return 0; +} +#endif // HAVE_POLL + + void TransporterRegistry::performReceive() { @@ -816,9 +880,13 @@ if(is_connected(nodeId)){ if(t->isConnected()) { +#ifdef HAVE_POLL + if (pollFdRevents(socket) & (POLLIN | POLLPRI)) +#else if (FD_ISSET(socket, &tcpReadset)) - { - t->doReceive(); +#endif + { + t->doReceive(); } if (t->hasReceiveData()) @@ -828,7 +896,7 @@ transporter_recv_from(callbackObj, nodeId); Uint32 szUsed = unpack(ptr, sz, nodeId, ioStates[nodeId]); t->updateReceiveDataPtr(szUsed); - } + } } } } diff -ur mysql-5.1.23-ndb-6.3.7-telco-orig/storage/ndb/src/common/util/SocketClient.cpp mysql-5.1.23-ndb-6.3.7-telco-poll/storage/ndb/src/common/util/SocketClient.cpp --- mysql-5.1.23-ndb-6.3.7-telco-orig/storage/ndb/src/common/util/SocketClient.cpp 2007-12-19 18:53:13.000000000 +0100 +++ mysql-5.1.23-ndb-6.3.7-telco-poll/storage/ndb/src/common/util/SocketClient.cpp 2008-01-29 16:35:13.000000000 +0100 @@ -111,6 +111,7 @@ bool use_timeout; SOCKOPT_OPTLEN_TYPE len; int flags; + NDB_SOCKET_TYPE sockfd; if (m_sockfd == NDB_INVALID_SOCKET) { @@ -145,9 +146,41 @@ goto done; // connected immediately. 
if (r < 0 && (errno != EINPROGRESS)) { - NDB_CLOSE_SOCKET(m_sockfd); - m_sockfd= NDB_INVALID_SOCKET; - return NDB_INVALID_SOCKET; + goto socket_error; + } + + +#ifdef HAVE_POLL + struct pollfd ufds; + ufds.fd= m_sockfd; + ufds.events= POLLIN | POLLPRI | POLLOUT; + ufds.revents = 0; + + if (poll(&ufds, + 1, + use_timeout? m_connect_timeout_sec * 1000 : -1 ) <= 0) { + + goto socket_error; + } + + if (ufds.revents & (POLLIN | POLLPRI | POLLOUT)) { + len= sizeof(r); + if (getsockopt(m_sockfd, SOL_SOCKET, SO_ERROR, &r, &len) < 0 || r) + { + // Solaris got an error... different than others + goto socket_error; + } + } + else { + // poll() set none of the requested events (only an error flag such as POLLERR/POLLHUP/POLLNVAL) + goto socket_error; + } + + +#else + + if (m_sockfd >= FD_SETSIZE) { + goto socket_error; } FD_ZERO(&rset); @@ -160,9 +193,7 @@ if ((r= select(m_sockfd+1, &rset, &wset, NULL, use_timeout? &tval : NULL)) == 0) { - NDB_CLOSE_SOCKET(m_sockfd); - m_sockfd= NDB_INVALID_SOCKET; - return NDB_INVALID_SOCKET; + goto socket_error; } if (FD_ISSET(m_sockfd, &rset) || FD_ISSET(m_sockfd, &wset)) @@ -171,18 +202,16 @@ if (getsockopt(m_sockfd, SOL_SOCKET, SO_ERROR, &r, &len) < 0 || r) { // Solaris got an error... different than others - NDB_CLOSE_SOCKET(m_sockfd); - m_sockfd= NDB_INVALID_SOCKET; - return NDB_INVALID_SOCKET; + goto socket_error; } } else { // select error, probably m_sockfd not set. 
- NDB_CLOSE_SOCKET(m_sockfd); - m_sockfd= NDB_INVALID_SOCKET; - return NDB_INVALID_SOCKET; + goto socket_error; } +#endif // HAVE_POLL + done: fcntl(m_sockfd, F_SETFL, flags); @@ -190,13 +219,17 @@ if (m_auth) { if (!m_auth->client_authenticate(m_sockfd)) { - NDB_CLOSE_SOCKET(m_sockfd); - m_sockfd= NDB_INVALID_SOCKET; - return NDB_INVALID_SOCKET; + goto socket_error; } } - NDB_SOCKET_TYPE sockfd= m_sockfd; + sockfd= m_sockfd; m_sockfd= NDB_INVALID_SOCKET; return sockfd; + +socket_error: + NDB_CLOSE_SOCKET(m_sockfd); + m_sockfd= NDB_INVALID_SOCKET; + return NDB_INVALID_SOCKET; + } diff -ur mysql-5.1.23-ndb-6.3.7-telco-orig/storage/ndb/src/common/util/SocketServer.cpp mysql-5.1.23-ndb-6.3.7-telco-poll/storage/ndb/src/common/util/SocketServer.cpp --- mysql-5.1.23-ndb-6.3.7-telco-orig/storage/ndb/src/common/util/SocketServer.cpp 2007-12-19 18:52:58.000000000 +0100 +++ mysql-5.1.23-ndb-6.3.7-telco-poll/storage/ndb/src/common/util/SocketServer.cpp 2008-01-30 15:10:52.000000000 +0100 @@ -154,52 +154,97 @@ void SocketServer::doAccept(){ + +#ifdef HAVE_POLL +// TODO set MAX_SERVICES to reasonable value +#define MAX_SERVICES 1024 + struct pollfd pollFds[MAX_SERVICES]; +#else fd_set readSet, exceptionSet; FD_ZERO(&readSet); FD_ZERO(&exceptionSet); - - m_services.lock(); int maxSock = 0; +#endif + + m_services.lock(); for (unsigned i = 0; i < m_services.size(); i++){ +#ifdef HAVE_POLL + pollFds[i].fd = m_services[i].m_socket; + pollFds[i].events = POLLIN | POLLPRI; + pollFds[i].revents = 0; +#else const NDB_SOCKET_TYPE s = m_services[i].m_socket; FD_SET(s, &readSet); FD_SET(s, &exceptionSet); maxSock = (maxSock > s ? 
maxSock : s); +#endif } + +#ifdef HAVE_POLL + if(poll(pollFds, m_services.size(), 1000) > 0) { +#else struct timeval timeout; timeout.tv_sec = 1; timeout.tv_usec = 0; - +// TODO socket should be tested for >= FD_SETSIZE if select() is in use if(select(maxSock + 1, &readSet, 0, &exceptionSet, &timeout) > 0){ - for (unsigned i = 0; i < m_services.size(); i++){ +#endif + for (unsigned i = 0; i < m_services.size(); i++) { ServiceInstance & si = m_services[i]; - + + +#ifdef HAVE_POLL + struct pollfd* pollFd = NULL; + // find matching pollFd. + for (unsigned j = 0; pollFd == NULL && j < m_services.size(); ++j) { + if (pollFds[j].fd == si.m_socket) { + pollFd = &pollFds[j]; + } + } + + if (pollFd == NULL) { + // should not happen + DEBUG("could not find socket in pollFds"); + continue; + } + + if (pollFd->revents & (POLLIN | POLLPRI)) { +#else if(FD_ISSET(si.m_socket, &readSet)){ - NDB_SOCKET_TYPE childSock = accept(si.m_socket, 0, 0); - if(childSock == NDB_INVALID_SOCKET){ - continue; - } +#endif + NDB_SOCKET_TYPE childSock = accept(si.m_socket, 0, 0); + if(childSock == NDB_INVALID_SOCKET){ + continue; + } - SessionInstance s; - s.m_service = si.m_service; - s.m_session = si.m_service->newSession(childSock); - if(s.m_session != 0) - { - m_session_mutex.lock(); - m_sessions.push_back(s); - startSession(m_sessions.back()); - m_session_mutex.unlock(); - } + SessionInstance s; + s.m_service = si.m_service; + s.m_session = si.m_service->newSession(childSock); + if(s.m_session != 0) + { + m_session_mutex.lock(); + m_sessions.push_back(s); + startSession(m_sessions.back()); + m_session_mutex.unlock(); + } - continue; + continue; } - + +#ifdef HAVE_POLL + if (pollFd->revents & (POLLERR | POLLHUP)) { + DEBUG("POLLERR | POLLHUP set for socket"); + continue; + } +#else if(FD_ISSET(si.m_socket, &exceptionSet)){ - DEBUG("socket in the exceptionSet"); - continue; + DEBUG("socket in the exceptionSet"); + continue; } +#endif } } + m_services.unlock(); } diff -ur 
mysql-5.1.23-ndb-6.3.7-telco-orig/storage/ndb/src/common/util/socket_io.cpp mysql-5.1.23-ndb-6.3.7-telco-poll/storage/ndb/src/common/util/socket_io.cpp --- mysql-5.1.23-ndb-6.3.7-telco-orig/storage/ndb/src/common/util/socket_io.cpp 2007-12-19 18:52:45.000000000 +0100 +++ mysql-5.1.23-ndb-6.3.7-telco-poll/storage/ndb/src/common/util/socket_io.cpp 2008-01-29 17:25:25.000000000 +0100 @@ -20,13 +20,25 @@ #include #include + // TODO socket should be tested for >= FD_SETSIZE if select() is in use + + extern "C" int read_socket(NDB_SOCKET_TYPE socket, int timeout_millis, char * buf, int buflen){ if(buflen < 1) return 0; - + + int res; + +#ifdef HAVE_POLL + struct pollfd ufds; + ufds.fd = socket; + ufds.events= POLLIN | POLLPRI; + ufds.revents = 0; + res = poll(&ufds, 1, timeout_millis); +#else fd_set readset; FD_ZERO(&readset); FD_SET(socket, &readset); @@ -35,13 +47,18 @@ timeout.tv_sec = (timeout_millis / 1000); timeout.tv_usec = (timeout_millis % 1000) * 1000; - const int selectRes = select(socket + 1, &readset, 0, 0, &timeout); - if(selectRes == 0) - return 0; - - if(selectRes == -1){ + res = select(socket + 1, &readset, 0, 0, &timeout); +#endif + + if (res <= 0) { + return res; + } + +#ifdef HAVE_POLL + if (ufds.revents & (POLLERR | POLLHUP)) { return -1; } +#endif return recv(socket, &buf[0], buflen, 0); } @@ -53,6 +70,14 @@ if(buflen <= 1) return 0; + int res; + +#ifdef HAVE_POLL + struct pollfd ufds; + ufds.fd = socket; + ufds.events= POLLIN | POLLPRI; + ufds.revents = 0; +#else fd_set readset; FD_ZERO(&readset); FD_SET(socket, &readset); @@ -60,23 +85,32 @@ struct timeval timeout; timeout.tv_sec = (timeout_millis / 1000); timeout.tv_usec = (timeout_millis % 1000) * 1000; +#endif if(mutex) NdbMutex_Unlock(mutex); Uint64 tick= NdbTick_CurrentMillisecond(); - const int selectRes = select(socket + 1, &readset, 0, 0, &timeout); +#ifdef HAVE_POLL + res = poll(&ufds, 1, timeout_millis); +#else + res = select(socket + 1, &readset, 0, 0, &timeout); +#endif *time= 
NdbTick_CurrentMillisecond() - tick; if(mutex) NdbMutex_Lock(mutex); - if(selectRes == 0){ - return 0; + + if(res <= 0) { + return res; } - if(selectRes == -1){ +#ifdef HAVE_POLL + if (ufds.revents & (POLLERR | POLLHUP)) { return -1; } +#endif + char* ptr = buf; int len = buflen; @@ -128,18 +162,36 @@ tmp -= t; } +#ifdef HAVE_POLL + struct pollfd ufds; + ufds.fd = socket; + ufds.events= POLLIN | POLLPRI; + ufds.revents = 0; +#else FD_ZERO(&readset); FD_SET(socket, &readset); timeout.tv_sec = ((timeout_millis - *time) / 1000); timeout.tv_usec = ((timeout_millis - *time) % 1000) * 1000; +#endif tick= NdbTick_CurrentMillisecond(); - const int selectRes = select(socket + 1, &readset, 0, 0, &timeout); +#ifdef HAVE_POLL + res = poll(&ufds, 1, (timeout_millis > *time) ? (timeout_millis - *time) : 0); /* remaining time as in the select() path; clamped because a negative poll() timeout blocks forever */ +#else + res = select(socket + 1, &readset, 0, 0, &timeout); +#endif *time= NdbTick_CurrentMillisecond() - tick; - if(selectRes != 1){ + if(res != 1){ return -1; } + +#ifdef HAVE_POLL + if (ufds.revents & (POLLERR | POLLHUP)) { + return -1; + } +#endif + } while (len > 0); return -1; @@ -149,22 +201,42 @@ int write_socket(NDB_SOCKET_TYPE socket, int timeout_millis, int *time, const char buf[], int len){ + + int res; + +#ifdef HAVE_POLL + struct pollfd ufds; + ufds.fd = socket; + ufds.events= POLLOUT; + ufds.revents = 0; +#else fd_set writeset; FD_ZERO(&writeset); FD_SET(socket, &writeset); struct timeval timeout; timeout.tv_sec = (timeout_millis / 1000); timeout.tv_usec = (timeout_millis % 1000) * 1000; +#endif Uint64 tick= NdbTick_CurrentMillisecond(); - const int selectRes = select(socket + 1, 0, &writeset, 0, &timeout); +#ifdef HAVE_POLL + res = poll(&ufds, 1, timeout_millis); +#else + res = select(socket + 1, 0, &writeset, 0, &timeout); +#endif *time= NdbTick_CurrentMillisecond() - tick; - if(selectRes != 1){ + if(res != 1){ return -1; } +#ifdef HAVE_POLL + if (ufds.revents & (POLLERR | POLLHUP)) { + return -1; + } +#endif + const char * tmp = &buf[0]; while(len > 0){ const int w = send(socket, tmp, len, 0); @@ 
-176,17 +248,28 @@ if(len == 0) break; - + +#ifdef HAVE_POLL + struct pollfd ufds; + ufds.fd = socket; + ufds.events= POLLOUT; + ufds.revents = 0; +#else FD_ZERO(&writeset); FD_SET(socket, &writeset); timeout.tv_sec = ((timeout_millis - *time) / 1000); timeout.tv_usec = ((timeout_millis - *time) % 1000) * 1000; +#endif Uint64 tick= NdbTick_CurrentMillisecond(); - const int selectRes2 = select(socket + 1, 0, &writeset, 0, &timeout); +#ifdef HAVE_POLL + res = poll(&ufds, 1, (timeout_millis > *time) ? (timeout_millis - *time) : 0); /* remaining time as in the select() path; clamped because a negative poll() timeout blocks forever */ +#else + res = select(socket + 1, 0, &writeset, 0, &timeout); +#endif *time= NdbTick_CurrentMillisecond() - tick; - if(selectRes2 != 1){ + if(res != 1){ return -1; } }