001package com.astrolabsoftware.FinkBrowser.Avro;
002
003import com.Lomikel.Utils.Init;
004
005// Lomikel
006import com.Lomikel.Utils.LomikelException;
007
008// Avro
009import org.apache.avro.io.DatumReader;
010import org.apache.avro.file.DataFileReader;
011import org.apache.avro.generic.GenericRecord;
012import org.apache.avro.generic.GenericDatumReader;
013import org.apache.avro.file.SeekableInput;
014import org.apache.avro.mapred.FsInput;
015
016// Hadoop
017import org.apache.hadoop.fs.FileSystem;
018import org.apache.hadoop.fs.FileStatus;
019import org.apache.hadoop.fs.Path;
020import org.apache.hadoop.conf.Configuration;
021import org.apache.hadoop.fs.FSDataOutputStream;
022
023// Tinker Pop
024import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.unfold;
025
026// Janus Graph
027import org.janusgraph.core.JanusGraph;
028
029// Java
030import java.io.IOException;
031
032// Log4J
033import org.apache.log4j.Logger;
034
035/** <code>AvroImporter</code> imports <em>Avro</em> files from HDFS into <em>JanusGraph</em>.
036  * @opt attributes
037  * @opt operations
038  * @opt types
039  * @opt visibility
040  * @author <a href="mailto:Julius.Hrivnac@cern.ch">J.Hrivnac</a> */
041public class HDFSAvroImporter extends AvroImporter {
042        
043  /** Import Avro files or directory. 
044    * @param args[0] The Janusgraph properties file. 
045    * @param args[1] The Avro file or directory with Avro files.
046    * @param args[2] The directory for FITS files. If <tt>null</tt> or empty, FITS are included in the Graph. Ignored if HBase url set.
047    * @param args[3] The url for HBase table with full data as <tt>ip:port:table:schema</tt>. May be <tt>null</tt> or empty.
048    * @param args[4] The number of events to use for progress report (-1 means no report untill the end).
049    * @param args[5] The number of events to commit in one step (-1 means commit only at the end).
050    * @param args[6] The creation strategy. <tt>create,drop,replace,skip</tt>.
051    * @param args[7] The data type, <tt>alert|pca</tt>. If <tt>null<tt>, then considering as <tt>alert</tt>.
052    * @throws LomikelException If anything goes wrong. */
053   public static void main(String[] args) throws IOException {
054    Init.init();
055    if (args.length != 8) {
056      log.error("HDFSAvroImporter.exe.jar <JanusGraph properties> [<file>|<directory>] <hbaseUrl> <report limit> <commit limit> [create|reuse|drop] [alert|pca]");
057      System.exit(-1);
058      }
059    try {
060      HDFSAvroImporter importer = new HDFSAvroImporter(            args[0],
061                                                       new Integer(args[4]),
062                                                       new Integer(args[5]),
063                                                                   args[6],
064                                                                   args[2],
065                                                                   args[3],
066                                                                   args[7]);
067      importer.timerStart();
068      importer.process(args[1]);
069      if (!importer.skip()) {
070        importer.commit();
071        }
072      importer.close();
073      }
074    catch (LomikelException e) {
075      log.fatal("Cannot import " + args[1] + " into " + args[0], e);
076      System.exit(-1);
077      }
078    }
079  
080  /** Create with JanusGraph properties file.
081    * @param properties  The file with the complete Janusgraph properties.
082    * @param reportLimit The number of events to use for progress report (-1 means no report untill the end).
083    * @param commitLimit The number of events to commit in one step (-1 means commit only at the end).
084    * @param strategy    The creation strategy. <tt>dro If <tt>null<tt>, then considering as <tt>alert</tt>.p,replace,getOrCreate</tt>. 
085    * @param fitsDir     The directory for FITS files. If <tt>null</tt>or empty, FITS are included in the Graph. Ignored if HBase url set.
086    * @param hbaseUrl    The url for HBase table with full data as <tt>ip:port:table:schema</tt>. May be <tt>null</tt> or empty.
087    * @param dataType    The data type, <tt>alert|pca</tt>. If <tt>null<tt>, then considering as <tt>alert</tt>.*/  
088  public HDFSAvroImporter(String properties,
089                          int    reportLimit,
090                          int    commitLimit,
091                          String strategy,
092                          String fitsDir,
093                          String hbaseUrl,
094                          String dataType) {
095    super(properties, reportLimit, commitLimit, strategy, fitsDir, hbaseUrl, dataType);
096    }
097        
098  @Override
099  public void processDir(String dirFN,
100                         String fileExt) throws IOException {  
101    log.info("Loading directory " + dirFN);
102    Path path = new Path(dirFN);
103    Path p;
104    int i = 0;
105    for (FileStatus fileStatus : _fs.listStatus(path)) {
106      p = fileStatus.getPath();
107      if (_fs.isDirectory(p)) {
108        processDir(dirFN + "/" + p.getName(), fileExt);
109        }
110      else if (p.getName().endsWith("." + fileExt)) {
111        try {
112          process(dirFN + "/" + p.getName());
113          i++;
114          }
115        catch (IOException | LomikelException e) {
116          log.error("Failed to process " + p, e);
117          }
118        }
119      else {
120        log.warn("Not " + fileExt + " file: " + p);
121        }
122      }
123    timer("alerts created", n(), -1, -1);      
124    log.info("" + i + " files loaded from " + dirFN);
125    }
126     
127  @Override
128  public void process(String fn) throws IOException, LomikelException {
129    log.info("Loading " + fn);
130    register(fn);
131    _conf = new Configuration();
132    _fs = FileSystem.get(_conf);
133    Path path = new Path(fn);
134    if (_fs.isDirectory(path)) {
135      processDir(fn, "avro");
136      return;
137      }
138    else if (!_fs.isFile(path)) {
139      log.error("Not a file/directory: " + fn);
140      return;
141      }
142    processFile(path);
143    }
144    
145  /** Process <em>Avro</em> alert file .
146     * @param path The data file.
147     * @throws IOException If problem with file reading. */
148  public void processFile(Path path) throws IOException {
149    DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
150    SeekableInput input = new FsInput(path, _conf);
151    DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(input, datumReader);
152    GenericRecord record = null;
153    while (dataFileReader.hasNext()) {
154      record = dataFileReader.next(record);
155      processRecord(record);
156      }
157    dataFileReader.close();
158    }     
159    
160  @Override
161  protected void writeFits(String fn,
162                           byte[] data) {
163    try {
164      FSDataOutputStream out = _fs.create(new Path(fitsDir() + "/" + fn));
165      out.write(data);
166      out.close();
167      }
168    catch (IOException e) {
169      log.error("Cannot write " + fn, e);
170      }
171    }
172    
173  private Configuration _conf;
174  
175  private FileSystem _fs;
176    
177  /** Logging . */
178  private static Logger log = Logger.getLogger(HDFSAvroImporter.class);
179                                                
180  }