GOPHERSPACE.DE - P H O X Y
gophering on hngopher.com
HN Gopher Feed (2017-06-29) - page 1 of 10
 
___________________________________________________________________
Delivering Billions of Messages Exactly Once
287 points by fouadmatin
https://segment.com/blog/exactly-once-delivery/
___________________________________________________________________
 
kortox - 5 hours ago
With deduplication state on the worker nodes, how does scaling up,
or provisioning new machines, or moving a partition between
machines work?
 
throwaway67 - 6 hours ago
... or they could have used BigQuery with a primary key on message
ID.
 
  redmalang - 2 hours ago
  BQ doesn't have primary keys. Perhaps you are thinking of the id
  that can be supplied with the streaming insert? This has very
  loose guarantees on what is de-duplicated (~5m iirc)
 
    vgt - 1 hours ago
    yea I think within the context of BigQuery the most sensible
    thing would be to do an aggregate per the column that would be
    considered a primary key. For example [0]. That said, Streaming
    API de-dupe window is very nice in practice.I mentioned
    elsewhere on Google Cloud the most elegant way of doing this is
    with Google Cloud Dataflow [1](work at
    G)[0]https://stackoverflow.com/questions/38446499/bigquery-
    dedupl...[1]https://cloud.google.com/blog/big-data/2017/06/how-
    qubit-ded...
 
PinguTS - 5 hours ago
That reminds me of the safety-related protocols we use since years
in embedded electronics like rail-road signaling, medical, and
other areas.
 
stratosgear - 6 hours ago
Site seems to be down. Any ideas how big these HN hugs of death
usually are? How big of a traffic spike brings these servers down?
 
  crgwbr - 5 hours ago
  I got front paged once a few years ago, it was about 15,000 page
  views in the course of a few hours. It's probably grown since
  then, but sure by what factor.
 
  Exuma - 6 hours ago
  It's not down
 
    stratosgear - 6 hours ago
    Hmm weird. I get: This site can?t be reachedsegment.com refused
    to connect.Try:Checking the
    connectionERR_CONNECTION_REFUSEDMaybe something local to me
    only?
 
      psadauskas - 6 hours ago
      Its probably an ad- or tracker-blocker. I had to disable mine
      to be able to load segment.com, since they're pretty much a
      tracking company.
 
      fouadmatin - 6 hours ago
      Might be an adblock issue? (some blockers treat direct access
      to a website and loading an anlytics snippet as the same)
 
      asadlionpk - 5 hours ago
      You are probably blocking this site in your hosts file. Since
      it's a tracking tool.
 
      disconnected - 5 hours ago
      Check your ad blocker/hosts file.In here, uMatrix just blocks
      the site with the message:> uMatrix has prevented the
      following page from loading:> https://segment.com/blog
      /exactly-once-delivery/I checked, and one of my uMatrix hosts
      files includes 'www.segment.com'.
 
        stratosgear - 5 hours ago
        Yep. Disabling AdAway seems to do the trick. Thanks for the
        heads up
 
    [deleted]
 
  fouadmatin - 6 hours ago
  hm seems to be up for me? you might've caught our autoscaler mid-
  scaling!
 
ratherbefuddled - 4 hours ago
"Almost Exactly Once" doesn't have quite the same ring to it, but
it is actually accurate.  We've already discovered better trade-
offs haven't we?
 
spullara - 5 hours ago
This isn't the solution I would architect. It is much easier to de-
duplicate when processing your analytics workload later and you
don't need to do so much work.
 
