package com.Lomikel.HBaser;

import com.Lomikel.Utils.LomikelException;

// HBase
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;

// SQL
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.Statement;

// Java
import java.util.Map;
import java.util.TreeMap;

// Log4J
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;

/** <code>HBaseSQLClient</code> adds SQL-search and SQL Phoenix upsert
 * capabilities to {@link HBaseClient}.
 * @opt attributes
 * @opt operations
 * @opt types
 * @opt visibility
 * @author <a href="mailto:Julius.Hrivnac@cern.ch">J.Hrivnac</a> */
// TBD: reuse Phoenixer
public class HBaseSQLClient extends HBaseClient {

  /** Create and connect to HBase.
   * @param zookeepers The comma-separated list of zookeeper ids.
   * @param clientPort The client port.
   * @throws LomikelException If anything goes wrong. */
  public HBaseSQLClient(String zookeepers,
                        String clientPort) throws LomikelException {
    super(zookeepers, clientPort);
  }

  /** Create and connect to HBase.
   * @param zookeepers The comma-separated list of zookeeper ids.
   * @param clientPort The client port.
   * @throws LomikelException If anything goes wrong. */
  public HBaseSQLClient(String zookeepers,
                        int clientPort) throws LomikelException {
    super(zookeepers, clientPort);
  }

  /** Create and connect to HBase.
   * @param url The HBase url.
   * @throws LomikelException If anything goes wrong. */
  public HBaseSQLClient(String url) throws LomikelException {
    super(url);
  }

  /** Get row(s) using an SQL query.
   * The Phoenix SQL view should already have been created using {@link #sqlViewCreationCommand}.
   * @param sql   The (Phoenix) SQL query.
   *              The <code>ROWKEY</code> should be included in the <code>SELECT</code>
   *              part (explicitly or implicitly).
   *              Lower-case and mixed-case identifiers should be enclosed in quotation marks.
   * @param ifkey Whether to also add entry keys (as <tt>key:key</tt>).
   * @return The {@link Map} of {@link Map}s of results as <tt>key-&gt;{family:column-&gt;value}</tt>. */
  // TBD: handle binary columns
  // TBD: handle iftime
  // TBD: handle WHERE
  public Map<String, Map<String, String>> scan(String sql,
                                               boolean ifkey) {
    Map<String, Map<String, String>> results = new TreeMap<>();
    Map<String, String> result;
    Statement stmt;
    try {
      stmt = conn().createStatement();
    }
    catch (SQLException e) {
      log.error("Cannot create statement", e);
      return null;
    }
    try {
      ResultSet rs = stmt.executeQuery(sql);
      ResultSetMetaData rsmd = rs.getMetaData();
      while (rs.next()) {
        result = new TreeMap<>();
        if (addResult(rs, rsmd, result, ifkey)) {
          results.put(rs.getString("rowkey"), result);
        }
      }
    }
    catch (SQLException e) {
      log.error("Cannot execute: " + sql, e);
    }
    return results;
  }
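
  // A minimal usage sketch of the SQL scan (illustrative only, not part of the API):
  // it assumes a reachable HBase/Phoenix instance, that the Phoenix view over this
  // table has already been created (see sqlViewCreationCommand()), and that the
  // quorum, port and query below are placeholders to be adapted.
  //
  //   HBaseSQLClient client = new HBaseSQLClient("zookeeper1,zookeeper2", "2181");
  //   Map<String, Map<String, String>> rows =
  //       client.scan("SELECT * FROM \"my_view\" LIMIT 10", true);
  //   for (Map.Entry<String, Map<String, String>> entry : rows.entrySet()) {
  //     System.out.println(entry.getKey() + " -> " + entry.getValue());
  //   }
  //   client.close();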

  /** Add {@link Result} into the result {@link Map}.
   * @param rs     The {@link ResultSet} to add (the current row).
   * @param rsmd   The {@link ResultSetMetaData}.
   * @param result The {@link Map} of results <tt>family:column-&gt;value</tt>.
   * @param ifkey  Whether to also add entry keys (as <tt>key:key</tt>).
   * @return Whether the result has been added.
   * @throws SQLException If the {@link ResultSet} cannot be read. */
  private boolean addResult(ResultSet rs,
                            ResultSetMetaData rsmd,
                            Map<String, String> result,
                            boolean ifkey) throws SQLException {
    String columnName;
    int n = rsmd.getColumnCount();
    boolean isSchema = false;
    if (rs.getString("ROWKEY").startsWith("schema")) {
      isSchema = true;
    }
    for (int i = 1; i <= n; i++) {
      columnName = rsmd.getColumnName(i);
      if (columnName.equals("ROWKEY")) {
        if (ifkey) {
          result.put("key:key", rs.getString(i));
        }
      }
      else if (!isSchema && _simpleSchema != null && _simpleSchema.type(columnName) != null) {
        result.put(columnName, _simpleSchema.decode(columnName, rs.getBytes(i)));
      }
      else {
        result.put(columnName, rs.getString(i));
      }
    }
    return true;
  }

  /** Give the SQL view creation command for this HBase table.
   * It creates the Phoenix SQL view of the current HBase table.
   * @return The SQL view creation command for this HBase table. */
  public String sqlViewCreationCommand() {
    HBaseSchema hs = (HBaseSchema)schema();
    return hs.toSQLView(tableName());
  }

  /** Give the SQL table creation command for this HBase table.
   * It creates the SQL table with the same properties as the current HBase table,
   * using the default table name.
   * @return The SQL table creation command for this HBase table. */
  public String sqlTableCreationCommand() {
    return schema().toSQL(tableName() + "_" + schema().name().replaceAll("\\.", "__"));
  }
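
  // Continuing the sketch above, a hedged example of applying the generated DDL
  // (illustrative only; it assumes the client is already connected to an HBase
  // table with a known schema, so that schema() and tableName() return meaningful
  // values):
  //
  //   try (Statement st = client.conn().createStatement()) {
  //     st.execute(client.sqlViewCreationCommand());    // create the Phoenix view
  //     // or: st.execute(client.sqlTableCreationCommand());
  //   }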

  /** Replicate row(s) into a Phoenix SQL table.
   * @param key    The row key. Disables other search terms.
   *               It can be <tt>null</tt>.
   * @param search The search terms as <tt>family:column:value,...</tt>.
   *               The key can be searched with the <tt>family:column = key:key</tt> "pseudo-name".
   *               <tt>key:startKey</tt> and <tt>key:stopKey</tt> can restrict the search to a key interval.
   *               A {@link Comparator} can be chosen as <tt>family:column:value:comparator</tt>
   *               among <tt>exact,prefix,substring,regex</tt>.
   *               The default for the key is <tt>prefix</tt>,
   *               the default for columns is <tt>substring</tt>.
   *               A randomiser can be added with <tt>random:random:chance</tt>.
   *               It can be <tt>null</tt>.
   *               All searches are executed as prefix searches.
   * @param filter The names of required values as <tt>family:column,...</tt>.
   *               <tt>*</tt> = all.
   * @param start  The time period start timestamp in <tt>ms</tt>.
   *               <tt>0</tt> means since the beginning.
   * @param stop   The time period stop timestamp in <tt>ms</tt>.
   *               <tt>0</tt> means till now.
   * @param ifkey  Whether to also give entry keys (as <tt>key:key</tt>).
   * @param iftime Whether to also give entry timestamps (as <tt>key:time</tt>).
   * @param sqlTableName The SQL table name. Empty or <tt>null</tt> will use the default name.
   * @return The {@link Map} of {@link Map}s of results as <tt>key-&gt;{family:column-&gt;value}</tt>. */
  public Map<String, Map<String, String>> scan2SQL(String key,
                                                   String search,
                                                   String filter,
                                                   long start,
                                                   long stop,
                                                   boolean ifkey,
                                                   boolean iftime,
                                                   String sqlTableName) {
    _sqlTableName = sqlTableName;
    log.info("Upserting results into SQL Phoenix table " + _sqlTableName + ", not returning results");
    Map<String, Map<String, String>> results = scan(key, search, filter, start, stop, ifkey, iftime);
    _sqlTableName = null;
    return results;
  }

  /** Give the JDBC {@link Connection} to the Phoenix database.
   * @return The JDBC {@link Connection} to the Phoenix database. */
  public Connection conn() {
    if (_conn == null) {
      log.info("Opening " + zookeepers() + " on port " + clientPort() + " via Phoenix");
      try {
        Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
        //Class.forName("org.apache.phoenix.queryserver.client.Driver");
      }
      catch (ClassNotFoundException e) {
        log.error("Cannot find Phoenix JDBC driver", e);
        return null;
      }
      try {
        _conn = DriverManager.getConnection("jdbc:phoenix:" + zookeepers() + ":" + clientPort());
        //_conn = DriverManager.getConnection("jdbc:phoenix:thin:url=http://" + zookeepers() + ":" + clientPort() + ";serialization=PROTOBUF");
      }
      catch (SQLException e) {
        log.error("Cannot open connection", e);
        return null;
      }
      if (schema() instanceof HBaseSchema && schema().size() > 0) {
        _simpleSchema = (HBaseSchema)schema().simpleSchema();
      }
      setProcessor(new HBaseSQLClientProcessor(this));
    }
    return _conn;
  }

  @Override
  public void close() {
    log.debug("Closing");
    try {
      if (_conn != null) {
        _conn.close();
      }
    }
    catch (SQLException e) {
      log.warn("Cannot close JDBC", e);
    }
    _conn = null;
    super.close();
  }

  /** Give the simple {@link HBaseSchema}.
   * @return The simple {@link HBaseSchema}. */
  public HBaseSchema simpleSchema() {
    return _simpleSchema;
  }

  /** Give the SQL table name.
   * @return The SQL table name. */
  public String sqlTableName() {
    return _sqlTableName;
  }

  private HBaseSchema _simpleSchema;

  private Connection _conn = null;

  private String _sqlTableName = null;

  /** Logging. */
  private static Logger log = LogManager.getLogger(HBaseSQLClient.class);

}
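
// End-to-end sketch of replicating HBase rows into a Phoenix SQL table via
// scan2SQL() (illustrative only; the search syntax follows the Javadoc above,
// and the quorum, port and search values are placeholders):
//
//   HBaseSQLClient client = new HBaseSQLClient("zookeeper1,zookeeper2", 2181);
//   // ... connect to an HBase table via the inherited HBaseClient API (not shown) ...
//   client.scan2SQL(null, "key:key:prefix1", "*", 0, 0, true, false, null);
//   client.close();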