This is part 2 of 2-post series that shows you how to use Trill, an open source .NET library designed to process one trillion events a day, for impression feedback.

In part 1, we walked through how to write Trill queries to find out: 1) which impressions successfully joined to the feedback stream and 2) which impressions never joined to the feedback stream (expired). Today, we’ll walk through a final step — how to find out impressions immediately upon arrival, whether joined with feedback or not.

3. A stream of impressions immediately upon arrival, whether joined or not

(Note: step 2 and 3 were covered in part 1, which you can find here.)

Compared to the previous case, this one is easy – an antijoin sounds perfect:

internal static IStreamable JoinFeedbackToImpressions(
    this IStreamable impressions,
    IStreamable feedback)
{
    // set the lifetime of the input streams
    impressions = impressions.SetDuration(TimeSpan.FromMinutes(40));
    feedback    = feedback   .SetDuration(TimeSpan.FromMinutes(30));

    // produce a stream of successful joins
    IStreamable joined = impressions
        .Join(
            feedback,
            imp  => imp.RGUID,
            fdbk => fdbk.RGUID,
            (imp, fdbk) => new ImpressionsWithFeedback(imp, fdbk))
        .ToPointStream()
        ;

    // produce a stream of "expired" point events that coincide with the end time
    // of the impression stream, and suppress those which had a successful join
    IStreamable expired = impressions
        .PointAtEnd()
        .LeftAntiJoin(
            joined.SetDuration(TimeSpan.FromMinutes(40)),
            l => l.RGUID,
            r => r.RGUID)
        .Select(imp => new ImpressionsWithFeedback(imp, null))
        ;
+
+   // produce a stream of failed joins
+   IStreamable unjoined = impressions
+       .LeftAntiJoin(
+           feedback,
+           imp  => imp.RGUID,
+           fdbk => fdbk.RGUID)
+       .Select(imp, => new ImpressionsWithFeedback(imp, null))
+       ;

-   // union together the two streams and return the result
-   return joined.Union(expired); 
+   // union together the three streams and return the result
+   return joined.Union(unjoined).Union(expired);
}

Great!  So now we’re, done, right?

Of course not.  (What fun would that be?)  Let’s again look at what we are now outputting.

Trill impressions diagram

There are three of problems with the above solution:

  1. When an impression arrives before its feedback (the After and Way after cases), we will output two events – the unsuccessful join followed by the successful join. The downstream statistics computation query must either count both events, which will result in double-counting, or discard both events, which will produce no result at all.
  2. If an impression never joins with feedback (the No feedback case), a similar situation occurs; two events are output, the unsuccessful join followed by the expired event. This causes the same issue in statistics and an additional, similar problem in the final output – we can’t tell which event to use since we don’t know whether the event is done receiving feedback or not.
  3. If feedback arrives less than 10 minutes after the impression (the Coincident, Before, and Way before cases), the antijoin will produce a second, “tail antijoin” event that we don’t want to include in stats and which is not complete. These are the events outlined with a red dashed line in the above chart.

We need some way to signal to our downstream operators whether a given event should contribute to statistics and whether it is done receiving feedback. Let’s consider each case separately.

First, let’s examine the case of contributing to statistics. Remember that the requirement for statistics is that an impression must be counted exactly once, and it must be counted when it arrives. Although this doesn’t sound too bad, we can express this requirement even more simply: an impression may only be counted if it’s sync time upon output is equal to its sync time in the input.

This is much easier – all that we need is the ability to get an event’s sync time.  Luckily, there is an operator which does exactly that – an overload of Select which accepts a delegate that performs a transformation on the payload and sync time of each event in a stream.

Now let’s consider the second problem, whether an impression should be considered done. An impression must be output as soon as it receives feedback, or once it expires if feedback never arrives.  This case is more straightforward than the previous one since each of the internal streams is either always done or always not done.  In this case, we can just pass true or false to the ImpressionsWithFeedback constructor to differentiate.

Finally, the third case (extra tail antijoin events) can be addressed by filtering out events from the unjoined stream whose final sync time does not match its original sync time, very similar to how ContributeToStats is computed.

After making these changes, let’s see how our query looks.

internal static IStreamable JoinFeedbackToImpressions(
    this IStreamable impressions,
    IStreamable feedback)
{
-   // set the lifetime of the input streams
-   impressions = impressions.SetDuration(TimeSpan.FromMinutes(40));
+   // set the lifetime of the input streams, record the input sync time of the
+   // events in the impressions stream
+   impressions = impressions
+       .SetDuration(TimeSpan.FromMinutes(40))
+       .Select((t, x) => { x.OriginalSyncTime = t; return x; })
+       ;
+
    feedback = feedback.SetDuration(TimeSpan.FromMinutes(30));

    // produce a stream of successful joins
    IStreamable joined = impressions
        .Join(
            feedback,
            imp  => imp.RGUID,
            fdbk => fdbk.RGUID,
-           (imp, fdbk) => new ImpressionsWithFeedback(imp, fdbk))
+           (imp, fdbk) => new ImpressionsWithFeedback(imp, fdbk, done: true))
        .ToPointStream()
        ;

    // produce a stream of "expired" point events that coincide with the end time
    // of the impression stream, and suppress those which had a successful join
    IStreamable expired = impressions
        .PointAtEnd()
        .LeftAntiJoin(
            joined.SetDuration(TimeSpan.FromMinutes(40)),
            l => l.RGUID,
            r => r.RGUID)
-       .Select(imp => new ImpressionsWithFeedback(imp, null))
+       .Select(imp => new ImpressionsWithFeedback(imp, null, done: true))
        ;

