cassandra 并发技术介绍

Posted 方丈的寺院

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了cassandra 并发技术介绍相关的知识,希望对你有一定的参考价值。

摘要

本文主要介绍cassandra线程技术,cassandra的实现是基于java的,所以线程技术使用的也是jdk包提供的线程类。cassandra是分布式数据库,整个并发架构是基于阶段事件驱动架构(staged envent-driven architecture)它能够利用queue将复杂的事件驱动分解为各个阶段。

一.java并发

Executor 框架
http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html
我们可以通过集成Thread类,实现Runnable接口等方法创建多线程。java concurrent包提供了一种更加灵活的实现方式。

public interface Executor{
    void execute(Runnable command);
}

Executor将任务的提交过程与执行过程分离开来,直接使用Runnable表示任务。基于生产者-消费者模式,提交任务的操作就是生产者,执行任务的线程相当于消费者。

ExecutorService 提供生命周期管理

ExecutorService 继承了Executor 接口,提供了生命周期的管理,ExecutorService 有三种状态
运行,关闭,终止。

public interface ExecutorService extends Executor {

void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
    throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                              long timeout, TimeUnit unit)
    throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
    throws InterruptedException, ExecutionException;

<T> T invokeAny(Collection<? extends Callable<T>> tasks,
                long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException;
}

Executor是以异步方式来执行task,所以在某个时刻,有任务还没有完成,有些任务还在队列中等待。shutdown方法提供的是平缓关闭,将已经启动的任务完成,不接受新的任务。shutdownNow 则是强制关闭所有任务。

Future提供带返回结果的任务
Executor 不提供返回结果,所以需要有带返回结果的Future

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException;
}

在上面的ExecutorService 的submit方法中我们可以看到需要Future类,这样Executor就可以根据返回的Future 来获得任务的执行结果或者取消任务。Future 的get方法就是用来获取任务执行情况,如果任务已经完成,就会立即返回或者抛出异常,如果没有完成就会block住,等待任务完成。

http://www.javaworld.com/article/2078809/java-concurrency/java-concurrency-java-101-the-next-generation-java-concurrency-without-the-pain-part-1.html?page=2
https://www.javacodegeeks.com/2013/01/java-thread-pool-example-using-executors-and-threadpoolexecutor.html

ExecutorService executorService = Executors.newFixedThreadPool(5);  

ThreadPoolExecutor 在Executors 基础上又提供了 rejectedExecutionHandler 的方法,在线程忙时,如何处理事件,如放到队列中


private final RejectedExecutionHandler handler = new RejectedExecutionHandler() {
            public void rejectedExecution(final Runnable task, final ThreadPoolExecutor executor) {
                taskQueue.offer(task);
            }
        };

private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5,
5,TIME_KEEP_ALIVE, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(600),handler);

二.SEDA架构

SEDA 架构是有加州大学伯克利分校的Matt Welsh, David Culler, and Eric Brewer首先提出来的。
(原论文请访问):http://www.eecs.harvard.edu/~mdw/papers/seda-sosp01.pdf
SEDA架构将应用分为不同的阶段。每个阶段都是独立构建,独自负责资源管理,并有一个与之关联的消息队列和线程池。各个阶段通过队列联系。SEDA使用动态资源阈值来控制资源管理使得系统能够适应过载的情况。

三.Cassandra线程技术

cassandra 阶段有单线程和多线程两种,可以通过 nodetool tpstats 查看各阶段的信息。也可以通过JMX 暴露出来的metrics来监控。
cassandra 各阶段的健康状况,来判断集群的瓶颈及问题所在。属性主要有以下五种

MBeantpstats意义
ActiveCountActive线程正在处理的tasks数目
PendingTasksPendingqueue 里面等待线程的tasks 数目
CompletedTasksCompleted已经完成的tasks 数目
CurrentlyBlockedTasksBlocked当线程池中的thread都分配了,queue中的pending task也到了设置的最大值。再进来的task就会被block住
TotalBlockedTasksAll time blocked已经block的tasks 总和

一般正常的系统不会出现block tasks。出现了block,通常系统就要出问题。运维人员需要采取相关措施了。cassandra的multi-thread pool 线程数默认值是32。

cassandra 多线程阶段

阶段名作用
MutationStage执行本地的 insert/update
Counter Mutationmutation计数值
ReadStage执行本地read
RequestResponseStage请求回复
InternalResponseStage非client端发送的request响应,节点内部消息应答如bootstrap

其余的为单线程
可以在org.apache.cassandra.concurrent.StageManager类中看到这些Stage的定义。
各个阶段的线程数及线程状态可以在org.apache.cassandra.metrics.ThreadPools中查看

2017-08-27
从目前的理解,SEDA的架构好处在于提供了一个中心管理处,来管理线程池的创建。
提高线程池的处理效率最好的方式就是线程处理一类问题,不要让CPU密集型,和IO密集型,数据库连接等task混合在一起。
另外如果没有统一管理,在各处随意创建线程池,也会导致系统出问题。

四.从阶段的角度拆分写操作

在这篇文章有简单提到过cassandra的写过程http://blog.csdn.net/fs1360472174/article/details/51174487
写操作主要涉及到的Stage有MutationStage,FlushWriter,MemtablePostFlusher,CounterMutation,
MigrationStage
细节将会在另一篇文章中详述

更多的相关知识可以关注公众号nosql开发
这里写图片描述
对cassandra感兴趣的童鞋可以加入群(104822562)一起学习探讨
这里写图片描述

五.参考

http://blog.csdn.net/lxlzhn/article/details/8163380
https://wiki.apache.org/cassandra/ArchitectureInternals

https://www.pythian.com/blog/guide-to-cassandra-thread-pools/

以上是关于cassandra 并发技术介绍的主要内容,如果未能解决你的问题,请参考以下文章

全栈编程系列SpringBoot整合Shiro(含KickoutSessionControlFilter并发在线人数控制以及不生效问题配置启动异常No SecurityManager...)(代码片段

Cassandra 如何处理并发更新?

Cassandra并发读写

如何使用Apache Flink阅读Cassandra?

golang goroutine例子[golang并发代码片段]

cassandra在pom.xml里面怎么设置library