Spark Structured Streaming Case Study
Suppose there are three data sources: interfaces serving temperature, humidity, and ultraviolet (UV) intensity readings. All three return their data as JSON over gRPC.
The payloads they return look like this, respectively:
{
  "datetime": "2024-01-01 12:00:00",
  "temperature": "30",
  "location": "F1D1"
}
{
  "datetime": "2024-01-01 12:00:00",
  "humidity": "30",
  "location": "F1D1"
}
{
  "datetime": "2024-01-01 12:00:00",
  "ultraviolet": "30",
  "location": "F1D1"
}
Each interface refreshes its data once per second. We now need a stream-processing job that handles these feeds in real time, with the following requirements: using 10-second windows over the data, if at some location the temperature, humidity, and UV intensity all exceed their configured thresholds, and that condition holds continuously for one minute, set a state flag to true and raise an alarm by printing the alert message; as soon as any one of the three metrics falls back under its threshold, reset the flag to false. Implement this with Spark Structured Streaming, in both Python and Java.
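One caveat before the implementations: Structured Streaming ships no built-in "grpc" source, so the format("grpc") calls below stand in for a custom connector. A common workaround is a small bridge process that polls each gRPC endpoint once per second and forwards the JSON to something Spark reads natively, such as Kafka. A rough sketch, where sensor_pb2/sensor_pb2_grpc, SensorStub, GetReading, and json_payload are hypothetical names for your own generated client code:

import time

import grpc
from kafka import KafkaProducer  # pip install kafka-python

import sensor_pb2        # hypothetical generated stubs; substitute the modules
import sensor_pb2_grpc   # and RPC names generated from your own .proto files

producer = KafkaProducer(bootstrap_servers="localhost:9092")

# One (endpoint, topic) pair per sensor service; hostnames/ports are placeholders.
ENDPOINTS = [
    ("temperature_server:50051", "temperature"),
    ("humidity_server:50051", "humidity"),
    ("ultraviolet_server:50051", "ultraviolet"),
]
stubs = {topic: sensor_pb2_grpc.SensorStub(grpc.insecure_channel(endpoint))
         for endpoint, topic in ENDPOINTS}

while True:
    for topic, stub in stubs.items():
        reply = stub.GetReading(sensor_pb2.ReadingRequest())  # hypothetical RPC
        # reply.json_payload is assumed to carry the JSON documents shown above.
        producer.send(topic, reply.json_payload.encode("utf-8"))
    time.sleep(1)  # the sources refresh once per second

With the readings flowing (whether through Kafka or a custom connector), here is the Python implementation: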
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
spark = SparkSession.builder.appName("RealTimeDataProcessing").getOrCreate()
# Define schemas for the three payloads. The numeric fields arrive as quoted
# strings in the JSON, so parse them as strings and cast to int after parsing.
temperature_schema = StructType([
    StructField("datetime", StringType(), True),
    StructField("temperature", StringType(), True),
    StructField("location", StringType(), True)
])
humidity_schema = StructType([
    StructField("datetime", StringType(), True),
    StructField("humidity", StringType(), True),
    StructField("location", StringType(), True)
])
ultraviolet_schema = StructType([
    StructField("datetime", StringType(), True),
    StructField("ultraviolet", StringType(), True),
    StructField("location", StringType(), True)
])
# Read the three streams. NOTE: Structured Streaming has no built-in "grpc"
# source; format("grpc") assumes a custom connector (or swap in the Kafka
# bridge above with format("kafka")) that delivers each JSON document as a
# string column named "value".
def read_source(host, schema, reading_col):
    return (spark.readStream.format("grpc").option("host", host).load()
            .select(from_json(col("value"), schema).alias("data"))
            .select("data.*")
            .withColumn(reading_col, col(reading_col).cast("int"))
            .withColumn("datetime", to_timestamp("datetime", "yyyy-MM-dd HH:mm:ss"))
            # Watermark each side so the stream-stream join can bound its state
            .withWatermark("datetime", "10 seconds"))

temperature_df = read_source("temperature_server", temperature_schema, "temperature")
humidity_df = read_source("humidity_server", humidity_schema, "humidity")
ultraviolet_df = read_source("ultraviolet_server", ultraviolet_schema, "ultraviolet")
# Define threshold values
temperature_threshold = 30
humidity_threshold = 80
ultraviolet_threshold = 8
# Join the dataframes on 'location' and 'datetime' columns
joined_df = temperature_df.join(humidity_df, ["location", "datetime"]).join(ultraviolet_df, ["location", "datetime"])
# Define the condition for triggering an alert
alert_condition = (col("temperature") >= temperature_threshold) & (col("humidity") >= humidity_threshold) & (col("ultraviolet") >= ultraviolet_threshold)
# Aggregate into 10-second tumbling windows per location; a window counts as
# an alert window only if every reading inside it exceeded all thresholds.
windowed_df = (joined_df
    .groupBy(window("datetime", "10 seconds").alias("window"), col("location"))
    .agg(min(when(alert_condition, 1).otherwise(0)).alias("alert")))
# Roll the 10-second flags up into sliding 1-minute windows: a location has
# been over threshold for a full minute exactly when all six of its 10-second
# windows alerted. Chaining streaming window aggregations like this requires
# Spark 3.4+ (window_time); see the stateful sketch after this listing for
# exact streak-and-reset semantics.
consecutive_alerts_df = (windowed_df
    .groupBy(window(window_time("window"), "1 minute", "10 seconds").alias("minute_window"),
             col("location"))
    .agg(sum("alert").alias("alert_windows"))
    .filter(col("alert_windows") >= 6))
# Start the streaming query; append mode emits each alerting minute-window
# once the watermark has passed its end.
query = (consecutive_alerts_df.writeStream
    .outputMode("append")
    .format("console")
    .option("truncate", "false")
    .start())
query.awaitTermination()
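The windowed roll-up above approximates the requirement, but the spec's state machine (set the flag to true after one continuous minute, reset it to false the moment any metric drops) is expressed more faithfully with arbitrary stateful processing. A minimal sketch, assuming PySpark 3.4+ for applyInPandasWithState and a Spark version that permits chaining it after the stream-stream join (otherwise combine the three feeds upstream first); it tracks streaks over the joined per-second readings rather than the 10-second windows:

import pandas as pd
from pyspark.sql.streaming.state import GroupState, GroupStateTimeout

def track_streaks(key, pdf_iter, state: GroupState):
    # Per-location state: start time of the current over-threshold streak.
    (location,) = key
    streak_start = state.get[0] if state.exists else None
    alerts = []
    for pdf in pdf_iter:
        # Order by event time within the micro-batch; late data arriving in
        # later batches is not handled in this sketch.
        for row in pdf.sort_values("datetime").itertuples():
            over = (row.temperature >= temperature_threshold
                    and row.humidity >= humidity_threshold
                    and row.ultraviolet >= ultraviolet_threshold)
            if over:
                if streak_start is None:
                    streak_start = row.datetime
                if (row.datetime - streak_start).total_seconds() >= 60:
                    alerts.append((location, streak_start, row.datetime))
            else:
                streak_start = None  # any metric back under threshold: reset
    if streak_start is not None:
        state.update((streak_start,))
    elif state.exists:
        state.remove()
    if alerts:
        yield pd.DataFrame(alerts, columns=["location", "streak_start", "alert_time"])

streak_alerts_df = joined_df.groupBy("location").applyInPandasWithState(
    track_streaks,
    outputStructType="location string, streak_start timestamp, alert_time timestamp",
    stateStructType="streak_start timestamp",
    outputMode="append",
    timeoutConf=GroupStateTimeout.NoTimeout)

The Java version mirrors the same DataFrame pipeline: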
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import static org.apache.spark.sql.functions.*;
public class RealTimeDataProcessing {
public static void main(String[] args) throws Exception {
SparkSession spark = SparkSession.builder()
.appName("RealTimeDataProcessing")
.getOrCreate();
        // Read the three streams. NOTE: "grpc" is not a built-in source; as in
        // the Python version, this assumes a custom connector that delivers
        // each JSON document in a string column named "value" (see readSource
        // below).
        Dataset<Row> temperatureDF = readSource(spark, "temperature_server", "temperature");
        Dataset<Row> humidityDF = readSource(spark, "humidity_server", "humidity");
        Dataset<Row> ultravioletDF = readSource(spark, "ultraviolet_server", "ultraviolet");
// Define threshold values
int temperatureThreshold = 30;
int humidityThreshold = 80;
int ultravioletThreshold = 8;
        // Join the three streams on both 'location' and 'datetime'
        scala.collection.immutable.Seq<String> joinKeys =
            scala.collection.JavaConverters.asScalaBuffer(
                java.util.Arrays.asList("location", "datetime")).toList();
        Dataset<Row> joinedDF = temperatureDF.join(humidityDF, joinKeys)
            .join(ultravioletDF, joinKeys);
// Define the condition for triggering an alert
Column alertCondition = col("temperature").geq(temperatureThreshold)
.and(col("humidity").geq(humidityThreshold))
.and(col("ultraviolet").geq(ultravioletThreshold));
        // Aggregate into 10-second tumbling windows per location; a window
        // alerts only if every reading inside it exceeded all thresholds.
        Dataset<Row> windowedDF = joinedDF
            .groupBy(window(col("datetime"), "10 seconds").alias("window"), col("location"))
            .agg(min(when(alertCondition, 1).otherwise(0)).alias("alert"));
        // Roll the 10-second flags up into sliding 1-minute windows: all six
        // 10-second windows alerting means one continuous minute over
        // threshold. Chained streaming window aggregations need Spark 3.4+.
        Dataset<Row> consecutiveAlertsDF = windowedDF
            .groupBy(window(window_time(col("window")), "1 minute", "10 seconds")
                         .alias("minute_window"),
                     col("location"))
            .agg(sum("alert").alias("alert_windows"))
            .filter(col("alert_windows").geq(6));
        // Start the streaming query; append mode emits each alerting
        // minute-window once the watermark has passed its end.
        StreamingQuery query = consecutiveAlertsDF.writeStream()
            .outputMode("append")
            .format("console")
            .option("truncate", "false")
            .start();
        query.awaitTermination();
    }

    // Helper: read one sensor stream, parse its JSON payload (the numeric
    // field arrives as a quoted string), and stamp an event-time watermark
    // so the stream-stream join can bound its state.
    private static Dataset<Row> readSource(SparkSession spark, String host,
                                           String readingCol) {
        StructType schema = new StructType()
            .add("datetime", DataTypes.StringType)
            .add(readingCol, DataTypes.StringType)
            .add("location", DataTypes.StringType);
        return spark.readStream()
            .format("grpc")
            .option("host", host)
            .load()
            .select(from_json(col("value"), schema).alias("data"))
            .select("data.*")
            .withColumn(readingCol, col(readingCol).cast("int"))
            .withColumn("datetime",
                to_timestamp(col("datetime"), "yyyy-MM-dd HH:mm:ss"))
            .withWatermark("datetime", "10 seconds");
    }
}
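To smoke-test either implementation without a gRPC connector, swap format("grpc") for Spark's built-in socket source (format("socket") with host/port options); it also delivers a single string column named value, so the rest of the pipeline is unchanged. A toy generator for one feed:

import json
import socket
import time
from datetime import datetime

# Serve synthetic temperature readings on localhost:9999; start the Spark
# query (pointed at this port) so it connects before the loop begins.
server = socket.socket()
server.bind(("localhost", 9999))
server.listen(1)
conn, _ = server.accept()
while True:
    reading = {"datetime": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
               "temperature": "31",   # keep it over the threshold of 30
               "location": "F1D1"}
    conn.sendall((json.dumps(reading) + "\n").encode("utf-8"))
    time.sleep(1)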