package com.astrolabsoftware.FinkBrowser.Avro;

import com.Lomikel.Utils.Init;

// Lomikel
import com.Lomikel.Januser.JanusClient;
import com.Lomikel.Januser.GremlinRecipies;
import com.Lomikel.Utils.LomikelException;

// Avro
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.io.DatumReader;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericData.Array;
import org.apache.avro.generic.GenericDatumReader;

// Tinker Pop
import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.unfold;
import org.apache.tinkerpop.gremlin.structure.Vertex;

// Janus Graph
import org.janusgraph.core.JanusGraph;
import org.janusgraph.core.attribute.Geoshape;

// Java
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import java.util.Arrays;
import java.util.Date;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Map;
import java.util.TreeMap;
import java.util.Base64;
import java.nio.file.Files;
import java.nio.file.FileSystems;

// Log4J
import org.apache.log4j.Logger;

/** <code>AvroImporter</code> imports <em>Avro</em> files into <em>JanusGraph</em>.
  * @opt attributes
  * @opt operations
  * @opt types
  * @opt visibility
  * @author <a href="mailto:Julius.Hrivnac@cern.ch">J.Hrivnac</a> */
public class AvroImporter extends JanusClient {

  /** Import Avro files or directory.
    * The arguments are:
    * <ol start="0">
    * <li>The Janusgraph properties file.</li>
    * <li>The Avro file or directory with Avro files.</li>
    * <li>The directory for FITS files. If empty, FITS are included in the Graph.
    *     Ignored if HBase url set.</li>
    * <li>The url for HBase table with full data as {@code ip:port:table:schema}.
    *     May be empty.</li>
    * <li>The number of events to use for progress report
    *     (-1 means no report until the end).</li>
    * <li>The number of events to commit in one step
    *     (-1 means commit only at the end).</li>
    * <li>The creation strategy: {@code create,reuse,replace,drop,skip}.</li>
    * <li>The data type, {@code alert|pca}. If empty, then considered as {@code alert}.</li>
    * </ol>
    * The process exits with status -1 on wrong arguments or failed import.
    * @param args The command line arguments, exactly eight of them.
    * @throws IOException If problem with file reading. */
  public static void main(String[] args) throws IOException {
    Init.init("AvroImporter");
    if (args.length != 8) {
      log.error(USAGE);
      System.exit(-1);
      return;
    }
    int reportLimit;
    int commitLimit;
    try {
      reportLimit = Integer.parseInt(args[4]);
      commitLimit = Integer.parseInt(args[5]);
      }
    catch (NumberFormatException e) {
      log.fatal("Report and commit limits should be integers, got: " + args[4] + ", " + args[5], e);
      log.error(USAGE);
      System.exit(-1);
      return;
      }
    try {
      AvroImporter importer = new AvroImporter(args[0],
                                               reportLimit,
                                               commitLimit,
                                               args[6],
                                               args[2],
                                               args[3],
                                               args[7]);
      importer.timerStart();
      importer.process(args[1]);
      if (!importer.skip()) {
        importer.commit();
        }
      // close() marks the Import as complete, so it is only called after successful processing
      importer.close();
      }
    catch (LomikelException e) {
      log.fatal("Cannot import " + args[1] + " into " + args[0], e);
      System.exit(-1);
      }
    }

