技术分享 MySQL 分组需求探秘

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了技术分享 MySQL 分组需求探秘相关的知识,希望对你有一定的参考价值。

参考技术A 前两天同事有个 mysql 数据分组的需求,如下测试数据,需要找出每个 name 分组中 create_date 最近的记录:

需要注意的是,此处用的 MySQL 是5.6,最初是使用这条语句:

<pre class="custom" data-tool="mdnice编辑器" style="margin-top: 10px; margin-bottom: 10px; border-radius: 5px; box-shadow: rgba(0, 0, 0, 0.55) 0px 2px 10px;"> select name, value, create_date, update_date from t1 group by name order by create_date desc; </pre>

用这条 SQL 得到的其实只是每个 name 分组中最先插入的记录,然后按照 create_date 进行了降序排列,和原始需求,完全不同。

此时可采用分而治之的策略,先做排序,再做分组:

<pre class="custom" data-tool="mdnice编辑器" style="margin-top: 10px; margin-bottom: 10px; border-radius: 5px; box-shadow: rgba(0, 0, 0, 0.55) 0px 2px 10px;"> select * from (select name, value, create_date, update_date from t1 order by create_date desc) t group by t.name; </pre>

当然,针对此需求,可能有其他方法,有兴趣的朋友,可以尝试写写,共享一下。

可能有细心的朋友会发现个问题,就是上述 SQL 中的 group by ,好像有些奇怪,如果按照常规,select 中的字段需要出现在 group by 中,上述语句竟然没报错?

如果我们在 MySQL 5.7 执行相同的语句:

<pre class="custom" data-tool="mdnice编辑器" style="margin-top: 10px; margin-bottom: 10px; border-radius: 5px; box-shadow: rgba(0, 0, 0, 0.55) 0px 2px 10px;"> select name, value, create_date, update_date from t1 group by name order by create_date desc; </pre>

因此从5.6升级到5.7,很可能出现这种相同的 SQL 执行结果不同的现象,这对兼容性测试的要求就会很高,究其原因,一方面是特性决定的,另一方面就是各种配置参数不同导致的。

可以在5.7的 sql_mode 中删除这个 ONLY_FULL_GROUP_BY ,即可达到5.6相同效果了,或者改写 SQL ,例如:

<pre class="custom" data-tool="mdnice编辑器" style="margin-top: 10px; margin-bottom: 10px; border-radius: 5px; box-shadow: rgba(0, 0, 0, 0.55) 0px 2px 10px;"> select * from t1 a where create_date = (select max(create_date) from t1 b where a.name = b.name); </pre>

或者,

<pre class="custom" data-tool="mdnice编辑器" style="margin-top: 10px; margin-bottom: 10px; border-radius: 5px; box-shadow: rgba(0, 0, 0, 0.55) 0px 2px 10px;"> select * from t1 a where not exists (select * from t1 b where a.name = b.name and b.create_date > a.create_date); </pre>

MySQL 8.0支持 row_number()函数,操作应该和如下 Oracle 相近的。

Oracle 中可以使用 row_number()实现此需求:

<pre class="custom" data-tool="mdnice编辑器" style="margin-top: 10px; margin-bottom: 10px; border-radius: 5px; box-shadow: rgba(0, 0, 0, 0.55) 0px 2px 10px;"> select * from (select name, create_date, row_number() over (partition by name order by create_date desc) as r from t1) where r=1; </pre>

深度探秘 Java 8 函数式编程(上)

技术文章第一时间送达!

源码精品专栏

 
  •  69 篇

  •  61 篇



来源:http://t.cn/ELmra8O

  • 引子

    • 将行为作为数据传递

  • Java8函数框架解读

    • 函数接口

    • 聚合器


引子

将行为作为数据传递

怎样在一行代码里同时计算一个列表的和、最大值、最小值、平均值、元素个数、奇偶分组、指数、排序呢?

答案是思维反转!将行为作为数据传递。 文艺青年的代码如下所示:

public class FunctionUtil {

