精华内容
下载资源
问答
  • 存代码实现Oracle不同数据库之间的数据迁移 不用Oracle数据库之间数据迁移的实现,只要主表的数据发生了变化,那么就要自动同步到从库中 mvn依赖 <dependency> <groupId>commons-dbutils</groupId...

    纯代码实现Oracle不同数据库之间的数据迁移

    不同Oracle数据库之间数据迁移的实现,只要主表的数据发生了变化,那么就要自动同步到从库中

    mvn依赖

    <dependency>
        <groupId>commons-dbutils</groupId>
        <artifactId>commons-dbutils</artifactId>
        <version>1.7</version>
    </dependency>
    

    数据库连接

    /**
     * 数据库连接类
     * @author pihao
     *
     */
    public class DBTools {
    	static {
    		try {
    			Class.forName("oracle.jdbc.driver.OracleDriver");
    		} catch (ClassNotFoundException e) {
    			e.printStackTrace();
    		}
    	}
    
    	/**
    	 * 创建一个数据库链接
    	 * 
    	 * @return
    	 * @throws ClassNotFoundException 
    	 */
    	public static Connection getConnection(String url, String username, String password){
    		Connection conn = null;
    		try {
    			conn = DriverManager.getConnection(url, username, password);
    		} catch (SQLException e) {
    			e.printStackTrace();
    		}
    		return conn;
    	}
    
    	/**
    	 * Close a <code>Connection</code>, avoid closing if null.
    	 * 
    	 * @param conn
    	 *            Connection to close.
    	 * @throws SQLException
    	 *             if a database access error occurs
    	 */
    	public static void close(Connection conn) throws SQLException {
    		if (conn != null) {
    			conn.close();
    		}
    	}
    
    	/**
    	 * Close a <code>Connection</code>, avoid closing if null and hide any
    	 * SQLExceptions that occur.
    	 * 
    	 * @param conn
    	 *            Connection to close.
    	 */
    	public static void closeQuietly(Connection conn) {
    		try {
    			close(conn);
    		} catch (SQLException e) { // NOPMD
    			// quiet
    			e.printStackTrace();
    		}
    	}
    }
    
    

    实现数据迁移的核心方法

    @SuppressWarnings("all")
    public class TableDao {
    	private static final Logger logger  =LoggerFactory.getLogger(TableDao.class);
    	private static QueryRunner query = new QueryRunner();
    	private static String pksSql;
    	
    	static {
    		try {
    			pksSql = Global.getConfig("pksSql");
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    		
    	}
    	
        //pksSql = select cu.COLUMN_NAME from user_cons_columns cu, user_constraints au where cu.constraint_name = au.constraint_name and au.constraint_type = 'P' and au.table_name = ?
    	
    	/**
    	 * 根据表名获取表的主键列名
    	 * @param tableName
    	 * @return
    	 */
    	public static List<String> findPrimaryKeysByTableName(String tableName,ConnectionInfo info){
    		Connection conn = DBTools.getConnection(info.getJdbcUrl(), info.getUserName(), info.getPdwrod());
    		tableName = tableName.toUpperCase();
    		List<Object> primaryKeys = null;
    		List<String> pks = null;
    		try {
    			primaryKeys = query.query(conn,pksSql,new ColumnListHandler<>("column_name"),tableName);
    			pks = new ArrayList<>();
    			for (Object key : primaryKeys) {
    				String k = (String) key;
    				pks.add(k.toLowerCase());
    			}
    			return pks;
    		} catch (Exception e) {
    			logger.error("获取  {} 的主键列失败,sql为:{},原因:{}",tableName,pksSql,e.getMessage());
    		} finally {
    			DBTools.closeQuietly(conn);
    		}
    		
    		return pks;
    	}
    	
    	/**
    	 * 根据表名查找该表的列的信息
    	 * @param tableName
    	 * @return
    	 */
    	public static List<ColumnInfo> findColumninfoListByTableName(String tableName,ConnectionInfo info) {
    		Connection conn = DBTools.getConnection(info.getJdbcUrl(), info.getUserName(), info.getPdwrod());
    		String columnsSql = "select * from user_tab_columns where table_name = ?";
    		List<ColumnInfo> list = null;
    		try {
    			BeanProcessor bean = new GenerousBeanProcessor();
    			RowProcessor processor = new BasicRowProcessor(bean);
    			list = query.query(conn, columnsSql, new BeanListHandler<ColumnInfo>(ColumnInfo.class,processor),
    					new Object[] { tableName.toUpperCase() });
    		} catch (SQLException e) {
    			logger.error("获取  {} 的列信息失败,sql为:{},错误:{}",tableName,columnsSql,e.getMessage());
    		} finally {
    			DBTools.closeQuietly(conn);
    		}
    		return list;
    	}
    	
    	
    	/**
    	 * 根据表名查找到该表的所有字段的名称
    	 * @param tableName
    	 * @return
    	 */
    	public static List<String> findColumesNamesByTableName(String tableName,ConnectionInfo info){
    		Connection conn = DBTools.getConnection(info.getJdbcUrl(), info.getUserName(), info.getPdwrod());
    		String sql="select * from user_tab_columns where table_name = ?";
    		tableName = tableName.toUpperCase();
    		List<Object> columes = null;
    		List<String> cols = null;
    		try {
    			columes = query.query(conn, sql,new ColumnListHandler<>("column_name"),tableName);
    			cols = new ArrayList<>();
    			for (Object col : columes) {
    				String co = (String) col;
    				cols.add(co.toLowerCase());
    			}
    			return cols;
    		} catch (Exception e) {
    			logger.error("获取 {} 的字段名称失败,sql为{},错误为:{}",tableName,sql,e.getMessage());
    		} finally {
    			DBTools.closeQuietly(conn);
    		}
    		
    		
    		return cols;
    	}
    	/**
    	 * 根据主键找到一条记录
    	 * @param tableName
    	 * @param pks
    	 * @param params
    	 * @return
    	 */
    	public static Map<String,Object> findOneMapByPrimaryKeys(String tableName,Object[] pks,Object[] params,ConnectionInfo info){
    		Connection conn = DBTools.getConnection(info.getJdbcUrl(), info.getUserName(), info.getPdwrod());
    		StringBuffer sb = new StringBuffer();
    		String sql = "select * from "+tableName;
    		sb.append(sql);
    		if(pks!=null && pks.length>0 && pks.length == params.length){
    			sb.append(" where ");
    			for (int i = 0; i < pks.length; i++) {
    				sb.append( pks[i] +" = ?");
    				if(i+1<pks.length){
    					sb.append(" and ");
    				}
    			}
    		}
    		
    		try {
    			sql = sb.toString();
    			Map<String, Object> map = query.query(conn,sql, new MapHandler(),params);
    			logger.info("查询成功,主键为:{},sql语句:{}, 找到记录:{}",pks,sql,map);
    			return map;
    		} catch (Exception e) {
    			logger.error("查询失败,主键为:{}, sql语句:{},原因:{}",pks,sql,e.getMessage());
    		} finally {
    			DBTools.closeQuietly(conn);
    		}
    		return null;
    	}
    	
    	/**
    	 * 根据表名找到bean的list集合(废弃使用)
    	 * @param tableName
    	 * @param T
    	 * @return
    	 * @throws SQLException
    	 */
    	@Deprecated
    	public static <T> List<T>  getTableBeanList(String tableName,Class<T> T,ConnectionInfo info) throws SQLException{
    		Connection conn = DBTools.getConnection(info.getJdbcUrl(), info.getUserName(), info.getPdwrod());
    		String sql = "select * from "+tableName;
    		List<T> beanList = new ArrayList<>();
    		beanList = (List<T>) query.query(conn,sql, new BeanListHandler<>(T));
    		return beanList;
    		
    	}
    	
    	/**
    	 * 根据表名找到该表的所有数据
    	 * @param tableName
    	 * @return
    	 */
    	public static List<Map<String,Object>> findMapListByTableName(String tableName,ConnectionInfo info) {
    		Connection conn = DBTools.getConnection(info.getJdbcUrl(), info.getUserName(), info.getPdwrod());
    		String sql = "select * from "+tableName;
    		
    		List<Map<String,Object>> mapList = null;
    		try {
    			mapList = query.query(conn,sql,new MapListHandler());
    			return mapList;
    		} catch (Exception e) {
    			logger.error("获取表的所有记录失败,sql:{},原因:{}",sql,e.getMessage());
    		} finally {
    			DBTools.closeQuietly(conn);
    		}
    		return mapList;
    	}
    	
    	
    	/**
    	 * 插入记录
    	 * @param tableName 表名
    	 * @param map 字段和参数
    	 */
    	public static void insert(String tableName,Map<String,Object> map,ConnectionInfo info){
    		Connection conn = DBTools.getConnection(info.getJdbcUrl(), info.getUserName(), info.getPdwrod());
    		String sql = "insert into "+tableName+"(";
    		StringBuffer sb = new StringBuffer();
    		sb.append(sql);
    		
    		Set<Entry<String,Object>> entrySet = map.entrySet();
    		Iterator<Entry<String, Object>> iterator = entrySet.iterator();
    		String cols = "";
    		String vals = "";
    		List<Object> params = new ArrayList<>();
    		
    		while (iterator.hasNext()) {
    			Entry<String, Object> next = iterator.next();
    			cols += next.getKey()+",";
    			vals += " ?,";
    			
    			params.add(next.getValue());
    			
    			//去除最后的逗号
    			if(!iterator.hasNext()){
    				StringBuffer s1 = new StringBuffer(cols);
    				s1 = s1.deleteCharAt(s1.length()-1);
    				s1.append(")");
    				cols = s1.toString();
    				
    				StringBuffer s2 = new StringBuffer(vals);
    				s2 = s2.deleteCharAt(s2.length()-1);
    				s2.append(")");
    				vals =  s2.toString();
    			}
    			
    		}
    		
    		sb.append(cols).append(" values(").append(vals);
    		sql = sb.toString();
    		
    		//执行
    		try {
    			query.update(conn,sql,params.toArray());
    			logger.info("插入数据成功,sql为:{}",sql);
    			logger.debug("参数为:{}",params);
    		} catch (Exception e) {
    			logger.error("插入数据失败,sql为:{}",sql);
    			logger.error("参数为:{}",params);
    			logger.error("失败原因:{}",e.getMessage());
    		} finally{
    			DBTools.closeQuietly(conn);
    		}
    		
    	}
    	
    	/**
    	 * 插入操作,只适用于同一个数据库下(废弃使用)
    	 * 例如: insert into tab1 select * from tab2 where ...
    	 * @param destTable
    	 * @param sourTable
    	 * @param pks
    	 * @param params
    	 */
    	@Deprecated
    	public static void directInsert(String destTable,String sourTable,String[] pks,Object[] params,ConnectionInfo info) {
    		Connection conn = DBTools.getConnection(info.getJdbcUrl(), info.getUserName(), info.getPdwrod());
    		StringBuffer sb = new StringBuffer();
    		String sql = "insert into "+ destTable+ " select * from "+ sourTable;
    		sb.append(sql);
    		if(pks!=null && pks.length>0 && pks.length == params.length){
    			sb.append(" where ");
    			for (int i = 0; i < pks.length; i++) {
    				if(params[i] instanceof Integer || params[i] instanceof Long ||params[i] instanceof Double){ //数字类型
    					sb.append( pks[i] +" = "+ params[i]);
    				} else if(params[i] instanceof String){ //字符串类型加单引号  ''
    					sb.append( pks[i] +" = "+ "'"+params[i]+"'");
    				} else{ //其他类型暂时不考虑,当字符串处理
    					sb.append( pks[i] +" = "+ "'"+params[i]+"'");
    				}
    				
    				if(i+1<pks.length){
    					sb.append(" and ");
    				}
    			}
    		}
    		sql = sb.toString();
    		try {
    			query.update(conn,sql);
    			logger.info("插入目标表:{} 成功,插入的sql语句为:{}",destTable,sql);
    		} catch (Exception e) {
    			logger.error("插入目标表:{} 失败,插入的sql语句为:{}",destTable,sql);
    			logger.error("失败原因:{}",e.getMessage());
    		}
    		
    	}
    
    	/**
    	 * 删除记录
    	 * @param tableName 表名
    	 * @param pks 主键
    	 * @param params 参数
    	 */
    	public static void delete(String tableName,Object[] pks,Object[] params,ConnectionInfo info){
    		if(pks.length == 0){
    			logger.error("删除指定数据请声明条件");
    			return;
    		}
    		Connection conn = DBTools.getConnection(info.getJdbcUrl(), info.getUserName(), info.getPdwrod());
    		
    		StringBuffer sb = new StringBuffer();
    		String sql = "delete from "+tableName+" where ";
    		sb.append(sql);
    		if(pks!=null && pks.length>0 && pks.length == params.length){
    			for (int i = 0; i < pks.length; i++) {
    				sb.append( pks[i] +" = ?");
    				
    				if(i+1<pks.length){
    					sb.append(" and ");
    				}
    			}
    		}
    		
    		sql = sb.toString();
    		try {
    			query.update(conn, sql,params);
    			logger.info("删除指定记录成功,sql为:{}",sql);
    			logger.debug("参数为:{}",Arrays.toString(params));
    		} catch (Exception e) {
    			logger.error("删除指定记录失败,sql为:{},原因:{}",sql,e.getMessage());
    			logger.error("参数为:{}",Arrays.toString(params));
    		} finally{
    			DBTools.closeQuietly(conn);
    		}
    	}
    	
    	/**
    	 * 更新记录 
    	 * @param tableName 要更新表的名字
    	 * @param pks 主键
    	 * @param otherCols 除主键外的要更新的列名
    	 * @param params 要修改的字段的值和主键的值,按顺序排列
    	 */
    	public static void update(String tableName,Object[] pks,Object[] otherCols,Object[] params,ConnectionInfo info){
    		if(pks.length == 0 || otherCols.length == 0 || pks.length+otherCols.length != params.length){
    			logger.error("参数不规范:	无主键,无修改的字段,参数个数不对");
    			return;
    		}
    		
    		Connection conn = DBTools.getConnection(info.getJdbcUrl(), info.getUserName(), info.getPdwrod());
    		
    		String sql = "update "+tableName+" set ";
    		StringBuffer sb = new StringBuffer();
    		sb.append(sql);
    		for (int i = 0; i < otherCols.length; i++) {
    			sb.append(otherCols[i] + " = ?");
    			
    			if(i+1<otherCols.length){
    				sb.append(",");
    			}
    		}
    		
    		sb.append(" where ");
    		
    		for (int i = 0; i < pks.length; i++) {
    			sb.append(pks[i] + " =?");
    			
    			if(i+1<pks.length){
    				sb.append(" and ");
    			}
    		}
    		
    		sql = sb.toString();
    		//执行
    		try {
    			query.update(conn,sql,params);
    			logger.info("表名:{},更新成功:",tableName);
    			logger.debug("sql为:{}",sql);
    			logger.debug("参数为:{}",params);
    		} catch (Exception e) {
    			logger.error("表名:{},更新失败;	原因:{}",tableName,e.getMessage());
    			logger.error("sql为:{}",sql);
    			logger.error("参数为:{}",Arrays.toString(params));
    		} finally{
    			DBTools.closeQuietly(conn);
    		}
    		
    	}
    	
    	
    	
    	
    	
    	
    	/**
    	 * MD5加密
    	 * @param str 加密字符串
    	 * @return
    	 * @throws Exception 
    	 */
    	public static String getMD5(String str){
    		MessageDigest md5 = null;
    		byte[] byteArray = null;
    		try {
    			 md5 = MessageDigest.getInstance("MD5");
    			 byteArray = str.getBytes("UTF-8");
    			
    		} catch (Exception e) {
    			logger.error("获取MD5密文失败!!!");
    			return "1";  //返回相同的密文,防止数据更新错误
    		}
            byte[] md5Bytes = md5.digest(byteArray);
            StringBuffer sb = new StringBuffer();
            for (int i = 0; i < md5Bytes.length; i++) {
                int val = ((int) md5Bytes[i]) & 0xff;
                if (val < 16) {
                    sb.append("0");
                }
                sb.append(Integer.toHexString(val));
            }
            return sb.toString();
        }
    
       
    
    	
    	
    	
    	/**
    	 * 返回为Map,进行比较
    	 * @param masterListMap
    	 * @param slaveListMap
    	 */
    	public static void updateTbDataByinsert2(List<Map<String,Object>> masterListMap,List<Map<String,Object>> slaveListMap){
    		
    	}
    	
    	public static void updateTbDataByDelete2(List<Map<String,Object>> masterListMap,List<Map<String,Object>> slaveListMap){
    		
    	}
    	
    	public static void updateTbDataByUpdate2(List<Map<String,Object>> masterListMap,List<Map<String,Object>> slaveListMap){
    		
    	}
    	
    	
    }
    
    

    测试数据库同步

    
    /**
     * 启动主程序
     * 
     * @author zhoucl
     *
     */
    public class StartupMain {
    	private static Logger logger = LoggerFactory.getLogger(StartupMain.class);
    	private static final String TABLE_MASTER = "TB_BASE_COUNTRY_INFO";
    	private static final String TABLE_SLAVE = "TB_BASE_COUNTRY_INFO";
    	private static final String MD5_KEY_NAME = "md5_record_value";
    
    	public static void main(String[] args) throws Exception {
    
    		//取主表所有记录成list hashmap
    		//对记录进行for循环,将所有的数据计算MD5值(规则是所有)A=DDD,DSF=DSF,DF=DSF,SDF,再计算MD5值 方法DigestUtils.md5Hex,然后修改haspmap
    		//增加一个KEY进去,KEY的名字叫MD5_RECORD_VALUE(这个名字定义成常量)
    		//比较两个list,找出需要新增的数据,找出需要修改数据,找出需要删除的记录
    		//FOR循环	insert 新增的数据
    		//FOR循环	update 修改数据
    		//FOR循环	delete 删除的记录
    		
    		
    		ConnectionInfo slaveConnection = new ConnectionInfo();
    		slaveConnection.setJdbcUrl("jdbc:oracle:thin:@112.74.163.78:1521:xe");
    		slaveConnection.setUserName("system");
    		slaveConnection.setPdwrod("oracle");
    		
    		ConnectionInfo masterConnection = new ConnectionInfo();
    		masterConnection.setJdbcUrl("jdbc:oracle:thin:@112.74.163.97:1521:xe");
    		masterConnection.setUserName("system");
    		masterConnection.setPdwrod("oracle");
    		
    		
    		//获取主表的主键
    		List<String> pksMaster = TableDao.findPrimaryKeysByTableName(TABLE_MASTER,masterConnection);
    		//获取从表的主键
    		List<String> pksSlave = TableDao.findPrimaryKeysByTableName(TABLE_SLAVE,slaveConnection);
    		//获取主表的所有列名
    		List<String> colsMaster = TableDao.findColumesNamesByTableName(TABLE_MASTER,masterConnection);
    		//获取从表的所有列名
    		List<String> colsSlave = TableDao.findColumesNamesByTableName(TABLE_SLAVE,slaveConnection);
    		//除主键外的其他字段的列名
    		List<String> otherCols = TableDao.findColumesNamesByTableName(TABLE_MASTER,masterConnection);
    		otherCols.removeAll(pksMaster); 
    		
    		
    		//判断主键是否相同
    		boolean isPksSame = isListSame(pksMaster,pksSlave);
    		//判断字段
    		boolean isColsSame = isListSame(colsMaster,colsSlave);
    		
    		if(!isPksSame){
    			throw new RuntimeException("{"+TABLE_MASTER+"},{"+TABLE_SLAVE+"} 两张表的主键不一致,请查看");
    		}
    		if(!isColsSame){
    			throw new RuntimeException("{"+TABLE_MASTER+"},{"+TABLE_SLAVE+"} 两张表的字段不一致,请查看");
    		}
    		
    		logger.info("========================================调整MAP结构=======================================");
    		
    		//获取主表的所有数据
    		List<Map<String, Object>> masterMapList = TableDao.findMapListByTableName(TABLE_MASTER,masterConnection);
    		masterMapList = reConstructMap(masterMapList);
    		//获取从表的所有数据
    		List<Map<String, Object>> slaveMapList = TableDao.findMapListByTableName(TABLE_SLAVE,slaveConnection);
    		slaveMapList = reConstructMap(slaveMapList);
    		logger.info("========================================MAP结构设置完毕=======================================");
    		
    		logger.info("==========================================开始更新=========================================");
    		//待插入的数据集合
    		List<Map<String,Object>> needInsertMapList = new ArrayList<>();
    		//待更新的数据集合
    		List<Map<String,Object>> needUpdateMapList = new ArrayList<>();
    		//待删除的数据集合
    		List<Map<String,Object>> needDeleteMapList = new ArrayList<>();
    		
    		
    		//找到需要插入和更新的数据
    		findNeedInsertAndUpdateData(masterMapList,slaveMapList,needInsertMapList,needUpdateMapList,pksMaster);
    		//找到需要删除的数据
    		findNeedDeleteMapList(masterMapList,slaveMapList,needDeleteMapList,pksSlave);
    		
    		//批量新增
    		batchInsert(needInsertMapList,slaveConnection);
    		
    		//批量删除
    		batchDelete(needDeleteMapList, pksSlave,slaveConnection);
    		
    		//批量更新
    		batchUpdate(needUpdateMapList, otherCols, pksSlave,slaveConnection);
    		
    		logger.info("==========================================更新完毕==========================================");
    	}
    
    	
    	
    	
    	
    	/**
    	 * 批量插入
    	 * @param tableName
    	 * @param mapList
    	 */
    	public static void batchInsert(List<Map<String,Object>> mapList,ConnectionInfo conn){
    		for (int i = 0,size=mapList.size();i < size; i++) {
    			Map<String, Object> map = mapList.get(i);
    			logger.info("正在插入,总的记录数:{},进度:[{}/{}],当前插入数据:{}",size,i+1,size,map);
    			map.remove(MD5_KEY_NAME);
    			
    			TableDao.insert(TABLE_SLAVE, map,conn);
    		}
    		
    	}
    	
    	/**
    	 * 批量删除
    	 * @param mapList
    	 * @param pks
    	 */
    	public static void batchDelete(List<Map<String,Object>> mapList,List<String> pksSlave,ConnectionInfo conn){
    		for (int i = 0,size=mapList.size();i < size; i++) {
    			Map<String, Object> map = mapList.get(i);
    			logger.info("正在删除,总的记录数:{},进度:[{}/{}],当前删除数据:{}",size,i+1,size,map);
    			map.remove(MD5_KEY_NAME);
    			
    			Object[] slavePkValues = getValueToPks(pksSlave,map);
    			TableDao.delete(TABLE_SLAVE, pksSlave.toArray(), slavePkValues,conn);
    		}
    	}
    	
    	/**
    	 * 批量更新
    	 * @param mapList
    	 * @param otherCols
    	 * @param slavePks
    	 */
    	public static void batchUpdate(List<Map<String,Object>> mapList,List<String> otherCols,List<String> pksSlave,ConnectionInfo conn){
    		//TableDao.update(tableName, pks, otherCols, params);
    		for (int i = 0,size=mapList.size();i < size; i++) {
    			Map<String, Object> map = mapList.get(i);
    			logger.info("正在更新,总的记录数:{},进度:[{}/{}],当前插入数据:{}",size,i+1,size,map);
    			Object[] pkValues = getValueToPks(pksSlave,map);
    			List<Object> params = new ArrayList<>();
    			for (int j = 0, num=otherCols.size();j < num; j++) {
    				String col = otherCols.get(j);
    				Object val = map.get(col);
    				params.add(val);
    			}
    			
    			for (int z = 0; z < pkValues.length; z++) {
    				Object obj = pkValues[z];
    				params.add(obj);
    			}
    			
    			//更新
    			TableDao.update(TABLE_SLAVE, pksSlave.toArray(), otherCols.toArray(), params.toArray(),conn);
    			
    		}
    		
    	}
    	
    	
    	/**
    	 * 找到需要插入和更新的map
    	 * @param masterMapList 主表数据集合
    	 * @param slaveMapList 从表数据集合
    	 * @param needInsertMapList 待插入数据集合
    	 * @param needUpdateMapList 待更新数据集合
    	 * @param pksMaster 主键列表
    	 */
    	public static void findNeedInsertAndUpdateData(List<Map<String,Object>> masterMapList,List<Map<String,Object>> slaveMapList,
    													List<Map<String,Object>> needInsertMapList,List<Map<String,Object>> needUpdateMapList,
    													List<String> pksMaster){
    		for (int i = 0,size=masterMapList.size(); i < size; i++) {
    			Map<String, Object> masterMap = masterMapList.get(i);
    			String masterMd5 = (String) masterMap.get(MD5_KEY_NAME);
    			Object[] masterPkValues = getValueToPks(pksMaster,masterMap);
    			boolean flag = false;
    			for (int j = 0,num=slaveMapList.size(); j < num; j++) {
    				Map<String, Object> slaveMap = slaveMapList.get(j);
    				Object[] slavePkValues = getValueToPks(pksMaster,slaveMap);
    				if(Arrays.equals(masterPkValues, slavePkValues)){
    					flag = true;  //找到了相同的主键
    					//判断MD5的值
    					String slaveMd5 = (String) slaveMap.get(MD5_KEY_NAME);
    					if(masterMd5.equals(slaveMd5)){
    						break;
    					}else{
    						//放入待更新数据集合
    						needUpdateMapList.add(masterMap);
    					}
    				}
    			}
    			if(!flag){ //遍历了一轮还没有找到相同的主键的值,放入待新增数据
    				needInsertMapList.add(masterMap);
    			}
    		}
    		logger.info("==============================待插入数据条数:{}",needInsertMapList.size());
    		logger.info("==============================待更新数据条数:{}",needUpdateMapList.size());
    		
    	}
    	
    	/**
    	 * 找到需要删除的map
    	 * @param masterMapList 主表数据集合
    	 * @param slaveMapList 从表数据集合
    	 * @param needDeleteMapList 待删除数据集合
    	 * @param pksSlave 主键列表
    	 */
    	public static void findNeedDeleteMapList(List<Map<String,Object>> masterMapList,List<Map<String,Object>> slaveMapList,
    											 List<Map<String,Object>> needDeleteMapList,List<String>pksSlave){
    		for (int i = 0,size=slaveMapList.size();i<size; i++) {
    			Map<String, Object> slaveMap = slaveMapList.get(i);
    			Object[] slavePkValuees = getValueToPks(pksSlave,slaveMap);
    			boolean flag = false;
    			for (int j = 0,num=masterMapList.size(); j < num; j++) {
    				Map<String, Object> masterMap = masterMapList.get(j);
    				Object[] masterPkValues = getValueToPks(pksSlave,masterMap);
    				if(Arrays.equals(slavePkValuees, masterPkValues)){
    					flag = true;
    					break;
    				}
    			}
    			if(!flag){ //如果主表中根据主键没有找到数据,放入待删除数据
    				needDeleteMapList.add(slaveMap);
    			}
    			
    		}
    		logger.info("==============================待删除数据条数:{}",needDeleteMapList.size());
    	}
    	
    	
    	
    	/**
    	 * 为每条记录添加md5字段
    	 * @param msaterMapList 主表或从表数据集合
    	 * @return
    	 */
    	public static List<Map<String, Object>> reConstructMap(List<Map<String, Object>> msaterMapList) {
    		for (int i = 0, size =msaterMapList.size() ; i < size; i++) {
    			Map<String, Object> map = msaterMapList.get(i);
    			StringBuffer sb = new StringBuffer();
    			Set<Entry<String,Object>> entrySet = map.entrySet();
    			Iterator<Entry<String, Object>> it = entrySet.iterator();
    			sb.append("{");
    			while(it.hasNext()){
    				Entry<String, Object> entry = it.next();
    				sb.append(entry.getKey()+" = ");
    				sb.append(String.valueOf(entry.getValue()));
    				
    				if(it.hasNext()){
    					sb.append(", ");
    				}
    			}
    			sb.append("}");
    			String jsonInfo = sb.toString();
    //			logger.info("json为:{}",jsonInfo);
    			//对字符串加密
    			String md5Hex = DigestUtils.md5Hex(jsonInfo);
    			map.put(MD5_KEY_NAME, md5Hex);
    //			logger.info("map为:{}",map);
    		}
    		
    		return msaterMapList;
    	}
    
    
    
    	/**
    	 * 获取Key加载信息
    	 */
    	public static void printKeyLoadMessage() {
    		StringBuilder sb = new StringBuilder();
    		sb.append("\r\n======================================================================\r\n");
    		sb.append("\r\n    欢迎使用小表广播程序  \r\n");
    		sb.append("\r\n======================================================================\r\n");
    		System.out.println(sb.toString());
    	}
    	
    
    	/**
    	 * 判断主键,cols是否相同
    	 * @param pks1
    	 * @param pks2
    	 * @return
    	 */
    	private static boolean isListSame(List<?> pks1,List<?> pks2){
    		if(pks1 != null && pks2 != null){
    			if(pks1.containsAll(pks2) && pks2.containsAll(pks1)){
    				return true;
    			}
    		}
    		logger.warn("list1:{}",pks1);
    		logger.warn("list2:{}",pks2);
    		return false;
    	}
    	
    	/**
    	 * 用于获取每条记录主键的值
    	 * @param pks 主键列表
    	 * @param map
    	 * @return
    	 */
    	public static Object[] getValueToPks(List<String> pks, Map<String, Object> map) {
    		List<Object> values = new ArrayList<>();
    		for (String pk : pks) {
    			Object val = map.get(pk);
    			values.add(val);
    		}
    		return values.toArray();
    	}
    }
    
    
    展开全文
  • 最近项目有可能从oracle转到pgsql,这中间就会涉及到数据库的迁移,网上搜了一下,发现都是在说用Ora2Pg进行转换,我在服务器上怎么装都没装成功,就想还有没有别方法。然后无意中想到以前用Navicat传输数据,就去...

    最近项目有可能从oracle转到pgsql,这中间就会涉及到数据库的迁移,网上搜了一下,发现都是在说用Ora2Pg进行转换,我在服务器上怎么装都没装成功,就想还有没有别的方法。然后无意中想到以前用Navicat传输数据,就去看了一下,发现Navicat还真的提供数据库迁移功能。

    步骤:打开Navicat -> 工具 -> 数据传输
    传输界面
    迁移后的结构样子:
    迁移结果
    有问题的数据:
    问题数据

    其中发现NUMBER的数据迁过来都变成1000位的0了,至于有没有其他的问题,待后续慢慢了解,至少知道这个方法是可行的。
    这个工具提供了目标为文件的选项,也提供了高级选项,应该还要更好的玩法,期待后续深入了解。

    展开全文
  • 本文适用于使用DTS实现相同实例下库名不同数据库之间的数据迁移。本文以使用DTS将同一RDS实例下的amptest库迁移到jiangliu_amptest库为例来说明如何使用DTS实现相同实例下库名不同数据库之间的数据迁移。 ...

    适用场景

    本文适用于使用DTS实现相同实例下库名不同的数据库之间的数据迁移。本文以使用DTS将同一RDS实例下的amptest库迁移到jiangliu_amptest库为例来说明如何使用DTS实现相同实例下库名不同的数据库之间的数据迁移。

    说明:当源和目标实例类型不为RDS时,配置流程相同。

    环境准备

    • 创建RDS账号,不同的数据库类型,要求的迁移账号权限不同,具体权限要求可以参考产品手册-数据迁移中的相关文档。

    • 在同一RDS实例下创建好amptest数据库以及jiangliu_amptest数据库

    操作步骤

    1. 以目标RDS实例所属阿里云账号登录数据传输DTS控制台,单击右上角的创建迁移任务,开始任务配置。

    2. 实例连接信息配置设置实例连接信息,如下图所示:

      在这个步骤中,主要配置迁移任务名称,迁移源实例及目标实例连接信息。其中:

      • 任务名称

        默认情况下,DTS为每个任务自动生成一个任务名称。任务名称没有唯一性要求,您可以修改这个名称,为任务配置一个具有业务意义的名称,便于后续的任务识别。

      • 源实例连接信息

        • 实例类型:选择 RDS实例
        • RDS实例ID:配置RDS实例的实例ID
        • 数据库账号:RDS实例访问账号
        • 数据库密码:上面RDS访问账号对应的密码
      • 目标实例连接信息

        • 实例类型:选择 RDS实例
        • RDS实例ID:配置RDS实例的实例ID
        • 数据库账号:RDS实例访问账号
        • 数据库密码:上面RDS访问账号对应的密码
    3. 设置迁移类型及迁移对象,选择了迁移库amptest,此时要配置库名映射,将源实例的amptest映射到到目标RDS实例的库jiangliu_amptest,如下图所示:

      跨库迁移_实例连接信息

    4. 设置库名映射,将鼠标挪到右边已选择对象框中的,amptest对应的行上,右边出现 编辑 按钮。单击编辑,进入库名映射配置页面,如下图所示:

      跨库迁移_对象名映射

    5. 配置完成后单击预检查并启动,如下图所示。在迁移任务正式启动之前,会先进行前置预检查:

      • 预检查成功:成功启动迁移。
      • 预检查失败:可以单击具体检查项后的按钮,查看具体的失败详情,并根据失败原因修复后,重新进行预检查。

        跨库迁移_预检查

        当预检查通过后,可以启动迁移任务。

        至此,完成使用DTS实现相同RDS实例下库名不同的数据库之间的数据迁移。

    转载于:https://www.cnblogs.com/weifeng1463/p/9532357.html

    展开全文
  • 以前的工作迁移过oracle到Informix...因为成本预算等多方面的原因,公司决定要去o,在去o之前首先要决定拿什么来替代oracle,拿什么工具将源数据库的数据导到目标数据库,怎么导,等等的问题。导的过程的增量数据怎么处

    以前的工作迁移过oracle到Informix、oracle和SQLSERVER、oracle到mysql。 在目前的公司又因为去o的关系,做了大量的迁移工作,栽了不少坑,所以和大家交流一下在迁移的过程中的一些实践。

    因为成本预算等多方面的原因,公司决定要去o,在去o之前首先要决定拿什么来替代oracle,拿什么工具将源数据库的数据导到目标数据库,怎么导,等等的问题。导的过程的增量数据怎么处理。导的时候源数据和目标和数据的数据类型差异如何处理,像视图,存储过程,触发器这种数据库对象之间的不同怎么解决,导的时候如何不影响源数据库性能。导完以后的数据比对以及数据无误后应用的性能问题都是要考虑的。

    在我们做数据迁移之前先确认的就是target database ,就是要迁到什么数据库上,经过了一些调研从速度,流行度等多个方面选择了MySQL 因为相信被Oracle收购后表现会越来越好,当然也想过使用PosgreSQL,不过做了一个测试,发现MySQL5.7的QPS在比同样配置的PG要高,基于在线事务对性能的要求,最终还是选择了MySQL.选择了mysql以后,对于mysql的分枝和版本的选择也很头痛。Percona增加了很多性能相关补丁,MariaDB支持更多的引擎,官方的版本也能满足目前的需求,从保守的原则上,我们的核心的数据库最终还是使用了官方的版本,一些不是太核心的数据库,其他的分支也有在用。因为mycat的支持关系最终选择的是5.6的版本(目前mycat1.6对mysql5.7的支持不是太好)为了达到像oracle的dg/ogg一样稳定的架构,我们把mysql的架构做成了双机房的mha,并且用了mycat做了读写分离。同样的ORACLE这边因为同时还有应用在跑,为了分散oracle的压力.所有的同步作业也是在备库和异机房的ogg端进行的操作。

    在选择了合适的db来替换oracle之后,下一步就是选择一个合适的迁移工具来做迁移了,我们在迁移工具的选择方面花费了大量时间和精力。迁移是一个漫长而困难的工作,我们在迁移的过程中也历经了不同的阶段,使用了不同的方法。从最初级的load csv升级成自已写的程序,再去找oracle和mysql官方推荐的工具,最后也尝试了一些 ETL的工具,被这么多工具摧残之后,幸运的是能够在不同的场情下使用不同的方式。接下来我们对每一种都进行一个简单的介绍和使用中遇到的一些问题.

    我们在最之前的时候只是进行某个项目的迁移工作,因为时间的关系并没有进行迁移工具的选型以及使用,使用了最简单的方式 就是sqlload
    所有的操作步骤比把大象放进冰箱还要简单,简单的只要分两步,第一步把oracle的数据导成csv或者sql,然后再load或者source到mysql中就可以了。

    把oracle的数据导成csv或者sql可以用很多工具,比如sql developer或者toad,不过我还是更推荐spool,大家应该都用过spool,他可以结合set把内容输出到指定的文件中,然后选择合理的行列分隔符,就可以产生csv文件了。

    使用sqlload的优点就是速度快和超级简单,不过同样的,他也会有很多弊端,它很难做成自动化和全面普及到很多张表上,每有一张表的操作就要写sql拼csv,然后还不能保证是一样的分隔符,大多数时候对lob字段操作也很麻烦。对类似于comments的评论字段也很难原样的copy过去。我们来看一个简单的例子:
    1.png
    图1:第一步我先在oracle这边创建了一张表,很简单只有四列,然后insert了三条数据查看了一下内容。
    2.png
    图2:然后做了一些简单的可能会用到的查询
    3.png

    图3:然后看一下导出用的spool的内容,实际用的时候时候肯定会比这个更复杂,要对换行、time、lob等进行更多的函数处理。然后把数据导了出来看一下。
    4.png
    图4:接着我又在mysql创建一张一样的表把数据load了进去。load的语法不是我们今天要分享的重点,它的作用就是把file load into table.可以指定行列分隔符。 可以看到数据load进去了三行,同时也给出了三个警告,第二行一个,第三行两个,分别是int类型的列传了一个空字符串和时间类型的被截取了。查看一下表里的数据发现和预期的不一样。
    5.png

    图5:然后把刚刚在oracle那边进行查询再次查询一下发现结果都变的不一样了。
    这是因为在MySQL里int类型如果插入的为空,结果会自动转成0。
    官方文档上有明确的说明:
    An empty field value is interpreted different from a missing field:
    For string types, the column is set to the empty string.
    For numeric types, the column is set to 0.
    For date and time types, the column is set to the appropriate “zero” value for the type.

    6.png
    我们再看一下用etl工具迁移过来的数据,可以发现数据被insert成了null 符合了oracle的意思,其实这就是sqlload的时候一些弊端,数据类型可能弄的不是原来的数据了。同样的我们也可以设置成严格的模式,int类型的不允许插入null,我们会在的面的sql_mode里讲到。

    迁了部分数据之后觉得load数据虽然简单和快,但是瑜不掩瑕,总是有这样那样的问题,迁移之后往往还会同时伴随着大量的数据修复工作。很快的我们就弃用了这种操作,在这里要说明一下sqlload的操作因为速度又快又不依赖其他组件所以适用于数据类型并不复杂的单表操作,然后就写了python代码来接替它来完成数据迁移的操作,使用python的话其实也很简单,可以分为三步,第一步就是建立配置表,同时和mysql的表进行mapping,标识出是全量的还是增量的,如果是增量的,以什么做为增量来处理,然后第二步就是根据mapping进行code、code、code,最后根据不同的入参写好crontab就可以进行调度就可以了。

    使用python处理的过程中可以对一些数据进行转换,也更加灵活的配置了一些选项,实现了较强的逻辑控制,当然也有一些缺点它的速度慢了太多(不过也只比load慢,比起来后面要介绍的java编写的软件还是快的很多)。对于异常的处理也花费了大量的代码逻辑,同时也要会写代码。我们可以简单的来看一下实现

    7.png

    图七:这一个代码片断,显示了增量同步每一天的数据的逻辑。
    8.png
    图8:这是每天跑批之后生成的log,可以看出来把warning和error都列了出来,同时也对行数进行了统计。已经可以说是一个不错的小型产品了。可以看出来6w条数据用了4s和load来比算是慢的,但是和java之类的比算是快的了。

    因为python开发的这一套东西虽然也不算太慢,但是因为要自己用代码实现较强的逻辑,并且有些需求在oracle的业务还没有完全下线之前要实时的同步到mysql里来,所以我们又研究了一下ogg的做法。先提前说一下ogg的应用场景就是那种要求实时并且可能需要回写数据的。

    ogg的用法说起来很简单只要配置好oracle端,配置好mysql端,然后对应的进程起起来就可以了。但是用过ogg的人都知道配置一套ogg本身就很麻烦了,异构数据库之间再进行同步的话调通并可用需要很久的配置时间,所以我们大致说一下做法,除非真的有这种硬性需求,不然不推荐使用。我们来简单说一下做法和过程如果要用ogg的要注意的事项:

    1. 5.6版本需要12.1.2版本的OGG才支持
    2. 异构数据库之间不支持DDL复制
      从oracle同步到mysql,属于异构架构,不支持ddl同步,包括添加和删除字段,添加和删除索引,重命名表,表分析统计数据。
      若是涉及到源端和目标端ddl操作,需要进行源端和目标端同时手工操作。

    3. 必须要配置defgen,且文件必须放在相同的目录。

    4. 如果要是双向的话就必需把mysql端的binglog设置成row
      binlog_format: This parameter sets the format of the logs. It must be set to the value of ROW, which directs the database to log DML statements in binary format. Any other log format (MIXED or STATEMENT) causes Extract to abend.

    5. goldengate对mysql只支持innodb引擎
      所以,在创建mysql端的表的时候,要指定表为innodb引擎。
      create table mysql (name char(10)) engine=innodb;
      所有的帮助可以online help里去看
      http://docs.oracle.com/goldengate/c1221/gg-winux/GIMYS/system_requirements.htm#GIMYS122

    OGG是ORACLE官方推荐的工具,使用的原理就是基于日志的结构化数据复制,通过解析源数据库在线日志或归档日志获得数据的增量变化,再将这些变化应用到目标数据库,那mysql官方没有提供工具呢?答应是肯定的.mysql的官方同样的也提供一个用于异构之间的数据迁移的工具,从mysql到其他数据库,或者从其他数据库到mysql都是可以的。这个工具就是MySQL Migration Toolkit。这个工具可以单独被下载,也被集成到了mysql wrokbench里。不过如果单独下载的话 只有windows的版本。
    https://downloads.mysql.com/archives/migration/
    这是一个基于java的程序,所以依赖于jar包,使用它的第一步就是load一个odbc.jar.接着就可以把源端和目标端进行配置连接,然后选择要导入的对象(可以包含视图,但是一般有子查询的会报错)。然后进行导入就可以了。

    使用它的优点就是可以在mysql端自动创建表,但是有可能自动convert的类型有问题需要人为参与一下进行处理,比如Oracle中通常会对Timestamp类型的数据设置默认值sysdate,但在MySQL中是不能识别的。缺点就是只有windows的平台有,如果在导大数据量的时候,极有可能就hang住了。所以个人感觉它的适用场景就是一次性导入的小批量的数据。

    上面提到的几种工具都是一步一个坑的使用过之后发现并没有尽善尽美,总有这样或者那样的不足,接下来我们来推荐的就是终级必杀的好用的etl工具:KETTLE.

    它是一款纯java编写的软件,就像它的名字(水壶)一样,就是用来把各种数据放到一个壶里,然后以一种指定的格式流出。当然同样的你也可以使用ds(datastage)或者informatica。不过这两个是收费的,而kettle是免费开源的。

    我们这里只介绍它的最简单的能满足我们把数据从oracle迁移到mysql的功能。第一步同样的就是把jar包load进去,不同的是这次要load的是mysql的jar包。需要注意的是如果你的mysql的版本不同可能需要load不同的jar包。第二步同样的也是配置连接信息,保证你的源和目标都连接成功,最后一步就是简单的拖拖拽拽了。最后run一下就可以了。它的优点就是配置起来比ogg快,但是同样的也可以通过job做到实时同步,处理速度和python旗鼓相当,却不用自己来写mapping关系,并且提供了图形化界面。也同样的能和Migration Toolkit一样同时创建表(新增一个java脚本),进行类型转换,但日志更详细。不过可能学习成本高一点,要看的懂一些java报错方便调试。

    接下来我们再简单的看一个demo:

    9.png
    图9:我们运行spoon.sh之后可以打开这个界面。view一界显示了这个转换的名字、数据源、处理步骤等,中间区域是你拖拽出来的操作,一个输入,一个输出。这就是一个简单的数据迁移的所有步骤。

    10.png
    图10:打开input的内容,就是很简单的一条sql在哪个源数据库上抽取数据,当然这条sql也可以是拖拽生成出来,类似于congos的拖拽生成报表。千万一定要注意的是不要加分号。
    11.png
    图11:output的内容就显示丰富了很多,选择目标数据源,以及会自动的mapping列的信息,还有在迁移之间要不要先清空,迁移的过程中如果遇到问题了是不是中止,

    12.png

    图12:这里就是显示了它超越Migration tools的log的log 最细粒度到行级别,可以更快的分析到问题。

    13.png
    图13:这里则是详细的日志输出 一般如果定时跑批处理的话把它重定向具体的log里然后当做发送邮件。

    我们刚刚用了很长的篇幅来了解了一下几种迁移的工具,每种迁移的方式都是各有千秋,在合适的场景下选择适合自己的方法进行操作。不过刚刚的迁移的都是表和数据对象,我们都知道在数据库还有一些其它的对象,像视图,物化视图,存储过程,函数,包,或者一个索引,同样的sql是不是也需要改写也是我们需要考虑到的一个因素。
    接下来我们来看一下其它对象怎么迁移。
    1、view:
    在mysql里view是不可以嵌套子查询的:
    create view v_test as select * from (select * from test) t;
    ERROR 1349 (HY000): View's SELECT contains a subquery in the FROM clause
    解决方法就是view的嵌套:
    create view v_sub_test as select * from test;
    Query OK, 0 rows affected (0.02 sec)
    create view v_test as select * from v_sub_test;
    Query OK, 0 rows affected (0.00 sec)
    2、物化视图:
    物化视图用于预先计算并保存表连接或聚集等耗时较多的操作的结果,这样,在执行查询时,就可以避免进行这些耗时的操作,而从快速的得到结果,但是mysql里没有这个功能。不过通过事件调度和存储过程模拟物化视图,不过实现的难点在于更新物化视图,如果要求实时性高的更新,并且表太大的话可能会有一些性能问题。

    3、Trigger、存储过程、package:
    1).ORACLE创建触发器的时候允许or,但是mysql不允许。所以迁移的时候如果有需要写两个.
    2).两种数据库定义变量的位置不同。而且mysql里不支持%type. 但是这个在oracle中用的太频繁了,而且还是个好习惯。
    3).elseif的逻辑分支语法不同,并且mysql里也没有for循环
    4).在mysql中不可以返回cursor,并且声明时就要赋对象
    5).oracle用包来把存储过程分门别类,而且在package里可以定义公共的变量/类型,既方便了编程,又减少了服务器的编译开销。但是mysql里根本没有这个概念。所以mysql的函数也不可以重载
    6).预定义函数.mysql里没有to_char() to_date()之类的函数。也并不是所有的oracle都是好的,就像substring()和load_file()这样函数的,mysql有的oracle就没有。
    7).mysql里可以使用set和=号给变量赋值,但是不可以使用:=。 而且在mysql里没||来拼接字符串。
    8).mysql的注释必须要求-- 和内容之间有一个空格
    9).Mysql存储过程中只能使用leave退出当前存储过程.不可以使用return. 
    10).mysql的异常对象不同,mysql同样的也可以定义和处理异常,对象名字不一样。

    4、分页语句:
    MySql中使用的是limit关键字,但是在oracle中使用的是rownum关键字。所以每有的和分页相关的语句都要进行调整。
    5、JOIN:
    如果你的sql里有大量的(+) 这绝对是一个很头疼的问题。需要改写。
    6、group by语句:
    在oracle里在查询字段出现的列一定要出现在group by后面,而mysql里却不用。只是这样出来的结果可能并不是预期的结果。能够造成mysql这种奇怪的特性的是因为sql_mode的设置,一会我们会详细的说一下sql_mode.不过从oracle迁移到mysql的过程中,group by语句不会有跑不通的情况,反过来迁移可能就需要很长的时间来调整了。
    7、bitmap位图索引
    在oracle里可以利用bitmap来实现布隆过滤,进行一些查询的优化,同时这一特性也为oracle一些数据仓库相关的操作提供了很好的支持,但是在mysql里没有这种索引,所以以前在oracle里利于bitmap进行优化的sql可能在mysql还是有很大的性能问题,目前也没有什么较好的解决方案,可以尝试着建btree的索引看是否能解决问题。要求mysql提供bitmap索引在mysql的bug库里被人当作一个中级的问题提交了上去,不过至今还是没有解决。
    8、分区表(Partitioned table)
    需要特殊处理,与Oracle的做法不同,MySQL会将分区键视作主键和唯一键的一部分。为确保不对应用逻辑和查询产生影响,必须用恰当的分区键重新定义目标架构。
    9、角色:
    mysql8.0以前也没有role的对象。在迁移的过程中如果遇到的角色则是需要拼sql来重新赋权。不过mysql更好的一点是是mysql的用户与主机有关。
    10、表情和特殊字符:
    在oracle里我们一般都选择AL32UTF8的字符集,已经可以支付生僻字和emoji的表情了,因为在迁移的时候有的表包含了大量的表情字符,在mysql里设置了为utf8却是不行,导过去之后所有的都是问号,后来改成了utf8mb4才解决问题,所以推荐默认就把所有的db都装成utf8mb4吧。
    oracle和mysql差异远远不止这些,像闪回,awr这些有很多,我们只谈一些和迁移工作相关的。

    当数据迁移完成以后,如何确保数据的正确迁移,没有遗漏和错误是一个很难的问题,这里的难不是实现起来困难,而是把它自动化,然后节省人力有点难,因为两者的数据类型不同,数据量偏大,写一些脚本去做检查效果不大

    我们的数据校检工作主要分为在导入的过程中的log和警告,在load的时候SHOW WARNINGS和errors,在使用python、ogg、kettle等工具时详细的去看每个errors信息。

    迁移或增量操作完成以后,用最简单的count(*)去检查,在mysql和oracle上检查进行比对。如果数据量一致,再进行数据内容的验证。由于数据量太大,只进行了抽样检测.人工的手动检验如果没有问题了,可以使用应用程序对生产数据库的副本进行测试,在备库上进行应用程序的测试。近而进行再一次的验检。 另外推荐的一种方式就是使用etl工具配置好mysql和oracle的数据源,分别对数据进行抽取,然后生成cube。进行多纬度的报表展现。数据是否有偏差 可以一目了然的看的很清楚。
    数据的完整性验证是十分重要的,千万不要怕验证到错误后还要好长时候去抽取同步的操作,因为一旦没有验证到错误,数据进行了使用,一旦乱掉,后果将更严重。

    https://dev.mysql.com/doc/refman/5.5/en/sql-mode.html
    MySQL服务器能够工作在不同的SQL模式下,并能针对不同的客户端以不同的方式应用这些模式。这样,应用程序就能对服务器操作进行量身定制以满足自己的需求。这类模式定义了MySQL应支持的SQL语法,以及应该在数据上执行何种确认检查。
    TRADITIONAL
    设置“严格模式”,限制可接受的数据库输入数据值(类似于其他数据库服务器),该模式的简单描述是当在列中插入不正确的值时“给出错误而不是警告”
    ONLY_FULL_GROUP_BY
    在mysql的sql_mode=default的情况下是非ONLY_FULL_GROUP_BY语义,也就是说一条select语句,mysql允许target list中输出的表达式是除聚集函数,group by column以外的表达式,这个表达式的值可能在经过group by操作后变成undefined,无法确定(实际上mysql的表现是分组内第一行对应列的值)
    select list中的所有列的值都是明确语义,简单的说来,在ONLY_FULL_GROUP_BY模式下,target list中的值要么是来自于聚集函数的结果,要么是来自于group by list中的表达式的值
    Without Regard to any trailing spaces
    All MySQL collations are of type PADSPACE. This means that all CHAR, VARCHAR, and TEXT values in MySQL are compared without regard to any trailing spaces. “Comparison” in this context does not include the LIKE pattern-matching operator, for which trailing spaces are significant. MySQL校对规则属于PADSPACE,mysql对CHAR和VARCHAR值进行比较都忽略尾部空格,和服务器配置以及MySQL版本都没关系。
    explicit_defauls_for_timestamp
    mysql中TIMESTAMP类型和其他的类型有点不一样(在没有设置explicit_defaults_for_timestamp=1的情况下),在默认情况下,如果TIMESTAMP列没有显式的指明null属性,那么该列会被自动加上not null属性(而其他类型的列如果没有被显式的指定not null,那么是允许null值的),如果往这个列中插入null值,会自动的设置该列的值为current timestamp值,表中的第一个TIMESTAMP列,如果没有指定null属性或者没有指定默认值,也没有指定ON UPDATE语句。那么该列会自动被加上DEFAULT CURRENT_TIMESTAMP和ON UPDATE CURRENT_TIMESTAMP属性。第一个TIMESTAMP列之后的其他的TIMESTAMP类型的列,如果没有指定null属性,也没有指定默认值,那么该列会被自动加上DEFAULT '0000-00-00 00:00:00'属性。如果insert语句中没有为该列指定值,那么该列中插入'0000-00-00 00:00:00',并且没有warning
    如果我们在启动的时候在配置文件中指定了explicit_defaults_for_timestamp=1,mysql会按照如下的方式处理TIMESTAMP 列
    此时如果TIMESTAMP列没有显式的指定not null属性,那么默认的该列可以为null,此时向该列中插入null值时,会直接记录null,而不是current timestamp。并且不会自动的为表中的第一个TIMESTAMP列加上DEFAULT CURRENT_TIMESTAMP 和ON UPDATE CURRENT_TIMESTAMP属性,除非你在建表的时候显式的指明

    我们可以在导入数据的时候预先的修改一些参数,来获取最大性能的处理,比如可以把自适应hash关掉,Doublewrite关掉,然后调整缓存区,log文件的大小,把能变大的都变大,把能关的都关掉来获取最大的性能,我们接下来说几个常用的:
    innodb_flush_log_at_trx_commit
    如果innodb_flush_log_at_trx_commit设置为0,log buffer将每秒一次地写入log file中,并且log file的flush(刷到磁盘)操作同时进行.该模式下,在事务提交的时候,不会主动触发写入磁盘的操作。
    如果innodb_flush_log_at_trx_commit设置为1,每次事务提交时MySQL都会把log buffer的数据写入log file,并且flush(刷到磁盘)中去.
    如果innodb_flush_log_at_trx_commit设置为2,每次事务提交时MySQL都会把log buffer的数据写入log file.但是flush(刷到磁盘)的操作并不会同时进行。该模式下,MySQL会每秒执行一次 flush(刷到磁盘)操作。
    注意:由于进程调度策略问题,这个“每秒执行一次 flush(刷到磁盘)操作”并不是保证100%的“每秒”。

    sync_binlog
    sync_binlog 的默认值是0,像操作系统刷其他文件的机制一样,MySQL不会同步到磁盘中去而是依赖操作系统来刷新binary log。
    当sync_binlog =N (N>0) ,MySQL 在每写 N次 二进制日志binary log时,会使用fdatasync()函数将它的写二进制日志binary log同步到磁盘中去。
    注:
    如果启用了autocommit,那么每一个语句statement就会有一次写操作;否则每个事务对应一个写操作。

    max_allowed_packet
    在导大容量数据特别是CLOB数据时,可能会出现异常:“Packets larger than max_allowed_packet are not allowed”。这是由于MySQL数据库有一个系统参数max_allowed_packet,其默认值为1048576(1M),可以通过如下语句在数据库中查询其值:show VARIABLES like '%max_allowed_packet%';修改此参数的方法是在mysql文件夹找到my.cnf文件,在my.cnf文件[mysqld]中添加一行:max_allowed_packet=16777216

    innodb_log_file_size
    InnoDB日志文件太大,会影响MYSQL奔溃恢复的时间,太小会增加IO负担,所以我们要调整合适的日志大小。在数据导入的时候先把这个值调的大一点。避免无谓的buffer pool的flush操作。但是也不能太大 innodb_log_file_size开得太大,会明显增加innodb的log写入操作,而且会造成操作系统需要更多 的Disk Cache开销。

    innodb_log_buffer_size
    InnoDB用于将日志文件写入磁盘时的缓冲区大小字节数。为了实现较高写入吞吐率可增大该参数的默认值。一个大的log buffer 让一个大的事务运行不需要写日志到磁盘在事务提交前,因此,如果你有事务比如update,insert或者delete 很多的记录, 让log buffer 足够大来节约磁盘I/O.

    innodb_buffer_pool_size 
    这个参数主要缓存innodb表的索引,数据,插入数据时的缓冲。为Innodb加速优化首要参数。一般让他等于你所有的innodb_log_buffer_size的大小就可以
    innodb_log_file_size要越大越好,

    innodb_buffer_pool_instances
    InnoDB缓冲池拆分成的区域数量。对于数GB规模缓冲池的系统,通过减少不同线程读写缓冲页面的争用,将缓冲池拆分为不同实例有助于改善并发性。

    展开全文
  • 1. 不同数据库之间的数据迁移; 2. 表结构变化很大; 3. 支持多张表多种字段的对应的数据迁移; 4. 考虑数据量很大的迁移。 然后就写了一个小工具,能解决这样的问题,感觉功能还是蛮通用的,所以就分享一下,...
  • 准备工作(准备测试数据等): bepone> uname -a OSF1 bepone V4.0 1229 alpha bepone> bepone> bepone> sqlplus /nolog SQL*Plus: Release 8.1.7.0.0 - Production on Thu Jan 23 15:24:12 2003 (c) Copyright 2000...
  • 首先进行的是表结构的创建,在GBase数据库创建了923张要迁移的数据的表结构,目前是个小白,还不知道有什么好的工具在这两个数据库之间迁移数据表结构的好的方法,GBase数据库也是刚接触,之前搜索过很多资料,但是...
  • PostgreSQL数据库之间的迁移 1、同版本数据库之间的迁移 同版本数据库的含义:数据库的主...3.不同数据库之间的迁移 不同数据库的含义:(举例:将PostgreSQL数据库中的数据迁移到MySQL等)(难度大) 待更新 ...
  • 现在新旧系统切换,需要在新库中把老库一些表数据导入到新库,但是新库表结构是经过优化,导致了新、老库表结构不一样情况,请教一个迁移方案,谢谢(老库是sqlserver,新库是mysql)
  • 更新(不能导出新增数据,只能导入原有更新后的数据)updateonl import与export sqoop export:将数据从hadoop(先将数据从hive,hbase导入hdfs) 导入到关系型数据库(mysql,Oracle) sqoop import:将数据从关系型...
  • 各种数据库之间数据迁移工具

    万次阅读 2017-03-20 18:01:28
    前阶段需要把sqlserver上的数据迁移到mysql上,找到了非常好用的工具DB2DB。 下载地址:http://www.szmesoft.com/DB2DB DB2DB 是目前经过测试速度最快、最稳定实现多种数据库之间进行数据转换的工具。支持 SQL ...
  • 本例主要实现Oracle向Access的迁移,综合运用Java JDBC驱动以及JDBC-ODBC桥分别连接Oracle数据库管理系统和Access数据库,其他的数据迁移也是一样,只不过修改一下连接方式而已!本例中将Oracle中获取的数据,直接...
  • 随着数据库管理系统和操作系统平台更新换代速度加快,数据库管理员经常需要在两个不同的数据库之间或在两种不同的系统平台之间进行数据迁移。本文介绍了数据库数据迁移的一般步骤以及实现向Oracle8i数据库进行...
  • 在日常工作中,有时不可避免需要进行数据迁移,但是经常会出现数据库类型不一致情况,而重新安装mysql或者oracle都是稍微有点“重量级”操作,不仅要配置各种环境变量,还会导致机子运行速度变慢。比较简单...
  • 相同数据库软件之间效果好,不同数据库软件之间可能会有一些问题 方案3:人工手动迁移 1.导出成SQL脚本,手动处理成另一种数据库sql语句 2.写程序,连接旧数据库,取出数据,连接新数据库,存入数据 Cla...
  • 分享主题:如何利用Python 进行 Oracle 与 Mysql 不同数据库类型之间的数据 diff 一、适用场景 项目工作中,可能会有 A 类型数据库数据需要迁移到 B 类型的数据库中的需求。例如:假设现有一个数据库的迁移需求,...
  • 主要介绍了Oracle数据库的数据迁移方法,详细内容请大家参考下文: 随着数据库管理系统和操作系统平台的更新换代的速度的加快,数据库管理员经常需要在两个不同数据库之间或在两种不同的系统平台之间进行数据迁移...
  • 用system用户登陆testdb数首页 → 数据库技术背景:阅读新闻Oracle不同表空间之间的数据迁移[日期:2013-01-29]来源:Linux社区作者:Linux[字体:]--将数据库为testdb,用户名为testuser中默认(users)表空间里...
  • 主要介绍了Oracle数据库的数据迁移方法,详细内容请大家参考下文: 随着数据库管理系统和操作系统平台的更新换代的速度的加快,数据库管理员经常需要在两个不同数据库之间或在两种不同的系统平台之间进行数据迁移...
  • 一般在项目里,我们难免会遇到不同数据库间的数据迁移,或者是不同数据库之间的数据同步对接。而kettle软件作为第三方软件可以很好的实现对接。 由于本人也是刚刚使用这个软件,所以在这里分享几个简单的数据对接...
  • Oracle数据库的数据迁移方法发表于:2008-04-14来源:作者:点击数:随着数据库管理系统和操作系统平台的更新换代的速度的加快,数据库管理员经常需要在两个不同数据库之间或在两种不同的系统平台之间进行数据迁移...
  • 1、详细讲解 Oracle 数据库的数据迁移方法 ( 1)随着数据库管理系统和操作系统平台的更新换代的速度的加快, 数据库管理员经常需要在两个不同数据库之间或 在两种不同的系统平台之间进行数据迁移。 本文介绍了...
  • 数据迁移的方案受影响的因素有同种数据库的不同版本之间的迁移,不同数据库之间的迁移。 迁移数据量大小,迁移效率的限制等 exp/imp逻辑备份与恢复(常规备份) 它是oracle最常用最简单的方法,一般是基于应用的...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 627
精华内容 250
关键字:

不同数据库之间的数据迁移