用java实现来简单解释mysql的mvcc机制

已有 330人阅读此文 - - 数据库

前言

    Mysql的innodb引擎中,在可重复读及读已提交两种事务级别中实现了一个MVCC机制(多版本并发控制),在默认情况下会在每一个表中增加了三个字段,一个DB_TRX_ID(创建这条记录的的事务号),DB_ROLL_PTR(记录恢复的指向undo log的指针,来用事务回滚恢复数据),DB_ROW_ID(如果没有创建主键聚簇索引,刚使用这个字段,不然这个字段不会出现).

   使用大概自己的理解去用代码演示了一下过程,但是对照官网的解释,感觉还是有点不对,希望指正.

/**
 * @Description: 模拟MVCC机制
 * @author: winter
 * @Date: 2019-08-26
 * @Time: 19:55
 */
public class MvccMain {
    volatile List<TableRow> rows = new ArrayList<>();
    /**
     * 等待锁
     */
    volatile ConcurrentMap waitLockMap = new ConcurrentHashMap();
    //当前事务id,肯定 比初始化数据里的createVersion的值要大
    volatile int txId = 200000;
    /**
     * 自增id
     */
    volatile int autoIncrementId = 8;

    //为了方便操作,加一个表锁
    ReentrantLock lock = new ReentrantLock();

    /**
     * 初始化数据
     */
    @Before
    public void initData() {
        rows.add(new TableRow(1, "lilei", 10001, null));
        rows.add(new TableRow(2, "hanmeimei", 10001, null));
        rows.add(new TableRow(3, "lily", 10022, null));
        rows.add(new TableRow(4, "lucy", 10023, null));
        rows.add(new TableRow(5, "jim", 10025, null));
        rows.add(new TableRow(6, "hantao", 10026, null));
        rows.add(new TableRow(7, "mr green", 10027, null));
        rows.add(new TableRow(8, "miss green", 10027, null));
    }

