A Look at Flink's CsvTableSink
Published: 2019-06-29


This article takes a look at Flink's CsvTableSink.

TableSink

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/sinks/TableSink.scala

trait TableSink[T] {

  /**
    * Returns the type expected by this [[TableSink]].
    *
    * This type should depend on the types returned by [[getFieldNames]].
    *
    * @return The type expected by this [[TableSink]].
    */
  def getOutputType: TypeInformation[T]

  /** Returns the names of the table fields. */
  def getFieldNames: Array[String]

  /** Returns the types of the table fields. */
  def getFieldTypes: Array[TypeInformation[_]]

  /**
    * Return a copy of this [[TableSink]] configured with the field names and types of the
    * [[Table]] to emit.
    *
    * @param fieldNames The field names of the table to emit.
    * @param fieldTypes The field types of the table to emit.
    * @return A copy of this [[TableSink]] configured with the field names and types of the
    *         [[Table]] to emit.
    */
  def configure(fieldNames: Array[String],
                fieldTypes: Array[TypeInformation[_]]): TableSink[T]
}
  • TableSink defines four methods: getOutputType, getFieldNames, getFieldTypes, and configure

BatchTableSink

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/sinks/BatchTableSink.scala

trait BatchTableSink[T] extends TableSink[T] {

  /** Emits the DataSet. */
  def emitDataSet(dataSet: DataSet[T]): Unit
}
  • BatchTableSink extends TableSink and adds the emitDataSet method

StreamTableSink

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/sinks/StreamTableSink.scala

trait StreamTableSink[T] extends TableSink[T] {

  /** Emits the DataStream. */
  def emitDataStream(dataStream: DataStream[T]): Unit
}
  • StreamTableSink extends TableSink and adds the emitDataStream method

TableSinkBase

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/sinks/TableSinkBase.scala

trait TableSinkBase[T] extends TableSink[T] {

  private var fieldNames: Option[Array[String]] = None
  private var fieldTypes: Option[Array[TypeInformation[_]]] = None

  /** Return a deep copy of the [[TableSink]]. */
  protected def copy: TableSinkBase[T]

  /** Return the field names of the [[Table]] to emit. */
  def getFieldNames: Array[String] = {
    fieldNames match {
      case Some(n) => n
      case None => throw new IllegalStateException(
        "TableSink must be configured to retrieve field names.")
    }
  }

  /** Return the field types of the [[Table]] to emit. */
  def getFieldTypes: Array[TypeInformation[_]] = {
    fieldTypes match {
      case Some(t) => t
      case None => throw new IllegalStateException(
        "TableSink must be configured to retrieve field types.")
    }
  }

  /**
    * Return a copy of this [[TableSink]] configured with the field names and types of the
    * [[Table]] to emit.
    *
    * @param fieldNames The field names of the table to emit.
    * @param fieldTypes The field types of the table to emit.
    * @return A copy of this [[TableSink]] configured with the field names and types of the
    *         [[Table]] to emit.
    */
  final def configure(fieldNames: Array[String],
                      fieldTypes: Array[TypeInformation[_]]): TableSink[T] = {
    val configuredSink = this.copy
    configuredSink.fieldNames = Some(fieldNames)
    configuredSink.fieldTypes = Some(fieldTypes)
    configuredSink
  }
}
  • TableSinkBase extends TableSink and implements the getFieldNames, getFieldTypes, and configure methods; note that configure never mutates the current instance but returns a configured copy instead
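The copy-and-configure pattern is worth pausing on. A dependency-free sketch of the same idea, with SinkBase and ToySink as made-up names and plain field names standing in for Flink's TypeInformation:

```scala
// Stripped-down sketch of TableSinkBase's configure pattern (no Flink dependency).
trait SinkBase {
  private var names: Option[Array[String]] = None

  /** Concrete sinks supply a fresh, unconfigured copy of themselves. */
  protected def copy: SinkBase

  /** Fails, like TableSinkBase, if the sink was never configured. */
  def getFieldNames: Array[String] =
    names.getOrElse(throw new IllegalStateException(
      "Sink must be configured to retrieve field names."))

  /** Like TableSinkBase.configure: leaves `this` untouched, returns a configured copy. */
  final def configure(fieldNames: Array[String]): SinkBase = {
    val configured = copy
    configured.names = Some(fieldNames)
    configured
  }
}

class ToySink extends SinkBase {
  override protected def copy: SinkBase = new ToySink
}
```

Because configure returns a copy, the framework can hand the same sink instance to several tables with different schemas without them interfering.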

CsvTableSink

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/sinks/CsvTableSink.scala

class CsvTableSink(
    path: String,
    fieldDelim: Option[String],
    numFiles: Option[Int],
    writeMode: Option[WriteMode])
  extends TableSinkBase[Row] with BatchTableSink[Row] with AppendStreamTableSink[Row] {

  /**
    * A simple [[TableSink]] to emit data as CSV files.
    *
    * @param path The output path to write the Table to.
    * @param fieldDelim The field delimiter, ',' by default.
    */
  def this(path: String, fieldDelim: String = ",") {
    this(path, Some(fieldDelim), None, None)
  }

  /**
    * A simple [[TableSink]] to emit data as CSV files.
    *
    * @param path The output path to write the Table to.
    * @param fieldDelim The field delimiter.
    * @param numFiles The number of files to write to.
    * @param writeMode The write mode to specify whether existing files are overwritten or not.
    */
  def this(path: String, fieldDelim: String, numFiles: Int, writeMode: WriteMode) {
    this(path, Some(fieldDelim), Some(numFiles), Some(writeMode))
  }

  override def emitDataSet(dataSet: DataSet[Row]): Unit = {
    val csvRows = dataSet.map(new CsvFormatter(fieldDelim.getOrElse(",")))

    if (numFiles.isDefined) {
      csvRows.setParallelism(numFiles.get)
    }

    val sink = writeMode match {
      case None => csvRows.writeAsText(path)
      case Some(wm) => csvRows.writeAsText(path, wm)
    }

    if (numFiles.isDefined) {
      sink.setParallelism(numFiles.get)
    }

    sink.name(TableConnectorUtil.generateRuntimeName(this.getClass, getFieldNames))
  }

  override def emitDataStream(dataStream: DataStream[Row]): Unit = {
    val csvRows = dataStream.map(new CsvFormatter(fieldDelim.getOrElse(",")))

    if (numFiles.isDefined) {
      csvRows.setParallelism(numFiles.get)
    }

    val sink = writeMode match {
      case None => csvRows.writeAsText(path)
      case Some(wm) => csvRows.writeAsText(path, wm)
    }

    if (numFiles.isDefined) {
      sink.setParallelism(numFiles.get)
    }

    sink.name(TableConnectorUtil.generateRuntimeName(this.getClass, getFieldNames))
  }

  override protected def copy: TableSinkBase[Row] = {
    new CsvTableSink(path, fieldDelim, numFiles, writeMode)
  }

  override def getOutputType: TypeInformation[Row] = {
    new RowTypeInfo(getFieldTypes: _*)
  }
}

/**
  * Formats a [[Row]] into a [[String]] with fields separated by the field delimiter.
  *
  * @param fieldDelim The field delimiter.
  */
class CsvFormatter(fieldDelim: String) extends MapFunction[Row, String] {
  override def map(row: Row): String = {
    val builder = new StringBuilder

    // write first value
    val v = row.getField(0)
    if (v != null) {
      builder.append(v.toString)
    }

    // write following values
    for (i <- 1 until row.getArity) {
      builder.append(fieldDelim)
      val v = row.getField(i)
      if (v != null) {
        builder.append(v.toString)
      }
    }
    builder.mkString
  }
}
  • CsvTableSink extends TableSinkBase and implements both the BatchTableSink and AppendStreamTableSink interfaces (AppendStreamTableSink in turn extends StreamTableSink)
  • emitDataSet and emitDataStream both use CsvFormatter, a MapFunction that converts a Row into a String
  • CsvTableSink takes an optional writeMode parameter; WriteMode is an enum with two values, NO_OVERWRITE and OVERWRITE, which controls whether an existing file of the same name is overwritten when the CSV file is written
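The per-row formatting logic is tiny. A dependency-free sketch of what CsvFormatter does, with CsvSketch and formatRow as made-up names and a Seq[Any] standing in for Flink's Row:

```scala
object CsvSketch {
  // Null fields become empty strings; fields are joined by the delimiter.
  // Like the real CsvFormatter, this does no CSV quoting or escaping, so a
  // field value that itself contains the delimiter will shift the columns.
  def formatRow(row: Seq[Any], fieldDelim: String = ","): String =
    row.map(v => if (v == null) "" else v.toString).mkString(fieldDelim)
}
```

For example, a row ("a", 1, null) with the default delimiter formats as "a,1," with a trailing empty field for the null.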

Summary

  • TableSink defines four methods: getOutputType, getFieldNames, getFieldTypes, and configure; BatchTableSink extends TableSink and adds emitDataSet; StreamTableSink extends TableSink and adds emitDataStream; TableSinkBase extends TableSink and implements getFieldNames, getFieldTypes, and configure
  • CsvTableSink extends TableSinkBase and implements both BatchTableSink and AppendStreamTableSink (the latter extends StreamTableSink); emitDataSet and emitDataStream both use CsvFormatter, a MapFunction that converts a Row into a String
  • CsvTableSink takes an optional writeMode parameter; WriteMode is an enum with two values, NO_OVERWRITE and OVERWRITE, which controls whether an existing file of the same name is overwritten when the CSV file is written
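To tie these pieces together, here is a dependency-free sketch that mimics the emit path end to end: format each row, then write the lines as text, refusing to touch an existing file unless overwriting is requested, analogous to WriteMode.NO_OVERWRITE versus WriteMode.OVERWRITE. CsvWriterSketch, writeCsv, and its parameters are invented for illustration; the real sink writes through Flink's writeAsText.

```scala
import java.nio.file.{Files, Paths}
import scala.collection.JavaConverters._

object CsvWriterSketch {
  // Illustrative only, not Flink API: formats rows like CsvFormatter and
  // honours an overwrite flag the way CsvTableSink's WriteMode does
  // (NO_OVERWRITE -> fail if the file exists, OVERWRITE -> replace it).
  def writeCsv(path: String,
               rows: Seq[Seq[Any]],
               fieldDelim: String = ",",
               overwrite: Boolean = false): Unit = {
    val target = Paths.get(path)
    if (!overwrite && Files.exists(target))
      throw new IllegalStateException(s"File $path already exists.")
    val lines = rows.map(_.map(v => if (v == null) "" else v.toString).mkString(fieldDelim))
    Files.write(target, lines.asJava)
  }
}
```

In the toy version the overwrite decision is a single existence check before writing; in Flink the same choice is deferred to the file system sink that writeAsText creates.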


Repost source: http://lxhbm.baihongyu.com/