  /** Create with JanusGraph properties file.
    * @param properties  The file with the complete Janusgraph properties.
    * @param reportLimit The number of events to use for progress report
    *                    (-1 means no report until the end).
    * @param commitLimit The number of events to commit in one step
    *                    (-1 means commit only at the end).
    * @param strategy    The global creation strategy. Any combination of
    *                    {@code create,reuse,replace,drop,skip} (tested by substring).
    *                    {@code null} is treated as empty (no strategy selected).
    * @param fitsDir     The directory for FITS files. If {@code null} or empty,
    *                    FITS are included in the Graph. Ignored if HBase url set.
    * @param hbaseUrl    The url for HBase table with full data as {@code ip:port:table:schema}.
    *                    May be {@code null} or empty.
    * @param dataType    The data type, {@code alert|pca}.
    *                    If {@code null} or empty, then considered as {@code alert}. */
  public AvroImporter(String properties,
                      int    reportLimit,
                      int    commitLimit,
                      String strategy,
                      String fitsDir,
                      String hbaseUrl,
                      String dataType) {
    super(properties);
    if (fitsDir != null && fitsDir.trim().equals("")) {
      fitsDir = null;
      }
    if (hbaseUrl != null && hbaseUrl.trim().equals("")) {
      hbaseUrl = null;
      }
    if (dataType == null || dataType.trim().equals("")) {
      dataType = "alert"; // documented default
      }
    if (strategy == null) {
      strategy = "";
      }
    log.info("Reporting after each " + reportLimit + " alerts");
    log.info("Committing after each " + commitLimit + " alerts");
    log.info("Using strategy: " + strategy);
    log.info("Importing " + dataType + "s");
    if (fitsDir == null) {
      log.info("Writing FITS into Graph");
      }
    else {
      log.info("Writing FITS into: " + fitsDir);
      }
    if (hbaseUrl != null) {
      log.info("Connecting to data from: " + hbaseUrl);
      }
    log.info("Importing at " + _date);
    _reportLimit = reportLimit;
    _commitLimit = commitLimit;
    _fitsDir     = fitsDir;
    _hbaseUrl    = hbaseUrl;
    _dataType    = dataType;
    _create      = strategy.contains("create");
    _reuse       = strategy.contains("reuse");
    _replace     = strategy.contains("replace");
    _drop        = strategy.contains("drop");
    _skip        = strategy.contains("skip");
    _gr = new GremlinRecipies(this);
    }

  /** Process directory with <em>Avro</em> files (recursive).
    * Failures of individual files are logged and do not stop the processing
    * of the remaining files.
    * @param dirFN   The name of the directory with data files.
    * @param fileExt The file extension (without the dot) of files to process.
    *                It is applied to subdirectories too.
    * @throws IOException If problem with file reading. */
  public void processDir(String dirFN,
                         String fileExt) throws IOException {
    log.info("Loading directory " + dirFN);
    String[] names = new File(dirFN).list();
    if (names == null) { // not a directory or not readable
      log.error("Cannot list directory: " + dirFN);
      return;
      }
    int loaded = 0;
    for (String dataFN : names) {
      String path = dirFN + "/" + dataFN;
      if (new File(path).isDirectory()) {
        processDir(path, fileExt);
        }
      else if (dataFN.endsWith("." + fileExt)) {
        try {
          process(path);
          loaded++;
          }
        catch (IOException | LomikelException e) {
          log.error("Failed to process " + path, e);
          }
        }
      else {
        log.warn("Not " + fileExt + " file: " + dataFN);
        }
      }
    timer("alerts/pcas created", _n, -1, -1);
    log.info("" + loaded + " files loaded");
    }

  /** Process <em>Avro</em> file or directory with files (recursive).
    * The first call also registers the <em>Import</em> {@link Vertex}.
    * A path which is neither file nor directory is only reported in the log.
    * @param fn The filename of the data file
    *           or directory with files.
    * @throws IOException      If problem with file reading.
    * @throws LomikelException If anything wrong. */
  public void process(String fn) throws IOException, LomikelException {
    log.info("Loading " + fn);
    register(fn);
    File file = new File(fn);
    if (file.isDirectory()) {
      processDir(fn, "avro");
      }
    else if (file.isFile()) {
      processFile(file);
      }
    else {
      log.error("Not a file/directory: " + fn);
      }
    }

