Using the Kinesis Connector to Elasticsearch for Streaming Data Search and Interaction

By Marion Myers, 2015-10-18 08:07


    Recently, Amazon's official blog published a post introducing how to use the Kinesis Connector to Elasticsearch for streaming data search and interaction. It helps developers easily build applications that reliably load massive streaming data from Kinesis into an Elasticsearch cluster in real time.

    According to the post, Elasticsearch is an open-source search and analytics engine that can index structured and unstructured data in real time. Kibana is a data visualization engine for Elasticsearch, used mainly to help technical operations staff and business analysts build interactive dashboards. Data in an Elasticsearch cluster can also be accessed programmatically through a RESTful API or application SDKs. You can use a CloudFormation template to create an Elasticsearch cluster on Amazon Elastic Compute Cloud (EC2) instances, managed by Auto Scaling.

    Kinesis, Elasticsearch and Kibana

    The block diagram below shows how these pieces fit together:

    Using the new Kinesis Connector to Elasticsearch, you can create an application that processes data from Kinesis and indexes it into an Elasticsearch cluster. Before records reach Elasticsearch, you can transform, filter, and buffer them. You can also tune Elasticsearch-specific indexing options, such as time to live (TTL), version number, type, and an ID taken from a field of each record. The figure below shows the record-processing flow:

    Note: you can also run the entire connector pipeline from within the Elasticsearch cluster using a River.
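    The per-record index options mentioned above (TTL, version, type, and an ID taken from a record field) correspond to metadata on the action line of an Elasticsearch bulk request. As a rough, standalone illustration (BulkActionLine is not part of the connector library, and the field names follow the Elasticsearch 1.x bulk format that was current at the time):

```java
// Illustrative only: builds the action-and-metadata line that precedes each
// document in an Elasticsearch 1.x bulk request. The connector's emitter
// produces equivalent lines internally.
public class BulkActionLine {
    public static String indexAction(String index, String type, String id, String ttl) {
        StringBuilder sb = new StringBuilder("{\"index\":{");
        sb.append("\"_index\":\"").append(index).append('"');
        sb.append(",\"_type\":\"").append(type).append('"');
        if (id != null) {
            sb.append(",\"_id\":\"").append(id).append('"');   // ID from a record field
        }
        if (ttl != null) {
            sb.append(",\"_ttl\":\"").append(ttl).append('"'); // per-record time to live
        }
        sb.append("}}");
        return sb.toString();
    }

    public static void main(String[] args) {
        // One action line per record; the document source follows on the next line.
        System.out.println(indexAction("kinesis-stream", "record", "rec-1", "90d"));
    }
}
```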

    The first step

    Your code needs to do the following:

    1. Set the application-specific configuration.

    2. Create and configure a KinesisConnectorPipeline from a Transformer, a
       Filter, a Buffer, and an Emitter.

    3. Create a KinesisConnectorExecutor that runs the pipeline continuously.

    All of the components above have default implementations, any of which you can replace with your own custom logic.

    Configure the Connector properties

    The connector is configured through a .properties file. There are many settings, and most of them can be left at their defaults. For example, the following settings:

    1. Configure the connector to bulk load data into Elasticsearch only after
       collecting at least 1,000 records;

    2. Use a local Elasticsearch endpoint for cluster testing.

    bufferRecordCountLimit = 1000
    elasticSearchEndpoint = localhost
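    Under the hood these are ordinary Java properties. The library's KinesisConnectorConfiguration reads the file for you; the standalone sketch below (ConnectorProperties is a hypothetical helper, not part of the library) only illustrates the file format:

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical helper that parses connector settings in standard
// java.util.Properties "key = value" syntax.
public class ConnectorProperties {
    public static Properties parse(String text) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(text));
        return props;
    }

    public static void main(String[] args) throws IOException {
        // Same two settings as in the example above.
        Properties props = parse(
                "bufferRecordCountLimit = 1000\n" +
                "elasticSearchEndpoint = localhost\n");
        int limit = Integer.parseInt(props.getProperty("bufferRecordCountLimit"));
        System.out.println(limit);                                      // 1000
        System.out.println(props.getProperty("elasticSearchEndpoint")); // localhost
    }
}
```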

    Implement the Pipeline components

    To wire together the Transformer, Filter, Buffer, and Emitter, your code must implement the IKinesisConnectorPipeline interface.

    public class ElasticSearchPipeline implements
            IKinesisConnectorPipeline<String, ElasticSearchObject> {

        public IEmitter<ElasticSearchObject> getEmitter(
                KinesisConnectorConfiguration configuration) {
            return new ElasticSearchEmitter(configuration);
        }

        public IBuffer<String> getBuffer(
                KinesisConnectorConfiguration configuration) {
            return new BasicMemoryBuffer<String>(configuration);
        }

        public ITransformerBase<String, ElasticSearchObject> getTransformer(
                KinesisConnectorConfiguration configuration) {
            return new StringToElasticSearchTransformer();
        }

        public IFilter<String> getFilter(
                KinesisConnectorConfiguration configuration) {
            return new AllPassFilter<String>();
        }
    }

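    To make the division of labor concrete, here is a simplified, self-contained model of the four stages. The real library interfaces (IFilter, ITransformerBase, IBuffer, IEmitter) have different signatures; this sketch only shows how a record flows filter → transformer → buffer → emitter, with a flush once the buffer limit is reached:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for the library's IFilter, ITransformer, IBuffer,
// and IEmitter; names and signatures here are illustrative only.
public class MiniPipeline {
    interface Filter<T> { boolean keep(T record); }
    interface Transformer<T, U> { U transform(T record); }

    private final Filter<String> filter = r -> !r.isEmpty();  // like AllPassFilter, but drops blanks
    private final Transformer<String, String> transformer =
            r -> "{\"message\":\"" + r + "\"}";               // string -> JSON document
    private final List<String> buffer = new ArrayList<>();    // like BasicMemoryBuffer
    private final int bufferLimit;
    private final List<String> emitted = new ArrayList<>();   // stands in for the Elasticsearch emitter

    MiniPipeline(int bufferLimit) { this.bufferLimit = bufferLimit; }

    void process(String record) {
        if (!filter.keep(record)) return;                // 1. filter
        buffer.add(transformer.transform(record));       // 2. transform, 3. buffer
        if (buffer.size() >= bufferLimit) {              // 4. emit once the limit is
            emitted.addAll(buffer);                      //    reached, the way
            buffer.clear();                              //    bufferRecordCountLimit works
        }
    }

    List<String> emitted() { return emitted; }

    public static void main(String[] args) {
        MiniPipeline p = new MiniPipeline(2);
        p.process("hello");
        p.process("world");
        System.out.println(p.emitted()); // two JSON documents emitted as one batch
    }
}
```

    With the buffer limit set to 2, processing three records emits the first two as one batch and holds the third until the buffer fills again.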

    This code implements the abstract factory method, indicating the pipeline you want to use:


    public KinesisConnectorRecordProcessorFactory<String, ElasticSearchObject>
            getKinesisConnectorRecordProcessorFactory() {
        return new KinesisConnectorRecordProcessorFactory<String,
                ElasticSearchObject>(new ElasticSearchPipeline(), config);
    }


    Define the Executor

    The following code defines the pipeline, with String as the type of the input Kinesis records and ElasticSearchObject as the output type:

    public class ElasticSearchExecutor extends
            KinesisConnectorExecutor<String, ElasticSearchObject>

    This code implements the main method, which creates the Executor and starts it running:

    public static void main(String[] args) {
        KinesisConnectorExecutor<String, ElasticSearchObject> executor
                = new ElasticSearchExecutor(configFile);
        executor.run();
    }


    At this point, make sure your AWS credentials are correct, run ant setup to set up the project's dependencies, and run ant run to start the application. All of the code is available on GitHub, so you can get started right away. You can post your questions in the comments for discussion.

    Kinesis Client Library and Kinesis Connector Library

    Amazon released AWS Kinesis in September 2013 and introduced the Kinesis Client Library along with it. With the Client Library, developers can create applications that process streaming data while the library handles complex problems such as load balancing the streaming data, coordinating distributed servers, adapting to changes in stream volume, and processing records in a fault-tolerant way.

    For processing input streams, Amazon AWS has released many products that developers can learn from or build on. The main ones include the Kinesis Connector Library for Amazon DynamoDB, Amazon Redshift, and Amazon Simple Storage Service (S3); the Kinesis Storm Spout; and the Amazon EMR Connector.
