Tuesday, June 23, 2015

ListingQueueingTopology

package flipkart.dsp.santa.bernard.zulu.topology;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.BoltDeclarer;
import backtype.storm.topology.TopologyBuilder;
import flipkart.dsp.santa.bernanrd.GuiceInjectorFactory;
import flipkart.dsp.santa.bernard.stormcore.kafka.KafkaBolt;
import flipkart.dsp.santa.bernard.stormcore.utils.StormUtils;
import flipkart.dsp.santa.bernard.zulu.zookeeper.VerticalRepository;
import org.yaml.snakeyaml.Yaml;

import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Map;

/**
 * Command-line launcher for the {@code ListingQueueingTopology} Storm topology.
 *
 * <p>The topology wires one {@code ZuluDeltaSpout} per vertical group into a shared
 * {@code SplitBolt}, whose output is shuffle-grouped into a {@code KafkaBolt}.
 * Topology configuration is read from the classpath resource
 * {@code ListingQueueingTopology.yaml}.
 */
public class ListingQueueingTopology {

    private static final String TOPOLOGY_NAME = "ListingQueueingTopology";
    private static final String ZULU_DELTA_SPOUT = "ZuluDeltaSpout";
    private static final String SPLIT_BOLT = "SplitBolt";
    private static final String KAFKA_BOLT = "KafkaBolt";

    // NOTE(review): unit is defined by StormUtils.killTopologyAndWait — presumably seconds; confirm.
    private static final int WAIT_TIME = 120;

    /** Parallelism hints passed to the topology builder. */
    private static final int SPOUT_PARALLELISM = 1;
    private static final int SPLIT_BOLT_PARALLELISM = 4;
    private static final int KAFKA_BOLT_PARALLELISM = 4;

    /** Vertical groups to create spouts for; populated by {@link #setup()}. */
    private static List<String> verticalGroups;

    /** Configuration submitted with the topology; populated by {@link #setupConfig()}. */
    private static final Config topologyConf = new Config();

    /**
     * Program entry point; delegates to {@link #setup()}.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) throws AlreadyAliveException,
            InvalidTopologyException, ClassNotFoundException, IOException {
        setup();
    }

    /**
     * Loads the configuration, resolves the vertical groups, asks {@code StormUtils} to kill
     * any topology with the same name, submits a freshly built topology and exits the JVM.
     *
     * @throws RuntimeException if building or submitting the topology fails; the original
     *                          exception is attached as the cause
     */
    static void setup() {
        setupConfig();
        verticalGroups = GuiceInjectorFactory.getInstance()
                .getInstance(VerticalRepository.class)
                .getGroups();
        StormUtils.killTopologyAndWait(TOPOLOGY_NAME, WAIT_TIME);
        try {
            StormSubmitter.submitTopology(TOPOLOGY_NAME, topologyConf, buildTopology());
        } catch (AlreadyAliveException | InvalidTopologyException | IOException e) {
            // Keep the cause so the actual submission failure is visible in the stack trace.
            throw new RuntimeException("Failed to submit topology " + TOPOLOGY_NAME, e);
        }
        System.exit(0);
    }

    /**
     * Merges the entries of the classpath resource {@code <TOPOLOGY_NAME>.yaml} into the
     * topology configuration. An empty YAML document contributes no entries.
     *
     * @throws IllegalStateException if the resource is missing, its root is not a mapping,
     *                               or the stream cannot be closed
     */
    static void setupConfig() {
        String resourceName = TOPOLOGY_NAME + ".yaml";
        try (InputStream in = ListingQueueingTopology.class.getClassLoader()
                .getResourceAsStream(resourceName)) {
            if (in == null) {
                // getResourceAsStream signals "not found" with null rather than an exception.
                throw new IllegalStateException(
                        "Topology configuration not found on classpath: " + resourceName);
            }
            Object loaded = new Yaml().load(in);
            if (loaded == null) {
                // Empty document: nothing to merge.
                return;
            }
            if (!(loaded instanceof Map)) {
                throw new IllegalStateException("Expected a YAML mapping at the root of "
                        + resourceName + " but found " + loaded.getClass().getName());
            }
            // Storm configuration keys are strings; key types are not verified here,
            // which matches the original unchecked cast.
            @SuppressWarnings("unchecked")
            Map<String, Object> conf = (Map<String, Object>) loaded;
            topologyConf.putAll(conf);
        } catch (IOException e) {
            throw new IllegalStateException(
                    "I/O error while closing topology configuration " + resourceName, e);
        }
    }

    /**
     * Builds the topology: one spout per vertical group, all shuffle-grouped into the split
     * bolt, which in turn is shuffle-grouped into the Kafka bolt.
     *
     * <p>Reads the vertical groups assigned by {@link #setup()}.
     *
     * @return the assembled topology
     * @throws IOException declared for callers; see the spout/bolt constructors
     */
    static StormTopology buildTopology() throws IOException {
        TopologyBuilder builder = new TopologyBuilder();
        BoltDeclarer splitBoltDeclarer =
                builder.setBolt(SPLIT_BOLT, new SplitBolt(), SPLIT_BOLT_PARALLELISM);
        for (String verticalGroup : verticalGroups) {
            String spoutId = ZULU_DELTA_SPOUT + "-" + verticalGroup;
            builder.setSpout(spoutId, new ZuluDeltaSpout(verticalGroup), SPOUT_PARALLELISM);
            splitBoltDeclarer.shuffleGrouping(spoutId);
        }

        builder.setBolt(KAFKA_BOLT, new KafkaBolt<String, String>(), KAFKA_BOLT_PARALLELISM)
                .shuffleGrouping(SPLIT_BOLT);
        return builder.createTopology();
    }
}

No comments:

Post a Comment