#include #include #include #include #include #include using namespace std; #define DATABASE "test" #define INTERVAL 1000 NdbDictionary::Dictionary *dict; vector tables; map evops; const char * status_t( NdbDictionary::Object::Status st) { switch (st) { case NdbDictionary::Object::New: return "New"; case NdbDictionary::Object::Changed: return "Changed"; case NdbDictionary::Object::Retrieved: return "Retrieved"; case NdbDictionary::Object::Invalid: return "Invalid"; case NdbDictionary::Object::Altered: return "Altered"; default: return "??"; } } const char * state_t( NdbDictionary::Object::State st) { switch( st) { case NdbDictionary::Object::StateUndefined: return "StateUndefined"; case NdbDictionary::Object::StateOffline: return "StateOffline"; case NdbDictionary::Object::StateBuilding: return "StateBuilding"; case NdbDictionary::Object::StateDropping: return "StateDropping"; case NdbDictionary::Object::StateOnline: return "StateOnline"; case NdbDictionary::Object::StateBackup: return "StateBackup"; case NdbDictionary::Object::StateBroken: return "StateBroken"; default: return "??"; } } const char * store_t( NdbDictionary::Object::Store st) { switch( st) { case NdbDictionary::Object::StoreUndefined: return "StoreUndefined"; // case NdbDictionary::Object::StoreTemporary: // return "StoreTemporary"; case NdbDictionary::Object::StorePermanent: return "StorePermanent"; default: return "??"; } } int showdict(NdbDictionary::Dictionary *dict) { NdbDictionary::Dictionary::List tl; vector tln; fprintf( stdout, "\n"); if ( dict->listObjects( tl, NdbDictionary::Object::UserTable) < 0 ) { fprintf( stderr, "listObjects():%s\n", dict->getNdbError().message); return -1; } for (int i = 0; i < tl.count; i++) { if ( strcmp( tl.elements[i].database,DATABASE) == 0) { tln.push_back( tl.elements[i].name); // fprintf( stdout, "Dict:%s\n", tl.elements[i].name); // fprintf( stdout, "\tState=%s\n", // state_t(tl.elements[i].state)); // fprintf( stdout, "\tStore=%s\n", // store_t(tl.elements[i].store)); } } for ( int i = 0; i < tln.size(); i++ ) { char sep = ' '; const NdbDictionary::Table *tab = \ dict->getTable( tln.at(i).c_str()); if ( tab == NULL) { fprintf( stderr, "getTable(%s):%s\n", tln.at(i).c_str(), dict->getNdbError().message); return -1; } fprintf( stdout,"%s( ", tln.at(i).c_str()); for ( int i = 0; i < tab->getNoOfColumns(); i++) { fprintf( stdout, " %c%s" ,sep ,tab->getColumn(i)->getName()); sep=','; } fprintf( stdout, ")\n"); fprintf( stdout, "\tStatus = %s\n" ,status_t( tab->getObjectStatus())); } fprintf( stdout, "\n"); return 0; } int initTables() { NdbDictionary::Dictionary::List tl; vector tln; if ( dict->listObjects( tl, NdbDictionary::Object::UserTable) < 0 ) { fprintf( stderr, "listObjects():%s\n", dict->getNdbError().message); return -1; } for (int i = 0; i < tl.count; i++) { if ( strcmp( tl.elements[i].database,DATABASE) == 0) { tln.push_back( tl.elements[i].name); fprintf( stderr, "Dictionary::Table:%s\n", tl.elements[i].name); } } for ( int i = 0; i < tln.size(); i++ ) { const NdbDictionary::Table *tab = \ dict->getTable( tln.at(i).c_str()); if ( tab == NULL) { fprintf( stderr, "getTable(%s):%s\n", tln.at(i).c_str(), dict->getNdbError().message); return -1; } tables.push_back( tab); } return 0; } int create_events(Ndb *ndb) { for ( int i = 0; i < tables.size(); i++ ) { string evn = (string) DATABASE "." + \ tables.at(i)->getName() + \ (string) ".TE_ALTER"; NdbDictionary::Event ev(evn.c_str(),*(tables.at(i))); ev.addTableEvent(NdbDictionary::Event::TE_ALTER); ev.mergeEvents( true); if ( dict->createEvent(ev) < 0 ) { fprintf( stderr, "createEvent(%s):%s\n" ,evn.c_str() ,dict->getNdbError().message); if ( dict->getNdbError().classification == NdbError::SchemaObjectExists ) { dict->dropEvent(evn.c_str()); if ( dict->createEvent( ev) < 0 ) { fprintf( stderr, "createEvent(%s):%s\n" ,evn.c_str() ,dict->getNdbError().message); return -1; } } } if (( evops[ evn ] = ndb->createEventOperation(evn.c_str())) == NULL ) { fprintf( stderr, "createEventOperation(%s):%s\n" ,evn.c_str() ,ndb->getNdbError().message); return -1; } evops[ evn ]->mergeEvents(true); if ( evops[ evn ]->execute() < 0) { fprintf( stderr, "op(%s)->execute():%s\n" ,evn.c_str() ,ndb->getNdbError().message); return -1; } } return 0; } int pollevents(Ndb *ndb) { int rc = 0; if ( ndb->pollEvents(INTERVAL) > 0 ) { NdbEventOperation *op; while ( op = ndb->nextEvent()) { switch ( op->getEventType()) { case NdbDictionary::Event::TE_ALTER: printf("TE_ALTER: "); rc++; break; default: printf("Ignored event\n"); break; } } } return rc; } int refresh( NdbDictionary::Dictionary *dict, bool force) { int flushed = 0; for ( int i = 0; i < tables.size(); i++) { const NdbDictionary::Table *tab = tables.at(i); if ( tab->getObjectStatus() == NdbDictionary::Object::Altered || force) { fprintf( stdout, "Flushing %s\n" ,tab->getName()); dict->removeCachedTable( tab->getName()); flushed++; } } return flushed; } int main( int argc, char *argv[]) { Ndb_cluster_connection *cc; Ndb *ndb; cc = new Ndb_cluster_connection( "10.0.1.1"); if ( cc == NULL ) { fprintf( stderr, "Failed to create cluster connection\n"); return 1; } cc->connect( 4,5,1 ); cc->wait_until_ready( 30, 0); ndb = new Ndb( cc, DATABASE); if ( ndb == NULL) { fprintf( stderr, "Failed to create Ndb object\n"); return 1; } ndb->init(); if ((dict = ndb->getDictionary()) == NULL ) { fprintf( stderr, "getDictionary():%s\n", ndb->getNdbError().message); return 1; } printf( "Loading ... \n"); showdict(dict); if ( initTables() < 0 ) return 1; if (create_events(ndb) < 0) return 1; printf( "Listining for changes ...\n"); while (1) { if (pollevents(ndb)) { showdict(dict); printf( "Attempt to refresh .... \n"); if ( refresh(dict, false) == 0) { printf( "Force refresh ...\n"); refresh( dict, true); } showdict(dict); } } ndb_end(2); delete ndb; delete cc; return 0; }