flink怎么使用Event_time处理实时数据-成都创新互联网站建设

关于创新互联

多方位宣传企业产品与服务 突出企业形象

公司简介 公司的服务 荣誉资质 新闻动态 联系我们

flink怎么使用Event_time处理实时数据

本篇内容主要讲解“flink怎么使用Event_time处理实时数据”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“flink怎么使用Event_time处理实时数据”吧!

让客户满意是我们工作的目标,不断超越客户的期望值来自于我们对这个行业的热爱。我们立志把好的技术通过有效、简单的方式提供给客户,将通过不懈努力成为客户在信息化领域值得信任、有价值的长期合作伙伴,公司提供的服务项目有:域名与空间、网络空间、营销软件、网站建设、贾汪网站维护、网站推广。

//flink中关于时间的三个概念
//event time:数据产生的时间
//processing time:处理数据的时间 即操作数据的之间 比如一个flink在scala中的map函数处理数据时
//ingest time:摄取数据时间,在一个streaming程序中 一个时间段收集数据的时间
//而evet time在处理实时数据时是比较有用的,例如在由于网络的繁忙的原因,某些数据未能按时到达,假设迟到了30min,
//而我们定义的最大延迟不能超过十分钟,那么一些数据包含了超时的数据那么这些数据是不会在这次操作中处理的而是会
//丢弃掉
//kafka生产者代码
package kafka.partition.test;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class PartitionProducer {
	public static void main(String[] args) {
		
		Map props = new HashMap<>();
		props.put("acks", "1");
		props.put("partitioner.class", "org.apache.kafka.clients.producer.internals.DefaultPartitioner");
		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
		props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		props.put("bootstrap.servers", "bigdata01:9092");
		String topic = "event_time";
		
		KafkaProducer producer = new KafkaProducer<>(props);
		for(int i = 0 ; i <= 20;i++) {	
		//flink的watermarkassginer里面定义的超时时间是5000毫秒
			long mills = System.currentTimeMillis();
			if(i%3==0) {
			//数据的event time放在字符串的开头 以空格分割
			//kafka event_time topic的0分区超时4000毫秒
				String line = (mills-4000)+" "+"partition-0--this is a big +" +i;
				ProducerRecord< String,String> record = new ProducerRecord(topic, new Integer(0), null, i+"", line);
				producer.send(record);
			}else if(i%3==1) {
			//kafka event_time topic的1分区超时5000毫秒
				String line = (mills-5000)+" "+"partition-1--this is a big +" +i;
				ProducerRecord< String,String> record = new ProducerRecord(topic, new Integer(1), null, i+"", line);
				producer.send(record);
			}else if(i%3==2) {
			//kafka event_time topic的2分区超时8000毫秒
				String line = (mills-8000)+" "+"partition-2--this is a big +" +i;
				ProducerRecord< String,String> record = new ProducerRecord(topic, new Integer(2), null, i+"", line);
				producer.send(record);
			}
		}
		
		producer.close();
	}
}
//自定义的TimestampsAndWatermarks
package flink.streaming
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark
class CustomWaterMarks extends AssignerWithPeriodicWatermarks[String]{
    //超时时间
  val maxOutOrderness = 5000l
  //flink过一段时间便会调一次该函数获取水印
  def getCurrentWatermark():Watermark ={
    val  sysMilssecons =  System.currentTimeMillis()
     new Watermark(sysMilssecons-maxOutOrderness) 
    
  }
  //每条记录多会调用 来获得even time 在生产的数据中 even_time放在字符串的第一个字段 用空格分割
  def extractTimestamp(element: String,previousElementTimestamp: Long): Long = {
   ((element.split(" ")).head).toLong
  }
}
package flink.streaming
import java.util.Properties
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
object StreamWithEventTimeAndWaterMarks {
  
  def main(args: Array[String]): Unit = {
    val kafkaProps = new Properties()
    //kafka的一些属性
    kafkaProps.setProperty("bootstrap.servers", "bigdata01:9092")
    //所在的消费组
    kafkaProps.setProperty("group.id", "group2")
    //获取当前的执行环境
    val evn = StreamExecutionEnvironment.getExecutionEnvironment
    //配制处理数据的时候使用event time
    evn.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    //kafka的consumer,test1是要消费的topic
    val kafkaSource = new FlinkKafkaConsumer[String]("event_time",new SimpleStringSchema,kafkaProps)
    //添加自定义的 TimestampsAndWatermarks
    kafkaSource.assignTimestampsAndWatermarks(new CustomWaterMarks)
    //设置从最新的offset开始消费
    //kafkaSource.setStartFromGroupOffsets()
    kafkaSource.setStartFromLatest()
    //自动提交offset
    kafkaSource.setCommitOffsetsOnCheckpoints(true)
    
    //flink的checkpoint的时间间隔
    //evn.enableCheckpointing(2000)
    //添加consumer
    val stream = evn.addSource(kafkaSource)
    evn.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE)
    //stream.setParallelism(3)
    val text = stream.flatMap{ _.toLowerCase().split(" ").drop(1).filter { _.nonEmpty} }
          .map{(_,1)}
          .keyBy(0)
          .timeWindow(Time.seconds(5))
          .sum(1)
          .map(x=>{(x._1,(new Integer(x._2)))})
     text.print()
     //启动执行    
     
     //text.addSink(new Ssinks())
     
    evn.execute("kafkawd")  
    
  }
  
}
打印结果 partition-2中的数据因为超时没有出现
1> (big,14)
4> (is,14)
1> (+0,1)
2> (+1,1)
3> (partition-1--this,7)
4> (+15,1)
3> (+12,1)
1> (partition-0--this,7)
3> (+6,1)
1> (+16,1)
4> (+10,1)
2> (+18,1)
4> (+7,1)
3> (+3,1)
2> (+9,1)
3> (+19,1)
2> (+13,1)
3> (a,14)
2> (+4,1)

到此,相信大家对“flink怎么使用Event_time处理实时数据”有了更深的了解,不妨来实际操作一番吧!这里是创新互联网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!


网页名称:flink怎么使用Event_time处理实时数据
网页地址:http://kswsj.cn/article/pjccoc.html

其他资讯