您的位置:

java消息队列,java消息队列mq的实现原理

本文目录一览:

到底什么是消息队列?Java中如何实现消息队列

消息队列,顾名思义 首先是个队列。

队列的操作有入队和出队

也就是有一个程序在产生内容然后入队(生产者)

另一个程序读取内容,内容出队(消费者)

这是最最基本的概念。

java中的消息队列

消息队列是线程间通讯的手段:

import java.util.*

public class MsgQueue{

private Vector queue = null;

public MsgQueue(){

queue = new Vector();

}

public synchronized void send(Object o)

{

queue.addElement(o);

}

public synchronized Object recv()

{

if(queue.size()==0)

return null;

Object o = queue.firstElement();

queue.removeElementAt(0);//or queue[0] = null can also work

return o;

}

}

因为java中是locked by object的所以添加synchronized 就可以用于线程同步锁定对象

可以作为多线程处理多任务的存放task的队列。他的client包括封装好的task类以及thread类

在JAVA中怎么实现消息队列

如下:

public static String do_post(String url, ListNameValuePair name_value_pair) throws IOException {

String body = "{}";

DefaultHttpClient httpclient = new DefaultHttpClient();

try {

HttpPost httpost = new HttpPost(url);

httpost.setEntity(new UrlEncodedFormEntity(name_value_pair, StandardCharsets.UTF_8));

HttpResponse response = httpclient.execute(httpost);

HttpEntity entity = response.getEntity();

body = EntityUtils.toString(entity);

} finally {

httpclient.getConnectionManager().shutdown();

}

return body;

}

public static String do_get(String url) throws ClientProtocolException, IOException {

String body = "{}";

DefaultHttpClient httpclient = new DefaultHttpClient();

try {

HttpGet httpget = new HttpGet(url);

HttpResponse response = httpclient.execute(httpget);

HttpEntity entity = response.getEntity();

body = EntityUtils.toString(entity);

} finally {

httpclient.getConnectionManager().shutdown();

}

return body;

}

如何用JAVA实现Linux上的消息队列功能

下面来说说如何用不用消息队列来进行进程间的通信,消息队列与命名管道有很多相似之处。有关命名管道的更多内容可以参阅我的另一篇文章:Linux进程间通信——使用命名管道

一、什么是消息队列

消息队列提供了一种从一个进程向另一个进程发送一个数据块的方法。 每个数据块都被认为含有一个类型,接收进程可以独立地接收含有不同类型的数据结构。我们可以通过发送消息来避免命名管道的同步和阻塞问题。但是消息队列与命名管道一样,每个数据块都有一个最大长度的限制。

Linux用宏MSGMAX和MSGMNB来限制一条消息的最大长度和一个队列的最大长度。

二、在Linux中使用消息队列

Linux提供了一系列消息队列的函数接口来让我们方便地使用它来实现进程间的通信。它的用法与其他两个System V PIC机制,即信号量和共享内存相似。

1、msgget函数

该函数用来创建和访问一个消息队列。它的原型为:

int msgget(key_t, key, int msgflg);

与其他的IPC机制一样,程序必须提供一个键来命名某个特定的消息队列。msgflg是一个权限标志,表示消息队列的访问权限,它与文件的访问权限一样。msgflg可以与IPC_CREAT做或操作,表示当key所命名的消息队列不存在时创建一个消息队列,如果key所命名的消息队列存在时,IPC_CREAT标志会被忽略,而只返回一个标识符。

它返回一个以key命名的消息队列的标识符(非零整数),失败时返回-1.

2、msgsnd函数

该函数用来把消息添加到消息队列中。它的原型为:

int msgsend(int msgid, const void *msg_ptr, size_t msg_sz, int msgflg);

msgid是由msgget函数返回的消息队列标识符。

msg_ptr是一个指向准备发送消息的指针,但是消息的数据结构却有一定的要求,指针msg_ptr所指向的消息结构一定要是以一个长整型成员变量开始的结构体,接收函数将用这个成员来确定消息的类型。所以消息结构要定义成这样:

struct my_message{

long int message_type;

/* The data you wish to transfer*/

};

msg_sz是msg_ptr指向的消息的长度,注意是消息的长度,而不是整个结构体的长度,也就是说msg_sz是不包括长整型消息类型成员变量的长度。

msgflg用于控制当前消息队列满或队列消息到达系统范围的限制时将要发生的事情。

如果调用成功,消息数据的一分副本将被放到消息队列中,并返回0,失败时返回-1.

3、msgrcv函数

该函数用来从一个消息队列获取消息,它的原型为

int msgrcv(int msgid, void *msg_ptr, size_t msg_st, long int msgtype, int msgflg);

msgid, msg_ptr, msg_st的作用也函数msgsnd函数的一样。

msgtype可以实现一种简单的接收优先级。如果msgtype为0,就获取队列中的第一个消息。如果它的值大于零,将获取具有相同消息类型的第一个信息。如果它小于零,就获取类型等于或小于msgtype的绝对值的第一个消息。

msgflg用于控制当队列中没有相应类型的消息可以接收时将发生的事情。

