This page shows how to use Storm SQL by showing the example of processing Apache logs. This page is written by "how-to" style so you can follow the step and learn how to utilize Storm SQL step by step.
This page assumes that Apache Zookeeper, Apache Storm and Apache Kafka are installed locally and running properly configured.
For convenience, this page assumes that Apache Kafka 0.10.0 is installed via brew
.
We'll use below tools to prepare the JSON data which will be fed to the input data source.
Since they're Python projects, this page assumes Python 3.0.x with pip3
, virtualenv
is installed locally.
In this page, we will use four topics, apache-logs
, apache-errorlogs
, apache-slowlogs
.
Please create topics according to your environment.
For Apache Kafka 0.10.0 with brew installed,
kafka-topics --create --topic apache-logs --zookeeper localhost:2181 --replication-factor 1 --partitions 5
kafka-topics --create --topic apache-errorlogs --zookeeper localhost:2181 --replication-factor 1 --partitions 5
kafka-topics --create --topic apache-slowlogs --zookeeper localhost:2181 --replication-factor 1 --partitions 5
Let's feed the data to input topics. In this page we will generate fake Apache logs, and parse to JSON format, and feed JSON to Kafka topic.
Let's create your working directory, since we will clone the project and also setup virtualenv.
In your working directory, virtualenv env
to setup virtualenv to env directory, and activate.
$ virtualenv env
$ source env/bin/activate
Feel free to deactivate
when you're done with example.
Fake-Apache-Log-Generator
is not presented to package, and also we need to modify the script.
$ git clone https://github.com/kiritbasu/Fake-Apache-Log-Generator.git
$ cd Fake-Apache-Log-Generator
Open apache-fake-log-gen.py
and replace while (flag):
statements to below:
elapsed_us = random.randint(1 * 1000,1000 * 1000) # 1 ms to 1 sec
seconds=random.randint(30,300)
increment = datetime.timedelta(seconds=seconds)
otime += increment
ip = faker.ipv4()
dt = otime.strftime('%d/%b/%Y:%H:%M:%S')
tz = datetime.datetime.now(pytz.timezone('US/Pacific')).strftime('%z')
vrb = numpy.random.choice(verb,p=[0.6,0.1,0.1,0.2])
uri = random.choice(resources)
if uri.find("apps")>0:
uri += `random.randint(1000,10000)`
resp = numpy.random.choice(response,p=[0.9,0.04,0.02,0.04])
byt = int(random.gauss(5000,50))
referer = faker.uri()
useragent = numpy.random.choice(ualist,p=[0.5,0.3,0.1,0.05,0.05] )()
f.write('%s - - [%s %s] %s "%s %s HTTP/1.0" %s %s "%s" "%s"\n' % (ip,dt,tz,elapsed_us,vrb,uri,resp,byt,referer,useragent))
log_lines = log_lines - 1
flag = False if log_lines == 0 else True
to make sure fake elapsed_us is included to fake log.
For convenience, you can skip cloning project and download modified file from here: apache-fake-log-gen.py (gist)
apache-log-parser
can be installed via pip
.
$ pip3 install apache-log-parser
Since apache-log-parser is a library, in order to parse fake log we need to write small Python script.
Let's create file parse-fake-log-gen-to-json-with-incrementing-id.py
with below content:
import sys
import apache_log_parser
import json
auto_incr_id = 1
parser_format = '%a - - %t %D "%r" %s %b "%{Referer}i" "%{User-Agent}i"'
line_parser = apache_log_parser.make_parser(parser_format)
while True:
# we'll use pipe
line = sys.stdin.readline()
if not line:
break
parsed_dict = line_parser(line)
parsed_dict['id'] = auto_incr_id
auto_incr_id += 1
parsed_dict = {k.upper(): v for k, v in parsed_dict.iteritems() if not k.endswith('datetimeobj')}
print(json.dumps(parsed_dict))
OK! We're prepared to feed the data to Kafka topic. Let's use kafka-console-producer
to feed parsed JSON.
$ python3 apache-fake-log-gen.py -n 0 | python3 parse-fake-log-gen-to-json-with-incrementing-id.py | kafka-console-producer --broker-list localhost:9092 --topic apache-logs
and execute below to another terminal session to confirm data is being fed.
$ kafka-console-consumer --zookeeper localhost:2181 --topic apache-logs
If you can see the json like below, it's done:
{"TIME_US": "757467", "REQUEST_FIRST_LINE": "GET /wp-content HTTP/1.0", "REQUEST_METHOD": "GET", "RESPONSE_BYTES_CLF": "4988", "TIME_RECEIVED_ISOFORMAT": "2021-06-30T22:02:53", "TIME_RECEIVED_TZ_ISOFORMAT": "2021-06-30T22:02:53-07:00", "REQUEST_HTTP_VER": "1.0", "REQUEST_HEADER_USER_AGENT__BROWSER__FAMILY": "Firefox", "REQUEST_HEADER_USER_AGENT__IS_MOBILE": false, "REQUEST_HEADER_USER_AGENT__BROWSER__VERSION_STRING": "3.6.13", "REQUEST_URL_FRAGMENT": "", "REQUEST_HEADER_USER_AGENT": "Mozilla/5.0 (X11; Linux x86_64; rv:1.9.7.20) Gecko/2010-10-13 13:52:34 Firefox/3.6.13", "REQUEST_URL_SCHEME": "", "REQUEST_URL_PATH": "/wp-content", "REQUEST_URL_QUERY_SIMPLE_DICT": {}, "TIME_RECEIVED_UTC_ISOFORMAT": "2021-07-01T05:02:53+00:00", "REQUEST_URL_QUERY_DICT": {}, "STATUS": "200", "REQUEST_URL_NETLOC": "", "REQUEST_URL_QUERY_LIST": [], "REQUEST_URL_QUERY": "", "REQUEST_URL_USERNAME": null, "REQUEST_HEADER_USER_AGENT__OS__VERSION_STRING": "", "REQUEST_URL_HOSTNAME": null, "REQUEST_HEADER_USER_AGENT__OS__FAMILY": "Linux", "REQUEST_URL": "/wp-content", "ID": 904128, "REQUEST_HEADER_REFERER": "http://white.com/terms/", "REQUEST_URL_PORT": null, "REQUEST_URL_PASSWORD": null, "TIME_RECEIVED": "[30/Jun/2021:22:02:53 -0700]", "REMOTE_IP": "88.203.90.62"}
In this example we'll filter error logs from entire logs and store them to another topics. project
and filter
features will be used.
The content of script file is here:
CREATE EXTERNAL TABLE APACHE_LOGS (ID INT PRIMARY KEY, REMOTE_IP VARCHAR, REQUEST_URL VARCHAR, REQUEST_METHOD VARCHAR, STATUS VARCHAR, REQUEST_HEADER_USER_AGENT VARCHAR, TIME_RECEIVED_UTC_ISOFORMAT VARCHAR, TIME_US DOUBLE) LOCATION 'kafka://apache-logs?bootstrap-servers=localhost:9092'
CREATE EXTERNAL TABLE APACHE_ERROR_LOGS (ID INT PRIMARY KEY, REMOTE_IP VARCHAR, REQUEST_URL VARCHAR, REQUEST_METHOD VARCHAR, STATUS INT, REQUEST_HEADER_USER_AGENT VARCHAR, TIME_RECEIVED_UTC_ISOFORMAT VARCHAR, TIME_ELAPSED_MS INT) LOCATION 'kafka://apache-error-logs?bootstrap-servers=localhost:9092' TBLPROPERTIES '{"producer":{"acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer"}}'
INSERT INTO APACHE_ERROR_LOGS SELECT ID, REMOTE_IP, REQUEST_URL, REQUEST_METHOD, CAST(STATUS AS INT) AS STATUS_INT, REQUEST_HEADER_USER_AGENT, TIME_RECEIVED_UTC_ISOFORMAT, (TIME_US / 1000) AS TIME_ELAPSED_MS FROM APACHE_LOGS WHERE (CAST(STATUS AS INT) / 100) >= 4
Save this file to apache_log_error_filtering.sql
.
Let's take a look at the script.
The first statement defines the table APACHE_LOGS
which represents the input stream. The LOCATION
clause specifies the Kafka host (localhost:9092
) and the topic (apache-logs
).
Note that Kafka data source requires primary key to be defined. That's why we put integer id for parsed JSON data.
Similarly, the second statement specifies the table APACHE_ERROR_LOGS
which represents the output stream. The TBLPROPERTIES
clause specifies the configuration of KafkaProducer and is required for a Kafka sink table.
The last statement defines the topology. Storm SQL only define the topology and run topology on DML statement. DDL statements define input data source, output data source, and user defined function which will be referred by DML statement.
Let's look at the where
statement first. Since we want to filter error logs, we divide status by 100 and compare quotient is equal or greater than 4. (easier representation is >= 400
)
Since status in JSON is string format (hence represented as VARCHAR for APACHE_LOGS table), we apply CAST(STATUS AS INT) to convert to integer type before applying division.
Now we have filtered only error logs.
Let's transform some columns to match the output stream. In this statement we apply CAST(STATUS AS INT) to convert to integer type, and divide TIME_US by 1000 to convert microsecond to millisecond.
Last, insert statement stores filtered and transformed rows (tuples) to the output stream.
To run this example, users need to include the data sources (storm-sql-kafka
in this case) and its dependency in the
class path. The Storm SQL core dependencies are automatically handled when users run storm sql
.
Users can include data sources at the submission step like below:
$ $STORM_DIR/bin/storm sql apache_log_error_filtering.sql apache_log_error_filtering --artifacts "org.apache.storm:storm-sql-kafka:2.0.0-SNAPSHOT,org.apache.storm:storm-kafka-client:2.0.0-SNAPSHOT,org.apache.kafka:kafka-clients:1.1.0^org.slf4j:slf4j-log4j12"
Above command submits the SQL statements to StormSQL. The command line syntax of Storm SQL is storm sql [script file] [topology name]
.
Users need to modify each artifacts' version if users are using different version of Storm or Kafka.
If your statements pass the validation phase, topology will be shown to Storm UI page.
You can see the output via console:
$ kafka-console-consumer --zookeeper localhost:2181 --topic apache-error-logs
and the output will be similar to:
{"ID":854643,"REMOTE_IP":"4.227.214.159","REQUEST_URL":"/wp-content","REQUEST_METHOD":"GET","STATUS":404,"REQUEST_HEADER_USER_AGENT":"Mozilla/5.0 (Windows 98; Win 9x 4.90; it-IT; rv:1.9.2.20) Gecko/2015-06-03 11:20:16 Firefox/3.6.17","TIME_RECEIVED_UTC_ISOFORMAT":"2021-03-28T19:14:44+00:00","TIME_RECEIVED_TIMESTAMP":1616958884000,"TIME_ELAPSED_MS":274.222}
{"ID":854693,"REMOTE_IP":"223.50.249.7","REQUEST_URL":"/apps/cart.jsp?appID=5578","REQUEST_METHOD":"GET","STATUS":404,"REQUEST_HEADER_USER_AGENT":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_6_6; rv:1.9.2.20) Gecko/2015-11-06 00:20:43 Firefox/3.8","TIME_RECEIVED_UTC_ISOFORMAT":"2021-03-28T21:41:02+00:00","TIME_RECEIVED_TIMESTAMP":1616967662000,"TIME_ELAPSED_MS":716.851}
...
You can also run Storm SQL runner to see the logical plan via placing --explain
to topology name:
$ $STORM_DIR/bin/storm sql apache_log_error_filtering.sql --explain --artifacts "org.apache.storm:storm-sql-kafka:2.0.0-SNAPSHOT,org.apache.storm:storm-kafka-client:2.0.0-SNAPSHOT,org.apache.kafka:kafka-clients:1.1.0^org.slf4j:slf4j-log4j12"
and the output will be similar to:
LogicalTableModify(table=[[APACHE_ERROR_LOGS]], operation=[INSERT], updateColumnList=[[]], flattened=[true]), id = 8
LogicalProject(ID=[$0], REMOTE_IP=[$1], REQUEST_URL=[$2], REQUEST_METHOD=[$3], STATUS=[CAST($4):INTEGER NOT NULL], REQUEST_HEADER_USER_AGENT=[$5], TIME_RECEIVED_UTC_ISOFORMAT=[$6], TIME_ELAPSED_MS=[/($7, 1000)]), id = 7
LogicalFilter(condition=[>=(/(CAST($4):INTEGER NOT NULL, 100), 4)]), id = 6
EnumerableTableScan(table=[[APACHE_LOGS]]), id = 5
It might be not same as you are seeing if Storm SQL applies query optimizations.
We're executing the first Storm SQL topology! Please kill the topology when you see enough output and the logs.
To be concise, we'll skip explaining the things we've already seen.
In this example we'll filter slow logs from entire logs and store them to another topics. project
and filter
, and User Defined Function (UDF)
features will be used.
This is very similar to filtering error logs
but we'll see how to define User Defined Function (UDF)
.
The content of script file is here:
CREATE EXTERNAL TABLE APACHE_LOGS (ID INT PRIMARY KEY, REMOTE_IP VARCHAR, REQUEST_URL VARCHAR, REQUEST_METHOD VARCHAR, STATUS VARCHAR, REQUEST_HEADER_USER_AGENT VARCHAR, TIME_RECEIVED_UTC_ISOFORMAT VARCHAR, TIME_US DOUBLE) LOCATION 'kafka://apache-logs?bootstrap-servers=localhost:9092' TBLPROPERTIES '{"producer":{"acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer"}}'
CREATE EXTERNAL TABLE APACHE_SLOW_LOGS (ID INT PRIMARY KEY, REMOTE_IP VARCHAR, REQUEST_URL VARCHAR, REQUEST_METHOD VARCHAR, STATUS INT, REQUEST_HEADER_USER_AGENT VARCHAR, TIME_RECEIVED_UTC_ISOFORMAT VARCHAR, TIME_RECEIVED_TIMESTAMP BIGINT, TIME_ELAPSED_MS INT) LOCATION 'kafka://apache-slow-logs?bootstrap-servers=localhost:9092' TBLPROPERTIES '{"producer":{"acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer"}}'
CREATE FUNCTION GET_TIME AS 'org.apache.storm.sql.runtime.functions.scalar.datetime.GetTime2'
INSERT INTO APACHE_SLOW_LOGS SELECT ID, REMOTE_IP, REQUEST_URL, REQUEST_METHOD, CAST(STATUS AS INT) AS STATUS_INT, REQUEST_HEADER_USER_AGENT, TIME_RECEIVED_UTC_ISOFORMAT, GET_TIME(TIME_RECEIVED_UTC_ISOFORMAT, 'yyyy-MM-dd''T''HH:mm:ssZZ') AS TIME_RECEIVED_TIMESTAMP, TIME_US / 1000 AS TIME_ELAPSED_MS FROM APACHE_LOGS WHERE (TIME_US / 1000) >= 100
Save this file to apache_log_slow_filtering.sql
.
We can skip the first 2 statements since it's almost same to the last example.
The third statement defines the User defined function
. We're defining GET_TIME
which uses org.apache.storm.sql.runtime.functions.scalar.datetime.GetTime2
class.
The implementation of GetTime2 is here:
package org.apache.storm.sql.runtime.functions.scalar.datetime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
public class GetTime2 {
public static Long evaluate(String dateString, String dateFormat) {
try {
DateTimeFormatter df = DateTimeFormat.forPattern(dateFormat).withZoneUTC();
return df.parseDateTime(dateString).getMillis();
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
}
This class can be used for UDF since it defines static evaluate
method. The SQL type of parameters and return are determined by Calcite which Storm SQL depends on.
Note that this class should be in classpath, so in order to define UDF, you need to create jar file which contains UDF classes and run storm sql
with --jar
option.
This page assumes that GetTime2 is in classpath, for simplicity.
The last statement is very similar to filtering error logs. The only new thing is that we call GET_TIME(TIME_RECEIVED_UTC_ISOFORMAT, 'yyyy-MM-dd''T''HH:mm:ssZZ')
to convert string time to unix timestamp (BIGINT).
Let's execute it.
$ $STORM_DIR/bin/storm sql apache_log_slow_filtering.sql apache_log_slow_filtering --artifacts "org.apache.storm:storm-sql-kafka:2.0.0-SNAPSHOT,org.apache.storm:storm-kafka-client:2.0.0-SNAPSHOT,org.apache.kafka:kafka-clients:1.1.0^org.slf4j:slf4j-log4j12"
You can see the output via console:
$ kafka-console-consumer --zookeeper localhost:2181 --topic apache-slow-logs
and the output will be similar to:
{"ID":890502,"REMOTE_IP":"136.156.159.160","REQUEST_URL":"/list","REQUEST_METHOD":"GET","STATUS":200,"REQUEST_HEADER_USER_AGENT":"Mozilla/5.0 (Windows NT 5.01) AppleWebKit/5311 (KHTML, like Gecko) Chrome/13.0.860.0 Safari/5311","TIME_RECEIVED_UTC_ISOFORMAT":"2021-06-05T03:44:59+00:00","TIME_RECEIVED_TIMESTAMP":1622864699000,"TIME_ELAPSED_MS":638.579}
{"ID":890542,"REMOTE_IP":"105.146.3.190","REQUEST_URL":"/search/tag/list","REQUEST_METHOD":"DELETE","STATUS":200,"REQUEST_HEADER_USER_AGENT":"Mozilla/5.0 (X11; Linux i686) AppleWebKit/5332 (KHTML, like Gecko) Chrome/13.0.891.0 Safari/5332","TIME_RECEIVED_UTC_ISOFORMAT":"2021-06-05T05:54:27+00:00","TIME_RECEIVED_TIMESTAMP":1622872467000,"TIME_ELAPSED_MS":403.957}
...
That's it! Supposing we have UDF which queries geo location via remote ip, we can filter via geo location, or enrich geo location to transformed result.
We looked through several simple use cases for Storm SQL to learn Storm SQL features. If you haven't looked at Storm SQL integration and Storm SQL language, you need to read it to see full supported features.