Open Source blog

On the Microsoft BingAds team, one of my primary responsibilities is the development and maintenance of the FastBI pipeline – the system responsible for all revenue coming from the Bing search engine.

We have been working on streaming technologies for the last five years, combining the scale and stability of our internal Cosmos compute platform with the expressivity and power of Trill as a processing engine, to produce a world-class streaming compute solution that is capable of handling some of our most important workloads.

In this 2-part series, I’ll introduce one of several challenging scenarios that we faced in the FastBI pipeline, followed by a detailed discussion of how we address it. I’ve streamlined and simplified things a bit in the interest of making things quicker to understand, but for the most part, this is representative of the kinds of problems we face on a daily basis when modeling our system temporally.

Note: this post assumes you have read Trill 101 and Trill 102 blog posts by my colleague, James Terwilliger.

Without further ado, let’s dig on in!

Scenario

User activity on the Bing search engine and other search providers powered by Bing causes ad requests to be made and logged to an impression log. Search providers have the option of making modifications to the ads that were returned, including things like reordering ads within a block, moving ads between blocks, or dropping them entirely. These modifications are collected and logged as a separate feedback signal that will be processed by the Bing Ads processing pipeline. To get a view of the final, served ad slate, the feedback stream must be joined to the impression stream.

One of the other major responsibilities of the pipeline is fraud detection. How fraud detection works is beyond the scope of this article; the important thing here is that the pipeline computes various statistics about the input traffic as one of several inputs that determine whether traffic is considered billable or not. Some of these statistics are very time-sensitive – the closer the events are processed to the actual order in which they occurred, the better.

Requirements

The above business scenario translates to the following technical requirements.

  • Output impressions as soon as they join successfully (and not before)
  • Don’t output an impression more than once
  • Don’t drop any impressions
  • Output impressions immediately as they arrive so they can contribute to statistics
  • Don’t count an impression more than once in statistics
  • Feedback can arrive up to 10 minutes after the associated impression
  • Delivery of the input logs can be delayed by up to 30 minutes on either side

As stated, some of these requirements seem mutually exclusive – we can’t output an impression until it is joined, but we must output the impression immediately for statistics; we can’t output an impression until it is joined, but we can’t drop any impressions (what about impressions that never receive feedback?).  This seems like a problem.

Fear not, dear reader, for not all is lost. What we really need is the ability to distinguish between these cases so that downstream queries can filter according to their specific requirements. This query, then, needs to be able to provide answers to the following questions for each event:

  • Is the impression done receiving feedback?
  • Should the impression contribute to statistics?

Let’s look at each of these questions more closely.

Is the impression done receiving feedback? An impression is considered done as soon as it receives feedback, which matches perfectly with the behavior of an inner join.  But what if feedback never arrives? This implies the operator must produce a second stream; namely, we need an expired event to be produced at the end of the impression’s lifetime only if it never received feedback previously. We’ll examine how to accomplish this later, but for now, the combination of these two streams contains all the impressions in the input, some with feedback and some that are expired.

Should the impression contribute to statistics? An impression must contribute to statistics immediately upon arrival. This means the query needs to produce output as soon as the impression arrives, which suggests a combination of an antijoin (when feedback is not present) and an inner join (when feedback is present).

However, there is one problem here. In the case where feedback arrives after the impression, both the antijoin and the inner join will produce an output, and only one of them may contribute to statistics. This means we need a way to identify whether the operator already produced an antijoin output before the inner join output. We can record this in the event we produce so that the statistics query can properly filter incoming impressions to avoid double-counting.
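
To make the double-counting problem concrete, here is a minimal sketch in plain C# (no Trill involved). The `OutputEvent` record and its `HasFeedback` and `CountForStats` fields are hypothetical names chosen for this illustration; they are not the pipeline's actual schema.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical output record: one row per event emitted by the operator.
public record OutputEvent(string RGUID, bool HasFeedback, bool CountForStats);

public static class StatsFilterSketch
{
    // Counts each impression once by keeping only the events flagged for statistics.
    public static int CountForStatistics(IEnumerable<OutputEvent> events) =>
        events.Count(e => e.CountForStats);

    public static void Main()
    {
        var events = new List<OutputEvent>
        {
            // Impression "a": feedback arrived later, so two events were produced.
            new("a", HasFeedback: false, CountForStats: true),  // antijoin output on arrival
            new("a", HasFeedback: true,  CountForStats: false), // inner join output, already counted
            // Impression "b": feedback was already present, so only the join fired.
            new("b", HasFeedback: true,  CountForStats: true),
        };

        Console.WriteLine(events.Count);               // 3 raw events
        Console.WriteLine(CountForStatistics(events)); // 2 impressions counted
    }
}
```

The point of the flag is that the statistics query never has to know which join produced an event; it only filters on what the operator recorded.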

The following diagram illustrates several possible ways that impression and feedback signals can arrive and the expected output of this operator for each one. For clarity, the join lifetime of the input events is shown as a dotted region; the input events are originally points.

[Diagram: possible arrival orders of impression and feedback signals and the expected output of this operator for each case]

*Due to the imprecise nature of the diagram, it is difficult to show exactly what these cases are intended to illustrate. The Way before and Way after cases represent when impression and feedback events overlap by the absolute minimum amount, 1 tick. In the Way before case, the end time of the feedback is one tick larger than the start time of the impression, and in the Way after case, the start time of the feedback is one tick smaller than the end time of the impression.
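
Since the diagram can't show a one-tick overlap, the two boundary cases can be checked with plain arithmetic instead. This is a sketch that assumes half-open lifetimes [start, end) measured in ticks and the 40-minute and 30-minute durations used later in this post; `Lifetime`, `Overlaps`, and the helper methods are illustrative names, not Trill APIs.

```csharp
using System;

// Half-open lifetime [Start, End), measured in ticks.
public record Lifetime(long Start, long End)
{
    // Two half-open intervals overlap when each starts before the other ends.
    public bool Overlaps(Lifetime other) => Start < other.End && other.Start < End;
}

public static class BoundaryCases
{
    public static readonly long ImpressionTicks = TimeSpan.FromMinutes(40).Ticks;
    public static readonly long FeedbackTicks = TimeSpan.FromMinutes(30).Ticks;

    public static Lifetime Impression(long start) => new(start, start + ImpressionTicks);
    public static Lifetime Feedback(long start) => new(start, start + FeedbackTicks);

    public static void Main()
    {
        var imp = Impression(TimeSpan.FromHours(1).Ticks);

        // "Way before": the feedback ends one tick after the impression starts.
        var wayBefore = Feedback(imp.Start + 1 - FeedbackTicks);
        // "Way after": the feedback starts one tick before the impression ends.
        var wayAfter = Feedback(imp.End - 1);

        Console.WriteLine(imp.Overlaps(wayBefore)); // True
        Console.WriteLine(imp.Overlaps(wayAfter));  // True

        // One tick further out in either direction and the lifetimes no longer overlap.
        Console.WriteLine(imp.Overlaps(Feedback(imp.Start - FeedbackTicks))); // False
        Console.WriteLine(imp.Overlaps(Feedback(imp.End)));                   // False
    }
}
```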

Don’t worry too much about the event descriptors yet (Joined, Stats, Done). We’ll cover those later. For now, focus on the events we produce and their lifetimes.

Creating the operator

Now that we’ve defined the problem, let’s work on the solution. We are going to create a user-defined operator that abstracts away the complexity of this join into a single, logical operator. Let’s start with the signature. We need to have both the impression and feedback streams, so we’ll start off by supplying those as arguments.

internal static IStreamable JoinFeedbackToImpressions(
    this IStreamable impressions,
    IStreamable feedback)
{
    throw new NotImplementedException();
}

Conceptually, the output of this operator is the combination of three streams:

  1. A stream of impressions that successfully join to the feedback stream
  2. A stream of impressions that never joined to the feedback stream (expired)
  3. A stream of impressions immediately upon arrival, whether joined or not

Let’s tackle these one at a time.

1. A stream of impressions that successfully join to the feedback stream

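This stream is the easiest: an inner join produces exactly what we want. We also must account for the allowed delays of the system, which are up to 30 minutes of delivery delay on either side, plus up to 10 additional minutes for feedback to follow its impression. Feedback can therefore trail its impression by as much as 40 minutes, while an impression can trail its feedback by at most 30 minutes, which is why the impression and feedback lifetimes are set to 40 and 30 minutes respectively. We can update the implementation of our method like so: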

internal static IStreamable JoinFeedbackToImpressions(
    this IStreamable impressions,
    IStreamable feedback)
{
-   throw new NotImplementedException();
+   return impressions
+       .SetDuration(TimeSpan.FromMinutes(40))
+       .Join(
+           feedback.SetDuration(TimeSpan.FromMinutes(30)),
+           imp  => imp.RGUID,
+           fdbk => fdbk.RGUID,
+           (imp, fdbk) => new ImpressionsWithFeedback(imp, fdbk))
+       .ToPointStream()
+       ;
}

Note: in the above code snippet, “-” indicates that a line is removed and “+” indicates that a line is added.

This query defines a simple inner join between the impression and feedback streams on “RGUID” – short for “request GUID” – which serves as the join key between all logs in the Bing Ads pipeline. When the join succeeds, a new ImpressionsWithFeedback instance is created from the impressions and feedback that joined. When we are done, we convert the events to points since we don’t care about their lifetime after this.
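
To see when the joined point is produced, here is a plain C# model of the timing for a single impression/feedback pair. It is not Trill code. It assumes the join's output lifetime is the intersection of the two input lifetimes and that ToPointStream keeps the start of that lifetime; `Interval` and `JoinPoint` are names made up for this sketch.

```csharp
using System;

// Half-open lifetime [Start, End).
public record Interval(DateTime Start, DateTime End);

public static class JoinTimingSketch
{
    static readonly TimeSpan ImpressionDuration = TimeSpan.FromMinutes(40);
    static readonly TimeSpan FeedbackDuration = TimeSpan.FromMinutes(30);

    // Returns the timestamp of the joined point event, or null when the
    // extended lifetimes never overlap and the inner join produces nothing.
    public static DateTime? JoinPoint(DateTime impressionTime, DateTime feedbackTime)
    {
        var imp = new Interval(impressionTime, impressionTime + ImpressionDuration);
        var fdbk = new Interval(feedbackTime, feedbackTime + FeedbackDuration);

        // The intersection starts at the later start and ends at the earlier end.
        var start = imp.Start > fdbk.Start ? imp.Start : fdbk.Start;
        var end = imp.End < fdbk.End ? imp.End : fdbk.End;
        return start < end ? start : (DateTime?)null;
    }

    public static void Main()
    {
        var t0 = new DateTime(2020, 1, 1, 12, 0, 0);

        // Feedback arrived 20 minutes earlier: the join fires when the impression arrives.
        Console.WriteLine(JoinPoint(t0, t0.AddMinutes(-20)) == t0);                // True
        // Feedback arrives 25 minutes later: the join fires when the feedback arrives.
        Console.WriteLine(JoinPoint(t0, t0.AddMinutes(25)) == t0.AddMinutes(25));  // True
        // Feedback arrives 45 minutes later: the impression has expired, so no join.
        Console.WriteLine(JoinPoint(t0, t0.AddMinutes(45)).HasValue);              // False
    }
}
```

In other words, under these assumptions the joined event appears at whichever of the two inputs arrives last, which is the "as soon as they join successfully (and not before)" requirement.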

Let’s see how this result compares to our original goal. The output produced by the query is in color; missing events are in gray.

[Diagram: output of the inner join compared with the original goal, with produced events in color and missing events in gray]

Not bad, but we’ve got a bit of a way to go yet.

2. A stream of impressions that never joined to the feedback stream (expired)

The next stream is a bit more complicated because it requires us to look into the past to determine whether or not to output an event. There are two possible outcomes, depending on what happened previously.

  1. The impression joined at some point to its feedback. In this case, we do not want to output anything.
  2. The impression never joined to its feedback. In this case, we want to produce a point event that coincides with the impression’s end time.

Let’s see how we can accomplish this by combining other temporal primitives.

When you see a pattern like “output an event only when some condition is satisfied,” this often indicates that an antijoin should be used. The antijoin operator suppresses output of the events in a stream based on the content of another stream, so the basic strategy is to produce the desired output for every event in the input stream and then to use an antijoin to selectively suppress the ones which are not needed.

Side note: there is some similarity in function between the Where and LeftAntiJoin/RightAntiJoin operators in that they both cause events to be dropped from their input. However, there are some important differences.

  • The Where operator is a scalar (payload-based) operator that filters events according to a user-supplied predicate function. If the filter is not satisfied, the entire event is dropped.
  • The LeftAntiJoin/RightAntiJoin operators are temporal operators that suppress the output of an event as long as it matches an event in another stream. Notably, a matching event does not cause the input event to be dropped completely. It instead is “turned off” for the duration of the match, after which it can become active again.

In this case, since we need to suppress events based on whether they joined successfully in the past, we must use an antijoin.
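
The difference is easier to see with numbers. The sketch below is a simplified plain C# model of the antijoin behavior described above for one left event and one matching right event; `Lifetime` and `Suppress` are illustrative names, not Trill APIs, and the real operator works on whole streams rather than pairs.

```csharp
using System;
using System.Collections.Generic;

// Half-open lifetime [Start, End), in minutes for readability.
public record Lifetime(int Start, int End);

public static class AntiJoinSketch
{
    // Returns the parts of `left` that are not covered by `match`:
    // the event is "turned off" only for the duration of the overlap.
    public static List<Lifetime> Suppress(Lifetime left, Lifetime match)
    {
        var visible = new List<Lifetime>();
        if (match.End <= left.Start || match.Start >= left.End)
        {
            visible.Add(left); // no overlap, nothing is suppressed
            return visible;
        }
        if (left.Start < match.Start) visible.Add(new Lifetime(left.Start, match.Start));
        if (match.End < left.End) visible.Add(new Lifetime(match.End, left.End));
        return visible;
    }

    public static void Main()
    {
        var left = new Lifetime(0, 40);

        // A match covering minutes 10..20 splits the event into two visible pieces.
        foreach (var piece in Suppress(left, new Lifetime(10, 20)))
            Console.WriteLine($"[{piece.Start}, {piece.End})"); // [0, 10) then [20, 40)

        // A match covering the whole lifetime suppresses the event entirely.
        Console.WriteLine(Suppress(left, new Lifetime(0, 40)).Count); // 0
    }
}
```

A Where filter has no equivalent of the first case: it either keeps the whole event or drops it, regardless of time.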

Here is how we can modify the query to produce a stream of expired events.

internal static IStreamable JoinFeedbackToImpressions(
    this IStreamable impressions,
    IStreamable feedback)
{
+   // set the lifetime of the input streams
+   impressions = impressions.SetDuration(TimeSpan.FromMinutes(40));
+   feedback    = feedback   .SetDuration(TimeSpan.FromMinutes(30));
+
+   // produce a stream of successful joins
-   return impressions
-       .SetDuration(TimeSpan.FromMinutes(40))
+   IStreamable joined = impressions
        .Join(
-           feedback.SetDuration(TimeSpan.FromMinutes(30)),
+           feedback,
            imp  => imp.RGUID,
            fdbk => fdbk.RGUID,
            (imp, fdbk) => new ImpressionsWithFeedback(imp, fdbk))
        .ToPointStream()
        ;
+
+   // produce a stream of "expired" point events that coincide with the end time
+   // of the impression stream, and suppress those which had a successful join
+   IStreamable expired = impressions
+       .PointAtEnd()
+       .LeftAntiJoin(
+           joined.SetDuration(TimeSpan.FromMinutes(40)),
+           l => l.RGUID,
+           r => r.RGUID)
+       .Select(imp => new ImpressionsWithFeedback(imp, null))
+       ;
+
+   // union together the two streams and return the result
+   return joined.Union(expired);
}

Note: in the above code snippet, “-” indicates that a line is removed and “+” indicates that a line is added.

There are a lot of changes, so let’s talk about them.

First, I pulled out the SetDuration calls so that they could be reused without being repeated. For simplicity’s sake, I reused the same local variable for each alteration. I could have instead defined a new IStreamable<T> variable for each one; the result is the same.

Second, instead of returning the join result immediately, I created a joined stream that contains the result of the join. This is used in the expired event computation and is also returned at the end of the method.

Third, I computed a stream of expired events. As explained above, this creates an expired event for every impression (impressions.PointAtEnd()) and then suppresses those which are preceded by a successful join (LeftAntiJoin(joined…)).  But what about the SetDuration operator being applied to joined on the right side of the antijoin?

It may help to visualize what is going on here. The below chart shows how each case will behave in this subquery:

[Chart: how each case behaves in this subquery]

As you can see, the lifetime of the events in joined changes significantly depending on the relative position of the input events in time. To ensure that we correctly suppress the appropriate expired events, the lifetime of the events in the filtering stream must always overlap with the events in the expired stream.

In the worst case, this means that the join lifetime needs to be extended by 40 minutes, which happens to be the same as the lifetime of the impressions going into the join. (This is not a coincidence; proof of this is left as an exercise for the reader.)

Notice that after changing the duration of the joined events, all of them overlap with the point at the end of the impression’s lifetime. When these two streams are antijoined, this will cause the output to be suppressed in all cases except for No feedback, which is exactly what we want.
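
Here is a small arithmetic check of why 40 minutes is enough, and why anything shorter is not. It is a sketch under two assumptions of mine: lifetimes are half-open and measured in ticks, and the expired point occupies the final tick of the impression's lifetime. The exact tick on which the expired point lands is not spelled out in this post, and `CoversExpiredPoint` is a hypothetical helper.

```csharp
using System;

public static class ExtensionCheck
{
    public static readonly long ImpressionTicks = TimeSpan.FromMinutes(40).Ticks;

    // True when a joined point at `joinTime`, stretched to `extensionTicks`,
    // still covers the last tick of an impression that started at `impStart`.
    public static bool CoversExpiredPoint(long impStart, long joinTime, long extensionTicks)
    {
        long expiredTick = impStart + ImpressionTicks - 1; // assumed placement of the expired point
        return joinTime <= expiredTick && expiredTick < joinTime + extensionTicks;
    }

    public static void Main()
    {
        long impStart = 0;
        long earliestJoin = impStart;                      // feedback already present on arrival
        long latestJoin = impStart + ImpressionTicks - 1;  // feedback arrives on the last tick

        // Extending by the full impression lifetime covers both extremes.
        Console.WriteLine(CoversExpiredPoint(impStart, earliestJoin, ImpressionTicks)); // True
        Console.WriteLine(CoversExpiredPoint(impStart, latestJoin, ImpressionTicks));   // True

        // Anything shorter misses the expired point when the join happens on arrival.
        Console.WriteLine(CoversExpiredPoint(impStart, earliestJoin, ImpressionTicks - 1)); // False
    }
}
```

The worst case is the join that fires right when the impression arrives, because that joined point is the farthest from the end of the impression's lifetime.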

After making the above changes, we add the expired events to the output by unioning the joined and expired streams.

Now let’s look again at the result after making these changes.

[Diagram: output of the query after adding the expired stream]

“Wait a minute,” you say, “that looks almost exactly the same as it did before!” You are right – they are almost the same. The only difference is an additional event in the No feedback case which coincides with the end of the impression, the expired event.

This brings us to the end of Part 1. In Part 2, we will show how to identify impressions that never have joined feedback. We will then put all three cases together into a single query that answers our original question.

Questions in the meantime? Let us know in the comments.