调用成功时,该函数返回放到接收缓存区中的字节数,消息被复制到由msg_ptr指向的用户分配的缓存区中,然后删除消息队列中的对应消息。失败时返回-1.

4、msgctl函数

该函数用来控制消息队列,它与共享内存的shmctl函数相似,它的原型为:

int msgctl(int msgid, int command, struct msgid_ds *buf);

command是将要采取的动作,它可以取3个值,

IPC_STAT:把msgid_ds结构中的数据设置为消息队列的当前关联值,即用消息队列的当前关联值覆盖msgid_ds的值。

IPC_SET:如果进程有足够的权限,就把消息列队的当前关联值设置为msgid_ds结构中给出的值

IPC_RMID:删除消息队列

buf是指向msgid_ds结构的指针,它指向消息队列模式和访问权限的结构。msgid_ds结构至少包括以下成员:

struct msgid_ds

{

uid_t shm_perm.uid;

uid_t shm_perm.gid;

mode_t shm_perm.mode;

};

成功时返回0,失败时返回-1.

三、使用消息队列进行进程间通信

马不停蹄,介绍完消息队列的定义和可使用的接口之后,我们来看看它是怎么让进程进行通信的。由于可以让不相关的进程进行行通信,所以我们在这里将会编写两个程序,msgreceive和msgsned来表示接收和发送信息。根据正常的情况,我们允许两个程序都可以创建消息,但只有接收者在接收完最后一个消息之后,它才把它删除。

接收信息的程序源文件为msgreceive.c的源代码为:

#include unistd.h

#include stdlib.h

#include stdio.h

#include string.h

#include errno.h

#include sys/msg.h

struct msg_st

{

long int msg_type;

char text[BUFSIZ];

};

int main()

{

int running = 1;

int msgid = -1;

struct msg_st data;

long int msgtype = 0; //注意1

//建立消息队列

msgid = msgget((key_t)1234, 0666 | IPC_CREAT);

if(msgid == -1)

{

fprintf(stderr, "msgget failed with error: %d\n", errno);

exit(EXIT_FAILURE);

}

//从队列中获取消息,直到遇到end消息为止

while(running)

{

if(msgrcv(msgid, (void*)data, BUFSIZ, msgtype, 0) == -1)

{

fprintf(stderr, "msgrcv failed with errno: %d\n", errno);

exit(EXIT_FAILURE);

}

printf("You wrote: %s\n",data.text);

//遇到end结束

if(strncmp(data.text, "end", 3) == 0)

running = 0;

}

//删除消息队列

if(msgctl(msgid, IPC_RMID, 0) == -1)

{

fprintf(stderr, "msgctl(IPC_RMID) failed\n");

exit(EXIT_FAILURE);

}

exit(EXIT_SUCCESS);

}

发送信息的程序的源文件msgsend.c的源代码为:

#include unistd.h

#include stdlib.h

#include stdio.h

#include string.h

#include sys/msg.h

#include errno.h

#define MAX_TEXT 512

struct msg_st

{

long int msg_type;

char text[MAX_TEXT];

};

int main()

{

int running = 1;

struct msg_st data;

char buffer[BUFSIZ];

int msgid = -1;

//建立消息队列

msgid = msgget((key_t)1234, 0666 | IPC_CREAT);

if(msgid == -1)

{

fprintf(stderr, "msgget failed with error: %d\n", errno);

exit(EXIT_FAILURE);

}

//向消息队列中写消息,直到写入end

while(running)

{

//输入数据

printf("Enter some text: ");

fgets(buffer, BUFSIZ, stdin);

data.msg_type = 1; //注意2

strcpy(data.text, buffer);

//向队列发送数据

if(msgsnd(msgid, (void*)data, MAX_TEXT, 0) == -1)

{

fprintf(stderr, "msgsnd failed\n");

exit(EXIT_FAILURE);

}

//输入end结束输入

if(strncmp(buffer, "end", 3) == 0)

running = 0;

sleep(1);

}

exit(EXIT_SUCCESS);

}

转载仅供参考,版权属于原作者。祝你愉快,满意请采纳哦

Java开发中消息队列和rpc框架都是做什么的?

一,消息队列服务一般用于设计多系统之间的信息传输,一般这种传输不需要对方对数据做出回应。它最常见的方式是构建异步的生产者-消费者模式。我们在系统开发中,有些业务并不需要及时返回结果,我们可以把这些操作放到队列中,然后另起一个消费者去处理它。比如日志,数据库异步更新。

二,rpc一般是用于服务器与服务器进程之间通信,这种通信有请求和应答。它是建立在底层的socket通信之上的。封装为rpc之后,更加方便建立通信。就像在同一个进程中调用对方的方法一样。它本地的方法名一般和请求到达的服务器的方法名一一对应。这样可以更好的把模块划分。所以它是应对分布式而生的。比如一个网站,一开始可能所有的服务在一个进程中,但是随着业务的增长,一个进程处理不过来,这时就需要把业务拆分成多个,分部到不同的机器上去。