java多线程之自定义消息队列

说明

这里将自己定义实现一个消息队列,该消息队列可以手动配置队列容量,可以供生产者生产消息,可以供消费者消费消息。

1、自定义消息队列

消息类

@DatapublicclassMessage{// 消息IDprivatefinalint id;// 消息内容privatefinalObject message;publicMessage(finalint id,finalObject message){this.id= id;this.message= message;}}

消息队列类

根据面向对象的思想,消息队列需要提供put消息和take消息的方法。

@Slf4jpublicclassMessageQuene{/**      * 双向队列      */privatefinalLinkedList<Message> queue=newLinkedList<>();/**      * 消息队列的容量,默认容量110      */privateint capacity=10;publicMessageQuene(finalint capacity){this.capacity= capacity;}/**      * 向消息队列put消息      */publicvoidput(finalMessage message)throwsInterruptedException{synchronized(this.queue){// 超出容量,则在这里等待消费者取走消息while(this.queue.size()>=this.capacity){                 log.info("消息队列已满,等待消费者消费消息.....");this.queue.wait();}// 从尾部加,从头部取this.queue.addLast(message);this.queue.notifyAll();}}/**      * 从消息队列获取消息      */publicMessagetake()throwsInterruptedException{synchronized(this.queue){while(this.queue.isEmpty()){                 log.info("消息队列已空,等待生产者生产消息.....");// 这里将中断异常抛出this.queue.wait();}// 从尾部加,从头部取,取出后就从队列删除finalMessage message=this.queue.pollFirst();this.queue.notifyAll();return message;}}}

2、自定义消息队列测试

@Slf4jpublicclassMessageQueneTest{staticfinalMessageQuene messageQuene=newMessageQuene(2);publicstaticvoidmain(finalString[] args){// 消费者不断从队列中消费消息finalThread t1=newThread(MessageQueneTest::consumeMessage,"t1");finalThread t2=newThread(MessageQueneTest::consumeMessage,"t2");// 生产者,不断向队列中生产消息finalThread t3=newThread(MessageQueneTest::putMessage,"t3");          t1.start();         t2.start();         t3.start();}publicstaticvoidconsumeMessage(){try{while(true){finalMessage message= messageQuene.take();                 log.info("消费者消费到消息,id = {},message = {}", message.getId(), message.getMessage());}}catch(finalInterruptedException e){             e.printStackTrace();}}publicstaticvoidputMessage(){try{while(true){finalString message= UUID.randomUUID().toString();finalint id=RandomUtils.nextInt();                 log.info("生产者生产消息,id = {},message = {}", id, message);                 messageQuene.put(newMessage(id, message));TimeUnit.SECONDS.sleep(1);}}catch(finalInterruptedException e){             e.printStackTrace();}}}

测试结果

11:27:58.787 [t1] INFO com.wp.juc.hm.test.MessageQuene - 消息队列已空,等待生产者生产消息..... 11:27:58.794 [t2] INFO com.wp.juc.hm.test.MessageQuene - 消息队列已空,等待生产者生产消息..... 11:27:58.802 [t3] INFO com.wp.juc.hm.test.MessageQueneTest - 生产者生产消息,id = 872383243,message = 0b06d297-a59a-4a47-9f78-ddd36eb332fc 11:27:58.805 [t1] INFO com.wp.juc.hm.test.MessageQuene - 消息队列已空,等待生产者生产消息..... 11:27:58.805 [t2] INFO com.wp.juc.hm.test.MessageQueneTest - 消费者消费到消息,id = 872383243,message = 0b06d297-a59a-4a47-9f78-ddd36eb332fc 11:27:58.805 [t2] INFO com.wp.juc.hm.test.MessageQuene - 消息队列已空,等待生产者生产消息..... 11:27:59.809 [t3] INFO com.wp.juc.hm.test.MessageQueneTest - 生产者生产消息,id = 773935934,message = 0fe599b8-bf43-41e7-9158-96a6d12b8a25