/* * Copyright (C) 2011-2017 Redis Labs Ltd. * * This file is part of memtier_benchmark. * * memtier_benchmark is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, version 2. * * memtier_benchmark is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with memtier_benchmark. If not, see . */ #ifndef MEMTIER_BENCHMARK_SHARD_CONNECTION_H #define MEMTIER_BENCHMARK_SHARD_CONNECTION_H #include #include #include #include #include #include #include #include #include "protocol.h" // forward decleration class connections_manager; struct benchmark_config; class abstract_protocol; class object_generator; enum connection_state { conn_disconnected, conn_in_progress, conn_connected }; enum authentication_state { auth_none, auth_sent, auth_done }; enum select_db_state { select_none, select_sent, select_done }; enum cluster_slots_state { slots_none, slots_sent, slots_done }; enum request_type { rt_unknown, rt_set, rt_get, rt_wait, rt_arbitrary, rt_auth, rt_select_db, rt_cluster_slots }; struct request { request_type m_type; struct timeval m_sent_time; unsigned int m_size; unsigned int m_keys; request(request_type type, unsigned int size, struct timeval* sent_time, unsigned int keys); virtual ~request(void) {} }; struct arbitrary_request : public request { size_t index; arbitrary_request(size_t request_index, request_type type, unsigned int size, struct timeval* sent_time); virtual ~arbitrary_request(void) {} }; struct verify_request : public request { char *m_key; unsigned int m_key_len; char *m_value; unsigned int m_value_len; verify_request(request_type type, unsigned int size, struct timeval* sent_time, unsigned int keys, const char *key, unsigned int key_len, const char *value, unsigned int value_len); virtual ~verify_request(void); }; class shard_connection { friend void cluster_client_read_handler(bufferevent *bev, void *ctx); friend void cluster_client_event_handler(bufferevent *bev, short events, void *ctx); public: shard_connection(unsigned int id, connections_manager* conn_man, benchmark_config* config, struct event_base* event_base, abstract_protocol* abs_protocol); ~shard_connection(); void set_address_port(const char* address, const char* port); const char* get_readable_id(); int connect(struct connect_info* addr); void disconnect(); void send_wait_command(struct timeval* sent_time, unsigned int num_slaves, unsigned int timeout); void send_set_command(struct timeval* sent_time, const char *key, int key_len, const char *value, int value_len, int expiry, unsigned int offset); void send_get_command(struct timeval* sent_time, const char *key, int key_len, unsigned int offset); void send_mget_command(struct timeval* sent_time, const keylist* key_list); void send_verify_get_command(struct timeval* sent_time, const char *key, int key_len, const char *value, int value_len, int expiry, unsigned int offset); int send_arbitrary_command(const command_arg *arg); int send_arbitrary_command(const command_arg *arg, const char *val, int val_len); void send_arbitrary_command_end(size_t command_index, struct timeval* sent_time, int cmd_size); void set_authentication() { m_authentication = auth_none; } void set_select_db() { m_db_selection = select_none; } void set_cluster_slots() { m_cluster_slots = slots_none; } enum cluster_slots_state get_cluster_slots_state() { return m_cluster_slots; } unsigned int get_id() { return m_id; } abstract_protocol* get_protocol() { return m_protocol; } const char* get_address() { return m_address; } const char* get_port() { return m_port; } enum connection_state get_connection_state() { return m_connection_state; } private: void setup_event(int sockfd); int setup_socket(struct connect_info* addr); void set_readable_id(); bool is_conn_setup_done(); void send_conn_setup_commands(struct timeval timestamp); request* pop_req(); void push_req(request* req); void process_response(void); void process_subsequent_requests(void); void process_first_request(); void fill_pipeline(void); void handle_event(short evtype); unsigned int m_id; connections_manager* m_conns_manager; benchmark_config* m_config; char* m_address; char* m_port; std::string m_readable_id; struct sockaddr_un* m_unix_sockaddr; struct bufferevent *m_bev; struct event_base* m_event_base; abstract_protocol* m_protocol; std::queue* m_pipeline; int m_pending_resp; enum connection_state m_connection_state; enum authentication_state m_authentication; enum select_db_state m_db_selection; enum cluster_slots_state m_cluster_slots; }; #endif //MEMTIER_BENCHMARK_SHARD_CONNECTION_H