    /**
     * 多线程并发事务
     */
    @Test
    public void multiThreadBeginTX() {
        Thread[] threads = new Thread[10];
        for (int n = 0; n < threads.length; n++) {
            threads[n] = new Thread(() -> beginTX());
            threads[n].start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    @After
    public void after() {
        System.out.println("after最终结果");
        System.out.println(JSONObject.toJSONString(waitLockMap));
        System.out.println(JSONObject.toJSONString(rows.stream().sorted(Comparator.comparing(TableRow::getId)).collect(Collectors.toList())));
    }

    @Test
    public void beginTX() {
        //开始事务时创建一个事务版本号,mysql中其实是执行第一个sql语句时,会创建一个事务Id
        int curTxId = executeSelectSql();
        TXData txData = new TXData(curTxId);
        //创建一个快照
        txData.setSnapshotList(select(rows, curTxId));
        System.out.println("事务Id:  " + curTxId + ":快照数据:\n" + JSONObject.toJSONString(txData.getSnapshotList()));
        curTxId = executeRUDSql();
        txData.setTxId(curTxId);
        //删除操作
        delete(txData);
        //插入操作
        insert(txData);
        //更新操作
        update(txData);
        System.out.println("事务Id====:  " + curTxId + ":提交前尝试查询数据" + JSONObject.toJSONString(select(rows, txData.getTxId())));
        try {
            Thread.sleep(new Random().nextInt(5) * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //提交操作
        commit(txData);
    }

    /**
     * 假装执行了一个查询sql,这里会生成一个很大的事务ID ,目的应该是保证一次性能查询到所有满足条件的数据
     *
     * @return
     */
    private int executeSelectSql() {
        return 2000020000 + txId + 1;
    }

    /**
     * 执行一个 删除  更新 或 插入的sql
     *
     * @return
     */
    private int executeRUDSql() {
        txId = txId + 1;
        return txId;
    }

    /**
     * 提交事务
     */
    private synchronized void commit(TXData txData) {
        //表锁 提交的时候只能有一个线程可以操作
        lock.lock();
        //先于数据持久化,先把日志进行持久化 写入redoLog日志,遇到故障可以从日志中恢复
        redoLog();
        System.out.println("\n\n\ncommit start=========================================================");
        System.out.println("commit事务Id=" + txData.getTxId() + "提交前从rows数据中查询一下数据比当前事务的快照数据对比:" + JSONObject.toJSONString(select(rows, txData.getTxId())));
        System.out.println("commit事务Id=" + txData.getTxId() + "提交前显示一下快照数据与从rows取出的数据进行对比看差别:" + JSONObject.toJSONString(txData.getSnapshotList()));

        System.out.println("\ncommit事务Id=" + txData.getTxId() + "\n|______待删除的数据\n" + JSONObject.toJSONString(txData.getDelList()) + "\n|______待插入的数据 \n " + JSONObject.toJSONString(txData.getInsertList()));

        txData.getDelList().stream().map(o -> o.getId()).distinct().forEach(id -> {
            //如果待删除的数据还有存在,就删除
            rows.removeIf(row -> row.getId().equals(id));
            waitLockMap.remove(id);
        });
        //判断当前事务id 是不是最大的,如果不是更新为最大的 实际计算比这个要复杂
        final AtomicInteger curNewTxId = new AtomicInteger(0);
        if (!(txData.getTxId() > txId)) {
            txId = txId + 1;
            curNewTxId.set(txId);
        }
        //提交事务时重新获取事务版本号
        System.out.println("commit事务Id=" + txData.getTxId() + " 更新后 事务id=" + curNewTxId);
        txData.getInsertList().stream().forEach(item -> {
            waitLockMap.remove(item.getId());
            item.setCreateVersion(curNewTxId.get());
            item.setDeleteVersion(null);
            rows.add(item);
        });
        System.out.println("commit事务Id=" + txData.getTxId() + " 事务提交后 rows中的数据:\n" + JSONObject.toJSONString(rows.stream().sorted(Comparator.comparing(TableRow::getId)).collect(Collectors.toList())));
        System.out.println("commit end=========================================\n\n\n");
        lock.unlock();
    }


    /**
     * 更新一条记录
     */
    private synchronized void update(TXData txData) {
        int updateId = new Random().nextInt(autoIncrementId) + 1;//随机生成一个id
        System.out.println("update事务Id=" + txData.getTxId() + ":开始更新id=" + updateId + "的数据");
        AtomicInteger i = new AtomicInteger();
        List<TableRow> temp = new ArrayList<>();
        txData.getSnapshotList().stream().anyMatch(row -> {
            i.getAndIncrement();
            //判断记录是否存在
            if (row.getId().equals(updateId) && row.getDeleteVersion() == null) {
                if (!waitLockMap.containsKey(updateId)) {
                    waitLockMap.putIfAbsent(updateId, "del_insert_4update");
                    System.out.println("update事务Id=" + txData.getTxId() + ":更新第一步:标记删除id=" + updateId + "的数据");
                    //先把这条记录标记删除
                    row.setDeleteVersion(txData.getTxId());
                    txData.addDelList(row);
                    System.out.println("update事务Id=" + txData.getTxId() + ":更新第二步:新增一个id=" + updateId + "的数据");
                    TableRow insertRow = new TableRow(updateId, "update_" + "from_" + row.getName(), txData.getTxId(), null);
                    temp.add(insertRow);
                    //向新增队列里添加一条数据
                    txData.addInsertList(insertRow);
                    //向快照里添加一条数据
                    txData.addSnapshotList(insertRow);
                    //记录undoLog 以备rollback使用
                    undoLog("update", row, txData.getTxId());
                    return true;
                } else {
                    System.out.println("被加锁了,不能更新id=" + updateId);
                }
                return true;
            }
            return false;
        });
        if (i.get() == txData.getSnapshotList().size()) {
            System.out.println("update事务Id=" + txData.getTxId() + ":未查到id=" + updateId + " 的数据");
        } else {
            System.out.println("update事务Id=" + txData.getTxId() + ":更新id=" + updateId + " 后的数据\n " + JSONObject.toJSONString(select(txData.getSnapshotList(), txData.getTxId())));
        }
    }

    /**
     * 新增数据
     *
     * @return
     */
    private synchronized void insert(TXData txData) {
        autoIncrementId++;
        if (!waitLockMap.containsKey(autoIncrementId)) {
            waitLockMap.putIfAbsent(autoIncrementId, "insert");
            TableRow row = new TableRow(autoIncrementId, "new_" + autoIncrementId, txData.getTxId(), null);
            txData.addSnapshotList(row);
            //记录undoLog 以备rollback使用
            undoLog("insert", row, txData.getTxId());
            txData.addInsertList(new TableRow(autoIncrementId, "new_" + autoIncrementId, txData.getTxId(), null));
            System.out.println("insert事务Id=" + txData.getTxId() + ":新增一条id=" + autoIncrementId + "的数据");
            System.out.println(JSONObject.toJSONString(select(txData.getSnapshotList(), txData.getTxId())));
        } else {
            System.out.println("被加锁了,不能插入");
        }
    }

    /**
     * 删除数据
     *
     * @return
     */
    private synchronized void delete(TXData txData) {
        //随机删除一个数据,未提交前只是做一个删除标记 把当前deleteVersion设置为当前事务id
        int delId = new Random().nextInt(7) + 1;//随机生成一个1-8的id
        txData.getSnapshotList().stream().anyMatch(o -> {
            if (o.getId().equals(delId)) {
                //如果已被标记删除,则不作操作, 实际数据库中实现 如果有另一个标记了,要进行锁等待
                if (!waitLockMap.containsKey(delId)) {
                    waitLockMap.putIfAbsent(delId, "delete");
                    //标记删除
                    o.setDeleteVersion(txData.getTxId());
                    //加入删除列表
                    txData.addDelList(o);
                    System.out.println("delete事务Id=" + txData.getTxId() + ":标记删除id=" + delId + "后的数据:");
                    //当前查询输出的结果看似已被删除 输出的没有指定id的数据了
                    System.out.println(JSONObject.toJSONString(select(txData.getSnapshotList(), txData.getTxId())));
                    //记录undoLog 以备rollback使用
                    undoLog("delete", o, txData.getTxId());
                    return true;
                } else {
                    System.out.println("被加锁了,不能删除id=" + delId);
                }
            }
            return false;
        });

    }

    /**
     * 生成一条undoLog
     * undoLog日志 是用来rollback的,
     * 如果操作是插入操作undoLog中会存入一条对应的删除操作,
     * 如果是更新删除undoLog会存入一条还原操作,
     * 如果是删除操作undoLog中会存入一条插入操作
     *
     * @param opType 当前操作类型 insert update delete
     * @param row    原始数据
     * @param txId   事务id
     */
    private void undoLog(String opType, TableRow row, int txId) {
        switch (opType) {
            case "insert":
                System.out.println("生成undoLog日志:delete from table where id=" + row.getId());
                break;
            case "update":
                System.out.println("生成undoLog日志:update table set name ='" + row.getName() + "' where id=" + row.getId());
                break;
            case "delete":
                System.out.println("生成undoLog日志:insert into table(id,name) value(" + row.getId() + ",'" + row.getName() + "')");
                break;
        }
    }

    /**
     * 处理redoLog
     */
    private void redoLog() {
        System.out.println("innodb_flush_log_at_trx_commit可以为 0 1 2");
        System.out.println("当设置为1的时候,事务每次提交都会将log buffer中的日志写入os buffer并调用fsync()刷到log file on disk中。这种方式即使系统崩溃也不会丢失任何数据,但是因为每次提交都写入磁盘,IO的性能较差。");
        System.out.println("当设置为0的时候,事务提交时不会将log buffer中日志写入到os buffer,而是每秒写入os buffer并调用fsync()写入到log file on disk中。也就是说设置为0时是(大约)每秒刷新写入到磁盘中的,当系统崩溃,会丢失1秒钟的数据。");
        System.out.println("当设置为2的时候,每次提交都仅写入到os buffer,然后是每秒调用fsync()将os buffer中的日志写入到log file on disk。");
    }

    /**
     * 查询满足条件的数据
     *
     * @param txId 当前事务id
     * @return
     */
    private synchronized List<TableRow> select(List<TableRow> sourceData, Integer txId) {
        List<TableRow> lst = new ArrayList<>();
        sourceData.stream().forEach(o -> {
                    //注意这段判断是MVCC实现的快照的重要判断 保证了可重复读
                    if (o.getCreateVersion() <= txId && (o.getDeleteVersion() == null || o.getDeleteVersion() > txId)) {
                        lst.add(new TableRow(o.getId(), o.getName(), o.getCreateVersion(), o.getDeleteVersion()));
                    }
                }
        );
        return lst;
    }
}


来源:自成e家 出处:用java实现来简单解释mysql的mvcc机制
本文由 自成e家 原创 ,转载请注明出处,你的支持是我继续写作、分享的最大动力!
期待你一针见血的评论,Come on!