   public static <T,R> List<R> multiGetResult(List<Function<List<T>, R>> functions, List<T> list) {
     return functions.stream().map(f -> f.apply(list)).collect(Collectors.toList());
   }

   public static void main(String[] args) {
     System.out.println(multiGetResult(
         Arrays.asList(
             list -> list.stream().collect(Collectors.summarizingInt(x->x)),
             list -> list.stream().filter(x -> x < 50).sorted().collect(Collectors.toList()),
             list -> list.stream().collect(Collectors.groupingBy(x->(x%2==0"even""odd"))),
             list -> list.stream().sorted().collect(Collectors.toList()),
             list -> list.stream().sorted().map(Math::sqrt).collect(Collectors.toMap(x->x, y->Math.pow(2,y)))),
         Arrays.asList(64,49,25,16,9,4,1,81,36)));
   }
}

呃,有点卖弄小聪明。 不过要是能将行为作为数据自由传递和施加于数据集产生结果,那么其代码表达能力将如庄子之言,恣意潇洒而无所极限。

行为就是数据。

Java8函数框架解读

函数编程的最直接的表现,莫过于将函数作为数据自由传递,结合泛型推导能力,使代码表达能力获得飞一般的提升。那么,Java8是怎么支持函数编程的呢?主要有三个核心概念:

  • 函数接口(Function)

  • 流(Stream)

  • 聚合器(Collector)

函数接口

关于函数接口,需要记住的就是两件事:

  • 函数接口是行为的抽象;

  • 函数接口是数据转换器。

最直接的支持就是 java.util.Function 包。定义了四个最基础的函数接口:

  • Supplier

    : 数据提供器,可以提供 T 类型对象;无参的构造器,提供了 get 方法;
  • Function

    : 数据转换器,接收一个 T 类型的对象,返回一个 R类型的对象; 单参数单返回值的行为接口;提供了 apply, compose, andThen, identity 方法;
  • Consumer

    : 数据消费器, 接收一个 T类型的对象,无返回值,通常用于根据T对象做些处理; 单参数无返回值的行为接口;提供了 accept, andThen 方法;
  • Predicate

    : 条件测试器,接收一个 T 类型的对象,返回布尔值,通常用于传递条件函数; 单参数布尔值的条件性接口。提供了 test (条件测试) , and-or- negate(与或非) 方法。

其中, compose, andThen, and, or, negate 用来组合函数接口而得到更强大的函数接口。

其它的函数接口都是通过这四个扩展而来。

  • 在参数个数上扩展: 比如接收双参数的,有 Bi 前缀, 比如 BiConsumer

    , BiFunction  ;
  • 在类型上扩展: 比如接收原子类型参数的,有 [Int|Double|Long][Function|Consumer|Supplier|Predicate]

  • 特殊常用的变形: 比如 BinaryOperator , 是同类型的双参数 BiFunction

     ,二元操作符 ; UnaryOperator 是 Function  一元操作符。

那么,这些函数接口可以接收哪些值呢?

  • 类/对象的静态方法引用、实例方法引用。引用符号为双冒号 ::

  • 类的构造器引用,比如 Class::new

  • lambda表达式

在博文“使用函数接口和枚举实现配置式编程(Java与Scala实现)”, “精练代码:一次Java函数式编程的重构之旅” 给出了基本的例子。后面还有更多例子。重在练习和尝试。

聚合器

先说聚合器。每一个流式计算的末尾总有一个类似 collect(Collectors.toList()) 的方法调用。collect 是 Stream 的方法,而参数则是聚合器Collector。已有的聚合器定义在Collectors 的静态方法里。 那么这个聚合器是怎么实现的呢?

Reduce

大部分聚合器都是基于 Reduce 操作实现的。 Reduce ,名曰推导,含有三个要素: 初始值 init, 二元操作符 BinaryOperator, 以及一个用于聚合结果的数据源S。

Reduce 的算法如下:

STEP1: 初始化结果 R = init ;