-   // produce a stream of failed joins
+   // produce a stream of failed joins, dropping the tail join if present
    IStreamable unjoined = impressions
        .LeftAntiJoin(
            feedback,
            imp  => imp.RGUID,
            fdbk => fdbk.RGUID)
-       .Select(imp, => new ImpressionsWithFeedback(imp, null))
+       .Select(imp, => new ImpressionsWithFeedback(imp, null, done: false))
+       .Where((t, x) => x.OriginalSyncTime == t)
        ;

-   // union together the three streams and return the result
-   return joined.Union(unjoined).Union(expired);
+   // union together the three streams, mark records which should contribute
+   // to stats, and return the result
+   return joined
+       .Union(unjoined)
+       .Union(expired)
+       .Select((t, x) =>
+           {
+               x.ContributeToStats = x.OriginalSyncTime == t;
+               return x;
+           })
+       ;
}

Astute readers will notice that filtering out the tail antijoins is not strictly necessary, since these events will neither contribute to statistics (their original sync time is not equal to their final sync time) nor be considered done (antijoin events are incomplete by definition).

The reason to remove them here is to reduce the total number of events in the stream. Although they have no effect semantically, they still contribute to I/O costs related to checkpointing and cross-node communication, as well as they use memory in any stateful operator that they flow through.

The final query

Phew! It was a long journey, but we are finally done!  Let’s admire the fruits of our labor.

internal static IStreamable JoinFeedbackToImpressions(
    this IStreamable impressions,
    IStreamable feedback)
{
    // set the lifetime of the input streams, record the input sync time of the
    // events in the impressions stream
    impressions = impressions
        .SetDuration(TimeSpan.FromMinutes(40))
        .Select((t, x) => { x.OriginalSyncTime = t; return x; })
        ;
 
    feedback = feedback.SetDuration(TimeSpan.FromMinutes(30));

    // produce a stream of successful joins
    IStreamable joined = impressions
        .Join(
            feedback,
            imp  => imp.RGUID,
            fdbk => fdbk.RGUID,
            (imp, fdbk) => new ImpressionsWithFeedback(imp, fdbk, done: true))
        .ToPointStream()
        ;

    // produce a stream of "expired" point events that coincide with the end time
    // of the impression stream, and suppress those which had a successful join
    IStreamable expired = impressions
        .PointAtEnd()
        .LeftAntiJoin(
            joined.SetDuration(TimeSpan.FromMinutes(40)),
            l => l.RGUID,
            r => r.RGUID)
        .Select(imp => new ImpressionsWithFeedback(imp, null, done: true))
        ;

    // produce a stream of failed joins, dropping the tail join if present
    IStreamable unjoined = impressions
        .LeftAntiJoin(
            feedback,
            imp  => imp.RGUID,
            fdbk => fdbk.RGUID)
        .Select(imp, => new ImpressionsWithFeedback(imp, null, done: false))
        .Where((t, x) => x.OriginalSyncTime == t)
        ;

    // union together the three streams, mark records which should contribute
    // to stats, and return the result
    return joined
        .Union(unjoined)
        .Union(expired)
        .Select((t, x) =>
            {
                x.ContributeToStats = x.OriginalSyncTime == t;
                return x;
            })
        ;
}

And here is the same query visualized using a directed-acyclic graph (DAG) of query operators:

diagram of directed-acyclic graph (DAG) of query operators

Finally, let’s go back to our favorite chart one last time to make sure we’re producing the right thing. Whether an impression received feedback and the state of the two Boolean conditions (contribute to statistics, is done) are represented by the Joined, Stats, and Done monikers, respectively, preceded by ‘+’ and written in green for true and preceded by ‘-‘ and written in red for false.

Trill impression diagram

Yep, that looks perfect!

Wrapping up

We have finally produced our user-defined join operator for associating feedback with impressions, keeping in mind the requirements of our downstream operators.  Let’s reexamine the original stated requirements and briefly summarize how we addressed each one.

  • Output impressions as soon as they join successfully (but not before): We use an inner join to output impressions as soon as feedback arrives. An explicit signal in the event indicates that the event is done and ready to be output.
  • Don’t output an impression more than once: Events are included in the output only if the join succeeds or if the event ends (expires) before joining, which are mutually exclusive cases.
  • Don’t drop any impressions: If feedback is never received for an impression, we output an explicit expired event at the end of the impression’s lifetime. An explicit signal in the event indicates that the event is done and ready to be output.
  • Output impressions immediately as they arrive so they can contribute to statistics: We include an antijoin as well as an inner join – the antijoin produces events for the case where feedback arrives after the impression; the inner join produces events for the case where feedback arrives before or at the same time as the impression. These events are marked explicitly as contributors to statistics. They will additionally be marked as done in the successful join case.
  • Don’t count an impression more than once in statistics: We add an explicit signal in all the events we produce based on a comparison of the event’s input and output sync times. Only events whose input and output sync times are the same may contribute to statistics.
  • Feedback can arrive up to 10 minutes after the associated impression: The lifetime of the impression stream is 10 minutes longer than the lifetime of the feedback stream.
  • Delivery of the input logs can be delayed by up to 30 minutes on either side: The base lifetime of both the impression and feedback streams is 30 minutes.

Although the problem seemed complicated at first, breaking it down into sub-problems and solving each of them individually made the entire process much more manageable. This is a skill that can take some time to master, but once you have a good understanding of what temporal operators are available and how they behave, the streamable APIs allow for near limitless expressiveness and power.

Next time

In our next Trill tutorial, we’ll cover a different impression feedback mechanism that the Bing Ads processing pipeline handles: visibility. Visibility feedback is distinct from impression feedback in that it is based on user activity and does not arrive all at once.  Until then!

Feedback or questions? Let me know in the comments below.