您当前的位置:首页 > 互联网百科 > 大数据

Flink流处理API——Source

时间:2020-08-11 11:04:31  来源:  作者:

本文主要从以下几个方面介绍Flink的流处理API——Source

一、从集合中读取数据

二、从文件中读取数据

三、从Kafka中读取数据

四、自定义Source

数据处理的过程基本可以分为三个阶段分别是,数据从来哪里,做什么业务逻辑,落地到哪里去。

这三部分在Flink中分别被称为Source、Transform和Sink

版本:

scala:2.11.12

Kafka:0.8.2.2

Flink:1.7.2

pom.xml依赖部分(log日志的依赖一定要加上,否则当Flink从Kafka0.8中读取数据报Failed to instantiate SLF4J LoggerFactory Reported exception)

    <dependencies>

        <dependency>
            <groupId>org.Apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.7.2</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.10 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.8_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>


        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.22</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.22</version>
        </dependency>

        <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.0</version>
        </dependency>

        <dependency>
            <groupId>MySQL</groupId>
            <artifactId>mysql-connector-JAVA</artifactId>
            <version>5.1.38</version>
        </dependency>


    </dependencies>

一、从集合中读取数据

package xxx

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

// 样例类,传感器ID,时间戳,温度 (后面都使用这个样例类作为数据的类型)
case class SensorReading(id: String, timestamo: Long, temperature: Double){
  override def toString: String = {
    id+":"+ timestamo.toString + "," + temperature
  }
}

/**
*从集合中读取数据
*/
object Sensor {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    
    import  org.apache.flink.api.scala._
    val stream1: DataStream[SensorReading] = environment.fromCollection(List(
      SensorReading("sensor_1", 1547718199, 35.80018327300259),
      SensorReading("sensor_6", 1547718201, 15.402984393403084),
      SensorReading("sensor_7", 1547718202, 6.720945201171228),
      SensorReading("sensor_10", 1547718205, 38.101067604893444)
    ))

    stream1.print("Stream1:").setParallelism(1)
    environment.execute()
  }
}

二、从文件中读取数据

Flink流处理API——Source

 

package xxx

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

// 样例类,传感器ID,时间戳,温度
case class SensorReading(id: String, timestamo: Long, temperature: Double){
  override def toString: String = {
    id+":"+ timestamo.toString + "," + temperature
  }
}
/**
*从文件中读取数据
*/
object Sensor {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
     val stream2: DataStream[String] = environment.readTextFile(
       "D:\Scala\Code\FlinkTest\src\main\resources\sensor.txt")
    stream2.print("Stream2:").setParallelism(1)
    environment.execute()
  }
}

三、从Kafka中读取数据

Kafka的brokerList:slave1:9092,slave2:9092,slave3:9092

zookeeper集群:slave2:2181,slave3:2181,slave3:2181

package xxx

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08

/**
 * 从kafka中读取数据
 */
object ReadDataFromKafka {
  def main(args: Array[String]): Unit = {

    // 设置读取的kafka参数
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "slave1:9092,slave2:9092,slave3:9092")
    properties.setProperty("group.id", "flink_group1")
    properties.setProperty("zookeeper.connect", "slave2:2181,slave3:2181.slave4:2181")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") // key的反序列化
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") // value的反序列化
    properties.setProperty("auto.offset.reset", "latest") // 偏移量

    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // 链接kafka读取数据
    val kafkaStream: DataStream[String] = environment.addSource(new FlinkKafkaConsumer08[String]("sensor",
      new SimpleStringSchema(), properties))

    kafkaStream.print().setParallelism(1)
    environment.execute("readDataFromKafka")

  }

}

四、自定义Source

package xxx

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

import scala.util.Random

/**
 * 自定义Source
 */
object ReadDataFromMySource {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val dataStream: DataStream[String] = environment.addSource(new MySource())

    dataStream.print().setParallelism(1)

    environment.execute("MySource")
    
  }

}


class MySource extends  SourceFunction[String]{
  // 表示数据源是否正常运行
  var running:Boolean = true

  // 数据正常生成
  override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
    val random = new Random()

    var temp = 1.to(10).map(
      i => (i, 100 + random.nextGaussian() * 100)
    )
    
