Flink案例
给定一个流数据,输入的数据结构是一个Transaction对象,里面包含
txid string,
amount int,
name string
三个字段,需要用flink对该transaction的流数据进行实时检测。 检测规则如下,如果对于某一个name,连续有三次的amount超过100,则进行报警,报警信息在控制台显示出来,如果是不是连续的三次,则无需报警。 这样的一个程序请用java代码来实现
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class TransactionDetection {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Transaction> transactions = ... // 从数据源获取Transaction流数据
transactions
.keyBy(Transaction::getName)
.flatMap(new TransactionAlertFunction())
.print();
env.execute("Transaction Detection Job");
}
public static class TransactionAlertFunction implements FlatMapFunction<Transaction, String> {
private static final int THRESHOLD = 100;
private static final int CONSECUTIVE_COUNT = 3;
@Override
public void flatMap(Transaction transaction, Collector<String> out) {
if (transaction.getAmount() > THRESHOLD) {
// 检测到金额超过阈值
TransactionState state = TransactionState.getOrDefault(transaction.getName());
state.incrementCount();
if (state.getCount() >= CONSECUTIVE_COUNT) {
out.collect("ALERT: Detected 3 consecutive transactions exceeding 100 for " + transaction.getName());
state.resetCount();
}
} else {
// 金额未超过阈值,重置计数
TransactionState.resetCount(transaction.getName());
}
}
}
public static class TransactionState {
private String name;
private int count;
private static final Map<String, TransactionState> stateMap = new HashMap<>();
public static TransactionState getOrDefault(String name) {
return stateMap.computeIfAbsent(name, k -> new TransactionState(name, 0));
}
public static void resetCount(String name) {
TransactionState state = stateMap.get(name);
if (state != null) {
state.count = 0;
}
}
public void incrementCount() {
count++;
}
public int getCount() {
return count;
}
private TransactionState(String name, int count) {
this.name = name;
this.count = count;
}
}
}
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
public class FlinkRealTimeAlert {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Transaction> transactionStream = ... // 从数据源获取Transaction流数据
DataStream<Tuple2<String, Integer>> alertStream = transactionStream
.keyBy(Transaction::getName)
.process(new AlertProcessFunction());
alertStream.print();
env.execute("Flink Real-time Alert Job");
}
public static class AlertProcessFunction extends KeyedProcessFunction<String,
Transaction, Tuple2<String, Integer>> {
private static final int THRESHOLD = 100;
private static final int ALERT_COUNT = 3;
private ValueState<Integer> countState;
@Override
public void open(Configuration parameters) throws Exception {
ValueStateDescriptor<Integer> countDescriptor =
new ValueStateDescriptor<>("countState", Integer.class);
countState = getRuntimeContext().getState(countDescriptor);
}
@Override
public void processElement(Transaction transaction, Context context,
Collector<Tuple2<String, Integer>> out) throws Exception {
Integer count = countState.value();
if (count == null) {
count = 0;
}
if (transaction.getAmount() > THRESHOLD) {
count++;
countState.update(count);
} else {
// do nothing, keep count as it is
}
if (count >= ALERT_COUNT) {
out.collect(new Tuple2<>(transaction.getName(), transaction.getAmount()));
}
}
}
public static class Transaction {
private String txid;
private int amount;
private String name;
public Transaction(String txid, int amount, String name) {
this.txid = txid;
this.amount = amount;
this.name = name;
}
public String getTxid() {
return txid;
}
public int getAmount() {
return amount;
}
public String getName() {
return name;
}
}
}