package com.github.hunter0x7c7.sync.ctrls;

import com.github.hunter0x7c7.sync.config.ConfigProperties;
import com.github.hunter0x7c7.sync.model.bean.ConfigBean;
import com.github.hunter0x7c7.sync.model.bean.TargetBean;
import com.github.hunter0x7c7.sync.model.interfaces.Callback;
import com.github.hunter0x7c7.sync.model.storage.Session;
import com.github.hunter0x7c7.sync.utils.*;
import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.schedulers.Schedulers;
import javafx.application.Platform;
import javafx.event.ActionEvent;
import javafx.event.EventHandler;
import javafx.fxml.FXML;
import javafx.geometry.Insets;
import javafx.geometry.Pos;
import javafx.scene.Scene;
import javafx.scene.control.Button;
import javafx.scene.control.Label;
import javafx.scene.control.TextField;
import javafx.scene.control.*;
import javafx.scene.image.Image;
import javafx.scene.layout.HBox;
import javafx.scene.layout.VBox;
import javafx.scene.text.Font;
import javafx.scene.text.Text;
import javafx.scene.text.TextAlignment;
import javafx.stage.Modality;
import javafx.stage.Stage;

import java.awt.MenuItem;
import java.awt.*;
import java.sql.*;
import java.util.Date;
import java.util.List;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.function.UnaryOperator;
import java.util.regex.Pattern;

import static com.github.hunter0x7c7.sync.model.global.Parameters.*;

/**
 * JavaFX controller of the main window of the database synchronisation tool.
 *
 * <p>The controller lets the user enter the connection settings of a source and a target
 * SQL Server database, persists them to {@code config.json}, and runs a periodic job that
 * copies sensor readings from the source database into the target database.
 *
 * <p>Threading: blocking work (file and JDBC I/O) runs on RxJava schedulers, while every
 * scene-graph mutation is routed to the JavaFX application thread through
 * {@link #runOnFxThread(Runnable)}.
 */
public class Controller {

    /** Selects the gateway rows of the target database whose sensors must be synchronised. */
    public static final String SQL1_QUERY = "SELECT * FROM IOT_Equipment_Info WHERE IE_Type = 'XPH物联网关'; ";

    /** Template selecting sensor data from the source database; {@code %s} receives the joined keys. */
    public static final String SQL2_QUERY = "SELECT SD_Addr + '|' + SD_Code IE_Param, * "
            + "FROM TY_SensorData WHERE SD_Key IN ( '%s' );";

    /** Parameterised update writing one sensor reading into the target database. */
    public static final String SQL3_UPDATE = "UPDATE IOT_Equipment_Info "
            + "SET IE_Realtime_Data = ?, IE_Realtime_Time = ?, Edit_User = ?, Edit_Time = ? "
            + "WHERE IE_Param = ? AND IE_Parent = ( "
            + "SELECT IE_ID FROM IOT_Equipment_Info WHERE IE_Param = ? "
            + ") ;";

    @FXML
    private TextField tvInputSrcHost;
    @FXML
    private TextField tvInputSrcName;
    @FXML
    private PasswordField tvInputSrcPwd;
    @FXML
    private TextField tvInputSrcDbName;
    @FXML
    private TextField tvInputTargetHost;
    @FXML
    private TextField tvInputTargetName;
    @FXML
    private PasswordField tvInputTargetPwd;
    @FXML
    private TextField tvInputTargetDbName;
    @FXML
    private TextField tvInputFreq;
    @FXML
    private ComboBox<String> cbSelectFreqUnit;
    @FXML
    private Button btnSaveConfig;
    @FXML
    private Button btnStartSync;
    @FXML
    private Button btnStopSync;
    @FXML
    private Text txStartSyncTime;
    @FXML
    private Text txLastSyncTime;
    @FXML
    private Text txSyncStatus;
    @FXML
    private Text txSyncCount;
    @FXML
    private Text txOutResult;

    /** Holds the short-lived background tasks (init, save, exit, ...). */
    private CompositeDisposable mCompositeDisposable;

    /**
     * Subscription of the running sync stream, tracked separately so that stopping the sync
     * does not cancel unrelated tasks (in particular the exit task that requested the stop).
     */
    private Disposable mSyncDisposable;