    while (running){
      // 更新数值
      temp = temp.map(
        t=>(t._1, t._2 + random.nextGaussian())
      )

      // 当前时间
      val curTime = System.currentTimeMillis()

      temp.foreach(t=>{
        sourceContext.collect(curTime+": "+ t._1 + "--> "+ t._2)
      })

      Thread.sleep(500)
    }

  }

  // 取消数据生成
  override def cancel(): Unit ={
    running = false
  }
}


Tags:Flink流处理   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,如有任何标注错误或版权侵犯请与我们联系(Email:2595517585@qq.com),我们将及时更正、删除,谢谢。
▌相关推荐
本文主要从以下几个方面介绍Flink的流处理API&mdash;&mdash;Source一、从集合中读取数据二、从文件中读取数据三、从Kafka中读取数据四、自定义Source数据处理的过程基本可...【详细内容】
2020-08-11  Tags: Flink流处理  点击:(74)  评论:(0)  加入收藏
▌简易百科推荐
张欣安科瑞电气股份有限公司 上海嘉定 201801 摘要:随着电力行业各系统接入,海量数据涌现,如何利用电网信息化中大量数据,对客户需求进行判断分析,服务于营销链条,提升企业市场竞...【详细内容】
2021-12-14  安科瑞张欣    Tags:大数据   点击:(9)  评论:(0)  加入收藏
1、什么是数据分析结合分析工具,运用数据分析思维,分析庞杂数据信息,为业务赋能。 2、数据分析师工作的核心流程:(1)界定问题:明确具体问题是什么;●what 发生了什么(是什么)●why 为...【详细内容】
2021-12-01  逆风北极光    Tags:大数据   点击:(25)  评论:(0)  加入收藏
在实际工作中,我们经常需要整理各个业务部门发来的数据。不仅分散,而且数据量大、格式多。单是从不同地方汇总整理这些原始数据就花了大量的时间,更不用说还要把有效的数据收集...【详细内容】
2021-11-30  百数    Tags:数据   点击:(21)  评论:(0)  加入收藏
数据作为新的生产要素,其蕴含的价值日益凸显,而安全问题却愈发突出。密码技术,是实现数据安全最经济、最有效、最可靠的手段,对数据进行加密,并结合有效的密钥保护手段,可在开放环...【详细内容】
2021-11-26  炼石网络    Tags:数据存储   点击:(17)  评论:(0)  加入收藏
导读:网易大数据平台的底层数据查询引擎,选用了Impala作为OLAP查询引擎,不但支撑了网易大数据的交互式查询与自助分析,还为外部客户提供了商业化的产品与服务。今天将为大家分享...【详细内容】
2021-11-26  DataFunTalk    Tags:大数据   点击:(15)  评论:(0)  加入收藏
导读:数据挖掘是一种发现知识的手段。数据挖掘要求数据分析师通过合理的方法,从数据中获取与挖掘项目相关的知识。作者:赵仁乾 田建中 叶本华 常国珍来源:华章科技数据挖掘是一...【详细内容】
2021-11-23  华章科技  今日头条  Tags:数据挖掘   点击:(20)  评论:(0)  加入收藏
今天再给大家分享一个不错的可视化大屏分析平台模板DataColour。 data-colour 可视化分析平台采用前后端分离模式,后端架构设计采用微服务架构模式。 前端技术:Angularjs、Jq...【详细内容】
2021-11-04  web前端进阶    Tags:DashboardClient   点击:(39)  评论:(0)  加入收藏
在Kubernetes已经成了事实上的容器编排标准之下,微服务的部署变得非常容易。但随着微服务规模的扩大,服务治理带来的挑战也会越来越大。在这样的背景下出现了服务可观测性(obs...【详细内容】
2021-11-02  大数据推荐杂谈    Tags:Prometheus   点击:(40)  评论:(0)  加入收藏
同一产品对老客户的要价竟然比新客户要高?这是当下“大数据杀熟”的直接结果。近年来,随着平台经济的蓬勃发展,大数据在为用户服务之外,也引发了多种不合理现象。为了有效遏制“...【详细内容】
2021-10-29    海外网   Tags:大数据   点击:(31)  评论:(0)  加入收藏
本人03年开始从事贸易行业,多年来一直致力于外贸获客和跨境电商选品等领域,最近有些小伙伴反馈海关数据演示的都挺好为啥用起来不是那么回事?大家看到数据时关注的有产品、采购...【详细内容】
2021-10-28  QD云龙    Tags:数据   点击:(33)  评论:(0)  加入收藏
相关文章
    无相关信息
最新更新
栏目热门
栏目头条