  /** Register <em>Import</em> {@link Vertex}.
    * Only the first call has any effect: it creates the <em>Import</em>
    * {@link Vertex}, attaches it to the <em>Imports</em> {@link Vertex}
    * of the <em>IJCLab</em> site and commits.
    * @param fn The filename of the data file
    *           or directory with files. */
  public void register(String fn) {
    if (_top) {
      _topFn = fn;
      now();
      Vertex import1 = g().addV("Import").property("lbl", "Import").property("importSource", fn).property("importDate", _date).next();
      Vertex imports = g().V().has("lbl", "site").has("title", "IJCLab").out().has("lbl", "Imports").next();
      _gr.addEdge(imports, import1, "has");
      commit();
      }
    _top = false;
    }

  /** Process <em>Avro</em> file.
    * The reader is closed even if processing of a record fails.
    * @param file The data file.
    * @throws IOException If problem with file reading. */
  public void processFile(File file) throws IOException {
    DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
    try (DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(file, datumReader)) {
      // the record instance is handed back to the reader for reuse
      GenericRecord record = null;
      while (dataFileReader.hasNext()) {
        record = dataFileReader.next(record);
        processRecord(record);
        }
      }
    }

  /** Process {@link GenericRecord} of the requested type.
    * Unknown data type is only reported in the log.
    * @param record The {@link GenericRecord} to be processed. */
  public void processRecord(GenericRecord record) {
    if ("alert".equals(_dataType)) {
      processAlert(record);
      }
    else if ("pca".equals(_dataType)) {
      processPCA(record);
      }
    else {
      log.error("Unknown data type: " + _dataType);
      }
    }

  /** Process <em>Avro</em> alert.
    * Creates the <em>alert</em> {@link Vertex} with its <em>candidate</em>,
    * <em>prv_candidates</em> and <em>cutout</em> {@link Vertex}es.
    * @param record The full alert {@link GenericRecord}.
    * @return       The created {@link Vertex}, may be {@code null}. */
  public Vertex processAlert(GenericRecord record) {
    _n++;
    Map<String, String> values = getSimpleValues(record, getSimpleFields(record,
                                                                         COLUMNS_ALERT,
                                                                         new String[]{"objectId",
                                                                                      "candidate",
                                                                                      "prv_candidates",
                                                                                      "cutoutScience",
                                                                                      "cutoutTemplate",
                                                                                      "cutoutDifference"}));
    Vertex v = vertex(record, "alert", null, "create");
    if (v != null) {
      String objectId = record.get("objectId").toString();
      for (Map.Entry<String, String> entry : values.entrySet()) {
        try {
          v.property(entry.getKey(), entry.getValue());
          }
        catch (IllegalArgumentException e) {
          log.error("Cannot add property: " + entry.getKey() + " = " + entry.getValue(), e);
          }
        }
      v.property("objectId",     objectId);
      v.property("alertVersion", VERSION);
      v.property("importDate",   _date);
      GenericRecord candidate = (GenericRecord)(record.get("candidate"));
      processGenericRecord(candidate,
                           "candidate",
                           "candid",
                           true,
                           v,
                           "has",
                           COLUMNS_CANDIDATE,
                           objectId);
      // The cutouts belong to the alert's own candidate, so its jd is taken here,
      // it should not be affected by jd of previous candidates processed below.
      // It is only needed (and only read) when data are linked from HBase.
      String jd = (hbaseUrl() != null) ? candidate.get("jd").toString() : null;
      Vertex vv = vertex(record, "prv_candidates", null, "create");
      _gr.addEdge(v, vv, "has");
      Array<?> previous = (Array<?>)record.get("prv_candidates");
      if (previous != null) {
        for (Object o : previous) {
          _nPrvCandidates++;
          processGenericRecord((GenericRecord)o,
                               "prv_candidate",
                               "candid",
                               true,
                               vv,
                               "holds",
                               COLUMNS_CANDIDATE,
                               objectId);
          }
        }
      processCutout(record, v, objectId, jd);
      }
    else {
      log.error("Failed to create alert from " + record);
      }
    timer("alerts processed", _n, _reportLimit, _commitLimit);
    return v;
    }