STEP2: 每次从 S 中取出一个值 v,通过二元操作符施加到 R 和 v ,产生一个新值赋给 R = BinaryOperator(R, v);重复 STEP2, 直到 S 中没有值可取为止。

比如一个列表求和,Sum([1,2,3]) , 那么定义一个初始值 0 以及一个二元加法操作 BO = a + b ,通过三步完成 Reduce 操作:step1: R = 0; step2: v=1, R = 0+v = 1; step2: v=2, R = 1 + v = 3 ; step3: v = 3, R = 3 + v = 6。

四要素

一个聚合器的实现,通常需要提供四要素:

  • 一个结果容器的初始值提供器 supplier ;

  • 一个用于将每次二元操作的中间结果与结果容器的值进行操作并重新设置结果容器的累积器 accumulator ;

  • 一个用于对Stream元素和中间结果进行操作的二元操作符 combiner ;

  • 一个用于对结果容器进行最终聚合的转换器 finisher(可选) 。

Collectors.CollectorImpl 的实现展示了这一点:

static class CollectorImpl<TARimplements Collector<TAR{
        private final Supplier<A> supplier;
        private final BiConsumer<A, T> accumulator;
        private final BinaryOperator<A> combiner;
        private final Function<A, R> finisher;
        private final Set<Characteristics> characteristics;

        CollectorImpl(Supplier<A> supplier,
                      BiConsumer<A, T> accumulator,
                      BinaryOperator<A> combiner,
                      Function<A,R> finisher,
                      Set<Characteristics> characteristics) {
            this.supplier = supplier;
            this.accumulator = accumulator;
            this.combiner = combiner;
            this.finisher = finisher;
            this.characteristics = characteristics;
        }
}

列表类聚合器

列表类聚合器实现,基本是基于Reduce 操作完成的。 看如下代码:

