JIRA: EAGLE-81
The Notification Plugin should provide an interface that accepts an Eagle alert entity and returns the status of the notification.
Architecture:
Today Eagle has no notification plugin framework in which users can implement a Notification Plugin interface and upload their own library. This plugin framework lets users implement their own notifications and also provides the following default implementations:
- Sending Email Alert
- Persisting Message to Kafka
- Writing Entities to Eagle Service
Notification Interface:
package org.apache.eagle.notification;
import org.apache.eagle.alert.entity.AlertAPIEntity;
/**
* Notification plug-in interface that provides an abstraction layer for notifying different systems
*/
public interface NotificationPlugin {
/**
* for initialization
* @throws Exception
*/
public void _init() throws Exception;
/**
* Post a notification for the given alertEntity
* @param alertEntity
*/
public void onAlert( AlertAPIEntity alertEntity );
/**
* Returns Status of Notification Post
* @return
*/
public NotificationStatus getStatus();
}
When the Eagle topology starts, the code should scan for and register the available notification types (custom notification, email, Kafka message, etc.). Since Eagle allows users to provide their own notification implementations, the topology initialization code should automatically detect and register the notification types that can be used.
public class MessageNotification implements NotificationPlugin {
// implement _init(), onAlert() and getStatus() here
}
When users deploy their code with the Eagle service, the topology initializer has to detect MessageNotification and register it automatically.
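The detection step can be sketched roughly as follows. This is only an illustration under stated assumptions: `PluginName`, `Plugin`, `UnrelatedHelper` and `PluginRegistry` are hypothetical stand-ins (for the `@Resource` annotation, the `NotificationPlugin` interface and the plugin loader), and the candidate classes are passed in explicitly instead of being discovered by a classpath scan.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the annotation that names a notification type.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface PluginName {
    String value();
}

// Hypothetical, simplified stand-in for NotificationPlugin.
interface Plugin {
    void init();
}

@PluginName("Message Notification")
class MessageNotification implements Plugin {
    @Override
    public void init() { /* connect to the external system here */ }
}

// Not a plugin: no annotation, does not implement Plugin.
class UnrelatedHelper { }

class PluginRegistry {
    private final Map<String, Plugin> plugins = new ConcurrentHashMap<>();

    // Registers every candidate that implements Plugin and carries @PluginName.
    void register(List<Class<?>> candidates) {
        for (Class<?> candidate : candidates) {
            PluginName name = candidate.getAnnotation(PluginName.class);
            if (name == null || !Plugin.class.isAssignableFrom(candidate)) {
                continue; // skip classes that are not notification plugins
            }
            try {
                Plugin plugin = (Plugin) candidate.getDeclaredConstructor().newInstance();
                plugins.put(name.value(), plugin);
            } catch (ReflectiveOperationException ex) {
                throw new IllegalStateException("Cannot instantiate " + candidate.getName(), ex);
            }
        }
    }

    Plugin get(String notificationType) {
        return plugins.get(notificationType);
    }

    int size() {
        return plugins.size();
    }
}
```

Keying the registry by the annotated name means a policy can refer to a plugin by its notification type string without knowing the implementing class.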
How to Select a Notification When Defining a Policy?
Eagle should allow users to select the notification type when creating a policy. For that, we need to persist the detected notification types in HBase and provide an API on top of it to query them.
To ensure consistency, always delete the notification table and recreate it with the detected notifications.
Persisting Message to Kafka:
@Resource(name = "Kafka Store" , description = "Persist Alert Entity to Kafka")
@SuppressWarnings({ "rawtypes", "unchecked" })
public class PersistAlertToKafkaTopic implements NotificationPlugin {
private NotificationStatus status;
private static final Logger LOG = LoggerFactory.getLogger(PersistAlertToKafkaTopic.class);
private Map<String, AlertDefinitionAPIEntity> activeAlerts = new ConcurrentHashMap<String, AlertDefinitionAPIEntity>();
private Map<String, KafkaTopicConfig> kafaConfigs = new ConcurrentHashMap<String, KafkaTopicConfig>();
private Config config;
private AlertDefinitionDAO alertDefinitionDao;
@Override
public void _init() throws Exception {
config = EagleConfigFactory.load().getConfig();
String site = config.getString("eagleProps.site");
String dataSource = config.getString("eagleProps.dataSource");
activeAlerts.clear();
kafaConfigs.clear();
alertDefinitionDao = new AlertDefinitionDAOImpl(new EagleServiceConnector(config.getString("eagleProps.eagleService.host"), config.getInt("eagleProps.eagleService.port")));
try{
activeAlerts = alertDefinitionDao.findActiveAlertDefsByNotification( site , dataSource ,"Kafka Store");
}catch (Exception ex ){
LOG.error(ex.getMessage());
throw new Exception(" Kafka Store Cannot be initialized . Reason : "+ex.getMessage());
}
Set<String> policies = activeAlerts.keySet();
for( String policyId : policies )
{
Module module = new SimpleModule("notification").registerSubtypes(new NamedType(EmailNotificationConfig.class, "email"));
KafkaTopicConfig kafkaConfig = new KafkaTopicConfig();
try {
kafkaConfig = JsonSerDeserUtils.deserialize(activeAlerts.get(policyId).getNotificationDef(), KafkaTopicConfig.class, Arrays.asList(module));
this.kafaConfigs.put(policyId,kafkaConfig);
}catch (Exception ex){
LOG.error(" Exception when initializing PersistAlertToKafkaTopic. Reason : "+ex.getMessage());
}
}
}
@Override
public void onAlert(AlertAPIEntity alertEntity) {
try{
status = new NotificationStatus();
processAlertEntity(alertEntity);
status.setNotificationSuccess(true);
}catch(Exception ex ){
LOG.error(" Exception when Posting Alert Entity to Kafka Topic. Reason : "+ex.getMessage());
status.setNotificationSuccess(false);
status.setMessage(ex.getMessage());
}
}
/**
* Access KafkaProducer and send entity to Bus
* @param alertEntity
* @throws Exception
*/
public void processAlertEntity( AlertAPIEntity alertEntity ) throws Exception {
KafkaProducer producer = KafkaProducerSingleton.INSTANCE.getProducer();
producer.send(createRecord(alertEntity));
}
/**
* To Create KafkaProducer Record
* @param entity
* @return
* @throws Exception
*/
public ProducerRecord createRecord(AlertAPIEntity entity ) throws Exception {
String policyId = entity.getTags().get(AlertConstants.POLICY_ID);
ProducerRecord record = new ProducerRecord( this.kafaConfigs.get(policyId).getKafkaTopic(), entity.toString());
return record;
}
@Override
public NotificationStatus getStatus() {
return status;
}
}
Email Alert Notification:
Email Notification sends an email or SMS alert to the configured email address.
@Resource(name = "Email Notification" , description = " Email Notification API to trigger email/sms ")
public class EmailNotification implements NotificationPlugin {
private static final Logger LOG = LoggerFactory.getLogger(EmailNotification.class);
private Map<String, AlertDefinitionAPIEntity> activeAlerts = new ConcurrentHashMap<String, AlertDefinitionAPIEntity>();
static Map<String, List<EmailGenerator>> emailGenerators = new ConcurrentHashMap<String, List<EmailGenerator>>();
private Config config;
private AlertDefinitionDAO alertDefinitionDao;
private NotificationStatus status ;
/* Initialize the objects needed for Email Notification */
@Override
public void _init() throws Exception {
// Get Config Object
config = EagleConfigFactory.load().getConfig();
String site = config.getString("eagleProps.site");
String dataSource = config.getString("eagleProps.dataSource");
activeAlerts.clear();
emailGenerators.clear();
// find out all policies and its notification Config
alertDefinitionDao = new AlertDefinitionDAOImpl(new EagleServiceConnector(config.getString("eagleProps.eagleService.host"), config.getInt("eagleProps.eagleService.port")));
try{
activeAlerts = alertDefinitionDao.findActiveAlertDefsByNotification( site , dataSource ,"Email Notification");
}catch (Exception ex ){
LOG.error(ex.getMessage());
throw new Exception(" Email Notification Cannot be initialized . Reason : "+ex.getMessage());
}
// Create Email
Set<String> policies = activeAlerts.keySet();
for( String policyId : policies ){
AlertDefinitionAPIEntity alertDef = activeAlerts.get(policyId);
List<EmailGenerator> tmpList = createEmailGenerator(alertDef);
this.emailGenerators.put(policyId , tmpList);
}
}
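@Override
public void onAlert(AlertAPIEntity alertEntity) {
status = new NotificationStatus();
String policyId = alertEntity.getTags().get(AlertConstants.POLICY_ID);
LOG.debug("Sending email notification for policy " + policyId);
List<EmailGenerator> generatorList = emailGenerators.get(policyId);
if( generatorList == null ) {
// no email configuration was loaded for this policy
status.setMessage(" No email configuration found for policy " + policyId);
status.setNotificationSuccess(false);
return;
}
// report success only if every generator sent its email
boolean allSent = true;
for( EmailGenerator gen : generatorList ) {
if( !gen.sendAlertEmail(alertEntity) ) {
allSent = false;
status.setMessage(" Failed to send email ");
}
}
status.setNotificationSuccess(allSent);
}
@Override
public NotificationStatus getStatus() {
return status;
}
}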
Persisting Alerts to Eagle Service:
If the user selects this notification plugin, all alerts are persisted into the Eagle store.
/**
* Responsible to persist Alerts to Eagle Storage
*/
@Resource(name = "Eagle Store" , description = "Persist Alert Entity to Eagle Store")
public class PersistToEagleStore implements NotificationPlugin {
private static final Logger LOG = LoggerFactory.getLogger(PersistToEagleStore.class);
private NotificationStatus status;
private Config config;
private EagleAlertPersist persist;
@Override
public void _init() throws Exception {
config = EagleConfigFactory.load().getConfig();
this.persist = new EagleAlertPersist(config.getString("eagleProps.eagleService.host"), config.getInt("eagleProps.eagleService.port"),
config.getString("eagleProps.eagleService.username"), config.getString("eagleProps.eagleService.password"));
}
@Override
public NotificationStatus getStatus() {
return this.status;
}
@Override
public void onAlert(AlertAPIEntity alertEntity) {
try{
status = new NotificationStatus();
List<AlertAPIEntity> list = new ArrayList<AlertAPIEntity>();
list.add(alertEntity);
persist.doPersist( list );
status.setNotificationSuccess(true);
}catch (Exception ex ){
status.setNotificationSuccess(false);
status.setMessage(ex.getMessage());
LOG.error(" Exception when Posting Alert Entity to Eagle Service. Reason : "+ex.getMessage());
}
}
}
Notification Manager:
The Notification Manager is responsible for:
1) Scanning Notification Plugins
2) Loading all Plugins/ Persisting into alertNotifications Table
3) Broadcasting any updates in Policy to all Notification Plugins
/**
* Notification manager that is responsible for
* <p> Scanning Plugins </p>
* <p> Loading Plugins and Policy Mapping </p>
* <p> Initializing Plugins </p>
* <p> Forwarding eagle alert to configured Notification Plugin </p>
* <p> BroadCast Changes in Policy to all Notification Plugins </p>
*/
public class NotificationManager {
public static Map<String, String > policyNotificationMapping = new ConcurrentHashMap<String,String>();
private static final Logger LOG = LoggerFactory.getLogger(NotificationManager.class);
/**
* Static Initializer of Manager
*/
static {
policyNotificationMapping.clear();
// initialize all Notification Plugins
_init();
}
/**
* Initialization of Notification Manager
*/
private static void _init() {
policyNotificationMapping.clear();
Set<String> plugins = NotificationPluginLoader.notificationMapping.keySet();
for( String plugin : plugins ){
try {
Object obj = NotificationPluginLoader.notificationMapping.get(plugin);
obj.getClass().getMethod("_init").invoke(obj); // invoke _init method of all notification plugins
} catch (Exception e) {
LOG.error(" Error in loading Plugins . Reason : "+e.getMessage());
}
}
Config config = EagleConfigFactory.load().getConfig();
String site = config.getString("eagleProps.site");
String dataSource = config.getString("eagleProps.dataSource");
// find notification Types
AlertDefinitionDAO alertDefinitionDao = new AlertDefinitionDAOImpl(new EagleServiceConnector(config.getString("eagleProps.eagleService.host"), config.getInt("eagleProps.eagleService.port")));
try{
List<AlertDefinitionAPIEntity> activeAlerts = alertDefinitionDao.findActiveAlertDefs( site , dataSource );
for( AlertDefinitionAPIEntity entity : activeAlerts ){
policyNotificationMapping.put(entity.getTags().get(AlertConstants.POLICY_ID) , entity.getTags().get(AlertConstants.NOTIFICATION_TYPE));
}
}catch (Exception ex ){
LOG.error(" Error in determining policy and its notification type. Reason : "+ex.getMessage());
}
}
/**
* To Pass Alert to respective Notification Plugins
* @param entity
*/
public void notifyAlert( AlertAPIEntity entity ) {
try {
Object obj = getNotificationPluginAPI( policyNotificationMapping.get(entity.getTags().get(AlertConstants.POLICY_ID)) );
obj.getClass().getMethod("onAlert" , new Class[]{AlertAPIEntity.class}).invoke( obj , entity);
} catch ( Exception ex) {
LOG.error(" Error in NotificationManager when invoking NotifyAlert method . Reason : "+ex.getMessage());
}
}
/**
* Returns Notification Plugin for the given Type
* @param type
* @return
*/
private Object getNotificationPluginAPI( String type ){
return NotificationPluginLoader.notificationMapping.get(type);
}
/**
* Update all Notification Plugin if changes in Policy
* @param entity
*/
public void updateNotificationPlugins( AlertDefinitionAPIEntity entity ){
try {
// Reload the plugins
// TODO: revisit this; reloading everything on each policy change should be done in a better way
NotificationPluginLoader.loadPlugins();
// Re-initialize the Notification Manager
_init();
} catch (Exception e) {
LOG.error(" Error in updateNotificationPlugins . Reason : "+e.getMessage());
}
}
}
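The manager code above looks plugins up by notification type and invokes them through reflection. As a rough alternative sketch (not the Eagle implementation), the same dispatch can be written against a typed interface, with a policy mapped to several notification types. `AlertSink` and `AlertDispatcher` are hypothetical names, and an alert is reduced to a plain string to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-in for NotificationPlugin; an alert is a plain String here.
interface AlertSink {
    void onAlert(String alert);
}

class AlertDispatcher {
    private final Map<String, AlertSink> pluginsByType = new ConcurrentHashMap<>();
    private final Map<String, List<String>> typesByPolicy = new ConcurrentHashMap<>();

    void registerPlugin(String notificationType, AlertSink plugin) {
        pluginsByType.put(notificationType, plugin);
    }

    void mapPolicy(String policyId, List<String> notificationTypes) {
        typesByPolicy.put(policyId, new ArrayList<>(notificationTypes));
    }

    // Forwards the alert to every plugin configured for the policy and
    // returns the number of plugins that accepted it without throwing.
    int notifyAlert(String policyId, String alert) {
        int delivered = 0;
        List<String> types = typesByPolicy.getOrDefault(policyId, Collections.<String>emptyList());
        for (String type : types) {
            AlertSink plugin = pluginsByType.get(type);
            if (plugin == null) {
                continue; // unknown notification type: nothing to call
            }
            try {
                plugin.onAlert(alert);
                delivered++;
            } catch (RuntimeException ex) {
                // one failing plugin must not block the others
            }
        }
        return delivered;
    }
}
```

Calling the interface method directly lets the compiler check the method name and argument type, which the reflective `getMethod("onAlert", ...)` lookup cannot do.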
How to Create a Policy with a Notification Type?
When creating a policy, we need to select the notification type for that policy.
Example:
Policy With Email Notification:
curl -X POST -H 'Content-Type:application/json' "http://localhost:8080/eagle-service/rest/entities?serviceName=AlertDefinitionService" -d '[{"tags":{"site":"sandbox","dataSource":"hdfsAuditLog","alertExecutorId":"hdfsAuditLogAlertExecutor","policyId":"sensititvityAlert","policyType":"siddhiCEPEngine"},"desc":"view private file","policyDef":"{\"type\":\"siddhiCEPEngine\",\"expression\":\"from hdfsAuditLogEventStream[(cmd=='\'open\'') and (src=='\'/tmp/private\'')] select * insert into outputStream\"}","dedupeDef": "{\"alertDedupIntervalMin\":0,\"emailDedupIntervalMin\":1440}","notificationDef": "[{\"notificationType\":\"Email Notification\",\"subject\":\"just for test\",\"sender\":\"nobody@test.com\",\"recipients\":\"nobody@test.com\",\"flavor\":\"email\",\"id\":\"email_1\",\"tplFileName\":\"\"}]","remediationDef":"","enabled":true}]'
Policy With Kafka Topic:
curl -X POST -H 'Content-Type:application/json' "http://localhost:8080/eagle-service/rest/entities?serviceName=AlertDefinitionService" -d '[{"tags":{"site":"sandbox","dataSource":"hdfsAuditLog","alertExecutorId":"hdfsAuditLogAlertExecutor","policyId":"sensititvityAlert","policyType":"siddhiCEPEngine"},"desc":"view private file","policyDef":"{\"type\":\"siddhiCEPEngine\",\"expression\":\"from hdfsAuditLogEventStream[(cmd=='\'open\'') and (src=='\'/tmp/private\'')] select * insert into outputStream\"}","dedupeDef": "{\"alertDedupIntervalMin\":0,\"emailDedupIntervalMin\":1440}","notificationDef": "[{\"notificationType\":\"Kafka Store\" , \"kafkaTopic\":\"alerts\"} ]","remediationDef":"","enabled":true}]'
Policy With Eagle Store:
curl -X POST -H 'Content-Type:application/json' "http://localhost:8080/eagle-service/rest/entities?serviceName=AlertDefinitionService" -d '[{"tags":{"site":"sandbox","dataSource":"hdfsAuditLog","alertExecutorId":"hdfsAuditLogAlertExecutor","policyId":"sensititvityAlert","policyType":"siddhiCEPEngine"},"desc":"view private file","policyDef":"{\"type\":\"siddhiCEPEngine\",\"expression\":\"from hdfsAuditLogEventStream[(cmd=='\'open\'') and (src=='\'/tmp/private\'')] select * insert into outputStream\"}","dedupeDef": "{\"alertDedupIntervalMin\":0,\"emailDedupIntervalMin\":1440}","notificationDef": "[{\"notificationType\":\"Eagle Store\"}]","remediationDef":"","enabled":true}]'
Multiple Plugins:
curl -X POST -H 'Content-Type:application/json' "http://localhost:8080/eagle-service/rest/entities?serviceName=AlertDefinitionService" -d '[{"tags":{"site":"sandbox","dataSource":"hdfsAuditLog","alertExecutorId":"hdfsAuditLogAlertExecutor","policyId":"sensititvityAlert","policyType":"siddhiCEPEngine"},"desc":"view private file","policyDef":"{\"type\":\"siddhiCEPEngine\",\"expression\":\"from hdfsAuditLogEventStream[(cmd=='\'open\'') and (src=='\'/tmp/private\'')] select * insert into outputStream\"}","dedupeDef": "{\"alertDedupIntervalMin\":0,\"emailDedupIntervalMin\":1440}","notificationDef": "[{\"notificationType\":\"Esclate To ExternalSys\"},{\"notificationType\":\"Eagle Store\"},{\"notificationType\":\"Kafka Store\" , \"kafkaTopic\" : \"alerts\"}]","remediationDef":"","enabled":true}]'
Querying Available Notifications :
http://localhost:8080/eagle-service/rest/entities?query=AlertNotificationService%5B%5D%7B*%7D&pageSize=2147483647&startTime=1970-01-01%2000:00:00&endTime=1970-01-11%2000:00:00&treeAgg=false
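In the URL above, the query expression `AlertNotificationService[]{*}` is percent-encoded and `pageSize` is set to the largest 32-bit integer. A minimal sketch of how a Java client might assemble the same URL is shown here; the class name `NotificationQueryUrl` and the `baseUrl` parameter are assumptions for illustration, and the time-range parameters are copied verbatim from the example.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

class NotificationQueryUrl {
    // Builds the "list all notification types" URL for the given service base URL.
    static String build(String baseUrl) {
        try {
            // '[', ']', '{' and '}' are percent-encoded; '*' is left unchanged
            String query = URLEncoder.encode("AlertNotificationService[]{*}", "UTF-8");
            return baseUrl + "/rest/entities?query=" + query
                    + "&pageSize=" + Integer.MAX_VALUE
                    + "&startTime=1970-01-01%2000:00:00"
                    + "&endTime=1970-01-11%2000:00:00"
                    + "&treeAgg=false";
        } catch (UnsupportedEncodingException ex) {
            throw new IllegalStateException("UTF-8 is always available", ex);
        }
    }
}
```

A request to this URL returns a response of the following form: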
{"meta":{"elapsedms":77,"totalResults":4,"lastTimestamp":86400000,"firstTimestamp":86400000},"success":true,
"obj":[
{"prefix":"alertNotifications","tags":{"notificationType":"Eagle Store"},
"encodedRowkey":"WSdQ7H_____62aP_YA4exUY4H4U","enabled":true},
{"prefix":"alertNotifications","tags":{"notificationType":"Forward to External System"},
"encodedRowkey":"WSdQ7H_____62aP_YA4exbzI93E","enabled":true},
{"prefix":"alertNotifications","tags":{"notificationType":"Email Notification"},
"encodedRowkey":"WSdQ7H_____62aP_YA4exe4g2y8","enabled":true},
{"prefix":"alertNotifications","tags":{"notificationType":"Kafka Store"},
"encodedRowkey":"WSdQ7H_____62aP_YA4exfPVxyc","enabled":true}
],
"type":"org.apache.eagle.alert.entity.AlertNotificationEntity"}
Config:
To create the Kafka producer, Eagle needs the address of the Kafka broker.
"eagleNotificationProps" : {
"kafka_broker":"localhost:9092"
}
For now, only one Kafka cluster is supported; Eagle reads its broker address from application.conf.
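How the broker address reaches the producer is not shown on this page. One plausible sketch, assuming the standard Kafka Java producer and string keys and values, is to turn the `kafka_broker` value into producer properties. The class name `KafkaProducerProps` is hypothetical; the property keys and serializer class names are the ones used by the Kafka Java client.

```java
import java.util.Properties;

class KafkaProducerProps {
    // Builds producer properties from the kafka_broker value in application.conf.
    static Properties fromBroker(String kafkaBroker) {
        Properties props = new Properties();
        // host:port list the producer uses for its initial connection
        props.setProperty("bootstrap.servers", kafkaBroker);
        // assumption: alerts are sent as plain strings (entity.toString())
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }
}
```

The resulting `Properties` object could then be passed to the `KafkaProducer` constructor wherever the shared producer is created.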
Alert Notification Executor:
Once an AlertAPIEntity is generated by policy evaluation, the AlertNotification executor picks it up and sends it to the configured notification plugin.
@Override
public void flatMap(java.util.List<Object> input, Collector<Tuple1<String>> outputCollector){
String policyId = (String) input.get(0);
AlertAPIEntity alertEntity = (AlertAPIEntity) input.get(1);
processAlerts(policyId, Arrays.asList(alertEntity));
}
private void processAlerts(String policyId, List<AlertAPIEntity> list) {
for (AlertAPIEntity entity : list) {
notificationManager.notifyAlert(entity);
}
}
@Override
public void onPolicyCreated(Map<String, AlertDefinitionAPIEntity> added) {
if(LOG.isDebugEnabled()) LOG.debug(" alert notification config changed : " + added);
for(AlertDefinitionAPIEntity alertDef : added.values()){
LOG.info("alert notification config really changed " + alertDef);
notificationManager.updateNotificationPlugins( alertDef );
}
}
In the future, we will decouple this notification executor once we finalize the design for generic notification (which accepts alerts from any system).
How to Use the Eagle Notification Plugin for a Custom Implementation?
Users should implement NotificationPlugin and write their own logic.
Read the plugin configuration from Eagle's Config object.
For example:
"notificationDef": "{\"kafkaTopic\":\"notification_topic_kafka\"}"
In code, AlertDefinitionAPIEntity.getNotificationDef() returns the JSON string above. Once you have the configuration, you can parse it for your own use.
Or you can simply forward the alert to an external system.
public class CustomNotificationSink implements NotificationPlugin {
private NotificationStatus status;
// Do your plugin initialization here, e.g. connecting to the external system
@Override
public void _init() throws Exception {
}
// The Eagle Notification Manager forwards each AlertAPIEntity to this onAlert method
@Override
public void onAlert( AlertAPIEntity alertEntity ) {
status = new NotificationStatus();
// The logic that decides where to forward the alert goes here
}
// Return the status of the last notification
@Override
public NotificationStatus getStatus(){
return status;
}
}
Development Steps:
To make use of the Eagle Notification Plugin, follow the steps below.
1) Copy the Eagle jars into your library
2) Add the Eagle jars as dependencies in pom.xml
3) Implement the NotificationPlugin interface
4) Write your custom logic
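Steps 3 and 4 can be illustrated with a self-contained sketch. Because the Eagle jars are not available here, `Status`, `AlertPlugin` and `ForwardingSink` are hypothetical stand-ins for `NotificationStatus`, `NotificationPlugin` and a custom plugin, and an alert is reduced to its tag map. A real plugin would implement the Eagle interface and receive an `AlertAPIEntity`.

```java
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical stand-in for NotificationStatus.
class Status {
    boolean success;
    String message;
}

// Hypothetical stand-in for NotificationPlugin; an alert is reduced to its tags.
interface AlertPlugin {
    void init() throws Exception;
    void onAlert(Map<String, String> alertTags);
    Status getStatus();
}

// Custom plugin that forwards a one-line summary of each alert to an external system.
class ForwardingSink implements AlertPlugin {
    private final Consumer<String> externalSystem;
    private volatile Status status = new Status();

    ForwardingSink(Consumer<String> externalSystem) {
        this.externalSystem = externalSystem;
    }

    @Override
    public void init() {
        // open connections to the external system here
    }

    @Override
    public void onAlert(Map<String, String> alertTags) {
        Status current = new Status();
        try {
            externalSystem.accept("policy=" + alertTags.get("policyId"));
            current.success = true;
        } catch (RuntimeException ex) {
            // record the failure instead of propagating it to the caller
            current.message = ex.getMessage();
        }
        status = current;
    }

    @Override
    public Status getStatus() {
        return status;
    }
}
```

Building a fresh status object for each alert and publishing it only at the end keeps `getStatus()` from returning a half-updated result.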