github地址:https://github.com/antirez/disque
非常激动,终于等到Disque的apoha版本,虽然没有稳定版本的发布,但是可以确定的是基本结构已经定了。
现在根据github上的介绍来简单说下disque的特点:
1.顺序并不保证:在分布式环境下,如果需要保证顺序,需要引入分布式协调(Zookeeper等),但是单个节点中的message是可以保证顺序的(但是应该考虑容错机制打乱顺序)。
2.至少消费一次:在每个job里面,保存了一个属性retry时间,消息添加到队列中的时候,同时会加入到一个服务器调度队列中,在指定(retry)时间没有得到消费完成的确认(ackjob)的话,消息就会被重新入队用于消费。
3.最多消费一次:消息不是幂等的,但可以丢消息,retry=0。
4.分布式:实现AP而不顾虑C,即不保证多节点一致性。
5.复制:每当添加一个job之后,就会从当前集群中随机选取指定数目的集群节点,顺序发送新增指定job请求到这些节点,只有当最后一个节点返回成功之后,才会给客户端返回成功。
6.failure over:disque的分布式协议采是Gossip,本身就是无中心的分布式系统,如果故障节点的消息全部采用了复制,在别的节点,必然会有该消息的复制,这样消息就不会丢失了。
7.load balance:disque会动态计算每个节点的压力,对于请求量较大导致队列空掉的节点,会从别的节点拿到更多消息用于消费。
8.不落地:作者的思路是想在OOM的时候,写日志入盘,之后等有内存的时候在加载回来,目前并没有实现。
命令:
添加一个job:ADDJOB queue_name job [REPLICATE ] [DELAY ] [RETRY ] [TTL ] [MAXLEN ] [ASYNC]
REPLICATE count:复制份数,之前提到的,会从现有实例中,取出count个实例作为复制目标。
DELAY sec:之前没有提到的参数,指定多少秒钟之后,才把消息放入队列中。
RETRY sec:消息取走没有收到ack sec秒之后,重新把消息放回队列。
TTL sec:消息如果在队列中存在时间超过sec秒,就直接删除消息,无论有没有被消费。
MAXLEN count:消息队列最大长度。
ASYNC:采用异步方式操作。
主要过程描述:
添加一个job后,client拿到的jobId,这个id的组成是,前两位采用关键字DI(不知道什么作用),之后8位用来标志生成这个id的实例,之后32位随机生成但和时间相关的字符串,之后4位是消息存活时间,用于在消息在指定时间内没有被消费的处理。最后加上关键字SQ。
长这个样子:DI | 0f0c644f | d3ccb51c2cedbd47fcb6f312646c993c | 05a0 | SQ |
消息ID的作用,主要有以下几点:
每当生产者发送一个job,就会拿到这个id,可以用于生成者确认消息是否被正确消费。
每当消费者拿到一个job,就会附带这个id,用于消费者通知生成者或者disque消息已经被正确处理完毕。
获取job:GETJOB [TIMEOUT ] [COUNT ] FROM queue1 queue2 … queueN
TIMEOUT:执行超时时间,如果超时,直接返回。
COUNT count:返回count个消息。
主要过程:
返回的消息,附带队列名,消息以及jobID。
删除job:DELJOB jobid_1 jobid_2 … jobid_N
确认消息已经被消费:ACKJOB jobid1 jobid2 … jobidN
FASTACK jobid_1 jobid_2 … jobid_N 不需要确认其他节点应答ACK消息就会返回。