关于用一周时间学习分库分表技术的一些分享

MySQL分库分表、Spark查询分库分表数据方案

1、为什么需要对Mysql进行分库分表

随着数据平台数据接入量的持续增长,对于海量数据管理,查询、插入、更新操作都将耗尽单节点服务器的性能,影响服务,采用读写分离、对数据进行分库分表的方式进行数据管理是是有必要的。

2、技术方案选型

目前主流的数据分库分表框架有这两种主流夸:mycat、shardingsphere。

2.1 mycat、shardingsphere-jdbc对比

2.1.1 mycat

优点:

1、mycat是作为一个中间件安装在服务器进行服务,通过代码里连接mycat,由mycat做sql改写分发结果归集,归并数据结果完全解耦,保证数据库的安全性,支持多种开发语言的连接。

2、不需要调整代码,只需要连接mycat服务端,即可实现分库分表,应用端耦合性低。

缺点:

1、mycat通过拦截应用端发起的sql语句进行解析后服务,效率低。

2、mycat进行分库分表不够灵活,都需要在服务器进行文件配置,运维成本高。

2.1.2 shardingsphere-jdbc

优点:

1、采用shardingsphere-jdbc不需要单独部署,在java的JDBC层提供额外的服务,以jar包形式提供服务,不需要额外部署和依赖。完全兼容JDBC和各种ORM框架。

2、支持第三方的数据库连接池:DBCP、C3P0,BoneCP,HikariCP。

3、shardingsphere-jdbc更加灵活。

缺点:

1、代码侵入高,需要集成在应用端所以耦合性高。

2、开发成本高。

2.2 技术选择shardingsphere-jdbc

两种技术方案选择上,综合考虑运行稳定性,部署的便捷性,事务支持、性能损耗、以及框架社区活跃度上,最终选择shardingsphere-jdbc。

3、shardingsphere-jdbc架构

shardingsphere-jdbc对于java应用友好度极佳,可通过java api进行灵活的动态配置。

4、shardingsphere-jdbc

使用驱动方式主要有两种,分别是通过yml文件进行数据源信息配置和使用Java API进行配置

yml文件配置方便但是不够灵活,Java API是最基础的方式,其他方式最终也会转换成为Java API的方式,适用于动态配置的情况。根据项目特点故采用Java API的方式。

4.1 spring-boot-starter构建数据源

全部通过配置依赖文件进行数据源的构建,不够灵活

使用版本:

shardingsphere-jdbc-core-spring-boot-starter:5.0.0-alpha

4.1.1 引入依赖:

配合mybatis-plus使用,不做mybatis-plus依赖引入说明

gradle:

compile('org.apache.shardingsphere:shardingsphere-jdbc-core-spring-boot-starter:5.0.0-alpha')

maven:

<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-jdbc-core-spring-boot-starter</artifactId>
<version>5.0.0-alpha</version>
</dependency>

在spring中使用shardingsphere的数据源:

@Resource
private DataSource dataSource;

4.1.2 数据源配置:

以编写的demo为示例,进行yml文件配置说明;

  #shardingsphere配置
  shardingsphere:
    datasource:
      # 数据源公共参数
      common:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        username: root
        password: root
      names: ds0,ds1
      ds0:
        jdbc-url: jdbc:mysql://192.168.238.129:3307/gmall_ums?serverTimezone=UTC&useSSL=false&useUnicode=true&characterEncoding=UTF-8
      ds1:
        jdbc-url: jdbc:mysql://192.168.238.129:3316/gmall_ums?serverTimezone=UTC&useSSL=false&useUnicode=true&characterEncoding=UTF-8
    rules:
      sharding:
        # 数据库分库算法名称,依赖字段字段
        default-database-strategy:
          standard:
            sharding-column: order_id
            sharding-algorithm-name: database-inline
        # 广播表
        broadcast-tables: t_address
        # 绑定分库分表 表
        binding-tables: t_order,t_order_item
        # 分布式序列策略配置
        key-generators:
          snowflake:
            type: SNOWFLAKE
            # 必须配置 props,否则报错找不到算法信息
            props:
              worker-id: 123
        tables:
          # t_order 表的分区
          t_order:
            actual-data-nodes: ds$->{0..1}.t_order_$->{0..1}
            # 分表策略
            table-strategy:
              standard:
                sharding-column: id
                sharding-algorithm-name: t-order-inline
            # 分布式策略配置
            key-generate-strategy:
              column: id
              keygenerator-name: snowflake
          t_order_item:
            actual-data-nodes: ds$->{0..1}.t_order_item_$->{0..1}
            table-strategy:
              standard:
                sharding-column: order_id
                sharding-algorithm-name: t-order-item-inline
        # 算法信息
        sharding-algorithms:
          database-inline:
            type: INLINE
            props:
              algorithm-expression: ds$->{user_id % 2}
          t-order-inline:
            type: INLINE
            props:
              algorithm-expression: t_order_$->{id % 2}
          t-order-item-inline:
            type: INLINE
            props:
              algorithm-expression: t_order_item_$->{order_id % 2}
    # 打开日志
    props:
      sql:
        show: true
    enabled: true
  # 覆盖本地的 bean
  main:
    allow-bean-definition-overriding: true