    /** Time units selectable as sync frequency unit; index-aligned with {@link #mFreqNameList}. */
    private static final TimeUnit[] mFreqUnitList = {TimeUnit.MILLISECONDS, TimeUnit.SECONDS,
            TimeUnit.MINUTES, TimeUnit.HOURS, TimeUnit.DAYS};

    /** Display names of the sync frequency units. */
    private static final String[] mFreqNameList = {"毫秒", "秒", "分钟", "小时", "天"};

    /** Accepts only (possibly empty) digit sequences in the frequency field. */
    private static final Pattern INTEGER_PATTERN = Pattern.compile("\\d*");

    /** Number of completed sync runs; only touched on the JavaFX application thread. */
    private int mSyncCount;

    /** Whether a sync stream is currently active; read from several threads. */
    private volatile boolean mSyncing;

    public Controller() {
    }

    /** Entry point invoked after the {@code @FXML} fields have been injected. */
    public void initialize() {
        init();
    }

    /** Initialises the controls, the system tray icon and the saved configuration in the background. */
    private void init() {
        mCompositeDisposable = new CompositeDisposable();
        addSubscribe(Observable.just(true)
                .subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.io())
                .doOnNext(ignored -> Platform.runLater(this::initControls))
                .doOnNext(ignored -> initSystemTray())
                .doOnNext(ignored -> restoreSavedConfig())
                .subscribe(
                        ignored -> updateResult("数据同步工具"),
                        error -> error.printStackTrace()));
    }

    /** Resets the sync state and puts the controls into their initial state; runs on the FX thread. */
    private void initControls() {
        setSyncing(false);
        mSyncCount = 0;
        if (cbSelectFreqUnit != null) {
            cbSelectFreqUnit.getItems().addAll(mFreqNameList);
            cbSelectFreqUnit.getSelectionModel().select(2); // default unit: minutes
        }
        if (tvInputFreq != null) {
            // Reject every edit that would make the field contain anything but digits.
            tvInputFreq.setTextFormatter(new TextFormatter<String>(change ->
                    INTEGER_PATTERN.matcher(change.getControlNewText()).matches() ? change : null));
        }
        setInputText(txSyncStatus, "未启动");
        setInputText(txSyncCount, String.valueOf(mSyncCount));
        setInputText(txStartSyncTime, "-");
        setInputText(txLastSyncTime, "-");
    }

    /** Registers the application in the system tray with a "show" menu entry. */
    private void initSystemTray() {
        final Stage primaryStage = Session.getInstance().getPrimaryStage();
        String mipmap = "mipmap/ic_chinese_cabbage_16.png";
        MenuItem show = new MenuItem("显示");
        // Tray events arrive on the AWT thread; hop to the FX thread before touching the stage.
        show.addActionListener(event -> Platform.runLater(() -> {
            if (primaryStage != null) {
                primaryStage.show();
            }
        }));
        Callback<Object> callback = new Callback<Object>() {
            @Override
            public void onCall(Object o) {
                if (isSyncing()) {
                    stopSync();
                }
            }
        };
        // NOTE(review): the label passed to the tray is AppNameString, as in the original code.
        TrayUtil.getInstance().addSystemTray(primaryStage, AppNameString, mipmap, callback, show);
    }

    /**
     * Fills the input fields from the saved configuration (starting the sync when the
     * configuration asks for it) or, in debug builds without a configuration, from
     * {@link ConfigProperties}.
     */
    private void restoreSavedConfig() {
        final ConfigBean cb = getSaveConfig();
        if (cb != null) {
            Platform.runLater(() -> {
                setInputText(tvInputSrcHost, cb.getSrcHost());
                setInputText(tvInputSrcName, cb.getSrcName());
                setInputText(tvInputSrcPwd, getDecryptData(cb.getSrcPwd()));
                setInputText(tvInputSrcDbName, cb.getSrcDbName());
                setInputText(tvInputTargetHost, cb.getTargetHost());
                setInputText(tvInputTargetName, cb.getTargetName());
                setInputText(tvInputTargetPwd, getDecryptData(cb.getTargetPwd()));
                setInputText(tvInputTargetDbName, cb.getTargetDbName());
                setInputText(tvInputFreq, cb.getFreqValue());
                cbSelectFreqUnit.getSelectionModel().select(cb.getFreqUnit());
                if (cb.isStartSync()) {
                    startSync();
                }
            });
        } else if (BuildType.equals(BuildTypeEnum.DEBUG)) {
            final String srcHost = ConfigProperties.SRC_HOST;
            final String srcName = ConfigProperties.SRC_NAME;
            final String srcPwd = ConfigProperties.SRC_PWD;
            final String srcDbName = ConfigProperties.SRC_DB_NAME;
            final String targetHost = ConfigProperties.TARGET_HOST;
            final String targetName = ConfigProperties.TARGET_NAME;
            final String targetPwd = ConfigProperties.TARGET_PWD;
            final String targetDbName = ConfigProperties.TARGET_DB_NAME;
            Platform.runLater(() -> {
                setInputText(tvInputSrcHost, srcHost);
                setInputText(tvInputSrcName, srcName);
                setInputText(tvInputSrcPwd, srcPwd);
                setInputText(tvInputSrcDbName, srcDbName);
                setInputText(tvInputTargetHost, targetHost);
                setInputText(tvInputTargetName, targetName);
                setInputText(tvInputTargetPwd, targetPwd);
                setInputText(tvInputTargetDbName, targetDbName);
                setInputText(tvInputFreq, "10");
                cbSelectFreqUnit.getSelectionModel().select(1);
            });
        }
    }

    /** FXML handler: "parameter settings" menu item. */
    public void onClickParameter(ActionEvent event) {
        clickParameter();
    }

    /** FXML handler: "exit" menu item. */
    public void onClickExit(ActionEvent event) {
        clickExit();
    }

    /** FXML handler: "about" menu item. */
    public void onClickAbout(ActionEvent event) {
        clickAbout();
    }

    /** FXML handler: "save configuration" button. */
    public void onClickSaveConfig(ActionEvent event) {
        clickSaveConfig();
    }

    /** FXML handler: "start sync" button. */
    public void onClickStartSync(ActionEvent event) {
        clickStartSync();
    }

    /** FXML handler: "stop sync" button. */
    public void onClickStopSync(ActionEvent event) {
        clickStopSync();
    }

    /** Reads the saved "start sync on launch" flag in the background and opens the settings dialog. */
    private void clickParameter() {
        addSubscribe(Observable.just(true)
                .subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.io())
                .map(ignored -> isStartSyncForSaveConfig())
                .subscribe(
                        startSync -> Platform.runLater(
                                () -> showParamDialog(startSync != null && startSync)),
                        error -> error.printStackTrace()));
    }

    /** Stops a running sync, removes the tray icon and exits the application. */
    private void clickExit() {
        addSubscribe(Observable.just(true)
                .subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.io())
                .doOnNext(ignored -> {
                    if (isSyncing()) {
                        stopSync();
                    }
                })
                .doOnNext(ignored -> removeTrayIcons())
                .subscribe(
                        ignored -> TrayUtil.getInstance().exitApp(),
                        error -> {
                            error.printStackTrace();
                            // Exit even when the clean-up failed.
                            TrayUtil.getInstance().exitApp();
                        }));
    }

    /** Removes this application's icons from the system tray, when the platform supports a tray. */
    private void removeTrayIcons() {
        if (!SystemTray.isSupported()) {
            return;
        }
        SystemTray tray = SystemTray.getSystemTray();
        if (tray == null) {
            return;
        }
        for (TrayIcon icon : tray.getTrayIcons()) {
            if (icon != null && APPID.equals(icon.getActionCommand())) {
                tray.remove(icon);
            }
        }
    }

    /** Opens the "about" dialog. */
    private void clickAbout() {
        showAboutDialog();
    }

    /**
     * Persists the current input values to {@code config.json}.
     *
     * <p>The controls are read on the calling (FX) thread; only the file I/O runs in the
     * background. The save button is re-enabled on success and on failure alike.
     */
    private void clickSaveConfig() {
        if (btnSaveConfig != null) {
            btnSaveConfig.setDisable(true);
        }
        updateResult("保存中...");

        final String srcHost = getInputText(tvInputSrcHost);
        final String srcName = getInputText(tvInputSrcName);
        final String srcPwd = getInputText(tvInputSrcPwd);
        final String srcDbName = getInputText(tvInputSrcDbName);
        final String targetHost = getInputText(tvInputTargetHost);
        final String targetName = getInputText(tvInputTargetName);
        final String targetPwd = getInputText(tvInputTargetPwd);
        final String targetDbName = getInputText(tvInputTargetDbName);
        final String targetFreq = getInputText(tvInputFreq);
        final int freqSelect = cbSelectFreqUnit.getSelectionModel().getSelectedIndex();

        addSubscribe(Observable.just(true)
                .observeOn(Schedulers.newThread())
                .map(ignored -> {
                    // Keep the "start sync on launch" flag that is already stored on disk.
                    boolean startSync = isStartSyncForSaveConfig();
                    ConfigBean cb = new ConfigBean(srcHost, srcName, getEncryptData(srcPwd), srcDbName,
                            targetHost, targetName, getEncryptData(targetPwd), targetDbName,
                            targetFreq, freqSelect, startSync);
                    String path = PathUtil.getConfigFilePath("config.json");
                    String content = JsonUtil.toJson(cb);
                    boolean written = FileUtil.bufferedWriter(path, content);
                    return written;
                })
                .subscribe(
                        written -> finishSaveConfig(written != null && written),
                        error -> {
                            error.printStackTrace();
                            finishSaveConfig(false);
                        }));
    }

    /** Shows the outcome of a save attempt and re-enables the save button. */
    private void finishSaveConfig(final boolean saved) {
        runOnFxThread(() -> {
            updateResult(saved ? "保存成功!" : "保存失败!");
            if (btnSaveConfig != null) {
                btnSaveConfig.setDisable(false);
            }
        });
    }

    /** Starts the sync unless it is already running. */
    private void clickStartSync() {
        if (isSyncing()) {
            updateResult("已启动同步功能!");
            return;
        }
        startSync();
    }

    /** Stops the sync when it is running. */
    private void clickStopSync() {
        if (!isSyncing()) {
            updateResult("未启动同步功能!");
            return;
        }
        stopSync();
    }

    /**
     * Decrypts a stored value.
     *
     * @param encrypted the encrypted text
     * @return the plain text, or an empty string when decryption fails
     */
    public static String getDecryptData(String encrypted) {
        try {
            return AesUtil.getInstance().decrypt(encrypted);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return "";
    }

    /**
     * Encrypts a value for storage.
     *
     * @param plaintext the text to encrypt
     * @return the encrypted text, or an empty string when encryption fails
     */
    public static String getEncryptData(String plaintext) {
        try {
            return AesUtil.getInstance().encrypt(plaintext);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return "";
    }

    /**
     * Loads the configuration from the local {@code config.json}.
     *
     * @return the parsed configuration, or {@code null} when it cannot be read
     */
    private ConfigBean getSaveConfig() {
        try {
            String json = FileUtil.bufferedReader(PathUtil.getConfigFilePath("config.json"));
            return JsonUtil.fromJson(json, ConfigBean.class);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    /** Returns whether the saved configuration asks to start the sync on application launch. */
    private boolean isStartSyncForSaveConfig() {
        ConfigBean cb = getSaveConfig();
        return cb != null && cb.isStartSync();
    }

    /**
     * Creates an application-modal, non-resizable, 360 px wide dialog stage owned by the
     * primary stage. Callers set the height and the scene.
     */
    private Stage newModalStage(String title) {
        Stage stage = new Stage();
        stage.setTitle(title);
        stage.setResizable(false);
        stage.setWidth(360);
        stage.getIcons().add(new Image("mipmap/ic_chinese_cabbage_48.png"));
        stage.initOwner(Session.getInstance().getPrimaryStage());
        // APPLICATION_MODAL blocks input to every other window while the dialog is open.
        stage.initModality(Modality.APPLICATION_MODAL);
        return stage;
    }

    /** Creates a centred label spanning the dialog width. */
    private static Label centeredLabel(String text, Font font) {
        Label label = new Label(text);
        label.setAlignment(Pos.CENTER);
        label.setPrefWidth(360);
        label.setFont(font);
        return label;
    }

    /**
     * Shows the "parameter settings" dialog.
     *
     * @param startSync the currently saved value of "start sync on launch"
     */
    private void showParamDialog(boolean startSync) {
        final Stage stage = newModalStage("参数设置");
        stage.setMinHeight(218);

        Label title = new Label("常规");
        title.setAlignment(Pos.CENTER);
        title.setFont(new Font(14));

        final CheckBox cbStartSync = new CheckBox("在启动时\"启动\"同步功能");
        cbStartSync.setFont(new Font(12));
        cbStartSync.setSelected(startSync);

        VBox generalList = new VBox();
        generalList.setPadding(new Insets(0, 20, 30, 20));
        generalList.setSpacing(4.0);
        generalList.getChildren().add(cbStartSync);

        Label foot = new Label(String.format("* 更改将于重新启动 %s 后生效", AppName));
        foot.setAlignment(Pos.CENTER);
        foot.setFont(new Font(12));

        VBox content = new VBox();
        content.setStyle("-fx-background-color: white");
        content.setPadding(new Insets(20));
        content.setSpacing(10);
        content.getChildren().addAll(title, generalList, foot);

        Button defButton = new Button("确定");
        defButton.setAlignment(Pos.CENTER);
        defButton.setPrefSize(84, 24);
        defButton.setDefaultButton(true);
        defButton.setOnAction(event -> saveConfigForIsStartSync(cbStartSync.isSelected(),
                new Callback<Boolean>() {
                    @Override
                    public void onCall(Boolean success) {
                        // Keep the dialog open when the setting could not be saved.
                        if (success != null && success) {
                            Platform.runLater(stage::close);
                        }
                    }
                }));

        Button cancelButton = new Button("取消");
        cancelButton.setAlignment(Pos.CENTER);
        cancelButton.setPrefSize(84, 24);
        cancelButton.setCancelButton(true);
        cancelButton.setOnAction(event -> stage.close());

        HBox operate = new HBox();
        operate.setAlignment(Pos.CENTER_RIGHT);
        operate.setSpacing(8);
        operate.setPadding(new Insets(12, 20, 12, 20));
        operate.getChildren().addAll(defButton, cancelButton);

        VBox vBox = new VBox();
        vBox.setSpacing(0);
        vBox.setPadding(getInsets());
        vBox.getChildren().addAll(content, operate);

        stage.setScene(new Scene(vBox));
        stage.show();
    }

    /** Shows the "about" dialog with application name, author, description and version. */
    private void showAboutDialog() {
        final Stage stage = newModalStage("关于");
        stage.setMinHeight(200);

        String description = "这是一个数据库数据同步的小工具,把需要同步的农抬头\r\n传感器数据从总库中更新到指定的数据库。";
        String version = String.format("Version %s, Build %s", VersionName, getBuildName());

        Label nameLabel = centeredLabel(AppName, new Font("System Bold", 24));
        Label authorLabel = centeredLabel(Author, new Font(14));
        Label descriptionLabel = centeredLabel(description, new Font(12));
        descriptionLabel.setPadding(new Insets(30, 0, 4, 0));
        descriptionLabel.setTextAlignment(TextAlignment.CENTER);
        Label versionLabel = centeredLabel(version, new Font(14));

        VBox vBox = new VBox();
        vBox.setPadding(new Insets(20, 20, 4, 20));
        vBox.getChildren().addAll(nameLabel, authorLabel, descriptionLabel, versionLabel);

        stage.setScene(new Scene(vBox));
        stage.show();
    }

    /**
     * Shows a dialog asking the user to fill in a missing input.
     *
     * @param prompt the display name of the missing input
     */
    private void showPromptDialog(String prompt) {
        final Stage stage = newModalStage("提示");
        stage.setHeight(150);

        Label message = centeredLabel(String.format("请输入%s", prompt), new Font(18));

        Button defButton = new Button("好的");
        defButton.setAlignment(Pos.CENTER);
        defButton.setPrefSize(84, 24);
        defButton.setOnAction(event -> stage.close());

        HBox hBox = new HBox();
        hBox.setAlignment(Pos.CENTER);
        hBox.setPadding(new Insets(30, 0, 0, 0));
        hBox.getChildren().add(defButton);

        VBox vBox = new VBox();
        vBox.setPadding(new Insets(20, 20, 10, 20));
        vBox.getChildren().addAll(message, hBox);

        stage.setScene(new Scene(vBox));
        stage.show();
    }

    /**
     * Shows the prompt dialog when {@code value} is empty.
     *
     * @return {@code true} when the value was empty and the user has been prompted
     */
    private boolean promptIfEmpty(String value, String fieldLabel) {
        if (TextUtils.isEmpty(value)) {
            showPromptDialog(fieldLabel);
            return true;
        }
        return false;
    }

    /**
     * Validates the inputs and starts the sync stream: one immediate sync run, followed by a
     * run every configured interval until {@link #stopSync()} is called or a run fails.
     *
     * <p>Must be called on the JavaFX application thread because it reads the controls.
     */
    private void startSync() {
        mSyncCount = 0;
        setInputText(txSyncCount, String.valueOf(mSyncCount));

        final String srcHost = getInputText(tvInputSrcHost);
        final String srcName = getInputText(tvInputSrcName);
        final String srcPwd = getInputText(tvInputSrcPwd);
        final String srcDbName = getInputText(tvInputSrcDbName);
        final String targetHost = getInputText(tvInputTargetHost);
        final String targetName = getInputText(tvInputTargetName);
        final String targetPwd = getInputText(tvInputTargetPwd);
        final String targetDbName = getInputText(tvInputTargetDbName);
        final String targetFreq = getInputText(tvInputFreq);
        final int freqSelect = cbSelectFreqUnit.getSelectionModel().getSelectedIndex();

        // Prompt for the first missing input only (short-circuit evaluation).
        if (promptIfEmpty(srcHost, "源数据库主机地址")
                || promptIfEmpty(srcName, "源数据库用户名")
                || promptIfEmpty(srcPwd, "源数据库密码")
                || promptIfEmpty(srcDbName, "源数据库名")
                || promptIfEmpty(targetHost, "目标库主机地址")
                || promptIfEmpty(targetName, "目标库用户名")
                || promptIfEmpty(targetPwd, "目标库密码")
                || promptIfEmpty(targetDbName, "目标库名")
                || promptIfEmpty(targetFreq, "同步频率")) {
            return;
        }

        // Cancel an earlier attempt that may still be connecting, so that at most one
        // sync stream exists at any time.
        swapSyncDisposable(null);

        Disposable sync = Observable.just(targetFreq)
                .doOnNext(ignored -> updateResult("连接中..."))
                .observeOn(Schedulers.newThread())
                // First run: executed immediately to verify both connections.
                .concatMap(freqText -> getStartSyncObservable(srcHost, srcName, srcPwd, srcDbName,
                        targetHost, targetName, targetPwd, targetDbName)
                        .map(ignored -> freqText))
                .doOnNext(ignored -> {
                    setSyncing(true);
                    final Date startedAt = new Date();
                    runOnFxThread(() -> showSyncStarted(startedAt));
                })
                .map(Controller::parseFrequency)
                // Following runs: wait for the interval, sync, repeat.
                .concatMap(freq -> {
                    int delay = freq > 0 ? freq : 10;
                    TimeUnit configured = ListUtil.getDataByList(mFreqUnitList, freqSelect);
                    TimeUnit unit = freq > 0 && configured != null ? configured : TimeUnit.MINUTES;
                    return Observable.timer(delay, unit)
                            .concatMap(tick -> getStartSyncObservable(srcHost, srcName, srcPwd,
                                    srcDbName, targetHost, targetName, targetPwd, targetDbName))
                            .repeat();
                })
                .observeOn(Schedulers.io())
                .subscribe(
                        ignored -> {
                            final Date syncedAt = new Date();
                            runOnFxThread(() -> showSyncSucceeded(syncedAt));
                        },
                        error -> {
                            error.printStackTrace();
                            // The stream is terminated by the error; do not keep showing "syncing".
                            if (isSyncing()) {
                                setSyncing(false);
                                runOnFxThread(this::showStoppedState);
                            }
                            updateResult(error.getMessage());
                        });
        swapSyncDisposable(sync);
    }

    /**
     * Parses the frequency input.
     *
     * @return the parsed value, or {@code -1} when the text is not a valid {@code int}
     */
    private static Integer parseFrequency(String freq) {
        try {
            return Integer.parseInt(freq);
        } catch (NumberFormatException e) {
            return -1;
        }
    }

    /** Increments the run counter (wrapping to 0) and shows it with the given sync time. */
    private void recordSyncRun(Date at) {
        mSyncCount++;
        if (mSyncCount >= Integer.MAX_VALUE) {
            mSyncCount = 0;
        }
        setInputText(txLastSyncTime, DateUtils.formatDate(at));
        setInputText(txSyncCount, String.valueOf(mSyncCount));
    }

    /** Updates the UI after the first successful sync run; runs on the FX thread. */
    private void showSyncStarted(Date at) {
        if (!isSyncing()) {
            return; // stopped in the meantime
        }
        recordSyncRun(at);
        btnStartSync.setDisable(true);
        btnStopSync.setDisable(false);
        setInputText(txSyncStatus, "同步中");
        setInputText(txStartSyncTime, DateUtils.formatDate(at));
        updateResult("启动成功!");
    }

    /** Updates the UI after a periodic sync run; runs on the FX thread. */
    private void showSyncSucceeded(Date at) {
        if (!isSyncing()) {
            return; // stopped in the meantime
        }
        recordSyncRun(at);
        updateResult("同步成功!");
    }

    /** Puts the buttons and the status text into the "stopped" state; runs on the FX thread. */
    private void showStoppedState() {
        btnStartSync.setDisable(false);
        btnStopSync.setDisable(true);
        setInputText(txSyncStatus, "已停止");
    }

    /**
     * Stops the sync stream. May be called from any thread; only the sync subscription is
     * disposed, other background tasks keep running.
     */
    private void stopSync() {
        swapSyncDisposable(null);
        setSyncing(false);
        runOnFxThread(this::showStoppedState);
        updateResult("停止成功!");
    }

    /** Replaces the tracked sync subscription and disposes the previous one, if any. */
    private void swapSyncDisposable(Disposable next) {
        final Disposable previous;
        synchronized (this) {
            previous = mSyncDisposable;
            mSyncDisposable = next;
        }
        if (previous != null) {
            previous.dispose();
        }
    }

    /**
     * Saves the "start sync on launch" flag into {@code config.json}.
     *
     * @param startSync the new value of the flag
     * @param callback  receives {@code true} when the file was written, {@code false} otherwise;
     *                  may be {@code null}
     */
    private void saveConfigForIsStartSync(final boolean startSync, final Callback<Boolean> callback) {
        addSubscribe(Observable.just(true)
                .subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.io())
                .map(ignored -> {
                    ConfigBean cb = getSaveConfig();
                    if (cb == null) {
                        cb = new ConfigBean();
                    }
                    cb.setStartSync(startSync);
                    String path = PathUtil.getConfigFilePath("config.json");
                    String content = JsonUtil.toJson(cb);
                    boolean saved = FileUtil.bufferedWriter(path, content);
                    System.out.println(saved ? "保存成功!" : "保存失败!");
                    return saved;
                })
                .subscribe(
                        saved -> {
                            if (callback != null) {
                                callback.onCall(saved);
                            }
                        },
                        error -> {
                            error.printStackTrace();
                            if (callback != null) {
                                callback.onCall(false);
                            }
                        }));
    }

    /**
     * Builds an observable that performs one complete sync run when subscribed: read the
     * gateway keys from the target database, read the matching sensor data from the source
     * database, and write that data into the target database.
     *
     * @return an observable emitting a single item after a successful run, or failing with a
     *         {@link RuntimeException} whose message names the failed step
     */
    private Observable<String> getStartSyncObservable(final String srcHost, final String srcName,
            final String srcPwd, final String srcDbName, final String targetHost,
            final String targetName, final String targetPwd, final String targetDbName) {
        return Observable.just(String.valueOf(0))
                .subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.io())
                .doOnNext(ignored -> {
                    String targetUrl = buildJdbcUrl(targetHost, targetDbName);
                    String srcUrl = buildJdbcUrl(srcHost, srcDbName);
                    // 1. Which sensors does the target database want?
                    List<String> keys = queryGatewayKeys(targetUrl, targetName, targetPwd);
                    // 2. Fetch their data from the source database.
                    Map<String, List<TargetBean>> dataByKey =
                            querySourceData(srcUrl, srcName, srcPwd, buildSourceQuery(keys));
                    // 3. Write the data into the target database.
                    updateTargetData(targetUrl, targetName, targetPwd, dataByKey);
                });
    }

    /** Builds the SQL Server JDBC URL for the given host and database. */
    private static String buildJdbcUrl(String host, String dbName) {
        return "jdbc:sqlserver://" + host + ";databaseName=" + dbName + ";integratedSecurity=false;";
    }

    /** Reads the {@code IE_Param} value of every gateway row from the target database. */
    private static List<String> queryGatewayKeys(String url, String user, String pwd) {
        List<String> keys = new ArrayList<>();
        try (Connection con = DriverManager.getConnection(url, user, pwd);
             Statement stmt = con.createStatement();
             ResultSet rs = stmt.executeQuery(SQL1_QUERY)) {
            while (rs.next()) {
                keys.add(rs.getString("IE_Param"));
            }
        } catch (Exception e) {
            throw new RuntimeException("连接目标库失败!", e);
        }
        return keys;
    }

    /** Fills the key list into {@link #SQL2_QUERY}. */
    private static String buildSourceQuery(List<String> keys) {
        StringBuilder joined = new StringBuilder();
        for (String key : keys) {
            // The keys end up inside a quoted SQL literal, so embedded quotes must be doubled.
            String escaped = key == null ? null : key.replace("'", "''");
            StringUtil.append(joined, escaped, "','");
        }
        return String.format(SQL2_QUERY, joined);
    }

    /** Runs the source query and groups the resulting rows by their {@code SD_Key}. */
    private static Map<String, List<TargetBean>> querySourceData(String url, String user,
            String pwd, String sql) {
        Map<String, List<TargetBean>> dataByKey = new HashMap<>();
        try (Connection con = DriverManager.getConnection(url, user, pwd);
             Statement stmt = con.createStatement();
             ResultSet rs = stmt.executeQuery(sql)) {
            while (rs.next()) {
                String key = rs.getString("SD_Key");
                String param = rs.getString("IE_Param");
                String data = rs.getString("SD_Data");
                Timestamp time = rs.getTimestamp("SD_Time");
                dataByKey.computeIfAbsent(key, k -> new ArrayList<>())
                        .add(new TargetBean(key, param, data, time));
            }
        } catch (Exception e) {
            throw new RuntimeException("查询源数据库失败!", e);
        }
        return dataByKey;
    }

    /** Writes every reading into the target database using {@link #SQL3_UPDATE}. */
    private static void updateTargetData(String url, String user, String pwd,
            Map<String, List<TargetBean>> dataByKey) {
        try (Connection con = DriverManager.getConnection(url, user, pwd);
             PreparedStatement ps = con.prepareStatement(SQL3_UPDATE)) {
            Timestamp editTime = new Timestamp(System.currentTimeMillis());
            String editUser = String.format("%sV%s", AppName, VersionName);
            for (Map.Entry<String, List<TargetBean>> entry : dataByKey.entrySet()) {
                List<TargetBean> readings = entry.getValue();
                if (readings == null) {
                    continue;
                }
                String key = entry.getKey();
                for (TargetBean tb : readings) {
                    if (tb == null) {
                        continue;
                    }
                    // The statement is prepared once and re-bound for every reading.
                    ps.setString(1, tb.getData());
                    ps.setTimestamp(2, tb.getTime());
                    ps.setString(3, editUser);
                    ps.setTimestamp(4, editTime);
                    ps.setString(5, tb.getParam());
                    ps.setString(6, key);
                    ps.execute();
                }
            }
        } catch (Exception e) {
            throw new RuntimeException("更新目标数据库失败!", e);
        }
    }

    /** Registers a background task in the composite container, creating the container on demand. */
    protected void addSubscribe(Disposable disposable) {
        if (this.mCompositeDisposable == null) {
            this.mCompositeDisposable = new CompositeDisposable();
        }
        this.mCompositeDisposable.add(disposable);
    }

    /** Runs the action immediately when already on the FX thread, otherwise schedules it there. */
    private static void runOnFxThread(Runnable action) {
        if (Platform.isFxApplicationThread()) {
            action.run();
        } else {
            Platform.runLater(action);
        }
    }

    /** Returns the text of the field, or {@code null} when the field is {@code null}. */
    private String getInputText(TextField field) {
        return field != null ? field.getText() : null;
    }

    /** Sets the text of the field; does nothing when the field is {@code null}. */
    private void setInputText(TextField field, String str) {
        if (field != null) {
            field.setText(str);
        }
    }

    /** Sets the text of the node; does nothing when the node is {@code null}. */
    private void setInputText(Text field, String str) {
        if (field != null) {
            field.setText(str);
        }
    }

    /** Prints the message and shows it in the result line; may be called from any thread. */
    private void updateResult(final String str) {
        System.out.println(str);
        runOnFxThread(() -> setInputText(txOutResult, str));
    }

    /** Returns the root padding of the settings dialog; Windows gets a negative bottom inset. */
    private Insets getInsets() {
        if (SystemUtil.isWindows()) {
            return new Insets(0, 0, -10, 0);
        }
        return new Insets(0);
    }

    /** Returns whether a sync stream is currently active. */
    public boolean isSyncing() {
        return mSyncing;
    }

    /** Sets whether a sync stream is currently active. */
    public void setSyncing(boolean mSyncing) {
        this.mSyncing = mSyncing;
    }
}