public static <T>
    Collector<T, ?, List<T>> toList() {
        return new CollectorImpl<>((Supplier<List<T>>) ArrayList::new, List::add,
                                   (left, right) -> { left.addAll(right); return left; },
                                   CH_ID);

首先使用 ArrayList::new 创造一个空列表; 然后 List:add 将Stream累积操作的中间结果加入到这个列表;第三个函数则将两个列表元素进行合并成一个结果列表中。 就是这么简单。集合聚合器 toSet(), 字符串连接器 joining(),以及列表求和(summingXXX)、最大(maxBy)、最小值(minBy)等都是这个套路。

映射类聚合器

映射类聚合器基于Map合并来完成。看这段代码:

private static <K, V, M extends Map<K,V>>
    BinaryOperator<M> mapMerger(BinaryOperator<V> mergeFunction) {
        return (m1, m2) -> {
            for (Map.Entry<K,V> e : m2.entrySet())
                m1.merge(e.getKey(), e.getValue(), mergeFunction);
            return m1;
        };
    }

根据指定的值合并函数 mergeFunction, 返回一个map合并器,用来合并两个map里相同key的值。mergeFunction用来对两个map中相同key的值进行运算得到新的value值,如果value值为null,会移除相应的key,否则使用value值作为对应key的值。这个方法是私有的,主要为支撑 toMap,groupingBy 而生。

toMap的实现很简短,实际上就是将指定stream的每个元素分别使用给定函数keyMapper, valueMapper进行映射得到 newKey, newValue,从而形成新的MapnewKey,newValue, 再使用mapMerger(mergeFunction) 生成的 map 合并器将其合并到 mapSupplier (2)。如果只传 keyMapper, valueMapper,那么就只得到结果(1)。

public static <T, K, U, M extends Map<K, U>>
    Collector<T, ?, M> toMap(Function<? super T, ? extends K> keyMapper,
                                Function<? super T, ? extends U> valueMapper,
                                BinaryOperator<U> mergeFunction,
                                Supplier<M> mapSupplier) {
        BiConsumer<M, T> accumulator
                = (map, element) -> map.merge(keyMapper.apply(element),
                                              valueMapper.apply(element), mergeFunction);
        return new CollectorImpl<>(mapSupplier, accumulator, mapMerger(mergeFunction), CH_ID);
    }

toMap 的一个示例见如下代码:

     List<Integer> list = Arrays.asList(1,2,3,4,5);
     Supplier<Map<Integer,Integer>> mapSupplier = () -> list.stream().collect(Collectors.toMap(x->x, y-> y * y));

     Map<Integer, Integer> mapValueAdd = list.stream().collect(Collectors.toMap(x->x, y->y, (v1,v2) -> v1+v2, mapSupplier));
     System.out.println(mapValueAdd);

将一个 List 转成 map[1=1,2=2,3=3,4=4,5=5],然后与另一个map[1=1,2=4,3=9,4=16,5=25]的相同key的value进行相加。注意到, toMap 的最后一个参数是 Supplier

自定义聚合器

让我们仿照 Collectors.toList() 做一个自定义的聚合器。实现一个含N个数的斐波那契序列 List 。由于 Reduce 每次都从流中取一个数,因此需要生产一个含N个数的stream;可使用 Arrays.asList(1,2,3,4,5,6,7,8,9,10).stream() , 亦可使用 IntStream.range(1,11) ,不过两者的 collector 方法是不一样的。这里我们取前者。

现在,需要构造四要素:

  • 可变的结果容器提供器 Supplier

    <list

    > = () -> [0, 1] ; 注意这里不能使用 Arrays.asList , 因为该方法生成的列表是不可变的。</list

  • 累积器 BiConsumer

    <list

    , Integer> accumulator(): 这里流的元素未用,仅仅用来使计算进行和终止。新的元素从结果容器中取最后两个相加后产生新的结果放到结果容器中。</list

  • 组合器 BinaryOperator

    <list

    > combiner() : 照葫芦画瓢,目前没看出这步是做什么用;直接 return null; 也是OK的。</list

  • 最终转换器 Function

    <list

    , List

    > finisher() :在最终转换器中,移除初始设置的两个值 0, 1 。

    </list

代码如下:

/**
 * Created by shuqin on 17/12/5.
 */

public class FiboCollector implements Collector<IntegerList<Integer>, List<Integer>> {

  public Supplier<List<Integer>> supplier() {
    return () -> {
      List<Integer> result = new ArrayList<>();
      result.add(0); result.add(1);
      return result;
    };
  }

  @Override
  public BiConsumer<List<Integer>, Integer> accumulator() {
    return (res, num) -> {
      Integer next = res.get(res.size()-1) + res.get(res.size()-2);
      res.add(next);
    };
  }

  @Override
  public BinaryOperator<List<Integer>> combiner() {
    return null;
    //return (left, right) -> { left.addAll(right); return left; };
  }

  @Override
  public Function<List<Integer>, List<Integer>> finisher() {
    return res -> { res.remove(0); res.remove(1); return res; };
  }

  @Override
  public Set<Characteristics> characteristics() {
    return Collections.emptySet();
  }

}

List<Integer> fibo = Arrays.asList(1,2,3,4,5,6,7,8,9,10).stream().collect(new FiboCollector());
System.out.println(fibo);

流(Stream)是Java8对函数式编程的重要支撑。大部分函数式工具都围绕Stream展开。

Stream的接口

Stream 主要有四类接口:

  • 流到流之间的转换:比如 filter(过滤), map(映射转换), mapToInt|Long|Double, flatMap(高维结构平铺),flatMapTo[Int|Long|Double], sorted(排序),distinct(不重复值),peek(执行某种操作,流不变,可用于调试),limit(限制到指定元素数量), skip(跳过若干元素) ;

  • 流到终值的转换: 比如 toArray(转为数组),reduce(推导结果),collect(聚合结果),min(最小值), max(最大值), count (元素个数), anyMatch (任一匹配), allMatch(所有都匹配), noneMatch(一个都不匹配), findFirst(选择首元素),findAny(任选一元素) ;

  • 直接遍历: forEach (不保序遍历,比如并行流), forEachOrdered(保序遍历) ;

  • 构造流: empty (构造空流),of (单个元素的流及多元素顺序流),iterate (无限长度的有序顺序流),generate (将数据提供器转换成无限非有序的顺序流), concat (流的连接), Builder (用于构造流的Builder对象)

除了 Stream 本身自带的生成Stream 的方法,数组和容器及StreamSupport都有转换为流的方法。比如 Arrays.stream , [List|Set|Collection].[stream|parallelStream] , StreamSupport.[int|long|double|]stream;

流的类型主要有:Reference(对象流), IntStream (int元素流), LongStream (long元素流), Double (double元素流) ,定义在类 StreamShape 中,主要将操作适配于类型系统。

flatMap 的一个例子见如下所示,将一个二维数组转换为一维数组:

     List<Integer> nums = Arrays.asList(Arrays.asList(1,2,3), Arrays.asList(1,4,9), Arrays.asList(1,8,27))
                                .stream().flatMap(x -> x.stream()).collect(Collectors.toList());
     System.out.println(nums);

collector实现

这里我们仅分析串行是怎么实现的。入口在类 java.util.stream.ReferencePipeline 的 collect 方法:

container = evaluate(ReduceOps.makeRef(collector));
return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
          ? (R) container : collector.finisher().apply(container);

这里的关键是 ReduceOps.makeRef(collector)。 点进去:

public static <T, I> TerminalOp<T, I>
    makeRef(Collector<? super T, I, ?> collector) 
{
        Supplier<I> supplier = Objects.requireNonNull(collector).supplier();
        BiConsumer<I, ? super T> accumulator = collector.accumulator();
        BinaryOperator<I> combiner = collector.combiner();
        class ReducingSink extends Box<I>
                implements AccumulatingSink<TIReducingSink
{
            @Override
            public void begin(long size) {
                state = supplier.get();
            }

            @Override
            public void accept(T t) {
                accumulator.accept(state, t);
            }

            @Override
            public void combine(ReducingSink other) {
                state = combiner.apply(state, other.state);
            }
        }
        return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) {
            @Override
            public ReducingSink makeSink() {
                return new ReducingSink();
            }

            @Override
            public int getOpFlags() {
                return collector.characteristics().contains(Collector.Characteristics.UNORDERED)
                       ? StreamOpFlag.NOT_ORDERED
                       : 0;
            }
        };
    }

