#!/usr/bin/python3 # * Copyright (c) 2019 TAOS Data, Inc. # * # * This program is free software: you can use, redistribute, and/or modify # * it under the terms of the GNU Affero General Public License, version 3 # * or later ("AGPL"), as published by the Free Software Foundation. # * # * This program is distributed in the hope that it will be useful, but WITHOUT # * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # * FITNESS FOR A PARTICULAR PURPOSE. # * # * You should have received a copy of the GNU Affero General Public License # * along with this program. If not, see . # -*- coding: utf-8 -*- import sys import getopt import requests import json import random import time import datetime from multiprocessing import Manager, Pool from multipledispatch import dispatch from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED @dispatch(str, str) def v_print(msg: str, arg: str): if verbose: print(msg % arg) @dispatch(str, str, str) def v_print(msg: str, arg1: str, arg2: str): if verbose: print(msg % (arg1, arg2)) @dispatch(str, str, str, str) def v_print(msg: str, arg1: str, arg2: str, arg3: str): if verbose: print(msg % (arg1, arg2, arg3)) @dispatch(str, str, str, str, str) def v_print(msg: str, arg1: str, arg2: str, arg3: str, arg4: str): if verbose: print(msg % (arg1, arg2, arg3, arg4)) @dispatch(str, int) def v_print(msg: str, arg: int): if verbose: print(msg % int(arg)) @dispatch(str, int, str) def v_print(msg: str, arg1: int, arg2: str): if verbose: print(msg % (int(arg1), str(arg2))) @dispatch(str, str, int) def v_print(msg: str, arg1: str, arg2: int): if verbose: print(msg % (arg1, int(arg2))) @dispatch(str, int, int) def v_print(msg: str, arg1: int, arg2: int): if verbose: print(msg % (int(arg1), int(arg2))) @dispatch(str, int, int, str) def v_print(msg: str, arg1: int, arg2: int, arg3: str): if verbose: print(msg % (int(arg1), int(arg2), str(arg3))) @dispatch(str, int, int, int) def v_print(msg: str, arg1: int, arg2: int, arg3: int): if verbose: print(msg % (int(arg1), int(arg2), int(arg3))) @dispatch(str, int, int, int, int) def v_print(msg: str, arg1: int, arg2: int, arg3: int, arg4: int): if verbose: print(msg % (int(arg1), int(arg2), int(arg3), int(arg4))) def restful_execute(host: str, port: int, user: str, password: str, cmd: str): url = "http://%s:%d/rest/sql" % (host, restPort) v_print("restful_execute - cmd: %s", cmd) resp = requests.post(url, cmd, auth=(user, password)) v_print("resp status: %d", resp.status_code) if debug: v_print("resp text: %s", json.dumps(resp.json(), sort_keys=True, indent=2)) else: print("resp: %s" % json.dumps(resp.json())) def query_func(process: int, thread: int, cmd: str): v_print("%d process %d thread cmd: %s", process, thread, cmd) if oneMoreHost != "NotSupported" and random.randint(0, 1) == 1: v_print("%s", "Send to second host") if native: cursor.execute(cmd) else: restful_execute(oneMoreHost, port, user, password, cmd) else: v_print("%s%s%s", "Send ", cmd, " to the host") if native: pass # cursor.execute(cmd) else: restful_execute(host, port, user, password, cmd) def query_data_process(cmd: str): # establish connection if native if native: v_print("host:%s, user:%s passwd:xxxxxx configDir:%s ", host, user, configDir) try: conn = taos.connect( host=host, user=user, password=password, config=configDir ) v_print("conn: %s", str(conn.__class__)) except Exception as e: print("Error: %s" % e.args[0]) sys.exit(1) try: cursor = conn.cursor() v_print("cursor:%d %s", id(cursor), str(cursor.__class__)) except Exception as e: print("Error: %s" % e.args[0]) conn.close() sys.exit(1) if native: try: cursor.execute(cmd) cols = cursor.description print(cols) data = cursor.fetchall() for col in data: print(col) except Exception as e: conn.close() print("Error: %s" % e.args[0]) sys.exit(1) else: restful_execute(host, port, user, password, cmd) if native: cursor.close() conn.close() def create_stb(): for i in range(0, numOfStb): if native: cursor.execute( "CREATE TABLE IF NOT EXISTS %s%d (ts timestamp, value float) TAGS (uuid binary(50))" % (stbName, i) ) else: restful_execute( host, port, user, password, "CREATE TABLE IF NOT EXISTS %s%d (ts timestamp, value float) TAGS (uuid binary(50))" % (stbName, i), ) def use_database(): if native: cursor.execute("USE %s" % current_db) else: restful_execute(host, port, user, password, "USE %s" % current_db) def create_databases(): for i in range(0, numOfDb): v_print("will create database db%d", int(i)) if native: cursor.execute("CREATE DATABASE IF NOT EXISTS %s%d" % (dbName, i)) else: restful_execute( host, port, user, password, "CREATE DATABASE IF NOT EXISTS %s%d" % (dbName, i), ) def drop_tables(): # TODO v_print("TODO: drop tables total %d", numOfTb) pass def drop_stable(): # TODO v_print("TODO: drop stables total %d", numOfStb) pass def drop_databases(): v_print("drop databases total %d", numOfDb) # drop exist databases first for i in range(0, numOfDb): v_print("will drop database db%d", int(i)) if native: cursor.execute("DROP DATABASE IF EXISTS %s%d" % (dbName, i)) else: restful_execute( host, port, user, password, "DROP DATABASE IF EXISTS %s%d" % (dbName, i) ) def insert_func(process: int, thread: int): v_print("%d process %d thread, insert_func ", process, thread) # generate uuid uuid_int = random.randint(0, numOfTb + 1) uuid = "%s" % uuid_int v_print("uuid is: %s", uuid) # establish connection if native if native: v_print("host:%s, user:%s passwd:xxxxxx configDir:%s ", host, user, configDir) try: conn = taos.connect( host=host, user=user, password=password, config=configDir ) v_print("conn: %s", str(conn.__class__)) except Exception as e: print("Error: %s" % e.args[0]) sys.exit(1) try: cursor = conn.cursor() v_print("cursor:%d %s", id(cursor), str(cursor.__class__)) except Exception as e: print("Error: %s" % e.args[0]) conn.close() sys.exit(1) v_print("numOfRec %d:", numOfRec) row = 0 while row < numOfRec: v_print("row: %d", row) sqlCmd = ["INSERT INTO "] try: sqlCmd.append("%s.%s%d " % (current_db, tbName, thread)) if numOfStb > 0 and autosubtable: sqlCmd.append( "USING %s.%s%d TAGS('%s') " % (current_db, stbName, numOfStb - 1, uuid) ) start_time = datetime.datetime(2021, 1, 25) + datetime.timedelta( seconds=row ) sqlCmd.append("VALUES ") for batchIter in range(0, batch): sqlCmd.append( "('%s', %f) " % ( start_time + datetime.timedelta(milliseconds=batchIter), random.random(), ) ) row = row + 1 if row >= numOfRec: v_print("BREAK, row: %d numOfRec:%d", row, numOfRec) break except Exception as e: print("Error: %s" % e.args[0]) cmd = " ".join(sqlCmd) if measure: exec_start_time = datetime.datetime.now() if native: affectedRows = cursor.execute(cmd) print("affectedRows: %d" % affectedRows) else: restful_execute(host, port, user, password, cmd) if measure: exec_end_time = datetime.datetime.now() exec_delta = exec_end_time - exec_start_time v_print("consume %d microseconds", exec_delta.microseconds) v_print("cmd: %s, length:%d", cmd, len(cmd)) if native: cursor.close() conn.close() def create_tb_using_stb(): # TODO: pass def create_tb(): v_print("create_tb() numOfTb: %d", numOfTb) for i in range(0, numOfDb): if native: cursor.execute("USE %s%d" % (dbName, i)) else: restful_execute(host, port, user, password, "USE %s%d" % (dbName, i)) for j in range(0, numOfTb): if native: cursor.execute( "CREATE TABLE %s%d (ts timestamp, value float)" % (tbName, j) ) else: restful_execute( host, port, user, password, "CREATE TABLE %s%d (ts timestamp, value float)" % (tbName, j), ) def insert_data_process(lock, i: int, begin: int, end: int): lock.acquire() tasks = end - begin v_print( "insert_data_process:%d table from %d to %d, tasks %d", i, begin, end, tasks ) if threads < (end - begin): for j in range(begin, end, threads): with ThreadPoolExecutor(max_workers=threads) as executor: k = end if ((j + threads) > end) else (j + threads) workers = [executor.submit(insert_func, i, n) for n in range(j, k)] wait(workers, return_when=ALL_COMPLETED) else: with ThreadPoolExecutor(max_workers=threads) as executor: workers = [executor.submit(insert_func, i, j) for j in range(begin, end)] wait(workers, return_when=ALL_COMPLETED) lock.release() def query_db(i): if native: cursor.execute("USE %s%d" % (dbName, i)) else: restful_execute(host, port, user, password, "USE %s%d" % (dbName, i)) for j in range(0, numOfTb): if native: cursor.execute("SELECT COUNT(*) FROM %s%d" % (tbName, j)) else: restful_execute( host, port, user, password, "SELECT COUNT(*) FROM %s%d" % (tbName, j) ) def printConfig(): print("###################################################################") print("# Use native interface: %s" % native) print("# Server IP: %s" % host) if native: print("# Server port: %s" % port) else: print("# Server port: %s" % restPort) print("# Configuration Dir: %s" % configDir) print("# User: %s" % user) print("# Number of Columns per record: %s" % colsPerRecord) print("# Number of Threads: %s" % threads) print("# Number of Processes: %s" % processes) print("# Number of Tables: %s" % numOfTb) print("# Number of records per Table: %s" % numOfRec) print("# Records/Request: %s" % batch) print("# Database name: %s" % dbName) print("# Replica: %s" % replica) print("# Use STable: %s" % useStable) print("# Table prefix: %s" % tbName) if useStable: print("# STable prefix: %s" % stbName) print("# Data order: %s" % outOfOrder) print("# Data out of order rate: %s" % rateOOOO) print("# Delete method: %s" % deleteMethod) print("# Query command: %s" % queryCmd) print("# Insert Only: %s" % insertOnly) print("# Verbose output %s" % verbose) print( "# Test time: %s" % datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S") ) print("###################################################################") if __name__ == "__main__": native = False verbose = False debug = False measure = True dropDbOnly = False colsPerRecord = 3 numOfDb = 1 dbName = "test" replica = 1 batch = 1 numOfTb = 1 tbName = "tb" useStable = False numOfStb = 0 stbName = "stb" numOfRec = 10 ieration = 1 host = "127.0.0.1" configDir = "/etc/taos" oneMoreHost = "NotSupported" port = 6030 restPort = 6041 user = "root" defaultPass = "taosdata" processes = 1 threads = 1 insertOnly = False autosubtable = False queryCmd = "NO" outOfOrder = 0 rateOOOO = 0 deleteMethod = 0 skipPrompt = False try: opts, args = getopt.gnu_getopt( sys.argv[1:], "Nh:p:u:P:d:a:m:Ms:Q:T:C:r:l:t:n:c:xOR:D:vgyH", [ "native", "host", "port", "user", "password", "dbname", "replica", "tbname", "stable", "stbname", "query", "threads", "processes", "recPerReq", "colsPerRecord", "numOfTb", "numOfRec", "config", "insertOnly", "outOfOrder", "rateOOOO", "deleteMethod", "verbose", "debug", "skipPrompt", "help", ], ) except getopt.GetoptError as err: print("ERROR:", err) print("Try `taosdemo.py --help` for more options.") sys.exit(1) if bool(opts) is False: print("Try `taosdemo.py --help` for more options.") sys.exit(1) for key, value in opts: if key in ["-H", "--help"]: print("") print("taosdemo.py for TDengine") print("") print("Author: Shuduo Sang ") print("") print("\t-H, --help Show usage.") print("") print( "\t-N, --native flag, Use native interface if set. Default is using RESTful interface." ) print( "\t-h, --host host, The host to connect to TDengine. Default is localhost." ) print( "\t-p, --port port, The TCP/IP port number to use for the connection. Default is 0." ) print( "\t-u, --user user, The user name to use when connecting to the server. Default is 'root'." ) print( "\t-P, --password password, The password to use when connecting to the server. Default is 'taosdata'." ) print( "\t-l, --colsPerRec num_of_columns_per_record, The number of columns per record. Default is 3." ) print( "\t-d, --dbname database, Destination database. Default is 'test'." ) print( "\t-a, --replica replica, Set the replica parameters of the database, Default 1, min: 1, max: 5." ) print( "\t-m, --tbname table_prefix, Table prefix name. Default is 't'." ) print( "\t-M, --stable flag, Use super table. Default is no" ) print( "\t-s, --stbname stable_prefix, STable prefix name. Default is 'st'" ) print( "\t-Q, --query [NO|EACHTB|command] query, Execute query command. set 'EACHTB' means select * from each table" ) print( "\t-T, --threads num_of_threads, The number of threads. Default is 1." ) print( "\t-C, --processes num_of_processes, The number of threads. Default is 1." ) print( "\t-r, --batch num_of_records_per_req, The number of records per request. Default is 1000." ) print( "\t-t, --numOfTb num_of_tables, The number of tables. Default is 1." ) print( "\t-n, --numOfRec num_of_records_per_table, The number of records per table. Default is 1." ) print( "\t-c, --config config_directory, Configuration directory. Default is '/etc/taos/'." ) print("\t-x, --inserOnly flag, Insert only flag.") print( "\t-O, --outOfOrder out of order data insert, 0: In order, 1: Out of order. Default is in order." ) print( "\t-R, --rateOOOO rate, Out of order data's rate--if order=1 Default 10, min: 0, max: 50." ) print( "\t-D, --deleteMethod Delete data methods 0: don't delete, 1: delete by table, 2: delete by stable, 3: delete by database." ) print("\t-v, --verbose Print verbose output") print("\t-g, --debug Print debug output") print( "\t-y, --skipPrompt Skip read key for continous test, default is not skip" ) print("") sys.exit(0) if key in ["-N", "--native"]: try: import taos except Exception as e: print("Error: %s" % e.args[0]) sys.exit(1) native = True if key in ["-h", "--host"]: host = value if key in ["-p", "--port"]: port = int(value) if key in ["-u", "--user"]: user = value if key in ["-P", "--password"]: password = value else: password = defaultPass if key in ["-d", "--dbname"]: dbName = value if key in ["-a", "--replica"]: replica = int(value) if replica < 1: print("FATAL: number of replica need > 0") sys.exit(1) if key in ["-m", "--tbname"]: tbName = value if key in ["-M", "--stable"]: useStable = True numOfStb = 1 if key in ["-s", "--stbname"]: stbName = value if key in ["-Q", "--query"]: queryCmd = str(value) if key in ["-T", "--threads"]: threads = int(value) if threads < 1: print("FATAL: number of threads must be larger than 0") sys.exit(1) if key in ["-C", "--processes"]: processes = int(value) if processes < 1: print("FATAL: number of processes must be larger than 0") sys.exit(1) if key in ["-r", "--batch"]: batch = int(value) if key in ["-l", "--colsPerRec"]: colsPerRec = int(value) if key in ["-t", "--numOfTb"]: numOfTb = int(value) v_print("numOfTb is %d", numOfTb) if key in ["-n", "--numOfRec"]: numOfRec = int(value) v_print("numOfRec is %d", numOfRec) if numOfRec < 1: print("FATAL: number of records must be larger than 0") sys.exit(1) if key in ["-c", "--config"]: configDir = value v_print("config dir: %s", configDir) if key in ["-x", "--insertOnly"]: insertOnly = True v_print("insert only: %d", insertOnly) if key in ["-O", "--outOfOrder"]: outOfOrder = int(value) v_print("out of order is %d", outOfOrder) if key in ["-R", "--rateOOOO"]: rateOOOO = int(value) v_print("the rate of out of order is %d", rateOOOO) if key in ["-D", "--deleteMethod"]: deleteMethod = int(value) if (deleteMethod < 0) or (deleteMethod > 3): print( "inputed delete method is %d, valid value is 0~3, set to default 0" % deleteMethod ) deleteMethod = 0 v_print("the delete method is %d", deleteMethod) if key in ["-v", "--verbose"]: verbose = True if key in ["-g", "--debug"]: debug = True if key in ["-y", "--skipPrompt"]: skipPrompt = True if verbose: printConfig() if not skipPrompt: input("Press any key to continue..") # establish connection first if native if native: v_print("host:%s, user:%s passwd:xxxxxx configDir:%s ", host, user, configDir) try: conn = taos.connect( host=host, user=user, password=password, config=configDir ) v_print("conn: %s", str(conn.__class__)) except Exception as e: print("Error: %s" % e.args[0]) sys.exit(1) try: cursor = conn.cursor() v_print("cursor:%d %s", id(cursor), str(cursor.__class__)) except Exception as e: print("Error: %s" % e.args[0]) conn.close() sys.exit(1) # drop data only if delete method be set if deleteMethod > 0: if deleteMethod == 1: drop_tables() print("Drop tables done.") elif deleteMethod == 2: drop_stable() print("Drop super tables done.") elif deleteMethod == 3: drop_databases() print("Drop Database done.") sys.exit(0) # create databases drop_databases() create_databases() # use last database current_db = "%s%d" % (dbName, (numOfDb - 1)) use_database() if measure: start_time_begin = time.time() if numOfStb > 0: create_stb() if autosubtable is False: create_tb_using_stb() else: create_tb() if measure: end_time = time.time() print( "Total time consumed {} seconds for create table.".format( (end_time - start_time_begin) ) ) if native: cursor.close() conn.close() # start insert data if measure: start_time = time.time() manager = Manager() lock = manager.Lock() pool = Pool(processes) begin = 0 end = 0 quotient = numOfTb // processes if quotient < 1: processes = numOfTb quotient = 1 remainder = numOfTb % processes v_print( "num of tables: %d, quotient: %d, remainder: %d", numOfTb, quotient, remainder ) for i in range(processes): begin = end if i < remainder: end = begin + quotient + 1 else: end = begin + quotient pool.apply_async( insert_data_process, args=( lock, i, begin, end, ), ) pool.close() pool.join() time.sleep(1) if measure: end_time = time.time() print( "Total time consumed {} seconds for insert data.".format( (end_time - start_time) ) ) # query data if queryCmd != "NO": print("queryCmd: %s" % queryCmd) query_data_process(queryCmd) if measure: end_time = time.time() print("Total time consumed {} seconds.".format((end_time - start_time_begin))) print("done")