| | |
| | | import io.reactivex.functions.Predicate; |
| | | import io.reactivex.schedulers.Schedulers; |
| | | import javafx.application.Platform; |
| | | import javafx.beans.value.ChangeListener; |
| | | import javafx.beans.value.ObservableValue; |
| | | import javafx.event.ActionEvent; |
| | | import javafx.event.EventHandler; |
| | | import javafx.fxml.FXML; |
| | |
| | | import static com.github.hunter0x7c7.sync.model.global.Parameters.*; |
| | | |
| | | public class Controller { |
| | | public static final String SQL1_QUERY = "SELECT * FROM IOT_Equipment_Info WHERE IE_Type = 'XPH物联网关'; "; |
| | | public static final String SQL2_QUERY = "SELECT SD_Addr + '|' + SD_Code IE_Param, * " + |
| | | "FROM TY_SensorData WHERE SD_Key IN ( '%s' );"; |
| | | public static final String SQL3_UPDATE = "UPDATE IOT_Equipment_Info " + |
| | | //1. In the target database, first look up the data-collection gateway devices: all IOT_Equipment_Info rows whose IE_Type equals one of the gateway types listed below |
| | | public static final String SQL_QUERY_TARGET = "SELECT * " + |
| | | "FROM IOT_Equipment_Info " + |
| | | "WHERE IE_Type = '物联网关' " + |
| | | "OR IE_Type = 'XPH网关' " + |
| | | "OR IE_Type = 'XPH物联网关' " + |
| | | "OR IE_Type = '气象站网关' " + |
| | | "OR IE_Type = '采集物联网关' " +//greenhouse data-collection cabinet |
| | | "OR IE_Type = '虫情监测网关' " + |
| | | "OR IE_Type = '墒情网关' " + |
| | | "OR IE_Type = '远传水表'; "; |
| | | |
| | | //2. In the source database, look up the child rows of the relevant gateway devices; the IE_Param column is built by concatenating SD_Addr, '|' and SD_Code (e.g. 00|01). The %s placeholder is filled via String.format with the key list for the IN clause |
| | | public static final String SQL_QUERY_SRC = "SELECT SD_Addr + '|' + SD_Code IE_Param, * " + |
| | | "FROM TY_SensorData " + |
| | | "WHERE SD_Key IN ( '%s' ); "; |
| | | //3. In the target database, write the matching latest data into the corresponding rows. NOTE(review): two alternative WHERE tails follow ("IE_Parent = (" and "IE_Parent IN (") - this looks like an old and a new revision merged together; confirm which one is current |
| | | public static final String SQL_UPDATE_TARGET = "UPDATE IOT_Equipment_Info " + |
| | | "SET IE_Realtime_Data = ?, IE_Realtime_Time = ?, Edit_User = ?, Edit_Time = ? " + |
| | | "WHERE IE_Param = ? AND IE_Parent = ( " + |
| | | "SELECT IE_ID FROM IOT_Equipment_Info WHERE IE_Param = ? " + |
| | | ") ;"; |
| | | "WHERE IE_Param = ? " + |
| | | "AND IE_Parent IN ( " + |
| | | "SELECT IE_ID " + |
| | | "FROM IOT_Equipment_Info " + |
| | | "WHERE IE_Param = ? ) ;"; |
| | | |
| | | @FXML |
| | | private TextField tvInputSrcHost; |
| | | @FXML |
| | | private TextField tvInputSrcName; |
| | | @FXML |
| | | private PasswordField tvInputSrcPwd; |
| | | private TextField tvInputSrcPwd; |
| | | @FXML |
| | | private TextField tvInputSrcDbName; |
| | | |
| | |
| | | @FXML |
| | | private TextField tvInputTargetName; |
| | | @FXML |
| | | private PasswordField tvInputTargetPwd; |
| | | private TextField tvInputTargetPwd; |
| | | @FXML |
| | | private TextField tvInputTargetDbName; |
| | | @FXML |
| | |
| | | |
| | | private CompositeDisposable mCompositeDisposable; |
| | | //List of selectable sync-frequency units, from milliseconds up to days. NOTE(review): the array is declared twice below (single-line and multi-line form) - looks like an old and a new revision merged together; confirm which one is current |
| | | private static final TimeUnit[] mFreqUnitList = {TimeUnit.MILLISECONDS, TimeUnit.SECONDS, TimeUnit.MINUTES, TimeUnit.HOURS, TimeUnit.DAYS}; |
| | | private static final TimeUnit[] mFreqUnitList = { |
| | | TimeUnit.MILLISECONDS, |
| | | TimeUnit.SECONDS, |
| | | TimeUnit.MINUTES, |
| | | TimeUnit.HOURS, |
| | | TimeUnit.DAYS}; |
| | | //Display names of the sync-frequency units (milliseconds, seconds, minutes, hours, days), listed in the same order as mFreqUnitList |
| | | private static final String[] mFreqNameList = {"毫秒", "秒", "分钟", "小时", "天"}; |
| | | //Sync count |
| | |
| | | show.addActionListener(actionListener -> { |
| | | Platform.runLater(() -> { |
| | | if (primaryStage != null) { |
| | | primaryStage.show(); |
| | | if (primaryStage.isIconified()) {//最小化 |
| | | primaryStage.setIconified(false); |
| | | } |
| | | if (!primaryStage.isShowing()) { |
| | | primaryStage.show(); |
| | | } |
| | | primaryStage.toFront(); |
| | | } |
| | | }); |
| | | }); |
| | |
| | | }; |
| | | //添加到系统托盘 |
| | | TrayUtil.getInstance().addSystemTray(primaryStage, AppName_zh_rCN, mipmap, callback, show); |
| | | |
| | | //最小化之后,关闭窗口 |
| | | primaryStage.iconifiedProperty().addListener(new ChangeListener<Boolean>() { |
| | | @Override |
| | | public void changed(ObservableValue<? extends Boolean> observable, Boolean oldValue, Boolean newValue) { |
| | | //如果你执行了setIconified(true)之后再执行hide(), |
| | | // stage会被销毁,因此需要在窗口关闭前将Iconified设为false |
| | | if (!oldValue && newValue) { |
| | | //最小化之后,关闭窗口 |
| | | if (primaryStage != null) { |
| | | primaryStage.close(); |
| | | } |
| | | } |
| | | } |
| | | }); |
| | | } |
| | | }).concatMap(new Function<Object, Observable<ConfigBean>>() {//准备初始化的配置数据 |
| | | @Override |
| | |
| | | }).map(new Function<Boolean, ConfigBean>() { |
| | | @Override |
| | | public ConfigBean apply(Boolean aBoolean) throws Exception { |
| | | assert configBean != null; |
| | | configBean.setSrcPwd(getDecryptData(configBean.getSrcPwd())); |
| | | configBean.setTargetPwd(getDecryptData(configBean.getTargetPwd())); |
| | | return configBean; |
| | | } |
| | | }).switchIfEmpty(new Observable<ConfigBean>() { |
| | |
| | | //更新界面数据 |
| | | setInputText(tvInputSrcHost, cb.getSrcHost()); |
| | | setInputText(tvInputSrcName, cb.getSrcName()); |
| | | setInputText(tvInputSrcPwd, getDecryptData(cb.getSrcPwd())); |
| | | setInputText(tvInputSrcPwd, cb.getSrcPwd()); |
| | | setInputText(tvInputSrcDbName, cb.getSrcDbName()); |
| | | setInputText(tvInputTargetHost, cb.getTargetHost()); |
| | | setInputText(tvInputTargetName, cb.getTargetName()); |
| | | setInputText(tvInputTargetPwd, getDecryptData(cb.getTargetPwd())); |
| | | setInputText(tvInputTargetPwd, cb.getTargetPwd()); |
| | | setInputText(tvInputTargetDbName, cb.getTargetDbName()); |
| | | setInputText(tvInputFreq, cb.getFreqValue()); |
| | | cbSelectFreqUnit.getSelectionModel().select(cb.getFreqUnit()); |
| | |
| | | |
| | | //点击:退出 |
| | | private void clickExit() { |
| | | addSubscribe(Observable.just(true) |
| | | Disposable d = Observable.just(true) |
| | | .subscribeOn(Schedulers.newThread()) |
| | | .observeOn(Schedulers.io()) |
| | | .doOnNext(new Consumer<Object>() { |
| | | @Override |
| | | public void accept(Object o) throws Exception { |
| | | //1. 停止同步 |
| | | if (isSyncing()) { |
| | | stopSync();//停止同步 |
| | | } |
| | | } |
| | | }) |
| | | .doOnNext(new Consumer<Object>() { |
| | | .concatMap(new Function<Object, ObservableSource<Object>>() { |
| | | @Override |
| | | public void accept(Object o) throws Exception { |
| | | |
| | | //检查系统是否支持托盘 |
| | | if (SystemTray.isSupported()) { |
| | | //系统托盘 |
| | | SystemTray tray = SystemTray.getSystemTray(); |
| | | if (tray != null) { |
| | | TrayIcon[] icons = tray.getTrayIcons(); |
| | | for (TrayIcon icon : icons) { |
| | | if (icon == null) continue; |
| | | public ObservableSource<Object> apply(Object o) throws Exception { |
| | | //2. 移除系统托盘 |
| | | return Observable.create(new ObservableOnSubscribe<Object>() { |
| | | @Override |
| | | public void subscribe(ObservableEmitter<Object> emitter) throws Exception { |
| | | //检查系统是否支持托盘 |
| | | if (SystemTray.isSupported()) { |
| | | //退出之前先移除系统托盘图标 |
| | | if (APPID.equals(icon.getActionCommand())) { |
| | | tray.remove(icon); |
| | | } |
| | | final SystemTray tray = SystemTray.getSystemTray();//系统托盘 |
| | | final TrayIcon icon = getTrayIcon(tray, APPID); |
| | | Platform.runLater(new Runnable() { |
| | | @Override |
| | | public void run() { |
| | | try { |
| | | //移除系统托盘 |
| | | tray.remove(icon); |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | } |
| | | if (emitter != null) { |
| | | emitter.onNext(o); |
| | | emitter.onComplete(); |
| | | } |
| | | } |
| | | }); |
| | | } |
| | | } |
| | | } |
| | | }); |
| | | } |
| | | }) |
| | | .subscribe(new Consumer<Boolean>() { |
| | | .subscribe(new Consumer<Object>() { |
| | | @Override |
| | | public void accept(Boolean aBoolean) throws Exception { |
| | | |
| | | public void accept(Object o) throws Exception { |
| | | //3. 停止同步和移除系统托盘后,退出程序 |
| | | TrayUtil.getInstance().exitApp();//退出 |
| | | } |
| | | }, new Consumer<Throwable>() { |
| | |
| | | |
| | | TrayUtil.getInstance().exitApp();//退出 |
| | | } |
| | | })); |
| | | }); |
| | | } |
| | | |
| | | |
| | |
| | | |
| | | //点击:停止同步 |
| | | private void clickStopSync() { |
| | | if (isSyncing()) { |
| | | stopSync();//停止同步 |
| | | } else { |
| | | updateResult("未启动同步功能!"); |
| | | stopSync();//停止同步 |
| | | } |
| | | |
| | | public TrayIcon getTrayIcon(SystemTray tray, String key) { |
| | | // Looks up the tray icon whose action command equals the given key. |
| | | // Without a tray or a key there is nothing to search, so the result is null. |
| | | if (tray == null || key == null) { |
| | | return null; |
| | | } |
| | | final TrayIcon[] registered = tray.getTrayIcons(); |
| | | for (int i = 0; i < registered.length; i++) { |
| | | final TrayIcon candidate = registered[i]; |
| | | // Skip empty slots; the first icon with a matching action command wins. |
| | | if (candidate != null && key.equals(candidate.getActionCommand())) { |
| | | return candidate; |
| | | } |
| | | } |
| | | // No icon carries the requested action command. |
| | | return null; |
| | | } |
| | | |
| | | //解密 |
| | |
| | | //启动同步功能 |
| | | private void startSync() { |
| | | mSyncCount = 0; |
| | | setInputText(txSyncStatus, "启动中"); |
| | | setInputText(txSyncCount, String.valueOf(mSyncCount)); |
| | | setInputText(txStartSyncTime, "-"); |
| | | setInputText(txLastSyncTime, "-"); |
| | | |
| | | final String srcHost = getInputText(tvInputSrcHost); |
| | | final String srcName = getInputText(tvInputSrcName); |
| | |
| | | .doOnNext(new Consumer<Object>() { |
| | | @Override |
| | | public void accept(Object o) throws Exception { |
| | | //UI线程 |
| | | setInputText(txStartSyncTime, DateUtils.formatDate(new Date())); |
| | | updateResult("连接中..."); |
| | | btnStartSync.setDisable(true); |
| | | btnStopSync.setDisable(false); |
| | | } |
| | | }) |
| | | .observeOn(Schedulers.newThread()) |
| | | .concatMap(new Function<String, ObservableSource<String>>() { |
| | | @Override |
| | | public ObservableSource<String> apply(String targetFreq) throws Exception { |
| | | mSyncCount = 0; |
| | | setSyncing(true); |
| | | return getStartSyncObservable(srcHost, srcName, srcPwd, srcDbName, targetHost, targetName, targetPwd, targetDbName) |
| | | .map(new Function<String, String>() { |
| | | @Override |
| | |
| | | @Override |
| | | public void accept(Object o) throws Exception { |
| | | //启动成功 |
| | | setSyncing(true); |
| | | mSyncCount++; |
| | | if (mSyncCount >= Integer.MAX_VALUE) { |
| | | mSyncCount = 0; |
| | | if (isSyncing()) { |
| | | mSyncCount++; |
| | | if (mSyncCount >= Integer.MAX_VALUE) { |
| | | mSyncCount = 0; |
| | | } |
| | | setInputText(txLastSyncTime, DateUtils.formatDate(new Date())); |
| | | setInputText(txSyncCount, String.valueOf(mSyncCount)); |
| | | setInputText(txSyncStatus, "同步中"); |
| | | updateResult("启动成功!"); |
| | | btnStartSync.setDisable(true); |
| | | btnStopSync.setDisable(false); |
| | | } |
| | | setInputText(txLastSyncTime, DateUtils.formatDate(new Date())); |
| | | setInputText(txSyncCount, String.valueOf(mSyncCount)); |
| | | |
| | | btnStartSync.setDisable(true); |
| | | btnStopSync.setDisable(false); |
| | | |
| | | setInputText(txSyncStatus, "同步中"); |
| | | setInputText(txStartSyncTime, DateUtils.formatDate(new Date())); |
| | | updateResult("启动成功!"); |
| | | } |
| | | }) |
| | | .map(new Function<String, Integer>() { |
| | |
| | | @Override |
| | | public void accept(Object o) throws Exception { |
| | | //同步成功! |
| | | setSyncing(true); |
| | | mSyncCount++; |
| | | if (mSyncCount >= Integer.MAX_VALUE) { |
| | | mSyncCount = 0; |
| | | if (isSyncing()) { |
| | | mSyncCount++; |
| | | if (mSyncCount >= Integer.MAX_VALUE) { |
| | | mSyncCount = 0; |
| | | } |
| | | setInputText(txLastSyncTime, DateUtils.formatDate(new Date())); |
| | | setInputText(txSyncCount, String.valueOf(mSyncCount)); |
| | | updateResult("同步成功!"); |
| | | btnStartSync.setDisable(true); |
| | | btnStopSync.setDisable(false); |
| | | } |
| | | setInputText(txLastSyncTime, DateUtils.formatDate(new Date())); |
| | | setInputText(txSyncCount, String.valueOf(mSyncCount)); |
| | | updateResult("同步成功!"); |
| | | } |
| | | }, new Consumer<Throwable>() { |
| | | @Override |
| | |
| | | |
| | | //Stops synchronization: clears the syncing flag, resets mSyncCount, clears mCompositeDisposable when it is non-null, re-enables the start button, disables the stop button and updates the status/result texts. NOTE(review): several calls appear twice in this body (clear(), setSyncing(false), the status and result texts) - looks like an old and a new revision merged together; confirm which lines are current |
| | | private void stopSync() { |
| | | setSyncing(false); |
| | | mSyncCount = 0; |
| | | if (mCompositeDisposable != null) { |
| | | mCompositeDisposable.clear(); |
| | | mCompositeDisposable.clear();//Note: this cancels all operations |
| | | } |
| | | setInputText(txSyncStatus, "已停止"); |
| | | updateResult("同步停止!"); |
| | | |
| | | btnStartSync.setDisable(false); |
| | | btnStopSync.setDisable(true); |
| | | |
| | | setSyncing(false); |
| | | setInputText(txSyncStatus, "已停止"); |
| | | updateResult("停止成功!"); |
| | | } |
| | | |
| | | //保存 在启动时”启动“同步功能 |
| | |
| | | .doOnNext(new Consumer<Object>() { |
| | | @Override |
| | | public void accept(Object o) throws Exception { |
| | | String targetUrl = "jdbc:sqlserver://" + targetHost + ";databaseName=" + targetDbName + ";integratedSecurity=false;"; |
| | | String urlString = "jdbc:sqlserver://%s;databaseName=%s;integratedSecurity=false;"; |
| | | String targetUrl = String.format(urlString, targetHost, targetDbName); |
| | | List<String> keyList = new ArrayList<>(); |
| | | |
| | | //1.从目标库查出来有多少传感器需要查找 |
| | |
| | | Statement stmt = con.createStatement() |
| | | ) { |
| | | |
| | | ResultSet rs = stmt.executeQuery(SQL1_QUERY); |
| | | ResultSet rs = stmt.executeQuery(SQL_QUERY_TARGET); |
| | | while (rs.next()) { |
| | | keyList.add(rs.getString("IE_Param")); |
| | | } |
| | |
| | | for (String key : keyList) { |
| | | StringUtil.append(sb, key, "','"); |
| | | } |
| | | String sql = String.format(SQL2_QUERY, sb); |
| | | String sql = String.format(SQL_QUERY_SRC, sb); |
| | | |
| | | String srcUrl = "jdbc:sqlserver://" + srcHost + ";databaseName=" + srcDbName + ";integratedSecurity=false;"; |
| | | String srcUrl = String.format(urlString, srcHost, srcDbName); |
| | | //System.out.println("sql2:" + sql); |
| | | |
| | | Map<String, List<TargetBean>> map = new HashMap<>(); |
| | |
| | | for (TargetBean tb : list) { |
| | | if (tb == null) continue; |
| | | |
| | | PreparedStatement ps = con3.prepareStatement(SQL3_UPDATE); |
| | | PreparedStatement ps = con3.prepareStatement(SQL_UPDATE_TARGET); |
| | | ps.setString(1, tb.getData()); |
| | | ps.setTimestamp(2, tb.getTime()); |
| | | ps.setString(3, version); |