Monday, June 29, 2015

Foxtrot Client

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//

package com.flipkart.foxtrot.client;

import com.flipkart.foxtrot.client.Document;
import com.flipkart.foxtrot.client.EventSender;
import com.flipkart.foxtrot.client.EventSerializationHandler;
import com.flipkart.foxtrot.client.FoxtrotClientConfig;
import com.flipkart.foxtrot.client.MemberSelector;
import com.flipkart.foxtrot.client.cluster.FoxtrotCluster;
import com.flipkart.foxtrot.client.selectors.RandomSelector;
import com.flipkart.foxtrot.client.senders.HttpAsyncEventSender;
import com.flipkart.foxtrot.client.senders.HttpSyncEventSender;
import com.flipkart.foxtrot.client.senders.QueuedSender;
import com.flipkart.foxtrot.client.serialization.JacksonJsonFoxtrotClusterResponseSerializationHandlerImpl;
import com.flipkart.foxtrot.client.serialization.JacksonJsonSerializationHandler;
import com.flipkart.foxtrot.client.util.TypeChecker;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import java.util.List;

public class FoxtrotClient {
    private final FoxtrotCluster foxtrotCluster;
    private final EventSender eventSender;

    public FoxtrotClient(FoxtrotClientConfig config) throws Exception 
    {
        this(config, new RandomSelector(), JacksonJsonSerializationHandler.INSTANCE);
    }

    public FoxtrotClient(FoxtrotClientConfig config, MemberSelector memberSelector, EventSerializationHandler serializationHandler) throws Exception 
    {
        this.foxtrotCluster = new FoxtrotCluster(config, memberSelector, 
                                    JacksonJsonFoxtrotClusterResponseSerializationHandlerImpl.INSTANCE);
        this.eventSender = (EventSender)(Strings.isNullOrEmpty(config.getLocalQueuePath())
                                        ?new HttpAsyncEventSender(config, this.foxtrotCluster, serializationHandler)
                                        :new QueuedSender(new HttpSyncEventSender(config, this.foxtrotCluster, serializationHandler),
                                         serializationHandler, config.getLocalQueuePath(), config.getBatchSize(), config.getRefreshIntervalSecs()));
    }

    public FoxtrotClient(FoxtrotCluster foxtrotCluster, EventSender eventSender) 
    {
        this.foxtrotCluster = foxtrotCluster;
        this.eventSender = eventSender;
    }

    public void send(Document document) throws Exception 
    {
        Preconditions.checkNotNull(document.getData());
        Preconditions.checkArgument(!TypeChecker.isPrimitive(document.getData()));
        this.eventSender.send(document);
    }

    public void send(List<Document> documents) throws Exception 
    {
        this.eventSender.send(documents);
    }

    public void close() throws Exception 
    {
        this.eventSender.close();
        this.foxtrotCluster.stop();
    }
}

Tuesday, June 23, 2015

ListingQueueingTopology

package flipkart.dsp.santa.bernard.zulu.topology;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.BoltDeclarer;
import backtype.storm.topology.TopologyBuilder;
import flipkart.dsp.santa.bernanrd.GuiceInjectorFactory;
import flipkart.dsp.santa.bernard.stormcore.kafka.KafkaBolt;
import flipkart.dsp.santa.bernard.stormcore.utils.StormUtils;
import flipkart.dsp.santa.bernard.zulu.zookeeper.VerticalRepository;
import org.yaml.snakeyaml.Yaml;

import java.io.IOException;
import java.util.List;
import java.util.Map;