  /** Process <em>Avro</em> PCA.
    * A new <em>PCA</em> {@link Vertex} is always created, it is not verified
    * whether a PCA for the same <em>objectId</em> already exists
    * (so multiple PCAs for one source may be created).
    * At most as many components as there are known PCA property names are stored.
    * @param record The full PCA {@link GenericRecord}.
    * @return       The created {@link Vertex}. */
  public Vertex processPCA(GenericRecord record) {
    _n++;
    String objectId = record.get("objectId").toString();
    Vertex v = g().addV("PCA").property("lbl", "PCA").property("objectId", objectId).next();
    Object pca = record.get("pca");
    if (pca instanceof Iterable) {
      int i = 0;
      for (Object component : (Iterable<?>)pca) {
        if (i >= PCAS.length) {
          log.warn("Ignoring PCA components above " + PCAS.length + " for " + objectId);
          break;
          }
        v.property(PCAS[i++], component);
        }
      }
    else {
      log.warn("No pca array for " + objectId);
      }
    v.property("importDate", _date);
    timer("pcas processed", _n, _reportLimit, _commitLimit);
    return v;
    }

  /** Process <em>Avro</em> {@link GenericRecord}.
    * @param record       The {@link GenericRecord} to process.
    * @param name         The name of new {@link Vertex}.
    * @param idName       The name of the unique identifying field.
    *                     May be {@code null}.
    * @param tryDirection Whether to try to create <em>direction</em> property
    *                     from <em>ra,dec</em> fields.
    * @param mother       The mother {@link Vertex}.
    * @param edgeName     The name of the edge to the mother {@link Vertex}.
    * @param fields       The list of fields to fill. All fields are filled if {@code null}.
    * @param objectId     The <em>objectId</em> of the containing source.
    * @return             The created {@link Vertex}, may be {@code null}. */
  private Vertex processGenericRecord(GenericRecord record,
                                      String        name,
                                      String        idName,
                                      boolean       tryDirection,
                                      Vertex        mother,
                                      String        edgeName,
                                      List<String>  fields,
                                      String        objectId) {
    String[] idNameA = (idName == null) ? new String[]{} : new String[]{idName};
    Map<String, String> values = getSimpleValues(record, getSimpleFields(record, fields, idNameA));
    Vertex v = vertex(record, name, idName, "create");
    if (v == null) {
      return v;
      }
    for (Map.Entry<String, String> entry : values.entrySet()) {
      v.property(entry.getKey(), entry.getValue());
      }
    if (tryDirection && record.get("dec") != null && record.get("ra") != null) {
      // point is given as (dec, ra - 180)
      v.property("direction", Geoshape.point(Double.parseDouble(record.get("dec").toString()),
                                             Double.parseDouble(record.get("ra").toString()) - 180));
      }
    _gr.addEdge(mother, v, edgeName);
    if (hbaseUrl() != null) {
      String jd = record.get("jd").toString();
      _gr.attachDataLink(v,
                         "Candidate data",
                         "HBase",
                         _hbaseUrl,
                         "return client.scan('" + objectId + "_" + jd + "', null, '*', 0, true, true)");
      }
    return v;
    }

