package com.astrolabsoftware.FinkBrowser.Avro;

import com.Lomikel.Utils.Init;

// Lomikel
import com.Lomikel.Januser.JanusClient;
import com.Lomikel.Januser.GremlinRecipies;
import com.Lomikel.Utils.LomikelException;

// Avro
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.io.DatumReader;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericData.Array;
import org.apache.avro.generic.GenericDatumReader;

// Tinker Pop
import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.unfold;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;

// Janus Graph
import org.janusgraph.core.JanusGraph;
import org.janusgraph.core.attribute.Geoshape;

// Java
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.Arrays;
import java.util.Date;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Map;
import java.util.TreeMap;
import java.util.Base64;
import java.nio.file.Files;
import java.nio.file.FileSystems;

// Log4J
import org.apache.log4j.Logger;

/** <code>AvroImporter</code> imports <em>Avro</em> files into <em>JanusGraph</em>.
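  * <p>A command-line usage sketch; the file names, the FITS directory and the
  * limits are illustrative, the empty string disables the HBase url:</p>
  * <pre>
  * java -jar AvroImporter.exe.jar janusgraph.properties alerts.avro /data/fits "" 1000 10000 create alert
  * </pre>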
  * @opt attributes
  * @opt operations
  * @opt types
  * @opt visibility
  * @author <a href="mailto:Julius.Hrivnac@cern.ch">J.Hrivnac</a> */
public class AvroImporter extends JanusClient {

  /** Import Avro files or directory.
    * @param args[0] The JanusGraph properties file.
    * @param args[1] The Avro file or directory with Avro files.
    * @param args[2] The directory for FITS files. If <tt>null</tt> or empty, FITS are included in the Graph. Ignored if the HBase url is set.
    * @param args[3] The url of the HBase table with full data as <tt>ip:port:table:schema</tt>. May be <tt>null</tt> or empty.
    * @param args[4] The number of events to use for progress report (-1 means no report until the end).
    * @param args[5] The number of events to commit in one step (-1 means commit only at the end).
    * @param args[6] The creation strategy: <tt>create,reuse,replace,drop,skip</tt>.
    * @param args[7] The data type, <tt>alert|pca</tt>. If <tt>null</tt>, <tt>alert</tt> is assumed.
    * @throws IOException If a problem with file reading occurs. */
  public static void main(String[] args) throws IOException {
    Init.init("AvroImporter");
    if (args.length != 8) {
      log.error("AvroImporter.exe.jar <JanusGraph properties> [<file>|<directory>] <fits directory> <hbaseUrl> <report limit> <commit limit> [create|reuse|replace|drop|skip] [alert|pca]");
      System.exit(-1);
    }
    try {
      AvroImporter importer = new AvroImporter(args[0],
                                               Integer.valueOf(args[4]),
                                               Integer.valueOf(args[5]),
                                               args[6],
                                               args[2],
                                               args[3],
                                               args[7]);
      importer.timerStart();
      importer.process(args[1]);
      if (!importer.skip()) {
        importer.commit();
      }
      importer.close();
    }
    catch (LomikelException e) {
      log.fatal("Cannot import " + args[1] + " into " + args[0], e);
      System.exit(-1);
    }
  }

  /** Create with a JanusGraph properties file.
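    * <p>A programmatic usage sketch, mirroring {@link #main}; the property
    * file name and the limits are illustrative:</p>
    * <pre>
    * AvroImporter importer = new AvroImporter("janusgraph.properties",
    *                                          1000,     // report every 1000 alerts
    *                                          10000,    // commit every 10000 alerts
    *                                          "create", // always create new Vertexes
    *                                          null,     // keep FITS in the Graph
    *                                          null,     // no HBase data links
    *                                          "alert");
    * importer.timerStart();
    * importer.process("alerts.avro");
    * importer.commit();
    * importer.close();
    * </pre>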
    * @param properties  The file with the complete JanusGraph properties.
    * @param reportLimit The number of events to use for progress report (-1 means no report until the end).
    * @param commitLimit The number of events to commit in one step (-1 means commit only at the end).
    * @param strategy    The creation strategy: <tt>create,reuse,replace,drop,skip</tt>.
    * @param fitsDir     The directory for FITS files. If <tt>null</tt> or empty, FITS are included in the Graph. Ignored if the HBase url is set.
    * @param hbaseUrl    The url of the HBase table with full data as <tt>ip:port:table:schema</tt>. May be <tt>null</tt> or empty.
    * @param dataType    The data type, <tt>alert|pca</tt>. If <tt>null</tt>, <tt>alert</tt> is assumed. */
  public AvroImporter(String properties,
                      int    reportLimit,
                      int    commitLimit,
                      String strategy,
                      String fitsDir,
                      String hbaseUrl,
                      String dataType) {
    super(properties);
    if (fitsDir != null && fitsDir.trim().equals("")) {
      fitsDir = null;
    }
    if (hbaseUrl != null && hbaseUrl.trim().equals("")) {
      hbaseUrl = null;
    }
    log.info("Reporting after each " + reportLimit + " alerts");
    log.info("Committing after each " + commitLimit + " alerts");
    log.info("Using strategy: " + strategy);
    log.info("Importing " + dataType + "s");
    if (fitsDir == null) {
      log.info("Writing FITS into Graph");
    }
    else {
      log.info("Writing FITS into: " + fitsDir);
    }
    if (hbaseUrl != null) {
      log.info("Connecting to data from: " + hbaseUrl);
    }
    log.info("Importing at " + _date);
    _reportLimit = reportLimit;
    _commitLimit = commitLimit;
    _fitsDir     = fitsDir;
    _hbaseUrl    = hbaseUrl;
    _dataType    = dataType;
    _create  = strategy.contains("create");
    _reuse   = strategy.contains("reuse");
    _replace = strategy.contains("replace");
    _drop    = strategy.contains("drop");
    _skip    = strategy.contains("skip");
    _gr = new GremlinRecipies(this);
  }

  /** Process a directory with <em>Avro</em> alert files (recursively).
    * @param dirFN   The dirname of the directory with data files.
    * @param fileExt The file extension.
    * @throws IOException If a problem with file reading occurs. */
  public void processDir(String dirFN,
                         String fileExt) throws IOException {
    log.info("Loading directory " + dirFN);
    File dir = new File(dirFN);
    int i = 0;
    for (String dataFN : dir.list()) {
      if (new File(dirFN + "/" + dataFN).isDirectory()) {
        processDir(dirFN + "/" + dataFN, fileExt);
      }
      else if (dataFN.endsWith("." + fileExt)) {
        try {
          process(dirFN + "/" + dataFN);
          i++;
        }
        catch (IOException | LomikelException e) {
          log.error("Failed to process " + dirFN + "/" + dataFN, e);
        }
      }
      else {
        log.warn("Not a " + fileExt + " file: " + dataFN);
      }
    }
    timer("alerts/pcas created", _n, -1, -1);
    log.info("" + i + " files loaded");
  }

  /** Process an <em>Avro</em> alert file or a directory with such files (recursively).
    * @param fn The filename of the data file
    *           or the directory with data files.
    * @throws IOException      If a problem with file reading occurs.
    * @throws LomikelException If anything else goes wrong. */
  public void process(String fn) throws IOException, LomikelException {
    log.info("Loading " + fn);
    register(fn);
    File file = new File(fn);
    if (file.isDirectory()) {
      processDir(fn, "avro");
      return;
    }
    else if (!file.isFile()) {
      log.error("Not a file/directory: " + fn);
      return;
    }
    processFile(file);
  }

  /** Register the <em>Import</em> {@link Vertex} (only for the top-level call).
    * @param fn The filename of the data file
    *           or the directory with data files. */
  public void register(String fn) {
    if (_top) {
      _topFn = fn;
      now();
      Vertex import1 = g().addV("Import").property("lbl", "Import").property("importSource", fn).property("importDate", _date).next();
      Vertex imports = g().V().has("lbl", "site").has("title", "IJCLab").out().has("lbl", "Imports").next();
      _gr.addEdge(imports, import1, "has");
      commit();
    }
    _top = false;
  }

  /** Process an <em>Avro</em> alert file.
    * @param file The data file.
    * @throws IOException If a problem with file reading occurs. */
  public void processFile(File file) throws IOException {
    DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
    DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(file, datumReader);
    GenericRecord record = null;
    while (dataFileReader.hasNext()) {
      record = dataFileReader.next(record);
      processRecord(record);
    }
    dataFileReader.close();
  }

  /** Process a {@link GenericRecord} of the requested type.
    * @param record The {@link GenericRecord} to be processed. */
  public void processRecord(GenericRecord record) {
    if (_dataType.equals("alert")) {
      processAlert(record);
    }
    else if (_dataType.equals("pca")) {
      processPCA(record);
    }
    else {
      log.error("Unknown data type: " + _dataType);
    }
  }

  /** Process an <em>Avro</em> alert.
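    * <p>The created sub-graph (a sketch of the labels and edges used below):
    * <tt>alert -has-&gt; candidate</tt>,
    * <tt>alert -has-&gt; prv_candidates -holds-&gt; prv_candidate</tt>,
    * <tt>alert -has-&gt; cutout</tt>.</p>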
    * @param record The full alert {@link GenericRecord}.
    * @return The created {@link Vertex}. */
  public Vertex processAlert(GenericRecord record) {
    _n++;
    Map<String, String> values = getSimpleValues(record, getSimpleFields(record,
                                                                         COLUMNS_ALERT,
                                                                         new String[]{"objectId",
                                                                                      "candidate",
                                                                                      "prv_candidates",
                                                                                      "cutoutScience",
                                                                                      "cutoutTemplate",
                                                                                      "cutoutDifference"}));
    Vertex v = vertex(record, "alert", null, "create");
    if (v != null) {
      String objectId = record.get("objectId").toString();
      for (Map.Entry<String, String> entry : values.entrySet()) {
        try {
          v.property(entry.getKey(), entry.getValue());
        }
        catch (IllegalArgumentException e) {
          log.error("Cannot add property: " + entry.getKey() + " = " + entry.getValue(), e);
        }
      }
      v.property("objectId",     objectId);
      v.property("alertVersion", VERSION);
      v.property("importDate",   _date);
      processGenericRecord((GenericRecord)(record.get("candidate")),
                           "candidate",
                           "candid",
                           true,
                           v,
                           "has",
                           COLUMNS_CANDIDATE,
                           objectId);
      Vertex vv = vertex(record, "prv_candidates", null, "create");
      _gr.addEdge(v, vv, "has");
      Array a = (Array)record.get("prv_candidates");
      if (a != null) {
        for (Object o : a) {
          _nPrvCandidates++;
          processGenericRecord((GenericRecord)o,
                               "prv_candidate",
                               "candid",
                               true,
                               vv,
                               "holds",
                               COLUMNS_CANDIDATE,
                               objectId);
        }
      }
      processCutout(record, v, objectId, _jd); // _jd taken from the candidate
    }
    else {
      log.error("Failed to create alert from " + record);
    }
    timer("alerts processed", _n, _reportLimit, _commitLimit);
    return v;
  }

  /** Process an <em>Avro</em> PCA.
    * Only creates PCAs for already existing sources.
    * Does not verify whether the PCA already exists (so multiple
    * PCAs for one source may be created).
    * @param record The full PCA {@link GenericRecord}.
    * @return The created {@link Vertex}. */
  public Vertex processPCA(GenericRecord record) {
    _n++;
    String objectId = record.get("objectId").toString();
    Vertex v = g().addV("PCA").property("lbl", "PCA").property("objectId", objectId).next();
    Array<Double> array = (Array<Double>)(record.get("pca"));
    Iterator<Double> it = array.iterator();
    short pcai = 0;
    while (it.hasNext()) {
      v.property(PCAS[pcai++], it.next());
    }
    v.property("importDate", _date);
    timer("pcas processed", _n, _reportLimit, _commitLimit);
    return v;
  }

  /** Process an <em>Avro</em> {@link GenericRecord}.
    * @param record       The {@link GenericRecord} to process.
    * @param name         The label of the new {@link Vertex}.
    * @param idName       The name of the unique identifying field.
    * @param tryDirection Whether to try to create the <em>direction</em> property from the <em>ra,dec</em> fields.
    * @param mother       The mother {@link Vertex}.
    * @param edgeName     The name of the edge to the mother {@link Vertex}.
    * @param fields       The list of fields to fill. All fields are filled if <code>null</code>.
    * @param objectId     The <em>objectId</em> of the containing source.
    * @return The created {@link Vertex}. */
  private Vertex processGenericRecord(GenericRecord record,
                                      String        name,
                                      String        idName,
                                      boolean       tryDirection,
                                      Vertex        mother,
                                      String        edgeName,
                                      List<String>  fields,
                                      String        objectId) {
    String[] idNameA;
    if (idName == null) {
      idNameA = new String[]{};
    }
    else {
      idNameA = new String[]{idName};
    }
    Map<String, String> values = getSimpleValues(record, getSimpleFields(record, fields, idNameA));
    Vertex v = vertex(record, name, idName, "create");
    if (v == null) {
      return v;
    }
    for (Map.Entry<String, String> entry : values.entrySet()) {
      //log.debug("\t" + entry.getKey() + " = " + entry.getValue());
      v.property(entry.getKey(), entry.getValue());
    }
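    // Geoshape.point takes (latitude, longitude): dec maps directly to the latitude,
    // while ra (0..360) is shifted by 180 to fit the longitude range (-180..180)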
    if (tryDirection && record.get("dec") != null && record.get("ra") != null) {
      v.property("direction", Geoshape.point(Double.valueOf(record.get("dec").toString()), Double.valueOf(record.get("ra").toString()) - 180));
    }
    _gr.addEdge(mother, v, edgeName);
    if (hbaseUrl() != null) {
      _jd = record.get("jd").toString();
      _gr.attachDataLink(v,
                         "Candidate data",
                         "HBase",
                         _hbaseUrl,
                         "return client.scan('" + objectId + "_" + _jd + "', null, '*', 0, true, true)");
    }
    return v;
  }

  /** Process an <em>Avro</em> cutout.
    * @param record   The {@link GenericRecord} to process.
    * @param mother   The {@link Vertex} to attach to.
    * @param objectId The <em>objectId</em> of the corresponding source.
    * @param jd       The <em>jd</em> of the corresponding candidate. */
  private void processCutout(GenericRecord record,
                             Vertex        mother,
                             String        objectId,
                             String        jd) {
    Vertex v = vertex(record, "cutout", null, "create");
    GenericRecord r;
    String fn;
    String key = objectId + "_" + jd;
    byte[] data;
    for (String s : new String[]{"Science", "Template", "Difference"}) {
      r = (GenericRecord)(record.get("cutout" + s));
      fn = r.get("fileName").toString();
      data = ((ByteBuffer)(r.get("stampData"))).array();
      if (hbaseUrl() != null) {
        _gr.attachDataLink(v,
                           s + " fits",
                           "HBase",
                           _hbaseUrl,
                           "x=client.scan('" + key + "', null, 'b:cutout" + s + "_stampData', 0, false, false).get('" + key + "').get('b:cutout" + s + "_stampData');y=client.repository().get(x);java.util.Base64.getEncoder().encodeToString(y)");
      }
      else if (fitsDir() == null) {
        v.property("cutout" + s + "Fn", fn);
        v.property("cutout" + s,        Base64.getEncoder().encodeToString(data));
      }
      else {
        v.property("cutout" + s + "Fn", "file:" + fitsDir() + "/" + fn);
        writeFits(fn, data);
      }
    }
    _gr.addEdge(mother, v, "has");
  }

  /** Get the values of the simple fields of a {@link GenericRecord} as {@link String}s.
    * @param record The {@link GenericRecord} to read.
    * @param fields The fields to be extracted.
    * @return The map of the field names to their {@link String} values. */
  private Map<String, String> getSimpleValues(GenericRecord record,
                                              List<String>  fields) {
    Map<String, String> content = new TreeMap<>();
    for (String s : fields) {
      Object o = record.get(s);
      if (o instanceof ByteBuffer) {
        content.put(s, new String(((ByteBuffer)o).array()));
      }
      else if (o != null) {
        content.put(s, o.toString());
      }
    }
    return content;
  }

  /** Get {@link Field}s corresponding to simple types
    * and having non-<code>null</code> values.
    * @param record The {@link GenericRecord} to use.
    * @param keeps  The list of field names to report.
    *               Report all if <tt>null</tt>.
    * @param avoids The array of field names not to report.
    *               Cannot cancel the <em>keeps</em> argument.
    * @return The list of corresponding fields. */
  private List<String> getSimpleFields(GenericRecord record,
                                       List<String>  keeps,
                                       String[]      avoids) {
    List<String> fields = new ArrayList<>();
    Type type;
    String name;
    boolean veto;
    for (Field field : record.getSchema().getFields()) {
      type = field.schema().getType();
      name = field.name();
      veto = false;
      if (keeps != null) {
        if (!keeps.contains(name) || record.get(name) == null) {
          veto = true;
        }
        else {
          for (String avoid : avoids) {
            if (name.equals(avoid)) {
              veto = true;
            }
          }
        }
      }
      if (!veto) {
        if (type == Type.BOOLEAN ||
            type == Type.DOUBLE  ||
            type == Type.FLOAT   ||
            type == Type.LONG    ||
            type == Type.INT     ||
            type == Type.UNION   ||
            type == Type.STRING  ||
            type == Type.BYTES   ||
            type == Type.ARRAY) {
          fields.add(name);
        }
        else {
          log.warn("Skipped: " + name + " of " + type);
        }
      }
    }
    return fields;
  }

  /** Create or drop a {@link Vertex} according to the chosen strategy.
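    * <p>A summary of the strategies for a unique {@link Vertex}
    * (i.e. when <tt>property != null</tt>), as implemented below:</p>
    * <pre>
    * skip    : do nothing, give null
    * drop    : drop the existing Vertex, give null
    * replace : drop the existing Vertex, then create a new one
    * reuse   : get the existing Vertex, creating it if absent
    * create  : create a new Vertex unconditionally
    * </pre>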
    * @param record   The full {@link GenericRecord}.
    * @param label    The {@link Vertex} label.
    * @param property The name of the {@link Vertex} property.
    *                 If <tt>null</tt>, the strategy is ignored and the {@link Vertex} is simply created.
    * @param strategy The creation strategy: <tt>drop, replace, reuse, skip, create</tt>.
    *                 Anything else creates no unique {@link Vertex} and gives <tt>null</tt>.
    * @return The created {@link Vertex} or <tt>null</tt>. */
  private Vertex vertex(GenericRecord record,
                        String        label,
                        String        property,
                        String        strategy) {
    if (strategy == null) {
      strategy = "";
    }
    strategy = strategy.trim();
    boolean drop    = strategy.equals("drop");
    boolean skip    = strategy.equals("skip");
    boolean create  = strategy.equals("create");
    boolean reuse   = strategy.equals("reuse");
    boolean replace = strategy.equals("replace");
    Vertex v = null;
    // do nothing
    if (skip) {
      return v;
    }
    // non-unique Vertex
    if (property == null) {
      if (drop) {
        return v;
      }
      else {
        //log.debug("Creating: " + label);
        return g().addV(label).property("lbl", label).next();
      }
    }
    // unique Vertex
    if (drop || replace) {
      //log.debug("Dropping " + label + ": " + property + " = " + record.get(property));
      _gr.drop(label, property, record.get(property), true);
    }
    if (reuse) {
      //log.debug("Getting " + label + ": " + property + " = " + record.get(property));
      v = _gr.getOrCreate(label, property, record.get(property)).next(); // TBD: check uniqueness
    }
    if (create || replace) {
      //log.debug("Creating " + label + ": " + property + " = " + record.get(property));
      v = g().addV(label).property("lbl", label).property(property, record.get(property)).next();
    }
    return v;
  }

  /** Write a FITS file.
    * @param fn   The FITS file name.
    * @param data The FITS file content. */
  protected void writeFits(String fn,
                           byte[] data) {
    try {
      Files.write(FileSystems.getDefault().getPath(fitsDir() + "/" + fn), data);
    }
    catch (IOException e) {
      log.error("Cannot write " + fn, e);
    }
  }

  /** Give the directory for FITS files.
    * @return The FITS file directory. */
  protected String fitsDir() {
    return _fitsDir;
  }

  /** Give the data HBase table url.
    * @return The data HBase table url. */
  protected String hbaseUrl() {
    return _hbaseUrl;
  }

  /** Give the number of created alerts/pcas.
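  /** The simple top-level alert fields to be copied onto the <em>alert</em> {@link Vertex}. */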
    * @return The number of created alerts/pcas. */
  protected int n() {
    return _n;
  }

  /** Tell whether the import should be skipped.
    * @return Whether the import should be skipped. */
  protected boolean skip() {
    return _skip;
  }

  @Override
  public void close() {
    g().V().has("lbl", "Import").has("importSource", _topFn).has("importDate", _date).property("complete", true).property("n", _n).next();
    commit();
    log.info("Import statistics:");
    log.info("\talerts/pcas: " + _n);
    log.info("\tprv_candidates: " + _nPrvCandidates);
    log.info("Imported at " + _date);
    super.close();
  }

  /** Set a new import {@link Date}. */
  protected void now() {
    _date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date());
  }
  private static List<String> COLUMNS_ALERT = Arrays.asList("cdsxmatch",
                                                            "mulens",
                                                            "objectId",
                                                            "rf_kn_vs_nonkn",
                                                            "rf_snia_vs_nonia",
                                                            "roid",
                                                            "snn_snia_vs_nonia",
                                                            "snn_sn_vs_all",
                                                            "tracklet");

  /** The candidate fields to be copied onto the <em>candidate</em> and <em>prv_candidate</em> {@link Vertex}es. */
  private static List<String> COLUMNS_CANDIDATE = Arrays.asList("candid",
                                                                "classtar",
                                                                "diffmaglim",
                                                                "distnr",
                                                                "distpsnr1",
                                                                "DR3Name",
                                                                "drb",
                                                                "e_Plx",
                                                                "fid",
                                                                "field",
                                                                "gcvs",
                                                                "isdiffpos",
                                                                "jd",
                                                                "jdendhist",
                                                                "jdstarthist",
                                                                "maggaia",
                                                                "magnr",
                                                                "magpsf",
                                                                "magzpsci",
                                                                "ndethist",
                                                                "neargaia",
                                                                "nid",
                                                                "Plx",
                                                                "rb",
                                                                "rcid",
                                                                "sgscore1",
                                                                "sigmagnr",
                                                                "sigmapsf",
                                                                "ssdistnr",
                                                                "ssmagnr",
                                                                "ssnamenr",
                                                                "vsx");

  private GremlinRecipies _gr;

  /** The <em>jd</em> of the last processed candidate. */
  private String _jd;

  /** The number of events to use for progress report. */
  private int _reportLimit;

  /** The number of events to commit in one step. */
  private int _commitLimit;

  /** The directory for FITS files. */
  private String _fitsDir;

  /** The url of the HBase table with full data. */
  private String _hbaseUrl;

  /** The data type: <tt>alert|pca</tt>. */
  private String _dataType;

  /** The global creation strategy flags. */
  private boolean _create;

  private boolean _reuse;

  private boolean _replace;

  private boolean _drop;

  private boolean _skip;

  /** The number of created alerts/pcas. */
  private int _n = 0;

  /** The number of created prv_candidates. */
  private int _nPrvCandidates = 0;

  /** The import date. */
  private String _date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date());

  /** Whether the next {@link #register} call is the top-level one. */
  private boolean _top = true;

  /** The top-level import source. */
  private String _topFn;

  private static String[] PCAS = new String[]{"pca00", "pca01", "pca02", "pca03", "pca04",
                                              "pca05", "pca06", "pca07", "pca08", "pca09",
                                              "pca10", "pca11", "pca12", "pca13", "pca14",
                                              "pca15", "pca16", "pca17", "pca18", "pca19",
                                              "pca20", "pca21", "pca22", "pca23", "pca24"};

  private static String VERSION = "ztf-3.2";

  /** Logging. */
  private static Logger log = Logger.getLogger(AvroImporter.class);

}