falcolas - 5 hours ago
So, a combination of a best effort "at least once" messaging with
deduplication near the receiving edge. Fairly standard,
honestly.There is still a potential for problems in the message
delivery to the endpoints (malformed messages, Kafka errors,
messages not being consumed fast enough and lost), or duplication
at that level (restart a listener on the Kafka stream with the
wrong message ID) as well.This is based on my own pains with
Kinesis and Lambda (which, I know, isn't Kafka).In my experience,
better to just allow raw "at least once" messaging and perform
idempotant actions based off the messages. It's not always possible
(and harder when it is possible), but its tradeoffs mean you're
less likely to lose messages.
 
  caust1c - 5 hours ago
  This is generally better, but we're delivering these messages to
  integrations which don't necessarily take idempotent actions.
 
ggcampinho - 6 hours ago
Isn't the new feature of Kafka about
this?https://issues.apache.org/jira/browse/KAFKA-4815
 
  caust1c - 6 hours ago
  It would help, but messages are sent to the API first.  We aren't
  sending messages directly to Kafka from the Internet, of
  course.So we can get duplicate API submissions regardless of
  whether or not we enabled transactional productions into kafka
  from a producer.
 
StreamBright - 6 hours ago
"The single requirement of all data pipelines is that they cannot
lose data."Unless the business value of data is derived after
applying some summary statistics, than even sampling the data
works, and you can lose events in an event stream, while not
changing the insight gained. Originally Kafka was designed to be a
high throughput data bus for analytical pipeline where losing
messages was ok. More recently they are experimenting with exactly
once delivery.
 
skMed - 35 minutes ago
Having built something similar with RabbitMQ in a high-volume
industry, there are a lot of benefits people in this thread are
glossing over while they focus on semantics. Yes, this is not
"exactly once" -- there really is no such thing in a distributed
system. The best you can hope for is that your edge consumers are
idempotent.There is a lot of value derived from de-duping near
ingress of a heavy stream such as this. You're saving downstream
consumers time (money) and potential headaches. You may be in an
industry where duplicates can be handled by a legacy system, but it
takes 5-10 minutes of manual checks and corrections by support
staff. That was my exact use case and I can't count the number of
times we were thankful our de-duping handled "most" cases.
 
vgt - 3 hours ago
Qubit's strategy to do this via streaming, leveraging Google Cloud
Dataflow:https://cloud.google.com/blog/big-data/2017/06/how-qubit-
ded...
 
sethammons - 4 hours ago
"Exactly Once" Over a window of time that changes depending on the
amount of ingested events.Basically, they read from a kafka stream
and have a deduplication layer in rocks db that produces to another
kafka stream. They process about 2.2 billion events through it per
day.While this will reduce duplicates and get closer to Exactly
Once (helping reduce the two generals problem on incoming requests
and potentially work inside their data center), they still have to
face the same problem again when they push data out to their
partners. Some packet loss, and they will be sending out duplicate
to the partner.Not to downplay what they have done as we are doing
a similar thing near our exit nodes to do our best to prevent
duplicate events making it out of our system.
 
wonderwonder - 6 hours ago
Would something like AWS SQS not scale for something like this? We
currently push about 25k daily transactions over SQS, obviously no
where near the scale of this, just wondering about what limitations
we will bump into potentially.
 
  timdorr - 5 hours ago
  The limitations are most likely on price. For the 200B messages
  they've already processed in the last 3 months, that would be
  $100,000 total on just the SQS FIFO queue, or $33,333 per month.
  And that's not counting data transfer.
 
    hilbertseries - 5 hours ago
    As long as everything is in ec2 data transfer will be free.
    You're cost calculations are also off base. You'll need to
    send, receive and delete every message that you process via
    SQS. These can all be done in batches of 10. So it's 200B *
    3/10 * .50 / million, which comes out to 60k over 3 months.
    Still not cheap, Kinesis is probably the better option in this
    case if you want an AWS managed service.
 
    wonderwonder - 5 hours ago
    That'd definitely be a pretty effective limitation.
 
  psadauskas - 5 hours ago
  SQS is not "exactly once", so might not meet their requirements.
 
    idunno246 - 5 hours ago
    sqs has fifo queues which claim to be exactly once
 
      lostcolony - 5 hours ago
      SQS queues deduplicate over a 5 minute window. This is
      claiming a much larger window.Either way your listener(s)
      still has to have its own deduplication. Ensuring a message
      ends up on the queue only once, and ensuring it's processed
      exactly once, are two different problems that require
      separate handling (and, the former is what most out of the
      box systems claim to solve, while the latter is more
      important, and, frankly, completely negates the need of the
      former).
 
    twodave - 5 hours ago
    Actually, it can be. You just pay more.
 
  eropple - 5 hours ago
  (edit: incorrect, my bad, see thread)
 
    twodave - 5 hours ago
    This is wrong. There is a type of SQS queue that indeed does
    exactly-once. It costs more, and not the default option, but it
    is there.
 
      eropple - 5 hours ago
      Oh, right, they added their FIFO queues. My bad; thanks for
      the correction. Worth noting, though, that AWS's own services
      can't talk to FIFO queues. If you want to wire up SNS or
      Lambda dead-letter queues to a FIFO queue, you are out of
      luck.
 
newobj - 5 hours ago
I don't want to ever see the phrase "Exactly Once" without several
asterisks behind it. It might be exactly once from an "overall"
point of view, but the client effectively needs infinitely durable
infinite memory to perform the "distributed transaction" of acting
on the message and responding to the server.Imagine:- Server
delivers message M- Client process event E entailed by message M-
Client tries to ack (A) message on server, but "packet loss"- To
make matters worse, let's say the client also immediately dies
after thisHow do you handle this situation? The client must
transactionally/simultaneously commit both E and A/intent-to-A.
Since the server never received an acknowledgment of M, it will
either redeliver the message, in which case some record of E must
be kept to deduplicate on, or it will wait for client to resend A,
or some mixture of both. Note: if you say "just make E idempotent",
then you don't need exactly-once delivery in the first place...I
suppose you could go back to some kind of lock-step processing of
messages to avoid needing to record all (E,A) that are in flight,
but that would obviously kill throughput of the message
queue.Exactly Once can only ever be At Least Once with some out-of-
the-box idempotency that may not be as cheap as the natural
idempotency of your system.EDIT: Recommended reading: "Life Beyond
Distributed Transactions", Pat Helland -
http://queue.acm.org/detail.cfm?id=3025012
 
  rusanu - 2 hours ago
  Having spent 7 years of my life working with Pat Helland in
  implementing Exactly Once In Order messaging with SQL Server
  Service Broker[0] I can assure you that practical EOIO messaging
  is possible, exists, and works as advertised. Delivering data
  EOIO is not rocket science, TCP has been doing it for decades.
  Extending the TCP paradigms (basically retries and acks) to
  messaging is not hard if you buy into transacted persisted
  storage (= a database) for keeping undelivered messages
  (transmission queue) and storing received messages before
  application consumption (destination queue). Just ack after you
  commit locally.We've been doing this in 2005 at +10k msgs/sec (1k
  payload), durable, transacted, fully encrypted, with no two phase
  commit, supporting long disconnects (I know for documented cases
  conversations that resumed and continued after +40 days of
  partner network disconnect).Running into resource limits
  (basically out of disk space) is something the database community
  knows how to monitor, detect and prevent for decades now.I really
  don't get why so many articles, blogs and comments claim this is
  not working or impossible or even hard. My team shipped this +12
  years ago, is used by major deployments, technology is proven and
  little changed in the original protocol.[0]
  https://docs.microsoft.com/en-us/sql/database-engine/configu...
 
    YZF - 1 hours ago
    TCP does deliver data more than once though.  Sure within a TCP
    session you are guaranteed not to get the same byte twice out
    of your socket, bytes are delivered exactly once, in order.
    Now if your application that uses TCP pulls data out of the
    socket and then dies the data will need to be delivered again
    and the TCP protocol is unable to help us there, it's
    application level logic at that point.So everyone is talking
    about the same thing here, data can get delivered twice, and
    the application must handle it in an idempotent way.  For TCP
    this happens to be looking at some counter and throwing away
    anything that's already been seen.  With a single client/server
    connection maintaining sequence numbers like TCP does solve the
    problem but it's harder to use this same technique in multi-
    client, multi-server, "stateless" requests that are common in
    the web world.EDIT: To clarify, what I mean by TCP "does
    deliver data more than once", is that one side of the
    connection can send the same data twice.  It's true that it's
    then discarded by the other end but this is what people are
    talking about when they talk about the theoretical
    impossibility of never sending anything twice.  The rest is
    basically the idem-potency thing of ensuring that data received
    twice somehow doesn't cause anything abnormal.
 
      bmelton - 1 hours ago
      Right.  If your definition of "delivered" is "showed a
      message to the user", then exactly once is a fairly trivial
      goal to attain.  If it's defined "data over the wire", then
      it's nearly impossible to avoid some form of ACK/RETRY
      mechanism on real networks, which means that messages will
      need to sometimes have to be 'delivered' many times to ensure
      a single display to the user.
 
      rusanu - 1 hours ago
      Not when your 'socket' is a persisted, durable, transacted
      medium (ie. a database). Sure, applications can 'pull data
      out of the socket and then die', but this is a common
      scenarios on databases which is handled with transaction and
      post-crash recovery. The application comes back after the
      crash and find the same state as before the crash (the
      'socket' still has the data ready to pull off), it pull
      again, process, and then commit. This is not duplicate
      delivery, since we're talking about an aborted and rolled
      back attempt, followed later by a successful processing.
      Again, databases and database apps have been dealing with
      this kind of problems for decades and know how handle
      them.I've been living in this problem space for many years
      now and seen the wheel reinvented many times. Whenever the
      plumbing does not guarantees EOIO but the business demands
      it, it gets pushed into the app layer where TCP (retries and
      acks) is reimplemented, to various success levels.
 
        YZF - 1 hours ago
        > The application comes back after the crash and find the
        same state as before the crash (the 'socket' still has the
        data ready to pull off), it pull again, process, and then
        commit.Isn't this the same app layer stuff that has to get
        reimplemented?  I can see how this is often pushed back to
        a human (oops, my money transfer didn't go through, I'll
        have to redo it) but it's still something that has to be
        dealt with somewhere.
 
          rusanu - 28 minutes ago
          > it's still something that has to be dealt with
          somewhereDatabase programmers have the means to deal with
          it off-the-shelf: BEGIN TRANSACTION ... COMMIT. When your
          queues are in the database, this becomes trivial. Even
          without the system I'm talking about (Service Broker)
          that has the queues stored in the database, most regular
          messaging systems do support enrolling into a distributed
          transaction and achieve an atomic dequeue/process
          sequence, is just that many apps/deployments don't bother
          to do it because the ops overhead (XA coordinator),
          reduced throughput and/or simply not understanding the
          consequences.Point is that durable, persisted, transacted
          'sockets' are behaving very differently from a TCP
          socket. Is a whole lot harder to simply lose a message in
          the app layer when interacting with a database.
 
    deepsun - 2 hours ago
    Are you talking about distributed environment, where network
    partitions can occur? If yes, then there's Two Generals Problem
    and "FLP result", that just prove it impossible. So I guess
    you're talking about non-distributed environment.In other
    words, to reliably agree on a system state (whether message id
    was delivered) you need the system to be Consistent. And per
    CAP theorem, it cannot be Available in presence of
    Partitions.So other people you're referring to probably talk
    about distributed systems.
 
      aetherson - 1 hours ago
      Exactly once delivery is theoretically impossible.Approaching
      exactly once delivery asymptotically is possible.  Your
      parent poster's point is that this is one where you can get
      so close to exactly once in order that in practice you never
      violate for years and years.
 
        rusanu - 45 minutes ago
        My point is that I've seen people making decisions to go
        with 'best effort delivery' and live with the (costly)
        consequences because they read here and there that EOIO is
        impossible, so why bother trying.
 
      rusanu - 2 hours ago
      Yes, I'm talking about distributed systems and I am aware of
      the CAP theorem. Hence my choice of the word 'practical'.As I
      said, users had cases when the plumbing (messaging system)
      recovered and delivered messages after +40 days of network
      partitioning. Correctly written apps completed the business
      process associated with those messages as normal, no special
      case. Humans can identify and fix outages and databases can
      easily outlast network outages (everything is durable,
      transacted, with HA/DR). And many business processes make
      perfect sense to resume/continue after the outage, even if it
      lasted for days.
 
        alexbeloi - 18 minutes ago
        I'm not really versed in this topic, but it seems like
        using a database for a socket makes the system entirely
        centralized around that database. Is there something I'm
        missing?
 
      rusanu - 1 hours ago
      Pat had some opinions about CAP and SOA and distributed
      systems, see [0]. I also remember a talk given by Pat and
      Eric Brewer together, that went deeper into the whole CAP
      ideas vis-a-vis the model Pat had been advocating (see
      Fiefdoms and Emissaries [1]), but I can't remember when it
      was or find a link for it.[0] https://blogs.msdn.microsoft.co
      m/pathelland/2007/05/20/soa-a...[1] http://download.microsoft
      .com/documents/uk/msdn/architecture...
 
  redy - 4 hours ago
  Everybody knows "exactly once" means deduplication. This is not
  exactly a new problem.That said it's still a difficult problem
  and I actually wish people would stop trying to roll their own
  schemes. For example, this scheme relies on examining a Kafka
  outbound topic to resolve in-doubt outbound messages. But what
  happens if the outbound message + commit is still "in flight"
  when the system recovers so the system retransmits the in-doubt
  outbound message and so rather than deduping it now is generating
  dupes? Yes, the chances of this are minimal which means it will
  happen.
 
    Spearchucker - 1 hours ago
    The protocol is in fact quite simple:1. Sender sends message.2.
    If no acknowledgement from recipient is returned to sender,
    resend message until an acknowledgement is received from
    recipient.3. If recipient receives message, check store to see
    if it's been received before.4. If it's not in the store, store
    it, acknowledge receipt to sender, and process it.5. If it's
    already in the recipient's store, acknowledge receipt to
    sender, and discard message.
 
    lvh - 4 hours ago
    Everyone might know that, but it's certainly the case that a
    lot of systems have _claimed_ it where they actually meant
    "disastrous characteristics under load and/or (partial)
    failure".
 
      [deleted]
 
  Cieplak - 5 hours ago
  Basically the Two General's Problem,
  eh?https://en.wikipedia.org/wiki/Two_Generals%27_Problem
 
    newobj - 5 hours ago
    Exactly.
 
      jancsika - 2 hours ago
      Question: what is the relationship (if any) between TCP and
      the Two Generals Problem?
 
        y4mi - 2 hours ago
        Maybe read the opening paragraph of the linked wiki
        article? It's within the first few sentences.
 
          [deleted]
 
        sillysaurus3 - 2 hours ago
        The Two Generals Problem deals with communicating over an
        unreliable channel, as does TCP.TCP solves the problem by
        retransmission, similar to approaches illustrated in https:
        //en.wikipedia.org/wiki/Two_Generals%27_Problem#Engine...No
        te that it's impossible to solve it completely, and TCP is
        no different. If you unplug your router, there is no hope
        of getting a message through. But it's highly unlikely that
        there is no path between you and your destination (a
        "partition") so TCP retransmits until it finds one. This
        would be similar to generals sending messengers until one
        gets through and a confirmation messenger comes back.
 
  urbit - 4 hours ago
  Exactly-once messaging is not a hard problem so long as you
  change the problem a little.  (Plug warning: this is the way
  Urbit does EOM, or EOM* if you prefer.)TLDR, you don't need
  infinitely durable infinite memory.  You just need (a) a single-
  level store in which every event is a transaction, (b) a message
  protocol with true end-to-end acks, and (c) a permanent session
  between every pair of nodes.  We don't have single-level storage
  hardware (although XPoint comes close) but it's easy to simulate
  semantically.Think about the problem intuitively.  I want to send
  you a stream of messages, each of which you act on exactly once.I
  do it like this: I put a sequence number in each message.  I keep
  sending you message 1 until you send me an acknowledgment that
  you heard message 1.  I can also send messages 2, 3, or whatever
  (lockstep isn't needed), but you process messages in order and
  ignore messages you've already heard.  Never ack an ack, always
  ack a dup.What does implementing this design require?  It
  requires a persistent sequence number, on the sending endpoint,
  for every receiving endpoint in the world. (Edit: and of course
  another SN on the receiving end.) Of course, for every endpoint
  you haven't sent to yet, the number is 0 and doesn't need to be
  stored.  This is not a terribly onerous amount of storage.A
  sequence number is the piece of state generally known as a
  "session" in networking theory parlance.  Of course a TCP
  connection is a session.  We're simply saying that every two
  endpoints have a sort of implicit, persistent
  connection.Moreover, every message must be a transaction.
  Acknowledging the message acknowledges that the application, as
  well as the sequence number, has been fully and persistently
  updated with the knowledge that the message contains.  One way to
  think of this is that we're adding the latency of a transaction
  to persistent storage to our packet latency.  SSDs are good
  here.Furthermore, since in real life semantic messages need to be
  delivered in packet-sized fragments, a fragmentation model with
  end-to-end "piggybacked" acks is needed.  There can't be a
  separate message-level ack (the "stack of acks" problem is the
  curse of the Internet stack) -- acknowledging all the fragment
  packets acknowledges the message.All this and more is explained
  here (plug alert):http://media.urbit.org/whitepaper.pdf
 
    jerf - 4 hours ago
    You've embedded the idempotency into the protocol, which is
    nice, but doesn't get around the problems of not being able to
    do EOM. You also have the case where you sent a message and
    lost network connectivity before any ack could come back,
    resulting in you not knowing if your message arrived 0 or 1
    times, which isn't surprising since that case is a constant on
    a network with anything less than 100% reliability.That's not
    EOM, that's just the sensible workarounds to use in light of
    the fact EOM doesn't exist. Obviously a lot of protocols and
    applications use such things since the inability to have EOM in
    the real world has not rendered our entire network edifice
    impossible or useless in real life.
 
      urbit - 3 hours ago
      If you define EOM as magic, or as a solution to the Two
      Generals problem, EOM is certainly impossible.If EOM means
      "the programmer doesn't have to think about idempotency," EOM
      is what I want.  Happy to call this "EOM asterisk" if you
      and/or the OP like.At any point in the conversation, you
      don't know whether your interlocutor has yet received the
      last message you sent.   This is because you are talking over
      a network, rather than over a magic bus.However, you know
      that before the endpoint processes each message, it has
      processed each previous message once and exactly once.  I
      think this is what the programmer wants -- asterisk or
      no.Network connectivity failure is best modeled as the limit
      case of network latency.  Of course, when you send a message
      over a network in the real world, you can't know whether it
      has been received or not until you get an
      acknowledgment.(Edit: asterisks.)
 
      urbit - 3 hours ago
      Also, I would say that the worst thing about the inability to
      do EOM on the Internet stack is that it puts the
      responsibility for idempotence on the application programmer,
      then tests that corner case 1 in 100 times, maybe 1 in
      1000.It is not impossible to handle a system that produces
      rare corner cases.  It's just expensive and a pain in the
      butt.
 
[deleted]
 
ju-st - 6 hours ago
52 requests, 5.4 MB and 8.63 seconds to load a simple blog post.
With a bonus XHR request every 5 seconds.
 
  numbsafari - 4 hours ago
  ... and this is why friends don't let friends use Segment.
 
  heipei - 4 hours ago
  That's not everything: This website contacted 56 IPs in 6
  countries across 47 domains to perform 100 HTTP transactions. In
  total, 3 MB of data was transfered, which is 5 MB uncompressed.
  It took 4.103 seconds to load this page. 37 cookies were set, and
  8 messages to the console were
  logged.https://urlscan.io/result/b2e27a08-1298-491a-863f-
  8cadc45e73...
 
  joshuagvk - 4 hours ago
  And each one made only once!
 
majidazimi - 3 hours ago
What is so exciting about this? There is still possibility of
duplicates. You still have to put the engineering effort to deal
with duplicates end-to-end. If the code is there to deal with
duplicates end-to-end, then does it really matter to have 5
duplicates or 35? Or may be they just did it to add some useful
cool-tech in to CV?
 
iampims - 5 hours ago
If the OP doesn't mind expanding a little on this bit, I'd be
grateful.> If the dedupe worker crashes for any reason or
encounters an error from Kafka, when it re-starts it will first
consult the ?source of truth? for whether an event was published:
the output topic.Does this mean that "on worker crash" the worker
replays the entire output topic and compare it to the rocksdb
dataset?Also, how do you handle scaling up or down the number of
workers/partitions?
 
  ryanworl - 5 hours ago
  I'm not the OP, but changing the number of Kafka partitions isn't
  a super graceful operation. You would be wise to add as many as
  you could reasonably need assuming one consumer thread per
  partition. But not too many because each one is at least two
  files on disk!
 
robotresearcher - 5 hours ago
> [I]t?s pretty much impossible to have messages only ever be
delivered once.IIRC, it's provably impossible in a distributed
system where processes might fail, i.e. all real systems.
 
incan1275 - 5 hours ago
To be fair, they are upfront in the beginning about not being able
to adhere to an exactly-once model."In the past three months we?ve
built an entirely new de-duplication system to get as close as
possible to exactly-once delivery"What's annoying is that they do
not get precise and formal about what they want out of their new
model. Also, their numbers only speak to performance, not
correctness.On the plus side, I think it's awesome to see bloom
filters successfully used in production. That sort of thing is easy
to implement, but not easy to get right for every use case.
 
qsymmachus - 5 hours ago
It's funny, at my company we implemented deduplication almost
exactly the same way for our push notification sender.The scale is
smaller (about 10k rpm), but the basic idea is the same (store a
message ID in a key-value store after each successful send).I like
the idea of invalidating records by overall size, we hadn't thought
of that. We just use a fixed 24-hour TTL.
 
mamon - 5 hours ago
"Exactly once" model of message is theoretically impossible to do
in distributed environment with nonzero possibility of failure. If
you haven't received acknowledgement from the other side of
communication in the specified amount of time you can only do one
of two things:1) do nothing, risking message loss2) retransmit,
risking duplicationBut of course that's only from messaging system
point of view. Deduplication at receiver end can help reduce
problem, but itself can fail (there is no foolproof way of
implementing that pseudocode's "has_seen(message.id)" method)
 
  LgWoodenBadger - 5 hours ago
  I agree.  I wish more messaging platforms would recognize this
  and stop trying to paper-over the failure mode (Kafka just came
  out with "Exactly Once," which I think is a variant of the 2
  -Phase-Commit protocol, which still does not solve the
  problem).My go-to for explaining to people is the Two Generals
  Problem https://en.wikipedia.org/wiki/Two_Generals%27_Problem
 
    LgWoodenBadger - 2 hours ago
    After watching the video from the Kafka Summit talk
    https://www.confluent.io/kafka-summit-nyc17/introducing-
    exac...it's an essentially-useless guarantee for any sort of
    Kafka consumer that interacts with an external system.Exactly-
    Once is not something that can be provided-for or delegated-to
    an arbitrary, non-integrated system.  For an "integrated"
    system, it requires idempotent behavior, in which case it's
    really At-Least-Once, so...
 
  iso-8859-1 - 3 hours ago
  > nonzeroIf you send the message with a hash of the previous
  state on the server, (like with proof-of-work in Bitcoin), since
  it is so unlikely that it will hash will be the same with and
  without the message appended, it doesn't really matter if it is
  strictly nonzero, if it is just small enough.
 
  ecesena - 5 hours ago
  I think "exactly once" doesn't imply "non-reprocessing" (sorry
  for the double negative).Meaning, you want "exactly once" and you
  don't want duplicates, yes. But you allow for reprocessing,
  provided that you have a way for deduplicating.You want a
  guarantee that if the producer (at the top of your data
  processing pipeline) sends a message, then this message
  eventually corresponds to exactly 1 record in your final
  storage(s).One easy-to-understand-yet-simplistic example is: send
  a message to kafka, use topic+partition+offset as primary key,
  store in a RDBMS. This is widely accepted as "exactly once", but
  clearly you may have multiple attempts to save the message into
  the db, which will fail due to the primary key integrity
  constrain.
 
    StavrosK - 5 hours ago
    So basically what's called "at least once with deduplicating".
    The parent comment addresses that.
 
  kmicklas - 5 hours ago
  > there is no foolproof way of implementing that pseudocode's
  "has_seen(message.id)" methodWait why? Just because you'd have to
  store the list of seen messages theoretically indefinitely?
 
    convolvatron - 5 hours ago
    sure, if you assume that everything can fail, then it doesnt
    help to store the list of messages you've seenbut if you can
    persist a monotonic sequence number, thats gets you pretty far.
    we use tcp all the time even though its has no magic answer to
    distributed consensus (and uses a super-weak checksum). 2pc
    doesnt guarantee progress and/or consistency either and its
    pretty effective.
 
    sethev - 5 hours ago
    There's also a race condition in there when you receive the
    duplicate before publish_and_commit is done doing its thing -
    assuming they're not actually serializing all messages through
    a single thread like the pseudocode implies.What they've done
    is shift the point of failure from something less reliable
    (client's network) to something more reliable (their rocksdb
    approach) - reducing duplicates but not guaranteeing exactly
    once processing.
 
      dastbe - 3 hours ago
      its not so much that they are serializing all messages
      through a single thread, but that they are consistently
      routing messages (duplicates and all) into separate shards
      that are processed by a single thread.
 
  malandrew - 5 hours ago
  An overview of FLP Impossibilityhttp://the-paper-trail.org/blog/a
  -brief-tour-of-flp-impossib...
 