  /** Process <em>Avro</em> cutouts (<em>Science, Template, Difference</em>).
    * Depending on the configuration, each cutout is either linked from HBase,
    * stored (Base64 encoded) in the Graph or written into the FITS directory.
    * Missing cutouts are reported in the log and skipped.
    * @param record   The {@link GenericRecord} to process.
    * @param mother   The {@link Vertex} to attach to.
    * @param objectId The <em>objectId</em> of the containing source.
    * @param jd       The <em>jd</em> of the corresponding candidate.
    *                 Only used when data are linked from HBase. */
  private void processCutout(GenericRecord record,
                             Vertex        mother,
                             String        objectId,
                             String        jd) {
    Vertex v = vertex(record, "cutout", null, "create");
    String key = objectId + "_" + jd;
    for (String s : new String[]{"Science", "Template", "Difference"}) {
      GenericRecord r = (GenericRecord)(record.get("cutout" + s));
      if (r == null) {
        log.warn("No cutout" + s + " for " + objectId);
        continue;
        }
      if (hbaseUrl() != null) {
        _gr.attachDataLink(v,
                           s + " fits",
                           "HBase",
                           _hbaseUrl,
                           "x=client.scan('" + key + "', null, 'b:cutout" + s + "_stampData', 0, false, false).get('" + key + "').get('b:cutout" + s + "_stampData');y=client.repository().get(x);java.util.Base64.getEncoder().encodeToString(y)");
        continue;
        }
      Object fileName  = r.get("fileName");
      Object stampData = r.get("stampData");
      if (fileName == null || !(stampData instanceof ByteBuffer)) {
        log.warn("Incomplete cutout" + s + " for " + objectId);
        continue;
        }
      String fn   = fileName.toString();
      byte[] data = bytes((ByteBuffer)stampData);
      if (fitsDir() == null) {
        v.property("cutout" + s + "Fn", fn);
        v.property("cutout" + s,        Base64.getEncoder().encodeToString(data));
        }
      else {
        v.property("cutout" + s + "Fn", "file:" + fitsDir() + "/" + fn);
        writeFits(fn, data);
        }
      }
    _gr.addEdge(mother, v, "has");
    }

  /** Give the valid content of a {@link ByteBuffer}.
    * Only bytes between the buffer position and limit are copied,
    * the backing array may be longer than that when the buffer has been reused.
    * The state of the buffer is not changed.
    * @param buffer The {@link ByteBuffer} to read.
    * @return       The copy of the remaining bytes. */
  private static byte[] bytes(ByteBuffer buffer) {
    ByteBuffer view = buffer.duplicate();
    byte[] data = new byte[view.remaining()];
    view.get(data);
    return data;
    }

  /** Give values of {@link GenericRecord} fields as {@link String}s.
    * Fields with {@code null} value are left out.
    * Binary values are decoded as UTF-8 text.
    * @param record The {@link GenericRecord} to read.
    * @param fields The names of fields to be read.
    * @return       The field names mapped to their values, sorted by name. */
  private Map<String, String> getSimpleValues(GenericRecord record,
                                              List<String>  fields) {
    Map<String, String> content = new TreeMap<>();
    for (String s : fields) {
      Object o = record.get(s);
      if (o instanceof ByteBuffer) {
        content.put(s, new String(bytes((ByteBuffer)o), StandardCharsets.UTF_8));
        }
      else if (o != null) {
        content.put(s, o.toString());
        }
      }
    return content;
    }

  /** Get names of {@link Field}s corresponding to simple types.
    * Fields of other types are reported in the log and left out.
    * @param record The {@link GenericRecord} to use.
    * @param keeps  The names of fields to report.
    *               If {@code null}, all fields (of simple type) are reported
    *               and <em>avoids</em> argument is not applied.
    * @param avoids The array of fields names not to report.
    *               May be {@code null} or empty.
    *               Only applied when <em>keeps</em> argument is supplied,
    *               in which case fields with {@code null} value are left out too.
    * @return       The list of corresponding field names. */
  private List<String> getSimpleFields(GenericRecord record,
                                       List<String>  keeps,
                                       String[]      avoids) {
    List<String> avoided = (avoids == null) ? new ArrayList<String>() : Arrays.asList(avoids);
    List<String> fields = new ArrayList<>();
    for (Field field : record.getSchema().getFields()) {
      String name = field.name();
      if (keeps != null) {
        if (!keeps.contains(name) || avoided.contains(name) || record.get(name) == null) {
          continue;
          }
        }
      Type type = field.schema().getType();
      if (isSimple(type)) {
        fields.add(name);
        }
      else {
        log.warn("Skipped: " + name + " of " + type);
        }
      }
    return fields;
    }

