时间:2019-05-10 标签:c++threadeddbclassmemorymixup

Posted

技术标签:

【中文标题】时间:2019-05-10 标签:c++threadeddbclassmemorymixup【英文标题】:c++ threaded db class memory mixup 【发布时间】:2014-09-02 09:37:22 【问题描述】:

我一直在从事一个项目,该项目接受来自不同主机的数据并将其注入数据库。该项目通过 boost::thread 进行线程化,并使用 mysql++ 执行实际的数据库注入。

最近,我的线程抓取不正确的信息时遇到了严重问题。 例如: 线程 A 将从服务器 foo.b.bar.foobar.com 接收数据,并将正确确定它的主机名,它的主机短名称为 foo.bar,线程 B 将从服务器 fi.b.ban.foobar.com 接收数据并正确确定它的主机名,并且它的主机短名称是 fi.ban。这一切都会发生,没问题。

然后它被移交给数据库模块,接下来我看到的是“INSERT INTO foo_servers VALUES('foo.b.bar.foobar.com', 'fi.ban',...”作为字符串传递给mysql。

所有服务器每五分钟发送一次数据,因此每五分钟创建大约 1k 左右的线程,而这总是我开始看到问题发生的时候。

这是数据库类头:

// db.h
// MySQL database connections and DB search/parsing.
#ifndef DBASE_H
#define DBASE_H

#include <string>
#include <vector>

// database connection details go here
//#include"serial.h"

// Collects the fields of one host report into `inval` (getQueryData) and then
// writes them to MySQL (runQuery / runDriveQuery).
//
// Thread-safety: `inval` is ordinary per-object state that getQueryData()
// writes and the query functions read, and this class contains no
// synchronization. A single instance must therefore not be used by several
// threads at the same time; give each worker thread its own dbase object.
class dbase {
public:
    // Sizes the internal field storage; see the definition in dbase.cpp.
    dbase();

    // Inserts or updates the gd_servers row for the host stored in `inval`.
    void runQuery();

    // Inserts or updates one gd_drives row described by `qvec`.
    bool runDriveQuery( const std::vector<std::string> &qvec );

    // Copies the deserialized report fields into `inval` ahead of runQuery().
    bool getQueryData( const std::string &Frstate, const std::string &Fstype, const std::string &Fsite, const std::string &Fhost, const std::string &Fatype, const std::string &Fsstate, const std::string &Fanum, const std::string &Fshname, const std::string &Fmac, const int &vfill, const int &dnum );

private:
    void updateServer();
    void inServer();
    bool checkExists( const std::string &data, const std::string &table, const std::string &field, const std::string &f2, const std::string &d2 );
    void storeQueryData( const std::string &fsval, const int &posid ); // written
    std::vector<std::string> inval; // report fields, indexed by the positions used in getQueryData()
    std::string getLocFromSite( const std::string &site );
    void insert( const std::string &qstring );
    void update( const std::string &qquery );
};

// One process-wide instance, defined in dbase.cpp. Every translation unit that
// includes this header and names `thread` refers to this same object, so using
// it from multiple threads shares `inval` between them (see the class comment).
extern dbase thread;
#endif

这里是 dbase.cpp:

// dbase.cpp
// MySQL database connections and DB search/parsing.
// the final step of the data flow - everything ends up in an indexable mysql database in this module.
#include<iostream>
#include<sstream>
#include<string>
#include<mysql++/mysql++.h>
#include<vector>
#include<iomanip>
#include"dbase.h"
#include"log.h"
bool dbase::runDriveQuery( const std::vector<std::string> &qvec ) 
bool ex = checkExists( qvec[0], "gd_drives", "host_shortname", "drive_label", qvec[1]);
std::stringstream qqquery;
if( ex ) 
qqquery << "UPDATE `" << dbname << "`.`gd_drives` SET spool_number=" << mysqlpp::quote_only << qvec[2];
qqquery << ", drive_status=" << mysqlpp::quote_only << qvec[3] <<", spool_status=" <<mysqlpp::quote_only;
qqquery << ", pending_sectors=" << mysqlpp::quote_only << qvec[6];
qqquery << ", reallocated_sectors=" << mysqlpp::quote_only << qvec[7];
qqquery << ";";
std::string fquery = qqquery.str();
qqquery.str("");
GDLogger.log( fquery, 1 );
update( fquery );
return true;

else 
qqquery << "INSERT INTO `" << dbname << "`.`gd_drives` VALUES ( " << mysqlpp::quote_only << "Hard Drive";
qqquery << ", " << "'" << qvec[0] << "/" << qvec[1] << "', " << mysqlpp::quote_only << inval[5];
qqquery << ", " << mysqlpp::quote_only << qvec[0] << ", " << mysqlpp::quote_only << qvec[1];
qqquery << ", " << mysqlpp::quote_only << "0";
qqquery << ", " << mysqlpp::quote_only << qvec[2];
qqquery << ", " << mysqlpp::quote_only << qvec[3];
qqquery << ", " << mysqlpp::quote_only << qvec[5] << ", " << mysqlpp::quote_only << qvec[4];
qqquery << ", " << mysqlpp::quote_only << qvec[6] << ", " << mysqlpp::quote_only << qvec[7];
qqquery << ", " << mysqlpp::quote_only << "0" << ", '1'";
qqquery << ");";
std::string ffquery = qqquery.str();
qqquery.str("");
GDLogger.log( ffquery, 1 );
insert( ffquery );
return true;


void dbase::runQuery() 
    GDLogger.log( "runQuery() called.", 0 );
    bool ex = checkExists( inval[1], "gd_servers", "hostname", "0", "0" );
    if ( ex == true ) 
        updateServer();
    
    else 
        inServer();
    
    GDLogger.log( "runQuery() completed.", 0 );

bool dbase::checkExists( const std::string &data, const std::string &table, const std::string &field, const std::string &f2, const std::string &d2 )  // check if the received data matches anything already in the database
    GDLogger.log( "checkExists() called.", 0 );
    GDLogger.log ( "Checking if host already in Database...", 1 );
    mysqlpp::Connection con3( false );
    if ( con3.connect( dbname, dbhost, dbuser, dbpass ) ) 
        std::stringstream queryss;
        queryss<< "SELECT " << field << " FROM `" << dbname << "`.`" << table << "` WHERE " << field << " = " << mysqlpp::quote_only << data;
        if( f2 == "0" ) 
            queryss << ";";
        
        else 
            queryss << "AND " << f2 << "=" << mysqlpp::quote_only << d2 << ";";
        
        std::string qstr = queryss.str();
        GDLogger.log( qstr, 1 );
        mysqlpp::Query query = con3.query( qstr );
        mysqlpp::StoreQueryResult res = query.store();
        std::stringstream res1;
        for ( std::vector<mysqlpp::Row>::iterator it= res.begin(); it != res.end(); ++it ) 
            mysqlpp::Row row = *it;
            res1 << row[0];
        
        std::string result = res1.str();
        GDLogger.log( result, 1 );
        if ( result == inval[1] ) 
            GDLogger.log( "Host exists in Database.", 1 );
            return true;
        
        else 
            GDLogger.log( "Host does not exist in Database.", 1 );
            return false;
        
    
    GDLogger.log( "checkExists() completed.", 0 );

void dbase::updateServer() 
    std::stringstream uss;
    uss<< " UPDATE `" << dbname << "`.`gd_servers` SET asset_type=" << mysqlpp::quote_only << inval[0] << ", recover_status=" << mysqlpp::quote_only << inval[10] << ", gc_status=" << mysqlpp::quote_only << inval[11] << ", var_fill=" << mysqlpp::quote_only << 0 << ", open_ticket=" << mysqlpp::quote_only << "0" << ", last_report = NOW() WHERE hostname = " << mysqlpp::quote_only << inval[1] << ";";
    std::string upstr = uss.str();
    GDLogger.log( upstr, 1 );
    update( upstr );

void dbase::update( const std::string &qquery )  // update the selected entry in the database
    GDLogger.log( "update() called.", 0 );
    GDLogger.log( "Updating database entry.", 1 );
    mysqlpp::Connection con2( false );
    if ( con2.connect( dbname, dbhost, dbuser, dbpass ) ) 
        mysqlpp::Query query = con2.query( qquery );
        mysqlpp::SimpleResult res = query.execute();
        if ( res ) 
            GDLogger.log( "Record updated sucessfully.", 1 );
        
        else 
            GDLogger.log( "Error updating record.", 2 );
        
    
    else 
        GDLogger.log( "Error establishing connection to database.", 3 );
    
    GDLogger.log( "update() completed.", 0 );

std::string dbase::getLocFromSite( const std::string &site )  // use the site that is transmitted to determine what the region is
    GDLogger.log( "getLocFromSite() called.", 0 );
    mysqlpp::Connection conn( false );
    if ( conn.connect( dbname, dbhost, dbuser, dbpass ) ) 
        std::stringstream lfs;
        lfs<< "SELECT srg_code FROM `" << dbname << "`.`gd_sites` WHERE site_name = " << mysqlpp::quote_only << site << ";";
        std::string loc = lfs.str();
        GDLogger.log( loc, 1 );
        mysqlpp::Query query = conn.query( loc );
        if (mysqlpp::StoreQueryResult res = query.store() ) 
            std::stringstream res1;
            for ( std::vector<mysqlpp::Row>::iterator it= res.begin(); it != res.end(); ++it ) 
                mysqlpp::Row row = *it;
                res1 << row[0] << std::endl;
            
            std::string result = res1.str();
            GDLogger.log( result, 1 );
            return result;
        
        if ( conn.errnum() ) 
            GDLogger.log( "Error Received in fetching a row.", 2 );
            return "ERR";
        
        else 
            GDLogger.log( "Unknown Error Occurred or Unhandled Exception.", 3 );
            return "ERR";
        
    
    else 
        GDLogger.log( "Unable to connect to database!", 4 );
        return "ERR";
    
    GDLogger.log( "getLocFromSite() completed.", 0 );

void dbase::inServer() 
    std::stringstream iss;
    iss<< "INSERT INTO `" << dbname << "`.`gd_servers` VALUES( ";
    for( int i = 0; i < 13; i++ ) 
        iss << mysqlpp::quote_only << inval[i] << ", ";
    
    iss << mysqlpp::quote_only << "0" << ", NOW() );";
    std::string is = iss.str();
    GDLogger.log( is, 1 );
    insert( is );

void dbase::insert( const std::string &qstring )  // insert server info into the mysql database (only if not already present)
    GDLogger.log( "insert() called.", 0 );
    mysqlpp::Connection conn( false );
    if ( conn.connect( dbname, dbhost, dbuser, dbpass ) ) 
        mysqlpp::Query query = conn.query( qstring );
        mysqlpp::SimpleResult res = query.execute();
        if( res ) 
            GDLogger.log( "Data inserted successfully.", 1 );
        
        else 
            GDLogger.log( "Failed to add data to database.", 2 );
        
    
    else 
        GDLogger.log( "Failed to connect to database. Please ensure mysql is running, and that your credentials are correct.", 3 );
    
    GDLogger.log("insert() completed.", 0 );

bool dbase::getQueryData( const std::string &Frstate, const std::string &Fstype, const std::string &Fsite, const std::string &Fhost, const std::string &Fatype, const std::string &Fsstate, const std::string &Fanum, const std::string &Fshname, const std::string &Fmac, const int &vfill, const int &dnum ) 
    GDLogger.log( "getQueryData() called.", 0 ); // takes all the deserialized data from the serial class and sorts them into containers in preparation of db injection
    std::string Fshorthost = Fshname; // host shortname
    std::string Floc = getLocFromSite( Fsite ); // location, queried from database based on string
    std::string FOrt = "false"; // true/false value for whether there is an open RT for the server. default is false.
    std::stringstream ssfill;
    ssfill << vfill; // /ivar fill percentage
    std::string Fvfill = ssfill.str();
    std::stringstream ssdnum;
    ssdnum << dnum; // number of hard drives
    std::string Fdnum = ssdnum.str();
    std::string uid= ""; // queried from database, default is 0;
    if ( Fstype == "r" || Fstype == "h" || Fstype == "t" ) 
        std::stringstream ss;
        ss << Fstype << mysqlpp::quote << "_server";
        std::string sstype = ss.str();
        storeQueryData( sstype, 4 );
    
    else 
        storeQueryData( Fstype, 4 );
    
    int num = 5; // this is temporary until I write it's hook in the serial class.
    std::stringstream anum1;
    anum1 << num;
    GDLogger.log( "Retrieving values for storage in database.", 1 );
    std::string tanum = anum1.str(); // storing values in a vector for easier access
    storeQueryData( Fatype, 0 );
    storeQueryData( Fhost, 1 );
    storeQueryData( Fshorthost, 2 );
    storeQueryData( uid, 3 );
    storeQueryData( Fsite, 5 );
    storeQueryData( Floc, 6 );
    storeQueryData( Fmac, 7 );
    storeQueryData( Fdnum, 8 );
    storeQueryData( tanum , 9 );
    storeQueryData( Frstate, 10 );
    storeQueryData( Fsstate, 11 );
    storeQueryData( Fvfill, 12 );
    storeQueryData( FOrt, 13 );
    GDLogger.log( "getQueryData() completed.", 0 );

void dbase::storeQueryData( const std::string &fsval, const int &posid ) 
    GDLogger.log( "storeQueryData() called.", 0 );
    inval[posid] = fsval;

// Constructs the database module with 16 empty query-data slots in `inval`.
dbase::dbase()
    : inval( 16 ) // size of the query data storage vector
{
    GDLogger.log( "Initializing Database Module.", 1 );
    GDLogger.log( "dbase() called.", 0 );
    // getQueryData(); // retrieve data to be entered into the database
}

// Definition of the single global dbase instance declared `extern` in the
// header; any code that names `thread` without a local of its own uses this object.
dbase thread;

这里是创建线程的地方:

// Accept loop: listens on `port` (IPv4) and hands every accepted connection
// to session() on its own thread. Never returns.
void server( boost::asio::io_service& io_service, short port ) {
    tcp::acceptor a( io_service, tcp::endpoint( tcp::v4(), port ) ); // starts
    for ( ; ; ) {
        socket_ptr sock( new tcp::socket( io_service ) ); // creates a new socket for every connection and accepts
        a.accept( *sock );
        boost::thread t( boost::bind( session, sock ) ); // starts session in a new thread
        // The thread object goes out of scope at the end of each iteration;
        // detach explicitly so the session keeps running regardless of how the
        // boost::thread destructor treats a joinable thread in this Boost build.
        t.detach();
    }
}

线程创建和传递给数据库模块之间发生的事情都通过了我在所述模块中内置的验证检查,但是一旦它进入数据库模块,就好像它正在从其他线程获取数据......或其他东西。几周以来我一直在梳理这段代码,但我根本不知道发生了什么。如果socket_ptr是线程之间唯一的共享内存,它是一个指向正在处理的原始帧缓冲区的指针,它只被引用一次然后被丢弃......

编辑:如果我尝试在它的线程内实例化 dbase,它拒绝链接,说未定义的引用。这里是实例化的地方:

bool handle_data( std::vector<int>& buffer_data )  // data handling happens hhere
    GDLogger.log( "handle_data() called.", 0 );
    try 
        dbase thread; // dbase instantiates here
        boost::system::error_code ec; // declare an error object ot collect exceptions
        ser.readBits( buffer_data ); //read from buffer into saod object
        std::vector<int> dbuff;
        dbuff.resize(616);
        if( buffer_data.size() >=616 ) 
            //ser.readDVec( dbuff );
            for( int i = 0; i < 576; i++ ) 
                dbuff[i] = ser.Sdbits[i];
             
         
         int ddnum = ser.Sdnum;
         std::string dshname = ser.Sshname;
         ddrive dd( ddnum, dshname, dbuff ); // drive module instantiates here
         bool dsvalidated = ser.deSerialize(); // deserialize data
         if(!dsvalidated)  // self explanatory
             GDLogger.log( "Deserialized data validation returned as false.", 3 );
        
        else  // I hate this function. Reads from the serial object to the DB object.
            thread.getQueryData( ser.Srstate, ser.Sstype, ser.Ssite, ser.Shname, ser.Satype, ser.Sservices, ser.Sanum, ser.Sshname, ser.Smac, ser.Svarfill, ser.Sdnum ); // handing all the data off to the database module.
            thread.runQuery(); // runs all the queries necessary to update or insert
            dd.readDriveData( dbuff );
        
        if( !ec ) 
            GDLogger.log( "handle_data() call completed without error.", 0 ); // if ec is still empty, we're done!
            return true;
         
         else 
              GDLogger.log( "handle_data() call unsuccsessful, errors occurred.", 3 ); // things broke~somewhere~, but not fatally
              return true;
          
    
    catch( std::exception& e )  // catch allocation exceptions.
        std::stringstream errstr;
        errstr << "Exception in thread: " << e.what();
        std::string logentry = errstr.str();
        errstr.str("");
        GDLogger.log( logentry, 3 );
    

内部调用:

// Per-connection worker: reads from the socket, sanitizes and validates each
// chunk, and passes valid data to handle_data(). The loop ends when EOF is
// seen after successful processing, when EOF is seen on a later pass without
// success, or when an error is thrown (any other socket error, including EOF
// on the first pass without success).
void session( socket_ptr sock ) { // the actual TCP session starts here
    bool inproc = false; // this will be set to true once input has been sucessfully processed.
    try {
        GDLogger.log( "Connection Established.", 1 );
        int icont = 0;
        for ( ; ; ) {
            // Zero-initialised: read_some() may fill only part of the buffer,
            // and the whole buffer is copied and parsed below.
            char data[max_length] = { 0 };
            boost::system::error_code error;
            // NOTE(review): the number of bytes actually read is not used below.
            size_t length = sock->read_some( boost::asio::buffer( data ), error ); // reads the length of the buffer, then reads the buffer into the char* data
            (void)length;

            char cdata[max_length];
            for ( int i = 0; i < max_length; i++ ) {
                cdata[i] = data[i];
            }

            std::vector<int> idata; // our hero, the vector going into handle_data
            idata.resize( max_length );
            idata = sanData( cdata ); // sanitize
            bool dvalid = valSerialData( idata ); // validate
            if ( !dvalid ) {
                GDLogger.log("Data rejected by server.", 3 );
            }
            else {
                GDLogger.log("Valid Datastream accepted. Processing.", 1 );
                inproc = handle_data( idata );
            }

            if ( inproc == true && error == boost::asio::error::eof ) { // if EOF reached and
                GDLogger.log( "Connection Terminated, data processed.", 1 ); // if handle_data success, close connection
                break;
            }
            else if ( !inproc && error == boost::asio::error::eof && icont > 0 ) {
                GDLogger.log( "Connection Terminated before data was successfully processed.", 2 );
                break;
            }
            else if ( error ) {
                throw boost::system::system_error( error ); //some other error
            }
            icont++; // counts completed passes through the loop
        }
    }
    catch ( const std::exception& e ) {
        std::stringstream errstr;
        errstr << "Exception in thread: " << e.what() << "\n";
        std::string logentry = errstr.str(); // catch all the lovely boost errors
        GDLogger.log( logentry, 3 );
    }
    GDLogger.log( "Thread Exited.", 0 );
}

它抱怨的地方是另一个模块,驱动模块,它读取序列化的驱动数据:

bool ddrive::readDriveData( const vector<int> &data1 ) 
    GDLogger.log( "readDriveData() called.", 0 );
    int SequenceNumber = 0;
    vector<bool> validated;
    vector<bool> dbadded;
    vector<bool> rval;
    GDLogger.log( "Resizing vectors.", 0 );
    rval.resize(Ddnum);
    dbadded.resize( Ddnum );
    validated.resize( Ddnum );
    for( int i = 0; i < Ddnum; i++ ) 
        GDLogger.log( "Setting rval to false...", 0 );
        rval[i] = false;
    
    for( int i = 0; i < Ddnum; i++ ) 
        for( int j = 0; j < vdlen; j++ ) 
            Ddcode[j] = data1[SequenceNumber];
            std::stringstream ff;
            ff << "SN: "<< SequenceNumber << " " << Ddcode[j];
            std::string mmm = ff.str();
            ff.str("");
            GDLogger.log( mmm, 0 );
            SequenceNumber++;
        
        std::stringstream lstr;
        lstr << "Sequence Number: " << SequenceNumber;
        std::string logm = lstr.str();
        lstr.str("");
        GDLogger.log( logm, 0 );
        Ddpath = mapp.getKeyFromMap( Ddcode[0], Ddcode[1], 3 );
        Dspnum = readSpoolNumber( Ddcode[2], Ddcode[3] );
        Dhstatus = readDHState( Ddcode[4] );
        Dspfill = read3DigitVals( Ddcode[5], Ddcode[6], Ddcode[7] );
        Dspstate = readSpoolState( Ddcode[8] );
        Dpsec = read3DigitVals( Ddcode[9], Ddcode[10], Ddcode[11] );
        Drsec = read3DigitVals( Ddcode[12], Ddcode[13], Ddcode[14] );
        bool DDValid = true;
        if( DDValid ) 
            validated[i] = true;
            GDLogger.log( "Converting data to strings for database insertion.", 1 );
            std::stringstream qq;
            qq << Ddcode[2] << Ddcode[3];
            std::string qspnum = qq.str();
            GDLogger.log( qspnum, 0 );
            qq.str("");
            qq << Dspfill;
            std::string qspfill = qq.str();
            GDLogger.log( qspfill, 0 );
            qq.str("");
            qq << Dpsec;
            std::string qpsec = qq.str();
            GDLogger.log( qpsec, 0 );
            qq.str("");
            qq << Drsec;
            std::string qrsec = qq.str();
            GDLogger.log( qrsec, 0);
            qq.str("");
            GDLogger.log( Dhsname, 0 );
            qdata[0] = Dhsname;
            qdata[1] = Ddpath;
            qdata[2] = qspnum;
            qdata[3] = Dhstatus;
            qdata[4] = qspfill;
            qdata[5] = Dspstate;
            qdata[6] = qpsec;
            qdata[7] = qrsec;
            GDLogger.log( Dhsname, 0);
            GDLogger.log( Ddpath, 0 );
            dbadded[i] = thread.runDriveQuery( qdata ); // this is where it complains
        
        else 
            GDLogger.log( "Drive datastream rejected by server.", 3 );
            validated[i] = false;
        
    
    bool valid = true;
    bool dbadd = true;
    GDLogger.log( "Validating that operation performed successfully.", 1 );
    for( int i = 0; i < Ddnum; i++ ) 
        if( !validated[i] ) 
            valid = false;
        
        if( !dbadded[i] ) 
            dbadd = false;
        
    
    if( dbadd && valid ) 
        return true;
    

更新: 我睡了一觉之后才明白安东所说的意思,并按照他说的做了:我更改了函数 ddrive::readDriveData,使它把一个 dbase 实例作为参数,如下所示:

// Revised signature: the dbase object to use is now passed in by the caller.
// `db` is taken by value, so the function receives a copy of the caller's object.
bool ddrive::readDriveData( const vector<int> &data1, dbase db );

然后编译。我看看现在会发生什么。

【问题讨论】:

dbase thread; - 如果这一行只是写在 cpp 文件中,那么你就有了一个全局 dbase 对象。 我正想问类似的问题:所有线程是否共享同一个 dbase 对象实例(名为 thread)?这个类看起来完全不是线程安全的。 【参考方案1】:

您有一个全局 dbase 对象:

extern dbase thread; // declaration in the .h file: makes the one global object visible to every file that includes it
dbase thread; // definition in the .cpp file: creates that single global object

它在线程之间共享,所以它必须是线程安全的,但事实并非如此。

编辑:如果您在线程函数中创建 dbase 对象(这是正确的),请从 h 文件中删除 extern dbase thread;,因为您不再拥有全局对象,否则您将收到链接器错误。

【讨论】:

当我在线程中声明 dbase 对象时,我的编译器拒绝链接,提示对 `thread` 的未定义引用,即使它是在 main.o 中定义的 :( @Oblivious12 请在你的问题中展示你是怎么做的。 我已更新问题,展示了实例化的位置和方式。 @Oblivious12 从 h 文件中删除 extern dbase thread - 请查看更新后的答案 @Oblivious12 在您从 h 文件中删除 extern dbase thread; 并从 cpp 文件中删除 dbase thread; 之后,您在哪里声明/实例化了 dbase 对象?

以上是关于时间:2019-05-10 标签:c++threadeddbclassmemorymixup的主要内容,如果未能解决你的问题,请参考以下文章

时间:2019-05-10 标签:c#socketconnectionwithmultithreadingtelnet

时间:2019-05-10 标签:c#paypalrestapitransactionsearch

时间:2019-05-10 标签:c#socketThread

时间:2019-05-10 标签:c++threadeddbclassmemorymixup

时间:2019-05-10 标签:c#applicationautostartwindows7

时间:2019-05-10 标签:c++freadjibberish