37 #include <boost/thread/thread.hpp>
40 #include <condition_variable>
62 const char *DEFAULT_HOST =
"localhost";
63 const int DEFAULT_PORT = 11255;
64 const int DEFAULT_TIMEOUT = 10000;
65 const char *usage[] = {
66 "usage: sampleClient [OPTIONS] <input-file>",
69 " --host=<name> Specifies the host to connect to (default = localhost)",
70 " --port=<n> Specifies the port to connect to (default = 11255)",
71 " --recv-addr=<addr> Let the server connect to us by listening for",
72 " connection request on <addr> (host:port). The address",
73 " that the server is connecting from should be the same",
74 " as in --host and --port or the defaults.",
76 " --timeout=<t> Connection timeout in milliseconds (default=10000)",
77 " --verbose Generate verbose output",
78 " --udp Operate in UDP mode instead of TCP",
80 "This is a sample program to test the AsyncComm library. It establishes",
81 "a connection with the sampleServer and sends each line of the input file",
82 "to the server. Each reply from the server is echoed to stdout.",
85 bool g_verbose =
false;
102 virtual void handle(
EventPtr &event_ptr) = 0;
104 virtual bool get_response(
EventPtr &event_ptr) = 0;
129 std::lock_guard<std::mutex> lock(m_mutex);
132 HT_INFOF(
"Connection Established - %s", event_ptr->to_str().c_str());
137 if (event_ptr->error != 0) {
151 m_queue.push(event_ptr);
157 std::unique_lock<std::mutex> lock(m_mutex);
165 std::unique_lock<std::mutex> lock(m_mutex);
166 while (m_queue.empty()) {
167 if (m_connected ==
false)
171 event_ptr = m_queue.front();
189 m_dispatch_handler_ptr = dhp;
192 dhp = m_dispatch_handler_ptr;
213 std::lock_guard<std::mutex> lock(m_mutex);
215 m_queue.push(event_ptr);
219 HT_INFOF(
"%s", event_ptr->to_str().c_str());
225 std::unique_lock<std::mutex> lock(m_mutex);
226 m_cond.wait(lock, [
this](){
return !m_queue.empty(); });
227 event_ptr = m_queue.front();
240 int main(
int argc,
char **argv) {
243 const char *host = DEFAULT_HOST;
244 struct sockaddr_in addr;
245 uint16_t port = DEFAULT_PORT;
246 time_t timeout = DEFAULT_TIMEOUT;
247 const char *in_file = 0;
252 bool udp_mode =
false;
255 int max_outstanding = 50;
258 sockaddr_in inet_addr;
262 memset(&inet_addr, 0,
sizeof(inet_addr));
269 for (
int i=1; i<argc; i++) {
270 if (!strncmp(argv[i],
"--host=", 7))
272 else if (!strncmp(argv[i],
"--port=", 7)) {
273 rval = atoi(&argv[i][7]);
274 if (rval <= 1024 || rval > 65535) {
275 cerr <<
"Invalid port. Must be in the range of 1024-65535." << endl;
280 else if (!strncmp(argv[i],
"--timeout=", 10))
281 timeout = (time_t)atoi(&argv[i][10]);
282 else if (!strcmp(argv[i],
"--udp"))
284 else if (!strncmp(argv[i],
"--recv-addr=", 12)) {
288 else if (!strcmp(argv[i],
"--verbose")) {
291 else if (in_file == 0)
305 ifstream myfile(in_file);
307 if (!myfile.is_open()) {
308 HT_ERRORF(
"Unable to open file '%s' : %s", in_file, strerror(errno));
313 assert(inet_addr.sin_port == 0);
314 dhp = make_shared<ResponseHandlerUDP>();
322 dhp = make_shared<ResponseHandlerTCP>();
325 if (inet_addr.sin_port == 0) {
333 comm->
listen(inet_addr, handler_factory, dhp);
341 const uint8_t *decode_ptr;
342 size_t decode_remain;
344 while (!myfile.eof()) {
345 getline (myfile,line);
346 if (line.length() > 0) {
348 cbp->append_str16(line);
358 while ((error = comm->
send_request(addr, timeout, cbp, resp_handler))
365 this_thread::sleep_for(chrono::milliseconds(1000));
369 HT_ERRORF(
"CommEngine::send_message returned '%s'",
377 if (outstanding > max_outstanding) {
381 decode_ptr = event_ptr->payload;
382 decode_remain = event_ptr->payload_len;
385 cout <<
"ECHO: " << str << endl;
387 cout <<
"[NULL]" << endl;
390 cout <<
"Error: "<< e << endl;
397 while (outstanding > 0 && resp_handler->
get_response(event_ptr)) {
399 decode_ptr = event_ptr->payload;
400 decode_remain = event_ptr->payload_len;
403 cout <<
"ECHO: " << str << endl;
405 cout <<
"[NULL]" << endl;
408 cout <<
"Error: "<< e << endl;
static Comm * instance()
Creates/returns singleton instance of the Comm class.
Retrieves system information (hardware, installation directory, etc)
virtual void get_instance(DispatchHandlerPtr &dhp)
Creates a connection dispatch handler.
DispatchHandlerPtr m_dispatch_handler_ptr
std::queue< EventPtr > m_queue
static void initialize(uint16_t reactor_count)
Initializes I/O reactors.
int connect(const CommAddress &addr, const DispatchHandlerPtr &default_handler)
Establishes a TCP connection and attaches a default dispatch handler.
HandlerFactory(DispatchHandlerPtr &dhp)
Helper class for printing usage banners on the command line.
Abstract base class for application dispatch handlers registered with AsyncComm.
void init(int argc, char *argv[], const Desc *desc=NULL)
Initialize with default policy.
Abstract class for creating default application dispatch handlers.
Po::typed_value< String > * str(String *v=0)
virtual bool get_response(EventPtr &event_ptr)=0
std::shared_ptr< Event > EventPtr
Smart pointer to Event.
int main(int argc, char **argv)
main function
int send_datagram(const CommAddress &addr, const CommAddress &send_addr, CommBufPtr &cbuf)
Sends a datagram to a remote address.
Connection established event.
void set_inet(sockaddr_in addr)
Sets address type to CommAddress::INET and inet value to addr.
Declarations for DispatchHandler.
static void destroy()
This method shuts down the reactors.
std::shared_ptr< ConnectionHandlerFactory > ConnectionHandlerFactoryPtr
Smart pointer to ConnectionHandlerFactory.
size_t encoded_length_str16(const char *str)
Computes the encoded length of a string16 encoding.
const char * get_text(int error)
Returns a descriptive error message.
std::shared_ptr< CommBuf > CommBufPtr
Smart pointer to CommBuf.
virtual bool get_response(EventPtr &event_ptr)
bool wait_for_connection()
Logging routines and macros.
Compatibility Macros for C/C++.
Initialization helper for applications.
Functions to serialize/deserialize primitives to/from a memory buffer.
const char * decode_str16(const uint8_t **bufp, size_t *remainp)
Decodes a c-style string from the given buffer.
Connection disconnected event.
(somewhat) Abstract base class for response handlers; Defines the message queue and the mutex and con...
static bool initialize(sockaddr_in *addr, const char *host, uint16_t port)
Initialize a sockaddr_in structure from host:port.
Entry point to AsyncComm service.
virtual void handle(EventPtr &event_ptr)
Callback method.
#define HT_INFOF(msg,...)
Internet address wrapper classes and utility functions.
virtual void handle(EventPtr &event_ptr)
Callback method.
Request/response message event.
void create_datagram_receive_socket(CommAddress &addr, int tos, const DispatchHandlerPtr &handler)
Creates a socket for receiving datagrams and attaches handler as the default dispatch handler...
virtual bool get_response(EventPtr &event_ptr)
This is a generic exception class for Hypertable.
std::shared_ptr< DispatchHandler > DispatchHandlerPtr
Smart pointer to DispatchHandler.
Message buffer for holding data to be transmitted over a network.
virtual ~ResponseHandler()
#define HT_ERRORF(msg,...)
void listen(const CommAddress &addr, ConnectionHandlerFactoryPtr &chf)
Creates listen (accept) socket on addr.
This is the dispatch handler that gets installed as the default handler for the TCP connection...
static void dump_and_exit(const char **usage, int rcode=1)
Same as dump, but performs _exit(rcode) afterwards.
This is the dispatch handler that gets installed as the default handler for UDP mode.
Error codes, Exception handling, error logging.
int send_request(const CommAddress &addr, uint32_t timeout_ms, CommBufPtr &cbuf, DispatchHandler *response_handler)
Sends a request message over a connection, expecting a response.
Address abstraction to hold either proxy name or IPv4:port address.
std::condition_variable m_cond