001package com.astrolabsoftware.FinkBrowser.Avro;
002
// Lomikel
import com.Lomikel.Januser.GremlinRecipies;
import com.Lomikel.Januser.JanusClient;
import com.Lomikel.Utils.Init;
import com.Lomikel.Utils.LomikelException;

// Avro
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericData.Array;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;

// Tinker Pop
import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.unfold;
import org.apache.tinkerpop.gremlin.structure.Vertex;

// Janus Graph
import org.janusgraph.core.JanusGraph;
import org.janusgraph.core.attribute.Geoshape;

// Java
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteBuffer;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Log4J
import org.apache.log4j.Logger;
046
047/** <code>AvroImporter</code> imports <em>Avro</em> files into <em>JanusGraph</em>.
048  * @opt attributes
049  * @opt operations
050  * @opt types
051  * @opt visibility
052  * @author <a href="mailto:Julius.Hrivnac@cern.ch">J.Hrivnac</a> */
053public class AvroImporter extends JanusClient {
054        
055  /** Import Avro files or directory. 
056    * @param args[0] The Janusgraph properties file. 
057    * @param args[1] The Avro file or directory with Avro files.
058    * @param args[2] The directory for FITS files. If <tt>null</tt> or empty, FITS are included in the Graph. Ignored if HBase url set.
059    * @param args[3] The url for HBase table with full data as <tt>ip:port:table:schema</tt>. May be <tt>null</tt> or empty.
060    * @param args[4] The number of events to use for progress report (-1 means no report untill the end).
061    * @param args[5] The number of events to commit in one step (-1 means commit only at the end).
062    * @param args[6] The creation strategy. <tt>create,drop,replace,skip</tt>.
063    * @param args[7] The data type, <tt>alert|pca</tt>. If <tt>null<tt>, then considering as <tt>alert</tt>.
064    * @throws LomikelException If anything goes wrong. */
065   public static void main(String[] args) throws IOException {
066    Init.init("AvroImporter");
067    if (args.length != 8) {
068      log.error("AvroImporter.exe.jar <JanusGraph properties> [<file>|<directory>] <hbaseUrl> <report limit> <commit limit> [create|reuse|drop] [alert|pca]");
069      System.exit(-1);
070      }
071    try {
072      AvroImporter importer = new AvroImporter(                args[0],
073                                               Integer.valueOf(args[4]),
074                                               Integer.valueOf(args[5]),
075                                                               args[6],
076                                                               args[2],
077                                                               args[3],
078                                                               args[7]);
079      importer.timerStart();                    
080      importer.process(args[1]);
081      if (!importer.skip()) {
082        importer.commit();
083        }
084      importer.close();
085      }
086    catch (LomikelException e) {
087      log.fatal("Cannot import " + args[1] + " into " + args[0], e);
088      System.exit(-1);
089      }
090    }
091  
092  /** Create with JanusGraph properties file.
093    * @param properties  The file with the complete Janusgraph properties.
094    * @param reportLimit The number of events to use for progress report (-1 means no report untill the end).
095    * @param commitLimit The number of events to commit in one step (-1 means commit only at the end).
096    * @param strategy    The creation strategy. <tt>drop,replace,getOrCreate</tt>.
097    * @param fitsDir     The directory for FITS files. If <tt>null</tt> or empty, FITS are included in the Graph. Ignored if HBase url set.
098    * @param hbaseUrl    The url for HBase table with full data as <tt>ip:port:table:schema</tt>. May be <tt>null</tt> or empty.
099    * @param dataType    The data type, <tt>alert|pca</tt>.  If <tt>null<tt>, then considering as <tt>alert</tt>.*/  
100  public AvroImporter(String properties,
101                      int    reportLimit,
102                      int    commitLimit,
103                      String strategy,
104                      String fitsDir,
105                      String hbaseUrl,
106                      String dataType) {
107    super(properties);
108    if (fitsDir != null && fitsDir.trim().equals("")) {
109      fitsDir = null;
110      }
111    if (hbaseUrl != null && hbaseUrl.trim().equals("")) {
112      hbaseUrl = null;
113      }
114    log.info("Reporting after each " + reportLimit + " alerts");
115    log.info("Committing after each " + commitLimit + " alerts");
116    log.info("Using strategy: " + strategy);
117    log.info("Importing " + dataType + "s");
118    if (fitsDir == null) {
119      log.info("Writing FITS into Graph");
120      }
121    else {
122      log.info("Writing FITS into: " + fitsDir);
123      }
124    if (hbaseUrl != null) {
125      log.info("Connecting to data from: " + hbaseUrl);
126      }
127    log.info("Importing at " + _date);
128    _reportLimit = reportLimit;
129    _commitLimit = commitLimit;
130    _fitsDir     = fitsDir;
131    _hbaseUrl    = hbaseUrl;
132    _dataType    = dataType;
133    _create      = false;
134    _reuse       = false;
135    _replace     = false;
136    _drop        = false;
137    if (strategy.contains("create")) {
138      _create = true;
139      }
140    if (strategy.contains("reuse")) {
141      _reuse = true;
142      }
143    if (strategy.contains("replace")) {
144      _replace = true;
145      }
146    if (strategy.contains("drop")) {
147      _drop = true;
148      }
149    if (strategy.contains("skip")) {
150      _skip = true;
151      }
152    _gr = new GremlinRecipies(this);
153    }
154        
155  /** Process directory with <em>Avro</em> alert files (recursive).
156    * @param dirFN The dirname of directiory with data file.
157    * @param fileExt The file extention.
158    * @throws IOException      If problem with file reading. */
159  public void processDir(String dirFN,
160                         String fileExt) throws IOException {  
161    log.info("Loading directory " + dirFN);
162    File dir = new File(dirFN);
163    int i = 0;
164    for (String dataFN : dir.list()) {
165      if (new File(dirFN + "/" + dataFN).isDirectory()) {
166        processDir(dirFN + "/" + dataFN, "avro");
167        }
168      else if (dataFN.endsWith("." + fileExt)) {
169        try {
170          process(dirFN + "/" + dataFN);
171          i++;
172          }
173        catch (IOException | LomikelException e) {
174          log.error("Failed to process " + dirFN + "/" + dataFN, e);
175          }
176        }
177      else {
178        log.warn("Not " + fileExt + " file: " + dataFN);
179        }
180      }
181    timer("alerts/pcas created", _n, -1, -1);      
182    log.info("" + i + " files loaded");
183    }
184     
185  /** Process <em>Avro</em> alert file or directory with files (recursive).
186     * @param fn The filename of the data file
187     *           or directory with files.
188     * @throws IOException      If problem with file reading.
189     * @throws LomikelException If anything wrong. */
190  public void process(String fn) throws IOException, LomikelException {
191    log.info("Loading " + fn);
192    register(fn);
193    File file = new File(fn);
194    if (file.isDirectory()) {
195      processDir(fn, "avro");
196      return;
197      }
198    else if (!file.isFile()) {
199      log.error("Not a file/directory: " + fn);
200      return;
201      }
202    processFile(file);
203    }
204    
205  /** Register <em>Import</em> {@link Vertex}.
206     * @param fn The filename of the data file
207     *           or directory with files. */
208  public void register(String fn) {
209    if (_top) {
210      _topFn = fn;
211      now(); 
212      Vertex import1 = g().addV("Import").property("lbl", "Import").property("importSource", fn).property("importDate", _date).next();
213      Vertex imports = g().V().has("lbl", "site").has("title", "IJCLab").out().has("lbl", "Imports").next();
214      _gr.addEdge(imports, import1, "has"); 
215      commit();
216      }
217    _top = false;
218    }
219    
220  /** Process <em>Avro</em> alert file .
221     * @param file The data file.
222     * @throws IOException If problem with file reading. */
223  public void processFile(File file) throws IOException {
224    DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
225    DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(file, datumReader);
226    GenericRecord record = null;
227    while (dataFileReader.hasNext()) {
228      record = dataFileReader.next(record);
229      processRecord(record);
230      }
231    dataFileReader.close();
232    } 
233    
234  /** Process {@link GenericRecord} of the requested type.
235    * @param record The {@link GenericRecord} to be rocessed. */
236  public void processRecord(GenericRecord record) {
237    if (_dataType.equals("alert")) {
238      processAlert(record);
239      }
240    else if (_dataType.equals("pca")) {
241      processPCA(record);
242      }
243    else {
244      log.error("Unknown data type: " + _dataType);
245      }
246    }
247   
248  /** Process <em>Avro</em> alert.
249    * @param record The full alert {@link GenericRecord}.
250    * @return       The created {@link Vertex}. */
251  public Vertex processAlert(GenericRecord record) {
252    _n++;
253    Map<String, String> values = getSimpleValues(record, getSimpleFields(record,
254                                                                         COLUMNS_ALERT,
255                                                                         new String[]{"objectId",
256                                                                                      "candidate",
257                                                                                      "prv_candidates",
258                                                                                      "cutoutScience",
259                                                                                      "cutoutTemplate",
260                                                                                      "cutoutDifference"}));
261    Vertex v = vertex(record, "alert", null, "create");
262    if (v != null) {
263      String objectId = record.get("objectId").toString();
264      for (Map.Entry<String, String> entry : values.entrySet()) {
265        try {
266          v.property(entry.getKey(), entry.getValue());
267          }
268        catch (IllegalArgumentException e) {
269          log.error("Cannot add property: " + entry.getKey() + " = " + entry.getValue(), e);
270          }
271        }
272      v.property("objectId",     objectId);
273      v.property("alertVersion", VERSION);
274      v.property("importDate",   _date);
275      String ss;
276      processGenericRecord((GenericRecord)(record.get("candidate")),
277                           "candidate",
278                           "candid",
279                           true,
280                           v,
281                           "has",
282                           COLUMNS_CANDIDATE,
283                           objectId);
284      Vertex vv = vertex(record, "prv_candidates", null, "create");
285      _gr.addEdge(v, vv, "has");    
286      Array a = (Array)record.get("prv_candidates");
287      if (a != null) {
288        for (Object o : a) {
289          _nPrvCandidates++;
290          processGenericRecord((GenericRecord)o,
291                               "prv_candidate",
292                               "candid",
293                               true,
294                               vv,
295                               "holds",
296                               COLUMNS_CANDIDATE,
297                               objectId);
298          } 
299        }
300      processCutout(record, v, objectId, _jd); // _jd taken from candidate
301      }
302    else {
303      log.error("Failed to create alert from " + record);
304      }
305    timer("alerts processed", _n, _reportLimit, _commitLimit); 
306    return v;
307    }
308   
309  /** Process <em>Avro</em> PCA.
310    * Only create PCAs for already existing sources.
311    * Do not verify if the PCA already exists (so multiple
312    * PCAs fvor one source may be created.
313    * @param record The full PCA {@link GenericRecord}.
314    * @return       The created {@link Vertex}. */
315  public Vertex processPCA(GenericRecord record) {
316    _n++;
317    String objectId = record.get("objectId").toString();
318    Vertex v = g().addV("PCA").property("lbl", "PCA").property("objectId", objectId).next();
319    Array<Double> array = (Array<Double>)(record.get("pca"));
320    Iterator<Double> it = array.iterator();
321    short pcai = 0;
322    while (it.hasNext()) {
323      v.property(PCAS[pcai++], it.next());
324      }
325    v.property("importDate", _date);      
326    timer("pcas processed", _n, _reportLimit, _commitLimit); 
327    return v;
328    }
329   
330  /** Process <em>Avro</em> {@link GenericRecord}.
331    * @param record       The {@link GenericRecord} to process.
332    * @param name         The name of new {@link Vertex}.
333    * @param idName       The name of the unique identifying field.
334    * @param tryDirection Whether try created <em>Direction</em> property from <em>ra,dec</em> fields.
335    * @param mother       The mother {@link Vertex}.
336    * @param edgerName    The name of the edge to the mother {@link Vertex}.
337    * @param fields       The list of fields to fill. All fields are filled if <code>null</code>
338    * @param objectId     The <em>objectId</em> of the containing source.
339    * @return             The created {@link Vertex}. */
340  private Vertex processGenericRecord(GenericRecord record,
341                                      String        name,
342                                      String        idName,
343                                      boolean       tryDirection,
344                                      Vertex        mother,
345                                      String        edgeName,
346                                      List<String>  fields,
347                                      String        objectId) {
348    String[] idNameA;
349    if (idName == null) {
350      idNameA= new String[]{};
351      }
352    else {
353      idNameA = new String[]{idName};
354      }
355    Map<String, String> values = getSimpleValues(record, getSimpleFields(record, fields, idNameA));
356    Vertex v = vertex(record, name, idName, "create");
357    if (v == null) {
358      return v;
359      }
360    for (Map.Entry<String, String> entry : values.entrySet()) {
361      //log.debug("\t" + entry.getKey() + " = " + entry.getValue());
362      v.property(entry.getKey(), entry.getValue());
363      }
364    if (record.get("dec") != null && record.get("ra") != null) {
365      v.property("direction", Geoshape.point(Double.valueOf(record.get("dec").toString()), Double.valueOf(record.get("ra").toString()) - 180));
366      }
367    _gr.addEdge(mother, v, edgeName);
368    if (hbaseUrl() != null) {
369      _jd = record.get("jd").toString();
370      _gr.attachDataLink(v,
371                        "Candidate data",
372                        "HBase",
373                        _hbaseUrl,
374                        "return client.scan('" + objectId + "_" + _jd + "', null, '*', 0, true, true)");
375      }
376    return v;
377    }
378    
379  /** Process <em>Avro</em> cutout.
380    * @param record       The {@link GenericRecord} to process.
381    * @param mother       The {@link Vertex} to attach to. 
382    * @param jd           The <em>jd</em> of the corresponding candidate.
383    */
384  private void processCutout(GenericRecord record,
385                             Vertex        mother,
386                             String        objectId,
387                             String        jd) {
388    Vertex v = vertex(record, "cutout", null, "create");
389    GenericRecord r;
390    String fn;
391    String key = objectId + "_" + jd;
392    byte[] data;
393    for (String s : new String[]{"Science", "Template", "Difference"}) { 
394      r = (GenericRecord)(record.get("cutout" + s));
395      fn = r.get("fileName").toString();
396      data = ((ByteBuffer)(r.get("stampData"))).array();
397      if (hbaseUrl() != null) {
398        _gr.attachDataLink(v,
399                       s + " fits",
400                       "HBase",
401                       _hbaseUrl,
402                       "x=client.scan('" + key + "', null, 'b:cutout" + s + "_stampData', 0, false, false).get('" + key + "').get('b:cutout" + s + "_stampData');y=client.repository().get(x);java.util.Base64.getEncoder().encodeToString(y)");
403        }
404      else if (fitsDir() == null) {
405        v.property("cutout" + s + "Fn", fn);
406        v.property("cutout" + s,        Base64.getEncoder().encodeToString(data));
407        }
408      else {
409        v.property("cutout" + s + "Fn", "file:" + fitsDir() + "/" + fn);
410        writeFits(fn, data);
411        }
412      }
413    _gr.addEdge(mother, v, "has");
414    }
415
416  /** Register part of {@link GenericRecord} in <em>HBase</em>.
417    * @param record  The {@link GenericRecord} to be registered in <em>HBase</em>.
418    * @param fields  The fields to be mapped. */
419  private Map<String, String> getSimpleValues(GenericRecord record,
420                                              List<String>  fields) {
421    Map<String, String> content = new TreeMap<>();
422    for (String s : fields) {
423      Object o = record.get(s);
424      if (o instanceof ByteBuffer) {
425        content.put(s, new String(((ByteBuffer)o).array()));
426        }
427      else if (o != null) {
428        content.put(s, o.toString());
429        }
430      }
431    return content;
432    }
433    
434  /** Get {@link Field}s corresponding to simple types
435    * and having non-<code>null</code> values.
436    * @param record The {@link GenericRecord} to use.
437    * @param keeps  The {@link GenericRecord} to report.
438    *               Report all if <tt>null</tt>.
439    * @param avoids The array of fields names not to report.
440    *               Cannot cancel <em>keeps</em> argument.
441    * @return       The list of coressponding fields. */
442  private List<String> getSimpleFields(GenericRecord record,
443                                       List<String>  keeps,
444                                       String[]      avoids) {
445    List<String> fields = new ArrayList<>();
446    Type type;
447    String name;
448    boolean veto;
449    for (Field field : record.getSchema().getFields()) {
450      type = field.schema().getType();
451      name = field.name();
452      veto = false;
453      if (keeps == null) {
454        }
455      else if (!keeps.contains(name)) {
456        veto = true;
457        }
458      else {
459        for (String avoid : avoids) {
460          if (name.equals(avoid) || record.get(name) == null) {
461            veto = true;
462            }
463          }
464        }
465      if (!veto) {
466        if (type == Type.BOOLEAN ||
467            type == Type.DOUBLE  ||
468            type == Type.FLOAT   ||
469            type == Type.LONG    ||
470            type == Type.INT     ||
471            type == Type.UNION   ||
472            type == Type.STRING  ||
473            type == Type.BYTES   ||
474            type == Type.ARRAY) {
475            fields.add(name);
476          }
477        else {
478          log.warn("Skipped: " + name + " of " + type);
479          }
480        }
481      }
482    return fields;
483    }
484    
485  /** Create or drop a {@link Vertex} according to chosen strategy.
486    * @param record    The full {@link GenericRecord}.
487    * @param label     The {@link Vertex} label.
488    * @param property  The name of {@link Vertex} property.
489    *                  If <tt>null</tt> strategy is ignored and {@link Vertex} is created.
490    * @param strategy  The creation strategy: <tt>drop, replace, reuse, skip, create</tt>.
491    *                  If anything else, the global strategy b  is used.
492    * @return          The created {@link Vertex} or <tt>null</tt>. */
493  private Vertex vertex(GenericRecord record,
494                        String        label,
495                        String        property,
496                        String        strategy) {
497    if (strategy == null) {
498      strategy = "";
499      }
500    strategy        = strategy.trim();
501    boolean drop    = strategy.equals("drop");
502    boolean skip    = strategy.equals("skip");   
503    boolean create  = strategy.equals("create"); 
504    boolean reuse   = strategy.equals("reuse");  
505    boolean replace = strategy.equals("replace");
506    Vertex v = null;
507    // Do nothing
508    if (skip) {
509      return v;
510      }
511    // Not unique Vertex
512    if (property == null) {
513      if (drop) {
514        return v;
515        }
516      else {
517        //log.debug("Creating: " + label);
518        return g().addV(label).property("lbl", label).next();
519        }
520      }
521    // Unique Vertex
522    if (drop || replace) {
523      //log.debug("Dropping " + label + ": " + property + " = " + record.get(property));
524      _gr.drop(label, property, record.get(property), true);
525      }
526    if (reuse) {
527      //log.info("Getting " + label + ": " + property + " = " + record.get(property));
528      v = _gr.getOrCreate(label, property, record.get(property)).next(); // TBD: check uniqueness
529      }
530    if (create || replace) {
531      //log.debug("Creating " + label + ": " + property + " = " + record.get(property));
532      v = g().addV(label).property("lbl", label).property(property, record.get(property)).next();
533      }
534    return v;
535    }
536        
537  /** Write FITS file.
538    * @param fn   The FITS file name.
539    * @param data The FITS file content. */
540  protected void writeFits(String fn,
541                           byte[] data) {
542    try {
543      Files.write(FileSystems.getDefault().getPath(fitsDir() + "/" + fn), data);
544      }
545    catch (IOException e) {
546      log.error("Cannot write " + fn, e);
547      }
548    }
549    
550  /** The directory for FITS files.
551    * @return The FITS file directory. */
552  protected String fitsDir() {
553    return _fitsDir;
554    } 
555    
556  /** The data HBase table url.
557    * @return The data HBase table url. */
558  protected String hbaseUrl() {
559    return _hbaseUrl;
560    } 
561    
562  /** Give number of created alerts/pcas.
563    * @return The number of created alerts/pcas. */
564  protected int n() {
565    return _n;
566    }
567    
568  /** Tell, whether import shpuld be skipped.
569    * @return Whether import shpuld be skipped. */
570  protected boolean skip() {
571    return _skip;
572    }
573    
574  @Override
575  public void close() { 
576    g().V().has("lbl", "Import").has("importSource", _topFn).has("importDate", _date).property("complete", true).property("n", _n).next();
577    commit();
578    log.info("Import statistics:");
579    log.info("\talerts/pcas: " + _n);
580    log.info("\tprv_candidates: " + _nPrvCandidates);
581    log.info("Imported at " + _date);
582    super.close();
583    }
584    
585  /** Set new {@link Date}. */
586  protected void now() {
587    _date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()).toString();
588    }
589   
590  private static List<String> COLUMNS_ALERT = Arrays.asList(new String[] {"cdsxmatch",
591                                                                          "mulens",
592                                                                          "objectId",
593                                                                          "rf_kn_vs_nonkn",
594                                                                          "rf_snia_vs_nonia",
595                                                                          "roid",
596                                                                          "snn_snia_vs_nonia",
597                                                                          "snn_sn_vs_all",
598                                                                          "tracklet"
599                                                                          });
600   private static List<String> COLUMNS_CANDIDATE = Arrays.asList(new String[] {"candid",
601                                                                               "classtar",
602                                                                               "diffmaglim",
603                                                                               "distnr",
604                                                                               "distpsnr1",
605                                                                               "DR3Name",
606                                                                               "drb",
607                                                                               "e_Plx",
608                                                                               "fid",
609                                                                               "field",
610                                                                               "gcvs",
611                                                                               "isdiffpos",
612                                                                               "jd",
613                                                                               "jdendhist",
614                                                                               "jdstarthist",
615                                                                               "maggaia",
616                                                                               "magnr",
617                                                                               "magpsf",
618                                                                               "magzpsci",
619                                                                               "ndethist",
620                                                                               "neargaia",
621                                                                               "nid",
622                                                                               "Plx",
623                                                                               "rb",
624                                                                               "rcid",
625                                                                               "sgscore1",
626                                                                               "sigmagnr",
627                                                                               "sigmapsf",
628                                                                               "ssdistnr",
629                                                                               "ssmagnr",
630                                                                               "ssnamenr",
631                                                                               "vsx"});
632   
633  private GremlinRecipies _gr;
634  
635  private String _jd;
636  
637  private int _reportLimit;
638  
639  private int _commitLimit;
640  
641  private String _fitsDir;
642  
643  private String _hbaseUrl;
644  
645  private String _dataType;
646  
647  private boolean _create;
648  
649  private boolean _reuse;
650  
651  private boolean _replace;
652  
653  private boolean _drop;
654  
655  private boolean _skip;
656  
657  private int _n = 0;
658  
659  private int _nPrvCandidates = 0;
660  
661  private String _date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()).toString();
662  
663  private boolean _top = true;
664  
665  private String _topFn;
666  
667  private static String[] PCAS = new String[]{"pca00",
668                                              "pca01",
669                                              "pca02",
670                                              "pca03",
671                                              "pca04",
672                                              "pca05",
673                                              "pca06",
674                                              "pca07",
675                                              "pca08",
676                                              "pca09",
677                                              "pca10",
678                                              "pca11",
679                                              "pca12",
680                                              "pca13",
681                                              "pca14",
682                                              "pca15",
683                                              "pca16",
684                                              "pca17",
685                                              "pca18",
686                                              "pca19",
687                                              "pca20",
688                                              "pca21",
689                                              "pca22",
690                                              "pca23",
691                                              "pca24"};
692     
693  private static String VERSION = "ztf-3.2";
694    
695  /** Logging . */
696  private static Logger log = Logger.getLogger(AvroImporter.class);
697                                                
698  }
699