private static abstract class Box<U{
        U state;

        Box() {} // Avoid creation of special accessor

        public U get() {
            return state;
        }
    }

Box 是一个结果值的持有者; ReducingSink 用begin, accept, combine 三个方法定义了要进行的计算;ReducingSink是有状态的流数据消费的计算抽象,阅读Sink接口文档可知。ReduceOps.makeRef(collector) 返回了一个封装了Reduce操作的ReduceOps对象。注意到,这里都是声明要执行的计算,而不涉及计算的实际过程。展示了表达与执行分离的思想。真正的计算过程启动在 ReferencePipeline.evaluate 方法里:

final <R> evaluate(TerminalOp<E_OUT, R> terminalOp) {
        assert getOutputShape() == terminalOp.inputShape();
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        linkedOrConsumed = true;

        return isParallel()
               ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
               : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
    }

使用 IDE 的 go to implementations 功能, 跟进去,可以发现,最终在 AbstractPipeLine 中定义了:

@Override
    final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
        Objects.requireNonNull(wrappedSink);

        if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
            wrappedSink.begin(spliterator.getExactSizeIfKnown());
            spliterator.forEachRemaining(wrappedSink);
            wrappedSink.end();
        }
        else {
            copyIntoWithCancel(wrappedSink, spliterator);
        }
    }

Spliterator 用来对流中的元素进行分区和遍历以及施加Sink指定操作,可以用于并发计算。Spliterator的具体实现类定义在 Spliterators 的静态类和静态方法中。其中有:

数组Spliterator:
static final class ArraySpliterator<Timplements Spliterator<T>
static final class IntArraySpliterator implements Spliterator.OfInt
static final class LongArraySpliterator implements Spliterator.OfLong
static final class DoubleArraySpliterator implements Spliterator.OfDouble

