/*
* Copyright (C) 2011-2017 Redis Labs Ltd.
*
* This file is part of memtier_benchmark.
*
* memtier_benchmark is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, version 2.
*
* memtier_benchmark is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with memtier_benchmark. If not, see <http://www.gnu.org/licenses/>.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#ifdef HAVE_SYS_TYPES_H
#include
#endif
#ifdef HAVE_FCNTL_H
#include
#endif
#include
#include
#include
#include
#ifdef HAVE_SYS_SOCKET_H
#include
#endif
#ifdef HAVE_NETINET_TCP_H
#include
#endif
#ifdef HAVE_LIMITS_H
#include
#endif
#ifdef HAVE_ASSERT_H
#include
#endif
#include "shard_connection.h"
#include "obj_gen.h"
#include "memtier_benchmark.h"
#include "connections_manager.h"
#include "event2/bufferevent.h"
#ifdef USE_TLS
#include
#include
#include "event2/bufferevent_ssl.h"
#endif
// libevent read callback: forwards "data available" to the owning connection.
// ctx is the shard_connection registered via bufferevent_setcb() in setup_event().
void cluster_client_read_handler(bufferevent *bev, void *ctx)
{
    shard_connection *conn = static_cast<shard_connection *>(ctx);
    assert(conn != NULL);

    conn->process_response();
}
// libevent event callback: hands the BEV_EVENT_* flags to the owning connection.
// ctx is the shard_connection registered via bufferevent_setcb() in setup_event().
void cluster_client_event_handler(bufferevent *bev, short events, void *ctx)
{
    shard_connection *conn = static_cast<shard_connection *>(ctx);
    assert(conn != NULL);

    conn->handle_event(events);
}
// Builds a pipeline entry.
// sent_time: timestamp to record; when NULL, the current wall-clock time is used.
// keys: number of keys the request covers.
request::request(request_type type, unsigned int size, struct timeval* sent_time, unsigned int keys)
    : m_type(type), m_size(size), m_keys(keys)
{
    if (sent_time == NULL) {
        gettimeofday(&m_sent_time, NULL);
        return;
    }
    m_sent_time = *sent_time;
}
// Pipeline entry for a user-defined (arbitrary) command.
// request_index identifies which configured command this is; the request is
// always accounted as covering exactly one key.
arbitrary_request::arbitrary_request(size_t request_index, request_type type,
                                     unsigned int size, struct timeval* sent_time)
    : request(type, size, sent_time, 1)
    , index(request_index)
{
}
// Pipeline entry for a verification GET. The request keeps its own heap copies
// of the key and of the expected value (both released in ~verify_request()).
verify_request::verify_request(request_type type,
                               unsigned int size,
                               struct timeval* sent_time,
                               unsigned int keys,
                               const char *key,
                               unsigned int key_len,
                               const char *value,
                               unsigned int value_len) :
    request(type, size, sent_time, keys),
    m_key(static_cast<char *>(malloc(key_len))), m_key_len(key_len),
    m_value(static_cast<char *>(malloc(value_len))), m_value_len(value_len)
{
    memcpy(m_key, key, m_key_len);
    memcpy(m_value, value, m_value_len);
}
// Releases the key/value copies made by the constructor.
// free(NULL) is a no-op, so no NULL checks are needed.
verify_request::~verify_request(void)
{
    free(m_key);
    m_key = NULL;

    free(m_value);
    m_value = NULL;
}
// Creates a disconnected shard connection.
// id:           index used when reporting back to the connections manager.
// conns_man:    manager that receives responses and drives request creation (not owned).
// config:       benchmark configuration (not owned).
// event_base:   libevent base the connection's bufferevent will be attached to.
// abs_protocol: prototype protocol; this connection works on its own clone().
// When config->unix_socket is set, a sockaddr_un is prepared up front so
// connect() can use it instead of the per-call address info.
shard_connection::shard_connection(unsigned int id, connections_manager* conns_man, benchmark_config* config,
                                   struct event_base* event_base, abstract_protocol* abs_protocol) :
    m_address(NULL), m_port(NULL), m_unix_sockaddr(NULL),
    m_bev(NULL), m_pending_resp(0), m_connection_state(conn_disconnected),
    m_authentication(auth_done), m_db_selection(select_done), m_cluster_slots(slots_done) {
    m_id = id;
    m_conns_manager = conns_man;
    m_config = config;
    m_event_base = event_base;

    if (m_config->unix_socket) {
        m_unix_sockaddr = (struct sockaddr_un *) malloc(sizeof(struct sockaddr_un));
        assert(m_unix_sockaddr != NULL);
        m_unix_sockaddr->sun_family = AF_UNIX;
        // strncpy does not terminate on truncation, so terminate explicitly.
        strncpy(m_unix_sockaddr->sun_path, m_config->unix_socket, sizeof(m_unix_sockaddr->sun_path)-1);
        m_unix_sockaddr->sun_path[sizeof(m_unix_sockaddr->sun_path)-1] = '\0';
    }

    m_protocol = abs_protocol->clone();
    assert(m_protocol != NULL);

    // Outstanding requests in send order; pop_req() matches responses to them FIFO.
    m_pipeline = new std::queue<request *>;
    assert(m_pipeline != NULL);
}
// Releases everything the connection owns: the address strings, the UNIX
// socket address, the bufferevent, the cloned protocol, and the request pipeline.
shard_connection::~shard_connection() {
    if (m_address != NULL) {
        free(m_address);
        m_address = NULL;
    }
    if (m_port != NULL) {
        free(m_port);
        m_port = NULL;
    }
    if (m_unix_sockaddr != NULL) {
        free(m_unix_sockaddr);
        m_unix_sockaddr = NULL;
    }
    if (m_bev != NULL) {
        bufferevent_free(m_bev);
        m_bev = NULL;
    }
    if (m_protocol != NULL) {
        delete m_protocol;
        m_protocol = NULL;
    }
    if (m_pipeline != NULL) {
        // Requests still waiting for a response are owned by the pipeline.
        // Previously only the container was deleted and these leaked.
        // They are deleted through request*, the same way process_response() does.
        while (!m_pipeline->empty()) {
            delete m_pipeline->front();
            m_pipeline->pop();
        }
        delete m_pipeline;
        m_pipeline = NULL;
    }
}
// Wraps sockfd in a fresh bufferevent. It is a TLS bufferevent when built with
// USE_TLS and an OpenSSL context is configured, otherwise a plain socket one.
// The new bufferevent is wired to this connection's callbacks, and the
// protocol is pointed at its input/output buffers.
// Any previous bufferevent is freed first; it was created with
// BEV_OPT_CLOSE_ON_FREE, so its socket is closed as well.
void shard_connection::setup_event(int sockfd) {
    if (m_bev != NULL)
        bufferevent_free(m_bev);

    bool tls_enabled = false;
#ifdef USE_TLS
    tls_enabled = (m_config->openssl_ctx != NULL);
    if (tls_enabled) {
        SSL *ssl_session = SSL_new(m_config->openssl_ctx);
        assert(ssl_session != NULL);
        if (m_config->tls_sni)
            SSL_set_tlsext_host_name(ssl_session, m_config->tls_sni);
        m_bev = bufferevent_openssl_socket_new(m_event_base, sockfd, ssl_session,
                                               BUFFEREVENT_SSL_CONNECTING,
                                               BEV_OPT_CLOSE_ON_FREE);
    }
#endif
    if (!tls_enabled)
        m_bev = bufferevent_socket_new(m_event_base, sockfd, BEV_OPT_CLOSE_ON_FREE);
    assert(m_bev != NULL);

    // No write callback is installed (NULL); only read and event callbacks are used.
    bufferevent_setcb(m_bev, cluster_client_read_handler, NULL,
                      cluster_client_event_handler, static_cast<void *>(this));

    struct evbuffer *input = bufferevent_get_input(m_bev);
    struct evbuffer *output = bufferevent_get_output(m_bev);
    m_protocol->set_buffers(input, output);
}
// Creates a non-blocking stream socket for this connection.
// If a UNIX socket path is configured, an AF_UNIX socket is used; otherwise
// the socket is created from addr's family/type/protocol, and SO_KEEPALIVE,
// SO_LINGER (disabled) and TCP_NODELAY are set on it.
// Returns the fd, or -1 on failure with errno describing the failing call.
int shard_connection::setup_socket(struct connect_info* addr) {
    int sockfd;

    if (m_unix_sockaddr != NULL) {
        sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
        if (sockfd < 0) {
            return -1;
        }
    } else {
        sockfd = socket(addr->ci_family, addr->ci_socktype, addr->ci_protocol);
        if (sockfd < 0) {
            return -1;
        }

        // configure socket behavior
        struct linger ling = {0, 0};    // l_onoff == 0: no lingering on close()
        int enable = 1;
        int error = setsockopt(sockfd, SOL_SOCKET, SO_KEEPALIVE, (void *) &enable, sizeof(enable));
        assert(error == 0);
        error = setsockopt(sockfd, SOL_SOCKET, SO_LINGER, (void *) &ling, sizeof(ling));
        assert(error == 0);
        error = setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, (void *) &enable, sizeof(enable));
        assert(error == 0);
        (void) error;   // only consumed by assert(); avoids unused warnings under NDEBUG
    }

    // set non-blocking behavior
    int flags = fcntl(sockfd, F_GETFL, 0);
    if (flags < 0 || fcntl(sockfd, F_SETFL, flags | O_NONBLOCK) < 0) {
        // The caller reports strerror(errno); keep the fcntl error across close().
        int saved_errno = errno;
        close(sockfd);
        errno = saved_errno;
        return -1;
    }
    return sockfd;
}
int shard_connection::connect(struct connect_info* addr) {
// set required setup commands
m_authentication = m_config->authenticate ? auth_none : auth_done;
m_db_selection = m_config->select_db ? select_none : select_done;
// setup socket
int sockfd = setup_socket(addr);
if (sockfd < 0) {
fprintf(stderr, "Failed to setup socket: %s", strerror(errno));
return -1;
}
// set up bufferevent
setup_event(sockfd);
// set readable id
set_readable_id();
// call connect
m_connection_state = conn_in_progress;
if (bufferevent_socket_connect(m_bev,
m_unix_sockaddr ? (struct sockaddr *) m_unix_sockaddr : addr->ci_addr,
m_unix_sockaddr ? sizeof(struct sockaddr_un) : addr->ci_addrlen) == -1) {
disconnect();
benchmark_error_log("connect failed, error = %s\n", strerror(errno));
return -1;
}
return 0;
}
// Drops the current bufferevent (if any) and marks the connection disconnected.
// The setup state is reset to "done", so no setup commands are pending until
// connect() decides otherwise.
void shard_connection::disconnect() {
    if (m_bev != NULL) {
        bufferevent_free(m_bev);
        m_bev = NULL;
    }
    m_connection_state = conn_disconnected;

    m_authentication = auth_done;
    m_db_selection = select_done;
    m_cluster_slots = slots_done;
}
// Stores heap copies of the host address and port strings, replacing any
// previous ones. free(NULL) is a no-op, so the old values are released
// unconditionally.
void shard_connection::set_address_port(const char* address, const char* port) {
    free(m_address);
    m_address = strdup(address);

    free(m_port);
    m_port = strdup(port);
}
// Builds the human-readable connection label used in log messages: the UNIX
// socket path, or "address:port" for network connections.
void shard_connection::set_readable_id() {
    if (m_unix_sockaddr == NULL) {
        m_readable_id.assign(m_address);
        m_readable_id += ":";
        m_readable_id += m_port;
        return;
    }
    m_readable_id.assign(m_config->unix_socket);
}
// Returns the label built by set_readable_id(). The pointer stays valid only
// until m_readable_id is modified again.
const char* shard_connection::get_readable_id() {
    const char *label = m_readable_id.c_str();
    return label;
}
// Removes and returns the oldest outstanding request, the one the next
// response belongs to. Ownership passes to the caller.
request* shard_connection::pop_req() {
    // A response without a matching request would make front() undefined
    // behavior, so check before touching the queue. Checking the counter
    // before the decrement also works if m_pending_resp is unsigned; a
    // post-decrement ">= 0" check would then be vacuous.
    assert(!m_pipeline->empty());
    assert(m_pending_resp > 0);

    request* req = m_pipeline->front();
    m_pipeline->pop();
    m_pending_resp--;

    return req;
}
// Appends a just-sent request to the pipeline (the pipeline takes ownership)
// and counts it as awaiting a response.
void shard_connection::push_req(request* req) {
    m_pipeline->push(req);
    ++m_pending_resp;
}
// True once authentication, DB selection and CLUSTER SLOTS have all
// completed (or were never required).
bool shard_connection::is_conn_setup_done() {
    if (m_authentication != auth_done)
        return false;
    if (m_db_selection != select_done)
        return false;
    return m_cluster_slots == slots_done;
}
// Writes each setup command that is still pending (AUTH, SELECT, CLUSTER SLOTS)
// and queues a matching request for it.
// Each state moves from *_none to *_sent; process_response() moves it to
// *_done when the reply arrives.
// timestamp is recorded as the send time of every request queued here.
void shard_connection::send_conn_setup_commands(struct timeval timestamp) {
    if (m_authentication == auth_none) {
        benchmark_debug_log("sending authentication command.\n");
        m_protocol->authenticate(m_config->authenticate);
        push_req(new request(rt_auth, 0, &timestamp, 0));
        m_authentication = auth_sent;
    }

    if (m_db_selection == select_none) {
        benchmark_debug_log("sending db selection command.\n");
        m_protocol->select_db(m_config->select_db);
        push_req(new request(rt_select_db, 0, &timestamp, 0));
        m_db_selection = select_sent;
    }

    if (m_cluster_slots == slots_none) {
        benchmark_debug_log("sending cluster slots command.\n");

        // The CLUSTER SLOTS reply has to be kept so it can be parsed.
        m_protocol->set_keep_value(true);
        m_protocol->write_command_cluster_slots();
        push_req(new request(rt_cluster_slots, 0, &timestamp, 0));
        m_cluster_slots = slots_sent;
    }
}
// Read-callback work: parses every complete response now available from the
// protocol and matches each, in order, to the oldest outstanding request
// (pop_req()).
// - Setup replies (auth / select / cluster slots) advance the setup state.
// - Any other reply goes to the connections manager.
// - A failed setup reply makes the function return immediately, without
//   refilling the pipeline or updating events.
// Afterwards it may force a reconnect (reconnect_interval), refills the
// pipeline, disables I/O when idle, and records the end time if the test has
// finished.
void shard_connection::process_response(void)
{
int ret;
bool responses_handled = false;
struct timeval now;
// One timestamp is shared by every response handled in this call.
gettimeofday(&now, NULL);
// parse_response() > 0: a full response is available; -1 is treated as a parse
// error below. NOTE(review): presumably 0 means "need more data" — confirm in the protocol classes.
while ((ret = m_protocol->parse_response()) > 0) {
bool error = false;
protocol_response *r = m_protocol->get_response();
request* req = pop_req();
switch (req->m_type)
{
case rt_auth:
if (r->is_error()) {
benchmark_error_log("error: authentication failed [%s]\n", r->get_status());
error = true;
} else {
m_authentication = auth_done;
benchmark_debug_log("authentication successful.\n");
}
break;
case rt_select_db:
if (strcmp(r->get_status(), "+OK") != 0) {
benchmark_error_log("database selection failed.\n");
error = true;
} else {
benchmark_debug_log("database selection successful.\n");
m_db_selection = select_done;
}
break;
case rt_cluster_slots:
// A missing or empty multi-bulk reply counts as failure.
// NOTE(review): on failure set_keep_value(true) from send_conn_setup_commands() is not reset.
if (r->get_mbulk_value() == NULL || r->get_mbulk_value()->mbulks_elements.size() == 0) {
benchmark_error_log("cluster slot failed.\n");
error = true;
} else {
// parse response
m_conns_manager->handle_cluster_slots(r);
m_protocol->set_keep_value(false);
m_cluster_slots = slots_done;
benchmark_debug_log("cluster slot command successful\n");
}
break;
default:
// Regular benchmark traffic: misses are reported as requested keys minus hits.
// NOTE(review): %d is used for get_hits() and the miss count; their types are not visible here — confirm they are signed int.
benchmark_debug_log("server %s: handled response (first line): %s, %d hits, %d misses\n",
get_readable_id(),
r->get_status(),
r->get_hits(),
req->m_keys - r->get_hits());
m_conns_manager->handle_response(m_id, now, req, r);
m_conns_manager->inc_reqs_processed();
responses_handled = true;
break;
}
// The request was popped from the pipeline, so it is owned and released here.
delete req;
if (error) {
return;
}
}
if (ret == -1) {
benchmark_error_log("error: response parsing failed.\n");
}
// Periodic reconnect: after every reconnect_interval processed requests the
// manager tears the connection down and reconnects; nothing else is done in this call.
if (m_config->reconnect_interval > 0 && responses_handled) {
if ((m_conns_manager->get_reqs_processed() % m_config->reconnect_interval) == 0) {
assert(m_pipeline->size() == 0);
benchmark_debug_log("reconnecting, m_reqs_processed = %u\n", m_conns_manager->get_reqs_processed());
// client manage connection & disconnection of shard
m_conns_manager->disconnect();
ret = m_conns_manager->connect();
assert(ret == 0);
return;
}
}
fill_pipeline();
// update events
if (m_bev != NULL) {
// no pending response (nothing to read) and output buffer empty (nothing to write)
if ((m_pending_resp == 0) && (evbuffer_get_length(bufferevent_get_output(m_bev)) == 0)) {
bufferevent_disable(m_bev, EV_WRITE|EV_READ);
}
}
if (m_conns_manager->finished()) {
m_conns_manager->set_end_time();
}
}
void shard_connection::process_first_request() {
m_conns_manager->set_start_time();
fill_pipeline();
}
// Tops up the pipeline with new requests until the test is finished, the
// configured pipeline depth is reached, or the manager asks to hold.
// Pending setup commands always go first. While setup is incomplete, only
// setup commands are written and no benchmark request is queued.
void shard_connection::fill_pipeline(void)
{
    struct timeval now;
    gettimeofday(&now, NULL);

    for (;;) {
        if (m_conns_manager->finished() || m_pipeline->size() >= m_config->pipeline)
            break;

        if (!is_conn_setup_done()) {
            send_conn_setup_commands(now);
            return;
        }

        // The manager can pause this connection (original intent: don't exceed requests).
        if (m_conns_manager->hold_pipeline(m_id))
            break;

        // The manager decides which command to build and send on this connection.
        m_conns_manager->create_request(now, m_id);
    }
}
// Reacts to libevent bufferevent events for this connection:
// - a connect in progress completing (BEV_EVENT_CONNECTED) enables I/O and
//   starts or resumes sending;
// - BEV_EVENT_ERROR logs TLS errors when built with USE_TLS, otherwise errno,
//   then disconnects;
// - BEV_EVENT_EOF logs the dropped connection and disconnects.
void shard_connection::handle_event(short events)
{
    const bool connect_completed =
        (get_connection_state() == conn_in_progress) && (events & BEV_EVENT_CONNECTED);

    if (connect_completed) {
        m_connection_state = conn_connected;
        bufferevent_enable(m_bev, EV_READ|EV_WRITE);

        // No requests processed yet means this is the very first connect.
        if (m_conns_manager->get_reqs_processed() != 0) {
            benchmark_debug_log("reconnection complete, proceeding with test\n");
            fill_pipeline();
        } else {
            process_first_request();
        }
        return;
    }

    if (events & BEV_EVENT_ERROR) {
        bool reported_tls_error = false;
#ifdef USE_TLS
        for (unsigned long tls_err = bufferevent_get_openssl_error(m_bev);
             tls_err != 0;
             tls_err = bufferevent_get_openssl_error(m_bev)) {
            reported_tls_error = true;
            benchmark_error_log("TLS connection error: %s\n",
                                ERR_reason_error_string(tls_err));
        }
#endif
        if (!reported_tls_error && errno != 0) {
            benchmark_error_log("Connection error: %s\n", strerror(errno));
        }

        disconnect();
        return;
    }

    if (events & BEV_EVENT_EOF) {
        benchmark_error_log("connection dropped.\n");
        disconnect();
    }
}
// Writes a WAIT command (num_slaves replicas, timeout) and queues a
// zero-key request for its reply.
void shard_connection::send_wait_command(struct timeval* sent_time,
                                         unsigned int num_slaves, unsigned int timeout) {
    benchmark_debug_log("WAIT num_slaves=%u timeout=%u\n", num_slaves, timeout);

    const int cmd_size = m_protocol->write_command_wait(num_slaves, timeout);
    push_req(new request(rt_wait, cmd_size, sent_time, 0));
}
// Writes a SET for key/value with the given expiry and queues a one-key
// request for its reply.
// The debug log now uses %d for the int arguments value_len and expiry
// (previously %u, a printf conversion/type mismatch).
void shard_connection::send_set_command(struct timeval* sent_time, const char *key, int key_len,
                                        const char *value, int value_len, int expiry, unsigned int offset) {
    int cmd_size = 0;

    benchmark_debug_log("server %s: SET key=[%.*s] value_len=%d expiry=%d\n",
                        get_readable_id(), key_len, key, value_len, expiry);
    cmd_size = m_protocol->write_command_set(key, key_len, value, value_len,
                                             expiry, offset);

    push_req(new request(rt_set, cmd_size, sent_time, 1));
}
// Writes a GET for key and queues a one-key request for its reply.
void shard_connection::send_get_command(struct timeval* sent_time,
                                        const char *key, int key_len, unsigned int offset) {
    benchmark_debug_log("server %s: GET key=[%.*s]\n", get_readable_id(), key_len, key);

    const int cmd_size = m_protocol->write_command_get(key, key_len, offset);
    push_req(new request(rt_get, cmd_size, sent_time, 1));
}
// Writes one MGET covering every key in key_list and queues a single request
// that accounts for all of those keys.
void shard_connection::send_mget_command(struct timeval* sent_time, const keylist* key_list) {
    // The debug log reads the first and last keys. With an empty list,
    // get_keys_count()-1 is not a valid index, and an MGET with no keys is not a valid command.
    assert(key_list->get_keys_count() > 0);

    int cmd_size = 0;
    const char *first_key, *last_key;
    unsigned int first_key_len, last_key_len;
    first_key = key_list->get_key(0, &first_key_len);
    last_key = key_list->get_key(key_list->get_keys_count()-1, &last_key_len);

    benchmark_debug_log("MGET %d keys [%.*s] .. [%.*s]\n",
                        key_list->get_keys_count(), first_key_len, first_key, last_key_len, last_key);

    cmd_size = m_protocol->write_command_multi_get(key_list);
    push_req(new request(rt_get, cmd_size, sent_time, key_list->get_keys_count()));
}
// Writes a GET for key and queues a verify_request, which keeps copies of the
// key and the expected value for checking the reply.
// The debug log now uses %d for the int arguments value_len and expiry
// (previously %u, a printf conversion/type mismatch).
void shard_connection::send_verify_get_command(struct timeval* sent_time, const char *key, int key_len,
                                               const char *value, int value_len, int expiry, unsigned int offset) {
    int cmd_size = 0;

    benchmark_debug_log("GET key=[%.*s] value_len=%d expiry=%d\n",
                        key_len, key, value_len, expiry);
    cmd_size = m_protocol->write_command_get(key, key_len, offset);

    push_req(new verify_request(rt_get, cmd_size, sent_time, 1, key, key_len, value, value_len));
}
/*
* arbitrary command:
*
* The arbitrary command is sent over several iterations; on each iteration a
* different type of argument (constant or randomized) can be written.
*
* Because the command is assembled across several calls,
* send_arbitrary_command_end() is called to mark that the whole command has been sent.
*/
// Writes one constant argument of an arbitrary command.
// Returns the number of bytes reported by the protocol writer.
int shard_connection::send_arbitrary_command(const command_arg *arg) {
    return m_protocol->write_arbitrary_command(arg);
}
// Writes one generated argument (key or data) of an arbitrary command.
// Returns the number of bytes reported by the protocol writer.
// The data log now uses %d for the int val_len (previously %u, a printf
// conversion/type mismatch).
int shard_connection::send_arbitrary_command(const command_arg *arg, const char *val, int val_len) {
    int cmd_size = 0;

    if (arg->type == key_type) {
        benchmark_debug_log("key: value[%.*s]\n", val_len, val);
    } else {
        benchmark_debug_log("data: value_len=%d\n", val_len);
    }

    cmd_size = m_protocol->write_arbitrary_command(val, val_len);

    return cmd_size;
}
void shard_connection::send_arbitrary_command_end(size_t command_index, struct timeval* sent_time, int cmd_size) {
push_req(new arbitrary_request(command_index, rt_arbitrary, cmd_size, sent_time));
}