siliconc0w - 1 hours ago
Was thinking a 'reverse bloom filter' could be cool to possibly
avoid the RocksDB for situations like this- turns out it already
exists: https://github.com/jmhodges/opposite_of_a_bloom_filterI
love it when that happens.
 
[deleted]
 
redmalang - 2 hours ago
Another possible approach: https://cloud.google.com/blog/big-
data/2017/06/how-qubit-ded...
 
bmsatierf - 5 hours ago
In terms of connectivity, we deal with a similar problem here at
CloudWalk to process payment transactions from POS terminals, where
most of them rely on GPRS connections.Our network issues are nearly
6 times higher (~3.5%) due to GPRS, and we solved the duplication
problem with an approach involving both client and server
side.Clients would always ensure that all the information sent by
the server was successfully received. If something goes wrong,
instead of retrying (sending the payment again), the client sends
just the transaction UUID to the server, and the server might
either respond with: A. the corresponding response for the
transaction or B. not found.In the scenario A, the POS terminal
managed to properly send all the information to the server but
failed to receive the response.In the scenario B, the POS terminal
didn't even manage to properly send the information to the server,
so the POS can safely retry.
 
travisjeffery - 5 hours ago
Kafka 0.11 (recently released) has exactly once semantics and
transactional messages built-in.- Talk from Kafka Summit:
https://www.confluent.io/kafka-summit-nyc17/resource/#exactl...-
Proposal:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+E...
 