迭代Spliterator:
static class IteratorSpliterator<Timplements Spliterator<T>
static final class IntIteratorSpliterator implements Spliterator.OfInt
static final class LongIteratorSpliterator implements Spliterator.OfLong
static final class DoubleIteratorSpliterator implements Spliterator.OfDouble

抽象Spliterator:
public static abstract class AbstractSpliterator<Timplements Spliterator<T>
private static abstract class EmptySpliterator<TS extends Spliterator<T>, C>
public static abstract class AbstractIntSpliterator implements Spliterator.OfInt
public static abstract class AbstractLongSpliterator implements Spliterator.OfLong
public static abstract class AbstractDoubleSpliterator implements Spliterator.OfDouble

每个具体类都实现了trySplit,forEachRemaining,tryAdvance,estimateSize,characteristics, getComparator。 trySplit 用于拆分流,提供并发能力;forEachRemaining,tryAdvance 用于遍历和消费流中的数据。下面展示了IteratorSpliterator的forEachRemaining,tryAdvance 两个方法的实现。可以看到,木有特别的地方,就是遍历元素并将指定操作施加于元素。

@Override
        public void forEachRemaining(Consumer<? super T> action) {
            if (action == nullthrow new NullPointerException();
            Iterator<? extends T> i;
            if ((i = it) == null) {
                i = it = collection.iterator();
                est = (long)collection.size();
            }
            i.forEachRemaining(action);
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            if (action == nullthrow new NullPointerException();
            if (it == null) {
                it = collection.iterator();
                est = (long) collection.size();
            }
            if (it.hasNext()) {
                action.accept(it.next());
                return true;
            }
            return false;
        }

整体流程就是这样。回顾一下:

  • Collector 定义了必要的聚合操作函数;

  • ReduceOps.makeRef 将 Collector 封装成一个计算对象 ReduceOps ,依赖的 ReducingSink 定义了具体的流数据消费过程;

  • Spliterator 用于对流中的元素进行分区和遍历以及施加Sink指定的操作。

Pipeline

那么,Spliterator 又是从哪里来的呢?是通过类 java.util.stream.AbstractPipeline 的方法 sourceSpliterator 拿到的:

private Spliterator<?> sourceSpliterator(int terminalFlags) {
        // Get the source spliterator of the pipeline
        Spliterator<?> spliterator = null;
        if (sourceStage.sourceSpliterator != null) {
            spliterator = sourceStage.sourceSpliterator;
            sourceStage.sourceSpliterator = null;
        }
        else if (sourceStage.sourceSupplier != null) {
            spliterator = (Spliterator<?>) sourceStage.sourceSupplier.get();
            sourceStage.sourceSupplier = null;
        }
        else {
            throw new IllegalStateException(MSG_CONSUMED);
        }
        // code for isParallel
       return spliterator;
}

这里的 sourceStage 是一个 AbstractPipeline。 Pipeline 是实现流式计算的流水线抽象,也是Stream的实现类。可以看到,java.util.stream 定义了四种 pipeline: DoublePipeline, IntPipeline, LongPipeline, ReferencePipeline。可以重点看 ReferencePipeline 的实现。比如 filter, map

abstract class ReferencePipeline<P_INP_OUT>
        extends AbstractPipeline<P_INP_OUTStream<P_OUT>>
        implements Stream<P_OUT>

@Override
    public final Stream<P_OUTfilter(Predicate<? super P_OUTpredicate
{
        Objects.requireNonNull(predicate);
        return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SIZED) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
                return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                    @Override
                    public void begin(long size) {
                        downstream.begin(-1);
                    }

                    @Override
                    public void accept(P_OUT u) {
                        if (predicate.test(u))
                            downstream.accept(u);
                    }
                };
            }
        };
    }

    @Override
    @SuppressWarnings("unchecked")
    public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
        Objects.requireNonNull(mapper);
        return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
                return new Sink.ChainedReference<P_OUT, R>(sink) {
                    @Override
                    public void accept(P_OUT u) {
                        downstream.accept(mapper.apply(u));
                    }
                };
            }
        };
    }

