从mapMulti到Stream的底层逻辑

首先,mapMulti或许用的不是特别多——和flatmap相比就只是多了一个优点:不会产生对于每个元素的中间stream对象,减少了开销。但是因为对它写法的一系列疑惑,结果促使探究到了stream的链路逻辑。 场景与mapMulti&faltMap //场景:将一个数字字符串列表转为Integer类型的列表,同时去除不合法字符 //实现:使用更省开销的mapMulti方法 List<String> strings = List.of("1", " ", "2", "3 ", "", "3"); List<Integer> ints = strings.stream() .<Integer>mapMulti( (string, consumer) -> { try { consumer.accept(Integer.parseInt(string)); } catch (NumberFormatException ignored) { } }) .toList(); IO.println("ints = " + ints); 一些基础 java16后引入 该方法传入一个BiConsumer 需要被映射的元素 调用一个Consumer,用来存放最终结果流,达到不产生中间过程流的目的(相比于flatMap) 因为这里逻辑稍复杂,防止编译器混乱,泛型参数类型推断出错,手动进行了参数类型指定。 对于方法:泛型写在方法名前「更准确是写在返回类型前」 对于类:泛型写在类名后 代码块逻辑: 若元素能被Integer.parseint准确转化,就将此结果传入一个结果流中,而不是形成一个个小的stream对象再进行展平(flatMap) 若失败,报出受检异常,并被catch捕获处理 传的consumer是什么东西? 怎么会蹦一个consumer出来?内部定义的?定义这个干嘛? 要弄明白这个问题就需要深入stream的底层了 直觉式解释: stream以“pipeline”式处理流式数据闻名。源数据经过一系列中间操作累积处理逻辑,最后在最终操作那一口气处理。那么是怎么将这些操作逻辑累积下去的呢? 答:consumer。每次中间操作都会进行这样的逻辑:承载接收上流操作、将本次操作加入——“consumer.accept()”,传递到终端操作处,统一处理 mapMulti这里是显式地将内部进行的consumer操作作为参数传递,一旦有合法字符转化了,就将它传递。从而达成不产生中间流的作用 实际:AbstractPipeline + Sink。不过还是拿consumer来搭建心智模型: [源数据] → [map1] → [filter] → [map2] → [终止操作 toList] headSink → map1Sink → filterSink → map2Sink → terminalSink Stream 是惰性的:只有调用终止操作(如 toList())时,才真正开始把元素从源头流过整个管道。 终止操作首先创建了最底层的“真实 consumer”:比如 x -> result.add(x)。 然后,从最后一个中间操作开始,每一层都拿到“下游 consumer”并返回一个“包了一层逻辑的上游 consumer”。 这一层层 wrap 下来,最外层的那个 consumer,就对应管道最前面的操作(第一个 map/filter)。 当源数据被遍历时,只调用这个最外层的 head.accept(x),它内部会按顺序调用各个中间操作逻辑,最后传到终止操作的 consumer 上。 //伪实现 //终端操作: List<T> toList() { List<T> result = new ArrayList<>(); // 1. 在终止操作里,先创建最底层的 consumer Consumer<T> terminal = x -> result.add(x); // 2. 从“最后一个中间操作”开始,往前一层层 wrap Consumer<?> head = terminal; Stage<?> s = this; // this = 最后一个 stage(最近的 map/filter) while (s != null) { head = s.wrap(head); // 每一层都“包住”下游 consumer s = s.upstream; // 然后跳到上游那层 } // 3. 最终得到 head,是最前面的那个 map/filter 封装出来的“总入口” for (Object element : getSource()) { ((Consumer<Object>) head).accept(element); // 启动整条链 } return result; } //中间操作: class FilterStage<T> extends Stage<T> { private final Predicate<T> pred; FilterStage(Stage<T> upstream, Predicate<T> pred) { super(upstream); this.pred = pred; } @Override <X> MyConsumer<T> wrap(MyConsumer<X> downstream) { return (T value) -> { if (pred.test(value)) { ((MyConsumer<T>) downstream).accept(value); } // 否则就“拦截”掉,不往下传 }; } } 传的consumer就是该步操作产生的新consumer(将来会作为upStream向上传递,等待被包装)。 ...

2025-11-29 · (updated 2025-11-30) · 2 min · 337 words · Lou Feiyu