博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
ZooKeeper实现分布式FIFO队列
阅读量:2717 次
发布时间:2019-05-13

本文共 7444 字,大约阅读时间需要 24 分钟。

,介绍了如何整合虚拟化和Hadoop,让Hadoop集群跑在VPS虚拟主机上,通过云向用户提供存储和计算的服务。

现在硬件越来越便宜,一台非品牌服务器,2颗24核CPU,配48G内存,2T的硬盘,已经降到2万块人民币以下了。这种配置如果简单地放几个web应用,显然是奢侈的浪费。就算是用来实现单节点的hadoop,对计算资源浪费也是非常高的。对于这么高性能的计算机,如何有效利用计算资源,就成为成本控制的一项重要议题了。

通过虚拟化技术,我们可以将一台服务器,拆分成12台VPS,每台2核CPU,4G内存,40G硬盘,并且支持资源重新分配。多么伟大的技术啊!现在我们有了12个节点的hadoop集群, 让Hadoop跑在云端,让世界加速。

关于作者:

  • 张丹(Conan), 程序员Java,R,PHP,Javascript
  • weibo:@Conan_Z
  • blog: 
  • email: bsspirit@gmail.com

转载请注明出处:

前言

ZooKeeper是一个强大的分布式协作系统,用ZooKeeper可以方便地实现先进先出(FIFO)队列。给“队列”的技术现实多一种选择,标准化我们的程序结构。另一篇,分步式同步队列实现,请参考:

关于ZooKeeper的基本使用,请参考:

目录

  1. 分布式先进先出(FIFO)队列
  2. 设计思路
  3. 程序实现

1. 分布式先进先出(FIFO)队列

在计算机科学中,消息队列(Message queue)是一种进程间通信或同一进程的不同线程间的通信方式。消息队列提供了异步的通信协议,消息的发送者和接收者不需要同时与消息队列互交。消息会保存在队列中,直到接收者取回它。

先进先出(FIFO)队列,是消息队列最基本的一种实现形式,先发出的先消费。

2. 设计思路

实现的思路也非常简单,在/queue-fifo的目录下创建 SEQUENTIAL 类型的子目录 /x(i),这样就能保证所有成员加入队列时都是有编号的,出队列时通过 getChildren( ) 方法可以返回当前所有的队列中的元素,然后消费其中最小的一个,这样就能保证FIFO。

应用实例

图标解释

  1. app1,app2,app3是3个独立的业务系统
  2. zk1,zk2,zk3是ZooKeeper集群的3个连接点
  3. /queue-fifo,是znode的队列,按顺序存储数据
  4. /queue-fifo/x1,是znode队列中,1号排对者,由app1提交
  5. /queue-fifo/x2,是znode队列中,2号排对者,由app2提交
  6. app3是消费者,通过zk3连接到znode队列中,找到/queue-fifo中顺序最少的节点消费,删除消费后的节点(红色线表示)

注:

  • 1). app1可以通过zk2提交,app2也可通过zk3提交
  • 2). app1可以提交3次请求,生成x1,x2,x3多个节点
  • 3). app1可以作为消费者,消费队列数据

3. 程序实现

1). 单节点模拟实验

模拟app1,通过zk1,生产2个节点,然后再消费3个节点。

public static void doOne() throws Exception {        String host1 = "192.168.1.201:2181";        ZooKeeper zk = connection(host1);        initQueue(zk);        produce(zk, 1);        produce(zk, 2);        cosume(zk);        cosume(zk);        cosume(zk);        zk.close();    }

创建一个与服务器的连接

public static ZooKeeper connection(String host) throws IOException {        ZooKeeper zk = new ZooKeeper(host, 60000, null);        return zk;    }

出始化队列

public static ZooKeeper connection(String host) throws IOException {        return new ZooKeeper(host, 60000, new Watcher() {            public void process(WatchedEvent event) {            }        });    }

生产者

public static void produce(ZooKeeper zk, int x) throws KeeperException, InterruptedException {        System.out.println("create /queue-fifo/x" + x + " x" + x);        zk.create("/queue-fifo/x" + x, ("x" + x).getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);    }

消费者

public static void cosume(ZooKeeper zk) throws KeeperException, InterruptedException {        List list = zk.getChildren("/queue-fifo", true);        if (list.size() > 0) {            long min = Long.MAX_VALUE;            for (String num : list) {                if (min > Long.parseLong(num.substring(1))) {                    min = Long.parseLong(num.substring(1));                }            }            System.out.println("delete /queue/x" + min);            zk.delete("/queue-fifo/x" + min, 0);        } else {            System.out.println("No node to cosume");        }    }

启动main函数

public static void main(String[] args) throws Exception {            doOne();    }

运行结果:

/queue-fifo is exist!create /queue-fifo/x1 x1create /queue-fifo/x2 x2delete /queue/x10000000032delete /queue/x20000000033No node to cosume

完全符合我的们预期,由于produce时,我们创建的节点模式是EPHEMERAL_SEQUENTIAL,所以系统会在x(i)(n),随机生成n=0000000032,输出为x10000000032。

接下来我们看分布式环境。

2). 分布式模拟实验