public class ListingQueueingTopology
    {

        private static final String TOPOLOGY_NAME = "ListingQueueingTopology";
        private static final String ZULU_DELTA_SPOUT = "ZuluDeltaSpout";
        private static final String SPLIT_BOLT = "SplitBolt";
        private static final String KAFKA_BOLT = "KafkaBolt";
        private static final int WAIT_TIME = 120;
        private static List<String> verticalGroups;
        private static Config topologyConf = new Config();


        public static void main(String[] args) throws AlreadyAliveException,
                InvalidTopologyException, ClassNotFoundException, IOException
            {
                setup();
            }

        static void setup()
            {
                setupConfig();
                verticalGroups = GuiceInjectorFactory.getInstance().getInstance(VerticalRepository.class)
                                                                                            .getGroups();
                StormUtils.killTopologyAndWait(TOPOLOGY_NAME, WAIT_TIME);
                try                    {
                        StormSubmitter.submitTopology(TOPOLOGY_NAME, topologyConf, buildTopology());
                    } 
                catch (AlreadyAliveException | InvalidTopologyException | IOException e)
                    {
                        throw new RuntimeException();
                    }
                System.exit(0);
            }

        static void setupConfig()
            {
                Yaml yaml = new Yaml();
                Map<String, Object> conf = (Map<String, Object>) yaml.load(ListingQueueingTopology.class
                                            .getClassLoader().getResourceAsStream(TOPOLOGY_NAME + ".yaml"));
                topologyConf.putAll(conf);
            }

        static StormTopology buildTopology() throws IOException
            {
                TopologyBuilder builder = new TopologyBuilder();
                BoltDeclarer splitBoltDeclarer = builder.setBolt(SPLIT_BOLT, new SplitBolt(), 4);
                for (String verticalGroup : verticalGroups)
                    {
                        builder.setSpout(ZULU_DELTA_SPOUT + "-" + verticalGroup,
                                new ZuluDeltaSpout(verticalGroup), 1);
                        splitBoltDeclarer.shuffleGrouping(ZULU_DELTA_SPOUT + "-" + verticalGroup);
                    }

                builder.setBolt(KAFKA_BOLT, new KafkaBolt<String, String>(), 4).shuffleGrouping(SPLIT_BOLT);
                StormTopology stormTopology = builder.createTopology();
                return stormTopology;
            }
    }

Sunday, June 14, 2015

Utils & Constants

Utils & Constants

package flipkart.dsp.santa.bernard.es.client;

import flipkart.dsp.santa.bernard.es.client.handler.property.PropertyHandler;
import flipkart.dsp.santa.offer.classification.criteria.CriteriaKey;
import flipkart.dsp.santa.offer.classification.criteria.EntityCriteriaProperty;
import org.elasticsearch.index.query.BoolFilterBuilder;

import java.util.Map;

public class Utils
    {

        public static void addFilter(Map<CriteriaKey, EntityCriteriaProperty> 
                      criteria, BoolFilterBuilder filterBuilder, Boolean isInclude)
            {
              if (criteria != null)
                {
                  for (Map.Entry<CriteriaKey, EntityCriteriaProperty> entry :                                                              criteria.entrySet())
                    {
                       PropertyHandler classificationHandler = BernardClientImpl
                                     .getPropertyFactory().getClassificationHandler                                      (entry.getValue().getClass());
                       if (isInclude)
                            filterBuilder.must(classificationHandler
                                         .prepareAggregation(entry.getValue()));
                       else                            filterBuilder.mustNot(classificationHandler
                                         .prepareAggregation(entry.getValue()));
                     }
                  }
            }
    }


-----------------------------------------------------------------------------------

package flipkart.dsp.santa.bernard.es.client;

public class Constants
    {

        public static final String REF_ID = "ref_id";
        public static final String VICTOR_REF = "victor_ref";
        public static final String ELIGIBLE_SELLER = "eligible_seller";
        public static final String SELLER_ID = "seller_id";
        public static final String BRAND = "brand";
        public static final String VERTICAL = "vertical";
        public static final String FSP = "fsp";
        public static final String SERVICE_PROFILE = "service_profile";
        public static final String CATEGORY_NODE_ID = "category_node_id";
        public static final String DISCOUNT_PERCENT = "discount_percent";
    }