Hunter0x7c7
2024-09-12 60a3f2bc64b7a5f502e4133ced31f0b25c88d3f1
src/main/java/com/github/hunter0x7c7/sync/ctrls/Controller.java
@@ -7,13 +7,17 @@
import com.github.hunter0x7c7.sync.model.storage.Session;
import com.github.hunter0x7c7.sync.utils.*;
import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.Observer;
import io.reactivex.*;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.functions.Predicate;
import io.reactivex.schedulers.Schedulers;
import javafx.application.Platform;
import javafx.beans.value.ChangeListener;
import javafx.beans.value.ObservableValue;
import javafx.event.ActionEvent;
import javafx.event.EventHandler;
import javafx.fxml.FXML;
@@ -46,21 +50,37 @@
import static com.github.hunter0x7c7.sync.model.global.Parameters.*;
public class Controller {
    public static final String SQL1_QUERY = "SELECT * FROM IOT_Equipment_Info WHERE IE_Type = 'XPH物联网关'; ";
    public static final String SQL2_QUERY = "SELECT SD_Addr + '|' + SD_Code IE_Param, * " +
            "FROM TY_SensorData WHERE SD_Key IN ( '%s' );";
    public static final String SQL3_UPDATE = "UPDATE IOT_Equipment_Info " +
    //1. 在目标库,把采集类网关设备先找出来
    public static final String SQL_QUERY_TARGET = "SELECT * " +
            "FROM IOT_Equipment_Info " +
            "WHERE IE_Type = '物联网关' " +
            "OR  IE_Type = 'XPH网关' " +
            "OR IE_Type = 'XPH物联网关' " +
            "OR IE_Type = '气象站网关' " +
            "OR IE_Type = '采集物联网关' " +//温室采集机柜
            "OR IE_Type = '虫情监测网关' " +
            "OR IE_Type = '墒情网关' " +
            "OR IE_Type = '远传水表'; ";
    //2. 在源数据库,把相关网关设备的子行查找出来,拼接好Param参数00|01
    public static final String SQL_QUERY_SRC = "SELECT SD_Addr + '|' + SD_Code IE_Param, * " +
            "FROM TY_SensorData " +
            "WHERE SD_Key IN ( '%s' ); ";
    //3. 在目标库,把匹配的最新数据更新到相关的行中
    public static final String SQL_UPDATE_TARGET = "UPDATE IOT_Equipment_Info " +
            "SET IE_Realtime_Data = ?, IE_Realtime_Time = ?, Edit_User = ?, Edit_Time = ? " +
            "WHERE IE_Param = ? AND IE_Parent = ( " +
            "SELECT IE_ID FROM IOT_Equipment_Info WHERE IE_Param = ? " +
            ") ;";
            "WHERE IE_Param = ? " +
            "AND IE_Parent IN ( " +
            "SELECT IE_ID " +
            "FROM IOT_Equipment_Info " +
            "WHERE IE_Param = ? ) ;";
    @FXML
    private TextField tvInputSrcHost;
    @FXML
    private TextField tvInputSrcName;
    @FXML
    private PasswordField tvInputSrcPwd;
    private TextField tvInputSrcPwd;
    @FXML
    private TextField tvInputSrcDbName;
@@ -69,7 +89,7 @@
    @FXML
    private TextField tvInputTargetName;
    @FXML
    private PasswordField tvInputTargetPwd;
    private TextField tvInputTargetPwd;
    @FXML
    private TextField tvInputTargetDbName;
    @FXML
@@ -98,7 +118,12 @@
    private CompositeDisposable mCompositeDisposable;
    //同步频率单位列表
    private static final TimeUnit[] mFreqUnitList = {TimeUnit.MILLISECONDS, TimeUnit.SECONDS, TimeUnit.MINUTES, TimeUnit.HOURS, TimeUnit.DAYS};
    private static final TimeUnit[] mFreqUnitList = {
            TimeUnit.MILLISECONDS,
            TimeUnit.SECONDS,
            TimeUnit.MINUTES,
            TimeUnit.HOURS,
            TimeUnit.DAYS};
    //同步频率单位名称列表
    private static final String[] mFreqNameList = {"毫秒", "秒", "分钟", "小时", "天"};
    //同步次数
@@ -114,10 +139,10 @@
    }
    @Override
    /*@Override
    protected void finalize() throws Throwable {
        super.finalize();
    }
    }*/
    private void init() {
        mCompositeDisposable = new CompositeDisposable();
@@ -177,14 +202,20 @@
                    public void accept(Object o) throws Exception {
                        //添加到系统托盘
                        Stage primaryStage = Session.getInstance().getPrimaryStage();
                        String tooltip = String.format("%s v%s", AppName, VersionName);//SyncTools v1.0
                        //String tooltip = String.format("%s v%s", AppName, VersionName);//SyncTools v1.0
                        String mipmap = "mipmap/ic_chinese_cabbage_16.png";
                        MenuItem show = new MenuItem("显示");//显示Show
                        //绑定系统托盘事件
                        show.addActionListener(actionListener -> {
                            Platform.runLater(() -> {
                                if (primaryStage != null) {
                                    primaryStage.show();
                                    if (primaryStage.isIconified()) {//最小化
                                        primaryStage.setIconified(false);
                                    }
                                    if (!primaryStage.isShowing()) {
                                        primaryStage.show();
                                    }
                                    primaryStage.toFront();
                                }
                            });
                        });
@@ -197,77 +228,127 @@
                            }
                        };
                        //添加到系统托盘
                        TrayUtil.getInstance().addSystemTray(primaryStage, AppNameString, mipmap, callback, show);
                    }
                }).doOnNext(new Consumer<Object>() {
                    @Override
                    public void accept(Object o) throws Exception {
                        //初始化已保存的配置信息
                        ConfigBean cb = getSaveConfig();
                        if (cb != null) {
                            //UI线程
                            Platform.runLater(new Runnable() {
                                @Override
                                public void run() {
                                    setInputText(tvInputSrcHost, cb.getSrcHost());
                                    setInputText(tvInputSrcName, cb.getSrcName());
                                    setInputText(tvInputSrcPwd, getDecryptData(cb.getSrcPwd()));
                                    setInputText(tvInputSrcDbName, cb.getSrcDbName());
                                    setInputText(tvInputTargetHost, cb.getTargetHost());
                                    setInputText(tvInputTargetName, cb.getTargetName());
                                    setInputText(tvInputTargetPwd, getDecryptData(cb.getTargetPwd()));
                                    setInputText(tvInputTargetDbName, cb.getTargetDbName());
                                    setInputText(tvInputFreq, cb.getFreqValue());
                                    cbSelectFreqUnit.getSelectionModel().select(cb.getFreqUnit());
                        TrayUtil.getInstance().addSystemTray(primaryStage, AppName_zh_rCN, mipmap, callback, show);
                        //最小化之后,关闭窗口
                        primaryStage.iconifiedProperty().addListener(new ChangeListener<Boolean>() {
                            @Override
                            public void changed(ObservableValue<? extends Boolean> observable, Boolean oldValue, Boolean newValue) {
                                //如果你执行了setIconified(true)之后再执行hide(),
                                // stage会被销毁,因此需要在窗口关闭前将Iconified设为false
                                if (!oldValue && newValue) {
                                    //最小化之后,关闭窗口
                                    if (primaryStage != null) {
                                        primaryStage.close();
                                    }
                                }
                            });
                            //默认启动同步功能
                            if (cb.isStartSync()) {
                                startSync();    //启动同步功能
                            }
                        } else if (BuildType.equals(BuildTypeEnum.DEBUG)) {
                            try {
                                String srcHost = ConfigProperties.SRC_HOST;
                                String srcName = ConfigProperties.SRC_NAME;
                                String srcPwd = ConfigProperties.SRC_PWD;
                                String srcDbName = ConfigProperties.SRC_DB_NAME;
                        });
                    }
                }).concatMap(new Function<Object, Observable<ConfigBean>>() {//准备初始化的配置数据
                    @Override
                    public Observable<ConfigBean> apply(Object o) throws Exception {
                        //初始化已保存的配置信息
                        final ConfigBean configBean = getSaveConfig();
                        return Observable.just(configBean != null)
                                .subscribeOn(Schedulers.io())
                                .filter(new Predicate<Boolean>() {
                                    @Override
                                    public boolean test(Boolean aBoolean) throws Exception {
                                        //return false过滤, true不过滤。false立刻执行switchIfEmpty
                                        return aBoolean != null && aBoolean;
                                    }
                                }).map(new Function<Boolean, ConfigBean>() {
                                    @Override
                                    public ConfigBean apply(Boolean aBoolean) throws Exception {
                                        assert configBean != null;
                                        configBean.setSrcPwd(getDecryptData(configBean.getSrcPwd()));
                                        configBean.setTargetPwd(getDecryptData(configBean.getTargetPwd()));
                                        return configBean;
                                    }
                                }).switchIfEmpty(new Observable<ConfigBean>() {
                                    @Override
                                    protected void subscribeActual(Observer<? super ConfigBean> observer) {
                                        //如果是Debug环境才初始化测试数据
                                        if (BuildType.equals(BuildTypeEnum.DEBUG)) {
                                            String srcHost = ConfigProperties.SRC_HOST;
                                            String srcName = ConfigProperties.SRC_NAME;
                                            String srcPwd = ConfigProperties.SRC_PWD;
                                            String srcDbName = ConfigProperties.SRC_DB_NAME;
                                String targetHost = ConfigProperties.TARGET_HOST;
                                String targetName = ConfigProperties.TARGET_NAME;
                                String targetPwd = ConfigProperties.TARGET_PWD;
                                String targetDbName = ConfigProperties.TARGET_DB_NAME;
                                            String targetHost = ConfigProperties.TARGET_HOST;
                                            String targetName = ConfigProperties.TARGET_NAME;
                                            String targetPwd = ConfigProperties.TARGET_PWD;
                                            String targetDbName = ConfigProperties.TARGET_DB_NAME;
                                            String freqValue = "5";
                                            int freqUnitIndex = 1;//秒
                                            ConfigBean cb = new ConfigBean(srcHost, srcName, srcPwd, srcDbName, targetHost, targetName, targetPwd, targetDbName, freqValue, freqUnitIndex, false);
                                            if (observer != null) {
                                                observer.onNext(cb);
                                                observer.onComplete();
                                            }
                                        } else {
                                            if (observer != null) {
                                                observer.onNext(new ConfigBean());
                                                observer.onComplete();
                                            }
                                        }
                                    }
                                });
                    }
                }).concatMap(new Function<ConfigBean, ObservableSource<ConfigBean>>() {//更新配置数据到界面
                    @Override
                    public ObservableSource<ConfigBean> apply(final ConfigBean cb) throws Exception {
                        //数据为空,不更新界面
                        if (cb == null) {
                            return Observable.just(new ConfigBean());
                        }
                        //更新界面
                        return Observable.create(new ObservableOnSubscribe<ConfigBean>() {
                            @Override
                            public void subscribe(ObservableEmitter<ConfigBean> emitter) throws Exception {
                                //UI线程
                                Platform.runLater(new Runnable() {
                                    @Override
                                    public void run() {
                                        setInputText(tvInputSrcHost, srcHost);
                                        setInputText(tvInputSrcName, srcName);
                                        setInputText(tvInputSrcPwd, srcPwd);
                                        setInputText(tvInputSrcDbName, srcDbName);
                                        setInputText(tvInputTargetHost, targetHost);
                                        setInputText(tvInputTargetName, targetName);
                                        setInputText(tvInputTargetPwd, targetPwd);
                                        setInputText(tvInputTargetDbName, targetDbName);
                                        setInputText(tvInputFreq, "10");
                                        cbSelectFreqUnit.getSelectionModel().select(1);
                                        //更新界面数据
                                        setInputText(tvInputSrcHost, cb.getSrcHost());
                                        setInputText(tvInputSrcName, cb.getSrcName());
                                        setInputText(tvInputSrcPwd, cb.getSrcPwd());
                                        setInputText(tvInputSrcDbName, cb.getSrcDbName());
                                        setInputText(tvInputTargetHost, cb.getTargetHost());
                                        setInputText(tvInputTargetName, cb.getTargetName());
                                        setInputText(tvInputTargetPwd, cb.getTargetPwd());
                                        setInputText(tvInputTargetDbName, cb.getTargetDbName());
                                        setInputText(tvInputFreq, cb.getFreqValue());
                                        cbSelectFreqUnit.getSelectionModel().select(cb.getFreqUnit());
                                        if (emitter != null) {
                                            emitter.onNext(cb);
                                            emitter.onComplete();
                                        }
                                    }
                                });
                            } catch (Exception e) {
                                throw new RuntimeException(e);
                            }
                        }
                        });
                    }
                }).subscribe(new Consumer<Object>() {
                }).subscribe(new Consumer<ConfigBean>() {
                    @Override
                    public void accept(Object o) throws Exception {
                    public void accept(ConfigBean cb) throws Exception {
                        //UI线程
                        Platform.runLater(new Runnable() {
                            @Override
                            public void run() {
                                updateResult("数据同步工具");
                                //默认启动同步功能
                                if (cb != null && cb.isStartSync()) {
                                    startSync();    //启动同步功能
                                }
                            }
                        });
                    }
@@ -338,24 +419,66 @@
    //点击:退出
    private void clickExit() {
        if (isSyncing()) {
            stopSync();//停止同步
        }
        //系统托盘
        SystemTray tray = SystemTray.getSystemTray();
        if (tray != null) {
            TrayIcon[] icons = tray.getTrayIcons();
            for (TrayIcon icon : icons) {
                if (icon == null) continue;
                //退出之前先移除系统托盘图标
                if (APPID.equals(icon.getActionCommand())) {
                    tray.remove(icon);
                }
            }
        }
        //退出
        Platform.exit();
        Disposable d = Observable.just(true)
                .subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.io())
                .doOnNext(new Consumer<Object>() {
                    @Override
                    public void accept(Object o) throws Exception {
                        //1. 停止同步
                        if (isSyncing()) {
                            stopSync();//停止同步
                        }
                    }
                })
                .concatMap(new Function<Object, ObservableSource<Object>>() {
                    @Override
                    public ObservableSource<Object> apply(Object o) throws Exception {
                        //2. 移除系统托盘
                        return Observable.create(new ObservableOnSubscribe<Object>() {
                            @Override
                            public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
                                //检查系统是否支持托盘
                                if (SystemTray.isSupported()) {
                                    //退出之前先移除系统托盘图标
                                    final SystemTray tray = SystemTray.getSystemTray();//系统托盘
                                    final TrayIcon icon = getTrayIcon(tray, APPID);
                                    Platform.runLater(new Runnable() {
                                        @Override
                                        public void run() {
                                            try {
                                                //移除系统托盘
                                                tray.remove(icon);
                                            } catch (Exception e) {
                                                e.printStackTrace();
                                            }
                                            if (emitter != null) {
                                                emitter.onNext(o);
                                                emitter.onComplete();
                                            }
                                        }
                                    });
                                }
                            }
                        });
                    }
                })
                .subscribe(new Consumer<Object>() {
                    @Override
                    public void accept(Object o) throws Exception {
                        //3. 停止同步和移除系统托盘后,退出程序
                        TrayUtil.getInstance().exitApp();//退出
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable e) throws Exception {
                        e.printStackTrace();
                        TrayUtil.getInstance().exitApp();//退出
                    }
                });
    }
    //点击:关于
    private void clickAbout() {
@@ -394,7 +517,7 @@
                                , targetFreq, freqSelect, startSync);
                        //写入文件
                        String path = PathUtil.projectPath + "\\config.json";
                        String path = PathUtil.getConfigFilePath("config.json");
                        String content = JsonUtil.toJson(cb);
                        //使用 BufferedWriter 写文件
@@ -431,11 +554,21 @@
    //点击:停止同步
    private void clickStopSync() {
        if (isSyncing()) {
            stopSync();//停止同步
        } else {
            updateResult("未启动同步功能!");
        stopSync();//停止同步
    }
    public TrayIcon getTrayIcon(SystemTray tray, String key) {
        if (tray != null && key != null) {
            TrayIcon[] icons = tray.getTrayIcons();
            for (TrayIcon icon : icons) {
                if (icon == null) continue;
                //找到指定的托盘图标
                if (key.equals(icon.getActionCommand())) {
                    return icon;
                }
            }
        }
        return null;
    }
    //解密
@@ -461,7 +594,7 @@
    //从本地配置文件中获取配置信息
    private ConfigBean getSaveConfig() {
        try {
            String json = FileUtil.bufferedReader(PathUtil.projectPath + "\\config.json");
            String json = FileUtil.bufferedReader(PathUtil.getConfigFilePath("config.json"));
            return JsonUtil.fromJson(json, ConfigBean.class);
        } catch (Exception e) {
            e.printStackTrace();
@@ -483,7 +616,7 @@
        stage.setTitle("参数设置");
        stage.setResizable(false);
        stage.setWidth(360);
        stage.setMinHeight(180);
        stage.setMinHeight(218);
        stage.getIcons().add(new Image("mipmap/ic_chinese_cabbage_48.png"));
        //APPLICATION_MODAL 全局模态,该窗口运行时,其他窗口不能进行操作
        //WINDOW_MODAL 为父窗口禁用,其他窗口可以使用
@@ -503,6 +636,16 @@
        generalList.setPadding(new Insets(0, 20, 30, 20));
        generalList.setSpacing(4.0);
        generalList.getChildren().add(cbStartSync);
        Label foot = new Label(String.format("* 更改将于重新启动 %s 后生效", AppName));//"* 更改将于重新启动 SyncTools 后生效");
        foot.setAlignment(Pos.CENTER);
        foot.setFont(new Font(12));
        VBox content = new VBox();
        content.setStyle("-fx-background-color: white");
        content.setPadding(new Insets(20));
        content.setSpacing(10);
        content.getChildren().addAll(title, generalList, foot);
        Button defButton = new Button("确定");
@@ -539,9 +682,11 @@
                saveConfigForIsStartSync(startSync, callback);
            }
        });
        Button cancelButton = new Button("取消");
        cancelButton.setAlignment(Pos.CENTER);
        cancelButton.setPrefSize(84, 24);
        cancelButton.setCancelButton(true);
        cancelButton.setOnAction(new EventHandler<ActionEvent>() {
            @Override
@@ -552,26 +697,16 @@
            }
        });
        Label label = new Label(String.format("* 更改将于重新启动 %s 后生效", AppName));//"* 更改将于重新启动 SyncTools 后生效");
        label.setAlignment(Pos.CENTER);
        label.setFont(new Font(12));
        VBox content = new VBox();
        content.setStyle("-fx-background-color: white");
        content.setPadding(new Insets(20));
        content.setSpacing(10);
        content.getChildren().addAll(title, generalList, label);
        HBox operate = new HBox();
        operate.setAlignment(Pos.CENTER_RIGHT);
        operate.setSpacing(8);
//        operate.setStyle("-fx-background-color: #F0F0F0");
        operate.setPadding(new Insets(12, 20, 6, 20));
        operate.setPadding(new Insets(12, 20, 12, 20));
        operate.getChildren().addAll(defButton, cancelButton);
        VBox vBox = new VBox();
        vBox.setSpacing(4.0);
        vBox.setSpacing(0);
        //vBox.setPadding(getInsets());
        vBox.getChildren().addAll(content, operate);
        stage.setScene(new Scene(vBox));
@@ -621,7 +756,7 @@
        VBox vBox = new VBox();
        vBox.setPadding(new Insets(20, 20, 4, 20));
        vBox.setPadding(new Insets(20, 20, 10, 20));
        vBox.getChildren().addAll(label1, label2, label3, label4);
        stage.setScene(new Scene(vBox));
@@ -679,7 +814,10 @@
    //启动同步功能
    private void startSync() {
        mSyncCount = 0;
        setInputText(txSyncStatus, "启动中");
        setInputText(txSyncCount, String.valueOf(mSyncCount));
        setInputText(txStartSyncTime, "-");
        setInputText(txLastSyncTime, "-");
        final String srcHost = getInputText(tvInputSrcHost);
        final String srcName = getInputText(tvInputSrcName);
@@ -734,14 +872,18 @@
                .doOnNext(new Consumer<Object>() {
                    @Override
                    public void accept(Object o) throws Exception {
                        //UI线程
                        setInputText(txStartSyncTime, DateUtils.formatDate(new Date()));
                        updateResult("连接中...");
                        btnStartSync.setDisable(true);
                        btnStopSync.setDisable(false);
                    }
                })
                .observeOn(Schedulers.newThread())
                .concatMap(new Function<String, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(String targetFreq) throws Exception {
                        mSyncCount = 0;
                        setSyncing(true);
                        return getStartSyncObservable(srcHost, srcName, srcPwd, srcDbName, targetHost, targetName, targetPwd, targetDbName)
                                .map(new Function<String, String>() {
                                    @Override
@@ -755,20 +897,18 @@
                    @Override
                    public void accept(Object o) throws Exception {
                        //启动成功
                        setSyncing(true);
                        mSyncCount++;
                        if (mSyncCount >= Integer.MAX_VALUE) {
                            mSyncCount = 0;
                        if (isSyncing()) {
                            mSyncCount++;
                            if (mSyncCount >= Integer.MAX_VALUE) {
                                mSyncCount = 0;
                            }
                            setInputText(txLastSyncTime, DateUtils.formatDate(new Date()));
                            setInputText(txSyncCount, String.valueOf(mSyncCount));
                            setInputText(txSyncStatus, "同步中");
                            updateResult("启动成功!");
                            btnStartSync.setDisable(true);
                            btnStopSync.setDisable(false);
                        }
                        setInputText(txLastSyncTime, DateUtils.formatDate(new Date()));
                        setInputText(txSyncCount, String.valueOf(mSyncCount));
                        btnStartSync.setDisable(true);
                        btnStopSync.setDisable(false);
                        setInputText(txSyncStatus, "同步中");
                        setInputText(txStartSyncTime, DateUtils.formatDate(new Date()));
                        updateResult("启动成功!");
                    }
                })
                .map(new Function<String, Integer>() {
@@ -781,9 +921,9 @@
                        }
                    }
                })
                .concatMap(new Function<Integer, ObservableSource<String>>() {
                .concatMap(new Function<Integer, ObservableSource<Object>>() {
                    @Override
                    public ObservableSource<String> apply(Integer freq) throws Exception {
                    public ObservableSource<Object> apply(Integer freq) throws Exception {
                        int delay = freq > 0 ? freq : 10;
                        TimeUnit timeUnit = ListUtil.getDataByList(mFreqUnitList, freqSelect);
@@ -791,16 +931,40 @@
                        //System.out.println("delay:" + delay + " unit:" + unit);
                        return Observable.timer(delay, unit)
                                /*.doOnNext(new Consumer<Long>() {
                                .doOnNext(new Consumer<Long>() {
                                    @Override
                                    public void accept(Long aLong) throws Exception {
                                        startSync(srcHost, srcName, srcPwd, srcDbName, targetHost, targetName, targetPwd, targetDbName);
                                        //如果中途某次执行失败,不能导致整个计划的失败
                                        addSubscribe(getStartSyncObservable(srcHost, srcName, srcPwd, srcDbName, targetHost, targetName, targetPwd, targetDbName)
                                                .subscribe(new Consumer<Object>() {
                                                    @Override
                                                    public void accept(Object o) throws Exception {
                                                        //同步成功!
                                                        if (isSyncing()) {
                                                            mSyncCount++;
                                                            if (mSyncCount >= Integer.MAX_VALUE) {
                                                                mSyncCount = 0;
                                                            }
                                                            setInputText(txLastSyncTime, DateUtils.formatDate(new Date()));
                                                            setInputText(txSyncCount, String.valueOf(mSyncCount));
                                                            updateResult("同步成功!");
                                                            btnStartSync.setDisable(true);
                                                            btnStopSync.setDisable(false);
                                                        }
                                                    }
                                                }, new Consumer<Throwable>() {
                                                    @Override
                                                    public void accept(Throwable e) throws Exception {
                                                        e.printStackTrace();
                                                        //执行失败
                                                        updateResult(e.getMessage());
                                                    }
                                                }));
                                    }
                                })*/
                                .concatMap(new Function<Long, ObservableSource<String>>() {
                                }).map(new Function<Long, Object>() {
                                    @Override
                                    public ObservableSource<String> apply(Long o) throws Exception {
                                        return getStartSyncObservable(srcHost, srcName, srcPwd, srcDbName, targetHost, targetName, targetPwd, targetDbName);
                                    public Object apply(Long aLong) throws Exception {
                                        return String.valueOf(aLong);
                                    }
                                }).repeat();
                    }
@@ -809,37 +973,30 @@
                .subscribe(new Consumer<Object>() {
                    @Override
                    public void accept(Object o) throws Exception {
                        setSyncing(true);
                        mSyncCount++;
                        if (mSyncCount >= Integer.MAX_VALUE) {
                            mSyncCount = 0;
                        }
                        setInputText(txLastSyncTime, DateUtils.formatDate(new Date()));
                        setInputText(txSyncCount, String.valueOf(mSyncCount));
                        updateResult("同步成功!");
                        //流程执行完毕
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        throwable.printStackTrace();
                        updateResult(throwable.getMessage());
                    public void accept(Throwable e) throws Exception {
                        e.printStackTrace();
                        //执行失败
                        updateResult(e.getMessage());
                    }
                }));
    }
    //停止同步
    private void stopSync() {
        setSyncing(false);
        mSyncCount = 0;
        if (mCompositeDisposable != null) {
            mCompositeDisposable.clear();
            mCompositeDisposable.clear();//注意:会把所有操作都取消
        }
        setInputText(txSyncStatus, "已停止");
        updateResult("同步停止!");
        btnStartSync.setDisable(false);
        btnStopSync.setDisable(true);
        setSyncing(false);
        setInputText(txSyncStatus, "已停止");
        updateResult("停止成功!");
    }
    //保存 在启动时”启动“同步功能
@@ -858,7 +1015,7 @@
                        }
                        cb.setStartSync(startSync);
                        String path = PathUtil.projectPath + "\\config.json";
                        String path = PathUtil.getConfigFilePath("config.json");
                        String content = JsonUtil.toJson(cb);
                        //使用 BufferedWriter 写文件
@@ -895,7 +1052,8 @@
                .doOnNext(new Consumer<Object>() {
                    @Override
                    public void accept(Object o) throws Exception {
                        String targetUrl = "jdbc:sqlserver://" + targetHost + ";databaseName=" + targetDbName + ";integratedSecurity=false;";
                        String urlString = "jdbc:sqlserver://%s;databaseName=%s;integratedSecurity=false;";
                        String targetUrl = String.format(urlString, targetHost, targetDbName);
                        List<String> keyList = new ArrayList<>();
                        //1.从目标库查出来有多少传感器需要查找
@@ -903,7 +1061,7 @@
                             Statement stmt = con.createStatement()
                        ) {
                            ResultSet rs = stmt.executeQuery(SQL1_QUERY);
                            ResultSet rs = stmt.executeQuery(SQL_QUERY_TARGET);
                            while (rs.next()) {
                                keyList.add(rs.getString("IE_Param"));
                            }
@@ -920,9 +1078,9 @@
                        for (String key : keyList) {
                            StringUtil.append(sb, key, "','");
                        }
                        String sql = String.format(SQL2_QUERY, sb);
                        String sql = String.format(SQL_QUERY_SRC, sb);
                        String srcUrl = "jdbc:sqlserver://" + srcHost + ";databaseName=" + srcDbName + ";integratedSecurity=false;";
                        String srcUrl = String.format(urlString, srcHost, srcDbName);
                        //System.out.println("sql2:" + sql);
                        Map<String, List<TargetBean>> map = new HashMap<>();
@@ -972,7 +1130,7 @@
                                    for (TargetBean tb : list) {
                                        if (tb == null) continue;
                                        PreparedStatement ps = con3.prepareStatement(SQL3_UPDATE);
                                        PreparedStatement ps = con3.prepareStatement(SQL_UPDATE_TARGET);
                                        ps.setString(1, tb.getData());
                                        ps.setTimestamp(2, tb.getTime());
                                        ps.setString(3, version);
@@ -1021,11 +1179,19 @@
    }
    private void updateResult(String str) {
        System.out.println(str);
        //输出结果带时间
        System.out.printf("%s\t%s%n", DateUtils.formatDate(new Date()), str);
        setInputText(txOutResult, str);
    }
    private Insets getInsets() {
        if (SystemUtil.isWindows()) {
            return new Insets(0, 0, -6, 0);
        }
        return new Insets(0);
    }
    public boolean isSyncing() {
        return mSyncing;
    }