配置文件支持inline表达式

4.2 shardingsphere-jdbc-core-spring

代码层面实现数据源配置

使用版本:

shardingsphere-jdbc-core-spring:5.0.0-alpha

4.2.1 引入依赖

gradle:

compile 'org.apache.shardingsphere:shardingsphere-jdbc-core-spring:5.0.0-alpha'

maven:

<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-jdbc-core-spring</artifactId>
<version>5.0.0-alpha</version>
</dependency>

4.2.2 构建数据源

在yml文件配置关于数据源驱动、池、连接信息,不再配置固有的分库分表规则

分库规则、分表规则通过配置可以通过代码的方式进行灵活配置

完整信息如下:

@Component
public class ShardingsphereDatasource {
    private static Logger logger = LoggerFactory.getLogger(ShardingsphereDatasource.class);
    @Value("${shardingsphere.datasource.common.driver-class-name}")
    private String driver;
    @Value("${shardingsphere.datasource.common.sharding-count}")
    private String mod;
    @Value("${shardingsphere.datasource.names}")
    private String names;
    @Value("${shardingsphere.datasource.ds0.jdbc-url}")
    private String url1;
    @Value("${shardingsphere.datasource.ds0.username}")
    private String username1;
    @Value("${shardingsphere.datasource.ds0.password}")
    private String password1;
    @Value("${shardingsphere.datasource.ds1.jdbc-url}")
    private String url2;
    @Value("${shardingsphere.datasource.ds1.username}")
    private String username2;
    @Value("${shardingsphere.datasource.ds1.password}")
    private String password2;
    public DataSource dataSource(String code, String col) {
        Properties properties = new Properties();
        properties.setProperty("sql-show", "true");
        try {
            return ShardingSphereDataSourceFactory.createDataSource(dataSourceMap(), Collections.singleton(tableRules(code, col)), properties);
        } catch (SQLException e) {
            return null;
        }
    }
    public Map<String, DataSource> dataSourceMap() {
        Map<String, DataSource> dataSourceMap = new HashMap<>();
        HikariDataSource dataSource1 = new HikariDataSource();
        dataSource1.setJdbcUrl(url1);
        dataSource1.setDriverClassName(driver);
        dataSource1.setUsername(username1);
        dataSource1.setPassword(password1);
        dataSourceMap.put("ds0", dataSource1);
        HikariDataSource dataSource2 = new HikariDataSource();
        dataSource2.setJdbcUrl(url2);
        dataSource2.setDriverClassName(driver);
        dataSource2.setUsername(username2);
        dataSource2.setPassword(password2);
        dataSourceMap.put("ds1", dataSource2);
        return dataSourceMap;
    }
    public ShardingRuleConfiguration tableRules(String code, String col) {
        ShardingRuleConfiguration configuration = new ShardingRuleConfiguration();
        ShardingAutoTableRuleConfiguration rules = getShardingAutoTableRuleConfiguration(code, col);
        configuration.setAutoTables(Collections.singleton(rules));
        Map<String, ShardingSphereAlgorithmConfiguration> shardingAlgorithms = new LinkedHashMap<>();
        Properties shardingAlgorithmsProps = new Properties();
        shardingAlgorithmsProps.setProperty("sharding-count", mod);
        shardingAlgorithms.put("mod", new ShardingSphereAlgorithmConfiguration("MOD", shardingAlgorithmsProps));
        configuration.setShardingAlgorithms(shardingAlgorithms);
        Map<String, ShardingSphereAlgorithmConfiguration> keyGenerators = new LinkedHashMap<>();
        Properties snows = new Properties();
        snows.setProperty("worker-id", "123");
        keyGenerators.put("snowflake", new ShardingSphereAlgorithmConfiguration("SNOWFLAKE", snows));
        configuration.setKeyGenerators(keyGenerators);
        return configuration;
    }
    @NotNull
    public ShardingAutoTableRuleConfiguration getShardingAutoTableRuleConfiguration(String code, String col) {
        ShardingAutoTableRuleConfiguration rules = new ShardingAutoTableRuleConfiguration(code, names);
        rules.setShardingStrategy(new StandardShardingStrategyConfiguration(col, "mod"));
        return rules;
    }
}

