重庆分公司,新征程启航
为企业提供网站建设、域名注册、服务器等服务
Concurrent:
创新互联建站主要从事网站建设、网站设计、网页设计、企业做网站、公司建网站等业务。立足成都服务鹤庆,10年网站建设经验,价格优惠、服务专业,欢迎来电咨询建站服务:13518219792
1. BlockingQueue( 阻塞队列 )
ArrayBlockingQueue( 指定容量,不可变 ),LinkeBlocingQueue (指定容量不可变,也可以不指定容量,默认 Integer.Max_value )
PriorityBlockingQueue ( 根据实现的接口自定义排序,只有在逐个拿取的时候才有序 )
SynchronousQueue (长度以 1 只能为 1 )
2. ConcurrentMap
ConcurrentHashMap
1.5 分桶加锁,1.8 CAS+红黑树来保证线程安全
ConcurrentNavigableMap (针对有序列表,有 map.headMap map.subMap map.tailMap , 返回有序的在取出范围的 map )
3. CountDownLatch 闭锁
CountDownLatch 以一个给定的数量初始化。 countDown() 每被调用一次,这一数 量就减一。通过调用 await() 方法之一,线程可以阻塞等待这一数量到达零。
等待一定数量的线程完成。来执行其后的程序
4. CyclicBarrier 栅栏
它能够对处理一些算法的线程实现同
步。换句话讲,它就是一个所有线程必须等待的一个栅栏,直到所有线程都到达这
里,然后所有线程才可以继续做其他事情
5. Exchanger 交换机
类表示一种两个线程可以进行互相交换对象的会和点 ,只能两个线程之间交换数据
6. Semaphore 信号量
l
acquire()
l
release()
计数信号量由一个指定数量的
"
许可
"
初始化。每调用一次
acquire()
,一个许可会
被调用线程取走。每调用一次
release()
,一个许可会被返还给信号量。因此,在没
有任何
release()
调用时,最多有
N
个线程能够通过
acquire()
方法,
N
是该信
号量初始化时的许可的指定数量。
线程池
7. ExecutorService
(executors.newFixedThreadPool ,长任务场景,只有核心线程,没有临时线程,容纳无限多
,newCachedThreadPool (高并发短任务场景,没有核心线程,全部都是临时线程,处理任意多的线程)
,newSingleThreadPool
,newSchedulerThreadPool (有核心线程,有临时线程)
)
提交线程的方法
execute() 提交线程 没有返回值submit(Runnable) 提交线程,返Future 可以通过Future.get()得到该线程的状态但是如果该线程未执行完成,那么该方法阻塞
submit(Callable) 同上面类似,但是线程可以带有返回值
invokeAny(.....),随机选择线程执行一个
invokeAll(),自动执行所有的线程
关闭线程池: ExecutorService.shutdown();该方法不再接受线程池,等待所有线程执行完毕后,线程池结束。
ExecutorService.shutdown();立即关闭,退出任务,正在执行的线程可能会出错。
##Callable只能用线程池提交
Callable runnable :
1. 返回值
2. 异常, runnabel没有容错机制,callable有容错机制,可以将 异常抛给上层处理
3. Callable只能通过submit方法提交,runnable可以new 也可 以通过线程池提交
8. ReadWriteLock 读写锁 ,可以是公平,也可以是非公平的。该锁可以跨方法
读锁可以共享,写锁互斥
读锁 readLock().lock();
写锁 writeLock().lock();
package hgs.test; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Exchanger; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; public class Test { public static void main(String[] args) throws InterruptedException, BrokenBarrierException { //ConcurrentMap //BlockingQueuebq = new ArrayBlockingQueue (4); //add(超过长度会报错) remove(没有元素会报错) /* bq.add("1"); bq.add("1"); bq.add("1"); bq.add("1"); bq.remove(); bq.remove(); bq.remove(); bq.remove(); */ //offer(如果可以插入返回true 否则 false) poll(如果没有元素返回null) //bq.offer("1"); //bq.offer("1"); //bq.offer("1"); //bq.offer("1"); //bq.offer("1"); //String flag = bq.poll(); //System.out.println(flag); //put take 阻塞式 //bq.put("1"); //bq.put("1"); //bq.put("1"); //bq.put("1"); //bq.put("1"); //bq.put("1"); //bq.take(); //offer(o,timeout,timeunit),poll(timeout,tameunit) 等待timeout时间,然后跳过 //bq.poll(10, TimeUnit.SECONDS); //bq.element();//检查是否为空,是的话跑出异常 //bq.peek();//阻塞 /* ConcurrentNavigableMap m = new ConcurrentSkipListMap () ; m.put(1, "1"); m.put(2, "2"); m.put(5, "5"); m.put(4, "4"); m.put(6, "6"); m.put(5, "5"); System.out.println("head(\"5\")"+m.headMap(4)); System.out.println(); System.out.println();*/ /*Boy b1 = new Boy("b1",24); Boy b2 = new Boy("b2",23); Boy b3 = new Boy("b3",26); Boy b4 = new Boy("b4",19); PriorityBlockingQueue< Boy> pbq = new PriorityBlockingQueue (100); pbq.add(b1); pbq.add(b2); pbq.add(b3); pbq.add(b4); for(int i =0 ;i<4;i++) { System.out.println(pbq.take().toString()); } */ //闭锁 /*CountDownLatch cdl = new CountDownLatch(4); for(int i = 0; i<4; i++) { new Thread(new BoyRun(cdl)).start(); } cdl.await(); System.out.println("全部到达。。。");*/ //栅栏 /*CyclicBarrier cb = new CyclicBarrier(4); for(int i = 0; i<4; i++) { new Thread(new GirlRan(cb)).start(); } cb.await(); System.out.println("all comming.");*/ //exchanger交换器 /*Exchanger ex = new Exchanger (); ExchangerTest e1 = new ExchangerTest(ex); ExchangerTest e2 = new ExchangerTest(ex); new Thread(e1).start(); new Thread(e2).start(); */ //6.Semaphore 信号量 /* Semaphore s = new Semaphore(5); for(int i = 0 ;i<9;i++) { new SemaphoreTest(s).start(); }*/ //原始创建线程池,executors.newCacheThreadPool 的底层调用该方法 /* ExecutorService es = new ThreadPoolExecutor(5, 10, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println("full......"); } }); for(int i = 0 ;i<24;i++) { es.execute(new ThreadPoolTest()); } es.shutdown();*/ //可重入锁 ReentrantLock 可重入读写锁ReentrantReadWriteLock /* ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); for (int i = 0 ;i<3;i++) { new ReadLockTest( rwLock ).start(); }*/ ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); for (int i = 0 ;i<3;i++) { new WriteLockTest( rwLock ).start(); } } } class WriteLockTest extends Thread{ ReentrantReadWriteLock rwLock ; public WriteLockTest(ReentrantReadWriteLock rwLock ) { this.rwLock = rwLock; } @Override public void run() { rwLock.writeLock().lock(); System.out.println("reading......"); try { Thread.sleep((long)(Math.random()*2000)); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("done......"); rwLock.writeLock().unlock(); } } class ReadLockTest extends Thread{ ReentrantReadWriteLock rwLock ; public ReadLockTest(ReentrantReadWriteLock rwLock ) { this.rwLock = rwLock; } @Override public void run() { rwLock.readLock().lock(); System.out.println("reading......"); try { Thread.sleep((long)(Math.random()*2000)); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("done......"); rwLock.readLock().unlock(); } } class ThreadPoolTest implements Runnable{ ThreadPoolTest (){ } @Override public void run() { System.out.println(Thread.currentThread().getName()); try { Thread.sleep((long)(Math.random()*2000)); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } class SemaphoreTest extends Thread{ Semaphore s = null; public SemaphoreTest(Semaphore s) { this.s = s; } @Override public void run() { try { s.acquire(); System.out.println("aquire......"); Thread.sleep((long)(Math.random()*3000)); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("release......"); s.release(); } } class ExchangerTest implements Runnable{ Exchanger ex = null; public ExchangerTest(Exchanger ex) { this.ex = ex; } @Override public void run() { String my = Thread.currentThread().getName(); String exstr = null; try { exstr = ex.exchange(my); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println(my+" "+exstr); } } class BoyRun implements Runnable{ CountDownLatch cdl ; public BoyRun(CountDownLatch cdl ) { this.cdl = cdl; } @Override public void run() { System.out.println("cdl"+" +1"); cdl.countDown(); System.out.println("cdl"+" +1--"); } } class GirlRan implements Runnable { CyclicBarrier cb ; public GirlRan(CyclicBarrier cb ) { this.cb = cb; } @Override public void run() { System.out.println("cdl"+" +1"); try { Thread.sleep((long)(Math.random()*5000)); cb.await(); } catch (InterruptedException | BrokenBarrierException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("cdl"+" +1--"); } } class Boy implements Comparable { String name; int age; public Boy(String name ,int age) { this.name = name; this.age = age; } public String getName() { return name; } public void setName(String name) { this.name = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } public int compareTo(Boy o) { return this.age - o.age; } @Override public String toString() { return "Boy [name=" + name + ", age=" + age + "]"; } }