fzy-blog

zookeeper注册服务与netty的简单结合应用

2019-05-24

简单介绍一下 zookeeper 和 netty

Netty:流行的 NIO Socket 通信框架,很多开源软件(如 Hadoop、Tachyon、Spark)都使用 Netty 作为底层通信框架
Zookeeper:开放源码的分布式应用程序协调服务,是 Hadoop、HBase 等开源分布式系统的重要组件

应用场景描述:
利用 Zookeeper 的服务注册与发现功能,实现 Netty 通信集群的简单高可用。

首先 NettyServer 端需要将服务注册到 zookeeper 中,代码如下

package com.zookeeper;

import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

import com.constance.Constant;

/**
 * Publishes a server address in ZooKeeper so that clients can look it up.
 *
 * <p>{@link #register(String)} opens a ZooKeeper session against the configured
 * connect string and creates an {@code EPHEMERAL_SEQUENTIAL} node under
 * {@code Constant.ZK_DATA_PATH} whose payload is the given data.
 */
public class ServiceRegistry {

    /** Released by the session watcher once the client reports SyncConnected. */
    private final CountDownLatch latch = new CountDownLatch(1);

    /** ZooKeeper connect string passed to the ZooKeeper constructor. */
    private final String registryAddress;

    /**
     * @param registryAddress ZooKeeper connect string used when registering
     */
    public ServiceRegistry(String registryAddress) {
        this.registryAddress = registryAddress;
    }

    /**
     * Registers the given data in ZooKeeper. Does nothing when {@code data} is
     * {@code null} or when no ZooKeeper handle could be created.
     *
     * @param data payload of the node; the server's {@code ip:port}
     */
    public void register(String data) {
        if (data == null) {
            return;
        }
        ZooKeeper zk = connectServer();
        if (zk != null) {
            // The handle is deliberately not closed here: per ZooKeeper semantics an
            // ephemeral node only lives as long as the session that created it.
            createNode(zk, data);
        }
    }

