-
2022-02-17 16:24:35
flink加载外部数据源
override def open(parameters: Configuration): Unit = {
logger.info(“init…”)
query()
// new Timer
val timer = new Timer(true)
// schedule is 10 second 定义了一个10秒的定时器,定时执行查询数据库的方法
timer.schedule(new TimerTask {
override def run(): Unit = {
query()
}
}, 10000, 10000)}
或者可以使用:
scheduledExecutorService.scheduleAtFixedRate(() -> {
TASK_CONFIG_MAP.putAll(configFetch.getSnapshotTasks());
log.info(“load snapshot config={}”, TASK_CONFIG_MAP);
}, 5, 5,TimeUnit.MINUTES);
建议采用第二种方式**可以在richfunction的open方法中,通过该方法实现。
第二个参数是延迟多少执行,第三个参数才是循环执行的。更多相关内容 -
如何使用微搭低代码平台外部数据源.docx
2021-03-24 14:06:57如何使用微搭低代码平台外部数据源 -
知识库系统与外部数据源接口的研究
2020-12-16 07:43:03知识库系统与外部数据源接口的研究以实践认知世界,以实事构架世界,知识库系统与外部数据源接口的研究总...该文档为知识库系统与外部数据源接口的研究,是一份很不错的参考资料,具有较高参考价值,感兴趣的可以... -
silverstripe-externaldata:自定义DataObjectModelAdmin实现以使用外部数据源
2021-05-02 08:25:16自定义DataObject / ModelAdmin实现,可与外部数据源一起使用 目的是有一种简单的方法将ModelAdmin和GridField与来自外部数据源的数据一起使用。 只需创建一个扩展ExternalDataObject的模型并实现您自己的get() , ... -
javashuffle源码-spark-data-sources:使用V2API开发Spark外部数据源
2021-06-04 12:56:42外部数据源 该项目说明了 Spark 2.3.0 中引入的新 V2 Apache Spark 外部数据源 API。 它包括: 一个简单的内存数据库系统 (ExampleDB),它支持说明 API 特性所需的所有数据访问范式 一系列不同复杂度的数据源,全部... -
clickhouse外部数据源导入通用方案-非jdbc
2022-03-02 18:04:39clickhouse外部数据源导入通用方案-非jdbc需求:
1、在工作中,我们常常需要将外部 hive 或者 mysql、oracle 等数据源导入到clickhouse中,对于多种外部数据源,是否有通用的数据导入方案?2、我们在clickhouse上维持一张查询主表,但外部数据源表是hive增量表,新增数据需要同步更新到clickhouse上,是否有不通过第三方组件的插入方式?
解决:
1、外部数据源有可能是多种,如下图:
那么这里不纠结某个外部数据源的导入速度,我这里提供一种通用方式,可以先在clickhouse中
创建一张外部引擎表,详细官网都有格式介绍:https://clickhouse.com/docs/zh/engines/table-engines/integrations/hdfs/由于clickhouse支持insert into xxx select * from xxx 语句进行写入
故我们可以将外部引擎表通过此函数导入进需要经常查询的MergeTree引擎表中
那么有人可能会说:既然外部引擎表可以供我们查询,我们为何要重新写入到MergeTree引擎表?
这是因为clickhouse在OLAP查询分析中的核心引擎就是MergeTree,而外部表只是嫁接表,无法使用clickhouse的核心能力,故需要有一层导入过程,这里可以将外部引擎表想象成中间层,最终供外部查询的数据都应该在MergeTree表中,如下图:
2、在hive增量表层面上,新增数据大部分会存储到一张临时表然后合并到hive主表,亦或者以文件的方式存储再转入至hive目录下,针对这种新增数据很明确的情况,我们常常使用的方式是通过clickhouse创建一张临时外部表指向hive临时表,然后通过 insert into xxx select * from xxx 语句写入进主表即可。
-
大数据背景下的电力行业数据与外部数据融合分析探索研究
2020-10-15 20:42:09随着电力企业数据的迅猛增长及电力行业信息化系统的不断建设和完善,使得电力行业数据步入了大数据时代,基于电力企业自身的海量数据,通过与外部数据的交互融合及挖掘应用分析,对内可支撑公司运营效率提升,促进... -
知识库系统与外部数据源接口的研究.doc
2021-09-18 12:00:55知识库系统与外部数据源接口的研究.doc -
Excel2016数据处理与分析 第10章 获取外部数据源.pptx
2020-02-26 17:44:4510.1 常用的导入外部数据的方法;10.1 常用的导入外部数据的方法;10.1 常用的导入外部数据的方法;10.1 常用的导入外部数据的方法;10.1 常用的导入外部数据的方法;10.1 常用的导入外部数据的方法;10.1 常用的导入外部... -
spark自定义外部数据源
2019-08-03 13:06:27对于spark外部数据源来说,要先了解这几个类 BaseRelation:定义数据的schema信息,把我们的数据转成RDD[Row] RelationProvider:是一个relation的提供者,创建BaseRelation TableScan:读取数据并构建行,拿出所有的...对于spark外部数据源来说,要先了解这几个类
BaseRelation:定义数据的schema信息,把我们的数据转成RDD[Row]
RelationProvider:是一个relation的提供者,创建BaseRelation
TableScan:读取数据并构建行,拿出所有的数据
PrunedScan:列裁剪的
PrunedFilteredScan:列裁剪➕过滤InsertableRelation:回写数据的relation
insertableRelation有如下假设需要注意:
/**- A BaseRelation that can be used to insert data into it through the insert method.
- If overwrite in insert method is true, the old data in the relation should be overwritten with
- the new data. If overwrite in insert method is false, the new data should be appended.
- InsertableRelation has the following three assumptions.
-
- It assumes that the data (Rows in the DataFrame) provided to the insert method
- exactly matches the ordinal of fields in the schema of the BaseRelation.
-
- It assumes that the schema of this relation will not be changed.
- Even if the insert method updates the schema (e.g. a relation of JSON or Parquet data may have a
- schema update after an insert operation), the new schema will not be used.
-
- It assumes that fields of the data provided in the insert method are nullable.
- If a data source needs to check the actual nullability of a field, it needs to do it in the
- insert method.
- @since 1.3.0
*/
@InterfaceStability.Stable
trait InsertableRelation {
def insert(data: DataFrame, overwrite: Boolean): Unit
}
对于spark外部数据源来说,要先了解这几个类
BaseRelation:定义数据的schema信息,把我们的数据转成RDD[Row]
RelationProvider:是一个relation的提供者,创建BaseRelation
TableScan:读取数据并构建行,拿出所有的数据
PrunedScan:列裁剪的
PrunedFilteredScan:列裁剪➕过滤InsertableRelation:回写数据的relation
insertableRelation有如下假设需要注意:
/**- A BaseRelation that can be used to insert data into it through the insert method.
- If overwrite in insert method is true, the old data in the relation should be overwritten with
- the new data. If overwrite in insert method is false, the new data should be appended.
- InsertableRelation has the following three assumptions.
-
- It assumes that the data (Rows in the DataFrame) provided to the insert method
- exactly matches the ordinal of fields in the schema of the BaseRelation.
-
- It assumes that the schema of this relation will not be changed.
- Even if the insert method updates the schema (e.g. a relation of JSON or Parquet data may have a
- schema update after an insert operation), the new schema will not be used.
-
- It assumes that fields of the data provided in the insert method are nullable.
- If a data source needs to check the actual nullability of a field, it needs to do it in the
- insert method.
- @since 1.3.0
*/
@InterfaceStability.Stable
trait InsertableRelation {
def insert(data: DataFrame, overwrite: Boolean): Unit
}
下面我们可以简单看一下JDBCRelation
private[sql] case class JDBCRelation(
parts: Array[Partition], jdbcOptions: JDBCOptions)(@transient val sparkSession: SparkSession)
extends BaseRelation
with PrunedFilteredScan
with InsertableRelation继承了 BaseRelation ,BaseRelation 是一个抽象类,就需要实现他的属性和方法:
override def sqlContext: SQLContext = sparkSession.sqlContextoverride val needConversion: Boolean = false
override val schema: StructType = {
val tableSchema = JDBCRDD.resolveTable(jdbcOptions)
jdbcOptions.customSchema match {
case Some(customSchema) => JdbcUtils.getCustomSchema(
tableSchema, customSchema, sparkSession.sessionState.conf.resolver)
case None => tableSchema
}
}
…import org.apache.spark.sql.SQLContext import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, SchemaRelationProvider} import org.apache.spark.sql.types.StructType /** * @Author: lih * @Date: 2019/8/2 11:40 PM * @Version 1.0 */ class DefaultSource extends RelationProvider with SchemaRelationProvider{ override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { createRelation(sqlContext,parameters,null) } override def createRelation(sqlContext: SQLContext, parameters: Map[String, String], schema: StructType): BaseRelation = { val path = parameters.get("path") path match { case Some(p)=> new TextDataSourceRelation(sqlContext,p,schema) case _=> throw new IllegalArgumentException("path is required ...") } } }
import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.{Row, SQLContext} import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ /** * @Author: lih * @Date: 2019/8/2 11:41 PM * @Version 1.0 */ class TextDataSourceRelation(override val sqlContext: SQLContext, path: String, userSchema: StructType ) extends BaseRelation with TableScan with Logging with Serializable { override def schema: StructType = { if (userSchema != null) { userSchema } else { StructType( StructField("id", LongType, false) :: StructField("name", StringType, false) :: StructField("gender", StringType, false) :: StructField("salar", LongType, false) :: StructField("comm", LongType, false) :: Nil ) } } override def buildScan(): RDD[Row] = { logError("this is ruozedata custom buildScan...") var rdd = sqlContext.sparkContext.wholeTextFiles(path).map(_._2) val schemaField = schema.fields // rdd + schemaField val rows = rdd.map(fileContent => { val lines = fileContent.split("\n") val data = lines.map(_.split(",").map(x=>x.trim)).toSeq val result = data.map(x => x.zipWithIndex.map{ case (value, index) => { val columnName = schemaField(index).name caseTo(if(columnName.equalsIgnoreCase("gender")) { if(value == "0") { "男" } else if(value == "1"){ "女" } else { "未知" } } else { value }, schemaField(index).dataType) } }) result.map(x => Row.fromSeq(x)) }) rows.flatMap(x=>x) } def caseTo(value: String, dataType: DataType) = { dataType match { case _:DoubleType => value.toDouble case _:LongType => value.toLong case _:StringType => value } } }
数据:
1,li,0,100000,2000
2,zhang,1,20000,23223object Test { def main(args: Array[String]): Unit = { val spark = SparkSession.builder() .appName(this.getClass.getSimpleName).master("local[8]") .getOrCreate() val df = spark .read .format("com.ztgx.datasourse.DefaultSource") .load("file:///Users/mac/Desktop/1.txt") df.show() spark.stop() } }
结果:
+---+-----+------+------+-----+ | id| name|gender| salar| comm| +---+-----+------+------+-----+ | 1| li| 男|100000| 2000| | 2|zhang| 女| 20000|23223| +---+-----+------+------+-----+
-
勤哲电子表格服务器ES引用MYSQL外部数据源教程.docx
2020-10-12 20:57:14勤哲电子表格服务器 MY SQ外部数据源教程 ES引用 作者: 日期: ES2010引用MYSQL外部数据源教程 整理安哥 qq:29154754 2 一 安装mysql的ODBC驱动程序mysql官网有下载 二 mysqlODBC数据源配置 1mysql驱动程序安装好后点... -
外部数据源
2018-05-21 12:23:37不同压缩格式,不同存储接口,用户肯定希望从不同数据源收集数据方便、快速从不同数据源()经过混合处理(json直接和parqent jion)再将结果以特定格式写回到指定系统上去sparksql 1.2====》外部数据源api问题...产生背景
概述
目标
操作parquet文件数据
操作hive数据
操作mysql数据
统一
产生背景
每一个spark都是以加载数据开始,经过一系列处理,最后存储到其他地方;
不同格式,不同压缩格式,不同存储接口,用户肯定希望从不同数据源收集数据
方便、快速从不同数据源()经过混合处理(json直接和parqent jion)再将结果以特定格式写回到指定系统上去
sparksql 1.2====》外部数据源api
问题:
1.加载、保存数据并不简单,比如从关系数据库sqoop加载到hdfs然后..
2.解析原生数据(text/json/parquet)
3.转换数据格式
数据集存储在不同的存储系统、格式上面
api概述:
一种扩展方式,将外部数据源整合到sparl sql中
读写各种格式(指定格式和路径local,分布式)
目标:
开发人员:是否需要把代码合并到spark源码中?比如访问微博(按照dataframe api实现)
对于使用人员:非常容易加载保存
支持:build-in json parquet jdbc 其他数据源:packages:外部的,非spark内置的,一个网站
操作 parqeut文件数据
-
关于图书编目利用外部数据源问题的探讨.docx
2021-11-26 16:58:48关于图书编目利用外部数据源问题的探讨.docx -
实战 | 如何使用微搭低代码平台外部数据源
2021-03-25 20:16:15但是随着应用开发的深入,有时候也避免不了要调用第三方提供的接口服务,我们今天就带着大家使用一下低码平台的外部数据源。 创建外部数据源 登录低码的控制台在数据源管理菜单中点击【新建数据源】,在下拉选项中... -
Python---Working-with-External-Data-Sources:该存储库包含在Python中使用外部数据源进行写入和读取外部...
2021-03-25 07:55:41Python-使用外部数据源 该存储库包含在Python中使用外部数据源进行写入和读取外部数据源的示例。 包括文件合并 -
Spark 外部数据源调用代码
2018-05-08 11:08:00Spark 外部数据源调用代码,CSV文件 和HIVE读取方式。 -
勤哲电子表格服务器ES2010引用MYSQL外部数据源教程.docx
2021-10-04 07:43:20勤哲电子表格服务器ES2010引用MYSQL外部数据源教程.docx -
extraneous-ui:用于外部数据源NCB的UI组件
2021-05-17 12:59:57外部用户界面 用于外部数据源NCB的UI组件 -
【精美排版】勤哲电子表格服务器ES引用MYSQL外部数据源教程.docx
2021-10-04 06:45:18【精美排版】勤哲电子表格服务器ES引用MYSQL外部数据源教程.docx -
勤哲电子表格服务器ES2010引用MYSQL外部数据源教程.doc
2020-01-13 18:56:21ES2010引用MYSQL外部数据源教程 整理:安哥 qq:29154754 zxa2000@163.com 20130827 一安装mysql的ODBC驱动程序mysql官网有下载 二mysqlODBC数据源配置 1mysql驱动程序安装好后点开始/管理工具/数据源 点添加 点完成 ... -
SQL Server 2012数据库导入外部数据源(Excel)
2021-11-06 14:39:55点击下一步,选择数据源格式,Excel文件路径,我以excel为例进行测试,选择完毕,点击下一步 设置身份验证方式,根据实际情况进行相应的身份验证,设置完,进行下一步操作 默认操作,继续下一步 选择要操作的... -
Flink DataStream 内外部数据源的各种情况汇总
2019-12-26 10:05:09一、内置数据源 (1)文件数据源 在 StreamExecutionEnvironment 中,可以使用 readTextFile 方法直接读取文本文件,也可以使用 readFile 方法通过指定文件 InputFormat 来读取特定数据类型的文件,如 ... -
泛微OA ecology 配置了外部数据源,但是读取不到数据里的表?
2021-05-14 10:55:02配置的数据源不仅需要有查询表数据的权限, 也需要读取系统表结构的权限。 oracle数据库:系统表 相关表 user_tab_columns ,user_tables,user_views ; sqlserver数据库:sysobjects ,syscolumns; mysql数据库: ... -
从外部数据源接入到kafka及kafka存储数据的原理机制
2020-06-19 22:33:341、外部数据是怎样的接入到kafka的? 外部数据接入到kafka的流程示意图: (1)接入数据流程 (1)producer先从broker-list的节点中找到该partition的leader; (2)然后producer将消息发送给作为leader的partition... -
pb9 tree+外部数据源dw例子
2010-06-17 00:20:48pb9 tree+外部数据源dw例子 pb9 tree+外部数据源dw例子 -
八、使用pxf读取写外部数据源
2019-04-09 19:12:51使用PXF读写外部数据 Greenplum平台扩展框架...如果想通过pxf访问外部数据源,则必须开启pxf。同时还必须给想用的用户赋访问pxf的权限。 安装初始化pxf并分配权限后,您可以使用该CREATE EXTERNAL TABLE命令使用pxf... -
使用 access 导入表时,报错:该属性在外部数据源或用低版本的microsoft jet所创建的数据库中不受支持。
2021-01-15 14:12:57在使用 access 导入表时,报错:该属性在外部数据源或用低版本的microsoft jet所创建的数据库中不受支持。 报错原因:表的列名中包含空格或其他不规范字符 解决方法:修改数据源中不规范的列名,或者在 “第一... -
SQLServer——SQLServer链接外部数据源
2018-01-25 08:50:00一、新建ODBC数据源 1、打开控制面板→管理工具→ODBC数据源→系统DSN 2、添加新系统数据源 二、新建SQLServer连接服务器 1、打开服务器对象→链接服务器→新建链接服务器 2、新建连接服务器设置 ... -
SparkSQL2.0扩展外部数据源原理(读取外部系统)
2017-04-27 10:02:38spark2.0中,提供了两种扩展外部数据源的接口, 第一种外部数据源为文件,第二种外部数据源为系统 spark内部调用外部数据源包的类是下面,包括解析BaseRelation,提取schema等 package org.apache.spark.sql.... -
实时更新Excel文档外部数据源的数据
2016-03-06 15:12:00实时更新Excel文档外部数据源的数据 单元格区域、Excel 表、数据透视表或数据透视图均可以连接到外部数据源(数据源:用于连接数据库的一组存储的“源”信息。数据源包含数据库服务器的名称和位置、数据库驱动程序...