/*
* Copyright (C) 2011-2017 Redis Labs Ltd.
*
* This file is part of memtier_benchmark.
*
* memtier_benchmark is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, version 2.
*
* memtier_benchmark is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with memtier_benchmark. If not, see <http://www.gnu.org/licenses/>.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#ifdef HAVE_FCNTL_H
#include <fcntl.h>
#endif
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#ifdef HAVE_SYS_SOCKET_H
#include <sys/socket.h>
#endif
#ifdef HAVE_NETINET_TCP_H
#include <netinet/tcp.h>
#endif
#ifdef HAVE_LIMITS_H
#include <limits.h>
#endif
#ifdef HAVE_ASSERT_H
#include <assert.h>
#endif
#include <math.h>
#include <algorithm>
#include <arpa/inet.h>
#include <netinet/in.h>
#include "client.h"
#include "cluster_client.h"
// Binds this client to `config`, creates its main shard connection, clones
// the object generator, seeds the clone and, when the parallel ('P') key
// pattern is selected, narrows the clone's key range to this client's slice.
// Also advances config->next_client_idx and allocates the multi-key list.
// Every path through this function returns true.
bool client::setup_client(benchmark_config *config, abstract_protocol *protocol, object_generator *objgen)
{
    m_config = config;
    assert(m_config != NULL);

    unsigned long long total_num_of_clients = config->clients*config->threads;

    // The main connection gets the current connection count as its id.
    shard_connection* main_conn = new shard_connection(m_connections.size(), this, m_config, m_event_base, protocol);
    m_connections.push_back(main_conn);

    m_obj_gen = objgen->clone();
    assert(m_obj_gen != NULL);

    // Seed choice: the randomize value, optionally offset by the client
    // index; with no randomize value a distinct seed is the index itself.
    if (config->randomize) {
        if (config->distinct_client_seed) {
            m_obj_gen->set_random_seed(config->randomize + config->next_client_idx);
        } else {
            m_obj_gen->set_random_seed(config->randomize);
        }
    } else if (config->distinct_client_seed) {
        m_obj_gen->set_random_seed(config->next_client_idx);
    }

    // Parallel key-pattern is determined according to the first command.
    bool parallel_keys = config->arbitrary_commands->is_defined() &&
                         config->arbitrary_commands->at(0).key_pattern == 'P';
    if (!parallel_keys) {
        parallel_keys = (config->key_pattern[key_pattern_set] == 'P');
    }

    if (parallel_keys) {
        unsigned long long slot = config->next_client_idx % total_num_of_clients;
        unsigned long long span = (config->key_maximum - config->key_minimum)/total_num_of_clients + 1;
        unsigned long long first_key = config->key_minimum + (span * slot);
        unsigned long long last_key;
        if (slot == (total_num_of_clients - 1)) {
            // the last client takes the leftover
            last_key = config->key_maximum;
        } else {
            last_key = first_key + span - 1;
        }
        m_obj_gen->set_key_range(first_key, last_key);
    }

    config->next_client_idx++;

    m_keylist = new keylist(m_config->multi_key_get + 1);
    assert(m_keylist != NULL);

    return true;
}
// Builds a client that takes its event base, configuration, protocol and
// object generator from `group`. m_initialized is set only when
// setup_client() reports success.
client::client(client_group* group) :
    m_event_base(NULL), m_initialized(false), m_end_set(false), m_config(NULL),
    m_obj_gen(NULL), m_stats(group->get_config()), m_reqs_processed(0), m_reqs_generated(0),
    m_set_ratio_count(0), m_get_ratio_count(0),
    m_arbitrary_command_ratio_count(0), m_executed_command_index(0),
    // m_keylist is read by the destructor, so start it at NULL exactly as
    // the other constructor does.
    m_tot_set_ops(0), m_tot_wait_ops(0), m_keylist(NULL)
{
    m_event_base = group->get_event_base();

    if (!setup_client(group->get_config(), group->get_protocol(), group->get_obj_gen())) {
        return;
    }

    benchmark_debug_log("new client %p successfully set up.\n", this);
    m_initialized = true;
}
// Builds a client directly from an event base, configuration, protocol and
// object generator, without going through a client_group. m_initialized is
// set only when setup_client() reports success.
client::client(struct event_base *event_base, benchmark_config *config,
               abstract_protocol *protocol, object_generator *obj_gen) :
    m_event_base(NULL), m_initialized(false), m_end_set(false), m_config(NULL),
    m_obj_gen(NULL), m_stats(config), m_reqs_processed(0), m_reqs_generated(0),
    m_set_ratio_count(0), m_get_ratio_count(0),
    m_arbitrary_command_ratio_count(0), m_executed_command_index(0),
    m_tot_set_ops(0), m_tot_wait_ops(0), m_keylist(NULL)
{
    m_event_base = event_base;

    const bool ready = setup_client(config, protocol, obj_gen);
    if (ready) {
        benchmark_debug_log("new client %p successfully set up.\n", this);
        m_initialized = true;
    }
}
// Releases everything the client owns: each shard connection, the cloned
// object generator and the key list.
client::~client()
{
    unsigned int idx = 0;
    while (idx < m_connections.size()) {
        delete m_connections[idx];
        ++idx;
    }
    m_connections.clear();

    // delete on a null pointer is a no-op, so no explicit guards are needed.
    delete m_obj_gen;
    m_obj_gen = NULL;

    delete m_keylist;
    m_keylist = NULL;
}
bool client::initialized(void)
{
return m_initialized;
}
void client::disconnect(void)
{
shard_connection* sc = MAIN_CONNECTION;
assert(sc != NULL);
sc->disconnect();
}
// Starts connecting the main connection.
//
// When no unix socket is configured, the server address is resolved first
// and its numeric text form, together with the configured port, is stored
// on the main connection. Both IPv4 and IPv6 results are formatted.
//
// Returns -1 if address resolution or formatting fails; otherwise returns
// whatever shard_connection::connect() returns.
int client::connect(void)
{
    struct connect_info addr;

    // get primary connection
    shard_connection* sc = MAIN_CONNECTION;
    assert(sc != NULL);

    // get address information
    if (m_config->unix_socket == NULL) {
        if (m_config->server_addr->get_connect_info(&addr) != 0) {
            benchmark_error_log("connect: resolve error: %s\n", m_config->server_addr->get_last_error());
            return -1;
        }

        // Just in case we got domain name and not ip, we convert it.
        // The buffer is sized for IPv6, which also covers IPv4.
        const struct sockaddr *sa = (const struct sockaddr *)addr.ci_addr;
        char address[INET6_ADDRSTRLEN];
        int family;
        const void *raw_addr;
        if (sa->sa_family == AF_INET6) {
            family = AF_INET6;
            raw_addr = &((const struct sockaddr_in6 *)sa)->sin6_addr;
        } else {
            // Anything that is not IPv6 is treated as IPv4, as before.
            family = AF_INET;
            raw_addr = &((const struct sockaddr_in *)sa)->sin_addr;
        }

        if (inet_ntop(family, raw_addr, address, sizeof(address)) == NULL) {
            benchmark_error_log("connect: failed to convert server address to text\n");
            return -1;
        }

        // snprintf() always NUL-terminates within the given size.
        char port_str[20];
        snprintf(port_str, sizeof(port_str), "%u", m_config->port);

        // save address and port
        sc->set_address_port(address, port_str);
    }

    // call connect
    return sc->connect(&addr);
}
// True once the configured request count has been processed or the
// configured test time has elapsed; a limit of 0 disables that check.
bool client::finished(void)
{
    const bool requests_done =
        m_config->requests > 0 && m_reqs_processed >= m_config->requests;
    if (requests_done) {
        return true;
    }

    const bool time_is_up =
        m_config->test_time > 0 && m_stats.get_duration() >= m_config->test_time;
    return time_is_up;
}
// Records the current wall-clock time as the start time of the stats.
void client::set_start_time()
{
    struct timeval start;
    gettimeofday(&start, NULL);

    m_stats.set_start_time(&start);
}
// Marks the end of the run in the stats. Only the first call has an effect;
// later calls return immediately.
void client::set_end_time()
{
    if (m_end_set) {
        return;
    }

    benchmark_debug_log("nothing else to do, test is finished.\n");
    m_stats.set_end_time(NULL);
    m_end_set = true;
}
// Tells the caller whether it should stop adding requests to the pipeline.
// `conn_id` is not used by this implementation.
bool client::hold_pipeline(unsigned int conn_id)
{
    // don't exceed requests
    if (m_config->requests && m_reqs_generated >= m_config->requests) {
        return true;
    }

    // without a reconnect interval there is nothing else to hold for
    if (!m_config->reconnect_interval) {
        return false;
    }

    // if we have reconnect_interval stop enlarging the pipeline on time:
    // requests already processed in this interval plus those still in
    // flight must stay below the interval.
    return (m_reqs_processed % m_config->reconnect_interval) +
           (m_reqs_generated - m_reqs_processed) >= m_config->reconnect_interval;
}
// Sends one arbitrary command on connection `conn_id`, argument by argument:
//  - constant arguments are sent as they are,
//  - key arguments are sent with a key taken from the object generator,
//  - data arguments are sent with a value taken from the object generator.
// The sizes returned by the per-argument sends are summed and passed to
// send_arbitrary_command_end() along with the timestamp, after which the
// request is counted as generated.
void client::create_arbitrary_request(const arbitrary_command* cmd, struct timeval& timestamp, unsigned int conn_id) {
    int cmd_size = 0;

    benchmark_debug_log("%s [%s]:\n", cmd->command_name.c_str(), cmd->command.c_str());

    for (unsigned int i = 0; i < cmd->command_args.size(); i++) {
        const command_arg* arg = &cmd->command_args[i];

        if (arg->type == const_type) {
            cmd_size += m_connections[conn_id]->send_arbitrary_command(arg);
        } else if (arg->type == key_type) {
            int iter = get_arbitrary_obj_iter_type(cmd, m_executed_command_index);
            unsigned int key_len;
            const char *key = m_obj_gen->get_key(iter, &key_len);

            assert(key != NULL);
            assert(key_len > 0);

            cmd_size += m_connections[conn_id]->send_arbitrary_command(arg, key, key_len);
        } else if (arg->type == data_type) {
            unsigned int value_len;
            const char *value = m_obj_gen->get_value(0, &value_len);

            assert(value != NULL);
            assert(value_len > 0);

            cmd_size += m_connections[conn_id]->send_arbitrary_command(arg, value, value_len);
        }
    }

    m_connections[conn_id]->send_arbitrary_command_end(m_executed_command_index, &timestamp, cmd_size);
    m_reqs_generated++;
}
// This function could use some urgent TLC -- but we need to do it without altering the behavior
//
// Generates at most one request on connection `conn_id`:
//  1. if arbitrary commands are defined, the next one is sent and nothing
//     else is considered;
//  2. otherwise a WAIT is sent when the wait ratio calls for it;
//  3. otherwise a SET is sent while the set counter is below ratio.a;
//  4. otherwise a GET (or a multi-key GET) is sent while the get counter is
//     below ratio.b;
//  5. otherwise both ratio counters are reset and no request is sent by
//     this call.
void client::create_request(struct timeval timestamp, unsigned int conn_id)
{
    // are we using arbitrary command?
    if (m_config->arbitrary_commands->is_defined()) {
        const arbitrary_command* executed_command = m_config->arbitrary_commands->get_next_executed_command(m_arbitrary_command_ratio_count,
                                                                                                            m_executed_command_index);
        create_arbitrary_request(executed_command, timestamp, conn_id);
        return;
    }

    // If the Set:Wait ratio is not 0, start off with WAITs
    if (m_config->wait_ratio.b &&
        (m_tot_wait_ops == 0 ||
         (m_tot_set_ops/m_tot_wait_ops > m_config->wait_ratio.a/m_config->wait_ratio.b))) {

        m_tot_wait_ops++;

        unsigned int num_slaves = m_obj_gen->random_range(m_config->num_slaves.min, m_config->num_slaves.max);
        unsigned int timeout = m_obj_gen->normal_distribution(m_config->wait_timeout.min,
                                                              m_config->wait_timeout.max, 0,
                                                              ((m_config->wait_timeout.max - m_config->wait_timeout.min)/2.0) + m_config->wait_timeout.min);

        m_connections[conn_id]->send_wait_command(&timestamp, num_slaves, timeout);
        m_reqs_generated++;
    }
    // are we set or get? this depends on the ratio
    else if (m_set_ratio_count < m_config->ratio.a) {
        // set command
        data_object *obj = m_obj_gen->get_object(obj_iter_type(m_config, 0));
        unsigned int key_len;
        const char *key = obj->get_key(&key_len);
        unsigned int value_len;
        const char *value = obj->get_value(&value_len);

        m_connections[conn_id]->send_set_command(&timestamp, key, key_len,
                                                 value, value_len, obj->get_expiry(),
                                                 m_config->data_offset);
        m_reqs_generated++;
        m_set_ratio_count++;
        m_tot_set_ops++;
    } else if (m_get_ratio_count < m_config->ratio.b) {
        // get command
        int iter = obj_iter_type(m_config, 2);

        if (m_config->multi_key_get > 0) {
            // Ask for as many keys as the get ratio still allows, capped at
            // multi_key_get.
            unsigned int keys_count;

            keys_count = m_config->ratio.b - m_get_ratio_count;
            if ((int)keys_count > m_config->multi_key_get)
                keys_count = m_config->multi_key_get;

            m_keylist->clear();
            while (m_keylist->get_keys_count() < keys_count) {
                unsigned int keylen;
                const char *key = m_obj_gen->get_key(iter, &keylen);

                assert(key != NULL);
                assert(keylen > 0);

                m_keylist->add_key(key, keylen);
            }

            m_connections[conn_id]->send_mget_command(&timestamp, m_keylist);
            m_reqs_generated++;
            m_get_ratio_count += keys_count;
        } else {
            unsigned int keylen;
            const char *key = m_obj_gen->get_key(iter, &keylen);
            assert(key != NULL);
            assert(keylen > 0);

            m_connections[conn_id]->send_get_command(&timestamp, key, keylen, m_config->data_offset);
            m_reqs_generated++;
            m_get_ratio_count++;
        }
    } else {
        // overlap counters
        m_get_ratio_count = m_set_ratio_count = 0;
    }
}
// Prepares the client for the run by connecting its main connection.
// Returns -1 if there is no main connection, the negative result of
// connect() if connecting fails, and 0 otherwise.
int client::prepare(void)
{
    if (MAIN_CONNECTION == NULL) {
        return -1;
    }

    const int rc = this->connect();
    if (rc >= 0) {
        return 0;
    }

    benchmark_error_log("prepare: failed to connect, test aborted.\n");
    return rc;
}
void client::handle_response(unsigned int conn_id, struct timeval timestamp,
request *request, protocol_response *response)
{
if (response->is_error()) {
benchmark_error_log("server %s handle error response: %s\n",
m_connections[conn_id]->get_readable_id(),
response->get_status());
}
switch (request->m_type) {
case rt_get:
m_stats.update_get_op(×tamp,
request->m_size + response->get_total_len(),
ts_diff(request->m_sent_time, timestamp),
response->get_hits(),
request->m_keys - response->get_hits());
break;
case rt_set:
m_stats.update_set_op(×tamp,
request->m_size + response->get_total_len(),
ts_diff(request->m_sent_time, timestamp));
break;
case rt_wait:
m_stats.update_wait_op(×tamp,
ts_diff(request->m_sent_time, timestamp));
break;
case rt_arbitrary: {
arbitrary_request *ar = static_cast(request);
m_stats.update_arbitrary_op(×tamp,
request->m_size + response->get_total_len(),
ts_diff(request->m_sent_time, timestamp),
ar->index);
break;
}
default:
assert(0);
break;
}
}
///////////////////////////////////////////////////////////////////////////
// Builds a verification client on top of the plain client and asks the main
// connection's protocol to keep response values.
verify_client::verify_client(struct event_base *event_base,
                             benchmark_config *config,
                             abstract_protocol *protocol,
                             object_generator *obj_gen) :
    client(event_base, config, protocol, obj_gen),
    m_finished(false),
    m_verified_keys(0),
    m_errors(0)
{
    MAIN_CONNECTION->get_protocol()->set_keep_value(true);
}
unsigned long long int verify_client::get_verified_keys(void)
{
return m_verified_keys;
}
unsigned long long int verify_client::get_errors(void)
{
return m_errors;
}
// Verification counterpart of client::create_request().
//
// While the set counter is below ratio.a, a verify-GET is sent for the next
// generated object so that its value can be checked. While the get counter
// is below ratio.b, keys are drawn from the object generator without sending
// anything and the step is counted as processed. When both counters are
// exhausted they are reset and nothing is sent by this call.
void verify_client::create_request(struct timeval timestamp, unsigned int conn_id)
{
    // TODO: Refactor client::create_request so this can be unified.
    if (m_set_ratio_count < m_config->ratio.a) {
        // Prepare a GET request that will be compared against a previous
        // SET request.
        data_object *obj = m_obj_gen->get_object(obj_iter_type(m_config, 0));
        unsigned int key_len;
        const char *key = obj->get_key(&key_len);
        unsigned int value_len;
        const char *value = obj->get_value(&value_len);

        m_connections[conn_id]->send_verify_get_command(&timestamp, key, key_len,
                                                        value, value_len, obj->get_expiry(),
                                                        m_config->data_offset);
        m_set_ratio_count++;
    } else if (m_get_ratio_count < m_config->ratio.b) {
        // We don't really care about GET operations, all we do here is keep
        // the object generator synced.
        int iter = obj_iter_type(m_config, 2);

        if (m_config->multi_key_get > 0) {
            unsigned int keys_count;

            keys_count = m_config->ratio.b - m_get_ratio_count;
            if ((int)keys_count > m_config->multi_key_get)
                keys_count = m_config->multi_key_get;

            m_keylist->clear();
            while (m_keylist->get_keys_count() < keys_count) {
                unsigned int keylen;
                const char *key = m_obj_gen->get_key(iter, &keylen);

                assert(key != NULL);
                assert(keylen > 0);

                m_keylist->add_key(key, keylen);
            }

            m_get_ratio_count += keys_count;
        } else {
            unsigned int keylen;
            m_obj_gen->get_key(iter, &keylen);
            m_get_ratio_count++;
        }

        // We don't really send this request, but need to count it to be in sync.
        m_reqs_processed++;
    } else {
        m_get_ratio_count = m_set_ratio_count = 0;
    }
}
void verify_client::handle_response(unsigned int conn_id, struct timeval timestamp,
request *request, protocol_response *response)
{
unsigned int rvalue_len;
const char *rvalue = response->get_value(&rvalue_len);
verify_request *vr = static_cast(request);
assert(vr->m_type == rt_get);
if (response->is_error()) {
benchmark_error_log("error: request for key [%.*s] failed: %s\n",
vr->m_key_len, vr->m_key, response->get_status());
m_errors++;
} else {
if (!rvalue || rvalue_len != vr->m_value_len || memcmp(rvalue, vr->m_value, rvalue_len) != 0) {
benchmark_error_log("error: key [%.*s]: expected [%.*s], got [%.*s]\n",
vr->m_key_len, vr->m_key,
vr->m_value_len, vr->m_value,
rvalue_len, rvalue);
m_errors++;
} else {
benchmark_debug_log("key: [%.*s] verified successfuly.\n",
vr->m_key_len, vr->m_key);
m_verified_keys++;
}
}
}
// True once m_finished is set, or once the configured request count
// (when non-zero) has been processed.
bool verify_client::finished(void)
{
    if (m_finished) {
        return true;
    }

    const bool requests_done =
        m_config->requests > 0 && m_reqs_processed >= m_config->requests;
    return requests_done;
}
///////////////////////////////////////////////////////////////////////////
// Stores the shared configuration, protocol and object generator, and
// creates the libevent base the group's clients run on.
client_group::client_group(benchmark_config* config, abstract_protocol *protocol, object_generator* obj_gen) :
    m_base(NULL),
    m_config(config),
    m_protocol(protocol),
    m_obj_gen(obj_gen)
{
    m_base = event_base_new();

    // The group cannot work without any of these.
    assert(m_base != NULL);
    assert(protocol != NULL);
    assert(obj_gen != NULL);
}
// Deletes every client owned by the group, then frees the event base.
client_group::~client_group(void)
{
    for (std::vector<client*>::iterator i = m_clients.begin(); i != m_clients.end(); ++i) {
        client* c = *i;
        delete c;
    }
    m_clients.clear();

    if (m_base != NULL)
        event_base_free(m_base);
    m_base = NULL;
}
int client_group::create_clients(int num)
{
for (int i = 0; i < num; i++) {
client* c;
if (m_config->cluster_mode)
c = new cluster_client(this);
else
c = new client(this);
assert(c != NULL);
if (!c->initialized()) {
delete c;
return i;
}
m_clients.push_back(c);
}
return num;
}
int client_group::prepare(void)
{
for (std::vector::iterator i = m_clients.begin(); i != m_clients.end(); i++) {
client* c = *i;
int ret = c->prepare();
if (ret < 0) {
return ret;
}
}
return 0;
}
// Runs the group's event loop by dispatching on its event base.
void client_group::run(void)
{
    // The return value of event_base_dispatch() is not inspected.
    event_base_dispatch(m_base);
}
unsigned long int client_group::get_total_bytes(void)
{
unsigned long int total_bytes = 0;
for (std::vector::iterator i = m_clients.begin(); i != m_clients.end(); i++) {
total_bytes += (*i)->get_stats()->get_total_bytes();
}
return total_bytes;
}
unsigned long int client_group::get_total_ops(void)
{
unsigned long int total_ops = 0;
for (std::vector::iterator i = m_clients.begin(); i != m_clients.end(); i++) {
total_ops += (*i)->get_stats()->get_total_ops();
}
return total_ops;
}
unsigned long int client_group::get_total_latency(void)
{
unsigned long int total_latency = 0;
for (std::vector::iterator i = m_clients.begin(); i != m_clients.end(); i++) {
total_latency += (*i)->get_stats()->get_total_latency();
}
return total_latency;
}
unsigned long int client_group::get_duration_usec(void)
{
unsigned long int duration = 0;
unsigned int thread_counter = 1;
for (std::vector::iterator i = m_clients.begin(); i != m_clients.end(); i++, thread_counter++) {
float factor = ((float)(thread_counter - 1) / thread_counter);
duration = factor * duration + (float)(*i)->get_stats()->get_duration_usec() / thread_counter ;
}
return duration;
}
// Merges the stats of every client into `target`, passing a 1-based
// iteration counter that increases with each client.
void client_group::merge_run_stats(run_stats* target)
{
    assert(target != NULL);
    unsigned int iteration_counter = 1;

    for (std::vector<client*>::iterator i = m_clients.begin(); i != m_clients.end(); ++i) {
        target->merge(*(*i)->get_stats(), iteration_counter++);
    }
}
void client_group::write_client_stats(const char *prefix)
{
unsigned int client_id = 0;
for (std::vector::iterator i = m_clients.begin(); i != m_clients.end(); i++) {
char filename[PATH_MAX];
snprintf(filename, sizeof(filename)-1, "%s-%u.csv", prefix, client_id++);
if (!(*i)->get_stats()->save_csv(filename, m_config)) {
fprintf(stderr, "error: %s: failed to write client stats.\n", filename);
}
}
}