Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sqlflow integrate with zeppelin (SQLFlowInterpreter.java) #11

Closed
wants to merge 5 commits into from
Closed
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 94 additions & 0 deletions SQLFlowInterpreter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package org.apache.zeppelin.sqlflow;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.sqlflow.client.MessageHandlerZeppelin;
import org.sqlflow.client.SQLFlow;
import org.apache.zeppelin.sqlflow.client.utils.EnvironmentSpecificSQLFlowClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.lang3.StringUtils;

public class SQLFlowInterpreter extends Interpreter {
// Print Log
private static final Logger LOGGER = LoggerFactory
.getLogger(SQLFlowInterpreter.class);

private Map<String, String> parameters = new HashMap<String, String>();

public static final String SQLFLOW_SERVERADDR = "sqlflow.serverAddr";
public static final String MYSQL_USERNAME = "mysql.username";
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we only support mysql databases?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't just support mysql database. For other SQL engines, including Oracle, Hive, SparkSQL, Flink, etc., developers can expand and improve according to actual needs.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we still use mysql.username to specific the hive username ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This initial version has not yet opened the connection to the hive database.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This initial version has not yet opened the connection to the hive database.

Let's do this under the new issue.

public static final String MYSQL_PASSWORD = "mysql.password";
public static final String MYSQL_SERVERADDR = "mysql.serverAddr";
public static final String MYSQL_DATABASENAME = "mysql.databaseName";

public SQLFlowInterpreter(Properties properties) {
super(properties);
}

@Override
public void open() throws InterpreterException {
// get the configured parameters from the front page
String sqlflowServerAddr = getProperty(SQLFLOW_SERVERADDR);
String mysqlUserName = getProperty(MYSQL_USERNAME);
String mysqlPassword = getProperty(MYSQL_PASSWORD);
String mysqlAddr = getProperty(MYSQL_SERVERADDR);
String mysqlDatabaseName = getProperty(MYSQL_DATABASENAME);

parameters.put(SQLFLOW_SERVERADDR, sqlflowServerAddr);
parameters.put(MYSQL_USERNAME, mysqlUserName);
parameters.put(MYSQL_PASSWORD, mysqlPassword);
parameters.put(MYSQL_SERVERADDR, mysqlAddr);
parameters.put(MYSQL_DATABASENAME, mysqlDatabaseName);
}

@Override
public void cancel(InterpreterContext arg0) throws InterpreterException {
}

@Override
public void close() throws InterpreterException {
}

@Override
public FormType getFormType() throws InterpreterException {
return FormType.SIMPLE;
}

@Override
public int getProgress(InterpreterContext arg0) throws InterpreterException {
return 20;
}

@Override
public InterpreterResult interpret(String query, InterpreterContext context)
throws InterpreterException {

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

blank.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

useless blank removed.

if (StringUtils.isBlank(query)) {
return new InterpreterResult(InterpreterResult.Code.SUCCESS);
}

// Create user
SQLFlow client = EnvironmentSpecificSQLFlowClient.getClient(
new MessageHandlerZeppelin(context), parameters);

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

useless blank.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

useless blank removed.

try {
// Run custom sqlflow code
client.run(query);
// Release user
client.release();

return new InterpreterResult(InterpreterResult.Code.SUCCESS);
} catch (InterruptedException e) {
LOGGER.error("client error", e);
return new InterpreterResult(InterpreterResult.Code.ERROR);
}
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

blank

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

useless blank removed.

}