Logstash's JDBC Input Plugin works like an adapter: it reads records from your database and sends them to Elasticsearch, where they can be used for full-text search, queries, and analysis, and displayed as charts and dashboards in Kibana.
In the example below I explain how to create a Logstash configuration file that uses the JDBC Input Plugin to read from an Oracle database and output to Elasticsearch.
Prerequisites:
- Logstash 5.x installed
- Elasticsearch 5.x installed
- Java 7 or 8 installed
Sample Data:
The sample data below is from the defect_detail table, where defect_id is a numeric value that keeps incrementing in ascending order.
| defect_id | owned_by | severity | status | summary | application | created_by | creation_date | modified_by | modified_date | assigned_to |
|-----------|----------|----------|--------|---------|-------------|------------|---------------|-------------|---------------|-------------|
| 530812 | Ramesh | Severity 3 | Cancelled | Customer call 5 time | TEST-APP | Saurabh | 7/3/2017 15:44 | Gaurav | 8/19/2017 6:22 | Development |
| 530828 | Neha | Severity 1 | Cancelled | Dealer Code Buyer on behalf | TEST-APP-5 | Rajan | 7/3/2017 16:20 | Nilam | 8/17/2017 9:29 | Development |
| 540829 | Ramesh | Severity 1 | Retest Completed | Client Not want Bulk call | TEST-APP-4 | Rajiv | 7/24/2017 11:29 | Raghav | 8/5/2017 20:00 | IST |
Configuration File:
The configuration file below is set up to read data from an Oracle database; it executes the query every 15 minutes and reads only records created after the last run value of defect_id. Always add an ORDER BY on the column used for the last run value, as configured here for the numeric defect_id column.
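If the last run value needs to be tracked by a timestamp column instead of a numeric one, the idea is the same. A minimal sketch of the relevant input settings, assuming you wanted to track new records by the modified_date column instead:

    # Sketch only: tracking by a timestamp column such as modified_date
    statement => "select * from defect_detail where modified_date > :sql_last_value order by modified_date"
    use_column_value => true
    tracking_column => modified_date
    # sql_last_value is treated as numeric by default; declare it as a timestamp here
    tracking_column_type => "timestamp"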
If you are using another database such as MySQL, SQL Server, or DB2, change jdbc_driver_library and jdbc_connection_string to match that database. Every database also has its own query syntax, so update the statement accordingly.
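For instance, a minimal sketch of the driver settings for MySQL (the jar file name, host, port, and schema name below are placeholders to replace with your own):

    # Sketch for MySQL instead of Oracle
    jdbc_driver_library => "../jar/mysql-connector-java-5.1.36.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://hostname:3306/dbname"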
Copy the content below and create the file in the bin directory as /bin/logstash-jdbc-defect.conf.
input {
  jdbc {
    # Path to the downloaded JDBC driver jar, added to the class path
    jdbc_driver_library => "../jar/ojdbc6.jar"
    # Oracle driver class
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
    # Oracle JDBC connection string, format jdbc:oracle:thin:@hostname:PORT/SERVICE
    jdbc_connection_string => "jdbc:oracle:thin:@hostname:1521/service"
    # The user and password to connect to the database
    jdbc_user => "username"
    jdbc_password => "password"
    # Use this instead when the password should be read from a file
    #jdbc_password_filepath => "/opt/app/password-path-location"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "50000"
    # Cron-style schedule controlling how frequently the query is executed (every 15 minutes here)
    schedule => "*/15 * * * *"
    # Use this if the query is big and you want to keep it in a separate file
    #statement_filepath => "../query/remedy-tickets-details.sql"
    # Inline query; :sql_last_value (numeric or timestamp) limits the result to records after the last run
    statement => "select defect_id,owned_by,severity,status,summary,application,created_by,creation_date,modified_by,modified_date,assigned_to from defect_detail where defect_id>:sql_last_value order by defect_id"
    # Configuration for using the last run value
    clean_run => true
    use_column_value => true
    tracking_column => defect_id
    # Logstash treats sql_last_value as numeric by default; for a timestamp column configure the type explicitly
    #tracking_column_type => "timestamp"
    record_last_run => true
    # This file keeps the record of sql_last_value so the next scheduled run can start from it
    last_run_metadata_path => "logstash_jdbc_last_run_t_data.txt"
    # Type assigned to the data read from the database
    type => "t-data"
    # Configure the timezone according to the database location
    #jdbc_default_timezone => "UTC"
  }
}

filter {
  # To map the creation_date column to the Elasticsearch @timestamp, use the date filter below
  mutate {
    convert => [ "creation_date", "string" ]
  }
  # creation_date has the format "MM/dd/yyyy HH:mm" and the timezone America/New_York,
  # so it is adjusted accordingly when stored in Elasticsearch in UTC
  date {
    match => ["creation_date", "MM/dd/yyyy HH:mm"]
    timezone => "America/New_York"
  }
}

output {
  # Output to Elasticsearch
  elasticsearch {
    index => "defect-data-%{+YYYY.MM}"
    hosts => ["elasticsearch-server:9200"]
    document_type => "t-type"
    # Use document_id if you want to prevent duplicate records in Elasticsearch
    document_id => "%{defect_id}"
  }
  # Output to the console
  stdout { codec => rubydebug }
}
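If you switch to the commented-out statement_filepath and jdbc_password_filepath options, the files they reference are plain text. A minimal sketch, using the same paths as in the comments above: the contents of ../query/remedy-tickets-details.sql (read via statement_filepath) would be the multi-line query,

    select defect_id, owned_by, severity, status, summary, application,
           created_by, creation_date, modified_by, modified_date, assigned_to
      from defect_detail
     where defect_id > :sql_last_value
     order by defect_id

and /opt/app/password-path-location (read via jdbc_password_filepath) would contain a single line holding only the database password.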
I have tried to give descriptive information in the comments for each property in the configuration file. If you want to go deeper into any of them, drop a comment or send an email and we can discuss them in detail.
Date Filter: This filter maps CREATION_DATE to the @timestamp value of each indexed document. It tells Logstash that CREATION_DATE follows the pattern "MM/dd/yyyy HH:mm", so the conversion to a timestamp uses the same format.
Execution:
[logstash-installation-dir]/bin/logstash -f logstash-jdbc-defect.conf
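If you want Logstash to keep running after you close the terminal (the "start Logstash in the background" point from the summary below), one common way is nohup; a sketch, assuming the same configuration file:

    nohup [logstash-installation-dir]/bin/logstash -f logstash-jdbc-defect.conf > logstash-jdbc-defect.log 2>&1 &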
To learn about validation and the other options for starting Logstash, follow the link Logstash Installation, Configuration and Start.
Logstash Console Output
Notice that, thanks to the date filter, the index @timestamp value is generated from the value of CREATION_DATE, and the Elasticsearch output's index name defect-data-%{+YYYY.MM} creates one index per month based on @timestamp (defect-data-2017.07 for the sample data). If the data in your database changes and defect_id increases, you will see the new defects on your console every 15 minutes, as scheduled in the configuration file.
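If you prefer one index per day instead of per month, only the date format in the index name changes; a minimal sketch of the Elasticsearch output setting:

    # Creates daily indexes such as defect-data-2017.07.03
    index => "defect-data-%{+YYYY.MM.dd}"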
Result:
select defect_id,owned_by,severity,status,summary,application,created_by,creation_date,modified_by,modified_date,assigned_to from defect_detail where defect_id>:sql_last_value order by defect_id
{
       "severity" => "Severity 3",
        "summary" => "Customer call 5 time but no response",
       "owned_by" => "Ramesh",
  "creation_date" => "7/3/2017 15:44",
  "modified_date" => "8/19/2017 6:22",
           "type" => "t-data",
     "created_by" => "Saurabh",
     "@timestamp" => 2017-07-03T19:44:00.000Z,
    "modified_by" => "Gaurav",
       "@version" => "1",
      "defect_id" => 530812,
    "application" => "TEST-APP",
         "status" => "Cancelled",
    "assigned_to" => "Development"
}
{
       "severity" => "Severity 1",
        "summary" => "Dealer Code Buyer on behalf",
       "owned_by" => "Neha",
  "creation_date" => "7/3/2017 16:20",
  "modified_date" => "8/17/2017 9:29",
           "type" => "t-data",
     "created_by" => "Rajan",
     "@timestamp" => 2017-07-03T20:20:00.000Z,
    "modified_by" => "Nilam",
       "@version" => "1",
      "defect_id" => 530828,
    "application" => "TEST-APP5",
         "status" => "Cancelled",
    "assigned_to" => "Development"
}
{
       "severity" => "Severity 1",
        "summary" => "Client Not want Bulk call",
       "owned_by" => "Ramesh",
  "creation_date" => "7/24/2017 11:29",
  "modified_date" => "8/5/2017 20:00",
           "type" => "t-data",
     "created_by" => "Rajiv",
     "@timestamp" => 2017-07-24T15:29:00.000Z,
    "modified_by" => "Raghav",
       "@version" => "1",
      "defect_id" => 540829,
    "application" => "TEST-APP4",
         "status" => "Retest Complete",
    "assigned_to" => "IST - Integrated System Test"
}
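To double-check that the documents reached Elasticsearch, you can query the cluster directly (assuming Elasticsearch is reachable at elasticsearch-server:9200 as configured above):

    # List the defect-data indexes created by Logstash
    curl 'http://elasticsearch-server:9200/_cat/indices/defect-data-*?v'

    # Fetch a few documents from the monthly index holding the sample data
    curl 'http://elasticsearch-server:9200/defect-data-2017.07/_search?pretty&size=3'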
Summary
This post covered the following points:
- Logstash JDBC input from an Oracle database.
- JDBC input changes needed for sql_last_value with numeric and timestamp tracking columns.
- Reading the password and a multi-line query from separate files.
- Using the date filter to derive the index timestamp value from a field and pattern.
- Dynamic index names per month (or day) by appending a date format to the index name.
- Preventing duplicate record inserts in Elasticsearch.
- Starting Logstash in the background with a configuration file.
- Sending Logstash output to Elasticsearch and the console.
Read More
To read more on Logstash configuration, input plugins, filter plugins, output plugins, Logstash customization, and related issues, follow Logstash Tutorial and Logstash Issues.
Hope this blog was helpful for you.
Leave your feedback to help enhance this topic and make it more useful for others.