linkmotif - 5 hours ago
It's worth noting that the next major Kafka release (0.11, out
soon) will include exactly once semantics! With basically no
configuration and no code changes for the user. Perhaps even more
noteworthy is this feature is built on top of a new transactions
feature [0]. With this release, you'll be able to atomically write
to multiple topics.[0]
https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+E...
 
mooneater - 5 hours ago
Awesome story. What I would like to hear more about, is the people
side. The teams and personalities involved with coming up with this
new system and the transition.
 
squiguy7 - 5 hours ago
I wonder how they partition by "messageID" they use to ensure that
the de-duplication happens on the same worker. I would imagine that
this affects their ability to add more brokers in the
future.Perhaps they expect a 1:1 mapping of RocksDB, partition, and
de-duplication worker.
 
  squeaky-clean - 4 hours ago
  I'm also curious about this. I'm guessing it's some function on
  the id that maps it to a partition. What happens when you add
  more consumers? Is there a way to know which partitions include
  messageIds that would be put into another partition's ownership
  by the change and then move anything from one RocksDB instance to
  the other?
 
  bytecodes - 1 hours ago
  Kafka does this as part of its design. A topic has a declared
  number of partitions (which can't really be changed on the fly,
  you choose a theoretical high number and hope it's enough), and
  an agreed upon hash algorithm chooses between those partitions
  (probably in Java, so hashCode is readily available for
  primitives as well as objects). Each partition is really like its
  own topic, so you lose in-order messaging for anything not
  included in your partition key.
 
gsmethells - 5 hours ago
Why do I get the feeling this is repeating TCP features at the
Message level? There must a protocol that can hide this exactly
once need away. TCP doesn't create downloads, generally, that are
bad and fail their checksum test, hence packets that make up the
file are not duplicated.
 
  jkarneges - 4 hours ago
  Yes there is some duplication of TCP capability here.The problem
  with relying on TCP for reliability is that its state is in
  memory, associated with a particular peer IP address, and
  acknowledgements passed back to the sender only indicate that the
  receiver has the data in local memory, not that the data has been
  processed.A file download over TCP can fail, for example due to a
  network problem. Ensuring reliable delivery requires additional
  measures outside of TCP, such as retrying the download using a
  new connection.In practice, this means that TCP is primarily
  useful for providing flow control and offering a streaming
  interface (no worry about packet sizes). Less so as a complete
  solution for transmission reliability.
 
  jmaygarden - 5 hours ago
  How would you use TCP sockets to de-duplicate Kafka streams with
  a many-to-many communication pattern? Surely there is a valid
  scalability reason for why AWS IoT only provides "at least once"
  guarantees in their MQTT broker even when TCP is the underlying
  transport [1].[1]
  http://docs.aws.amazon.com/iot/latest/developerguide/protoco...
 
jkestelyn - 1 hours ago
Relevant to this topic: Description of exactly-once implementation
in Google Cloud Dataflow + what "exactly once" means in context of
streaming:https://cloud.google.com/blog/big-data/2017/05/after-
lambda-...(Google Cloud emp speaking)