app1通过zk1生产x1, app2通过zk2生产x2, app3通过zk3消费3个节点

public static void doAction(int client) throws Exception {        String host1 = "192.168.1.201:2181";        String host2 = "192.168.1.201:2182";        String host3 = "192.168.1.201:2183";        ZooKeeper zk = null;        switch (client) {        case 1:            zk = connection(host1);            initQueue(zk);            produce(zk, 1);            break;        case 2:            zk = connection(host2);            initQueue(zk);            produce(zk, 2);            break;        case 3:            zk = connection(host3);            initQueue(zk);            cosume(zk);            cosume(zk);            cosume(zk);            break;        }    }

启动main函数

public static void main(String[] args) throws Exception {        if (args.length == 0) {            doOne();        } else {            doAction(Integer.parseInt(args[0]));        }    }

程序启动方法,分3次启动,命令行传不同的参数,分别是1,2,3

run1: 执行app1–>zk1

#日志输出/queue-fifo is exist!create /queue-fifo/x1 x1

run2: 执行app2–>zk2

#日志输出/queue-fifo is exist!create /queue-fifo/x2 x2

run3: 执行app3–>zk3

#日志输出/queue-fifo is exist!delete /queue/x10000000034delete /queue/x20000000035No node to cosume

我们完成分布式队列的实验,由于时间仓促。文字说明及代码难免有一些问题,请发现问题的同学帮忙指正。

下面贴一下完整的代码:

package org.conan.zookeeper.demo;import java.io.IOException;import java.util.List;import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.KeeperException;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import org.apache.zookeeper.ZooDefs.Ids;import org.apache.zookeeper.ZooKeeper;public class FIFOZooKeeper {    public static void main(String[] args) throws Exception {        if (args.length == 0) {            doOne();        } else {            doAction(Integer.parseInt(args[0]));        }    }    public static void doOne() throws Exception {        String host1 = "192.168.1.201:2181";        ZooKeeper zk = connection(host1);        initQueue(zk);        produce(zk, 1);        produce(zk, 2);        cosume(zk);        cosume(zk);        cosume(zk);        zk.close();    }    public static void doAction(int client) throws Exception {        String host1 = "192.168.1.201:2181";        String host2 = "192.168.1.201:2182";        String host3 = "192.168.1.201:2183";        ZooKeeper zk = null;        switch (client) {        case 1:            zk = connection(host1);            initQueue(zk);            produce(zk, 1);            break;        case 2:            zk = connection(host2);            initQueue(zk);            produce(zk, 2);            break;        case 3:            zk = connection(host3);            initQueue(zk);            cosume(zk);            cosume(zk);            cosume(zk);            break;        }    }    // 创建一个与服务器的连接    public static ZooKeeper connection(String host) throws IOException {        return new ZooKeeper(host, 60000, new Watcher() {            public void process(WatchedEvent event) {            }        });    }    public static void initQueue(ZooKeeper zk) throws KeeperException, InterruptedException {        if (zk.exists("/queue-fifo", false) == null) {            System.out.println("create /queue-fifo task-queue-fifo");            zk.create("/queue-fifo", "task-queue-fifo".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);        } else {            System.out.println("/queue-fifo is exist!");        }    }    public static void produce(ZooKeeper zk, int x) throws KeeperException, InterruptedException {        System.out.println("create /queue-fifo/x" + x + " x" + x);        zk.create("/queue-fifo/x" + x, ("x" + x).getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);    }    public static void cosume(ZooKeeper zk) throws KeeperException, InterruptedException {        List list = zk.getChildren("/queue-fifo", true);        if (list.size() > 0) {            long min = Long.MAX_VALUE;            for (String num : list) {                if (min > Long.parseLong(num.substring(1))) {                    min = Long.parseLong(num.substring(1));                }            }            System.out.println("delete /queue/x" + min);            zk.delete("/queue-fifo/x" + min, 0);        } else {            System.out.println("No node to cosume");        }    }}

转载请注明出处:

打赏作者

This entry was posted in , ,

你可能感兴趣的文章
LeetCode 62. Unique Paths LeetCode 63 Unique Paths II 不同的路径之二
查看>>
LeetCode 66. Plus One
查看>>
LeetCode 74. Search a 2D Matrix
查看>>
【已解决】 78. Subsets【39、40未解决】
查看>>
创建第一个android项目
查看>>
Excel 使用过程中碰到的问题处理
查看>>
阿里云负载均衡SLB--报错502 Bad Gateway 的解决方案
查看>>
Monte Carlo 方法求解π的近似值
查看>>
一些python学习的基本操作(持续更新中)
查看>>
Fluxion安装教程
查看>>
网络安全基础知识
查看>>
最详细 vsphere创建Windows service虚拟机,并安装VMware Tools 进行配置
查看>>
【html/css】如何设置HTML span 的宽度
查看>>
ubuntu12.10更新包后的问题
查看>>
【web开发】EL表达式的一些用法小结
查看>>
【mysql】关于命令load data local infile
查看>>
如何选择更适合你的 Linux 发行版?
查看>>
数据分析师必知必会的7款Python工具
查看>>
又到招聘季,说说网络招聘的那些坑
查看>>
Windows RDP远程桌面无密码账户
查看>>