套路基本一样,关键点在于 accept 方法。filter 只在满足条件时将值传给下一个 pipeline, 而 map 将计算的值传给下一个 pipeline. StatelessOp 没有什么逻辑,JDK文档解释是:Base class for a stateless intermediate stage of a Stream。相应还有一个 StatefulOp, Head。 这些都是 ReferencePipeline ,负责将值在 pipeline 之间传递,交给 Sink 去计算。

static class Head<E_INE_OUTextends ReferencePipeline<E_INE_OUT>
abstract static class StatelessOp<E_INE_OUTextends ReferencePipeline<E_INE_OUT>
abstract static class StatefulOp<E_INE_OUTextends ReferencePipeline<E_INE_OUT>

至此,我们对整个流计算过程有了更清晰的认识。 细节可以再逐步推敲。




已在知识星球更新源码解析如下:

  • 《精尽 Dubbo 源码解析系列》69 篇。

  • 《精尽 Netty 源码解析系列》61 篇。

  • 《精尽 Spring 源码解析系列》35 篇。

  • 《精尽 MyBatis 源码解析系列》34 篇。

  • 《数据库实体设计》17 篇。

  • 《精尽 Spring MVC 源码解析系列》15 篇。


目前在知识星球更新了《Dubbo 源码解析》目录如下:

01. 调试环境搭建
02. 项目结构一览
03. 配置 Configuration
04. 核心流程一览

05. 拓展机制 SPI

06. 线程池

07. 服务暴露 Export

08. 服务引用 Refer

09. 注册中心 Registry

10. 动态编译 Compile

11. 动态代理 Proxy

12. 服务调用 Invoke

13. 调用特性 

14. 过滤器 Filter

15. NIO 服务器

16. P2P 服务器

17. HTTP 服务器

18. 序列化 Serialization

19. 集群容错 Cluster

20. 优雅停机

21. 日志适配

22. 状态检查

23. 监控中心 Monitor

24. 管理中心 Admin

25. 运维命令 QOS

26. 链路追踪 Tracing

... 一共 69+ 篇

目前在知识星球更新了《Netty 源码解析》目录如下:

01. 调试环境搭建
02. NIO 基础
03. Netty 简介
04. 启动 Bootstrap

05. 事件轮询 EventLoop

06. 通道管道 ChannelPipeline

07. 通道 Channel

08. 字节缓冲区 ByteBuf

09. 通道处理器 ChannelHandler

10. 编解码 Codec

11. 工具类 Util

... 一共 61+ 篇


目前在知识星球更新了《数据库实体设计》目录如下:


01. 商品模块
02. 交易模块
03. 营销模块
04. 公用模块

... 一共 17+ 篇


目前在知识星球更新了《Spring 源码解析》目录如下:


01. 调试环境搭建
02. IoC Resource 定位
03. IoC BeanDefinition 载入

04. IoC BeanDefinition 注册

05. IoC Bean 获取

06. IoC Bean 生命周期

... 一共 35+ 篇


目前在知识星球更新了《MyBatis 源码解析》目录如下:


01. 调试环境搭建
02. 项目结构一览
03. MyBatis 面试题合集

04. MyBatis 学习资料合集

05. MyBatis 初始化

06. SQL 初始化

07. SQL 执行

08. 插件体系

09. Spring 集成

... 一共 34+ 篇


源码不易↓↓↓↓

点赞支持老艿艿↓↓

以上是关于技术分享 MySQL 分组需求探秘的主要内容,如果未能解决你的问题,请参考以下文章

MySQL分组需求探秘

技术分享基于AngularJS的个推前端云组件探秘

技术分享——Docker 快速搭建 mysql

技术分享 | 我对 MySQL 隔离级别的剖析

技术分享| 音视频多频道使用的正确姿势

Kylin 实时流处理技术探秘.笔记