Hunter0x7c7
2024-09-12 60a3f2bc64b7a5f502e4133ced31f0b25c88d3f1
src/main/java/com/github/hunter0x7c7/sync/ctrls/Controller.java
@@ -16,6 +16,8 @@
import io.reactivex.functions.Predicate;
import io.reactivex.schedulers.Schedulers;
import javafx.application.Platform;
import javafx.beans.value.ChangeListener;
import javafx.beans.value.ObservableValue;
import javafx.event.ActionEvent;
import javafx.event.EventHandler;
import javafx.fxml.FXML;
@@ -48,21 +50,37 @@
import static com.github.hunter0x7c7.sync.model.global.Parameters.*;
public class Controller {
    // SQL text used by the sync steps. Values containing %s are completed with
    // String.format before execution; values containing ? are bound through a
    // PreparedStatement.
    public static final String SQL1_QUERY = "SELECT * FROM IOT_Equipment_Info WHERE IE_Type = 'XPH物联网关'; ";
    public static final String SQL2_QUERY = "SELECT SD_Addr + '|' + SD_Code IE_Param, * " +
            "FROM TY_SensorData WHERE SD_Key IN ( '%s' );";
    public static final String SQL3_UPDATE = "UPDATE IOT_Equipment_Info " +
    // 1. In the target database, first look up the data-collection gateway devices.
    public static final String SQL_QUERY_TARGET = "SELECT * " +
            "FROM IOT_Equipment_Info " +
            "WHERE IE_Type = '物联网关' " +
            "OR  IE_Type = 'XPH网关' " +
            "OR IE_Type = 'XPH物联网关' " +
            "OR IE_Type = '气象站网关' " +
            "OR IE_Type = '采集物联网关' " +// greenhouse collection cabinet
            "OR IE_Type = '虫情监测网关' " +
            "OR IE_Type = '墒情网关' " +
            "OR IE_Type = '远传水表'; ";
    // 2. In the source database, look up the child rows of the relevant gateway
    //    devices and build the Param value (SD_Addr + '|' + SD_Code, e.g. 00|01).
    public static final String SQL_QUERY_SRC = "SELECT SD_Addr + '|' + SD_Code IE_Param, * " +
            "FROM TY_SensorData " +
            "WHERE SD_Key IN ( '%s' ); ";
    // 3. In the target database, write the matching latest data into the related rows.
    public static final String SQL_UPDATE_TARGET = "UPDATE IOT_Equipment_Info " +
            "SET IE_Realtime_Data = ?, IE_Realtime_Time = ?, Edit_User = ?, Edit_Time = ? " +
            "WHERE IE_Param = ? AND IE_Parent = ( " +
            "SELECT IE_ID FROM IOT_Equipment_Info WHERE IE_Param = ? " +
            ") ;";
            "WHERE IE_Param = ? " +
            "AND IE_Parent IN ( " +
            "SELECT IE_ID " +
            "FROM IOT_Equipment_Info " +
            "WHERE IE_Param = ? ) ;";
    @FXML
    private TextField tvInputSrcHost;
    @FXML
    private TextField tvInputSrcName;
    @FXML
    private PasswordField tvInputSrcPwd;
    private TextField tvInputSrcPwd;
    @FXML
    private TextField tvInputSrcDbName;
@@ -71,7 +89,7 @@
    @FXML
    private TextField tvInputTargetName;
    @FXML
    private PasswordField tvInputTargetPwd;
    private TextField tvInputTargetPwd;
    @FXML
    private TextField tvInputTargetDbName;
    @FXML
@@ -100,7 +118,12 @@
    private CompositeDisposable mCompositeDisposable;
    // List of selectable sync-frequency units
    private static final TimeUnit[] mFreqUnitList = {TimeUnit.MILLISECONDS, TimeUnit.SECONDS, TimeUnit.MINUTES, TimeUnit.HOURS, TimeUnit.DAYS};
    private static final TimeUnit[] mFreqUnitList = {
            TimeUnit.MILLISECONDS,
            TimeUnit.SECONDS,
            TimeUnit.MINUTES,
            TimeUnit.HOURS,
            TimeUnit.DAYS};
    // Display names of the sync-frequency units, listed in the same order as mFreqUnitList
    private static final String[] mFreqNameList = {"毫秒", "秒", "分钟", "小时", "天"};
    //同步次数
@@ -186,7 +209,13 @@
                        show.addActionListener(actionListener -> {
                            Platform.runLater(() -> {
                                if (primaryStage != null) {
                                    primaryStage.show();
                                    if (primaryStage.isIconified()) {//最小化
                                        primaryStage.setIconified(false);
                                    }
                                    if (!primaryStage.isShowing()) {
                                        primaryStage.show();
                                    }
                                    primaryStage.toFront();
                                }
                            });
                        });
@@ -200,6 +229,21 @@
                        };
                        //添加到系统托盘
                        TrayUtil.getInstance().addSystemTray(primaryStage, AppName_zh_rCN, mipmap, callback, show);
                        //最小化之后,关闭窗口
                        primaryStage.iconifiedProperty().addListener(new ChangeListener<Boolean>() {
                            @Override
                            public void changed(ObservableValue<? extends Boolean> observable, Boolean oldValue, Boolean newValue) {
                                //如果你执行了setIconified(true)之后再执行hide(),
                                // stage会被销毁,因此需要在窗口关闭前将Iconified设为false
                                if (!oldValue && newValue) {
                                    //最小化之后,关闭窗口
                                    if (primaryStage != null) {
                                        primaryStage.close();
                                    }
                                }
                            }
                        });
                    }
                }).concatMap(new Function<Object, Observable<ConfigBean>>() {//准备初始化的配置数据
                    @Override
@@ -217,6 +261,9 @@
                                }).map(new Function<Boolean, ConfigBean>() {
                                    @Override
                                    public ConfigBean apply(Boolean aBoolean) throws Exception {
                                        assert configBean != null;
                                        configBean.setSrcPwd(getDecryptData(configBean.getSrcPwd()));
                                        configBean.setTargetPwd(getDecryptData(configBean.getTargetPwd()));
                                        return configBean;
                                    }
                                }).switchIfEmpty(new Observable<ConfigBean>() {
@@ -270,11 +317,11 @@
                                        //更新界面数据
                                        setInputText(tvInputSrcHost, cb.getSrcHost());
                                        setInputText(tvInputSrcName, cb.getSrcName());
                                        setInputText(tvInputSrcPwd, getDecryptData(cb.getSrcPwd()));
                                        setInputText(tvInputSrcPwd, cb.getSrcPwd());
                                        setInputText(tvInputSrcDbName, cb.getSrcDbName());
                                        setInputText(tvInputTargetHost, cb.getTargetHost());
                                        setInputText(tvInputTargetName, cb.getTargetName());
                                        setInputText(tvInputTargetPwd, getDecryptData(cb.getTargetPwd()));
                                        setInputText(tvInputTargetPwd, cb.getTargetPwd());
                                        setInputText(tvInputTargetDbName, cb.getTargetDbName());
                                        setInputText(tvInputFreq, cb.getFreqValue());
                                        cbSelectFreqUnit.getSelectionModel().select(cb.getFreqUnit());
@@ -372,42 +419,54 @@
    //点击:退出
    private void clickExit() {
        addSubscribe(Observable.just(true)
        Disposable d = Observable.just(true)
                .subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.io())
                .doOnNext(new Consumer<Object>() {
                    @Override
                    public void accept(Object o) throws Exception {
                        //1. 停止同步
                        if (isSyncing()) {
                            stopSync();//停止同步
                        }
                    }
                })
                .doOnNext(new Consumer<Object>() {
                .concatMap(new Function<Object, ObservableSource<Object>>() {
                    @Override
                    public void accept(Object o) throws Exception {
                        //检查系统是否支持托盘
                        if (SystemTray.isSupported()) {
                            //系统托盘
                            SystemTray tray = SystemTray.getSystemTray();
                            if (tray != null) {
                                TrayIcon[] icons = tray.getTrayIcons();
                                for (TrayIcon icon : icons) {
                                    if (icon == null) continue;
                    public ObservableSource<Object> apply(Object o) throws Exception {
                        //2. 移除系统托盘
                        return Observable.create(new ObservableOnSubscribe<Object>() {
                            @Override
                            public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
                                //检查系统是否支持托盘
                                if (SystemTray.isSupported()) {
                                    //退出之前先移除系统托盘图标
                                    if (APPID.equals(icon.getActionCommand())) {
                                        tray.remove(icon);
                                    }
                                    final SystemTray tray = SystemTray.getSystemTray();//系统托盘
                                    final TrayIcon icon = getTrayIcon(tray, APPID);
                                    Platform.runLater(new Runnable() {
                                        @Override
                                        public void run() {
                                            try {
                                                //移除系统托盘
                                                tray.remove(icon);
                                            } catch (Exception e) {
                                                e.printStackTrace();
                                            }
                                            if (emitter != null) {
                                                emitter.onNext(o);
                                                emitter.onComplete();
                                            }
                                        }
                                    });
                                }
                            }
                        }
                        });
                    }
                })
                .subscribe(new Consumer<Boolean>() {
                .subscribe(new Consumer<Object>() {
                    @Override
                    public void accept(Boolean aBoolean) throws Exception {
                    public void accept(Object o) throws Exception {
                        //3. 停止同步和移除系统托盘后,退出程序
                        TrayUtil.getInstance().exitApp();//退出
                    }
                }, new Consumer<Throwable>() {
@@ -417,7 +476,7 @@
                        TrayUtil.getInstance().exitApp();//退出
                    }
                }));
                });
    }
@@ -495,11 +554,21 @@
    // Click handler: stop syncing
    private void clickStopSync() {
        if (isSyncing()) {
            stopSync();// stop syncing
        } else {
            updateResult("未启动同步功能!");
        stopSync();// stop syncing
    }
    public TrayIcon getTrayIcon(SystemTray tray, String key) {
        if (tray != null && key != null) {
            TrayIcon[] icons = tray.getTrayIcons();
            for (TrayIcon icon : icons) {
                if (icon == null) continue;
                //找到指定的托盘图标
                if (key.equals(icon.getActionCommand())) {
                    return icon;
                }
            }
        }
        return null;
    }
    //解密
@@ -745,7 +814,10 @@
    //启动同步功能
    private void startSync() {
        mSyncCount = 0;
        setInputText(txSyncStatus, "启动中");
        setInputText(txSyncCount, String.valueOf(mSyncCount));
        setInputText(txStartSyncTime, "-");
        setInputText(txLastSyncTime, "-");
        final String srcHost = getInputText(tvInputSrcHost);
        final String srcName = getInputText(tvInputSrcName);
@@ -800,14 +872,18 @@
                .doOnNext(new Consumer<Object>() {
                    @Override
                    public void accept(Object o) throws Exception {
                        //UI线程
                        setInputText(txStartSyncTime, DateUtils.formatDate(new Date()));
                        updateResult("连接中...");
                        btnStartSync.setDisable(true);
                        btnStopSync.setDisable(false);
                    }
                })
                .observeOn(Schedulers.newThread())
                .concatMap(new Function<String, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(String targetFreq) throws Exception {
                        mSyncCount = 0;
                        setSyncing(true);
                        return getStartSyncObservable(srcHost, srcName, srcPwd, srcDbName, targetHost, targetName, targetPwd, targetDbName)
                                .map(new Function<String, String>() {
                                    @Override
@@ -821,20 +897,18 @@
                    @Override
                    public void accept(Object o) throws Exception {
                        //启动成功
                        setSyncing(true);
                        mSyncCount++;
                        if (mSyncCount >= Integer.MAX_VALUE) {
                            mSyncCount = 0;
                        if (isSyncing()) {
                            mSyncCount++;
                            if (mSyncCount >= Integer.MAX_VALUE) {
                                mSyncCount = 0;
                            }
                            setInputText(txLastSyncTime, DateUtils.formatDate(new Date()));
                            setInputText(txSyncCount, String.valueOf(mSyncCount));
                            setInputText(txSyncStatus, "同步中");
                            updateResult("启动成功!");
                            btnStartSync.setDisable(true);
                            btnStopSync.setDisable(false);
                        }
                        setInputText(txLastSyncTime, DateUtils.formatDate(new Date()));
                        setInputText(txSyncCount, String.valueOf(mSyncCount));
                        btnStartSync.setDisable(true);
                        btnStopSync.setDisable(false);
                        setInputText(txSyncStatus, "同步中");
                        setInputText(txStartSyncTime, DateUtils.formatDate(new Date()));
                        updateResult("启动成功!");
                    }
                })
                .map(new Function<String, Integer>() {
@@ -866,14 +940,17 @@
                                                    @Override
                                                    public void accept(Object o) throws Exception {
                                                        //同步成功!
                                                        setSyncing(true);
                                                        mSyncCount++;
                                                        if (mSyncCount >= Integer.MAX_VALUE) {
                                                            mSyncCount = 0;
                                                        if (isSyncing()) {
                                                            mSyncCount++;
                                                            if (mSyncCount >= Integer.MAX_VALUE) {
                                                                mSyncCount = 0;
                                                            }
                                                            setInputText(txLastSyncTime, DateUtils.formatDate(new Date()));
                                                            setInputText(txSyncCount, String.valueOf(mSyncCount));
                                                            updateResult("同步成功!");
                                                            btnStartSync.setDisable(true);
                                                            btnStopSync.setDisable(false);
                                                        }
                                                        setInputText(txLastSyncTime, DateUtils.formatDate(new Date()));
                                                        setInputText(txSyncCount, String.valueOf(mSyncCount));
                                                        updateResult("同步成功!");
                                                    }
                                                }, new Consumer<Throwable>() {
                                                    @Override
@@ -910,16 +987,16 @@
    // Stop syncing: calls setSyncing(false), resets the sync counter, clears
    // mCompositeDisposable, then updates the status text, the result message
    // and the start/stop button states.
    private void stopSync() {
        setSyncing(false);
        mSyncCount = 0;
        if (mCompositeDisposable != null) {
            mCompositeDisposable.clear();
            mCompositeDisposable.clear();// Note: this cancels every pending operation
        }
        setInputText(txSyncStatus, "已停止");
        updateResult("同步停止!");
        btnStartSync.setDisable(false);
        btnStopSync.setDisable(true);
        setSyncing(false);
        setInputText(txSyncStatus, "已停止");
        updateResult("停止成功!");
    }
    //保存 在启动时”启动“同步功能
@@ -975,7 +1052,8 @@
                .doOnNext(new Consumer<Object>() {
                    @Override
                    public void accept(Object o) throws Exception {
                        String targetUrl = "jdbc:sqlserver://" + targetHost + ";databaseName=" + targetDbName + ";integratedSecurity=false;";
                        String urlString = "jdbc:sqlserver://%s;databaseName=%s;integratedSecurity=false;";
                        String targetUrl = String.format(urlString, targetHost, targetDbName);
                        List<String> keyList = new ArrayList<>();
                        //1.从目标库查出来有多少传感器需要查找
@@ -983,7 +1061,7 @@
                             Statement stmt = con.createStatement()
                        ) {
                            ResultSet rs = stmt.executeQuery(SQL1_QUERY);
                            ResultSet rs = stmt.executeQuery(SQL_QUERY_TARGET);
                            while (rs.next()) {
                                keyList.add(rs.getString("IE_Param"));
                            }
@@ -1000,9 +1078,9 @@
                        for (String key : keyList) {
                            StringUtil.append(sb, key, "','");
                        }
                        String sql = String.format(SQL2_QUERY, sb);
                        String sql = String.format(SQL_QUERY_SRC, sb);
                        String srcUrl = "jdbc:sqlserver://" + srcHost + ";databaseName=" + srcDbName + ";integratedSecurity=false;";
                        String srcUrl = String.format(urlString, srcHost, srcDbName);
                        //System.out.println("sql2:" + sql);
                        Map<String, List<TargetBean>> map = new HashMap<>();
@@ -1052,7 +1130,7 @@
                                    for (TargetBean tb : list) {
                                        if (tb == null) continue;
                                        PreparedStatement ps = con3.prepareStatement(SQL3_UPDATE);
                                        PreparedStatement ps = con3.prepareStatement(SQL_UPDATE_TARGET);
                                        ps.setString(1, tb.getData());
                                        ps.setTimestamp(2, tb.getTime());
                                        ps.setString(3, version);