2011年2月26日 星期六

用ACE練習寫個簡單的server class(下)

剩下最後的關鍵了,HA_Proactive_Service主要做傳送接收的處理.


tatic ACE_Recursive_Thread_Mutex _lock;

int HA_Proactive_Service::initiate_read_stream(void)//開始一筆新的接收
{
// ACE_DEBUG( (LM_DEBUG, ACE_TEXT("client id : %d\n"),stAppid.id));
ACE_NEW_NORETURN (recv_data_, ACE_Message_Block (sizeof(PacketHeader), SBN_QUE_NONE));
ACE_HANDLE handle = this->handle ();
this->recv_data_->copy ((const char *)&handle, sizeof(ACE_HANDLE));
this->reader_.read (*recv_data_, recv_data_->space ());
return 0;
}
HA_Proactive_Service::HA_Proactive_Service()
{
recv_data_ = NULL;
stAppid.bOnline = false;
}
HA_Proactive_Service::~HA_Proactive_Service ()
{
if (this->handle () != ACE_INVALID_HANDLE)
ACE_OS::closesocket (this->handle ());
}

void HA_Proactive_Service::init( HA_Proactive_Acceptor *acceptor, int id )
{
  this->acceptor_ = acceptor;
  //this->acceptor_->ser[id].stAppid.id = id;
  stAppid.id = id;
  stAppid.num = 0;
}
void HA_Proactive_Service::SendData(char *buf)
{
#ifdef WIN32
size_t len = strlen(buf)+1;
#else
int len = strlen(buf)+1;
#endif

ACE_Message_Block *temp = new ACE_Message_Block(len);
temp->copy(buf);
temp->wr_ptr(len+1);
int nResult = writer_.write(*temp,len);

//呼叫了傳送後temp這個指標不可再用了

if( nResult != 0 )
ACE_DEBUG( (LM_DEBUG,"Write data failed:%s\n",buf) );
else
ACE_DEBUG( (LM_DEBUG,"send data(%d):%s\n",stAppid.id, buf) );//temp->rd_ptr()
}


void HA_Proactive_Service::handle_time_out(const ACE_Time_Value &tv, const void *p)
{
//ACE_DEBUG((LM_DEBUG,"time out: %d\n",(int*)p));
ACE_Proactor::instance()->cancel_timer(*this);
}

void HA_Proactive_Service::open (ACE_HANDLE h, ACE_Message_Block& message_block)//在這裡完成初始化的準備
{
this->handle(h);//新的socket句柄產生了

if (this->reader_.open (*this) != 0 || this->writer_.open (*this) != 0   )
{

this->acceptor_->free_handler( this );
return;
}

if (this->initiate_read_stream () == -1)
return;

   //取客戶端資料
ACE_INET_Addr addr;
ACE_SOCK_SEQPACK_Association ass=ACE_SOCK_SEQPACK_Association(h);
size_t addr_size=1;
//ass.get_local_addrs(&addr,addr_size);//取本地端
ass.get_remote_addrs(&addr,addr_size);
stAppid.iPort = addr.get_port_number();
stAppid.sIp = addr.get_host_addr();
stAppid.bOnline = true;
stAppid.num = this->acceptor_->cNum;
++this->acceptor_->cNum;
//ACE_Guard <ACE_Recursive_Thread_Mutex> locker (_lock);

acceptor_->mpInfo.insert(std::make_pair(stAppid.id, stAppid));

ACE_Proactor::instance()->schedule_timer(*this, (void *)stAppid.id, ACE_Time_Value(1), ACE_Time_Value(1));

//******************************************************************************************
ACE_Message_Block *putdata;
CmdPacker pack;
pack.id = stAppid.id;
pack.num = stAppid.num;
pack.bsize = 0;
pack.cmd = SBN_QUE_CONNECT;
ACE_NEW_NORETURN (putdata, ACE_Message_Block(sizeof(CmdPacker), SBN_QUE_CONNECT));
putdata->copy ((const char *)&pack, sizeof(CmdPacker));
acceptor_->RQData.putq(putdata);
//*****************************************************************************************

//ACE_DEBUG( (LM_DEBUG, ACE_TEXT("connect: %d %s\n"), stAppid.iPort ,stAppid.sIp.c_str() ) );

}
void HA_Proactive_Service::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
{
//result.message_block ().rd_ptr ()[result.bytes_transferred ()] = '\0';//數到的字串流結尾補結束字元

if (!result.success () || result.bytes_transferred () == 0)//傳輸失敗或傳輸0個字節,客戶斷開連接
{
//ACE_DEBUG( (LM_DEBUG, ACE_TEXT("client close : %d %d\n"), result.handle (),this->handle() ) );
result.message_block ().release ();
this->acceptor_->free_handler( this );
}
else
{
//result.message_block ().rd_ptr ()[result.bytes_transferred ()] = '\0';//數到的字串流結尾補結束字元
int len=0;
 
if (this->recv_data_->length() < sizeof(PacketHeader))// 數據包長度信息還未接收完
{
this->reader_.read (*recv_data_, recv_data_->space ());
return;
}
PacketHeader * hdr = reinterpret_cast<PacketHeader *> (this->recv_data_->rd_ptr());
ACE_Message_Block * data_mb = this->recv_data_->cont();
//將收到的4個字元轉成數值
len = (hdr->length[0]-'0')*1000 + (hdr->length[1]-'0')*100 + (hdr->length[2]-'0')*10 + (hdr->length[3]-'0');

if (!data_mb)// 剛剛接收完長度信息
{
//長度不對要做錯誤處理
if( len > 1024 )
len = 1024;
if( len < 1 )
len = 1;

ACE_NEW (data_mb, ACE_Message_Block(len));
this->recv_data_->cont (data_mb);
}
if (data_mb->length () == len)// 數據已接收完
{
//int cmd = (data_mb->rd_ptr()[0]-'0')*1000 + (data_mb->rd_ptr ()[1]-'0')*100 + (data_mb->rd_ptr ()[2]-'0')*10 + (data_mb->rd_ptr ()[3]-'0');//將4個字元轉成命令
ACE_Message_Block *putdata;
//CmdPacker pack;
//if( cmd < CMD_NONE )
// cmd = CMD_NONE;
recvpack.id = stAppid.id;
recvpack.num = stAppid.num;
recvpack.bsize = len;
recvpack.cmd = SBN_QUE_DATA;
#ifdef WIN32
memcpy(recvpack.data,data_mb->rd_ptr(),len);//strcpy_s(recvpack.data,256,data_mb->rd_ptr());
#else
strcpy(recvpack.data,data_mb->rd_ptr());
#endif
ACE_NEW_NORETURN (putdata, ACE_Message_Block(sizeof(CmdPacker), SBN_QUE_DATA));
putdata->copy((const char *)&recvpack, sizeof(CmdPacker));
//ACE_DEBUG( (LM_DEBUG,"read_stream:%d %d %d %s\n", pack.id, pack.num, pack.bsize, pack.data) );

acceptor_->RQData.putq(putdata);//,(ACE_Time_Value *) &ACE_Time_Value::zero);
recv_data_->release();
this->initiate_read_stream();// 再繼續接收下一個數據包
return;
}

this->reader_.read (*data_mb, data_mb->space ());// 否則繼續接收該數據包
}
}

