Cassandra Performance Testing

Posted by 波子汽水yeah

Driver download
https://github.com/datastax/java-driver/

Java code example
http://blog.csdn.net/guotong1988/article/details/9145697

Syntax details
http://zhaoyanblog.com/archives/180.html

Error at run time:

Exception in thread "main" java.lang.NoClassDefFoundError: io/netty/util/Timer
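
A NoClassDefFoundError for io/netty/util/Timer generally means that class was not on the classpath at run time, which points at a missing Netty jar rather than at Cassandra itself. As a rough way to confirm that, here is a minimal sketch that probes the classpath for the class named in the error and for the driver's Cluster class. ClasspathCheck and isPresent are made-up names for illustration, not part of the driver.

```java
public class ClasspathCheck {

    /** Returns true if the named class can be located on the current classpath. */
    static boolean isPresent(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String[] required = {
            "io.netty.util.Timer",              // the class named in the error above
            "com.datastax.driver.core.Cluster"  // the driver entry point used in this post
        };
        for (String name : required) {
            System.out.println(name + " -> " + (isPresent(name) ? "found" : "MISSING"));
        }
    }
}
```

Run it with the same classpath as the failing program; any line reporting MISSING names a class whose jar still has to be added.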

Another error:

Exception in thread "main" com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /xx.93.xx.xx:9042 (com.datastax.driver.core.ConnectionException: [/xx.93.xx.xx:9042] Unexpected error during transport initialization (com.datastax.driver.core.TransportException: [/xx.93.xx.xx:9042] Connection has been closed)))

One explanation is an incompatible driver, but the jar I am using is version 3.1:
http://stackoverflow.com/questions/34395191/cassandra-java-driver-error-all-hosts-tried-for-query-failed-connection-has
Baidu has very little on this error; Google has plenty.

Problem: connecting to a Cassandra cluster from Java (using the DataStax driver rather than JDBC)

Exception in thread "main" com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /192.168.1.2:9042 (com.datastax.driver.core.ConnectionException: [/192.168.1.2:9042] Unexpected error during transport initialization (com.datastax.driver.core.TransportException: [/192.168.1.2:9042] Connection has been closed)))

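This problem bothered me for a long time!
From Windows, telnet to port 9042 on the database IP connects fine.
Running the program locally on the Linux host with the IP changed to 192.168.1.2 gives the same error!
Ports 7000-9999 are already opened in the China Unicom Cloud console!
So what is actually wrong? Is there a problem in the Cassandra configuration file?
On the Linux host, a console opened directly with bin/cqlsh 192.168.1.5 can query normally: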

cqlsh> SELECT cluster_name, listen_address FROM system.local;

 cluster_name | listen_address
--------------+----------------
 Test Cluster |    192.168.1.5
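
The telnet test above only shows that a TCP connection to port 9042 can be opened. The same check can be sketched in plain Java as below; PortCheck and canConnect are hypothetical names, and the default host and port are simply the values from this post. The exception above is reported during transport initialization, which appears to happen after the TCP connection is already up, so a passing check does not rule out a driver/server mismatch.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // refused, timed out, or unresolvable host
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults are the contact point and native-protocol port used in this post
        String host = args.length > 0 ? args[0] : "192.168.1.2";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9042;
        System.out.println(host + ":" + port + " reachable: " + canConnect(host, port, 3000));
    }
}
```

If this prints true while the driver still fails, the network path is probably fine and the driver version or server configuration is the next thing to look at.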

I can't figure out what the problem is.
The username and password in the code are correct too.

  public static void main(String[] args)
    {
        QueryOptions options = new QueryOptions();
        options.setConsistencyLevel(ConsistencyLevel.QUORUM);
        //Connection has been closed
        Cluster cluster = Cluster.builder()
            .addContactPoint("192.168.1.2") 
            .withCredentials("cassandra", "pwdcassandra123")
            .withQueryOptions(options)
//            .withPort(9042)   // the default port  [/xx.xx.xx.xx:9042] Connection has been closed)))
//            .withPort(9160) Cannot connect
            .build();

        Session session = cluster.connect();
        session.execute("CREATE  KEYSPACE kp WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};");

        // Session bound to the keyspace, so table names below need no keyspace prefix
        Session kpSession = cluster.connect("kp");
        kpSession.execute("CREATE TABLE tbl(a INT,  b INT, c INT, PRIMARY KEY(a));");

        RegularStatement insert = QueryBuilder.insertInto("kp", "tbl").values(new String[] {"a", "b", "c"}, new Object[] {1, 2, 3});
        kpSession.execute(insert);

        RegularStatement insert2 = QueryBuilder.insertInto("kp", "tbl").values(new String[] {"a", "b", "c"}, new Object[] {3, 2, 1});
        kpSession.execute(insert2);

        RegularStatement delete = QueryBuilder.delete().from("kp", "tbl").where(QueryBuilder.eq("a", 1));
        kpSession.execute(delete);

        RegularStatement update = QueryBuilder.update("kp", "tbl").with(QueryBuilder.set("b", 6)).where(QueryBuilder.eq("a", 3));
        kpSession.execute(update);

        RegularStatement select = QueryBuilder.select().from("kp", "tbl").where(QueryBuilder.eq("a", 3));
        ResultSet rs = kpSession.execute(select);
        Iterator<Row> iterator = rs.iterator();
        while (iterator.hasNext())
        {
            Row row = iterator.next();
            System.out.println("a=" + row.getInt("a"));
            System.out.println("b=" + row.getInt("b"));
            System.out.println("c=" + row.getInt("c"));
        }
        kpSession.close();
        session.close();
        cluster.close();
    }

I'm really out of ideas!
Has anyone run into a similar problem?

2016-09-18 17:45:16
I feel this is a historic moment.
Here is the error output after I changed the program just now:

[root@template tmp]# java -jar 5mavencass.jar 
---打开连接
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
--连接成功----
获取执行方法
Connected to cluster: Test Cluster
Datatacenter: datacenter1; Host: /192.168.1.2; Rack: rack1
Datatacenter: datacenter1; Host: /192.168.1.4; Rack: rack1
Datatacenter: datacenter1; Host: /192.168.1.5; Rack: rack1
Exception in thread "main" com.datastax.driver.core.exceptions.InvalidQueryException: Keyspace pimin_net does not exist
        at com.datastax.driver.core.exceptions.InvalidQueryException.copy(InvalidQueryException.java:50)
        at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
        at com.datastax.driver.core.AbstractSession.prepare(AbstractSession.java:100)
        at net.pimin.cassandra.TestJava.insertData(TestJava.java:77)
        at net.pimin.cassandra.TestJava.main(TestJava.java:194)
Caused by: com.datastax.driver.core.exceptions.InvalidQueryException: Keyspace pimin_net does not exist
        at com.datastax.driver.core.Responses$Error.asException(Responses.java:136)
        at com.datastax.driver.core.SessionManager$4.apply(SessionManager.java:224)
        at com.datastax.driver.core.SessionManager$4.apply(SessionManager.java:200)
        at com.google.common.util.concurrent.Futures$ChainingListenableFuture.run(Futures.java:861)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

Source code:

package net.pimin.cassandra;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Host;
import com.datastax.driver.core.Metadata;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;

/**
 * Tests Cassandra 2.0.10
 * 
 * @author pimin.net
 * 
 */
public class TestJava {

    private Cluster cluster;

    private Session session;

    public Cluster getCluster() {
        return cluster;
    }

    public void setCluster(Cluster cluster) {
        this.cluster = cluster;
    }

    public Session getSession() {
        return session;
    }

    public void setSession(Session session) {
        this.session = session;
    }

    /**
     * Connects to a node.
     * 
     * @param node address of the contact point
     */
    public void connect(String node) {
        System.out.println("---打开连接");
        try{
        cluster = Cluster.builder().addContactPoint(node).build();
        }catch(Exception e){
            System.out.println(e.getMessage());
        }
        System.out.println("--连接成功----");
        Metadata metadata = cluster.getMetadata();
        System.out.println("获取执行方法");
        System.out.printf("Connected to cluster: %s\n",
                metadata.getClusterName());
        for (Host host : metadata.getAllHosts()) {
            System.out.printf("Datatacenter: %s; Host: %s; Rack: %s\n",
                    host.getDatacenter(), host.getAddress(), host.getRack());
        }

        this.session = cluster.connect("mycas");
    }

    public void insertData() {
        PreparedStatement insertStatement = getSession().prepare(
                "INSERT INTO pimin_net.users "
                        + "(id, first_name, last_name, age, emails,avatar) "
                        + "VALUES (?, ?, ?, ?, ?, ?);");

        BoundStatement boundStatement = new BoundStatement(insertStatement);
        Set<String> emails = new HashSet<String>();
        emails.add("xxx@qq.com");
        emails.add("xxx@163.com");

        java.nio.ByteBuffer avatar = null;
        try {
            avatar = toByteBuffer("f:\\user.png");
            avatar.flip();
            System.out.println("头像大小:" + avatar.capacity());
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        getSession()
                .execute(
                        boundStatement.bind(
                                UUID.fromString("756716f7-2e54-4715-9f00-91dcbea6cf50"),
                                "pi", "min", 10, emails, avatar));

    }

    public void loadData() {
        ResultSet resultSet = getSession().execute(
                "SELECT first_name,last_name,age,avatar FROM pimin_net.users;");
        System.out
                .println(String
                        .format("%-30s\t%-20s\t%-20s\n%s", "first_name",
                                "last_name", "age",
                                "-------------------------------+-----------------------+--------------------"));
        for (Row row : resultSet) {
            System.out.println(String.format("%-30s\t%-20s\t%-20s",
                    row.getString("first_name"), row.getString("last_name"),
                    row.getInt("age")));

            ByteBuffer byteBuffer = row.getBytes("avatar");
            System.out.println("头像大小:"
                    + (byteBuffer.limit() - byteBuffer.position()));

            FileOutputStream fileOutputStream = null;
            try {
                fileOutputStream = new FileOutputStream("f:\\2.png");
            } catch (FileNotFoundException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            try {
                fileOutputStream.write(byteBuffer.array(),
                        byteBuffer.position(),
                        byteBuffer.limit() - byteBuffer.position());
                fileOutputStream.close();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

        }
        System.out.println();

    }

    public void close() {
        cluster.close();
    }

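    /**
     * Reads a file into a ByteBuffer.
     * 
     * @param filename path of the file to read
     * @return a buffer holding the file contents (not yet flipped)
     * @throws IOException if the file is missing or cannot be read
     */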
    public static ByteBuffer toByteBuffer(String filename) throws IOException {

        File f = new File(filename);
        if (!f.exists()) {
            throw new FileNotFoundException(filename);
        }

        // try-with-resources closes the channel and the stream even when the read fails,
        // and avoids a NullPointerException in cleanup if the file cannot be opened
        try (FileInputStream fs = new FileInputStream(f);
             FileChannel channel = fs.getChannel()) {
            ByteBuffer byteBuffer = ByteBuffer.allocate((int) channel.size());
            while (byteBuffer.hasRemaining() && channel.read(byteBuffer) > 0) {
                // keep reading until the buffer is full or the channel is exhausted
            }
            return byteBuffer;
        }
    }

    public static void main(String[] args) {
        TestJava client = new TestJava();
        client.connect("192.168.1.5");
        client.insertData();
        client.loadData();
        client.session.close();
        client.close();
    }
}

What changed:
the pom.xml file

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>net.pimin</groupId>
    <artifactId>cassandra.test</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <dependencies>
        <!-- <dependency>
            <groupId>com.datastax.cassandra</groupId>
            <artifactId>cassandra-driver-core</artifactId>
            <version>2.1.0</version>
        </dependency> -->

        <!-- https://mvnrepository.com/artifact/com.datastax.cassandra/cassandra-driver-core -->
        <dependency>
            <groupId>com.datastax.cassandra</groupId>
            <artifactId>cassandra-driver-core</artifactId>
            <version>3.1.0</version>
        </dependency>
    </dependencies>
</project>

The driver is now 3.1.0.
Searching Google (through a proxy) turned up this post:
http://stackoverflow.com/questions/34395191/cassandra-java-driver-error-all-hosts-tried-for-query-failed-connection-has

Its last sentence:


Fixed it by going to driver 3.0.0.

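After more than a day of struggling I'm finally getting somewhere! It wasn't easy, and persistence paid off!

The improved program: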

/*
 *      Copyright (C) 2012-2015 DataStax Inc.
 *
 *   Licensed under the Apache License, Version 2.0 (the "License");
 *   you may not use this file except in compliance with the License.
 *   You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *   Unless required by applicable law or agreed to in writing, software
 *   distributed under the License is distributed on an "AS IS" BASIS,
 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *   See the License for the specific language governing permissions and
 *   limitations under the License.
 */
package net.pimin.cassandra;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;


public class CreateAndPopulateKeyspace {

    static String[] CONTACT_POINTS = {"192.168.1.5"};
    static int PORT = 9042;

    public static void main(String[] args) {

        CreateAndPopulateKeyspace client = new CreateAndPopulateKeyspace();

        try {

            client.connect(CONTACT_POINTS, PORT);
            client.createSchema();
            client.loadData();
            client.querySchema();

        } finally {
            client.close();
        }
    }

    private Cluster cluster;

    private Session session;

    /**
     * Initiates a connection to the cluster
     * specified by the given contact point.
     *
     * @param contactPoints the contact points to use.
     * @param port          the port to use.
     */
    public void connect(String[] contactPoints, int port) {

        cluster = Cluster.builder()
                .addContactPoints(contactPoints).withPort(port)
                .build();

        System.out.printf("Connected to cluster: %s%n", cluster.getMetadata().getClusterName());

        session = cluster.connect();
    }

    /**
     * Creates the schema (keyspace) and tables
     * for this example.
     */
    public void createSchema() {

        session.execute("CREATE KEYSPACE IF NOT EXISTS simplex WITH replication " +
                "= {'class':'SimpleStrategy', 'replication_factor':1};");
        System.out.println("创建了命名空间   simplex");
        session.execute(
                "CREATE TABLE IF NOT EXISTS simplex.songs (" +
                        "id uuid PRIMARY KEY," +
                        "title text," +
                        "album text," +
                        "artist text," +
                        "tags set<text>," +
                        "data blob" +
                        ");");

        session.execute(
                "CREATE TABLE IF NOT EXISTS simplex.playlists (" +
                        "id uuid," +
                        "title text," +
                        "album text, " +
                        "artist text," +
                        "song_id uuid," +
                        "PRIMARY KEY (id, title, album, artist)" +
                        ");");
        System.out.println("---创建了3个---表");
    }

    /**
     * Inserts data into the tables.
     */
    public void loadData() {

        session.execute(
                "INSERT INTO simplex.songs (id, title, album, artist, tags) " +
                        "VALUES (" +
                        "756716f7-2e54-4715-9f00-91dcbea6cf50," +
                        "'La Petite Tonkinoise'," +
                        "'Bye Bye Blackbird'," +
                        "'Joséphine Baker'," +
                        "{'jazz', '2013'})" +
                        ";");

        session.execute(
                "INSERT INTO simplex.playlists (id, song_id, title, album, artist) " +
                        "VALUES (" +
                        "2cc9ccb7-6221-4ccb-8387-f22b6a1b354d," +
                        "756716f7-2e54-4715-9f00-91dcbea6cf50," +
                        "'La Petite Tonkinoise'," +
                        "'Bye Bye Blackbird'," +
                        "'Joséphine Baker'" +
                        ");");
        System.out.println("插入了一顿数据---");
    }

    /**
     * Queries and displays data.
     */
    public void querySchema() {
        System.out.println("查询一下");
        ResultSet results = session.execute(
                "SELECT * FROM simplex.playlists " +
                        "WHERE id = 2cc9ccb7-6221-4ccb-8387-f22b6a1b354d;");

        System.out.printf("%-30s\t%-20s\t%-20s%n", "title", "album", "artist");
        System.out.println("-------------------------------+-----------------------+--------------------");

        for (Row row : results) {

            System.out.printf("%-30s\t%-20s\t%-20s%n",
                    row.getString("title"),
                    row.getString("album"),
                    row.getString("artist"));

        }

    }

    /**
     * Closes the session and the cluster.
     */
    public void close() {
        session.close();
        cluster.close();
    }

}

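After the tables are created, the next statement (the insert in loadData, according to the stack trace) fails with this error: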

[root@template tmp]# java -jar beautiful.jar 
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Connected to cluster: Test Cluster
创建了命名空间   simplex
---创建了3个---表
Exception in thread "main" com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /192.168.1.5:9042 (com.datastax.driver.core.exceptions.UnavailableException: Not enough replicas available for query at consistency LOCAL_ONE (1 required but only 0 alive)))
        at com.datastax.driver.core.exceptions.NoHostAvailableException.copy(NoHostAvailableException.java:84)
        at com.datastax.driver.core.exceptions.NoHostAvailableException.copy(NoHostAvailableException.java:37)
        at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
        at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:245)
        at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:64)
        at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:39)
        at net.pimin.cassandra.CreateAndPopulateKeyspace.loadData(CreateAndPopulateKeyspace.java:103)
        at net.pimin.cassandra.CreateAndPopulateKeyspace.main(CreateAndPopulateKeyspace.java:37)
Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /192.168.1.5:9042 (com.datastax.driver.core.exceptions.UnavailableException: Not enough replicas available for query at consistency LOCAL_ONE (1 required but only 0 alive)))
        at com.datastax.driver.core.RequestHandler.reportNoMoreHosts(RequestHandler.java:208)
        at com.datastax.driver.core.RequestHandler.access$1000(RequestHandler.java:43)
        at com.datastax.driver.core.RequestHandler$SpeculativeExecution.sendRequest(RequestHandler.java:274)
        at com.datastax.driver.core.RequestHandler$SpeculativeExecution$1.run(RequestHandler.java:429)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

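The message means there were not enough live replicas to serve the query at consistency LOCAL_ONE.
I started the other nodes as well,
and then it worked.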
I'm thrilled!
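
The numbers in the error message ("1 required but only 0 alive") follow from the keyspace definition: simplex was created with replication_factor 1, so each row has exactly one replica, and if the node holding it is down no other node can answer, even at LOCAL_ONE. Below is a small sketch of that arithmetic. ReplicaMath and its methods are illustrative names only, and the QUORUM formula is the usual floor(RF / 2) + 1.

```java
public class ReplicaMath {

    /** Replicas that must respond for QUORUM: floor(rf / 2) + 1. */
    static int quorum(int replicationFactor) {
        return replicationFactor / 2 + 1;
    }

    /** True if a request needing `required` replicas can be served with `alive` replicas up. */
    static boolean canServe(int required, int alive) {
        return alive >= required;
    }

    public static void main(String[] args) {
        int rf = 1;            // replication_factor of the simplex keyspace in this post
        int aliveReplicas = 0; // the value reported in the error message
        System.out.println("LOCAL_ONE can be served: " + canServe(1, aliveReplicas));
        System.out.println("QUORUM with RF=" + rf + " needs " + quorum(rf) + " replica(s)");
        System.out.println("QUORUM with RF=3 needs " + quorum(3) + " replica(s)");
    }
}
```

With a higher replication factor, for example 3, a LOCAL_ONE request for a given row could still be served as long as one of its three replicas is alive.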
