package flipkart.dsp.santa.bernard.zulu.topology;
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.BoltDeclarer;
import backtype.storm.topology.TopologyBuilder;
import flipkart.dsp.santa.bernanrd.GuiceInjectorFactory;
import flipkart.dsp.santa.bernard.stormcore.kafka.KafkaBolt;
import flipkart.dsp.santa.bernard.stormcore.utils.StormUtils;
import flipkart.dsp.santa.bernard.zulu.zookeeper.VerticalRepository;
import org.yaml.snakeyaml.Yaml;
import java.io.IOException;
import java.util.List;
import java.util.Map;
public class ListingQueueingTopology
{
private static final String TOPOLOGY_NAME = "ListingQueueingTopology";
private static final String ZULU_DELTA_SPOUT = "ZuluDeltaSpout";
private static final String SPLIT_BOLT = "SplitBolt";
private static final String KAFKA_BOLT = "KafkaBolt";
private static final int WAIT_TIME = 120;
private static List<String> verticalGroups;
private static Config topologyConf = new Config();
public static void main(String[] args) throws AlreadyAliveException,
InvalidTopologyException, ClassNotFoundException, IOException
{
setup();
}
static void setup()
{
setupConfig();
verticalGroups = GuiceInjectorFactory.getInstance().getInstance(VerticalRepository.class)
.getGroups();
StormUtils.killTopologyAndWait(TOPOLOGY_NAME, WAIT_TIME);
try {
StormSubmitter.submitTopology(TOPOLOGY_NAME, topologyConf, buildTopology());
}
catch (AlreadyAliveException | InvalidTopologyException | IOException e)
{
throw new RuntimeException();
}
System.exit(0);
}
static void setupConfig()
{
Yaml yaml = new Yaml();
Map<String, Object> conf = (Map<String, Object>) yaml.load(ListingQueueingTopology.class
.getClassLoader().getResourceAsStream(TOPOLOGY_NAME + ".yaml"));
topologyConf.putAll(conf);
}
static StormTopology buildTopology() throws IOException
{
TopologyBuilder builder = new TopologyBuilder();
BoltDeclarer splitBoltDeclarer = builder.setBolt(SPLIT_BOLT, new SplitBolt(), 4);
for (String verticalGroup : verticalGroups)
{
builder.setSpout(ZULU_DELTA_SPOUT + "-" + verticalGroup,
new ZuluDeltaSpout(verticalGroup), 1);
splitBoltDeclarer.shuffleGrouping(ZULU_DELTA_SPOUT + "-" + verticalGroup);
}
builder.setBolt(KAFKA_BOLT, new KafkaBolt<String, String>(), 4).shuffleGrouping(SPLIT_BOLT);
StormTopology stormTopology = builder.createTopology();
return stormTopology;
}
}
No comments:
Post a Comment