  /** Tell whether the <em>Avro</em> type is handled as a simple value.
    * @param type The <em>Avro</em> {@link Type}.
    * @return     Whether the type is handled as a simple value. */
  private static boolean isSimple(Type type) {
    switch (type) {
      case BOOLEAN:
      case DOUBLE:
      case FLOAT:
      case LONG:
      case INT:
      case UNION:
      case STRING:
      case BYTES:
      case ARRAY:
        return true;
      default:
        return false;
      }
    }

  /** Create or drop a {@link Vertex} according to chosen strategy.
    * @param record   The full {@link GenericRecord}.
    * @param label    The {@link Vertex} label.
    * @param property The name of the identifying {@link Vertex} property.
    *                 If {@code null}, the {@link Vertex} is considered not unique:
    *                 it is created unless the strategy is <em>skip</em> or <em>drop</em>.
    * @param strategy The creation strategy: {@code drop, replace, reuse, skip, create}.
    *                 If anything else (or {@code null}), the global strategy
    *                 given to the constructor is used.
    * @return         The created {@link Vertex} or {@code null}. */
  private Vertex vertex(GenericRecord record,
                        String        label,
                        String        property,
                        String        strategy) {
    String s = (strategy == null) ? "" : strategy.trim();
    boolean known   = s.equals("drop")   ||
                      s.equals("skip")   ||
                      s.equals("create") ||
                      s.equals("reuse")  ||
                      s.equals("replace");
    boolean drop    = known ? s.equals("drop")    : _drop;
    boolean skip    = known ? s.equals("skip")    : _skip;
    boolean create  = known ? s.equals("create")  : _create;
    boolean reuse   = known ? s.equals("reuse")   : _reuse;
    boolean replace = known ? s.equals("replace") : _replace;
    // Do nothing
    if (skip) {
      return null;
      }
    // Not unique Vertex
    if (property == null) {
      if (drop) {
        return null;
        }
      return g().addV(label).property("lbl", label).next();
      }
    // Unique Vertex
    Vertex v = null;
    if (drop || replace) {
      _gr.drop(label, property, record.get(property), true);
      }
    if (reuse) {
      v = _gr.getOrCreate(label, property, record.get(property)).next(); // TBD: check uniqueness
      }
    if (create || replace) {
      v = g().addV(label).property("lbl", label).property(property, record.get(property)).next();
      }
    return v;
    }

  /** Write FITS file into the FITS directory.
    * Failure is only reported in the log.
    * @param fn   The FITS file name (relative to the FITS directory).
    * @param data The FITS file content. */
  protected void writeFits(String fn,
                           byte[] data) {
    try {
      Files.write(FileSystems.getDefault().getPath(fitsDir() + "/" + fn), data);
      }
    catch (IOException e) {
      log.error("Cannot write " + fn, e);
      }
    }

  /** The directory for FITS files.
    * @return The FITS file directory, {@code null} if not set. */
  protected String fitsDir() {
    return _fitsDir;
    }

  /** The data HBase table url.
    * @return The data HBase table url, {@code null} if not set. */
  protected String hbaseUrl() {
    return _hbaseUrl;
    }

  /** Give number of processed alerts/pcas.
    * @return The number of processed alerts/pcas. */
  protected int n() {
    return _n;
    }

  /** Tell, whether import should be skipped.
    * @return Whether import should be skipped. */
  protected boolean skip() {
    return _skip;
    }

  /** Mark the registered <em>Import</em> {@link Vertex} as complete,
    * commit, report statistics and close the connection.
    * The connection is closed even if the marking fails. */
  @Override
  public void close() {
    try {
      if (_topFn != null) {
        // iterate() (unlike next()) does not fail when no Import Vertex matches
        g().V().has("lbl", "Import").has("importSource", _topFn).has("importDate", _date).property("complete", true).property("n", _n).iterate();
        }
      commit();
      log.info("Import statistics:");
      log.info("\talerts/pcas: " + _n);
      log.info("\tprv_candidates: " + _nPrvCandidates);
      log.info("Imported at " + _date);
      }
    finally {
      super.close();
      }
    }

