精华内容
下载资源
问答
  • 构图与异构图CYPHER-TASK设计与TASK锁机制问题背景CYPHER-TASK设计同构图异构图check-point表结构设计task-lock表结构设计任务模块解构数据分块任务状态回滚任务状态同步任务状态锁完整实现案例构图节点TASK关系...

    问题背景

    大规模重复并发执行写入操作会导致图数据库服务堆积大量的写入请求,导致服务性能下降甚至宕机。因此TASK锁机制设计非常重要,必须保证在同一时刻写入任务不可重复执行;检查点机制的设计保证了数据同步的一致性和完整性;TASK占用过多系统内存【尤其在处理大量数据时】图数据库服务会存在宕机风险,数据分块方案的设计很好的避免了这个问题。

    CYPHER-TASK设计

    同构图

    • 每个任务都需要获取锁然后执行数据构建逻辑,不管构建逻辑是否成功执行TASK结束时必须释放锁
    • [NODE-TASK]负责锁的node_check-point更新以及后续任务的rel_check_point同步
    • [REL-TASK]负责node_check-point的回滚和任务状态同步rel_check_point=node_check_point
    # TASK执行流程
    [NODE-TASK]->[REL-TASK]
    

    异构图

    • 每个任务都需要获取锁然后执行数据构建逻辑,不管构建逻辑是否成功执行TASK结束时必须释放锁
    • [FROM-NODE-TASK]负责锁的node_check-point更新以及后续任务的rel_check_point同步
    • [TO-NODE-TASK]负责node_check-point的回滚
    • [REL-TASK]负责node_check-point的回滚和任务状态同步rel_check_point=node_check_point
    # TASK执行流程
    [FROM-NODE-TASK]->[TO-NODE-TASK]->[REL-TASK]
    

    check-point表结构设计

    任务状态表负责保存节点TASK和关系TASK的任务状态,在整个任务流上实现任务状态的传递,同时保证数据一致性和完整性。

    CREATE TABLE `ONGDB_TASK_CHECK_POINT` (
    `huid` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增主键',
    `hcode` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '代码:HGRAPHTASK(FromLabel)-[RelType]->(ToLabel)',
    `from` varchar(256) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '名称',
    `relationship` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '关联类型',
    `to` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'MSTR_ORG的hcode',
    `node_check_point` datetime DEFAULT '1900-01-01 00:00:00' COMMENT '节点可以获取检查点时间可更改,关系TASK可以获取检查点时间【一个完整的图数据DAG-TASK必须包含节点和关系构建TASK】',
    `rel_check_point` datetime DEFAULT '1900-01-01 00:00:00' COMMENT '保存更新前node_check_point的值',
    `description` varchar(256) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '对该检查点任务的具体描述',
    `overall_data_split_cypher` text CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci COMMENT '同步全量数据的CYPHER:数据分块方案脚本',
    `overall_data_timezone_cypher` text CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci COMMENT '同步全量数据的CYPHER:不设置时间范围的同步脚本',
    `hcreatetime` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
    `hupdatetime` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
    `create_by` varchar(24) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '创建人',
    `update_by` varchar(24) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '更新人',
    `hisvalid` int(11) NOT NULL DEFAULT '1' COMMENT '逻辑删除标记:0-无效;1-有效',
    `src` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL DEFAULT 'ART' COMMENT '数据来源标记',
    PRIMARY KEY (`huid`) USING BTREE,
    UNIQUE KEY `unique_key_02` (`hcode`) USING BTREE COMMENT '唯一索引',
    UNIQUE KEY `unique_key_01` (`from`,`to`,`relationship`) USING BTREE COMMENT '唯一索引',
    KEY `updateTime` (`hupdatetime`) USING BTREE,
    KEY `name` (`from`) USING BTREE,
    KEY `hisvalid` (`hisvalid`) USING BTREE,
    KEY `type` (`relationship`) USING BTREE,
    KEY `check_point` (`node_check_point`) USING BTREE
    ) ENGINE=InnoDB AUTO_INCREMENT=742715632 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci ROW_FORMAT=DYNAMIC COMMENT='ONgDB DAG TASK检查点记录表';
    

    task-lock表结构设计

    任务状态锁表负责对任务状态进行加锁操作,保证TASK运行时的唯一性。

    任务模块解构

    数据分块

    控制加载到内存的数据量,避免占用过多堆内存保证图数据库可靠运行。

    // 数据分块-从检查点开始按照指定数据块大小执行数据分块【设置一个默认分块,保证锁能顺利释放】
    WITH apoc.coll.union(olab.ids.batch(min,max,10000),[[0,1]]) AS value
    

    任务状态回滚

    回滚到构建节点的任务状态,下一次构建节点关系时从回滚点开始操作【任务运行都从节点TASK开始】。

    // 当操作失败的数据包数量大于0时,回滚node_check_point
    WITH SUM(batch.failed) AS batchFailedSize,currentTime,rawCheckPoint
    CALL apoc.do.case([batchFailedSize>0,'CALL apoc.load.jdbcUpdate(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\',\'UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=rel_check_point WHERE hcode=?\',[\'HGRAPHTASK(HBondOrg)-[发行证券]->(HEventBond)\']) YIELD row RETURN row;'],'',{}) YIELD value WITH value,batchFailedSize,currentTime,rawCheckPoint
    

    任务状态同步

    关系TASK-CHECK-POINT和节点TASK-CHECK-POINT状态同步。

    // batchFailedSize>0则任务状态回滚【当任意一个批量构建关系的任务失败时回滚任务状态】【回滚:设置node_check_point等于当前的rel_check_point】
    // batchFailedSize<=0【成功执行则节点TASK与关系TASK状态同步】正常执行则更新rel_check_point=node_check_point
    WITH SUM(batch.failed) AS batchFailedSize,rawCheckPoint
    CALL apoc.do.case([batchFailedSize>0,'CALL apoc.load.jdbcUpdate(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\',\'UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=? WHERE hcode=?\',[$rawCheckPoint,\'HGRAPHTASK(HBondOrg)-[发行证券]->(HEventBond)\']) YIELD row RETURN row'],'CALL apoc.load.jdbcUpdate(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\',\'UPDATE ONGDB_TASK_CHECK_POINT SET rel_check_point=node_check_point WHERE hcode=?\',[\'HGRAPHTASK(HBondOrg)-[发行证券]->(HEventBond)\']) YIELD row RETURN row',{rawCheckPoint:rawCheckPoint}) YIELD value WITH value,batchFailedSize,rawCheckPoint
    

    任务状态锁

    【图数据构建任务状态锁】【保证某一时刻关系的DAG中TASK运行的唯一性】。

    • 获取锁
    // 获取任务锁并锁定任务
    CALL apoc.load.jdbcUpdate('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK updateLock INNER JOIN (SELECT task_lock,hcode FROM ONGDB_TASK_CHECK_POINT_LOCK selectLock WHERE task_lock=0 AND hcode=?) selectLock ON updateLock.hcode=selectLock.hcode SET updateLock.task_lock=1',['HGRAPHTASK(HBondOrg)-[发行证券]->(HEventBond)']) YIELD row AS lock WHERE lock.count>0 WITH lock
    
    • 释放锁
    // 释放锁【TASK结束运行释放锁操作】【数据分块处设置一个默认分块,保证释放锁操作顺利执行】
    CALL apoc.load.jdbcUpdate('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK SET task_lock=0 WHERE hcode=?',['HGRAPHTASK(HBondOrg)-[发行证券]->(HEventBond)']) YIELD row AS releaseLock RETURN releaseLock,value,batchFailedSize,rawCheckPoint;
    

    完整实现案例

    同构图

    节点TASK

    // ===========================获取锁并执行TASK===========================
    // 获取任务锁并锁定任务
    CALL apoc.load.jdbcUpdate('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK updateLock INNER JOIN (SELECT task_lock,hcode FROM ONGDB_TASK_CHECK_POINT_LOCK selectLock WHERE task_lock=0 AND hcode=?) selectLock ON updateLock.hcode=selectLock.hcode SET updateLock.task_lock=1',['HGRAPHTASK(HORGGuaranteeV001)-[担保]->(HORGGuaranteeV001)']) YIELD row AS lock WHERE lock.count>0 WITH lock
    // 获取检查点时间【跑全量数据时修改CHECK_POINT的时间点为最早的一个时间即可】【数据量高于堆内存限制则必须使用数据分块方案】
    CALL apoc.load.jdbc('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT DATE_FORMAT(node_check_point,\'%Y-%m-%d %H:%i:%s\') AS check_point,DATE_FORMAT(NOW(),\'%Y-%m-%d %H:%i:%s\') AS currentTime FROM ONGDB_TASK_CHECK_POINT WHERE hcode=?',['HGRAPHTASK(HORGGuaranteeV001)-[担保]->(HORGGuaranteeV001)']) YIELD row WITH apoc.text.join(['\'',row.check_point,'\''], '') AS check_point,row.currentTime AS currentTime,row.check_point AS rawCheckPoint
    // 数据分块-从数据库获取检查点之后最大最小自增ID
    CALL apoc.load.jdbc('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT MIN(huid) AS min,MAX(huid) AS max FROM HORGGuaranteeV001 WHERE hupdatetime>=?',[rawCheckPoint]) YIELD row WITH row.min AS min,row.max AS max,check_point,currentTime,rawCheckPoint
    // 数据分块-从检查点开始按照指定数据块大小执行数据分块【设置一个默认分块,保证锁能顺利释放】
    WITH apoc.coll.union(olab.ids.batch(min,max,10000),[[0,1]]) AS value,check_point,currentTime,rawCheckPoint
    UNWIND value AS bactIdList
    WITH bactIdList[0] AS batchMin,bactIdList[1] AS batchMax,check_point,currentTime,rawCheckPoint
    // 定义SQL获取数据方式
    WITH REPLACE('CALL apoc.load.jdbc(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\', \'SELECT
    hcode,name,credit_code,label,CONVERT(DATE_FORMAT(hcreatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hcreatetime,CONVERT(DATE_FORMAT(hupdatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hupdatetime,hisvalid,create_by,update_by FROM HORGGuaranteeV001 WHERE hupdatetime>=? AND huid>=? AND huid<=?\',[check_point,batchMin,batchMax])','check_point,batchMin,batchMax',check_point+','+batchMin+','+batchMax) AS sqlData,currentTime,rawCheckPoint
    // 批量迭代执行节点构建
    CALL apoc.periodic.iterate(sqlData,'MERGE (n:HORGGuaranteeV001 {hcode:row.hcode}) SET n+=row WITH n,row CALL apoc.create.addLabels(n,apoc.convert.fromJsonList(row.label)) YIELD node RETURN node', {parallel:false,batchSize:10000,iterateList:true}) YIELD batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations
    WITH batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations,currentTime,rawCheckPoint
    // 当操作失败的数据包数量小于1时【即操作全部执行成功】则更新检查点【更新node_check_point为系统时间】【rel_check_point设置为更新前node_check_point的值】
    WITH SUM(batch.failed) AS batchFailedSize,currentTime,rawCheckPoint
    CALL apoc.do.case([batchFailedSize<1,'CALL apoc.load.jdbcUpdate(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\',\'UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=?,rel_check_point=? WHERE hcode=?\',[$currentTime,$rawCheckPoint,\'HGRAPHTASK(HORGGuaranteeV001)-[担保]->(HORGGuaranteeV001)\']) YIELD row RETURN row;'],'',{currentTime:currentTime,rawCheckPoint:rawCheckPoint}) YIELD value WITH value,batchFailedSize,currentTime,rawCheckPoint
    // 释放锁【TASK结束运行释放锁操作】【数据分块处设置一个默认分块,保证释放锁操作顺利执行】
    CALL apoc.load.jdbcUpdate('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK SET task_lock=0 WHERE hcode=?',['HGRAPHTASK(HORGGuaranteeV001)-[担保]->(HORGGuaranteeV001)']) YIELD row AS releaseLock RETURN releaseLock,value,batchFailedSize,currentTime,rawCheckPoint;
    
    

    关系TASK

    // ===========================获取锁并执行TASK===========================
    // 获取任务锁并锁定任务
    CALL apoc.load.jdbcUpdate('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK updateLock INNER JOIN (SELECT task_lock,hcode FROM ONGDB_TASK_CHECK_POINT_LOCK selectLock WHERE task_lock=0 AND hcode=?) selectLock ON updateLock.hcode=selectLock.hcode SET updateLock.task_lock=1',['HGRAPHTASK(HORGGuaranteeV001)-[担保]->(HORGGuaranteeV001)']) YIELD row AS lock WHERE lock.count>0 WITH lock
    // 获取检查点时间【跑全量数据时修改CHECK_POINT的时间点为最早的一个时间即可】【数据量高于堆内存限制则必须使用数据分块方案】
    CALL apoc.load.jdbc('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT DATE_FORMAT(rel_check_point,\'%Y-%m-%d %H:%i:%s\') AS check_point FROM ONGDB_TASK_CHECK_POINT WHERE hcode=?',['HGRAPHTASK(HORGGuaranteeV001)-[担保]->(HORGGuaranteeV001)']) YIELD row WITH apoc.text.join(['\'',row.check_point,'\''], '') AS check_point,row.check_point AS rawCheckPoint
    // 数据分块-从数据库获取检查点之后最大最小自增ID
    CALL apoc.load.jdbc('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT MIN(huid) AS min,MAX(huid) AS max FROM HORGGuarantee_GuarV001 WHERE hupdatetime>=?',[rawCheckPoint]) YIELD row WITH row.min AS min,row.max AS max,check_point,rawCheckPoint
    // 数据分块-从检查点开始按照指定数据块大小执行数据分块【设置一个默认分块,保证锁能顺利释放】
    WITH apoc.coll.union(olab.ids.batch(min,max,10000),[[0,1]]) AS value,check_point,rawCheckPoint
    UNWIND value AS bactIdList
    WITH bactIdList[0] AS batchMin,bactIdList[1] AS batchMax,check_point,rawCheckPoint
    // 定义SQL获取数据方式
    WITH REPLACE('CALL apoc.load.jdbc(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\', \'SELECT `from`,`to`,guarantee_detail,guarantee_detail_size,CONVERT(DATE_FORMAT(hcreatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hcreatetime,CONVERT(DATE_FORMAT(hupdatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hupdatetime,hisvalid,create_by,update_by FROM HORGGuarantee_GuarV001 WHERE hupdatetime>=? AND huid>=? AND huid<=?\',[check_point,batchMin,batchMax])','check_point,batchMin,batchMax',check_point+','+batchMin+','+batchMax) AS sqlData,rawCheckPoint
    // 批量迭代执行节点构建
    CALL apoc.periodic.iterate(sqlData,'MATCH (from:HORGGuaranteeV001 {hcode:row.from}),(to:HORGGuaranteeV001 {hcode:row.to}) MERGE (from)-[r:担保]->(to) SET r+={guarantee_detail_size:row.guarantee_detail_size,guarantee_detail:row.guarantee_detail,hupdatetime:row.hupdatetime,hcreatetime:row.hcreatetime,hisvalid:row.hisvalid,create_by:row.create_by,update_by:row.update_by}', {parallel:false,batchSize:10000,iterateList:true}) YIELD batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations WITH batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations,rawCheckPoint
    // batchFailedSize>0则任务状态回滚【当任意一个批量构建关系的任务失败时回滚任务状态】【回滚:设置node_check_point等于当前的rel_check_point】
    // batchFailedSize<=0【成功执行则节点TASK与关系TASK状态同步】正常执行则更新rel_check_point=node_check_point
    WITH SUM(batch.failed) AS batchFailedSize,rawCheckPoint
    CALL apoc.do.case([batchFailedSize>0,'CALL apoc.load.jdbcUpdate(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\',\'UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=? WHERE hcode=?\',[$rawCheckPoint,\'HGRAPHTASK(HORGGuaranteeV001)-[担保]->(HORGGuaranteeV001)\']) YIELD row RETURN row'],'CALL apoc.load.jdbcUpdate(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\',\'UPDATE ONGDB_TASK_CHECK_POINT SET rel_check_point=node_check_point WHERE hcode=?\',[\'HGRAPHTASK(HORGGuaranteeV001)-[担保]->(HORGGuaranteeV001)\']) YIELD row RETURN row',{rawCheckPoint:rawCheckPoint}) YIELD value WITH value,batchFailedSize,rawCheckPoint
    // 释放锁【TASK结束运行释放锁操作】【数据分块处设置一个默认分块,保证释放锁操作顺利执行】
    CALL apoc.load.jdbcUpdate('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK SET task_lock=0 WHERE hcode=?',['HGRAPHTASK(HORGGuaranteeV001)-[担保]->(HORGGuaranteeV001)']) YIELD row AS releaseLock RETURN releaseLock,value,batchFailedSize,rawCheckPoint;
    

    异构图

    异构图的TASK执行流程:(FROM-TASK)->(TO-TASK)->(REL-TASK),其中(TO-TASK)只负责node_check_point的回滚,不负责node_check_point的更新操作和任务状态同步操作。

    节点TASK

    • FROM节点TASK
    // ===========================获取锁并执行TASK===========================
    // 获取任务锁并锁定任务
    CALL apoc.load.jdbcUpdate('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK updateLock INNER JOIN (SELECT task_lock,hcode FROM ONGDB_TASK_CHECK_POINT_LOCK selectLock WHERE task_lock=0 AND hcode=?) selectLock ON updateLock.hcode=selectLock.hcode SET updateLock.task_lock=1',['HGRAPHTASK(HBondOrg)-[发行证券]->(HEventBond)']) YIELD row AS lock WHERE lock.count>0 WITH lock
    // 获取检查点时间【跑全量数据时修改CHECK_POINT的时间点为最早的一个时间即可】【数据量高于堆内存限制则必须使用数据分块方案】
    CALL apoc.load.jdbc('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT DATE_FORMAT(node_check_point,\'%Y-%m-%d %H:%i:%s\') AS check_point,DATE_FORMAT(NOW(),\'%Y-%m-%d %H:%i:%s\') AS currentTime FROM ONGDB_TASK_CHECK_POINT WHERE hcode=?',['HGRAPHTASK(HBondOrg)-[发行证券]->(HEventBond)']) YIELD row WITH apoc.text.join(['\'',row.check_point,'\''], '') AS check_point,row.currentTime AS currentTime,row.check_point AS rawCheckPoint
    // 数据分块-从数据库获取检查点之后最大最小自增ID
    CALL apoc.load.jdbc('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT MIN(huid) AS min,MAX(huid) AS max FROM HBondOrg WHERE hupdatetime>=? AND type=?',[rawCheckPoint,'发行证券']) YIELD row WITH row.min AS min,row.max AS max,check_point,currentTime,rawCheckPoint
    // 数据分块-从检查点开始按照指定数据块大小执行数据分块【设置一个默认分块,保证锁能顺利释放】
    WITH apoc.coll.union(olab.ids.batch(min,max,10000),[[0,1]]) AS value,check_point,currentTime,rawCheckPoint
    UNWIND value AS bactIdList
    WITH bactIdList[0] AS batchMin,bactIdList[1] AS batchMax,check_point,currentTime,rawCheckPoint
    // 定义SQL获取数据方式
    WITH REPLACE('CALL apoc.load.jdbc(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\', \'SELECT org_hcode AS hcode,org_name AS name,credit_code,label,CONVERT(DATE_FORMAT(hcreatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hcreatetime,CONVERT(DATE_FORMAT(hupdatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hupdatetime,hisvalid,create_by,update_by FROM HBondOrg WHERE hupdatetime>=? AND huid>=? AND huid<=? AND type=?\',[check_point,batchMin,batchMax,\'发行证券\'])','check_point,batchMin,batchMax',check_point+','+batchMin+','+batchMax) AS sqlData,currentTime,rawCheckPoint
    // 批量迭代执行节点构建
    CALL apoc.periodic.iterate(sqlData,'MERGE (n:HBondOrg {hcode:row.hcode}) SET n+=row WITH n,row CALL apoc.create.addLabels(n,apoc.convert.fromJsonList(row.label)) YIELD node RETURN node', {parallel:false,batchSize:10000,iterateList:true}) YIELD batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations
    WITH batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations,currentTime,rawCheckPoint
    // 当操作失败的数据包数量小于1时【即操作全部执行成功】则更新检查点【更新node_check_point为系统时间】【rel_check_point设置为更新前node_check_point的值】
    WITH SUM(batch.failed) AS batchFailedSize,currentTime,rawCheckPoint
    CALL apoc.do.case([batchFailedSize<1,'CALL apoc.load.jdbcUpdate(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\',\'UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=?,rel_check_point=? WHERE hcode=?\',[$currentTime,$rawCheckPoint,\'HGRAPHTASK(HBondOrg)-[发行证券]->(HEventBond)\']) YIELD row RETURN row;'],'',{currentTime:currentTime,rawCheckPoint:rawCheckPoint}) YIELD value WITH value,batchFailedSize,currentTime,rawCheckPoint
    // 释放锁【TASK结束运行释放锁操作】【数据分块处设置一个默认分块,保证释放锁操作顺利执行】
    CALL apoc.load.jdbcUpdate('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK SET task_lock=0 WHERE hcode=?',['HGRAPHTASK(HBondOrg)-[发行证券]->(HEventBond)']) YIELD row AS releaseLock RETURN releaseLock,value,batchFailedSize,currentTime,rawCheckPoint;
    
    • TO节点TASK
    // ===========================获取锁并执行TASK===========================
    // 获取任务锁并锁定任务
    CALL apoc.load.jdbcUpdate('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK updateLock INNER JOIN (SELECT task_lock,hcode FROM ONGDB_TASK_CHECK_POINT_LOCK selectLock WHERE task_lock=0 AND hcode=?) selectLock ON updateLock.hcode=selectLock.hcode SET updateLock.task_lock=1',['HGRAPHTASK(HBondOrg)-[发行证券]->(HEventBond)']) YIELD row AS lock WHERE lock.count>0 WITH lock
    // 获取检查点时间【跑全量数据时修改CHECK_POINT的时间点为最早的一个时间即可】【数据量高于堆内存限制则必须使用数据分块方案】
    CALL apoc.load.jdbc('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT DATE_FORMAT(node_check_point,\'%Y-%m-%d %H:%i:%s\') AS check_point,DATE_FORMAT(NOW(),\'%Y-%m-%d %H:%i:%s\') AS currentTime FROM ONGDB_TASK_CHECK_POINT WHERE hcode=?',['HGRAPHTASK(HBondOrg)-[发行证券]->(HEventBond)']) YIELD row WITH apoc.text.join(['\'',row.check_point,'\''], '') AS check_point,row.currentTime AS currentTime,row.check_point AS rawCheckPoint
    // 数据分块-从数据库获取检查点之后最大最小自增ID
    CALL apoc.load.jdbc('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT MIN(huid) AS min,MAX(huid) AS max FROM HBondOrg WHERE hupdatetime>=? AND type=?',[rawCheckPoint,'发行证券']) YIELD row WITH row.min AS min,row.max AS max,check_point,currentTime,rawCheckPoint
    // 数据分块-从检查点开始按照指定数据块大小执行数据分块【设置一个默认分块,保证锁能顺利释放】
    WITH apoc.coll.union(olab.ids.batch(min,max,10000),[[0,1]]) AS value,check_point,currentTime,rawCheckPoint
    UNWIND value AS bactIdList
    WITH bactIdList[0] AS batchMin,bactIdList[1] AS batchMax,check_point,currentTime,rawCheckPoint
    // 定义SQL获取数据方式
    WITH REPLACE('CALL apoc.load.jdbc(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\', \'SELECT hcode,name,data_source,CONVERT(DATE_FORMAT(hcreatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hcreatetime,CONVERT(DATE_FORMAT(hupdatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hupdatetime,hisvalid,create_by,update_by FROM HBondOrg WHERE hupdatetime>=? AND huid>=? AND huid<=? AND type=?\',[check_point,batchMin,batchMax,\'发行证券\'])','check_point,batchMin,batchMax',check_point+','+batchMin+','+batchMax) AS sqlData,currentTime,rawCheckPoint
    // 批量迭代执行节点构建
    CALL apoc.periodic.iterate(sqlData,'MERGE (n:HEventBond {hcode:row.hcode}) SET n+=row', {parallel:false,batchSize:10000,iterateList:true}) YIELD batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations
    WITH batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations,currentTime,rawCheckPoint
    // 当操作失败的数据包数量大于0时,回滚node_check_point
    WITH SUM(batch.failed) AS batchFailedSize,currentTime,rawCheckPoint
    CALL apoc.do.case([batchFailedSize>0,'CALL apoc.load.jdbcUpdate(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\',\'UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=rel_check_point WHERE hcode=?\',[\'HGRAPHTASK(HBondOrg)-[发行证券]->(HEventBond)\']) YIELD row RETURN row;'],'',{}) YIELD value WITH value,batchFailedSize,currentTime,rawCheckPoint
    // 释放锁【TASK结束运行释放锁操作】【数据分块处设置一个默认分块,保证释放锁操作顺利执行】
    CALL apoc.load.jdbcUpdate('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK SET task_lock=0 WHERE hcode=?',['HGRAPHTASK(HBondOrg)-[发行证券]->(HEventBond)']) YIELD row AS releaseLock RETURN releaseLock,value,batchFailedSize,currentTime,rawCheckPoint;
    

    关系TASK

    // ===========================获取锁并执行TASK===========================
    // 获取任务锁并锁定任务
    CALL apoc.load.jdbcUpdate('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK updateLock INNER JOIN (SELECT task_lock,hcode FROM ONGDB_TASK_CHECK_POINT_LOCK selectLock WHERE task_lock=0 AND hcode=?) selectLock ON updateLock.hcode=selectLock.hcode SET updateLock.task_lock=1',['HGRAPHTASK(HBondOrg)-[发行证券]->(HEventBond)']) YIELD row AS lock WHERE lock.count>0 WITH lock
    // 获取检查点时间【跑全量数据时修改CHECK_POINT的时间点为最早的一个时间即可】【数据量高于堆内存限制则必须使用数据分块方案】
    CALL apoc.load.jdbc('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT DATE_FORMAT(rel_check_point,\'%Y-%m-%d %H:%i:%s\') AS check_point FROM ONGDB_TASK_CHECK_POINT WHERE hcode=?',['HGRAPHTASK(HBondOrg)-[发行证券]->(HEventBond)']) YIELD row WITH apoc.text.join(['\'',row.check_point,'\''], '') AS check_point,row.check_point AS rawCheckPoint
    // 数据分块-从数据库获取检查点之后最大最小自增ID
    CALL apoc.load.jdbc('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT MIN(huid) AS min,MAX(huid) AS max FROM HBondOrg WHERE hupdatetime>=? AND type=?',[rawCheckPoint,'发行证券']) YIELD row WITH row.min AS min,row.max AS max,check_point,rawCheckPoint
    // 数据分块-从检查点开始按照指定数据块大小执行数据分块【设置一个默认分块,保证锁能顺利释放】
    WITH apoc.coll.union(olab.ids.batch(min,max,10000),[[0,1]]) AS value,check_point,rawCheckPoint
    UNWIND value AS bactIdList
    WITH bactIdList[0] AS batchMin,bactIdList[1] AS batchMax,check_point,rawCheckPoint
    // 定义SQL获取数据方式
    WITH REPLACE('CALL apoc.load.jdbc(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\', \'SELECT hcode AS `to`,org_hcode AS `from`,data_source,CONVERT(DATE_FORMAT(hcreatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hcreatetime,CONVERT(DATE_FORMAT(hupdatetime,\\\'%Y%m%d%H%i%S\\\'),UNSIGNED INTEGER) AS hupdatetime,hisvalid,create_by,update_by FROM HBondOrg WHERE hupdatetime>=? AND huid>=? AND huid<=? AND type=?\',[check_point,batchMin,batchMax,\'发行证券\'])','check_point,batchMin,batchMax',check_point+','+batchMin+','+batchMax) AS sqlData,rawCheckPoint
    // 批量迭代执行节点构建
    CALL apoc.periodic.iterate(sqlData,'MATCH (from:HBondOrg {hcode:row.org_hcode}),(to:HEventBond {hcode:row.hcode}) MERGE (from)-[r:发行证券]->(to) SET r+=olab.reset.map(row,[\'from\',\'to\'])', {parallel:false,batchSize:10000,iterateList:true}) YIELD batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations WITH batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations,rawCheckPoint
    // batchFailedSize>0则任务状态回滚【当任意一个批量构建关系的任务失败时回滚任务状态】【回滚:设置node_check_point等于当前的rel_check_point】
    // batchFailedSize<=0【成功执行则节点TASK与关系TASK状态同步】正常执行则更新rel_check_point=node_check_point
    WITH SUM(batch.failed) AS batchFailedSize,rawCheckPoint
    CALL apoc.do.case([batchFailedSize>0,'CALL apoc.load.jdbcUpdate(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\',\'UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=? WHERE hcode=?\',[$rawCheckPoint,\'HGRAPHTASK(HBondOrg)-[发行证券]->(HEventBond)\']) YIELD row RETURN row'],'CALL apoc.load.jdbcUpdate(\'jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC\',\'UPDATE ONGDB_TASK_CHECK_POINT SET rel_check_point=node_check_point WHERE hcode=?\',[\'HGRAPHTASK(HBondOrg)-[发行证券]->(HEventBond)\']) YIELD row RETURN row',{rawCheckPoint:rawCheckPoint}) YIELD value WITH value,batchFailedSize,rawCheckPoint
    // 释放锁【TASK结束运行释放锁操作】【数据分块处设置一个默认分块,保证释放锁操作顺利执行】
    CALL apoc.load.jdbcUpdate('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT_LOCK SET task_lock=0 WHERE hcode=?',['HGRAPHTASK(HBondOrg)-[发行证券]->(HEventBond)']) YIELD row AS releaseLock RETURN releaseLock,value,batchFailedSize,rawCheckPoint;
    

    备注

    通过对图数据任务的解构分析,可以设想CYPHER脚本的自动化生成完全是可以实现的。进一步设计实现一个自动化抽取图数据的系统是非常有价值的。从图数据schema设计,到数据模型推荐和图数据自动抽取,可以大大解放数据工程师的精力。

    # 图数据库相关插件包下载
    https://github.com/neo4j-contrib/neo4j-apoc-procedures
    https://github.com/ongdb-contrib/ongdb-lab-apoc
    
    展开全文
  • 异构图:与同构图相反,异构图指图中节点类型或关系类型多于一种。 二部图:一类特殊的异构图,可以将二部图的顶点集合拆分成两个子集,任意一条边的两个顶点都分别在这两个集合里。 属性图:相较于异构图,属性图给...

    在进行图数据相关的分析训练时对图数据的一些基本定义的理解能够更好选择合适的算法。

    同构图:同构图是指图中节点类型和关系类型(边的类型)都仅有一种。

    异构图:与同构图相反,异构图指图中节点类型或关系类型多于一种。

    二部图:一类特殊的异构图,可以将二部图的顶点集合拆分成两个子集,任意一条边的两个顶点都分别在这两个集合里。

    属性图:相较于异构图,属性图给图数据增加了额外的属性信息。

    展开全文
  • 同构图(Homogeneous Graph)、异构图(Heterogeneous Graph)、属性图(Property Graph)非显式图(Graph Constructed from Non-relational Data)。 (1)同构图同构图是指图中的节点类型关系类型都仅有一...

    最重要的4类图数据:

    同构图(Homogeneous Graph)、异构图(Heterogeneous Graph)、属性图(Property Graph)和非显式图(Graph Constructed from Non-relational Data)。

    (1)同构图:

    同构图是指图中的节点类型和关系类型都仅有一种。同构图是实际图数据的一种最简化的情况,如由超链接关系所构成的万维网,这类图数据的信息全部包含在邻接矩阵里。

    同构图:在图里面,节点的类型和边的类型只有一种的图
    举个例子,像社交网络中只存在一种节点类型,用户节点和一种边的类型,用户-用户之间的连边。

    (2)异构图:

    与同构图相反,异构图是指图中的节点类型或关系类型多于一种。在现实场景中,我们通常研究的图数据对象是多类型的,对象之间的交互关系也是多样化的。因此,异构图能够更好地贴近现实。

    异构图:在图里面,节点的类型+边的类型>2的一种图,
    举个例子,论文引用网络中,存在着作者节点和paper节点,边的关系有作者-作者之间的共同创作关系连边,作者-论文之间的从属关系,论文-论文之间的引用关系。

    (3)属性图:

    相较于异构图,属性图给图数据增加了额外的属性信息,如下图所示。对于一个属性图而言,节点和关系都有标签(Label)和属性(Property),这里的标签是指节点或关系的类型,如某节点的类型为“用户”,属性是节点或关系的附加描述信息,如“用户”节点可以有“姓名”“注册时间”“注册地址”等属性。属性图是一种最常见的工业级图数据的表示方式,能够广泛适用于多种业务场景下的数据表达。
    在这里插入图片描述
    属性图:图的节点上存在着初始属性attribute,可以用作后续节点的特征

    (4)非显式图:

    非显式图是指数据之间没有显式地定义出关系,需要依据某种规则或计算方式将数据的关系表达出来,进而将数据当成一种图数据进行研究。比如计算机3D视觉中的点云数据,如果我们将节点之间的空间距离转化成关系的话,点云数据就成了图数据。

    其他:

    动态图:图中的节点或者边都是随着时间变化的,可能增加或减少,一般是图的构成是按照时间片来构成,每一个时间片一个图的表示,例如t1时刻的图是初始图,t2时刻的图就是节点或连边变化后的图一直到tn时刻

    关系图:图表示了一种节点之间的隐含关系,举个例子 知识图谱

    reference

    《深入浅出图神经网络》

    展开全文
  • 01图论基础

    2021-11-16 20:13:53
    什么是 的基本示意 是描述复杂事务的数据表示形式,由节点和边组成,数学上一般表述为 G-(V,E)。其中的V(vertical)代表节点,可被理解为事物。 而E(edge)代表边,描述的是两个事物之间的关系。...

    什么是图

    图的基本示意图
    图的基本示意图

    图是描述复杂事务的数据表示形式,由节点和边组成,数学上一般表述为
    图G-(V,E)。其中的V(vertical)代表节点,可被理解为事物。
    而E(edge)代表边,描述的是两个事物之间的关系。例如一个图的社交网络图,每个人都可视为节点,而人与人之间的关系可被视为边。
    而在我们的推荐系统中,用户与物品之间的交互关系,用户与用户自身的关系,物品与物品之间的关系,完全可由一张图完整的进行描述。如下图所示:
    用户与物品之间的关系图在这里插入图片描述

    无向图与有向图

    无向图在这里插入图片描述
    有向图在这里插入图片描述
    双向图在这里插入图片描述

    无向图是由没有方向的无向边组成的图,而有向图是由具有方向的有向边组成的图,有向边的出发节点称为头节点,结束节点则称为尾节点,无向图与双向图等价

    有向有环图和无向无环图

    有向有环图在这里插入图片描述

    有向无环图在这里插入图片描述

    上图展示了有向有环图与有向无环图的区别,环是一种特殊的有向图,例如上方图中的1,2,3,4节点,1->2->4->3->1即形成一个环。在环中的任意节点经过边游走都能回到自身.
    如果在一个有向图中没有任何一个节点经过边游走能回到自身,则便是有向无环图。即下面的图

    无权图与有权图

    在这里插入图片描述
    无权图指边没有权重,有权图只边带有权重
    在这里插入图片描述

    同构图和异构图

    在这里插入图片描述
               同构图
    在这里插入图片描述
               异构图

    图的表示

    1.邻接矩阵

    虽然图在人类眼中可以很自观的展示出信息,但是对于计算机来讲,就不是那么容易理解了。所以我们需要研究图如何在计算机中表示。
    邻接矩阵(Adjacency Matrix)是一种最基础的图表示方式。假设一个图的节点数量为N,则生成一个N*N的矩阵。矩阵中的值为对应位置节点与节点之间的关系,一般用A表示。

    场景1:无向图
    在这里插入图片描述
    在这里插入图片描述

    在一个基础的无向图中,若节点i与节点j有边连接,则在邻接矩阵的对应位置赋值1即可,记作Aij=Aji=1,否则为0。如上图所示,比如节点1与节点2右边相连,所以A12和A21的位置为1。无向图的邻接矩降是一个以对角矩阵镜像对称的矩阵。




    场景2: 有向图
    在这里插入图片描述
    在这里插入图片描述
    对于一个有向图,邻接矩阵不是对称矩阵,我们将邻接矩阵的行索引代表有向图中有向边的头节点,列索引代表尽节点。那么如果有边i->j,则Aij=1,Aji不再为1,除非连接i和j是一条双向边




    场景3:有权图
    在这里插入图片描述
    在这里插入图片描述
    有权图的邻接表与无权图的唯一区别就是将边的权重代替了原来1的位置。
    值得提的是,所有邻接矩阵的对角线均为0,因为对角线其实代表了节点与自身的关系,而节点与自身并无边相连,所以为0。
    另外异构图的图表示方法略微复杂一点,我们具体放在后面讲。

    2.邻接列表

    场景1:无权图
    将一张图以矩阵的形式表示固然非常使于计算,但是对于稀疏的人图非常不友好。而邻按列表的表示方式对于稀疏人图则非常友好。
    在这里插入图片描述
    1:2,5
    2:4
    3:1
    4:3

    场景2:有权图
    在这里插入图片描述
    key: value
    1: [(2,0.2), (5,3.6)]
    2: [(4, 1.6)]
    3: [(1,0.8 )]
    4:[(3,6.2)]

    3.边集

            边集就更的简单,通常用两个头尾节点的索引元组表示一条边。例如头节点是h,尾节点是t:那么这一条有向边就是(b, t),如果是一条无向边则就用一对对称元组表示,即(h, t), (t, h)
            场景1:有向无权图
    在这里插入图片描述

    (1, 2),
    (1,5),
    (2.4),
    (4.3),
    (3,1)

            场景2:无向无权图

    在这里插入图片描述
    (1,2),
    (2,1),
    (1,5),
    (5.1),
    (2.4),
    (4,2),
    (4.3),
    (3, 4),
    (3,1),
    (1.3)
            场景3:有权图
    在这里插入图片描述

    (1.0.2.2),
    (1,3.6.5),
    (2, 1.6.4),
    (4,6.2,3),
    (3,0.8,1)

    邻居与度

    节点的邻居(neighbor)指的是与该节点在同一边另一端的节点。
    节点的度(degree)指的是该节点邻居的数量.
    场景1:无向图
    在这里插入图片描述
    节点1:
    邻居(neighbor)= 2,3,5
    度(degree)- 3

    节点2:邻居(neighbor)-1,4
    度(degree)=2

    场景2:有向图
    在这里插入图片描述
    节点1:
    前继邻居(predecessor)-3
    后继邻居(successor)= 2,5
    入度(indegree)= 1
    出度(outdegree)= 2

    结构特征、节点特征、边特征

    在这里插入图片描述
                          用户物品特征图
    如上图所示,该图表示的是推荐场景中常有的用户与物品交互关系图。整个图即代表结构特征,本章目前讨论到现在也仅是在讨论图的结构特征
    图由节点与边组成,对于节点来说,本身也可以带有一些特征或属性,例如上图中的用户画像特征。当然对于边同理,如上图的观看时长或点赞频率,边特征有时也叫关系特征:边的权重也可视为特征。
    在机器学习与深度学习时,经常会对内容进行嵌入(embedding)操作,由嵌入操作得到的节点向量与边向量也可视为它们的特征.

    处理图的库

    1. NetworkX

    Networks是最老牌的图处理库,早在2002年就发布了第一版,对于专门学习图算法的同学一定不陌生。简单易用,实现了一些基础的图算法。例如最小路径算法,最小权重生成树算法等。
    https://www.osgeo.cn/networkx/tutorial.html

    2.DGL(Deep Graph Library)

    2.DGi(第一版发布于2018年,亚马逊继MxNet后又一力作,基于深度学习框架的图神经网络库,实现了主流的图神经网络算法。兼容MxNet,PyTorch,Tensorlow, 有中文的官方教程。是目前最主流的处理图的python APl
    https://docs.dgl.ai/en/latest/guide_cn/index.html

    PGL(Paddle Graph Learning)

    3.PGL(Paddle Graph Learning),第一版发布于2020年,百度发布的国人之光框架,号称速度是DGL的13倍,使用与上手更容易。并且集成了百度自研的高效算法。但有一个致命的缺点,就是只兼容百度自家的飞浆(PaddlePaddle)深度学习框架。不知未来是否公与MxNet,PyTorch等兼容。

    展开全文
  • 同质,异质以及属性

    千次阅读 2020-09-02 17:34:06
    同构图指的是图中的节点类型关系类型都仅有一种。 异构图 指的是图中的节点类型或关系类型多于一种。 属性图 在异构图基础上增加了额外的属性信息。 对于节点类型关系类型的理解 比如我今天看了电影《流浪地球...
  • 图论----同构图(详解)

    千次阅读 2017-05-16 23:32:00
    图论当中的术语,假设G=(V,E)G1=(V1,E1)是两个,如果存在一个双射m:V→V1,使得对所有的x,y∈V均...简单来说,同构图的结点数必须相同,结构必须相同。 如3.6,第一个图形第二个图形的区别在于环的数量。...
  • 笔记整理:许泽众,浙江大学博士在读论文链接:https://arxiv.org/abs/2003.01332本文主要提出一种处理异构图的方法,所谓异构图(Heterogeneous gra...
  • 异构图HAN

    2021-09-25 18:46:28
    异构图:节点类别的数量加边的类别的数量大于等于2,也就是包含不同类型节点连接的异构图。下面的图就是一个异构图,包含电影、演员、导演。 这里介绍异构图的一个图算法Heterogeneous Graph Attention Network ...
  • 1. 的创建 1.1 同质 1.1.1 直接创建:dgl.graph() 1.1.3 节点与边的特征 1.2 异质 1.2.1 异质 Heterogeneous Graphs 1.2.2 创建异质-dgl.heterograph() 1.3 Using DGLGraph on a GPU 1.4 从外部...
  • 异构图 Heterogeneous Graph Neural Networks

    千次阅读 2021-07-20 11:46:14
    定义:包含不同类型节点链接的异构图 像下面这个例子,是一个电影、导演演员的异构图。 第一个异构图的算法 异构图图注意力机制Heterogeneous Graph Attention Networks 首先看一下作者给的异构图的定义 Node-...
  • 同构图卷积 from time import time import numpy as np import dgl import torch import torch.nn as nn import torch.nn.functional as F # 模型 class TwoLayerModel(nn.Module): def __init__(self): super()...
  • 网络学习报告

    2020-11-28 13:48:53
    PGL图网络学习 1.1、异构图的定义: 简单来说就是:包含至少两种类型的节点或边的图结构。 [外链图片转存失败,源站可能有防盗链...1.2、同构图和异构图的数学表示: 同构图:G(V,E) —— V表示节点,E表示边(关系)
  • DGL图异构神经网络

    千次阅读 2019-12-31 15:50:06
    传统同构图(Homogeneous Graph)数据中只存在一种节点和边,因此在构建图神经网络时所有节点共享同样的模型参数并且拥有同样维度的特征空间。而异构图(Heterogeneous Graph)中可以存在不只一种节点和边,因此允许...
  • 异构图神经网络

    万次阅读 2020-05-03 13:55:27
    1. 摘要     异构图表示学习的目的是为每个节点寻求一个有意义的...尽管在同构图嵌入、属性嵌入以及神经网络等方面做了大量的工作,但很少有人能够有效地联合考虑异构结构()信息以及各节点的异构内容信息...
  • 知识图谱中的异构

    2020-10-15 11:31:18
    知识图谱中的异构前言一、语言层不匹配1.语法不匹配2.逻辑表示不匹配3.原语的语义不匹配4.语言表达能力不匹配二、模式层不匹配1.概念化不匹配1.1 概念范围的不匹配1.2 模型覆盖的不匹配2.解释不匹配2.1 模型风格的不...
  • 异构图前世今生

    千次阅读 2020-06-18 21:02:03
    构图初代,主要目标是network embedding,获得节点低维映射以便进行下游任务 本文提出了metapath2vecmetapath2vec++两种模型,思路:随机游走获得元路径+skipgram+logloss 下能够比较清晰地体现论文中的网络...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 698
精华内容 279
关键字:

同构图和异构图