// [Collapse] -- web-page toggle label carried over with the listing; not part of the program.
#include <ace/OS.h>
#include <ace/Log_Msg.h>
#include <ace/Message_Block.h>
#include <ace/INET_Addr.h>
#include <ace/Proactor.h>
#include <ace/Asynch_IO.h>
#include <ace/Asynch_Acceptor.h>
class Stream_Handler : public ACE_Service_Handler {
private :
ACE_Asynch_Read_Stream reader_;
ACE_Asynch_Write_Stream writer_;
ACE_INET_Addr remote_address_;
public :
~Stream_Handler() {
if( this->handle() != ACE_INVALID_HANDLE )
ACE_OS::closesocket(this->handle());
}
//override
virtual void open(ACE_HANDLE new_handle, ACE_Message_Block &message) {
ACE_TRACE("Stream_Handler::open");
ACE_ASSERT(new_handle != ACE_INVALID_HANDLE);
this->handle(new_handle);
if( reader_.open(*this) != 0 || writer_.open(*this) != 0 ) {
ACE_ERROR((LM_ERROR, "[ERROR%T](%N:%l) ### %p\n", "Stream_Handler::open"));
delete this;
return;
}
ACE_Message_Block *mb;
ACE_NEW_NORETURN(mb, ACE_Message_Block(1024));
if( reader_.read(*mb, mb->space()) != 0 ) {
ACE_ERROR((LM_ERROR, "[ERROR%T](%N:%l) ### %p\n", "Stream_Handler::open"));
delete this;
return;
}
ACE_DEBUG((LM_INFO, "[DEBUG%T](%N:%l) ### New client accepted: %s:%u\n",
remote_address_.get_host_addr(), remote_address_.get_port_number()));
}
//override
virtual void addresses(const ACE_INET_Addr &remote_address, const ACE_INET_Addr &local_address) {
ACE_TRACE("Stream_Handler::addresses");
ACE_UNUSED_ARG(local_address);
remote_address_ = remote_address;
}
//override
virtual void handle_read_stream(const ACE_Asynch_Read_Stream::Result &result) {
ACE_TRACE("Stream_Handler::handle_read_stream");
ACE_Message_Block &mb = result.message_block();
if( !result.success() || result.bytes_transferred() == 0 ) {
ACE_DEBUG((LM_INFO, "[DEBUG%T](%N:%l) ### Connection close %s:%u\n",
remote_address_.get_host_addr(), remote_address_.get_port_number()));
mb.release();
delete this;
} else {
if ( writer_.write (mb, mb.length ()) != 0 ) {
ACE_ERROR((LM_ERROR, "[ERROR%T](%N:%l) ### %p\n",
"Stream_Handler::handle_read_stream"));
mb.release();
delete this;
} else {
ACE_Message_Block *new_mb;
ACE_NEW_NORETURN(new_mb, ACE_Message_Block(1024));
reader_.read(*new_mb, new_mb->space());
}
}
}
//override
virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result &result) {
ACE_TRACE("Stream_Handler::handle_write_stream");
result.message_block().release();
}
};
int ACE_TMAIN(int agrc, ACE_TCHAR *argv[])
{
ACE_Asynch_Acceptor<Stream_Handler> acceptor;
ACE_INET_Addr listen;
listen.set(9088);
acceptor.open(listen,
0, // byte to read
1, // call pass_address
ACE_DEFAULT_ASYNCH_BACKLOG,
1, // reuse_addr
0, // default proactor
0, // validate new connection
1, // reissue accept
-1);// number of initial accepts
ACE_Proactor::instance()->proactor_run_event_loop();
ACE_RETURN(0);
}
// [Collapse] -- web-page toggle label carried over with the listing; not part of the program.