  /** Set the import date to the current time. */
  protected void now() {
    _date = timestamp();
    }

  /** Give the current time formatted as <code>yyyy-MM-dd HH:mm:ss.SSS</code>.
    * A new formatter is created for each call because
    * {@link SimpleDateFormat} is not thread-safe.
    * @return The formatted current time. */
  private static String timestamp() {
    return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date());
    }

  /** The command line help. */
  private static final String USAGE = "AvroImporter.exe.jar <JanusGraph properties> [<file>|<directory>] <fitsDir> <hbaseUrl> <report limit> <commit limit> [create|reuse|replace|drop|skip] [alert|pca]";

  /** The alert fields stored in the <em>alert</em> {@link Vertex}. */
  private static final List<String> COLUMNS_ALERT = Arrays.asList("cdsxmatch",
                                                                  "mulens",
                                                                  "objectId",
                                                                  "rf_kn_vs_nonkn",
                                                                  "rf_snia_vs_nonia",
                                                                  "roid",
                                                                  "snn_snia_vs_nonia",
                                                                  "snn_sn_vs_all",
                                                                  "tracklet");

  /** The candidate fields stored in the <em>candidate</em>
    * and <em>prv_candidate</em> {@link Vertex}es. */
  private static final List<String> COLUMNS_CANDIDATE = Arrays.asList("candid",
                                                                      "classtar",
                                                                      "diffmaglim",
                                                                      "distnr",
                                                                      "distpsnr1",
                                                                      "DR3Name",
                                                                      "drb",
                                                                      "e_Plx",
                                                                      "fid",
                                                                      "field",
                                                                      "gcvs",
                                                                      "isdiffpos",
                                                                      "jd",
                                                                      "jdendhist",
                                                                      "jdstarthist",
                                                                      "maggaia",
                                                                      "magnr",
                                                                      "magpsf",
                                                                      "magzpsci",
                                                                      "ndethist",
                                                                      "neargaia",
                                                                      "nid",
                                                                      "Plx",
                                                                      "rb",
                                                                      "rcid",
                                                                      "sgscore1",
                                                                      "sigmagnr",
                                                                      "sigmapsf",
                                                                      "ssdistnr",
                                                                      "ssmagnr",
                                                                      "ssnamenr",
                                                                      "vsx");

  private GremlinRecipies _gr;

  private int _reportLimit;

  private int _commitLimit;

  private String _fitsDir;

  private String _hbaseUrl;

  private String _dataType;

  private boolean _create;

  private boolean _reuse;

  private boolean _replace;

  private boolean _drop;

  private boolean _skip;

  private int _n = 0;

  private int _nPrvCandidates = 0;

  private String _date = timestamp();

  private boolean _top = true;

  private String _topFn;

  /** The names of properties for PCA components, in order. */
  private static final String[] PCAS = new String[]{"pca00",
                                                    "pca01",
                                                    "pca02",
                                                    "pca03",
                                                    "pca04",
                                                    "pca05",
                                                    "pca06",
                                                    "pca07",
                                                    "pca08",
                                                    "pca09",
                                                    "pca10",
                                                    "pca11",
                                                    "pca12",
                                                    "pca13",
                                                    "pca14",
                                                    "pca15",
                                                    "pca16",
                                                    "pca17",
                                                    "pca18",
                                                    "pca19",
                                                    "pca20",
                                                    "pca21",
                                                    "pca22",
                                                    "pca23",
                                                    "pca24"};

  private static final String VERSION = "ztf-3.2";

  /** Logging . */
  private static final Logger log = Logger.getLogger(AvroImporter.class);

  }