package com.astrolabsoftware.FinkBrowser.Parquet;

import com.Lomikel.Utils.Init;
import com.Lomikel.Januser.JanusClient;
import com.Lomikel.Januser.GremlinRecipies;
import com.Lomikel.Utils.LomikelException;

// Parquet
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.io.MessageColumnIO;
import org.apache.parquet.io.ColumnIOFactory;
import org.apache.parquet.io.RecordReader;
import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroup;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.*;

// Hadoop
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;

// TinkerPop
import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.unfold;
import org.apache.tinkerpop.gremlin.structure.Vertex;

// JanusGraph
import org.janusgraph.core.JanusGraph;
import org.janusgraph.core.attribute.Geoshape;

// Java
import java.io.IOException;
import java.io.FileNotFoundException;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.TreeMap;
import java.util.Base64;
import java.time.LocalDateTime;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.temporal.JulianFields;

// Log4J
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;

/** <code>ParquetImporter</code> imports <em>Parquet</em> files into <em>JanusGraph</em>.
  * @opt attributes
  * @opt operations
  * @opt types
  * @opt visibility
  * @author <a href="mailto:Julius.Hrivnac@cern.ch">J.Hrivnac</a> */
// TBD: allow reset
// TBD: allow getOrCreate
public class ParquetImporter extends JanusClient {

  /** Import Parquet files or a directory.
    * @param args[0] The JanusGraph properties file.
    * @param args[1] The Parquet file or directory with Parquet files.
    * @param args[2] The number of events to use for progress report (-1 means no report until the end).
    * @param args[3] The number of events to commit in one step (-1 means commit only at the end).
    * @param args[4] The creation strategy: <tt>create,reuse,replace,drop</tt>.
    * @throws LomikelException If anything goes wrong. */
  public static void main(String[] args) throws IOException {
    Init.init();
    if (args.length != 5) {
      log.error("ParquetImporter.exe.jar <JanusGraph properties> [<file>|<directory>] <report limit> <commit limit> [create|reuse|replace|drop]");
      System.exit(-1);
    }
    try {
      ParquetImporter importer = new ParquetImporter(args[0],
                                                     Integer.valueOf(args[2]),
                                                     Integer.valueOf(args[3]),
                                                     args[4]);
      importer.timerStart();
      importer.process(args[1]);
      importer.commit();
      importer.close();
    }
    catch (LomikelException e) {
      log.fatal("Cannot import " + args[1] + " into " + args[0], e);
      System.exit(-1);
    }
  }
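  /* Illustrative programmatic usage (a sketch mirroring main(); the properties file name and
   * data path below are hypothetical):
   *
   *   ParquetImporter importer = new ParquetImporter("janusgraph.properties", 10000, 10000, "create");
   *   importer.timerStart();
   *   importer.process("/data/fink/alerts");  // a single *.parquet file or a directory tree
   *   importer.commit();
   *   importer.close();
   */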
  /** Create with a JanusGraph properties file.
    * @param properties  The file with the complete JanusGraph properties.
    * @param reportLimit The number of events to use for progress report (-1 means no report until the end).
    * @param commitLimit The number of events to commit in one step (-1 means commit only at the end).
    * @param strategy    The creation strategy: <tt>create,reuse,replace,drop</tt>. */
  public ParquetImporter(String properties,
                         int    reportLimit,
                         int    commitLimit,
                         String strategy) {
    super(properties);
    log.info("Reporting after each " + reportLimit + " alerts");
    log.info("Committing after each " + commitLimit + " alerts");
    log.info("Using strategy: " + strategy);
    _reportLimit = reportLimit;
    _commitLimit = commitLimit;
    _create  = false;
    _reuse   = false;
    _replace = false;
    _drop    = false;
    if (strategy.contains("create")) {
      _create = true;
    }
    if (strategy.contains("reuse")) {
      _reuse = true;
    }
    if (strategy.contains("replace")) {
      _replace = true;
    }
    if (strategy.contains("drop")) {
      _drop = true;
    }
    _gr = new GremlinRecipies(this);
  }

  /** Process a directory with <em>Parquet</em> alert files (recursively).
    * @param dirFN   The name of the directory with the data files.
    * @param fileExt The file extension.
    * @throws IOException           If a file cannot be read.
    * @throws FileNotFoundException If a file cannot be found. */
  public void processDir(String dirFN,
                         String fileExt) throws IOException, FileNotFoundException {
    log.info("Loading directory " + dirFN);
    Path path = new Path(dirFN);
    Path p;
    int i = 0;
    for (FileStatus fileStatus : _fs.listStatus(path)) {
      p = fileStatus.getPath();
      if (_fs.isDirectory(p)) {
        processDir(dirFN + "/" + p.getName(), fileExt);
      }
      else if (p.getName().endsWith("." + fileExt)) {
        try {
          process(dirFN + "/" + p.getName());
          i++;
        }
        catch (IOException | LomikelException e) {
          log.error("Failed to process " + p, e);
        }
      }
      else {
        log.warn("Not a " + fileExt + " file: " + p);
      }
    }
    timer("alerts created", _n, -1, -1);
    log.info("" + i + " files loaded");
  }
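  /* Note: entries that do not carry the requested extension (for instance Spark's "_SUCCESS"
   * marker files) are only reported with a warning by processDir() and are otherwise skipped;
   * sub-directories are descended into recursively. */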
  /** Process a <em>Parquet</em> alert file or a directory with files (recursively).
    * @param fn The filename of the data file
    *           or directory with files.
    * @throws IOException      If a file cannot be read.
    * @throws LomikelException If anything goes wrong. */
  public void process(String fn) throws IOException, LomikelException {
    log.info("Loading " + fn);
    _conf = new Configuration();
    _fs = FileSystem.get(_conf);
    Path path = new Path(fn);
    if (_fs.isDirectory(path)) {
      processDir(fn, "parquet");
      return;
    }
    else if (!_fs.isFile(path)) {
      log.error("Not a file/directory: " + fn);
      return;
    }
    ParquetMetadata readFooter = ParquetFileReader.readFooter(_conf, path, ParquetMetadataConverter.NO_FILTER);
    MessageType schema = readFooter.getFileMetaData().getSchema();
    ParquetFileReader r = new ParquetFileReader(_conf, path, readFooter);
    PageReadStore pages = null;
    while (null != (pages = r.readNextRowGroup())) {
      final long rows = pages.getRowCount();
      log.info("Reading " + rows + " rows");
      final MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
      final RecordReader<Group> recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(schema));
      Group g;
      // Read exactly 'rows' records; the previous '++i < rows' test skipped the last record of each row group.
      for (long i = 0; i < rows && (g = recordReader.read()) != null; i++) {
        processGroup(g, "alert");
        timer("alerts processed", ++_n, _reportLimit, _commitLimit);
      }
    }
    r.close();
  }

  /** Process a {@link Group} and create the contained {@link Vertex}es with the
    * specified label. Runs recursively.
    * @param g   The {@link Group} to process.
    * @param lbl The label for the created {@link Vertex}es.
    * @return The created {@link Vertex}es. */
  private List<Vertex> processGroup(Group  g,
                                    String lbl) {
    lbl = reLabel(lbl);
    //log.info(lbl);
    SimpleGroup sg;
    GroupType type;
    String[] jt;
    int n;
    String name = null;
    String edgeName;
    boolean bypass = false;
    Vertex v = null;
    List<Vertex> vertices = new ArrayList<>();
    Map<String, List<Vertex>> vs = new TreeMap<>();
    Map<String, String> props = new TreeMap<>();
    if (g instanceof SimpleGroup) {
      sg = (SimpleGroup)g;
      type = sg.getType();
      n = type.getFieldCount();
      for (int i = 0; i < n; i++) {
        for (int j = 0; j < g.getFieldRepetitionCount(i); j++) {
          jt = type.getType(i).toString().split(" "); // optionality, type, name, ...
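          // jt[0] is the optionality, jt[1] the primitive or group type, jt[2] the field name,
          // e.g. "optional double ra" or "optional binary objectId (STRING)" (field names shown are examples).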
          //log.info(lbl + ": " + type.getType(i).toString());
          switch (jt[1]) {
            case "int32":
              props.put(jt[2], "" + g.getInteger(i, j));
              break;
            case "int64":
              props.put(jt[2], "" + g.getLong(i, j));
              break;
            case "int96":
              props.put(jt[2], "" + int96toTimestamp(g.getInt96(i, j).getBytes()));
              break;
            case "float":
              props.put(jt[2], "" + g.getFloat(i, j));
              break;
            case "double":
              props.put(jt[2], "" + g.getDouble(i, j));
              break;
            case "binary":
              if (jt.length == 4 && jt[3].equals("(STRING)")) {
                props.put(jt[2], g.getString(i, j));
              }
              else {
                props.put(jt[2], Base64.getEncoder().encodeToString(g.getBinary(i, j).getBytes()));
              }
              break;
            case "group":
              name = type.getFieldName(i);
              if (name.equals("list")) {
                bypass = true;
                registerVertices(vs, lbl, processGroup(g.getGroup(i, j).getGroup(0, 0), lbl));
              }
              else {
                registerVertices(vs, name, processGroup(g.getGroup(i, j), name));
              }
              break;
            default:
              log.error("Uncovered type of " + type.getType(i).toString());
          }
        }
      }
      if (bypass) {
        for (Map.Entry<String, List<Vertex>> vv : vs.entrySet()) {
          for (Vertex vvv : vv.getValue()) {
            vertices.add(vvv);
          }
        }
      }
      else {
        v = vertex(props, lbl, IDS.get(lbl));
        for (Map.Entry<String, List<Vertex>> vv : vs.entrySet()) {
          name = reLabel(vv.getKey());
          edgeName = RELATIONS.get(name);
          if (edgeName != null) {
            for (Vertex vvv : vv.getValue()) {
              _gr.addEdge(v, vvv, edgeName);
            }
          }
          else {
            log.error("Unknown relation for " + vv.getKey());
          }
        }
        vertices.add(v);
      }
    }
    else {
      log.info("Uncovered group " + g.getClass());
    }
    return vertices;
  }
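  /* Illustrative query sketch (not part of the importer): the graph produced above can be
   * inspected with a Gremlin traversal such as the one below; the objectId value is hypothetical.
   *
   *   g().V().has("lbl", "alert")
   *          .has("objectId", "ZTF18abcdefg")
   *          .out("has")
   *          .valueMap();
   */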
  /** Create or drop a {@link Vertex} according to the chosen strategy.
    * @param props    The {@link Map} of values.
    * @param label    The {@link Vertex} label.
    * @param property The name of the {@link Vertex} property used as the unique id.
    *                 If <tt>null</tt>, the strategy is ignored and the {@link Vertex} is created.
    * @return The created {@link Vertex} or <tt>null</tt>. */
  private Vertex vertex(Map<String, String> props,
                        String              label,
                        String              property) {
    label = reLabel(label);
    Vertex v = null;
    // Not a unique Vertex
    if (property == null) {
      if (_drop) {
        log.error("Cannot drop " + label + " of " + props);
      }
      else {
        log.debug("Creating: " + label);
        v = g().addV(label).property("lbl", label).next();
      }
    }
    // Unique Vertex
    else {
      if (_drop || _replace) {
        log.debug("Dropping " + label + ": " + property + " = " + props.get(property));
        _gr.drop(label, property, props.get(property), true);
      }
      if (_reuse) {
        log.info("Getting " + label + ": " + property + " = " + props.get(property));
        v = _gr.getOrCreate(label, property, props.get(property)).next();
      }
      if (_create || _replace) {
        log.debug("Creating " + label + ": " + property + " = " + props.get(property));
        v = g().addV(label).property("lbl", label).property(property, props.get(property)).next();
      }
    }
    // Nothing has been created (e.g. a pure 'drop' strategy)
    if (v == null) {
      return null;
    }
    // Fill properties
    if (property != null) {
      props.remove(property);
    }
    for (Map.Entry<String, String> prop : props.entrySet()) {
      try {
        v.property(prop.getKey(), prop.getValue());
      }
      catch (IllegalArgumentException e) {
        log.error("Unknown property: " + prop.getKey() + " = " + prop.getValue());
      }
    }
    // Derive the 'direction' Geoshape once from 'dec' and 'ra' (originally re-set for every property)
    if (props.get("dec") != null && props.get("ra") != null) {
      v.property("direction", Geoshape.point(Double.valueOf(props.get("dec")), Double.valueOf(props.get("ra")) - 180));
    }
    return v;
  }

  /** Register {@link Vertex}es in a {@link Map} according to their label.
    * @param map      The {@link Map} of all registered {@link Vertex}es, organised according to their label.
    * @param key      The label of the new {@link Vertex}es.
    * @param vertices The {@link List} of new {@link Vertex}es to be added. */
  private void registerVertices(Map<String, List<Vertex>> map,
                                String                    key,
                                List<Vertex>              vertices) {
    if (!map.containsKey(key)) {
      map.put(key, new ArrayList<Vertex>());
    }
    for (Vertex vertex : vertices) {
      map.get(key).add(vertex);
    }
  }

  /** Change a label.
    * @param label The original label.
    * @return The changed label. */
  private String reLabel(String label) {
    if (VERTEXES.containsKey(label)) {
      label = VERTEXES.get(label);
    }
    return label;
  }
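  /* Illustrative decoding sketch (not part of the importer): the 12-byte INT96 value handled by
   * int96toTimestamp() below carries the nanoseconds part little-endian in bytes 0-7 and the
   * Julian day number little-endian in bytes 8-11, e.g.
   *
   *   byte[] raw = new byte[12];      // nanoseconds part left at zero
   *   raw[8]  = (byte) 0x4B;          // 0x0025854B = Julian day 2458955 (2020-04-15)
   *   raw[9]  = (byte) 0x85;
   *   raw[10] = (byte) 0x25;
   *   LocalDateTime t = int96toTimestamp(raw);
   */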
  /** Transform a timestamp from INT96 to {@link LocalDateTime}.
    * The last four bytes carry the Julian day number and the first eight bytes the nanoseconds,
    * both stored little-endian.
    * @param bytes The INT96 timestamp bytes.
    * @return The timestamp converted to {@link LocalDateTime}. */
  private LocalDateTime int96toTimestamp(byte[] bytes) {
    int julianDay = 0;
    int index = bytes.length;
    // decode the Julian day from the last four bytes (little-endian)
    while (index > 8) {
      index--;
      julianDay <<= 8;
      julianDay += bytes[index] & 0xFF;
    }
    // decode the nanoseconds from the first eight bytes (little-endian)
    long nanos = 0;
    while (index > 0) {
      index--;
      nanos <<= 8;
      nanos += bytes[index] & 0xFF;
    }
    LocalDateTime timestamp = LocalDate.MIN.with(JulianFields.JULIAN_DAY, julianDay)
                                           .atTime(LocalTime.NOON)
                                           .plusNanos(nanos);
    return timestamp;
  }

  private static Map<String, String> VERTEXES;

  private static Map<String, String> RELATIONS;

  private static Map<String, String> IDS;

  private Configuration _conf;

  private FileSystem _fs;

  private GremlinRecipies _gr;

  private int _n = 0;

  private int _reportLimit;

  private int _commitLimit;

  private boolean _create;

  private boolean _reuse;

  private boolean _replace;

  private boolean _drop;

  static {
    // renaming of Parquet group names to Vertex labels
    VERTEXES = new TreeMap<>();
    VERTEXES.put("prv_candidates",   "prv_candidate");
    VERTEXES.put("cutoutScience",    "Science");
    VERTEXES.put("cutoutTemplate",   "Template");
    VERTEXES.put("cutoutDifference", "Difference");
    // edge labels connecting the alert Vertex to its sub-Vertexes
    RELATIONS = new TreeMap<>();
    RELATIONS.put("candidate",     "has");
    RELATIONS.put("prv_candidate", "has");
    RELATIONS.put("mulens",        "has");
    RELATIONS.put("Science",       "cutout");
    RELATIONS.put("Template",      "cutout");
    RELATIONS.put("Difference",    "cutout");
    // unique-id property per Vertex label
    IDS = new TreeMap<>();
    IDS.put("alert", "objectId");
  }

  /** Logging. */
  private static Logger log = LogManager.getLogger(ParquetImporter.class);

}