    /**
     * Opens a ZooKeeper session and blocks until the watcher sees SyncConnected.
     *
     * @return the ZooKeeper handle, or {@code null} if the constructor failed
     */
    private ZooKeeper connectServer() {
        ZooKeeper zk = null;
        try {
            zk = new ZooKeeper(registryAddress, Constant.ZK_SESSION_TIMEOUT, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if (event.getState() == Event.KeeperState.SyncConnected) {
                        latch.countDown();
                    }
                }
            });
            latch.await();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return zk;
    }

    /**
     * Creates the ephemeral sequential node carrying {@code data} and prints the
     * path ZooKeeper assigned. Failures are printed and otherwise ignored.
     */
    private void createNode(ZooKeeper zk, String data) {
        try {
            // Encode explicitly as UTF-8 instead of relying on the platform default charset.
            byte[] bytes = data.getBytes(java.nio.charset.StandardCharsets.UTF_8);
            String path = zk.create(Constant.ZK_DATA_PATH, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            System.out.println("create zookeeper node path:"+path+" data:"+data);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

NettyServer 端启动及调用注册服务的代码

package com.main;

import com.constance.Constant;
import com.netty.Server;
import com.zookeeper.ServiceRegistry;

public class ServerMain {

public static void main(String[] agrs){
int port = 9988;
//register service in zookeeper
ServiceRegistry zsr = new ServiceRegistry(Constant.registryAddress);
String serverIp = “123.123.123.123:9988”;
zsr.register(serverIp);
//netty bind
new Server().bind(port);
}
}

NettyClient 端启动时发现服务的代码

package com.main;

import com.constance.Constant;
import com.netty.Client;
import com.test.TestThread;
import com.zookeeper.ServiceDiscovery;

public class ClientMain {

/**

  • @param args
  • @throws InterruptedException
    */
    public static void main(String[] args) throws InterruptedException {
    //find service from zookeeper
    ServiceDiscovery sd = new ServiceDiscovery(Constant.registryAddress);
    String serverIp = sd.discover();
    String[] serverArr = serverIp.split(“:”);
    System.out.println(“ServerIP:”+serverArr[0]+” ServerPort:”+serverArr[1]);
    String hostIP = “123.123.111.111”;
    int port = 9988;
    new TestThread().test();
    //Client.connect(hostIP, port);
    Client.connect(serverArr[0], Integer.valueOf(serverArr[1]));
    }
    }

NettyClient 中 zk 发现服务的方法

package com.zookeeper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import com.constance.Constant;

/**
 * Looks up server addresses that were registered in ZooKeeper.
 *
 * <p>The constructor connects to ZooKeeper and loads the data of every child of
 * {@code Constant.ZK_REGISTRY_PATH}; a child watcher reloads the list whenever a
 * {@code NodeChildrenChanged} event arrives. {@link #discover()} picks one entry.
 */
public class ServiceDiscovery {

    /** Released by the session watcher once the client reports SyncConnected. */
    private final CountDownLatch latch = new CountDownLatch(1);

    /** Latest snapshot of node payloads; replaced wholesale on every refresh. */
    private volatile List<String> dataList = new ArrayList<String>();

    /** ZooKeeper connect string, e.g. {@code host1:port1,host2:port2,host3:port3}. */
    private final String registryAddress;

    /**
     * Connects to ZooKeeper and performs the initial load of registered addresses.
     *
     * @param registryAddress ZooKeeper connect string
     */
    public ServiceDiscovery(String registryAddress) {
        this.registryAddress = registryAddress;
        ZooKeeper zk = connectServer();
        if (zk != null) {
            watchNode(zk);
        }
    }

    /**
     * Returns one of the currently known addresses.
     *
     * @return the only entry if there is exactly one, a randomly chosen entry if
     *         there are several, or {@code null} if none is known
     */
    public String discover() {
        // Read the volatile field once: the watcher may swap in a new (possibly
        // shorter) list between size() and get().
        List<String> snapshot = dataList;
        int size = snapshot.size();
        if (size == 0) {
            return null;
        }
        if (size == 1) {
            return snapshot.get(0);
        }
        // Pick one of the entries at random.
        return snapshot.get(ThreadLocalRandom.current().nextInt(size));
    }

    /**
     * Opens a ZooKeeper session and blocks until the watcher sees SyncConnected.
     *
     * @return the ZooKeeper handle, or {@code null} if the constructor failed
     */
    private ZooKeeper connectServer() {
        ZooKeeper zk = null;
        try {
            // Connect string format: host1:port1,host2:port2,host3:port3
            zk = new ZooKeeper(registryAddress, Constant.ZK_SESSION_TIMEOUT, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    // Release the latch once the client is in the SyncConnected state.
                    if (event.getState() == Event.KeeperState.SyncConnected) {
                        latch.countDown();
                    }
                }
            });
            latch.await();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        return zk;
    }

    /**
     * Reads the data of every child of the registry path into a fresh list and
     * publishes it. The child watcher calls this method again on
     * {@code NodeChildrenChanged}, which also sets a new watch.
     */
    private void watchNode(final ZooKeeper zk) {
        try {
            List<String> nodeList = zk.getChildren(Constant.ZK_REGISTRY_PATH, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if (event.getType() == Event.EventType.NodeChildrenChanged) {
                        watchNode(zk);
                    }
                }
            });
            List<String> refreshed = new ArrayList<String>(nodeList.size());
            for (String node : nodeList) {
                byte[] bytes = zk.getData(Constant.ZK_REGISTRY_PATH + "/" + node, false, null);
                if (bytes == null) {
                    // A node without payload carries no address; skip it.
                    continue;
                }
                // Decode explicitly as UTF-8 instead of relying on the platform default charset.
                refreshed.add(new String(bytes, java.nio.charset.StandardCharsets.UTF_8));
            }
            this.dataList = refreshed;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

ps:使用代码框时,生成的代码框总是跑到第一行,只能将代码粘贴在文本中

Tags: 优化
使用支付宝打赏
使用微信打赏

若你觉得我的文章对你有帮助,欢迎点击上方按钮对我打赏

扫描二维码,分享此文章