这里将自己定义实现一个消息队列,该消息队列可以手动配置队列容量,可以供生产者生产消息,可以供消费者消费消息。
消息类
@DatapublicclassMessage{// 消息IDprivatefinalint id;// 消息内容privatefinalObject message;publicMessage(finalint id,finalObject message){this.id= id;this.message= message;}}
消息队列类
根据面向对象的思想,消息队列需要提供put消息和take消息的方法。
@Slf4jpublicclassMessageQuene{/** * 双向队列 */privatefinalLinkedList<Message> queue=newLinkedList<>();/** * 消息队列的容量,默认容量110 */privateint capacity=10;publicMessageQuene(finalint capacity){this.capacity= capacity;}/** * 向消息队列put消息 */publicvoidput(finalMessage message)throwsInterruptedException{synchronized(this.queue){// 超出容量,则在这里等待消费者取走消息while(this.queue.size()>=this.capacity){ log.info("消息队列已满,等待消费者消费消息.....");this.queue.wait();}// 从尾部加,从头部取this.queue.addLast(message);this.queue.notifyAll();}}/** * 从消息队列获取消息 */publicMessagetake()throwsInterruptedException{synchronized(this.queue){while(this.queue.isEmpty()){ log.info("消息队列已空,等待生产者生产消息.....");// 这里将中断异常抛出this.queue.wait();}// 从尾部加,从头部取,取出后就从队列删除finalMessage message=this.queue.pollFirst();this.queue.notifyAll();return message;}}}
@Slf4jpublicclassMessageQueneTest{staticfinalMessageQuene messageQuene=newMessageQuene(2);publicstaticvoidmain(finalString[] args){// 消费者不断从队列中消费消息finalThread t1=newThread(MessageQueneTest::consumeMessage,"t1");finalThread t2=newThread(MessageQueneTest::consumeMessage,"t2");// 生产者,不断向队列中生产消息finalThread t3=newThread(MessageQueneTest::putMessage,"t3"); t1.start(); t2.start(); t3.start();}publicstaticvoidconsumeMessage(){try{while(true){finalMessage message= messageQuene.take(); log.info("消费者消费到消息,id = {},message = {}", message.getId(), message.getMessage());}}catch(finalInterruptedException e){ e.printStackTrace();}}publicstaticvoidputMessage(){try{while(true){finalString message= UUID.randomUUID().toString();finalint id=RandomUtils.nextInt(); log.info("生产者生产消息,id = {},message = {}", id, message); messageQuene.put(newMessage(id, message));TimeUnit.SECONDS.sleep(1);}}catch(finalInterruptedException e){ e.printStackTrace();}}}
测试结果
11:27:58.787 [t1] INFO com.wp.juc.hm.test.MessageQuene - 消息队列已空,等待生产者生产消息..... 11:27:58.794 [t2] INFO com.wp.juc.hm.test.MessageQuene - 消息队列已空,等待生产者生产消息..... 11:27:58.802 [t3] INFO com.wp.juc.hm.test.MessageQueneTest - 生产者生产消息,id = 872383243,message = 0b06d297-a59a-4a47-9f78-ddd36eb332fc 11:27:58.805 [t1] INFO com.wp.juc.hm.test.MessageQuene - 消息队列已空,等待生产者生产消息..... 11:27:58.805 [t2] INFO com.wp.juc.hm.test.MessageQueneTest - 消费者消费到消息,id = 872383243,message = 0b06d297-a59a-4a47-9f78-ddd36eb332fc 11:27:58.805 [t2] INFO com.wp.juc.hm.test.MessageQuene - 消息队列已空,等待生产者生产消息..... 11:27:59.809 [t3] INFO com.wp.juc.hm.test.MessageQueneTest - 生产者生产消息,id = 773935934,message = 0fe599b8-bf43-41e7-9158-96a6d12b8a25