【Redis 源码日志】— Redis 应用:消息中间件

作者:郑思愿

出处:http://daoluan.net


消息队列简介

接触 Linux 系统编程的时候,曾经学到消息队列是 IPC 的一种方式,这种通讯方式通常只用于本地的进程,基于共享内存的《无锁消息队列》即是一个很好的中间件,详见这里。但这篇提到的消息队列,也被称为消息中间件,通常在分布式系统中用到。

提及消息中间件的时候,还会涉及生产者和消费者两个概念。消息中间件是负责接收来自生产者的消息,并存储并转发给对应的消费者,生产者可以按 topic 发布各样消息,消费者也可以按 topic 订阅各样消息。生产者只管往消息队列里推送消息,不用等待消费者的回应;消费者只管从消息队列中取出数据并处理,可用可靠性等问题都交由消息中间件来负责。

说白了,这种分布式的消息中间件即是网络上一个服务器,我们可以往里面扔数据,里面的数据会被消息中间件推送或者被别人拉取,消息中间件取到一个数据中转的作用。生产者和消费者通常有两种对应关系,一个生产者对应一个消费者,以及一个生产者对应多个消费者。在这篇文章中,介绍了消息中间件的三个特点:解耦,异步和并行。读者可以自行理解。一些不需要及时可靠响应的业务场景,消息中间件可以大大提高业务上层的 吞吐量。

目前消息中间件一族里边有一些优秀的作品,RabbitMQ, Jafka/Kafka。redis 也可以作为一个入门级的消息队列。上面提到的一个生产者对应一个消费者,Redis 的 blist 可以实现;一个生产者对应多个消费者,Redis 的pub/sub 模式可以实现。值得注意的是,使用 Redis 作为消息中间件,假如消费者有一段时间断开了与 Redis 的连接,它将不会收到这段时间内 Redis 内的数据,这一点从 pub/sub 的实现可以知道。严格意义上的消息中间件,需要保证数据的可靠性。

分布式的消息队列

在平时的开发当中,消息队列算是最常见的应用了。在本机的时候,可以使用系统提供的消息队列,或者基于共享内存的循环消息队列,来实现本机进程以及进程之间的通信。对于异机部署的多个进程,就需要用到分布式的消息队列了,来看看这个场景:

生产者,基于 Redis 的消息队列,3 个 worker 组都分别部署在不同的机器上,生产者会快速将产出内容(如需要存储的数据或者日志等)推送到消息队列服务器上,这是 worker group 就能消费了。

这种实现可以借助 Redis 中的 blist 实现。在这里用 C 实现了一个生产者和 worker group 的示例代码:

// comm.h
#ifndef COMM_H__
#define COMM_H__
#include <inttypes.h>
typedef struct {
   char ip[32];
   uint16_t port;
   char queue_name[256];
} config_t ;
void Usage(char *program) {
   printf("Usage: %s -h ip -p port -l test\n",program);
   abort();
}
const size_t max_cmd_len = 512;
#endif

生产者的代码:
// producer.cc
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>

#include "comm.h"

#include "hiredis/hiredis.h"

void test_redis_client()
{
   redisContext *rc = redisConnect("127.0.0.1",6379);
   if(NULL == rc || rc != NULL && rc->err) {
       fprintf(stderr,"error: %s\n",rc->errstr);
       return;
   }
   // set name
   redisReply *reply = (redisReply *)redisCommand(rc,"set name dylan");
   printf("%s\n",reply->str);
   // get name
   reply = (redisReply *)redisCommand(rc,"get name");
   printf("%s\n",reply->str);
   }
   int main(int argc, char *argv[]) {
   if (argc < 7)
       Usage(argv[0]);
   config_t config;
   for (int i = EOF;
           (i = getopt(argc, argv, "h:p:l:")) != EOF;) {
       switch (i) {
           case 'h': snprintf(config.ip,sizeof(config.ip),"%s",optarg); break;
           case 'p': config.port = atoi(optarg); break;
           case 'l': snprintf(config.queue_name,sizeof(config.queue_name),"%s",
           optarg); break;
           default: Usage(argv[0]); break;
       }
   }
   redisContext *rc = redisConnect(config.ip,config.port);
   if (NULL == rc || rc != NULL && rc->err) {
       fprintf(stderr,"error: %s\n",rc->errstr);
       return -1;
   }
   redisReply *reply = NULL;
   char cmd[max_cmd_len];
   snprintf(cmd,sizeof(cmd),"LPUSH %s task",config.queue_name);
   printf("cmd=%s\n",cmd);
   int count = 100;
   while (count--) {
       reply = (redisReply *)redisCommand(rc,cmd);
   if (reply && reply->type == REDIS_REPLY_INTEGER) {
   } else {
       printf("BLPUSH error\n");
       }
   }
return 0;
}

消费者的代码:
// consumer.cc
#include "comm.h"
#include "hiredis/hiredis.h"
int DoLogic(char *data, size_t len);
int main(int argc, char *argv[]) {
   if (argc < 7)
       Usage(argv[0]);
   config_t config;
       for (int i = EOF;
           (i = getopt(argc, argv, "h:p:l:")) != EOF;) {
       switch (i) {
           case 'h': snprintf(config.ip,sizeof(config.ip),"%s",optarg); break;
           case 'p': config.port = atoi(optarg); break;
           case 'l': snprintf(config.queue_name,sizeof(config.queue_name),"%s",
           optarg); break;
           default: Usage(argv[0]); break;
       }
   }
   redisContext *rc = redisConnect(config.ip,config.port);
   if (NULL == rc || rc != NULL && rc->err) {
       fprintf(stderr,"error: %s\n",rc->errstr);
       return -1;
   }
   redisReply *reply = NULL;
   char cmd[max_cmd_len];
   snprintf(cmd,sizeof(cmd),"BRPOP %s task 30",config.queue_name);
   int seq = 0;
   while (true) {
       reply = (redisReply *)redisCommand(rc,cmd);
   if (reply && reply->type == REDIS_REPLY_STRING) {
       DoLogic(reply->str,reply->len);
   } else if (reply && reply->type == REDIS_REPLY_ARRAY) {
       for (size_t i=0; i<reply->elements; i+=2) {
       printf("%d->%s\n",seq++,reply->element[i]->str);
   }
   } else {
   printf("BRPOP error, reply->type=%d\n",reply->type);
   break;
       }
   }
   return 0;
}
   int DoLogic(char *data, size_t len) {
       printf("reply=%s\n",data);
return 0;
}
赞(1) 打赏

如未加特殊说明,此网站文章均为原创,转载必须注明出处。Java 技术驿站 » 【Redis 源码日志】— Redis 应用:消息中间件
分享到: 更多 (0)

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

关注【Java 技术驿站】公众号,每天早上 8:10 为你推送一篇技术文章

扫描二维码关注我!


关注【Java 技术驿站】公众号 回复 “VIP”,获取 VIP 地址永久关闭弹出窗口

免费获取资源

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