package com.astrolabsoftware.FinkBrowser.HBaser;

import com.Lomikel.HBaser.HBaseClient;
import com.Lomikel.HBaser.HBaseSQLClient;
import com.Lomikel.Utils.DateTimeManagement;
import com.Lomikel.Utils.Pair;
import com.Lomikel.Utils.LomikelException;

// HealPix
import cds.healpix.Healpix;
import cds.healpix.HealpixNested;
import cds.healpix.HealpixNestedFixedRadiusConeComputer;
import cds.healpix.HealpixNestedBMOC;
import cds.healpix.FlatHashIterator;
import static cds.healpix.VerticesAndPathComputer.LON_INDEX;
import static cds.healpix.VerticesAndPathComputer.LAT_INDEX;

// HBase
import org.apache.hadoop.hbase.TableExistsException;

// Java
import java.lang.Math;
import java.util.Map;
import java.util.TreeMap;
import java.util.Set;
import java.util.TreeSet;
import java.util.List;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.Arrays;
import java.util.stream.Collectors;
import java.io.IOException;

// Log4J
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;

/** <code>FinkHBaseClient</code> handles the connection to an HBase table
 * with specific Fink functionality.
 * It expects the main table with a schema and two schemaless aux tables:
 * <ul>
 * <li><b>*.jd</b> table with <code>key = jd_objectId</code> and one
 *     column <code>i:objectId</code>.</li>
 * <li><b>*.pixel</b> table with <code>key = pixel_objectId</code> and
 *     columns <code>i:objectId,i:dec,i:ra</code>.</li>
 * </ul>
 * @opt attributes
 * @opt operations
 * @opt types
 * @opt visibility
 * @author <a href="mailto:Julius.Hrivnac@cern.ch">J.Hrivnac</a> */
public class FinkHBaseClient extends HBaseSQLClient {

  /** Create.
   * @param zookeepers The comma-separated list of zookeeper ids.
   * @param clientPort The client port.
   * @throws LomikelException If anything goes wrong. */
  public FinkHBaseClient(String zookeepers,
                         String clientPort) throws LomikelException {
    super(zookeepers, clientPort);
    //setFinkEvaluatorFunctions();
  }

  /** Create.
   * @param zookeepers The comma-separated list of zookeeper ids.
   * @param clientPort The client port.
   * @throws LomikelException If anything goes wrong. */
  public FinkHBaseClient(String zookeepers,
                         int clientPort) throws LomikelException {
    super(zookeepers, clientPort);
    //setFinkEvaluatorFunctions();
  }

  /** Create.
   * @param url The HBase url.
   * @throws LomikelException If anything goes wrong. */
  public FinkHBaseClient(String url) throws LomikelException {
    super(url);
    //setFinkEvaluatorFunctions();
  }

  /** Create on <em>localhost</em>.
   * @throws LomikelException If anything goes wrong. */
  // TBD: is it needed, does it work ok ?
  public FinkHBaseClient() throws LomikelException {
    super(null, null);
    setFinkEvaluatorFunctions();
  }

  /** Set up the default sets of evaluation functions. */
  private void setFinkEvaluatorFunctions() {
    try {
      evaluator().setEvaluatorFunctions("com.astrolabsoftware.FinkBrowser.HBaser.FinkEvaluatorFunctions",
                                        "com/astrolabsoftware/FinkBrowser/HBaser/FinkEvaluatorFunctions.bsh");
      evaluator().setEvaluatorFunctions(null,
                                        "com/astrolabsoftware/FinkBrowser/WebService/FinkHBaseColumnsProcessor.bsh");
    }
    catch (LomikelException e) {
      log.error("Cannot set EvaluatorFunctions", e);
    }
  }

  /** Get alerts between two Julian dates (inclusive).
   * @param jdStart  The starting Julian date (including the day fraction).
   * @param jdStop   The stopping Julian date (including the day fraction).
   * @param reversed Whether results should be ordered in reverse.
   *                 <tt>true</tt> implies that result limits will be counted backwards.
   * @param filter   The names of required values as <tt>family:column,...</tt>.
   *                 It can be <tt>null</tt>.
   * @param ifkey    Whether to also give entry keys.
   * @param iftime   Whether to also give entry timestamps.
   * @return The {@link Map} of {@link Map}s of results as <tt>key-&gt;{family:column-&gt;value}</tt>. */
  public Map<String, Map<String, String>> search(String  jdStart,
                                                 String  jdStop,
                                                 boolean reversed,
                                                 String  filter,
                                                 boolean ifkey,
                                                 boolean iftime) {
    log.debug("Searching for alerts in jd interval: " + jdStart + " - " + jdStop);
    Map<String, String> searchMap = jd2keys(jdStart, jdStop, reversed);
    if (searchMap.isEmpty()) {
      return new TreeMap<String, Map<String, String>>();
    }
    // searching each entry separately to profit from HBase start/stop row optimisation
    Map<String, Map<String, String>> allResults = new TreeMap<>();
    Map<String, Map<String, String>> aResult;
    for (String key : searchMap.get("key:key:exact").split(",")) {
      aResult = scan(null,
                     "key:key:" + key + ":exact",
                     filter,
                     0,
                     0,
                     ifkey,
                     iftime);
      allResults.putAll(aResult);
    }
    return allResults;
  }

  /** Get alerts within a spatial cone (inclusive).
   * @param ra     The central value of ra (in deg).
   * @param dec    The central value of dec (in deg).
   * @param delta  The maximal angular distance from the central direction (in deg).
   * @param filter The names of required values as <tt>family:column,...</tt>.
   *               It can be <tt>null</tt>.
   * @param ifkey  Whether to also give entry keys.
   * @param iftime Whether to also give entry timestamps.
   * @return The {@link Map} of {@link Map}s of results as <tt>key-&gt;{family:column-&gt;value}</tt>. */
  public Map<String, Map<String, String>> search(double  ra,
                                                 double  dec,
                                                 double  delta,
                                                 String  filter,
                                                 boolean ifkey,
                                                 boolean iftime) {
    log.debug("Searching for alerts within " + delta + " deg of (ra, dec) = (" + ra + ", " + dec + ")");
    Map<String, String> searchMap = radec2keys(ra, dec, delta);
    if (searchMap.isEmpty()) {
      return new TreeMap<String, Map<String, String>>();
    }
    // searching each entry separately to profit from HBase start/stop row optimisation
    // (radec2keys gives objectId prefixes of the main table keys, so a prefix scan is used)
    Map<String, Map<String, String>> allResults = new TreeMap<>();
    Map<String, Map<String, String>> aResult;
    for (String key : searchMap.get("key:key:prefix").split(",")) {
      aResult = scan(null,
                     "key:key:" + key + ":prefix",
                     filter,
                     0,
                     0,
                     ifkey,
                     iftime);
      allResults.putAll(aResult);
    }
    return allResults;
  }

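  /* Usage sketch (illustrative only) for the two search overloads above.
   * The Zookeeper host/port, table name and schema name are placeholder values,
   * not anything defined by this class:
   *
   *   FinkHBaseClient client = new FinkHBaseClient("localhost", 2181);
   *   client.connect("ztf", "schema_version");
   *   // alerts between two Julian dates (inclusive)
   *   Map<String, Map<String, String>> byTime =
   *     client.search("2460000.5", "2460001.5", false, "i:objectId,i:jd", true, false);
   *   // alerts within 0.1 deg of (ra, dec) = (10.0, -20.0) deg
   *   Map<String, Map<String, String>> byCone =
   *     client.search(10.0, -20.0, 0.1, "i:objectId,i:ra,i:dec", true, false);
   *   client.close();
   */
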
  /** Give all objectIds corresponding to the specified Julian Date.
   * It uses the *.jd table.
   * @param jd       The Julian Date (with day fraction).
   * @param reversed Whether results should be ordered in reverse.
   *                 <tt>true</tt> implies that result limits will be counted backwards.
   * @return The {@link Map} of corresponding keys of the main table,
   *         in the format expected for the scan methods. */
  public Map<String, String> jd2keys(String  jd,
                                     boolean reversed) {
    Map<String, String> searchMap = new TreeMap<>();
    try {
      HBaseClient client = new HBaseClient(zookeepers(), clientPort());
      client.connect(tableName() + ".jd", schema().name());
      client.setReversed(reversed);
      client.setLimit(limit());
      client.setSearchLimit(searchLimit());
      Map<String, Map<String, String>> results = client.scan(null,
                                                             "key:key:" + jd,
                                                             null,
                                                             0,
                                                             0,
                                                             false,
                                                             false);
      // aux table keys are jd_objectId; swap them into the main table key format objectId_jd
      String keys = results.keySet()
                           .stream()
                           .map(m -> {String[] key = m.split("_"); return key[1] + "_" + key[0];})
                           .collect(Collectors.joining(","));
      if (keys != null && !keys.trim().equals("")) {
        searchMap.put("key:key:exact", keys);
      }
      client.close();
    }
    catch (LomikelException e) {
      log.error("Cannot search", e);
    }
    return searchMap;
  }

  /** Give all objectIds between two specified Julian Dates (inclusive).
   * It uses the *.jd table.
   * @param jdStart  The start Julian Date (with day fraction), evaluated as a literal prefix scan.
   * @param jdStop   The stop Julian Date (with day fraction), evaluated as a literal prefix scan.
   * @param reversed Whether results should be ordered in reverse.
   *                 <tt>true</tt> implies that result limits will be counted backwards.
   * @return The {@link Map} of corresponding keys of the main table,
   *         in the format expected for the scan methods. */
  public Map<String, String> jd2keys(String  jdStart,
                                     String  jdStop,
                                     boolean reversed) {
    Map<String, String> searchMap = new TreeMap<>();
    try {
      HBaseClient client = new HBaseClient(zookeepers(), clientPort());
      client.connect(tableName() + ".jd", schema().name());
      client.setRangeScan(true);
      client.setReversed(reversed);
      client.setLimit(limit());
      client.setSearchLimit(searchLimit());
      Map<String, Map<String, String>> results = client.scan(null,
                                                             "key:key:" + jdStart + ":prefix," + "key:key:" + jdStop + ":prefix",
                                                             null,
                                                             0,
                                                             0,
                                                             false,
                                                             false);
      // aux table keys are jd_objectId; swap them into the main table key format objectId_jd
      String keys = results.keySet()
                           .stream()
                           .map(m -> {String[] key = m.split("_"); return key[1] + "_" + key[0];})
                           .collect(Collectors.joining(","));
      if (keys != null && !keys.trim().equals("")) {
        searchMap.put("key:key:exact", keys);
      }
      client.close();
    }
    catch (LomikelException e) {
      log.error("Cannot search", e);
    }
    return searchMap;
  }

  /** Give all objectIds within a spatial cone.
   * It uses the *.pixel table.
   * @param ra    The central value of ra/lon (in deg).
   * @param dec   The central value of dec/lat (in deg).
   * @param delta The maximal angular distance from the central direction (in deg).
   * @return The {@link Map} of corresponding keys of the main table,
   *         in the format expected for the scan methods. */
  public Map<String, String> radec2keys(double ra,
                                        double dec,
                                        double delta) {
    double coneCenterLon = Math.toRadians(ra);
    double coneCenterLat = Math.toRadians(dec);
    double coneRadiusDel = Math.toRadians(delta);
    //HealpixNestedFixedRadiusConeComputer cc = _hn.newConeComputer(coneRadiusDel); // beta code!!
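    // Descriptive note: the cone computer below returns a BMOC (multi-order coverage map)
    // of the HEALPix cells selected for the cone at the depth derived from _NSIDE. Each
    // cell hash is converted to RING numbering and appended to a comma-separated list of
    // pixel indices, which is then used as key prefixes for the *.pixel table scan.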
    HealpixNestedFixedRadiusConeComputer cc = _hn.newConeComputerApprox(coneRadiusDel); // robust code
    HealpixNestedBMOC bmoc = cc.overlappingCenters(coneCenterLon, coneCenterLat);
    String pixs = "" + _hn.toRing(_hn.hash(coneCenterLon, coneCenterLat));
    log.debug("Central pixel: " + pixs);
    int n = 0;
    FlatHashIterator hIt = bmoc.flatHashIterator();
    //while (hIt.hasNext()) {
    //  pixs += _hn.toRing(hIt.next()) + ",";
    //  n++;
    //  }
    for (HealpixNestedBMOC.CurrentValueAccessor cell : bmoc) {
      // cell.getDepth(), cell.isFull(), cell.getRawValue()
      pixs += "," + _hn.toRing(cell.getHash());
      n++;
    }
    log.debug("" + n + " cells found (using nside = " + _NSIDE + ", depth = " + Healpix.depth(_NSIDE) + ")");
    Map<String, String> pixMap = new TreeMap<>();
    pixMap.put("key:key:prefix", pixs);
    Map<String, String> searchMap = new TreeMap<>();
    try {
      HBaseClient client = new HBaseClient(zookeepers(), clientPort());
      client.connect(tableName() + ".pixel", null);
      client.setLimit(limit());
      client.setSearchLimit(searchLimit());
      Map<String, Map<String, String>> results = client.scan(null,
                                                             pixMap,
                                                             "i:objectId",
                                                             0,
                                                             0,
                                                             false,
                                                             false);
      //log.info(results);
      String keys = results.values()
                           .stream()
                           .map(m -> m.get("i:objectId"))
                           .collect(Collectors.joining(","));
      if (keys != null && !keys.trim().equals("")) {
        searchMap.put("key:key:prefix", keys);
      }
      client.close();
    }
    catch (LomikelException e) {
      log.error("Cannot search", e);
    }
    return searchMap;
  }

  /** Give the timeline for the column. It makes use of the Julian Date alert time
   * instead of the HBase timestamp.
   * @param columnName The name of the column.
   * @param search     The search terms as <tt>family:column:value,...</tt>.
   *                   Key can be searched with the <tt>family:column = key:key</tt> "pseudo-name".
   *                   The {@link Comparator} can be chosen as <tt>family:column:value:comparator</tt>
   *                   among <tt>exact,prefix,substring,regex</tt>.
   *                   The default for the key is <tt>prefix</tt>,
   *                   the default for columns is <tt>substring</tt>.
   *                   It can be <tt>null</tt>.
   *                   All searches are executed as prefix searches.
   * @return The {@link Set} of {@link Pair}s of JulianDate-value. */
  @Override
  public Set<Pair<String, String>> timeline(String columnName,
                                            String search) {
    log.debug("Getting alerts timeline of " + columnName + " with " + search);
    Set<Pair<String, String>> tl = new TreeSet<>();
    Map<String, Map<String, String>> results = scan(null, search, columnName + ",i:jd", 0, false, false);
    Pair<String, String> p;
    for (Map.Entry<String, Map<String, String>> entry : results.entrySet()) {
      if (!entry.getKey().startsWith("schema")) {
        p = Pair.of(entry.getValue().get("i:jd"),
                    entry.getValue().get(columnName));
        tl.add(p);
      }
    }
    return tl;
  }

  /** Give all recent values of the column. It makes use of the Julian Date alert time
   * instead of the HBase timestamp.
   * Results are ordered by the Julian Date alert time, so eventual limits on the number
   * of results will be applied backwards in Julian Date time.
   * @param columnName  The name of the column.
   * @param prefixValue The column value prefix to search for.
   * @param minutes     How far into the past it should search.
   * @param getValues   Whether to get column values or row keys.
   * @return The {@link Set} of different values of that column. */
  public Set<String> latests(String  columnName,
                             String  prefixValue,
                             long    minutes,
                             boolean getValues) {
    log.debug("Getting " + columnName + " of alerts prefixed by " + prefixValue + " from last " + minutes + " minutes");
    Set<String> l = new TreeSet<>();
    double nowJD = DateTimeManagement.julianDate();
    double minJD = nowJD - minutes / 60.0 / 24.0; // minutes converted to a fraction of a Julian day
    Map<String, Map<String, String>> results = search(String.valueOf(minJD),
                                                      String.valueOf(nowJD),
                                                      true,
                                                      columnName,
                                                      false,
                                                      false);
    for (Map.Entry<String, Map<String, String>> entry : results.entrySet()) {
      l.add(getValues ? entry.getValue().get(columnName) : entry.getKey());
    }
    return l;
  }

  /** Create the aux pixel map hash table.
   * @param keyPrefixSearch The prefix search of the row key.
   * @throws IOException      If anything goes wrong.
   * @throws LomikelException If anything goes wrong. */
  // BUG: should write numbers with schema
  public void createPixelTable(String keyPrefixSearch) throws LomikelException, IOException {
    String pixelTableName = tableName() + ".pixel";
    try {
      create(pixelTableName, new String[]{"i", "b", "d", "a"});
    }
    catch (TableExistsException e) {
      log.warn("Table " + pixelTableName + " already exists, will be reused");
    }
    HBaseClient pixelClient = new HBaseClient(zookeepers(), clientPort());
    pixelClient.connect(pixelTableName, null);
    Map<String, Map<String, String>> results = scan(null, "key:key:" + keyPrefixSearch + ":prefix", "i:objectId,i:ra,i:dec", 0, false, false);
    String objectId;
    String ra;
    String dec;
    log.debug("Writing " + pixelTableName + "...");
    int n = 0;
    for (Map.Entry<String, Map<String, String>> entry : results.entrySet()) {
      objectId = entry.getValue().get("i:objectId");
      ra       = entry.getValue().get("i:ra");
      dec      = entry.getValue().get("i:dec");
      pixelClient.put(Long.toString(_hn.hash(Math.toRadians(Double.valueOf(ra)),
                                             Math.toRadians(Double.valueOf(dec)))) + "_" + objectId,
                      new String[]{"i:ra:"       + ra,
                                   "i:dec:"      + dec,
                                   "i:objectId:" + objectId});
      System.out.print(".");
      if (n++ % 100 == 0) {
        System.out.print(n - 1);
      }
    }
    System.out.println();
    log.debug("" + n + " rows written");
    pixelClient.close();
  }

  /** Create the aux jd map hash table.
   * @param keyPrefixSearch The prefix search of the row key.
   * @throws IOException      If anything goes wrong.
   * @throws LomikelException If anything goes wrong. */
  // BUG: should write numbers with schema
  public void createJDTable(String keyPrefixSearch) throws LomikelException, IOException {
    String jdTableName = tableName() + ".jd";
    try {
      create(jdTableName, new String[]{"i", "b", "d", "a"});
    }
    catch (TableExistsException e) {
      log.warn("Table " + jdTableName + " already exists, will be reused");
    }
    HBaseClient jdClient = new HBaseClient(zookeepers(), clientPort());
    jdClient.connect(jdTableName, null);
    Map<String, Map<String, String>> results = scan(null, "key:key:" + keyPrefixSearch + ":prefix", "i:objectId,i:jd", 0, false, false);
    String objectId;
    String jd;
    log.debug("Writing " + jdTableName + "...");
    int n = 0;
    for (Map.Entry<String, Map<String, String>> entry : results.entrySet()) {
      objectId = entry.getValue().get("i:objectId");
      jd       = entry.getValue().get("i:jd");
      jdClient.put(jd + "_" + objectId,
                   new String[]{"i:jd:"       + jd,
                                "i:objectId:" + objectId});
      System.out.print(".");
      if (n++ % 100 == 0) {
        System.out.print(n - 1);
      }
    }
    System.out.println();
    log.debug("" + n + " rows written");
    jdClient.close();
  }

  /** Assemble curves of variable columns from another table
   * as multi-versioned columns of the current table.
   * All previous lightcurves for selected <em>objectId</em>s are deleted.
   * @param sourceClient The {@link HBaseClient} of the source table.
   *                     It should be already opened and connected with the appropriate schema.
   * @param objectIds    The comma-separated list of <em>objectId</em>s to extract.
   * @param columns      The comma-separated list of columns (incl. families) to extract.
   * @param schemaName   The name of the schema to be created in the new table.
   *                     The columns in the new table will belong to the <em>c</em> family
   *                     and will have the type <em>double</em>. */
  public void assembleCurves(HBaseClient sourceClient,
                             String      objectIds,
                             String      columns,
                             String      schemaName) {
    String[] schema = columns.split(",");
    for (int i = 0; i < schema.length; i++) {
      schema[i] = "c:" + schema[i].split(":")[1] + ":double";
    }
    try {
      put(schemaName, schema);
    }
    catch (IOException e) {
      log.error("Cannot create schema " + schemaName + " = " + Arrays.toString(schema), e);
    }
    try {
      connect(tableName(), schemaName);
    }
    catch (LomikelException e) {
      log.error("Cannot reconnect to " + tableName() + " with new schema", e);
    }
    Map<String, Map<String, String>> results;
    Set<String> curves = new TreeSet<>();
    String value;
    for (String objectId : objectIds.split(",")) {
      delete(objectId);
      results = sourceClient.scan(null, "key:key:" + objectId + ":prefix", columns, 0, false, false);
      log.debug("Adding " + objectId + "[" + results.size() + "]");
      for (Map.Entry<String, Map<String, String>> row : results.entrySet()) {
        curves.clear();
        for (Map.Entry<String, String> e : row.getValue().entrySet()) {
          value = e.getValue();
          if (!value.trim().equals("NaN") && !value.trim().equals("null")) {
            curves.add("c:" + e.getKey().split(":")[1] + ":" + value.trim());
          }
        }
        try {
          if (!curves.isEmpty()) {
            put(objectId, curves.toArray(new String[0]));
          }
        }
        catch (IOException e) {
          log.error("Cannot insert " + objectId + " = " + curves, e);
        }
      }
    }
  }

  /** Assemble lightcurves from another table
   * as multi-versioned columns of the current table.
   * All previous lightcurves for selected <em>objectId</em>s are deleted.
   * The column schema is embedded in this class's source code.
   * @param sourceClient The {@link HBaseClient} of the source table.
   *                     It should be already opened and connected with the appropriate schema.
   * @param objectIds    The comma-separated list of <em>objectId</em>s to extract. */
  public void assembleLightCurves(HBaseClient sourceClient,
                                  String      objectIds) {
    String columns = "i:jd,d:lc_features_g,d:lc_features_r";
    String schemaName = "schema_lc_0_0_0";
    int slength = LIGHTCURVE_SCHEMA.length;
    String[] schema     = new String[2 * slength];
    String[] subcolumns = new String[2 * slength];
    for (int i = 0; i < slength; i++) {
      schema[i]               = "c:lc_g_" + LIGHTCURVE_SCHEMA[i] + ":double";
      schema[i + slength]     = "c:lc_r_" + LIGHTCURVE_SCHEMA[i] + ":double";
      subcolumns[i]           = "c:lc_g_" + LIGHTCURVE_SCHEMA[i];
      subcolumns[i + slength] = "c:lc_r_" + LIGHTCURVE_SCHEMA[i];
    }
    try {
      put(schemaName, schema);
    }
    catch (IOException e) {
      log.error("Cannot create schema " + schemaName + " = " + Arrays.toString(schema), e);
    }
    try {
      connect(tableName(), schemaName);
    }
    catch (LomikelException e) {
      log.error("Cannot reconnect to " + tableName() + " with new schema", e);
    }
    Map<String, Map<String, String>> results;
    Set<String> curves = new TreeSet<>();
    int i;
    for (String objectId : objectIds.split(",")) {
      delete(objectId);
      results = sourceClient.scan(null, "key:key:" + objectId + ":prefix", columns, 0, false, false);
      log.debug("Adding " + objectId + "[" + results.size() + "]");
      for (Map.Entry<String, Map<String, String>> row : results.entrySet()) {
        curves.clear();
        i = 0;
        for (Map.Entry<String, String> e : row.getValue().entrySet()) {
          if (e.getValue().contains("]")) {
            for (String value : e.getValue().replaceAll("\\[", "").replaceAll("]", "").split(",")) {
              if (!value.trim().equals("NaN") && !value.trim().equals("null")) {
                curves.add(subcolumns[i] + ":" + value.trim());
              }
              i++;
            }
          }
          else {
            curves.add("c:jd:" + e.getValue());
          }
        }
        try {
          if (!curves.isEmpty()) {
            put(objectId, curves.toArray(new String[0]));
          }
        }
        catch (IOException e) {
          log.error("Cannot insert " + objectId + " = " + curves, e);
        }
      }
    }
  }

  private static final String[] LIGHTCURVE_SCHEMA = new String[]{
      "lc00", "lc01", "lc02", "lc03", "lc04", "lc05", "lc06", "lc07", "lc08",
      "lc09", "lc10", "lc11", "lc12", "lc13", "lc14", "lc15", "lc16", "lc17",
      "lc18", "lc19", "lc20", "lc21", "lc22", "lc23", "lc24", "lc25", "lc26"};

  private static final int _NSIDE = 131072; // BUG: magic number

  private static final HealpixNested _hn = Healpix.getNested(Healpix.depth(_NSIDE));

  /** Logging. */
  private static final Logger log = LogManager.getLogger(FinkHBaseClient.class);

}
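
// Workflow sketch (illustrative only): building the auxiliary tables and querying recent alerts.
// The Zookeeper host/port, table name, schema name and key prefix below are placeholder values;
// the *.jd and *.pixel tables only need to be (re)built when the main table content changes.
//
//   FinkHBaseClient client = new FinkHBaseClient("localhost", 2181);
//   client.connect("ztf", "schema_version");
//   client.createJDTable("ZTF");     // fill ztf.jd    from main table rows with keys prefixed by "ZTF"
//   client.createPixelTable("ZTF");  // fill ztf.pixel from main table rows with keys prefixed by "ZTF"
//   Set<String> recent = client.latests("i:objectId", "ZTF", 60, true); // objectIds from the last hour
//   client.close();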