void HA_Proactive_Service::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
{
//ACE_DEBUG( (LM_DEBUG,"handle_write_stream:%s\n",result.message_block().rd_ptr ()) );
result.message_block ().release ();
}

而連線,傳送,接收的運用就由 HA_Proactive_Acceptor驅動


HA_Proactive_Acceptor::HA_Proactive_Acceptor() : ACE_Asynch_Acceptor<HA_Proactive_Service>()
{
cNum = 1;
ser = NULL;
}

HA_Proactive_Acceptor::~HA_Proactive_Acceptor(void)
{
Destroy();
}
void HA_Proactive_Acceptor::Destroy(void)
{
if( ser )
delete[] ser;
}
void HA_Proactive_Acceptor::free_handler( HA_Proactive_Service * service )
{
ACE_Guard <ACE_Recursive_Thread_Mutex> locker (_lock);
ACE_OS::closesocket(service->handle ());
std::map<int, CData>::iterator mpIter;
for (mpIter = mpInfo.begin(); mpIter != mpInfo.end(); ++mpIter)  
{
//if( service->stAppid.iPort != mpIter->second.iPort ) continue;
//if( service->stAppid.sIp != mpIter->second.sIp ) continue;
if( service->stAppid.id != mpIter->second.id ) continue;
mpInfo.erase(mpIter);
break;
}


//******************************************************************************************
ACE_Message_Block *putdata;
CmdPacker pack;
pack.id = service->stAppid.id;
pack.num = service->stAppid.num;
pack.bsize = 0;
pack.cmd = SBN_QUE_DISCONNECT;
ACE_NEW_NORETURN (putdata, ACE_Message_Block(sizeof(CmdPacker), SBN_QUE_DISCONNECT));
putdata->copy((const char *)&pack, sizeof(CmdPacker));
RQData.putq(putdata);
//*****************************************************************************************

service->stAppid.bOnline = false;
service->stAppid.num = 0;
this->handler_list_.push_back( service );

}

int HA_Proactive_Acceptor::validate_connection(const ACE_Asynch_Accept::Result& result,  const ACE_INET_Addr &remote, const ACE_INET_Addr& local)
{
return 0;
}

HA_Proactive_Service * HA_Proactive_Acceptor::make_handler (void)
{
if( this->handler_list_.empty() )
{
ACE_DEBUG( (LM_DEBUG,"Connect full\n") );
return 0;
}
HA_Proactive_Service *service = this->handler_list_.front();
this->handler_list_.pop_front();
return service; //得到處理事件的句柄後,前攝器會調用處理事件的open方法   HA_Proactive_Service::open
}

void HA_Proactive_Acceptor::init_handlers(int scount)
{
ServiceCount = scount;
ser = new HA_Proactive_Service[ServiceCount];
for( int i = 0; i < ServiceCount; ++i )
{
//HA_Proactive_Service * service;
//ACE_NEW( service, HA_Proactive_Service );
//service->init( this );
ser[i].init(this,i);
this->handler_list_.push_back( &ser[i] );//service );
}
}

其實主要的用意就只是把ACE再包成一個容易使用的類別,大家可以參考看看

沒有留言:

張貼留言