001package com.astrolabsoftware.FinkBrowser.Avro; 002 003import com.Lomikel.Utils.Init; 004 005// Lomikel 006import com.Lomikel.Utils.LomikelException; 007 008// Avro 009import org.apache.avro.io.DatumReader; 010import org.apache.avro.file.DataFileReader; 011import org.apache.avro.generic.GenericRecord; 012import org.apache.avro.generic.GenericDatumReader; 013import org.apache.avro.file.SeekableInput; 014import org.apache.avro.mapred.FsInput; 015 016// Hadoop 017import org.apache.hadoop.fs.FileSystem; 018import org.apache.hadoop.fs.FileStatus; 019import org.apache.hadoop.fs.Path; 020import org.apache.hadoop.conf.Configuration; 021import org.apache.hadoop.fs.FSDataOutputStream; 022 023// Tinker Pop 024import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.unfold; 025 026// Janus Graph 027import org.janusgraph.core.JanusGraph; 028 029// Java 030import java.io.IOException; 031 032// Log4J 033import org.apache.log4j.Logger; 034 035/** <code>AvroImporter</code> imports <em>Avro</em> files from HDFS into <em>JanusGraph</em>. 036 * @opt attributes 037 * @opt operations 038 * @opt types 039 * @opt visibility 040 * @author <a href="mailto:Julius.Hrivnac@cern.ch">J.Hrivnac</a> */ 041public class HDFSAvroImporter extends AvroImporter { 042 043 /** Import Avro files or directory. 044 * @param args[0] The Janusgraph properties file. 045 * @param args[1] The Avro file or directory with Avro files. 046 * @param args[2] The directory for FITS files. If <tt>null</tt> or empty, FITS are included in the Graph. Ignored if HBase url set. 047 * @param args[3] The url for HBase table with full data as <tt>ip:port:table:schema</tt>. May be <tt>null</tt> or empty. 048 * @param args[4] The number of events to use for progress report (-1 means no report untill the end). 049 * @param args[5] The number of events to commit in one step (-1 means commit only at the end). 050 * @param args[6] The creation strategy. <tt>create,drop,replace,skip</tt>. 051 * @param args[7] The data type, <tt>alert|pca</tt>. If <tt>null<tt>, then considering as <tt>alert</tt>. 052 * @throws LomikelException If anything goes wrong. */ 053 public static void main(String[] args) throws IOException { 054 Init.init(); 055 if (args.length != 8) { 056 log.error("HDFSAvroImporter.exe.jar <JanusGraph properties> [<file>|<directory>] <hbaseUrl> <report limit> <commit limit> [create|reuse|drop] [alert|pca]"); 057 System.exit(-1); 058 } 059 try { 060 HDFSAvroImporter importer = new HDFSAvroImporter( args[0], 061 new Integer(args[4]), 062 new Integer(args[5]), 063 args[6], 064 args[2], 065 args[3], 066 args[7]); 067 importer.timerStart(); 068 importer.process(args[1]); 069 if (!importer.skip()) { 070 importer.commit(); 071 } 072 importer.close(); 073 } 074 catch (LomikelException e) { 075 log.fatal("Cannot import " + args[1] + " into " + args[0], e); 076 System.exit(-1); 077 } 078 } 079 080 /** Create with JanusGraph properties file. 081 * @param properties The file with the complete Janusgraph properties. 082 * @param reportLimit The number of events to use for progress report (-1 means no report untill the end). 083 * @param commitLimit The number of events to commit in one step (-1 means commit only at the end). 084 * @param strategy The creation strategy. <tt>dro If <tt>null<tt>, then considering as <tt>alert</tt>.p,replace,getOrCreate</tt>. 085 * @param fitsDir The directory for FITS files. If <tt>null</tt>or empty, FITS are included in the Graph. Ignored if HBase url set. 086 * @param hbaseUrl The url for HBase table with full data as <tt>ip:port:table:schema</tt>. May be <tt>null</tt> or empty. 087 * @param dataType The data type, <tt>alert|pca</tt>. If <tt>null<tt>, then considering as <tt>alert</tt>.*/ 088 public HDFSAvroImporter(String properties, 089 int reportLimit, 090 int commitLimit, 091 String strategy, 092 String fitsDir, 093 String hbaseUrl, 094 String dataType) { 095 super(properties, reportLimit, commitLimit, strategy, fitsDir, hbaseUrl, dataType); 096 } 097 098 @Override 099 public void processDir(String dirFN, 100 String fileExt) throws IOException { 101 log.info("Loading directory " + dirFN); 102 Path path = new Path(dirFN); 103 Path p; 104 int i = 0; 105 for (FileStatus fileStatus : _fs.listStatus(path)) { 106 p = fileStatus.getPath(); 107 if (_fs.isDirectory(p)) { 108 processDir(dirFN + "/" + p.getName(), fileExt); 109 } 110 else if (p.getName().endsWith("." + fileExt)) { 111 try { 112 process(dirFN + "/" + p.getName()); 113 i++; 114 } 115 catch (IOException | LomikelException e) { 116 log.error("Failed to process " + p, e); 117 } 118 } 119 else { 120 log.warn("Not " + fileExt + " file: " + p); 121 } 122 } 123 timer("alerts created", n(), -1, -1); 124 log.info("" + i + " files loaded from " + dirFN); 125 } 126 127 @Override 128 public void process(String fn) throws IOException, LomikelException { 129 log.info("Loading " + fn); 130 register(fn); 131 _conf = new Configuration(); 132 _fs = FileSystem.get(_conf); 133 Path path = new Path(fn); 134 if (_fs.isDirectory(path)) { 135 processDir(fn, "avro"); 136 return; 137 } 138 else if (!_fs.isFile(path)) { 139 log.error("Not a file/directory: " + fn); 140 return; 141 } 142 processFile(path); 143 } 144 145 /** Process <em>Avro</em> alert file . 146 * @param path The data file. 147 * @throws IOException If problem with file reading. */ 148 public void processFile(Path path) throws IOException { 149 DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(); 150 SeekableInput input = new FsInput(path, _conf); 151 DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(input, datumReader); 152 GenericRecord record = null; 153 while (dataFileReader.hasNext()) { 154 record = dataFileReader.next(record); 155 processRecord(record); 156 } 157 dataFileReader.close(); 158 } 159 160 @Override 161 protected void writeFits(String fn, 162 byte[] data) { 163 try { 164 FSDataOutputStream out = _fs.create(new Path(fitsDir() + "/" + fn)); 165 out.write(data); 166 out.close(); 167 } 168 catch (IOException e) { 169 log.error("Cannot write " + fn, e); 170 } 171 } 172 173 private Configuration _conf; 174 175 private FileSystem _fs; 176 177 /** Logging . */ 178 private static Logger log = Logger.getLogger(HDFSAvroImporter.class); 179 180 }