部分代码说明:

// 调用ShardingSphereDataSourceFactory创建一个由shardingsphere管理的数据源
ShardingSphereDataSourceFactory.createDataSource(dataSourceMap(), Collections.singleton(tableRules(code, col)), properties)
// 组装真实数据源,交给shardingsphere进行分库分表时进行正确的路由信息
public Map<String, DataSource> dataSourceMap() {  
Map<String, DataSource> dataSourceMap = new HashMap<>();  
// 省略真实数据源的配置信息  
return dataSourceMap;
}
public ShardingRuleConfiguration tableRules(String code, String col) {
// 分区规则
    ShardingRuleConfiguration configuration = new ShardingRuleConfiguration();
// 项目采用自动分区规则,调用getShardingAutoTableRuleConfiguration
// 指定数据源为ds0,ds1,分表算法为MOD取模
// 根据配置sharding-count指定分表个数
    ShardingAutoTableRuleConfiguration rules = getShardingAutoTableRuleConfiguration(code, col);
    configuration.setAutoTables(Collections.singleton(rules));
// 配置算法信息
    Map<String, ShardingSphereAlgorithmConfiguration> shardingAlgorithms = new LinkedHashMap<>();
    Properties shardingAlgorithmsProps = new Properties();
    shardingAlgorithmsProps.setProperty("sharding-count", mod);
    shardingAlgorithms.put("mod", new ShardingSphereAlgorithmConfiguration("MOD", shardingAlgorithmsProps));
    configuration.setShardingAlgorithms(shardingAlgorithms);
// 配置分布式序列算法
    Map<String, ShardingSphereAlgorithmConfiguration> keyGenerators = new LinkedHashMap<>();
    Properties snows = new Properties();
    snows.setProperty("worker-id", "123");
    keyGenerators.put("snowflake", new ShardingSphereAlgorithmConfiguration("SNOWFLAKE", snows));
    configuration.setKeyGenerators(keyGenerators);
    return configuration;
}

4.2.3 使用分区数据源

@Autowired
private ShardingsphereDatasource shardingsphereDatasource;

通过传入逻辑表名,分片键可以实现动态实现分库分表。查询时只需要使用逻辑表名也即传入的表名即可实现数据源的查询功能,shardingsphere在jbdc层会根据数据源信息路由到各自的库表进行查询,在进行拼接。插入数据时也只需要逻辑表名。

创建表时,需要分库分表必须要要有分片键,是不可缺参数

查询、和插入数据只要逻辑表名即可。

public void test(String table, String col) {  
      try {  
           Connection connection = shardingsphereDatasource.dataSource(table, col).getConnection(); 
           String creatSql = "create table " + table + "(id int ,name varchar(20),age int);";   
           String delSql = "drop table " + table + ";";
           PreparedStatement statement = connection.prepareStatement(delSql);   
           statement.execute(); 
           statement.close();
           connection.close();
      } catch (SQLException e) {
           e.printStackTrace();  
      }}

4.2.4 分库分表结果

sharding-count:4

table:shardingshphere

col:id

根据配置生成表4张,插入9条数据,shardingsphere框架分别根据MOD取余插入不同库表

查询时shardingsphere进行路由后组装数据时目前不能解决数据排序功能

5、分库分表技术在spark中进行调用分析

5.1.1 问题分析:

由于shardingsphere管理库表是jdbc层进行的,不直接操作真实数据源,在spark侧进行load数据时无法通过逻辑表名进行数据操作,spark侧底层实现是用JdbcUtils这个类进行统一Jdbc侧数据查询。

查看源码只得知scala进行数据连接时调用的是java.sql包下的connect接口,也即通过调用数据源连接接口生成数据源连接,由于JdbcUtils这个类是不可继承的(object类),无法通过重写这个类改成调用shardingsphere框架进行数据源连接。

5.1.2 解决方法

采用重现实现sprak中DatasourceV2接口进行数据源管理,目前初步实现基于shardingsphere的数据源查询功能,后续实际使用还需细化插入数据功能,此处仅仅将此数据源管理思路进行说明。

第一步:创建ShardingsphereJdbc类实现DatasourceV2和ReadSupport接口

