Saturday, March 23, 2013

Opensource Splunk

Many blogs and StackOverflow answers have sketched an open-source equivalent of the Splunk architecture. This post describes an experiment in which such an architecture was implemented to monitor a production application.

Experimental Architecture:


Opensource Tools:

Apache Flume: Apache Flume is a distributed, open-source solution for collecting and aggregating application logs and events from different streams. Flume can aggregate the logs into a centralized data store. Flume works based on Source, Channel and Sink configurations, which need to be set up according to the application's requirements. In this experiment, Flume is used for log collection and aggregation.

Apache Hadoop: Aggregated logs are preserved in the HDFS file system in a clustered environment. A Pig job generates indexes out of the stored bulk application data.

Elastic Search / Lucene: a search and index query engine that indexes the bulk data in the clustered environment. Pig periodically publishes the indexes into the search engine.

A front-end application can then query the index with custom queries.
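For illustration, here is a minimal sketch of such a custom query using the Elasticsearch Java TransportClient. The index name "logstash" and the "type" header match the POC configuration further below; the class name and query are hypothetical.

import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;

public class LogIndexQuery {
    public static void main(String[] args) {
        // Connect to the Elasticsearch node the Flume sink writes to
        TransportClient client = new TransportClient()
                .addTransportAddress(new InetSocketTransportAddress("localhost", 9300));
        try {
            // Query the "logstash" index for events with header type = "log"
            SearchResponse response = client.prepareSearch("logstash")
                    .setQuery(QueryBuilders.termQuery("type", "log"))
                    .setSize(10)
                    .execute().actionGet();
            for (SearchHit hit : response.getHits()) {
                System.out.println(hit.getSourceAsString());
            }
        } finally {
            client.close();
        }
    }
}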

Proof-Of-Concept (POC) Implementation:

The POC uses a simple flume -> elastic search engine -> kibana pipeline. Flume supports different types of sources, and it is possible to create a custom source as well; for this POC, Flume's ExecSource has been extended. The custom source parses the generated logs, and the resulting events are indexed into the Elastic Search engine using ElasticSearchSink. Kibana, an open-source Ruby-based front-end interface, expects all indexes in the Logstash format and displays search results based on the user's query.
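As a rough illustration of that Logstash format, the sketch below builds a Logstash-style document using Elasticsearch's XContentBuilder. The exact field set depends on the serializer configured for ElasticSearchSink, so treat the field names and values here as assumptions.

import java.io.IOException;

import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;

public class LogstashDocSketch {
    public static void main(String[] args) throws IOException {
        // Approximate shape of a Logstash-format document as Kibana expects it
        XContentBuilder doc = XContentFactory.jsonBuilder()
                .startObject()
                .field("@timestamp", "2013-03-23T10:15:00.000Z") // event time
                .field("@message", "the raw log line")           // event body
                .field("@source", "tail -f test.log")            // event origin
                .startObject("@fields")                          // custom headers
                        .field("type", "log")
                        .field("host", "localhost")
                .endObject()
                .endObject();
        System.out.println(doc.string());
    }
}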

Custom Flume Source

Configuration

In addition to the exec source configuration, the following new settings are introduced:

   - delimiter : the log delimiter
   - fieldFormat : the expected format of the message, "header | fields"
     (see the parsing sketch below)

     Format: timestamp, source, type, host, message | name:field-1, name:field-2, ...

     - timestamp: can be derived from the log message, or the current timestamp is used
     - source: can be derived from the log message, or the exec source command is used
     - host: can be derived from the log message, or the system host is used
     - message: the entire message or a specific field range
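To make the format concrete, here is a small standalone sketch (the class name and sample line are hypothetical) showing how a fieldFormat string is pulled apart. Note that field-N is used as a zero-based index into the delimited line, matching the parsing in the source below.

import java.util.Arrays;

public class FieldFormatDemo {
    public static void main(String[] args) {
        // Hypothetical fieldFormat: five mandatory headers, then name:field-N pairs
        String fieldFormat = "timestamp,source,type,host,message|level:field-1,user:field-2";
        String line = "1363996800000,ERROR,jdoe,connection refused";

        String[] parts = fieldFormat.split("\\|");
        String[] headerSpec = parts[0].split(",");  // mandatory header positions
        String[] extraFields = parts[1].split(","); // additional named fields
        String[] tokens = line.split(",");          // the delimited log line

        System.out.println("headers: " + Arrays.toString(headerSpec));

        for (String field : extraFields) {
            String[] nameAndIndex = field.split(":");                    // e.g. ["level", "field-1"]
            int index = Integer.parseInt(nameAndIndex[1].split("-")[1]); // field-1 -> index 1
            if (index < tokens.length) {
                System.out.println(nameAndIndex[0] + " = " + tokens[index]); // level = ERROR
            }
        }
    }
}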
    Example Config: 

# Agent components
esagent.sources = genlogs
esagent.channels = memoryChannel
esagent.sinks = essink

# Custom source: tails the log file and parses each line per fieldFormat
esagent.sources.genlogs.type = logforwarder.flume.source.LogForwarderSource
esagent.sources.genlogs.command = tail -f /home/rathish/Projects/Log_Aggregator/tools/test.log
esagent.sources.genlogs.channels = memoryChannel
esagent.sources.genlogs.fieldFormat = timestamp,test,log,field-4,message|type:field-2,format:field-3,domain:field-5,url:field-11,usecase:field-14,trackingid:field-15
esagent.sources.genlogs.delimiter = ,

# Sink: writes events to the Elasticsearch "logstash" index
esagent.sinks.essink.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink
esagent.sinks.essink.hostNames = localhost:9300
esagent.sinks.essink.indexName = logstash
esagent.sinks.essink.channel = memoryChannel
esagent.sinks.essink.batchSize = 10

# In-memory channel between source and sink
esagent.channels.memoryChannel.type = memory
esagent.channels.memoryChannel.capacity = 100
esagent.channels.memoryChannel.transactionCapacity = 100


Source
  
package logforwarder.flume.source;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetAddress;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.apache.flume.Context;
import org.apache.flume.CounterGroup;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;

/**
 * LogForwarderSource extends Flume's AbstractSource and polls for the
 * log. Based on the configuration, generated logs are parsed and
 * sent for indexing.
 *
 * The forwarder expects the following configuration:
 *
 * - all the configuration of the "exec" Flume source
 * - delimiter : the log delimiter
 * - fieldFormat : the fields in the generated log (comma separated);
 *   the timestamp, source, type, host and message fields are
 *   mandatory. Hard-coded values can be configured.
 *
 * Format:
 *
 *   timestamp, source, type, host, message | field1, field2, ...
 *
 * Example:
 *
 *   field-1, field-5, log, field-7, field-6:9 | name:field-10, path:field-21
 *
 * @author rathish
 */
public class LogForwarderSource extends AbstractSource implements
        EventDrivenSource, Configurable {

    private static final Logger logger = LoggerFactory
            .getLogger(LogForwarderSource.class);

    private String command;
    private CounterGroup counterGroup;
    private ExecutorService executor;
    private Future<?> runnerFuture;
    private long restartThrottle;
    private boolean restart;
    private boolean logStderr;
    private Integer bufferCount;
    private ExecRunnable runner;
    private String delimiter;
    private String fieldFormat;
   

    @Override
    public void start() {
        logger.info("Exec source starting with command:{}", command);

        executor = Executors.newSingleThreadExecutor();
        counterGroup = new CounterGroup();

        runner = new ExecRunnable(command, getChannelProcessor(), counterGroup,
                restart, restartThrottle, logStderr, bufferCount, delimiter, fieldFormat);

        // FIXME: Use a callback-like executor / future to signal us upon
        // failure.
        runnerFuture = executor.submit(runner);

        /*
         * NB: This comes at the end rather than the beginning of the method
         * because it sets our state to running. We want to make sure the
         * executor is alive and well first.
         */
        super.start();

        logger.debug("Exec source started");
    }

    @Override
    public void stop() {
        logger.info("Stopping exec source with command:{}", command);

        if (runner != null) {
            runner.setRestart(false);
            runner.kill();
        }
        if (runnerFuture != null) {
            logger.debug("Stopping exec runner");
            runnerFuture.cancel(true);
            logger.debug("Exec runner stopped");
        }

        executor.shutdown();

        while (!executor.isTerminated()) {
            logger.debug("Waiting for exec executor service to stop");
            try {
                executor.awaitTermination(500, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                logger.debug("Interrupted while waiting for exec executor service "
                        + "to stop. Just exiting.");
                Thread.currentThread().interrupt();
            }
        }

        super.stop();

        logger.debug("Exec source with command:{} stopped. Metrics:{}",
                command, counterGroup);
    }

    @Override
    public void configure(Context context) {
        command = context.getString("command");

        Preconditions.checkState(command != null,
                "The parameter command must be specified");

        restartThrottle = context.getLong(
                LogForwarderSourceConfigurationConstants.CONFIG_RESTART_THROTTLE,
                LogForwarderSourceConfigurationConstants.DEFAULT_RESTART_THROTTLE);

        restart = context.getBoolean(
                LogForwarderSourceConfigurationConstants.CONFIG_RESTART,
                LogForwarderSourceConfigurationConstants.DEFAULT_RESTART);

        logStderr = context.getBoolean(
                LogForwarderSourceConfigurationConstants.CONFIG_LOG_STDERR,
                LogForwarderSourceConfigurationConstants.DEFAULT_LOG_STDERR);

        bufferCount = context.getInteger(
                LogForwarderSourceConfigurationConstants.CONFIG_BATCH_SIZE,
                LogForwarderSourceConfigurationConstants.DEFAULT_BATCH_SIZE);
       
        delimiter = context.getString(
                LogForwarderSourceConfigurationConstants.CONFIG_DELIMITER,
                LogForwarderSourceConfigurationConstants.DEFAULT_DELIMITER);
       
        fieldFormat = context.getString(
                LogForwarderSourceConfigurationConstants.CONFIG_FIELD_FORMAT,
                LogForwarderSourceConfigurationConstants.DEFAULT_FIELD_FORMAT);
       
    }

    private static class ExecRunnable implements Runnable {

        public ExecRunnable(String command, ChannelProcessor channelProcessor,
                CounterGroup counterGroup, boolean restart,
                long restartThrottle, boolean logStderr, int bufferCount, String delimiter, String fieldFormat) {
            this.command = command;
            this.channelProcessor = channelProcessor;
            this.counterGroup = counterGroup;
            this.restartThrottle = restartThrottle;
            this.bufferCount = bufferCount;
            this.restart = restart;
            this.logStderr = logStderr;
            this.delimiter = delimiter;
            this.fieldFormat = fieldFormat;
        }

        private String command;
        private ChannelProcessor channelProcessor;
        private CounterGroup counterGroup;
        private volatile boolean restart;
        private long restartThrottle;
        private int bufferCount;
        private boolean logStderr;
        private Process process = null;
        private String delimiter;
        private String fieldFormat;

        @Override
        public void run() {
            do {
                String exitCode = "unknown";
                BufferedReader reader = null;
                try {
                    String[] commandArgs = command.split("\\s+");
                    process = new ProcessBuilder(commandArgs).start();
                    reader = new BufferedReader(new InputStreamReader(
                            process.getInputStream()));

                    // StderrLogger dies as soon as the input stream is invalid
                    StderrReader stderrReader = new StderrReader(
                            new BufferedReader(new InputStreamReader(
                                    process.getErrorStream())), logStderr);
                    stderrReader.setName("StderrReader-[" + command + "]");
                    stderrReader.setDaemon(true);
                    stderrReader.start();

                    String line = null;
                    List<Event> eventList = new ArrayList<Event>();
                    while ((line = reader.readLine()) != null) {
                        counterGroup.incrementAndGet("exec.lines.read");
                       
                        String message = "";
                       
                        // Delimit the lines based on the configured delimiter
                        //
                        String[] stringArray = line.split(delimiter);
                       
                        logger.debug("----->" + fieldFormat);
                       
                        // Construct the event header based on the delimited values
                        //
                        String[] format = fieldFormat.split("\\|");
                        logger.debug("----->" + format[0]);
                       
                        // Header details
                        //
                        String[] headerList = format[0].split(",");
                       
                        logger.debug("----->" + headerList.length);
                       
                        // Additional Fields
                        //
                        String[] fields = format[1].split(",");
                       
                        // Expected format
                        //
                        // timestamp, source, type, host, message
                        //
                       
                        Map headers = new HashMap();
                       
                        // Extract the timestamp
                        if (headerList[0].equals("timestamp")) {
                            headers.put("timestamp", "" + Calendar.getInstance().getTimeInMillis());
                        } else if (headerList[0].startsWith("field")) {
                            String[] list = headerList[0].split("-");
                            if (list.length > 1) {
                                headers.put("timestamp", stringArray[Integer.parseInt(list[1])]);
                            }
                        }
                        // Extract the source
                        if (headerList[1].equals("source")) {
                            headers.put("source", command);
                        } else if (headerList[1].startsWith("field")) {
                            String[] list = headerList[1].split("-");
                            if (list.length > 1) {
                                headers.put("source", stringArray[Integer.parseInt(list[1])]);
                            }
                        }
                        // Extract the type
                        if (headerList[2].equals("type")) {
                            headers.put("type", "log");
                        } else if (headerList[2].startsWith("field")) {
                            String[] list = headerList[2].split("-");
                            if (list.length > 1) {
                                headers.put("type", stringArray[Integer.parseInt(list[1])]);
                            }
                        }
                        // Extract the host
                        if (headerList[3].equals("host")) {
                            headers.put("host", InetAddress.getLocalHost().getHostName());
                        } else if (headerList[3].startsWith("field")) {
                            String[] list = headerList[3].split("-");
                            if (list.length > 1) {
                                headers.put("host", stringArray[Integer.parseInt(list[1])]);
                            }
                        }
                        // Extract the message: either the entire line or a
                        // "field-from:to" range of fields
                        if (headerList[4].equals("message")) {
                            message = line;
                        } else if (headerList[4].startsWith("field")) {
                            String[] list = headerList[4].split("-");
                            if (list.length > 1) {
                                String[] sequence = list[1].split(":");

                                int from = Integer.parseInt(sequence[0]);
                                int to = Integer.parseInt(sequence[1]);

                                String msg = "";
                                for (int count = from; count <= to; count++) {
                                    msg = msg + " " + stringArray[count];
                                }
                                message = msg;
                            }
                        }
                       
                        // Add the additional name:field-N headers
                        for (int fieldCount = 0; fieldCount < fields.length; fieldCount++) {
                            String[] fieldList = fields[fieldCount].split(":");
                            if (fieldList.length > 1) {
                                String fieldName = fieldList[0];
                                int fieldIndex = Integer.parseInt(fieldList[1].split("-")[1]);
                                if (stringArray.length > fieldIndex) {
                                    headers.put(fieldName, stringArray[fieldIndex]);
                                }
                            }
                        }
                       
                        // Store the message
                        //
                        eventList.add(EventBuilder.withBody(message,
                                Charset.defaultCharset(), headers));

                       
                        if (eventList.size() >= bufferCount) {
                            channelProcessor.processEventBatch(eventList);
                            eventList.clear();
                        }
                    }
                    if (!eventList.isEmpty()) {
                        channelProcessor.processEventBatch(eventList);
                    }
                } catch (Exception e) {
                    logger.error("Failed while running command: " + command, e);
                    if (e instanceof InterruptedException) {
                        Thread.currentThread().interrupt();
                    }
                } finally {
                    if (reader != null) {
                        try {
                            reader.close();
                        } catch (IOException ex) {
                            logger.error(
                                    "Failed to close reader for exec source",
                                    ex);
                        }
                    }
                    exitCode = String.valueOf(kill());
                }
                if (restart) {
                    logger.info("Restarting in {}ms, exit code {}",
                            restartThrottle, exitCode);
                    try {
                        Thread.sleep(restartThrottle);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                } else {
                    logger.info("Command [" + command + "] exited with "
                            + exitCode);
                }
            } while (restart);
        }

        public int kill() {
            if (process != null) {
                synchronized (process) {
                    process.destroy();
                    try {
                        return process.waitFor();
                    } catch (InterruptedException ex) {
                        Thread.currentThread().interrupt();
                    }
                }
                return Integer.MIN_VALUE;
            }
            return Integer.MIN_VALUE / 2;
        }

        public void setRestart(boolean restart) {
            this.restart = restart;
        }
    }

    private static class StderrReader extends Thread {
        private BufferedReader input;
        private boolean logStderr;

        protected StderrReader(BufferedReader input, boolean logStderr) {
            this.input = input;
            this.logStderr = logStderr;
        }

        @Override
        public void run() {
            try {
                int i = 0;
                String line = null;
                while ((line = input.readLine()) != null) {
                    if (logStderr) {
                        logger.info("StderrLogger[{}] = '{}'", ++i, line);
                    }
                }
            } catch (IOException e) {
                logger.info("StderrLogger exiting", e);
            } finally {
                try {
                    if (input != null) {
                        input.close();
                    }
                } catch (IOException ex) {
                    logger.error(
                            "Failed to close stderr reader for exec source", ex);
                }
            }
        }

    }

}
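Note: the LogForwarderSourceConfigurationConstants class referenced in configure() is not listed above. A minimal sketch of what it would contain follows; the exec-source keys and defaults mirror Flume's ExecSourceConfigurationConstants, and the defaults for the two new keys are assumptions.

package logforwarder.flume.source;

public class LogForwarderSourceConfigurationConstants {

    // Keys and defaults mirrored from Flume's exec source configuration
    public static final String CONFIG_RESTART_THROTTLE = "restartThrottle";
    public static final long DEFAULT_RESTART_THROTTLE = 10000L;

    public static final String CONFIG_RESTART = "restart";
    public static final boolean DEFAULT_RESTART = false;

    public static final String CONFIG_LOG_STDERR = "logStdErr";
    public static final boolean DEFAULT_LOG_STDERR = false;

    public static final String CONFIG_BATCH_SIZE = "batchSize";
    public static final int DEFAULT_BATCH_SIZE = 20;

    // New keys introduced by LogForwarderSource; in practice fieldFormat
    // should always be set explicitly (assumed defaults shown here)
    public static final String CONFIG_DELIMITER = "delimiter";
    public static final String DEFAULT_DELIMITER = ",";

    public static final String CONFIG_FIELD_FORMAT = "fieldFormat";
    public static final String DEFAULT_FIELD_FORMAT =
            "timestamp,source,type,host,message|type:field-0";
}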


Execution

Start the Elastic Search engine with the "./bin/elasticsearch -f" command, then start the Flume forwarder with "./bin/flume-ng agent -c conf -f esagent.conf -n esagent". The agent will generate events and sink them into the Elastic Search engine. Kibana can then query the indexes and render the results in the browser.




The experiment produced the expected results and opened up new possibilities; as a next step, we plan to create a custom Flume source that periodically collects system statistics such as disk and CPU usage using the SIGAR API.






2 comments:

Unknown said...

Great work! Did you evaluate fluentd & logstash before deciding on flume?

Unknown said...

Greetings Rajarajan,
I am able to sink data to Splunk from Flume but the header information is removed from the event therefore, I can't do searches on the data. Any ideas how to resolve this. Do I need a custom flume source
"FieldFormat"?