
Configuration and usage: running benchmark tests with mysqlslap



The main parameters supported by mysqlslap, and what they do, are listed below:

Kafka Connect overview

Kafka Connect is a tool for scalable, reliable streaming of data between Kafka and other systems. It makes it simple to quickly define connectors that move large data sets into and out of Kafka. Kafka Connect can collect data from databases or application servers into Kafka topics, making it available for low-latency stream processing. Export jobs can deliver data from Kafka topics into secondary storage and query systems, or into batch systems for offline analysis.

Summary of Hadoop configuration errors

-a, --auto-generate-sql
                      Generate SQL where not supplied by file or command line.
                      (have the system generate the test SQL automatically)
  --auto-generate-sql-add-autoincrement
                      Add an AUTO_INCREMENT column to auto-generated tables.
                      (add an auto-increment ID column to the generated tables)
  --auto-generate-sql-load-type=name
                      Specify test load type: mixed, update, write, key, or
                      read; default is mixed.
                      (type of queries used during the test)
  --auto-generate-sql-write-number=#
                      Number of row inserts to perform for each thread (default
                      is 100).
                      (number of rows of initial data to generate)
  -c, --concurrency=name
                      Number of clients to simulate for query to run.
                      (number of concurrent threads)
  --create=name       File or string to use create tables.
  --create-schema=name
                      Schema to run tests in.
                      (schema name of the test database)
  -T, --debug-info    This is a non-debug version. Catch this and exit.
                      (output extra memory and CPU statistics)
  -e, --engine=name   Storage engine to use for creating the table.
                      (storage engine(s) to test; separate multiple engines with commas)
  -h, --host=name     Connect to host.
                      (IP of the remote host to connect to)
  -i, --iterations=#  Number of times to run the tests.
                      (number of times to run this test)
  --no-drop           Do not drop the schema after the test.
                      (keep the intermediate data after the test completes)
  -x, --number-char-cols=name
                      Number of VARCHAR columns to create in table if
                      specifying --auto-generate-sql.
                      (number of VARCHAR columns in the generated test table)
  -y, --number-int-cols=name
                      Number of INT columns to create in table if specifying
                      --auto-generate-sql.
                      (number of INT columns in the generated test table)
  --number-of-queries=#
                      Limit each client to this number of queries (this is not
                      exact).
                      (number of queries executed by each thread)
  --only-print        Do not connect to the databases, but instead print out
                      what would have been done.
                      (print the generated script instead of running the test)
  -p, --password[=name]
                      Password to use when connecting to server. If password is
                      not given it's asked from the tty.
                      (password for connecting to the test database)
  -q, --query=name    Query to run or file containing query to run.
                      (custom SQL for the test)
  -u, --user=name     User for login if not current user.
                      (username for connecting to the test database)
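
For example, --only-print can be used to preview the SQL that mysqlslap would generate, without connecting to any server (a minimal sketch that uses only flags from the list above):

mysqlslap --only-print --auto-generate-sql --auto-generate-sql-add-autoincrement \
          --number-int-cols=2 --number-char-cols=2 --concurrency=10 \
          --number-of-queries=10 --create-schema=test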

Kafka Connect concepts

Kafka Connect has several important concepts:

  • Source: imports data into Kafka
  • Sink: exports data out of Kafka
  • Connectors: the high-level abstraction that coordinates data flow by managing tasks
  • Tasks: the concrete implementation of writing data into and reading data out of Kafka
  • Workers: the processes that run connectors and tasks
  • Converters: convert data between Kafka Connect and the external systems it sends data to or receives data from
  • Transforms: a lightweight tool for adjusting data in flight

Error 1: INFO ipc.Client: Retrying connect to server: localhost/127.0.0.1:12200. Already tried 0 time(s).

The complete benchmark command used for the run is shown below:

Kafka Connect operating modes

Kafka Connect has two operating modes:

  • standalone: in standalone mode, all of the work is done in a single process
  • distributed: distributed mode is highly scalable and provides automatic fault tolerance. You can start many worker processes with the same group.id, and the live workers coordinate among themselves to run the connectors and tasks. If you add a worker, or a worker dies, the other workers detect it and rebalance the connectors and tasks.
    In distributed mode, connectors are managed through the REST API (a short curl sketch follows the list):
  • GET /connectors – return the names of all running connectors.
  • POST /connectors – create a new connector; the request body must be JSON and must contain a name field and a config field. name is the connector's name; config is a JSON object that must contain the connector's configuration.
  • GET /connectors/{name} – get information about the specified connector.
  • GET /connectors/{name}/config – get the configuration of the specified connector.
  • PUT /connectors/{name}/config – update the configuration of the specified connector.
  • GET /connectors/{name}/status – get the status of the specified connector, including whether it is running, stopped, or failed; if an error occurred, the details are included.
  • GET /connectors/{name}/tasks – get the tasks currently running for the specified connector.
  • GET /connectors/{name}/tasks/{taskid}/status – get the status of a task of the specified connector.
  • PUT /connectors/{name}/pause – pause the connector and its tasks; data processing stops until it is resumed.
  • PUT /connectors/{name}/resume – resume a paused connector.
  • POST /connectors/{name}/restart – restart a connector, typically used after the connector has failed.
  • DELETE /connectors/{name} – delete a connector, stopping all of its tasks and removing its configuration.
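
For example (assuming the default REST listener on port 8083 and a connector named elasticsearch-sink, as configured later in this article), pausing, resuming, and restarting a connector looks like this:

curl -X PUT  http://localhost:8083/connectors/elasticsearch-sink/pause
curl -X PUT  http://localhost:8083/connectors/elasticsearch-sink/resume
curl -X POST http://localhost:8083/connectors/elasticsearch-sink/restart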

Run:
[email protected]:~$ hadoop fs -ls

mysqlslap -S /tmp/mysql3306.sock --concurrency=1,50,100,200 --iterations=3 --number-int-cols=5 --number-char-cols=5 --auto-generate-sql --auto-generate-sql-add-autoincrement --engine=innodb --number-of-queries=10 --create-schema=test -uroot -p

Kafka Connect quick start

Kafka Connect is just the framework that Apache Kafka provides for streaming data into and out of Kafka; there are many excellent open-source implementations built on it. The best known is the Confluent platform, which ships many Kafka Connect implementations such as Elasticsearch (sink), HDFS (sink), JDBC, and so on. The rest of this section walks through kafka-connect-elasticsearch from the Confluent platform. First, download confluent-3.3.0 from the Confluent website. Running kafka-connect-elasticsearch requires a running Kafka service and an ES service.

14/01/08 22:01:41 INFO ipc.Client: Retrying connect to server: localhost/127.0.0.1:12200. Already tried 0 time(s).
14/01/08 22:01:42 INFO ipc.Client: Retrying connect to server: localhost/127.0.0.1:12200. Already tried 1 time(s).
14/01/08 22:01:43 INFO ipc.Client: Retrying connect to server: localhost/127.0.0.1:12200. Already tried 2 time(s).
14/01/08 22:01:44 INFO ipc.Client: Retrying connect to server: localhost/127.0.0.1:12200. Already tried 3 time(s).
14/01/08 22:01:45 INFO ipc.Client: Retrying connect to server: localhost/127.0.0.1:12200. Already tried 4 time(s).
14/01/08 22:01:46 INFO ipc.Client: Retrying connect to server: localhost/127.0.0.1:12200. Already tried 5 time(s).
14/01/08 22:01:47 INFO ipc.Client: Retrying connect to server: localhost/127.0.0.1:12200. Already tried 6 time(s).
14/01/08 22:01:48 INFO ipc.Client: Retrying connect to server: localhost/127.0.0.1:12200. Already tried 7 time(s).
14/01/08 22:01:49 INFO ipc.Client: Retrying connect to server: localhost/127.0.0.1:12200. Already tried 8 time(s).
14/01/08 22:01:50 INFO ipc.Client: Retrying connect to server: localhost/127.0.0.1:12200. Already tried 9 time(s).
mkdir: Call From Lenovo-G460-LYH/127.0.0.1 to localhost:12200 failed on connection exception: java.net.ConnectException: Connection refused; For more details see: 

After entering the password, the test output is as follows:

Standalone mode

1. First, edit connect-standalone.properties:

# broker list
bootstrap.servers=10.120.241.1:9200
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# whether schemas are needed for conversion; we are using plain JSON data, so set this to false
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
# Note: symlinks will be followed to discover dependencies or plugins.
# Examples: 
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
#plugin.path=

2. Edit quickstart-elasticsearch.properties:

name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=estest1012
key.ignore=true
schema.ignore=true
connection.url=http://10.120.241.194:9200
type.name=kafka-connect

Note: schema.ignore=true must be set; otherwise an exception is thrown (because we are sending plain JSON data).

3. Start kafka-connect-elasticsearch:

./bin/connect-standalone ./etc/kafka/connect-standalone.properties  ./etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties

4. Verify the result
Send a JSON-formatted message with kafka-console-producer.sh, then query the ES index. (Figure 1)
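
A minimal sketch of that step (assuming a broker reachable on localhost:9092; the sample JSON payload is made up, while the topic name and ES address come from the configuration above):

echo '{"user":"test","msg":"hello"}' | ./bin/kafka-console-producer --broker-list localhost:9092 --topic estest1012
curl 'http://10.120.241.194:9200/estest1012/_search?pretty'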

There are several possible causes:

Enter password: 
Benchmark
        Running for engine innodb
        Average number of seconds to run all queries: 0.097 seconds
        Minimum number of seconds to run all queries: 0.093 seconds
        Maximum number of seconds to run all queries: 0.107 seconds
        Number of clients running queries: 1
        Average number of queries per client: 10

Benchmark
        Running for engine innodb
        Average number of seconds to run all queries: 0.506 seconds
        Minimum number of seconds to run all queries: 0.447 seconds
        Maximum number of seconds to run all queries: 0.570 seconds
        Number of clients running queries: 50
        Average number of queries per client: 0

Benchmark
        Running for engine innodb
        Average number of seconds to run all queries: 2.204 seconds
        Minimum number of seconds to run all queries: 1.595 seconds
        Maximum number of seconds to run all queries: 3.257 seconds
        Number of clients running queries: 100
        Average number of queries per client: 0

mysqlslap: Error when connecting to server: 1040 Too many connections
mysqlslap: Error when connecting to server: 1040 Too many connections
mysqlslap: Error when connecting to server: 1040 Too many connections
mysqlslap: Error when connecting to server: 1040 Too many connections

Distributed mode

1. Edit connect-distributed.properties:

# broker list
bootstrap.servers=10.120.241.1:9200

# group.id must be the same across the cluster and must not collide with any consumer group name
group.id=connect-cluster

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# set to false here as well, since we are using plain JSON data
key.converter.schemas.enable=false
value.converter.schemas.enable=false
...

2. Manually create the Kafka topics required by distributed mode:

# config.storage.topic=connect-configs
$ bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-configs --replication-factor 3 --partitions 1 --config cleanup.policy=compact

# offset.storage.topic=connect-offsets
$ bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-offsets --replication-factor 3 --partitions 50 --config cleanup.policy=compact

# status.storage.topic=connect-status
$ bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-status --replication-factor 3 --partitions 10 --config cleanup.policy=compact

  • config.storage.topic: stores connector and task configurations; note that this should be a single-partition, multi-replica topic
  • offset.storage.topic: stores offsets; this topic should have multiple partitions and replicas
  • status.storage.topic: stores status; this topic can have multiple partitions and replicas

3. Start the worker(s):

./bin/connect-distributed ./etc/kafka/connect-distributed.properties   

4. Start the connector via the REST API:

curl 'http://localhost:8083/connectors' -X POST -i -H "Content-Type:application/json" -d   
    '{ "name":"elasticsearch-sink",  
       "config":{"connector.class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",  
                "tasks.max":10,  
                "topics":"estest1012",  
                "key.ignore":true,  
                "schema.ignore":true,  
                "connection.url":"http://10.120.241.194:9200",  
                "type.name":"kafka-connect"}  
    }'

5. List all connectors and check their status

(Figures 2 and 3)
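
In place of the screenshots, the same information can be fetched from the REST API (assuming the default port 8083 and the connector name used above):

curl http://localhost:8083/connectors
curl http://localhost:8083/connectors/elasticsearch-sink/status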
6. Configure logging
By default, logs go only to the console; to also write them to a file, modify connect-log4j.properties, for example:

log4j.rootLogger=INFO, stdout, stdfile

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n

log4j.appender.stdfile=org.apache.log4j.DailyRollingFileAppender
log4j.appender.stdfile.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.stdfile.File=${kafka.logs.dir}/stdout.log
log4j.appender.stdfile.layout=org.apache.log4j.PatternLayout
log4j.appender.stdfile.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.logger.org.apache.zookeeper=ERROR
log4j.logger.org.I0Itec.zkclient=ERROR
log4j.logger.org.reflections=ERROR

1) Hadoop configuration
Mainly check whether the settings in $HADOOP_HOME/conf/hdfs-site.xml, mapred-site.xml, and core-site.xml are correct; for pseudo-distributed mode you can refer to earlier posts or the many articles available online (see the core-site.xml sketch after this list).
2) The machines cannot reach each other
In a fully distributed setup, also check whether the Hadoop client machine can ping the HDFS machine, and pay attention to the HDFS port number.
3) The NameNode is not started (this was my case)
$ stop-all.sh    # if it reports "no namenode to stop", the NameNode is the problem
$ hadoop namenode -format
$ start-all.sh
4) Other causes.
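
As a reference for cause 1), a minimal core-site.xml sketch for pseudo-distributed mode might look like the following (the address localhost:12200 is taken from the error messages above; adjust it to your own setup):

<configuration>
  <property>
    <!-- must match the address and port the client is trying to reach -->
    <name>fs.default.name</name>
    <value>hdfs://localhost:12200</value>
  </property>
</configuration>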

At this point you can see that once concurrency reaches 200, the "Too many connections" error appears. This is because MySQL's default maximum number of connections is 100, so my.cnf needs to be modified as shown below.
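
A minimal sketch of that change (only the relevant line is shown; 1024 is the value used later in this article, and the new limit takes effect after mysqld is restarted):

[mysqld]
max_connections=1024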

Kafka Connect development

This section develops a kafka-connect example whose main function is to persist messages from Kafka to a file.

  • Create a new Maven project and add the connect-api dependency:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>connect-api</artifactId>
    <version>${kafka.version}</version>
</dependency>
  • The main work is to implement two classes:
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;

public class NeteaseFileSinkConnector extends SinkConnector {

    /**
     * Configuration key for the destination file
     */
    public static final String FILE_CONFIG = "file";
    /**
     * Configuration validation
     */
    private static final ConfigDef CONFIG_DEF = new ConfigDef()
            .define(FILE_CONFIG, Type.STRING, null, Importance.HIGH, "Destination filename. If not specified, the standard output will be used");

    private String filename;

    @Override
    public ConfigDef config() {
        return CONFIG_DEF;
    }

    /**
     * Initialize configuration from the supplied properties
     */
    @Override
    public void start(Map<String, String> props) {
        filename = props.get(FILE_CONFIG);
    }

    @Override
    public void stop() {
        // nothing to clean up
    }

    /**
     * Return the task class that performs the persistence
     */
    @Override
    public Class<? extends Task> taskClass() {
        return FileStreamSinkTask.class;
    }

    /**
     * Return the per-task configurations
     */
    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        ArrayList<Map<String, String>> configs = new ArrayList<Map<String, String>>();
        for (int i = 0; i < maxTasks; i++) {
            Map<String, String> config = new HashMap<String, String>();
            if (filename != null)
                config.put(FILE_CONFIG, filename);
            configs.add(config);
        }
        return configs;
    }

    @Override
    public String version() {
        return AppInfoParser.getVersion();
    }

}


import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Map;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

/**
 * Persistence task class
 * @author weifu
 *
 */
public class FileStreamSinkTask extends SinkTask {

    private String filename;
    private PrintStream outputStream;

    @Override
    public String version() {
        return new NeteaseFileSinkConnector().version();
    }

    @Override
    public void start(Map<String, String> props) {
        filename = props.get(NeteaseFileSinkConnector.FILE_CONFIG);
        if (filename == null) {
            outputStream = System.out;
        } else {
            try {
                outputStream = new PrintStream(new FileOutputStream(filename, true), false,
                    StandardCharsets.UTF_8.name());
            } catch (FileNotFoundException | UnsupportedEncodingException e) {
                throw new ConnectException("Couldn't find or create file for FileStreamSinkTask", e);
            }
        }
    }

    /**
     * The actual persistence step: write each record's value to the output stream
     */
    @Override
    public void put(Collection<SinkRecord> records) {
        for (SinkRecord record : records) {
            outputStream.println("netease file connect: ");
            outputStream.println(record.value());
        }
    }

    @Override
    public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
        outputStream.flush();
    }

    /**
     * Close the file
     */
    @Override
    public void stop() {
        if (outputStream != null && outputStream != System.out) {
            outputStream.close();
        }
    }
}
  • Packaging. Things to note when packaging:
    A. Do not bundle the Kafka Connect API into the jar.
    B. Put all the dependency jars together in one directory on the server (it must be added to the classpath in the configuration). It is recommended to create a folder under confluent-3.3.0/share/java with a name starting with kafka-connect-…, so that Confluent automatically adds that path to the classpath.
  • Add the connector in distributed mode (see the sketch after this list).
  • As the figure below shows, after two messages are consumed, the file test.sink.txt already has content:
    (Figure 4)
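
A sketch of that registration step (the connector's fully qualified class name, the topic, and the output path here are illustrative assumptions; only the "file" key and the class name come from the code above), using the same REST endpoint as before:

curl 'http://localhost:8083/connectors' -X POST -i -H "Content-Type:application/json" -d
    '{ "name":"netease-file-sink",
       "config":{"connector.class":"com.example.NeteaseFileSinkConnector",
                 "tasks.max":1,
                 "topics":"estest1012",
                 "file":"/tmp/test.sink.txt"}
    }'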

 

After adding max_connections=1024, the run completes normally:


Benchmark
        Running for engine innodb
        Average number of seconds to run all queries: 0.093 seconds
        Minimum number of seconds to run all queries: 0.087 seconds
        Maximum number of seconds to run all queries: 0.098 seconds
        Number of clients running queries: 1
        Average number of queries per client: 10

Benchmark
        Running for engine innodb
        Average number of seconds to run all queries: 0.514 seconds
        Minimum number of seconds to run all queries: 0.462 seconds
        Maximum number of seconds to run all queries: 0.545 seconds
        Number of clients running queries: 50
        Average number of queries per client: 0

Benchmark
        Running for engine innodb
        Average number of seconds to run all queries: 1.209 seconds
        Minimum number of seconds to run all queries: 1.173 seconds
        Maximum number of seconds to run all queries: 1.241 seconds
        Number of clients running queries: 100
        Average number of queries per client: 0

Benchmark
        Running for engine innodb
        Average number of seconds to run all queries: 2.174 seconds
        Minimum number of seconds to run all queries: 1.978 seconds
        Maximum number of seconds to run all queries: 2.402 seconds
        Number of clients running queries: 200
        Average number of queries per client: 0