public class ShardingsphereJdbc implements DataSourceV2, ReadSupport {
    /**
     * 指定数据源StructType,后续可以根据实际情况进行动态生成
     */
    public static final StructType SCHEMA = new StructType(
            new StructField[]{
                    new StructField("id", DataTypes.IntegerType, false, new MetadataBuilder().build()),
                    new StructField("age", DataTypes.IntegerType, false, new MetadataBuilder().build())
            }
    );
    @Override
    public DataSourceReader createReader(DataSourceOptions dataSourceOptions) {
        Optional<String> db = dataSourceOptions.get("db");
        Optional<String> table = dataSourceOptions.get("table");
        return new ExampleDataSourceReader(db, table);
    }
}

第二步:创建ExampleDataSourceReader类实现DataSourceReader接口

在此处改写构造器去接收参数,以便动态知晓查询的库表信息

public class ExampleDataSourceReader implements DataSourceReader {
    public static String dbCode;
    public static String dbTable;
    public ExampleDataSourceReader(Optional<String> db, Optional<String> table) {
        dbCode = db.get();
        dbTable = table.get();
    }
    @Override
    public StructType readSchema() {
        return ShardingsphereJdbc.SCHEMA;
    }
    @Override
    public List<InputPartition<InternalRow>> planInputPartitions() {
        List<InputPartition<InternalRow>> partitions = new ArrayList();
        // 全局唯一的数据分区
        partitions.add(new ExampleInputPartition(dbCode, dbTable));
        return partitions;
    }
}

第三步:创建ExampleInputPartition实现InputPartition和Serializable(序列化更快,后续可以引入scala的序列化,目前还未进行大量数据源读写,先不做测试)

改写构造器接收库表参数

public class ExampleInputPartition implements InputPartition, Serializable {
    public static String db;
    public static String table;
    public ExampleInputPartition(String dbCode, String dbTable) {
        db = String.valueOf(dbCode);
        table = String.valueOf(dbTable);
    }
    @SneakyThrows
    @Override
    public InputPartitionReader createPartitionReader() {
        return new ExamplePartitionReader(db,table);
    }
}

第四步:创建ExamplePartitionReader实现InputPartitionReader 接口

构造器内调用data方法,data中使用shardingsphere进行数据查询,将数据组装为一个List后交给InputPartitionReader中get()和next()接口去管理(需要自己实现方法,目前还未实现优雅的数据读取),即可进行数据的读操作。

public class ExamplePartitionReader implements InputPartitionReader<InternalRow> {
    public ExamplePartitionReader(String db, String table) {
        data(db, table);
    }
    /**
     * 构建一个数据结构
     */
    private static class Person {
        private int id;
        private int age;
        public Person(int id, int age) {
            this.id = id;
            this.age = age;
        }
        public int getAge() {
            return age;
        }
    }
    private static List<Person> people = new ArrayList<>();
    private void data(String db, String table) {
        Connection connection;
        PreparedStatement statement;
        {
            try {
                String sql = "select id,age from " + table;
                ShardingsphereDatasource datasource = new ShardingsphereDatasource();
                connection = datasource.dataSource(table, "id").getConnection();
                statement = connection.prepareStatement(sql);
                ResultSet set = statement.executeQuery();
                while (set.next()) {
                    people.add(new Person(set.getInt("id"), set.getInt("age")));
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
    private int index = 0;
    @SneakyThrows
    @Override
    public boolean next() {
        return index < people.size();
    }
    @SneakyThrows
    @Override
    public InternalRow get() {
        GenericInternalRow genericInternalRow;
        // 进行connection的操作
        genericInternalRow = new GenericInternalRow(new Object[]{people.get(index).id, people.get(index).age});
        index++;
        return genericInternalRow;
    }
    @Override
    public void close() throws IOException {
    }
}

5.1.3 使用方法

使配置format参数调用重写的DatasourceV2接口的ShardingsphereJdbc类

配置option参数,目前未实现db的配置使用,因为涉及到shardingsphere数据源管理的改写,暂时不做。table参数根据不同逻辑表名可查询正确数据。

private void mysqlRead(String db, String tableCode) {
    Dataset<Row> dataset = sparkSession.read()
         .format("com.wk.data.spark.service.demo.executor.ShardingsphereJdbc")
            .option("db", "db")
            .option("table", "shardingsphere")
            .load();
    dataset.show();
}

查询示例:

可以观察到数据源已经交给shardingsphere管理,且查询出数据正确,正如上文所说,数据排序功能有所欠缺。

2 打赏
打赏 50 积分后可见