一、Stream简介
Stream是Java 8中引入的一种全新的函数式编程方式,可以看做对集合和数组操作的函数化封装。
Stream可以进行过滤、映射、统计、归约等多种操作,可以大大简化代码复杂度,提高编程效率。
// Stream是java.util.stream包下的类,需要进行导包
import java.util.stream.Stream;
二、Stream过滤方法filter()
Stream中的filter方法用于过滤数据,筛选出符合条件的元素。
filter方法接受一个Predicate接口类型的lambda表达式,该表达式用于确定元素是否符合过滤条件。
// 过滤出大于等于10的数字
Stream<Integer> stream1 = Stream.of(5, 10, 15, 20);
Stream<Integer> stream2 = stream1.filter(i -> i >= 10);
stream2.forEach(System.out::println);
// 输出:
// 10
// 15
// 20
三、Stream过滤实现原理
filter方法实现过滤的关键在于,判断每个元素是否符合条件,只输出符合条件的元素。
实现这个过程需要满足两个条件:
- 每个元素都要被判断,即需要用到遍历方法。
- 满足筛选条件的元素需要输出,即需要用到输出方法。
Stream本质上就是对这两个条件进行封装,核心就是使用map、reduce等方法实现中间操作,最终使用forEach等方法实现终端操作。
具体流程如下:
- 获取数据源——Stream.of或集合的stream方法等。
- 进行中间操作——如filter方法,进行过滤。
- 进行终端操作——如forEach方法,输出符合条件的元素。
四、Stream过滤实现原理示例
通过对Stream源码的分析,可以看出Stream的具体实现原理。
1. Stream的准备
Stream的创建方式有多种,示例中使用of方法创建一个int类型的Stream:
Stream<Integer> stream = Stream.of(2, 3, 4, 5, 6);
2. filter方法实现
filter方法需要传入一个Predicate类型的lambda表达式,用于判断元素是否符合过滤条件。
具体实现过程如下:
public interface Predicate<T> {
//进行元素判断,判断是否符合过滤条件
boolean test(T t);
}
public interface Stream<T> extends BaseStream<T, Stream<T>> {
// 进行中间操作,如filter方法
Stream<T> filter(Predicate<? super T> predicate);
}
// 中间操作的具体实现
@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
Objects.requireNonNull(predicate);
return new StatelessOp<>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SIZED) {
@Override
public Sink<P_OUT> opWrapSink(int flags, Sink<T> sink) {
return new Sink.ChainedReference<>(sink) {
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override
public void accept(T t) {
if (predicate.test(t)) {
downstream.accept(t);
}
}
};
}
};
}
可以看出filter方法返回了一个新的Stream对象,其中StatelessOp实现了对原Stream的封装(对外提供的接口还是Stream)。
具体实现了两个方法:
- opIsStateful:用于判断Stream是否能重复使用,filter方法中实现为return false。
- opWrapSink:用于返回一个Sink对象,Sink是对元素遍历输出的封装。
opWrapSink返回的是一个匿名类,用于判断元素是否符合过滤条件:
if (predicate.test(t)) {
downstream.accept(t);
}
3. forEach方法实现
forEach用于对Stream的符合条件的元素进行输出,具体实现过程如下:
public interface Consumer<T> {
void accept(T t);
}
public interface Stream<T> extends BaseStream<T, Stream<T>> {
// 对符合条件的元素进行逐个操作
void forEach(Consumer<? super T> action);
}
// 终端操作的具体实现
@Override
public final void forEach(Consumer<? super P_OUT> action) {
evaluate(ForEachOps.makeRef(action, !isParallel()));
}
可以看出,forEach方法实际调用了evaluate方法。
evaluate方法的具体实现在PipelineHelper类中,对StatelessOp对象逐个进行输出操作。
下面是PipelineHelper的部分代码实现:
@Override
public <E_OUT> TerminalOp<P_OUT, E_OUT> evaluate(TerminalOp<E_OUT, R> terminalOp) {
Objects.requireNonNull(terminalOp);
if (linkedOrConsumed) {
throw new IllegalStateException(MSG_STREAM_LINKED);
}
linkedOrConsumed = true;
return new ChainedTerminal<>(terminalOp).evaluateSequential(this);
}
static final class ChainedTerminal<E_IN, E_OUT>
implements TerminalOp<E_IN, E_OUT> {
private final TerminalOp<?, E_OUT> terminalOp;
ChainedTerminal(TerminalOp<?, E_OUT> terminalOp) {
this.terminalOp = terminalOp;
}
@Override
public StreamShape inputShape() {
return terminalOp.inputShape();
}
@Override
public <S> E_OUT evaluateSequential(PipelineHelper<S> helper, Spliterator<S> spliterator) {
return terminalOp.evaluateSequential(helper, ops.wrapSink(terminalOp, helper.wrapSink(terminalOp, terminalSink)
));
}
}
@Override
// Input(depth==0), Output(Vector,X)
public Sink<T> wrapSink(PipelineHelper<T> ph, Sink<X> sink) {
return opWrapSink(opFlags, sink);
}
五、总结
Java Stream过滤实现原理是在中间操作filter和终端操作forEach的配合下完成的。
Stream的本质是对集合和数组操作的封装,并不会增加实际的存储压力。
Stream的使用可以大大简化代码复杂度,提高编程效率。
下面是完整的示例代码:
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Stream;
public class StreamFilterDemo {
public static void main(String[] args) {
// Stream的创建
Stream<Integer> stream = Stream.of(2, 3, 4, 5, 6);
// 过滤大于等于4的元素
Stream<Integer> stream2 = stream.filter(i -> i >= 4);
// 输出符合条件的元素
stream2.forEach(System.out::println);
}
}