您当前的位置:首页 > 电脑百科 > 程序开发 > 语言 > JAVA

Java如何使用消息中间件ActiveMQ?

时间:2023-08-16 16:33:38  来源:  作者:运维开发木子李

 

创建ActiveMQ连接:

import JAVAx.jms.*;
import org.Apache.activemq.ActiveMQConnectionFactory;

public class ActiveMQExample {
    public static void mAIn(String[] args) {
        try {
            // 创建连接工厂
            ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
            
            // 创建连接
            Connection connection = factory.createConnection();
            
            // 启动连接
            connection.start();
            
            // 关闭连接
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

创建ActiveMQ会话:

import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

public class ActiveMQExample {
    public static void main(String[] args) {
        try {
            // 创建连接工厂
            ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
            
            // 创建连接
            Connection connection = factory.createConnection();
            
            // 创建会话
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            
            // 关闭会话
            session.close();
            
            // 关闭连接
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

创建ActiveMQ队列:

import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

public class ActiveMQExample {
    public static void main(String[] args) {
        try {
            // 创建连接工厂
            ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
            
            // 创建连接
            Connection connection = factory.createConnection();
            
            // 创建会话
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            
            // 创建队列
            Queue queue = session.createQueue("myQueue");
            
            // 关闭会话
            session.close();
            
            // 关闭连接
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

创建ActiveMQ主题:

import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

public class ActiveMQExample {
    public static void main(String[] args) {
        try {
            // 创建连接工厂
            ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
            
            // 创建连接
            Connection connection = factory.createConnection();
            
            // 创建会话
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            
            // 创建主题
            Topic topic = session.createTopic("myTopic");
            
            // 关闭会话
            session.close();
            
            // 关闭连接
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

发送消息到队列:

import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

public class ActiveMQExample {
    public static void main(String[] args) {
        try {
            // 创建连接工厂
            ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
            
            // 创建连接
            Connection connection = factory.createConnection();
            
            // 创建会话
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            
            // 创建队列
            Queue queue = session.createQueue("myQueue");
            
            // 创建消息生产者
            MessageProducer producer = session.createProducer(queue);
            
            // 创建消息
            TextMessage message = session.createTextMessage("Hello, ActiveMQ!");
            
            // 发送消息
            producer.send(message);
            
            // 关闭会话
            session.close();
            
            // 关闭连接
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

发送消息到主题:

import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

public class ActiveMQExample {
    public static void main(String[] args) {
        try {
            // 创建连接工厂
            ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
            
            // 创建连接
            Connection connection = factory.createConnection();
            
            // 创建会话
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            
            // 创建主题
            Topic topic = session.createTopic("myTopic");
            
            // 创建消息生产者
            MessageProducer producer = session.createProducer(topic);
            
            // 创建消息
            TextMessage message = session.createTextMessage("Hello, ActiveMQ!");
            
            // 发送消息
            producer.send(message);
            
            // 关闭会话
            session.close();
            
            // 关闭连接
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

接收队列中的消息:

import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

public class ActiveMQExample {
    public static void main(String[] args) {
        try {
            // 创建连接工厂
            ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
            
            // 创建连接
            Connection connection = factory.createConnection();
            
            // 创建会话
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            
            // 创建队列
            Queue queue = session.createQueue("myQueue");
            
            // 创建消息消费者
            MessageConsumer consumer = session.createConsumer(queue);
            
            // 接收消息
            Message message = consumer.receive();
            if (message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                System.out.println("Received message: " + textMessage.getText());
            }
            
            // 关闭会话
            session.close();
            
            // 关闭连接
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

接收主题中的消息:

import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

public class ActiveMQExample {
    public static void main(String[] args) {
        try {
            // 创建连接工厂
            ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
            
            // 创建连接
            Connection connection = factory.createConnection();
            
            // 创建会话
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            
            // 创建主题
            Topic topic = session.createTopic("myTopic");
            
            // 创建消息消费者
            MessageConsumer consumer = session.createConsumer(topic);
            
            // 接收消息
            Message message = consumer.receive();
            if (message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                System.out.println("Received message: " + textMessage.getText());
            }
            
            // 关闭会话
            session.close();
            
            // 关闭连接
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

发布-订阅模式中的发布者:

import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

public class ActiveMQExample {
    public static void main(String[] args) {
        try {
            // 创建连接工厂
            ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
            
            // 创建连接
            Connection connection = factory.createConnection();
            
            // 创建会话
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            
            // 创建主题
            Topic topic = session.createTopic("myTopic");
            
            // 创建消息生产者
            MessageProducer producer = session.createProducer(topic);
            
            // 创建消息
            TextMessage message = session.createTextMessage("Hello, ActiveMQ!");
            
            // 发送消息
            producer.send(message);
            
            // 关闭会话
            session.close();
            
            // 关闭连接
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

发布-订阅模式中的订阅者:

import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

public class ActiveMQExample {
    public static void main(String[] args) {
        try {
            // 创建连接工厂
            ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
            
            // 创建连接
            Connection connection = factory.createConnection();
            
            // 创建会话
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            
            // 创建主题
            Topic topic = session.createTopic("myTopic");
            
            // 创建消息消费者
            MessageConsumer consumer = session.createConsumer(topic);
            
            // 接收消息
            Message message = consumer.receive();
            if (message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                System.out.println("Received message: " + textMessage.getText());
            }
            
            // 关闭会话
            session.close();
            
            // 关闭连接
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

使用消息监听器接收队列中的消息:

import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

public class ActiveMQExample {
    public static void main(String[] args) {
        try {
            // 创建连接工厂
            ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
            
            // 创建连接
            Connection connection = factory.createConnection();
            
            // 创建会话
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            
            // 创建队列
            Queue queue = session.createQueue("myQueue");
            
            // 创建消息消费者
            MessageConsumer consumer = session.createConsumer(queue);
            
            // 注册消息监听器
            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    if (message instanceof TextMessage) {
                        TextMessage textMessage = (TextMessage) message;
                        try {
                            System.out.println("Received message: " + textMessage.getText());
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
            
            // 启动连接
            connection.start();
            
            // 等待接收消息
            Thread.sleep(5000);
            
            // 关闭会话
            session.close();
            
            // 关闭连接
            connection.close();
        } catch (JMSException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}

使用消息监听器接收主题中的消息:

import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

public class ActiveMQExample {
    public static void main(String[] args) {
        try {
            // 创建连接工厂
            ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
            
            // 创建连接
            Connection connection = factory.createConnection();
            
            // 创建会话
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            
            // 创建主题
            Topic topic = session.createTopic("myTopic");
            
            // 创建消息消费者
            MessageConsumer consumer = session.createConsumer(topic);
            
            // 注册消息监听器
            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    if (message instanceof TextMessage) {
                        TextMessage textMessage = (TextMessage) message;
                        try {
                            System.out.println("Received message: " + textMessage.getText());
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
            
            // 启动连接
            connection.start();
            
            // 等待接收消息
            Thread.sleep(5000);
            
            // 关闭会话
            session.close();
            
            // 关闭连接
            connection.close();
        } catch (JMSException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}

使用消息选择器接收队列中的消息:

import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

public class ActiveMQExample {
    public static void main(String[] args) {
        try {
            // 创建连接工厂
            ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
            
            // 创建连接
            Connection connection = factory.createConnection();
            
            // 创建会话
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            
            // 创建队列
            Queue queue = session.createQueue("myQueue");
            
            // 创建消息消费者
            MessageConsumer consumer = session.createConsumer(queue, "age > 18");
            
            // 接收消息
            Message message = consumer.receive();
            if (message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                System.out.println("Received message: " + textMessage.getText());
            }
            
            // 关闭会话
            session.close();
            
            // 关闭连接
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

使用消息选择器接收主题中的消息:

import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

public class ActiveMQExample {
    public static void main(String[] args) {
        try {
            // 创建连接工厂
            ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
            
            // 创建连接
            Connection connection = factory.createConnection();
            
            // 创建会话
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            
            // 创建主题
            Topic topic = session.createTopic("myTopic");
            
            // 创建消息消费者
            MessageConsumer consumer = session.createConsumer(topic, "age > 18");
            
            // 接收消息
            Message message = consumer.receive();
            if (message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                System.out.println("Received message: " + textMessage.getText());
            }
            
            // 关闭会话
            session.close();
            
            // 关闭连接
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

使用事务发送消息:

import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

public class ActiveMQExample {
    public static void main(String[] args) {
        try {
            // 创建连接工厂
            ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
            
            // 创建连接
            Connection connection = factory.createConnection();
            
            // 创建会话
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            
            // 创建队列
            Queue queue = session.createQueue("myQueue");
            
            // 创建消息生产者
            MessageProducer producer = session.createProducer(queue);
            
            // 创建消息
            TextMessage message = session.createTextMessage("Hello, ActiveMQ!");
            
            // 发送消息
            producer.send(message);
            
            // 提交事务
            session.commit();
            
            // 关闭会话
            session.close();
            
            // 关闭连接
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

这些示例覆盖了ActiveMQ的常用用法,包括创建连接、创建会话、创建队列和主题、发送和接收消息等。你可以根据自己的需求选择相应的示例进行参考和使用。



Tags:ActiveMQ   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,不构成投资建议。投资者据此操作,风险自担。如有任何标注错误或版权侵犯请与我们联系,我们将及时更正、删除。
▌相关推荐
Java如何使用消息中间件ActiveMQ?
创建ActiveMQ连接:import javax.jms.*;import org.apache.activemq.ActiveMQConnectionFactory;public class ActiveMQExample { public static void main(String[] args...【详细内容】
2023-08-16  Search: ActiveMQ  点击:(281)  评论:(0)  加入收藏
▌简易百科推荐
Java 8 内存管理原理解析及内存故障排查实践
本文介绍Java8虚拟机的内存区域划分、内存垃圾回收工作原理解析、虚拟机内存分配配置,以及各垃圾收集器优缺点及场景应用、实践内存故障场景排查诊断,方便读者面临内存故障时...【详细内容】
2024-03-20  vivo互联网技术    Tags:Java 8   点击:(14)  评论:(0)  加入收藏
如何编写高性能的Java代码
作者 | 波哥审校 | 重楼在当今软件开发领域,编写高性能的Java代码是至关重要的。Java作为一种流行的编程语言,拥有强大的生态系统和丰富的工具链,但是要写出性能优异的Java代码...【详细内容】
2024-03-20    51CTO  Tags:Java代码   点击:(21)  评论:(0)  加入收藏
在Java应用程序中释放峰值性能:配置文件引导优化(PGO)概述
译者 | 李睿审校 | 重楼在Java开发领域,优化应用程序的性能是开发人员的持续追求。配置文件引导优化(Profile-Guided Optimization,PGO)是一种功能强大的技术,能够显著地提高Ja...【详细内容】
2024-03-18    51CTO  Tags:Java   点击:(24)  评论:(0)  加入收藏
Java生产环境下性能监控与调优详解
堆是 JVM 内存中最大的一块内存空间,该内存被所有线程共享,几乎所有对象和数组都被分配到了堆内存中。堆被划分为新生代和老年代,新生代又被进一步划分为 Eden 和 Survivor 区,...【详细内容】
2024-02-04  大雷家吃饭    Tags:Java   点击:(56)  评论:(0)  加入收藏
在项目中如何避免和解决Java内存泄漏问题
在Java中,内存泄漏通常指的是程序中存在一些不再使用的对象或数据结构仍然保持对内存的引用,从而导致这些对象无法被垃圾回收器回收,最终导致内存占用不断增加,进而影响程序的性...【详细内容】
2024-02-01  编程技术汇  今日头条  Tags:Java   点击:(68)  评论:(0)  加入收藏
Java中的缓存技术及其使用场景
Java中的缓存技术是一种优化手段,用于提高应用程序的性能和响应速度。缓存技术通过将计算结果或者经常访问的数据存储在快速访问的存储介质中,以便下次需要时可以更快地获取。...【详细内容】
2024-01-30  编程技术汇    Tags:Java   点击:(72)  评论:(0)  加入收藏
JDK17 与 JDK11 特性差异浅谈
从 JDK11 到 JDK17 ,Java 的发展经历了一系列重要的里程碑。其中最重要的是 JDK17 的发布,这是一个长期支持(LTS)版本,它将获得长期的更新和支持,有助于保持程序的稳定性和可靠性...【详细内容】
2024-01-26  政采云技术  51CTO  Tags:JDK17   点击:(88)  评论:(0)  加入收藏
Java并发编程高阶技术
随着计算机硬件的发展,多核处理器的普及和内存容量的增加,利用多线程实现异步并发成为提升程序性能的重要途径。在Java中,多线程的使用能够更好地发挥硬件资源,提高程序的响应...【详细内容】
2024-01-19  大雷家吃饭    Tags:Java   点击:(105)  评论:(0)  加入收藏
这篇文章彻底让你了解Java与RPA
前段时间更新系统的时候,发现多了一个名为Power Automate的应用,打开了解后发现是一个自动化应用,根据其描述,可以自动执行所有日常任务,说的还是比较夸张,简单用了下,对于office、...【详细内容】
2024-01-17  Java技术指北  微信公众号  Tags:Java   点击:(95)  评论:(0)  加入收藏
Java 在 2023 年仍然流行的 25 个原因
译者 | 刘汪洋审校 | 重楼学习 Java 的过程中,我意识到在 90 年代末 OOP 正值鼎盛时期,Java 作为能够真正实现这些概念的语言显得尤为突出(尽管我此前学过 C++,但相比 Java 影响...【详细内容】
2024-01-10  刘汪洋  51CTO  Tags:Java   点击:(74)  评论:(0)  加入收藏
相关文章
    无相关信息
站内最新
站内热门
站内头条