001package com.astrolabsoftware.FinkBrowser.Parquet;
002
003import com.Lomikel.Utils.Init;
004import com.Lomikel.Januser.JanusClient;
005import com.Lomikel.Januser.GremlinRecipies;
006import com.Lomikel.Utils.LomikelException;
007
008// Parquet
009import org.apache.parquet.hadoop.ParquetFileReader;
010import org.apache.parquet.hadoop.metadata.ParquetMetadata;
011import org.apache.parquet.format.converter.ParquetMetadataConverter;
012import org.apache.parquet.schema.MessageType;
013import org.apache.parquet.column.page.PageReadStore;
014import org.apache.parquet.io.MessageColumnIO;
015import org.apache.parquet.io.ColumnIOFactory;
016import org.apache.parquet.io.RecordReader;
017import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
018import org.apache.parquet.example.data.Group;
019import org.apache.parquet.example.data.simple.SimpleGroup;
020import org.apache.parquet.schema.GroupType;
021import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.*;
022
023// Hadoop
024import org.apache.hadoop.fs.FileSystem;
025import org.apache.hadoop.fs.FileStatus;
026import org.apache.hadoop.fs.Path;
027import org.apache.hadoop.conf.Configuration;
028
029// Tinker Pop
030import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.unfold;
031import org.apache.tinkerpop.gremlin.structure.Vertex;
032
033// Janus Graph
034import org.janusgraph.core.JanusGraph;
035import org.janusgraph.core.attribute.Geoshape;
036
037// Java
038import java.io.IOException;
039import java.io.FileNotFoundException;
040import java.util.List;
041import java.util.ArrayList;
042import java.util.Map;
043import java.util.TreeMap;
044import java.util.Base64;
045import java.time.LocalDateTime;
046import java.time.LocalDate;
047import java.time.LocalTime;
048import java.time.temporal.JulianFields;
049
050// Log4J
051import org.apache.logging.log4j.Logger;
052import org.apache.logging.log4j.LogManager;
053
054/** <code>ParquetImporter</code> imports <em>Parquet</em> files into <em>JanusGraph</em>.
055  * @opt attributes
056  * @opt operations
057  * @opt types
058  * @opt visibility
059  * @author <a href="mailto:Julius.Hrivnac@cern.ch">J.Hrivnac</a> */
060// TBD: allow reset
061// TBD: allow getOrCreate
062public class ParquetImporter extends JanusClient {
063        
064  /** Import Parquet files or directory. 
065    * @param args[0] The Janusgraph properties file. 
066    * @param args[1] The Parquet file or directory with Parquet files.
067    * @param args[2] The number of events to use for progress report (-1 means no report untill the end).
068    * @param args[3] The number of events to commit in one step (-1 means commit only at the end).
069    * @param args[4] The creation strategy. <tt>drop,replace,getOrCreate</tt>.
070    * @throws LomikelException If anything goes wrong. */
071   public static void main(String[] args) throws IOException {
072    Init.init();
073    if (args.length != 5) {
074      log.error("ParquetImporter.exe.jar <JanusGraph properties> [<file>|<directory>] <report limit> <commit limit> [create|reuse|drop]");
075      System.exit(-1);
076      }
077    try {
078      ParquetImporter importer = new ParquetImporter(                args[0],
079                                                     Integer.valueOf(args[2]),
080                                                     Integer.valueOf(args[3]),
081                                                                     args[4]);
082      importer.timerStart();
083      importer.process(args[1]);
084      importer.commit();
085      importer.close();
086      }
087    catch (LomikelException e) {
088      log.fatal("Cannot import " + args[1] + " into " + args[0], e);
089      System.exit(-1);
090      }
091    }
092  
093  /** Create with JanusGraph properties file.
094    * @param properties  The file with the complete Janusgraph properties.
095    * @param reportLimit The number of events to use for progress report (-1 means no report until the end).
096    * @param commitLimit The number of events to commit in one step (-1 means commit only at the end).
097    * @param strategy    The creation strategy. <tt>drop,replace,getOrCreate</tt>. */
098  public ParquetImporter(String properties,
099                         int    reportLimit,
100                         int    commitLimit,
101                         String strategy) {
102    super(properties);
103    log.info("Reporting after each " + reportLimit + " alerts");
104    log.info("Committing after each " + commitLimit + " alerts");
105    log.info("Using strategy: " + strategy);
106    _reportLimit = reportLimit;
107    _commitLimit = commitLimit;
108    _create      = false;
109    _reuse       = false;
110    _replace     = false;
111    _drop        = false;
112    if (strategy.contains("create")) {
113      _create = true;
114      }
115    if (strategy.contains("reuse")) {
116      _reuse = true;
117      }
118    if (strategy.contains("replace")) {
119      _replace = true;
120      }
121    if (strategy.contains("drop")) {
122      _drop = true;
123      }
124    _gr = new GremlinRecipies(this);
125    }
126    
127
128  /** Process directory with <em>Parquet</em> alert files.
129    * @param dirFN The dirname of directiory with data file.
130    * @param fileExt The file extention. 
131    * @throws IOException           If problem with file reading.
132    * @throws FileNotFoundException If problem with file founding. */
133  public void processDir(String dirFN,
134                         String fileExt) throws IOException, FileNotFoundException {  
135    log.info("Loading directory " + dirFN);
136    Path path = new Path(dirFN);
137    Path p;
138    int i = 0;
139    for (FileStatus fileStatus : _fs.listStatus(path)) {
140      p = fileStatus.getPath();
141      if (_fs.isDirectory(p)) {
142        processDir(dirFN + "/" + p.getName(), fileExt);
143        }
144      else if (p.getName().endsWith("." + fileExt)) {
145        try {
146          process(dirFN + "/" + p.getName());
147          i++;
148          }
149        catch (IOException | LomikelException e) {
150          log.error("Failed to process " + p, e);
151          }
152        }
153      else {
154        log.warn("Not " + fileExt + " file: " + p);
155        }
156      }
157    timer("alerts created", _n, -1, -1);      
158    log.info("" + i + " files loaded");
159    }
160     
161  /** Process <em>Parquet</em> alert file or directory with files (recursive).
162     * @param fn The filename of the data file
163     *           or directory with files.
164     * @throws IOException      If problem with file reading.
165     * @throws LomikelException If anything wrong. */
166  public void process(String fn) throws IOException, LomikelException {
167    log.info("Loading " + fn);
168    _conf = new Configuration();
169    _fs = FileSystem.get(_conf);
170    Path path = new Path(fn);
171    if (_fs.isDirectory(path)) {
172      processDir(fn, "parquet");
173      return;
174      }
175    else if (!_fs.isFile(path)) {
176      log.error("Not a file/directory: " + fn);
177      return;
178      }
179    ParquetMetadata readFooter = ParquetFileReader.readFooter(_conf, path, ParquetMetadataConverter.NO_FILTER);
180    MessageType schema = readFooter.getFileMetaData().getSchema();
181    ParquetFileReader r = new ParquetFileReader(_conf, path, readFooter);
182    PageReadStore pages = null;
183    while (null != (pages = r.readNextRowGroup())) {
184      final long rows = pages.getRowCount();
185      log.info("Reading " + rows + " rows");      
186      final MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
187      final RecordReader<Group> recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(schema));
188      String sTemp = "";
189      Group g;
190      int i = 0;
191      while ((g = recordReader.read()) != null && ++i < rows) { // BUG?: i++ ?
192        processGroup(g, "alert");
193        timer("alerts processed", ++_n, _reportLimit, _commitLimit);      
194        }
195      }
196    }   
197    
198  /** Process {@link Group} and create contained {@link Vertex}es with the
199    * specified label. Runs recursively.
200    * @param g   The {@link Group} to process.
201    * @param lbl The label for created {@link Vertex}es.
202    * @return    The created {@link Vertex}es. */
203  private List<Vertex> processGroup(Group  g,
204                                    String lbl) {
205    lbl = reLabel(lbl);
206    //log.info(lbl);
207    SimpleGroup sg;
208    GroupType type;
209    String[] jt;
210    int n;
211    String name = null;
212    String edgeName;
213    boolean bypass = false;
214    Vertex v = null;
215    List<Vertex>              vertices = new ArrayList<>();
216    Map<String, List<Vertex>> vs       = new TreeMap<>();
217    Map<String, String>       props    = new TreeMap<>();
218    if (g instanceof SimpleGroup) {
219      sg = (SimpleGroup)g;
220      type = sg.getType();
221      n = type.getFieldCount();
222      for (int i = 0; i < n; i++) {
223        for (int j = 0; j < g.getFieldRepetitionCount(i); j++) {
224          jt = type.getType(i).toString().split(" "); // optionality, type, name, ...
225          //log.info(lbl + ": " + type.getType(i).toString());
226          switch (jt[1]) {
227            case "int32":
228              props.put(jt[2], "" + g.getInteger(i, j));
229              break;
230            case "int64":
231              props.put(jt[2], "" + g.getLong(i, j));
232              break;
233            case "int96":
234              props.put(jt[2], "" + int96toTimestamp(g.getInt96(i, j).getBytes()));
235              break;
236            case "float":
237              props.put(jt[2], "" + g.getFloat(i, j));
238              break;
239            case "double":
240              props.put(jt[2], "" + g.getDouble(i, j));
241              break;
242            case "binary":
243              if (jt.length == 4 && jt[3].equals("(STRING)")) {
244                props.put(jt[2], g.getString(i, j));
245                }
246              else {
247                props.put(jt[2], Base64.getEncoder().encodeToString(g.getBinary(i, j).getBytes()));
248                }
249              break;
250            case "group":
251              name = type.getFieldName(i);
252              if (name.equals("list")) {
253                bypass = true;
254                registerVertices(vs, lbl, processGroup(g.getGroup(i, j).getGroup(0, 0), lbl));
255                }
256              else {
257                registerVertices(vs, name, processGroup(g.getGroup(i, j), name));
258                }
259              break;
260            default:
261              log.error("Uncovered  type of " + type.getType(i).toString());
262            }
263          }
264        }
265      if (bypass) {
266        for (Map.Entry<String, List<Vertex>> vv : vs.entrySet()) {
267          for (Vertex vvv : vv.getValue()) {
268            vertices.add(vvv);
269            }
270          }
271        }
272      else {
273        v = vertex(props, lbl, IDS.get(lbl));
274        for (Map.Entry<String, List<Vertex>> vv : vs.entrySet()) {
275          name = reLabel(vv.getKey());
276          edgeName = RELATIONS.get(name);
277          if (edgeName != null) {
278            for (Vertex vvv : vv.getValue()) {
279              _gr.addEdge(v, vvv, edgeName);
280              }
281            }
282          else {
283            log.error("Unknown relation for " + vv.getKey());
284            }
285          }
286        vertices.add(v);
287        }
288      }
289    else {
290      log.info("Uncovered group " + g.getClass());
291      }
292    return vertices;
293    }  
294    
295  /** Create or drop a {@link Vertex} according to chosen strategy.
296    * @param props     The {@link Map} of values.
297    * @param label     The {@link Vertex} label.
298    * @param property  The name of {@link Vertex} property.
299    *                  If <tt>null</tt> strategy is ignored and {@link Vertex} is created.
300    * @return          The created {@link Vertex} or <tt>null</tt>. */
301  private Vertex vertex(Map<String, String> props,
302                        String              label,
303                        String              property) {
304    label = reLabel(label);
305    Vertex v = null;
306    // Not unique Vertex
307    if (property == null) {
308      if (_drop) {
309        log.error("Cannot drop " + label + " of " + props); 
310        }
311      else  {
312        log.debug("Creating: " + label);
313        v = g().addV(label).property("lbl", label).next();
314        }
315      }
316    // Unique Vertex
317    else {
318      if (_drop || _replace) {
319        log.debug("Dropping " + label + ": " + property + " = " + props.get(property));
320        _gr.drop(label, property, props.get(property), true);
321        }
322      if (_reuse) {
323        log.info("Getting " + label + ": " + property + " = " + props.get(property));
324        v = _gr.getOrCreate(label, property, props.get(property)).next();
325        }
326      if (_create || _replace) {
327        log.debug("Creating " + label + ": " + property + " = " + props.get(property));
328        v = g().addV(label).property("lbl", label).property(property, props.get(property)).next();
329        }
330      }
331    // Fill properties
332    if (property != null) {
333      props.remove(property);
334      }
335    for (Map.Entry<String, String> prop : props.entrySet()) {
336      try {
337        v.property(prop.getKey(), prop.getValue());
338        }
339      catch (IllegalArgumentException e) {
340        log.error("Unknown property: " + prop.getKey() + " = " + prop.getValue());
341        }
342      if (props.get("dec") != null && props.get("ra") != null) {
343        v.property("direction", Geoshape.point(Double.valueOf(props.get("dec").toString()), Double.valueOf(props.get("ra").toString()) - 180));
344        }
345      }
346    return v;
347    }
348    
349  /** Register {@link Vertex}es in a {@link Map} according their label.
350    * @param map The {@link Map} of all registered {@link Vertex}es, organised according their label.
351    * @param key The label of new {@link Vertex}es/
352    * @param vertices The {@link List} of new {@link Vertex}es to be added. */
353  private void registerVertices(Map<String, List<Vertex>> map,
354                                String                    key,
355                                List<Vertex>              vertices) {
356    if (!map.containsKey(key)) {
357      map.put(key, new ArrayList<Vertex>());
358      }
359    for (Vertex vertex : vertices) {
360      map.get(key).add(vertex);
361      }
362    }
363    
364  /** Change label.
365    * @param label The original label.
366    * @return      The changed label. */
367  private String reLabel(String label) {
368    if (VERTEXES.containsKey(label)) {
369      label = VERTEXES.get(label);
370      }
371    return label;
372    }
373
374  /** Transform timestamp from int96 to {@link LocalDateTime}.
375    * @param bytes The timestamp of int96 bytes.
376    * @return      The timestamp converted to {@link LocalDateTime}. */
377  private LocalDateTime int96toTimestamp(byte[] bytes) {
378    int julianDay = 0;
379    int index = bytes.length;
380    while (index > 8) {
381      index--;
382      julianDay <<= 8;
383      julianDay += bytes[index] & 0xFF;
384      }
385    long nanos = 0;
386    while (index > 0) {
387      index--;
388      nanos <<= 8;
389      nanos += bytes[index] & 0xFF;
390      }
391    LocalDateTime timestamp = LocalDate.MIN.with(JulianFields.JULIAN_DAY, julianDay)
392                                           .atTime(LocalTime.NOON)
393                                           .plusNanos(nanos);
394    return timestamp;
395    }
396  
397  private static Map<String, String> VERTEXES;
398  
399  private static Map<String, String> RELATIONS;
400  
401  private static Map<String, String> IDS;
402    
403  private Configuration _conf;
404  
405  private FileSystem _fs;
406    
407  private GremlinRecipies _gr;
408  
409  private int _n = 0;
410  
411  private int _reportLimit;
412  
413  private int _commitLimit;
414  
415  private boolean _create;
416  
417  private boolean _reuse;
418  
419  private boolean _replace;
420  
421  private boolean _drop;
422            
423  static {
424    VERTEXES = new TreeMap<>();
425    VERTEXES.put("prv_candidates",   "prv_candidate");
426    VERTEXES.put("cutoutScience",    "Science");
427    VERTEXES.put("cutoutTemplate",   "Template");
428    VERTEXES.put("cutoutDifference", "Difference");
429    RELATIONS = new TreeMap<>();
430    RELATIONS.put("candidate",        "has");
431    RELATIONS.put("prv_candidate",    "has");
432    RELATIONS.put("mulens",           "has");
433    RELATIONS.put("Science",          "cutout");
434    RELATIONS.put("Template",         "cutout");
435    RELATIONS.put("Difference",       "cutout");
436    IDS = new TreeMap<>();
437    IDS.put("alert", "objectId");
438    }
439    
440  /** Logging . */
441  private static Logger log = LogManager.getLogger(ParquetImporter.class);
442                                                
443  }