fork-join
框架允许在几个工作进程中断某个任务,然后等待结果组合它们。 它在很大程度上利用了多处理器机器的生产能力。 以下是fork-join
框架中使用的核心概念和对象。
fork是一个进程,其中任务将其分成可以并发执行的较小且独立的子任务。
语法
sum left = new sum(array, low, mid);
left.fork();
这里sum
是recursivetask
的子类,left.fork()
方法将任务分解为子任务。
连接(join
)是子任务完成执行后任务加入子任务的所有结果的过程,否则它会持续等待。
语法
left.join();
这里剩下的是sum
类的一个对象。
它是一个特殊的线程池,旨在使用fork-and-join
任务拆分。
语法
forkjoinpool forkjoinpool = new forkjoinpool(4);
这里有一个新的forkjoinpool
,并行级别为4
个cpu。
recursiveaction
表示不返回任何值的任务。
语法
class writer extends recursiveaction {
@override
protected void compute() { }
}
recursivetask
表示返回值的任务。
语法
class sum extends recursivetask {
@override
protected long compute() { return null; }
}
以下testthread
程序显示了基于线程的环境中fork-join
框架的使用。
import java.util.concurrent.executionexception;
import java.util.concurrent.forkjoinpool;
import java.util.concurrent.recursivetask;
public class testthread {
public static void main(final string[] arguments) throws interruptedexception, executionexception {
int nthreads = runtime.getruntime().availableprocessors();
system.out.println(nthreads);
int[] numbers = new int[1000];
for(int i=0; i< numbers.length; i++){
numbers[i] = i;
}
forkjoinpool forkjoinpool = new forkjoinpool(nthreads);
long result = forkjoinpool.invoke(new sum(numbers,0,numbers.length));
system.out.println(result);
}
static class sum extends recursivetask<long> {
int low;
int high;
int[] array;
sum(int[] array, int low, int high) {
this.array = array;
this.low = low;
this.high = high;
}
protected long compute() {
if(high - low <= 10) {
long sum = 0;
for(int i=low; i < high; ++i)
sum += array[i];
return sum;
} else {
int mid = low + (high - low) / 2;
sum left = new sum(array, low, mid);
sum right = new sum(array, mid, high);
left.fork();
long rightresult = right.compute();
long leftresult = left.join();
return leftresult + rightresult;
}
}
}
}
这将产生以下结果 -
4
499500