您的位置:

Java Stream过滤实现原理详解

一、Stream简介

Stream是Java 8中引入的一种全新的函数式编程方式,可以看做对集合和数组操作的函数化封装。

Stream可以进行过滤、映射、统计、归约等多种操作,可以大大简化代码复杂度,提高编程效率。

 // Stream是java.util.stream包下的类,需要进行导包  
import java.util.stream.Stream;

二、Stream过滤方法filter()

Stream中的filter方法用于过滤数据,筛选出符合条件的元素。

filter方法接受一个Predicate接口类型的lambda表达式,该表达式用于确定元素是否符合过滤条件。

 // 过滤出大于等于10的数字  
Stream<Integer> stream1 = Stream.of(5, 10, 15, 20);  
Stream<Integer> stream2 = stream1.filter(i -> i >= 10);  
stream2.forEach(System.out::println);  
// 输出:  
// 10  
// 15  
// 20  

三、Stream过滤实现原理

filter方法实现过滤的关键在于,判断每个元素是否符合条件,只输出符合条件的元素。

实现这个过程需要满足两个条件:

  • 每个元素都要被判断,即需要用到遍历方法。
  • 满足筛选条件的元素需要输出,即需要用到输出方法。

Stream本质上就是对这两个条件进行封装,核心就是使用map、reduce等方法实现中间操作,最终使用forEach等方法实现终端操作。

具体流程如下:

  1. 获取数据源——Stream.of或集合的stream方法等。
  2. 进行中间操作——如filter方法,进行过滤。
  3. 进行终端操作——如forEach方法,输出符合条件的元素。

四、Stream过滤实现原理示例

通过对Stream源码的分析,可以看出Stream的具体实现原理。

1. Stream的准备

Stream的创建方式有多种,示例中使用of方法创建一个int类型的Stream:

Stream<Integer> stream = Stream.of(2, 3, 4, 5, 6);

2. filter方法实现

filter方法需要传入一个Predicate类型的lambda表达式,用于判断元素是否符合过滤条件。

具体实现过程如下:

public interface Predicate<T> {  
    //进行元素判断,判断是否符合过滤条件  
    boolean test(T t);  
}  

public interface Stream<T> extends BaseStream<T, Stream<T>> {  
    // 进行中间操作,如filter方法 
    Stream<T> filter(Predicate<? super T> predicate);  
}  

// 中间操作的具体实现    
@Override  
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {  
    Objects.requireNonNull(predicate);  
    return new StatelessOp<>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SIZED) {  
        @Override  
        public Sink<P_OUT> opWrapSink(int flags, Sink<T> sink) {  
            return new Sink.ChainedReference<>(sink) {  
                @Override  
                public void begin(long size) {  
                    downstream.begin(-1);  
                }  
                @Override  
                public void accept(T t) {  
                    if (predicate.test(t)) {  
                        downstream.accept(t);  
                    }  
                }  
            };  
        }  
    };  
}

可以看出filter方法返回了一个新的Stream对象,其中StatelessOp实现了对原Stream的封装(对外提供的接口还是Stream)。

具体实现了两个方法:

  • opIsStateful:用于判断Stream是否能重复使用,filter方法中实现为return false。
  • opWrapSink:用于返回一个Sink对象,Sink是对元素遍历输出的封装。

opWrapSink返回的是一个匿名类,用于判断元素是否符合过滤条件:

if (predicate.test(t)) {  
    downstream.accept(t);  
}

3. forEach方法实现

forEach用于对Stream的符合条件的元素进行输出,具体实现过程如下:

public interface Consumer<T> {  
    void accept(T t);  
}  

public interface Stream<T> extends BaseStream<T, Stream<T>> {  
    // 对符合条件的元素进行逐个操作  
    void forEach(Consumer<? super T> action);  
}  

// 终端操作的具体实现  
@Override  
public final void forEach(Consumer<? super P_OUT> action) {  
    evaluate(ForEachOps.makeRef(action, !isParallel()));  
}

可以看出,forEach方法实际调用了evaluate方法。

evaluate方法的具体实现在PipelineHelper类中,对StatelessOp对象逐个进行输出操作。

下面是PipelineHelper的部分代码实现:

 @Override  
        public <E_OUT> TerminalOp<P_OUT, E_OUT> evaluate(TerminalOp<E_OUT, R> terminalOp) {  
            Objects.requireNonNull(terminalOp);  
            if (linkedOrConsumed) {  
                throw new IllegalStateException(MSG_STREAM_LINKED);  
            }  
            linkedOrConsumed = true;  
            return new ChainedTerminal<>(terminalOp).evaluateSequential(this);  
        }

static final class ChainedTerminal<E_IN, E_OUT>  
            implements TerminalOp<E_IN, E_OUT> {  
            private final TerminalOp<?, E_OUT> terminalOp;  

            ChainedTerminal(TerminalOp<?, E_OUT> terminalOp) {  
                this.terminalOp = terminalOp;  
            }  

            @Override  
            public StreamShape inputShape() {  
                return terminalOp.inputShape();  
            }  

            @Override  
            public <S> E_OUT evaluateSequential(PipelineHelper<S> helper, Spliterator<S> spliterator) {  
                return terminalOp.evaluateSequential(helper, ops.wrapSink(terminalOp, helper.wrapSink(terminalOp, terminalSink)  
                ));  
            }  
    }

@Override  
    // Input(depth==0), Output(Vector,X)  
    public Sink<T> wrapSink(PipelineHelper<T> ph, Sink<X> sink) {  
        return opWrapSink(opFlags, sink);  
    }  

五、总结

Java Stream过滤实现原理是在中间操作filter和终端操作forEach的配合下完成的。

Stream的本质是对集合和数组操作的封装,并不会增加实际的存储压力。

Stream的使用可以大大简化代码复杂度,提高编程效率。

下面是完整的示例代码:

import java.util.Objects;  
import java.util.function.Consumer;  
import java.util.function.Predicate;  
import java.util.stream.Stream;  

public class StreamFilterDemo {  
    public static void main(String[] args) {  
        // Stream的创建  
        Stream<Integer> stream = Stream.of(2, 3, 4, 5, 6);  
        // 过滤大于等于4的元素  
        Stream<Integer> stream2 = stream.filter(i -> i >= 4);  
        // 输出符合条件的元素  
        stream2.forEach(System.out::println);  
    }  
}