如何实现一个高可用高性能的区块链平台?
  zYlHB79h2aVW 2023年11月02日 77 0

基本概念

广义的区块链是指实现了数据公开、透明、可追溯的产品的架构设计方法。必须包含点对点网络设计、加密技术应用、分布式算法的实现、数据存储技术的使用等4个方面,其他的可能涉及到分布式存储、机器学习、VR、物联网、大数据等。狭义的区块链仅仅涉及到数据存储技术,数据库或文件操作等。本文的区块链,指的是广义的区块链,不会涉及数字币的任何机制和实现。

平台功能说明

本文所阐述的区块链实现主要面向的是私有链及联盟链。例如数家公司组个联盟,来共同见证、记录一些不可篡改的交互信息,如A公司给B公司发了一个xxx请求,B公司响应了什么什么。其实要的就是一个分布式数据库,而且性能要好,不是像比特币那种10分钟才生成一个区块,同时还需要确保每条信息被准确记录,不允许丢失(即技术上不允许分叉也不允许丢失)。这个平台的核心是数据库的性能,和区块链的一些特性以及实现高可用和选择性能更优的共识算法。

平台架构

整个系统分为接入层、池化层、指令处理层、网络层以及存储层,下面我们分层逐个介绍,并就各层可能的性能及高可用优化点做一个说明。

接入层

接入层主要是区块链创建及查询的API接口,业务逻辑很简单,例如区块链的创建,先看一下指令集的定义:

public class Instruction{
    /**
     * 新的内容
     */
    private String json;
    /**
     * 时间戳
     */
    private Long timeStamp;
    /**
     * 操作人的公钥
     */
    private String publicKey;
    /**
     * 签名
     */
    private String sign;
    /**
     * 该操作的hash
     */
    private String hash;
  /**
     * 指令的操作,增删改(1,-1,2)
     */
    private byte operation;
    /**
     * 操作的表名
     */
    private String table;
    /**
     * 原始Json,用于回滚区块执行,del 和 update 必填
     */
    private String oldJson;
    /**
     * 业务id,sql语句中where需要该Id  del 和 update 必填
     */
    private String instructionId;
  }

将传入的参数按照指令集的模板进行创建及添加到指令池,如下:

  public ResultPack<String> production(Instruction instruction) {
        //判断指令是否已存在,如果已存在则不添加
        String key = instruction.getHash();
        if (instructionHash.contains(key)) {
            return ResultPack.failed("[" + key + "]已经存在指令池中");
        }
    //添加到内存缓冲队列
        ResultPack<String> resultPack = instructionQueue.offer(instruction);
        //入队失败
        if (resultPack.isFailed()) {
            throw new BlockException("指令池已满,当前队列数量:" + instructionQueue.getSize() + " 队列最大限制:" + capital);
        } else {
            instructionHash.add(key);
        }
        return resultPack;
}

池化及指令处理层

池化层主要是基于ConcurrentLinkedQueue实现的一个队列服务,它主要的功能逻辑如下:

生产者:

  • API层的指令接收后存储到队列;

  • 区块链生成失败或者分叉后重新加入到队列。

消费者:

  • 定时线程进行消费,并调用区块添加服务发起区块添加共识;

生产者第1个功能在前面已经介绍,这里介绍下第2个生产者场景,当服务端接收到来自共识结果是指令已存在,例如冲突,分叉的时候就进行重新打包:

if (rpcCheckBlockBody.getCode() == RpcCheckBlockResult.EXITS_INSTRUCTION.getCode()) {
            //如果指令中存在已经被打包的,找出未打包指令后单播,重新再次打包
            List<Instruction> appendingInstructions = ApplicationContextProvider.getBean(DbBlockManager.class)
                    .checkAndGetpendingInstructions(block.getBlockBody().getInstructions());

            if (CollUtil.isNotEmpty(appendingInstructions)) {
                BlockPacket instructionPacket = new PacketBuilder<InstructionBody>()
                        .setType(PacketType.ADD_INSTRUCTION_REQUEST.getKey())
                        .setBody(new InstructionBody(appendingInstructions)).build();
        //向指定节点发送通信,指令重新打包添加
                Aio.send(channelContext, instructionPacket);
            }

接下来消费者线程的功能说明一下:

 private void execute() {
        if (packageLimitSize <= 0) {
            packageLimitSize = instructionQueue.getSize();
        }
    //获取指令集
        List<Instruction> pollData = instructionQueue.pollList(packageLimitSize);
        if (!CollectionUtils.isEmpty(pollData)) {
            for (Instruction ins : pollData) {
                instructionHash.remove(ins.getHash());
            }
            try {
          //指令消息回调,此时会调用区块添加服务
                senderCallBack.consume(pollData);
            } catch (Exception e) {
                logger.error("exec consume error", e);
            }
        }
}

回调的区块添加服务,这里需要进行去重过滤:

public void consume(List<Instruction> instructions) {
              //校验该指令集是否有些已经在处理,如果已在处理就过滤掉
                List<Instruction> appendingInstructions = dbBlockManager.checkAndGetpendingInstructions(instructions);
                log.info("队列消费处理指令数:{} Appending数:{}", instructions.size(), appendingInstructions.size());
                if (CollUtil.isNotEmpty(appendingInstructions)) {
                    BlockBody blockBody = new BlockBody();
                    blockBody.setInstructions(appendingInstructions);
          //发送区块添加请求
                    blockService.addBlock(blockBody);
                }
            }

接下来blockService会调用addBlock发起区块添加请求,会调用节点共识算法进行共识:

public Block addBlock(BlockBody blockBody) {
        //打包区块账户
        String coinBasePubKey = dbBlockManager.getCoinBasePubKey();
        if (StringUtils.isBlank(coinBasePubKey)) {
            throw new BlockException("节点无法生成区块,先创建账户");
        }
    //解析出区块指令集合
        List<Instruction> instructions = blockBody.getInstructions();
        List<String> hashList = instructions.stream().map(Instruction::getHash).collect(Collectors.toList());
        Block lastBlock = dbBlockManager.getLastBlock();
        long blockNumber = 1;
        String lastBlockHash = null;
        if (lastBlock != null) {
            blockNumber = lastBlock.getBlockHeader().getNumber() + 1;
            lastBlockHash = lastBlock.getHash();
        }

        //区块头构建
        BlockHeader blockHeader = new BlockHeader();
        blockHeader.setHashList(hashList);
        //计算所有指令的hashRoot,构建MerkleTree存储
        blockHeader.setHashMerkleRoot(new MerkleTree(hashList).build().getRoot());
        blockHeader.setPublicKey(coinBasePubKey);
        blockHeader.setTimeStamp(CommonUtil.getNow());
        blockHeader.setVersion(version);
        blockHeader.setNumber(blockNumber);
        blockHeader.setHashPreviousBlock(lastBlockHash);

        //区块构建
        Block block = new Block();
        block.setBlockBody(blockBody);
        block.setBlockHeader(blockHeader);
    //区块链的hash映射值,采用Sha256算法,区块链就是通过这个值形成链式关系的
        block.setHash(Sha256.sha256(BlockUtils.getBlockSignStr(block)));

        //不同的共识算法选择
        switch (systemConfig.getConsensusType()) {
            case PBFT:
                BlockPacket blockPacket = new PacketBuilder<>().setType(PacketType.GENERATE_BLOCK_REQUEST.getKey()).setBody(new
                        RpcBlockBody(block)).build();
                //广播给其他人做验证
                packetSender.sendGroup(blockPacket);
                break;
            case RAFT:
                try {
            //RAFT提交共识,异步验证回调结果
                    raftService.set(block.getHash(), block);
                } catch (Exception e) {
                    log.error("raft add block error", e);
                }
                break;
            default:
                break;
        }
        return block;
    }

为什么使用队列?

1、异步化处理,提升接入层性能;

2、防止区块添加失败或者分叉,可重新添加进行重试;

3、指令去重;

4、还可以采取类似kafka消息队列,实现分布式消费,增强处理性能。

网络层

点对点及广播通信

网络层,各节点采用的是长连接,断线后重连,维持心跳包。网络框架使用的是t-io,oschina的知名开源项目。t-io采用了AIO的方式,在大量长连接情况下性能优异,资源占用也很少,并且具备group功能,特别适合于做多个联盟链的SaaS平台。并且包含了心跳包、断线重连、retry等优秀功能。

在项目中,每个节点即是server,又是client,作为server则被其他的N-1个节点连接,作为client则去连接其他N-1个节点的server。同一个联盟,设定一个Group,每次发消息,直接调用sendGroup方法即可。

 public void sendGroup(BlockPacket blockPacket) {
        //对外发出client请求事件
        ApplicationContextProvider.publishEvent(new ClientRequestEvent(blockPacket));
        //发送到一个group
        Aio.sendToGroup(clientGroupContext, GROUP_NAME, blockPacket);
    }

共识算法

分布式共识算法是分布式系统的核心,常见的有Paxos、pbft、bft、raft、pow等。区块链中常见的是POW、POS、DPOS、pbft等。

比特币采用了POW工作量证明,需要耗费大量的资源进行hash运算(挖矿),由矿工来完成生成Block的权利。其他多是采用选举投票的方式来决定谁来生成Block。共同的特点就是只能特定的节点来生成区块,然后广播给其他人。

区块链分如下三类:

私有链:这是指在企业内部部署的区块链应用,所有节点都是可以信任的;

联盟链:半封闭生态的交易网络,存在不对等信任的节点;

公有链:开放生态的交易网络,为联盟链和私有链等提供全球交易网络。

由于私有链是封闭生态的存储系统,因此采用Paxos类共识算法(过半同意)可以达到最优的性能;联盟链有半公开半开放特性,因此拜占庭容错是适合选择之一,例如IBM超级账本项目;对于公有链来说,这种共识算法的要求已经超出了普通分布式系统构建的范畴,再加上交易的特性,因此需要引入更多的安全考虑。所以比特币的POW是个非常好的选择。

我们这里可选的是raft和pbft,分别做私链和联盟链,项目中我使用了修改过的pbft共识算法。

PBFT

先来简单了解pbft:

  1. 从全网节点选举出一个主节点(Leader),新区块由主节点负责生成;

  2. 每个节点把客户端发来的交易向全网广播,主节点将从网络收集到需放在新区块内的多个交易排序后存入列表,并将该列表向全网广播;

  3. 每个节点接收到交易列表后,根据排序模拟执行这些交易。所有交易执行完后,基于交易结果计算新区块的哈希摘要,并向全网广播;

  4. 如果一个节点收到的2f(f为可容忍的拜占庭节点数)个其它节点发来的摘要都和自己相等,就向全网广播一条commit消息;

  5. 如果一个节点收到2f+1条(包括自己)commit消息,即可提交新区块到本地的区块链和状态数据库。

图示如下: 可以看到,传统的pbft是需要先选举出leader的,然后由leader来搜集交易,并打包,然后广播出去。之后各个节点开始对新Block进行校验、投票、累积commit数量,最后落地。

为了提升性能及可用性,这里对pbft做了修改,这是一个联盟,各个节点是平等的,所以不需要让每个节点都生成一个指令后,发给其他节点,再大家选举出一个节点来搜集网络上的指令组合再生成Block,太复杂了,而且又存在了leader节点的故障隐患。

我们对pbft的修改是,不需要选择leader,任何节点都可以构建Block,然后全网广播。其他节点收到该Block请求时即进入Pre-Prepare状态,校验格式、hash、签名等权限,校验通过后,进入Prepare状态,并全网广播状态。待自己累积的各节点Prepare的数量大于2f+1时,进入commit状态,并全网广播该状态。待自己累积的各节点Commit的数量大于2f+1时,认为已达成共识,将Block加入区块链中。

很明显,和有leader时相比,提升了处理性能及解决了leader的节点故障问题,但是缺少了顺序的概念。有leader时能保证Block的顺序,当有并发生成Block的需求时,leader能按照顺序进行广播。譬如大家都已经到number=5的区块了,然后需要再生成2个,有leader时,则会按照6、7的顺序来生成。而没有leader时,则可能发生多节点同时生成6的情况。为了避免分叉,可以优化的方案如下:

由于我们的业务不允许数据丢失,只需要能够最终形成共识入链即可,那么就可以采取回流到队列重新进行共识加入到区块链中;

采取中心化的消息队列处理组件,例如kafka,利用kafka的分区特性实现消息的串行处理,如果在消费时出现异常,仍然回流到消息队列进行重试处理即可。

本系统这个版本采取的是方案1实现。

RAFT

关于RAFT的共识流程可以参看下图: 对比PBFT的共识流程可以发现RAFT的共识流程复杂度要低很多,事实上RAFT的共识流程复杂度是o(n),而PBFT的复杂度是o(n*n),RAFT只是容忍故障节点,但不容忍作恶节点(即虚假返回状态)。由于其共识性能更优,我们在系统中也实现了RAFT共识模块,采取的是git上的一款轻量级的raft-java,它核心实现的共识这里不做过多说明,感兴趣的朋友可以查找相应资料了解下。这里只是解释下共识后的区块添加逻辑,如下:

public void run() {
            ServerMessage.GetRequest getRequest = ServerMessage.GetRequest.newBuilder()
                    .setKey(key).build();
            while (true) {
          //共识返回的结果
                ServerMessage.GetResponse getResponse = RaftServerServiceContainer.getInstance().get(getRequest);
                try {
                    if (getResponse != null) {
                        Block block = Json.toBean(getResponse.getValue(), Block.class);
                        if (block == null) {
                            log.debug("get response block is null value:{}", getResponse.getValue());
                            continue;
                        }
                        //leader 生成block后也需要校验区块合法性
                        ResultPack<String> resultPack = checkBlock(block);
                        if (resultPack.isFailed()) {
                            log.error("区块校验失败:{}", resultPack.comment());
                            return;
                        }
                        //本地广播消息,进行区块添加
                        ApplicationContextProvider.publishEvent(new AddBlockEvent(ConsensusType.RAFT, block));
                        //全链节点发送基于raft共识一致性消息,区块进行全网添加
                        BlockPacket blockPacket = new PacketBuilder<>().setType(PacketType.COMMIT_RAFT_BLOCK_REQUEST.getKey()).setBody(new
                                RpcBlockBody(block)).build();
                        packetSender.sendGroup(blockPacket);
                        return;
                    } else {
                        log.error("get request failed, key={}\n", key);
                    }
                } catch (Exception ex) {
                    log.error("getResponse error", ex);
                }
            }
        }

存储层

我们存储层采取的是rocksDB和levelDB,他们本质上都是一致的,rocksDB是基于levelDB优化而来的,比特币采取的是levelDB实现,他们都非常适合这种变长KV小规模的元数据存取。

我们这里封装了一个DbStore接口,定义了常见的put、get及delete操作,rocksDB及levelDB分别实现这些接口即可,涉及到的区块落盘的逻辑如下:

//校验区块
        RpcCheckBlockBody rpcCheckBlockBody = checkerManager.check(block);
        if (rpcCheckBlockBody.getCode() != RpcCheckBlockResult.SUCCESS.getCode()) {
            logger.warn("block check error check:{} block:{}", rpcCheckBlockBody, block);
            return;
        }
        //如果没有上一区块,说明该块就是创世块
        if (block.getBlockHeader().getHashPreviousBlock() == null) {
            dbStore.put(Constants.KEY_FIRST_BLOCK, hash);
        } else {
            //保存上一区块对该区块的key value映射
            dbStore.put(Constants.KEY_BLOCK_NEXT_PREFIX + block.getBlockHeader().getHashPreviousBlock(), hash);
        }
        //存入rocksDB
        dbStore.put(hash, Json.toJson(block));
        //设置最后一个block的key value
        dbStore.put(Constants.KEY_LAST_BLOCK, hash);
        //设置number对应的hash
        dbStore.put(Constants.KEY_BLOCK_NUMBER + block.getBlockHeader().getNumber(), hash);

总结

由于篇幅所限,本文讨论了一个区块链的几个核心层的主要实现逻辑及优化,对于P2P网络的Merkle Tree实现以及网络节点之间的数据同步和一致性机制没有涉及,大家如果感兴趣后续我们可以继续接着聊。

附注:此平台在os china开源md_blockchain项目上优化而来,同时也感谢jasperwang参与对源码优化的贡献。

推荐阅读 KAFKA系列:

kafka是如何做到百万级高并发低迟延的?

kafka生产者的蓄水池机制

kafka生产者的消息发送机制

Kafka生产者分区优化

架构实践系列:

高性能系统的读写分离怎么做?

分布式唯一ID生成方案 扫码关注我们 互联网架构师之路

过滤技术杂质,只为精品呈现

如果喜欢,请关注加星喔

【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月08日 0

暂无评论

推荐阅读
  kG7ef0NqClb6   2023年11月13日   32   0   0 java
  u2N3sQ7nC0dn   2023年11月13日   28   0   0 java
  rCd1NYtlhh0U   2023年11月13日   31   0   0 java
  rCd1NYtlhh0U   2023年11月13日   35   0   0 java
  Ydqp9G8hCwM0   2023年11月13日   35   0   0 java
zYlHB79h2aVW