HN Gopher Feed (2017-06-29)
Delivering Billions of Messages Exactly Once
287 points by fouadmatin
https://segment.com/blog/exactly-once-delivery/
kortox - 5 hours ago
With deduplication state on the worker nodes, how does scaling up,
or provisioning new machines, or moving a partition between
machines work?
throwaway67 - 6 hours ago
... or they could have used BigQuery with a primary key on message
ID.
redmalang - 2 hours ago
BQ doesn't have primary keys. Perhaps you are thinking of the id
that can be supplied with the streaming insert? That has very
loose guarantees on what is de-duplicated (~5 min, IIRC).
vgt - 1 hours ago
Yeah, I think within the context of BigQuery the most sensible
thing would be to aggregate by the column that would be
considered a primary key. For example [0]. That said, the Streaming
API de-dupe window is very nice in practice. As I mentioned
elsewhere, on Google Cloud the most elegant way of doing this is
with Google Cloud Dataflow [1]. (work at G)
[0] https://stackoverflow.com/questions/38446499/bigquery-dedupl...
[1] https://cloud.google.com/blog/big-data/2017/06/how-qubit-ded...
PinguTS - 5 hours ago
That reminds me of the safety-related protocols we have used for
years in embedded electronics, like railroad signaling, medical
devices, and other areas.
stratosgear - 6 hours ago
Site seems to be down. Any ideas how big these HN hugs of death
usually are? How big of a traffic spike brings these servers down?
crgwbr - 5 hours ago
I got front-paged once a few years ago; it was about 15,000 page
views in the course of a few hours. It's probably grown since
then, but I'm not sure by what factor.
Exuma - 6 hours ago
It's not down
stratosgear - 6 hours ago
Hmm, weird. I get: "This site can't be reached. segment.com refused
to connect. Try: Checking the connection. ERR_CONNECTION_REFUSED"
Maybe something local to me only?
psadauskas - 6 hours ago
It's probably an ad- or tracker-blocker. I had to disable mine
to be able to load segment.com, since they're pretty much a
tracking company.
fouadmatin - 6 hours ago
Might be an adblock issue? (Some blockers treat direct access
to a website and loading an analytics snippet as the same.)
asadlionpk - 5 hours ago
You are probably blocking this site in your hosts file, since
it's a tracking tool.
disconnected - 5 hours ago
Check your ad blocker/hosts file. Here, uMatrix just blocks
the site with the message:
> uMatrix has prevented the following page from loading:
> https://segment.com/blog/exactly-once-delivery/
I checked, and one of my uMatrix hosts files includes
'www.segment.com'.
stratosgear - 5 hours ago
Yep. Disabling AdAway seems to do the trick. Thanks for the
heads up.
[deleted]
fouadmatin - 6 hours ago
hm seems to be up for me? you might've caught our autoscaler mid-
scaling!
ratherbefuddled - 4 hours ago
"Almost Exactly Once" doesn't have quite the same ring to it, but
it is actually accurate. We've already discovered better
trade-offs, haven't we?
spullara - 5 hours ago
This isn't the solution I would architect. It is much easier to
de-duplicate later, when processing your analytics workload, and
then you don't need to do so much work.
falcolas - 5 hours ago
So, a combination of best-effort "at least once" messaging with
deduplication near the receiving edge. Fairly standard,
honestly. There is still a potential for problems in the message
delivery to the endpoints (malformed messages, Kafka errors,
messages not being consumed fast enough and lost), or duplication
at that level (restart a listener on the Kafka stream with the
wrong message ID) as well. This is based on my own pains with
Kinesis and Lambda (which, I know, isn't Kafka). In my experience,
it's better to just allow raw "at least once" messaging and perform
idempotent actions based on the messages. It's not always possible
(and harder when it is possible), but its trade-offs mean you're
less likely to lose messages.
caust1c - 5 hours ago
This is generally better, but we're delivering these messages to
integrations which don't necessarily take idempotent actions.
ggcampinho - 6 hours ago
Isn't the new feature of Kafka about this?
https://issues.apache.org/jira/browse/KAFKA-4815
caust1c - 6 hours ago
It would help, but messages are sent to the API first. We aren't
sending messages directly to Kafka from the Internet, of
course. So we can get duplicate API submissions regardless of
whether or not we enable transactional production into Kafka
from a producer.
StreamBright - 6 hours ago
"The single requirement of all data pipelines is that they cannot
lose data." Unless the business value of the data is derived after
applying some summary statistics; then even sampling the data
works, and you can lose events in an event stream without
changing the insight gained. Originally Kafka was designed to be a
high-throughput data bus for an analytical pipeline where losing
messages was OK. More recently they have been experimenting with
exactly-once delivery.
skMed - 35 minutes ago
Having built something similar with RabbitMQ in a high-volume
industry, there are a lot of benefits people in this thread are
glossing over while they focus on semantics. Yes, this is not
"exactly once" -- there really is no such thing in a distributed
system. The best you can hope for is that your edge consumers are
idempotent. There is a lot of value derived from de-duping near the
ingress of a heavy stream such as this. You're saving downstream
consumers time (money) and potential headaches. You may be in an
industry where duplicates can be handled by a legacy system, but it
takes 5-10 minutes of manual checks and corrections by support
staff. That was my exact use case, and I can't count the number of
times we were thankful our de-duping handled "most" cases.
vgt - 3 hours ago
Qubit's strategy to do this via streaming, leveraging Google Cloud
Dataflow: https://cloud.google.com/blog/big-data/2017/06/how-qubit-ded...
sethammons - 4 hours ago
"Exactly Once"... over a window of time that changes depending on
the amount of ingested events. Basically, they read from a Kafka
stream and have a deduplication layer in RocksDB that produces to
another Kafka stream. They process about 2.2 billion events through
it per day. While this will reduce duplicates and get closer to
Exactly Once (helping address the Two Generals problem on incoming
requests and potentially work inside their data center), they still
have to face the same problem again when they push data out to their
partners. Some packet loss, and they will be sending out duplicates
to the partner. Not to downplay what they have done, as we are doing
a similar thing near our exit nodes to do our best to prevent
duplicate events making it out of our system.
wonderwonder - 6 hours ago
Would something like AWS SQS not scale for something like this? We
currently push about 25k daily transactions over SQS, obviously
nowhere near the scale of this; just wondering about what
limitations we might bump into.
timdorr - 5 hours ago
The limitations are most likely on price. For the 200B messages
they've already processed in the last 3 months, that would be
$100,000 total on just the SQS FIFO queue, or $33,333 per month.
And that's not counting data transfer.
hilbertseries - 5 hours ago
As long as everything is in EC2, data transfer will be free.
Your cost calculations are also off base. You'll need to
send, receive, and delete every message that you process via
SQS. These can all be done in batches of 10. So it's 200B *
3/10 * $0.50 / million, which comes out to 60k over 3 months.
Still not cheap; Kinesis is probably the better option in this
case if you want an AWS-managed service.
wonderwonder - 5 hours ago
That'd definitely be a pretty effective limitation.
psadauskas - 5 hours ago
SQS is not "exactly once", so might not meet their requirements.
idunno246 - 5 hours ago
SQS has FIFO queues, which claim to be exactly once.
lostcolony - 5 hours ago
SQS FIFO queues deduplicate over a 5-minute window. This is
claiming a much larger window. Either way, your listener(s)
still has to have its own deduplication. Ensuring a message
ends up on the queue only once, and ensuring it's processed
exactly once, are two different problems that require
separate handling (and the former is what most out-of-the-box
systems claim to solve, while the latter is more important
and, frankly, completely negates the need for the former).
twodave - 5 hours ago
Actually, it can be. You just pay more.
eropple - 5 hours ago
(edit: incorrect, my bad, see thread)
twodave - 5 hours ago
This is wrong. There is a type of SQS queue that indeed does
exactly-once. It costs more, and it's not the default option, but
it is there.
eropple - 5 hours ago
Oh, right, they added their FIFO queues. My bad; thanks for
the correction. Worth noting, though, that AWS's own services
can't talk to FIFO queues. If you want to wire up SNS or
Lambda dead-letter queues to a FIFO queue, you are out of
luck.
newobj - 5 hours ago
I don't want to ever see the phrase "Exactly Once" without several
asterisks behind it. It might be exactly once from an "overall"
point of view, but the client effectively needs infinitely durable
infinite memory to perform the "distributed transaction" of acting
on the message and responding to the server. Imagine:
- Server delivers message M
- Client processes event E entailed by message M
- Client tries to ack (A) the message on the server, but "packet loss"
- To make matters worse, let's say the client also immediately dies
after this
How do you handle this situation? The client must
transactionally/simultaneously commit both E and A/intent-to-A.
Since the server never received an acknowledgment of M, it will
either redeliver the message, in which case some record of E must
be kept to deduplicate on, or it will wait for the client to resend
A, or some mixture of both. Note: if you say "just make E
idempotent", then you don't need exactly-once delivery in the first
place... I suppose you could go back to some kind of lock-step
processing of messages to avoid needing to record all (E, A) that
are in flight, but that would obviously kill the throughput of the
message queue. Exactly Once can only ever be At Least Once with
some out-of-the-box idempotency that may not be as cheap as the
natural idempotency of your system.
EDIT: Recommended reading: "Life Beyond Distributed Transactions",
Pat Helland - http://queue.acm.org/detail.cfm?id=3025012
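The transactional commit of E and A-intent described above can be sketched with a local database, where the effect and the dedup record land in one transaction. A minimal sketch assuming SQLite as the client's durable store; table and return values are illustrative, not from the article:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# One table holds both the effect of E and the record used to dedup M.
conn.execute("CREATE TABLE events (msg_id TEXT PRIMARY KEY, payload TEXT)")

def handle(msg_id, payload):
    """Apply effect E and remember message M in a single transaction."""
    try:
        with conn:  # commits both, or neither, on exit
            conn.execute("INSERT INTO events VALUES (?, ?)", (msg_id, payload))
    except sqlite3.IntegrityError:
        return "duplicate"  # M was redelivered; just re-ack, don't reapply E
    return "processed"
```

If the client dies before the ack reaches the server, the server redelivers M; the primary key makes the redelivery a safe no-op, which is exactly the "record of E kept to deduplicate on".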
rusanu - 2 hours ago
Having spent 7 years of my life working with Pat Helland on
implementing Exactly Once In Order messaging with SQL Server
Service Broker [0], I can assure you that practical EOIO messaging
is possible, exists, and works as advertised. Delivering data
EOIO is not rocket science; TCP has been doing it for decades.
Extending the TCP paradigms (basically retries and acks) to
messaging is not hard if you buy into transacted persisted
storage (= a database) for keeping undelivered messages (the
transmission queue) and storing received messages before
application consumption (the destination queue). Just ack after you
commit locally. We've been doing this in 2005 at 10k+ msgs/sec (1k
payload): durable, transacted, fully encrypted, with no two-phase
commit, supporting long disconnects (I know of documented cases of
conversations that resumed and continued after 40+ days of
partner network disconnect). Running into resource limits
(basically out of disk space) is something the database community
has known how to monitor, detect, and prevent for decades now. I
really don't get why so many articles, blogs, and comments claim
this is not working, or impossible, or even hard. My team shipped
this 12+ years ago, it is used by major deployments, the technology
is proven, and little has changed in the original protocol.
[0] https://docs.microsoft.com/en-us/sql/database-engine/configu...
YZF - 1 hours ago
TCP does deliver data more than once, though. Sure, within a TCP
session you are guaranteed not to get the same byte twice out
of your socket; bytes are delivered exactly once, in order.
Now if your application that uses TCP pulls data out of the
socket and then dies, the data will need to be delivered again,
and the TCP protocol is unable to help us there; it's
application-level logic at that point. So everyone is talking
about the same thing here: data can get delivered twice, and
the application must handle it in an idempotent way. For TCP
this happens to be looking at some counter and throwing away
anything that's already been seen. With a single client/server
connection, maintaining sequence numbers like TCP does solves the
problem, but it's harder to use this same technique in the
multi-client, multi-server, "stateless" requests that are common in
the web world.
EDIT: To clarify, what I mean by TCP "does deliver data more than
once" is that one side of the connection can send the same data
twice. It's true that it's then discarded by the other end, but
this is what people are talking about when they talk about the
theoretical impossibility of never sending anything twice. The rest
is basically the idempotency thing of ensuring that data received
twice somehow doesn't cause anything abnormal.
bmelton - 1 hours ago
Right. If your definition of "delivered" is "showed a
message to the user", then exactly once is a fairly trivial
goal to attain. If it's defined as "data over the wire", then
it's nearly impossible to avoid some form of ACK/RETRY
mechanism on real networks, which means that messages will
sometimes have to be 'delivered' many times to ensure a single
display to the user.
rusanu - 1 hours ago
Not when your 'socket' is a persisted, durable, transacted
medium (i.e. a database). Sure, applications can 'pull data
out of the socket and then die', but this is a common
scenario with databases, and it is handled with transactions and
post-crash recovery. The application comes back after the
crash and finds the same state as before the crash (the
'socket' still has the data ready to pull off); it pulls
again, processes, and then commits. This is not duplicate
delivery, since we're talking about an aborted and rolled-back
attempt, followed later by a successful processing.
Again, databases and database apps have been dealing with
this kind of problem for decades and know how to handle
it. I've been living in this problem space for many years
now and have seen the wheel reinvented many times. Whenever the
plumbing does not guarantee EOIO but the business demands
it, it gets pushed into the app layer, where TCP (retries and
acks) is reimplemented, with varying levels of success.
YZF - 1 hours ago
> The application comes back after the crash and finds the
same state as before the crash (the 'socket' still has the
data ready to pull off); it pulls again, processes, and then
commits.
Isn't this the same app-layer stuff that has to get
reimplemented? I can see how this is often pushed back to
a human (oops, my money transfer didn't go through, I'll
have to redo it), but it's still something that has to be
dealt with somewhere.
rusanu - 28 minutes ago
> it's still something that has to be dealt with somewhere
Database programmers have the means to deal with
it off the shelf: BEGIN TRANSACTION ... COMMIT. When your
queues are in the database, this becomes trivial. Even
without the system I'm talking about (Service Broker),
which has the queues stored in the database, most regular
messaging systems do support enrolling in a distributed
transaction to achieve an atomic dequeue/process
sequence; it's just that many apps/deployments don't bother
to do it because of the ops overhead (an XA coordinator),
reduced throughput, and/or simply not understanding the
consequences. The point is that durable, persisted, transacted
'sockets' behave very differently from a TCP socket. It's a
whole lot harder to simply lose a message in the app layer
when interacting with a database.
deepsun - 2 hours ago
Are you talking about a distributed environment, where network
partitions can occur? If yes, then there's the Two Generals Problem
and the "FLP result", which prove it impossible. So I guess
you're talking about a non-distributed environment. In other
words, to reliably agree on a system state (whether a message id
was delivered) you need the system to be Consistent. And per the
CAP theorem, it cannot then be Available in the presence of
Partitions. So the other people you're referring to probably talk
about distributed systems.
aetherson - 1 hours ago
Exactly-once delivery is theoretically impossible. Approaching
exactly-once delivery asymptotically is possible. Your
parent poster's point is that this is one of those cases where you
can get so close to exactly once, in order, that in practice you
never violate it for years and years.
rusanu - 45 minutes ago
My point is that I've seen people making decisions to go
with 'best effort delivery' and live with the (costly)
consequences because they read here and there that EOIO is
impossible, so why bother trying.
rusanu - 2 hours ago
Yes, I'm talking about distributed systems, and I am aware of
the CAP theorem. Hence my choice of the word 'practical'. As I
said, users had cases where the plumbing (the messaging system)
recovered and delivered messages after 40+ days of network
partitioning. Correctly written apps completed the business
process associated with those messages as normal, with no special
case. Humans can identify and fix outages, and databases can
easily outlast network outages (everything is durable,
transacted, with HA/DR). And many business processes make
perfect sense to resume/continue after the outage, even if it
lasted for days.
alexbeloi - 18 minutes ago
I'm not really versed in this topic, but it seems like
using a database for a socket makes the system entirely
centralized around that database. Is there something I'm
missing?
rusanu - 1 hours ago
Pat had some opinions about CAP and SOA and distributed
systems, see [0]. I also remember a talk given by Pat and
Eric Brewer together that went deeper into the whole CAP
ideas vis-a-vis the model Pat had been advocating (see
Fiefdoms and Emissaries [1]), but I can't remember when it
was or find a link for it.
[0] https://blogs.msdn.microsoft.com/pathelland/2007/05/20/soa-a...
[1] http://download.microsoft.com/documents/uk/msdn/architecture...
redy - 4 hours ago
Everybody knows "exactly once" means deduplication. This is not
exactly a new problem. That said, it's still a difficult problem,
and I actually wish people would stop trying to roll their own
schemes. For example, this scheme relies on examining a Kafka
outbound topic to resolve in-doubt outbound messages. But what
happens if the outbound message + commit is still "in flight"
when the system recovers, so the system retransmits the in-doubt
outbound message, and so rather than deduping it is now generating
dupes? Yes, the chances of this are minimal, which means it will
happen.
Spearchucker - 1 hours ago
The protocol is in fact quite simple:
1. Sender sends message.
2. If no acknowledgement from the recipient is returned to the
sender, resend the message until an acknowledgement is received
from the recipient.
3. If the recipient receives a message, check the store to see
if it's been received before.
4. If it's not in the store: store it, acknowledge receipt to
the sender, and process it.
5. If it's already in the recipient's store: acknowledge receipt
to the sender, and discard the message.
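The recipient's side of steps 3-5 can be sketched like this; an in-memory set stands in for the durable store, and the names are illustrative:

```python
# Receiver side of the protocol above: dedup store keyed by message id.
class Receiver:
    def __init__(self):
        self.store = set()      # stand-in for persistent storage
        self.processed = []

    def receive(self, msg_id, payload):
        if msg_id in self.store:        # step 5: seen before
            return "ack"                #   ack again, discard
        self.store.add(msg_id)          # step 4: store it...
        self.processed.append(payload)  #   ...process it
        return "ack"                    #   ...and ack

# The sender keeps resending until it sees "ack" (steps 1-2), so the
# receiver may see the same message many times but processes it once.
```

Note there is still a crash window between storing and processing in step 4, which is the point other comments make about committing the effect and the dedup record in one transaction.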
lvh - 4 hours ago
Everyone might know that, but it's certainly the case that a
lot of systems have _claimed_ it where they actually meant
"disastrous characteristics under load and/or (partial)
failure".
[deleted]
Cieplak - 5 hours ago
Basically the Two Generals' Problem, eh?
https://en.wikipedia.org/wiki/Two_Generals%27_Problem
newobj - 5 hours ago
Exactly.
jancsika - 2 hours ago
Question: what is the relationship (if any) between TCP and
the Two Generals Problem?
y4mi - 2 hours ago
Maybe read the opening paragraph of the linked wiki
article? It's within the first few sentences.
[deleted]
sillysaurus3 - 2 hours ago
The Two Generals Problem deals with communicating over an
unreliable channel, as does TCP. TCP solves the problem by
retransmission, similar to approaches illustrated in
https://en.wikipedia.org/wiki/Two_Generals%27_Problem#Engine...
Note that it's impossible to solve it completely, and TCP is
no different. If you unplug your router, there is no hope
of getting a message through. But it's highly unlikely that
there is no path between you and your destination (a
"partition"), so TCP retransmits until it finds one. This
would be similar to the generals sending messengers until one
gets through and a confirmation messenger comes back.
urbit - 4 hours ago
Exactly-once messaging is not a hard problem so long as you
change the problem a little. (Plug warning: this is the way
Urbit does EOM, or EOM* if you prefer.)
TLDR: you don't need infinitely durable infinite memory. You just
need (a) a single-level store in which every event is a
transaction, (b) a message protocol with true end-to-end acks, and
(c) a permanent session between every pair of nodes. We don't have
single-level storage hardware (although XPoint comes close), but
it's easy to simulate semantically.
Think about the problem intuitively. I want to send you a stream
of messages, each of which you act on exactly once. I do it like
this: I put a sequence number in each message. I keep sending you
message 1 until you send me an acknowledgment that you heard
message 1. I can also send messages 2, 3, or whatever (lockstep
isn't needed), but you process messages in order and ignore
messages you've already heard. Never ack an ack; always ack a dup.
What does implementing this design require? It requires a
persistent sequence number, on the sending endpoint, for every
receiving endpoint in the world. (Edit: and of course another SN
on the receiving end.) Of course, for every endpoint you haven't
sent to yet, the number is 0 and doesn't need to be stored. This
is not a terribly onerous amount of storage.
A sequence number is the piece of state generally known as a
"session" in networking-theory parlance. Of course a TCP
connection is a session. We're simply saying that every two
endpoints have a sort of implicit, persistent connection.
Moreover, every message must be a transaction. Acknowledging the
message acknowledges that the application, as well as the sequence
number, has been fully and persistently updated with the knowledge
that the message contains. One way to think of this is that we're
adding the latency of a transaction to persistent storage to our
packet latency. SSDs are good here.
Furthermore, since in real life semantic messages need to be
delivered in packet-sized fragments, a fragmentation model with
end-to-end "piggybacked" acks is needed. There can't be a
separate message-level ack (the "stack of acks" problem is the
curse of the Internet stack) -- acknowledging all the fragment
packets acknowledges the message.
All this and more is explained here (plug alert):
http://media.urbit.org/whitepaper.pdf
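The per-pair session described above can be sketched roughly like this; the state is in-memory here, whereas in the design described it would be persisted as part of each message transaction:

```python
# Sketch of the per-peer session: one sequence number per peer;
# out-of-order messages wait, duplicates are acked but not reprocessed
# ("never ack an ack, always ack a dup").
class Endpoint:
    def __init__(self):
        self.next_seq = {}   # peer -> next expected sequence number
        self.log = []        # effects applied exactly once, in order

    def on_message(self, peer, seq, payload):
        expected = self.next_seq.get(peer, 0)  # unseen peers start at 0
        if seq < expected:
            return ("ack", seq)       # dup: ack it, don't reprocess
        if seq > expected:
            return ("ignore", seq)    # gap: wait for retransmit of `expected`
        # seq == expected: process once, then advance the session
        self.log.append(payload)
        self.next_seq[peer] = expected + 1
        return ("ack", seq)
```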
jerf - 4 hours ago
You've embedded the idempotency into the protocol, which is
nice, but doesn't get around the problems of not being able to
do EOM. You also have the case where you sent a message and
lost network connectivity before any ack could come back,
resulting in you not knowing if your message arrived 0 or 1
times, which isn't surprising since that case is a constant on
a network with anything less than 100% reliability. That's not
EOM; that's just the sensible workarounds to use in light of
the fact that EOM doesn't exist. Obviously a lot of protocols and
applications use such things, since the inability to have EOM in
the real world has not rendered our entire network edifice
impossible or useless in real life.
urbit - 3 hours ago
If you define EOM as magic, or as a solution to the Two
Generals problem, EOM is certainly impossible. If EOM means
"the programmer doesn't have to think about idempotency," EOM
is what I want. Happy to call this "EOM asterisk" if you
and/or the OP like. At any point in the conversation, you
don't know whether your interlocutor has yet received the
last message you sent. This is because you are talking over
a network, rather than over a magic bus. However, you know
that before the endpoint processes each message, it has
processed each previous message once and exactly once. I
think this is what the programmer wants -- asterisk or no.
Network connectivity failure is best modeled as the limit
case of network latency. Of course, when you send a message
over a network in the real world, you can't know whether it
has been received or not until you get an acknowledgment.
(Edit: asterisks.)
urbit - 3 hours ago
Also, I would say that the worst thing about the inability to
do EOM on the Internet stack is that it puts the
responsibility for idempotence on the application programmer,
then tests that corner case 1 in 100 times, maybe 1 in 1000.
It is not impossible to handle a system that produces
rare corner cases. It's just expensive and a pain in the
butt.
[deleted]
ju-st - 6 hours ago
52 requests, 5.4 MB and 8.63 seconds to load a simple blog post.
With a bonus XHR request every 5 seconds.
numbsafari - 4 hours ago
... and this is why friends don't let friends use Segment.
heipei - 4 hours ago
That's not everything: this website contacted 56 IPs in 6
countries across 47 domains to perform 100 HTTP transactions. In
total, 3 MB of data was transferred, which is 5 MB uncompressed.
It took 4.103 seconds to load the page. 37 cookies were set, and
8 messages were logged to the console.
https://urlscan.io/result/b2e27a08-1298-491a-863f-8cadc45e73...
joshuagvk - 4 hours ago
And each one made only once!
majidazimi - 3 hours ago
What is so exciting about this? There is still a possibility of
duplicates. You still have to put in the engineering effort to deal
with duplicates end-to-end. If the code is there to deal with
duplicates end-to-end, then does it really matter whether you have
5 duplicates or 35? Or maybe they just did it to add some useful
cool tech to the CV?
iampims - 5 hours ago
If the OP doesn't mind expanding a little on this bit, I'd be
grateful:
> If the dedupe worker crashes for any reason or encounters an
error from Kafka, when it re-starts it will first consult the
"source of truth" for whether an event was published: the output
topic.
Does this mean that "on worker crash" the worker replays the
entire output topic and compares it to the RocksDB dataset?
Also, how do you handle scaling up or down the number of
workers/partitions?
ryanworl - 5 hours ago
I'm not the OP, but changing the number of Kafka partitions isn't
a super graceful operation. You would be wise to add as many as
you could reasonably need, assuming one consumer thread per
partition. But not too many, because each one is at least two
files on disk!
robotresearcher - 5 hours ago
> [I]t's pretty much impossible to have messages only ever be
delivered once.
IIRC, it's provably impossible in a distributed system where
processes might fail, i.e. all real systems.
incan1275 - 5 hours ago
To be fair, they are upfront at the beginning about not being able
to adhere to an exactly-once model: "In the past three months we've
built an entirely new de-duplication system to get as close as
possible to exactly-once delivery." What's annoying is that they do
not get precise and formal about what they want out of their new
model. Also, their numbers only speak to performance, not
correctness. On the plus side, I think it's awesome to see Bloom
filters successfully used in production. That sort of thing is easy
to implement, but not easy to get right for every use case.
qsymmachus - 5 hours ago
It's funny; at my company we implemented deduplication almost
exactly the same way for our push notification sender. The scale is
smaller (about 10k rpm), but the basic idea is the same (store a
message ID in a key-value store after each successful send). I like
the idea of invalidating records by overall size; we hadn't thought
of that. We just use a fixed 24-hour TTL.
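That scheme (a message ID in a key-value store with a fixed TTL) can be sketched like this; the dict and injectable clock stand in for something like Redis with an expiring key, and all names are illustrative:

```python
import time

class DedupWindow:
    """Best-effort dedup: a message id counts as a duplicate only
    within ttl_seconds of first being seen."""
    def __init__(self, ttl_seconds, clock=time.monotonic):
        self.ttl = ttl_seconds
        self.clock = clock
        self.seen = {}  # message id -> expiry time

    def first_time(self, msg_id):
        now = self.clock()
        expiry = self.seen.get(msg_id)
        if expiry is not None and expiry > now:
            return False          # duplicate within the window
        self.seen[msg_id] = now + self.ttl
        return True               # first sighting, or the TTL expired
```

With a real store the check-and-set would need to be atomic (e.g. a single conditional-set operation) to avoid racing concurrent senders.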
mamon - 5 hours ago
The "exactly once" model of messaging is theoretically impossible
to achieve in a distributed environment with a nonzero possibility
of failure. If you haven't received an acknowledgement from the
other side of the communication in the specified amount of time,
you can only do one of two things:
1) do nothing, risking message loss
2) retransmit, risking duplication
But of course that's only from the messaging system's point of
view. Deduplication at the receiver end can help reduce the
problem, but it can itself fail (there is no foolproof way of
implementing that pseudocode's "has_seen(message.id)" method).
LgWoodenBadger - 5 hours ago
I agree. I wish more messaging platforms would recognize this
and stop trying to paper over the failure mode (Kafka just came
out with "Exactly Once," which I think is a variant of the
2-Phase-Commit protocol, which still does not solve the
problem). My go-to for explaining this to people is the Two
Generals Problem: https://en.wikipedia.org/wiki/Two_Generals%27_Problem
LgWoodenBadger - 2 hours ago
After watching the video from the Kafka Summit talk
(https://www.confluent.io/kafka-summit-nyc17/introducing-exac...),
it's an essentially useless guarantee for any sort of
Kafka consumer that interacts with an external system.
Exactly-Once is not something that can be provided for, or
delegated to, an arbitrary, non-integrated system. For an
"integrated" system, it requires idempotent behavior, in which
case it's really At-Least-Once, so...
iso-8859-1 - 3 hours ago
> nonzero
If you send the message with a hash of the previous state on the
server (like with proof-of-work in Bitcoin), then since it is so
unlikely that the hash will be the same with and without the
message appended, it doesn't really matter if the probability is
strictly nonzero, as long as it is small enough.
ecesena - 5 hours ago
I think "exactly once" doesn't imply "non-reprocessing" (sorry
for the double negative). Meaning, you want "exactly once" and you
don't want duplicates, yes. But you allow for reprocessing,
provided that you have a way of deduplicating. You want a
guarantee that if the producer (at the top of your data-processing
pipeline) sends a message, then this message eventually
corresponds to exactly 1 record in your final storage(s). One
easy-to-understand-yet-simplistic example: send a message to
Kafka, use topic+partition+offset as the primary key, and store it
in an RDBMS. This is widely accepted as "exactly once", but
clearly you may have multiple attempts to save the message into
the db, which will fail due to the primary key integrity
constraint.
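That simplistic example can be sketched end-to-end with SQLite standing in for the RDBMS; a retry's insert trips the primary key, leaving exactly one record per (topic, partition, offset):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# "partition" and "offset" are quoted because they are SQL keywords.
conn.execute("""CREATE TABLE records (
    topic TEXT, "partition" INTEGER, "offset" INTEGER, payload TEXT,
    PRIMARY KEY (topic, "partition", "offset"))""")

def store(topic, partition, offset, payload):
    """Idempotent sink: reprocessing the same message is harmless."""
    try:
        with conn:
            conn.execute("INSERT INTO records VALUES (?, ?, ?, ?)",
                         (topic, partition, offset, payload))
        return "stored"
    except sqlite3.IntegrityError:
        return "already stored"   # a retry hit the primary key
```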
StavrosK - 5 hours ago
So basically what's called "at least once with deduplication".
The parent comment addresses that.
kmicklas - 5 hours ago
> there is no foolproof way of implementing that pseudocode's
"has_seen(message.id)" method
Wait, why? Just because you'd have to store the list of seen
messages theoretically indefinitely?
convolvatron - 5 hours ago
Sure, if you assume that everything can fail, then it doesn't
help to store the list of messages you've seen. But if you can
persist a monotonic sequence number, that gets you pretty far.
We use TCP all the time even though it has no magic answer to
distributed consensus (and uses a super-weak checksum). 2PC
doesn't guarantee progress and/or consistency either, and it's
pretty effective.
sethev - 5 hours ago
There's also a race condition in there, when you receive the
duplicate before publish_and_commit is done doing its thing --
assuming they're not actually serializing all messages through
a single thread like the pseudocode implies. What they've done
is shift the point of failure from something less reliable
(the client's network) to something more reliable (their RocksDB
approach) -- reducing duplicates but not guaranteeing exactly-once
processing.
dastbe - 3 hours ago
It's not so much that they are serializing all messages
through a single thread, but that they are consistently
routing messages (duplicates and all) into separate shards
that are each processed by a single thread.
malandrew - 5 hours ago
An overview of FLP Impossibility:
http://the-paper-trail.org/blog/a-brief-tour-of-flp-impossib...
siliconc0w - 1 hours ago
Was thinking a 'reverse bloom filter' could be cool to possibly
avoid the RocksDB for situations like this - turns out it already
exists: https://github.com/jmhodges/opposite_of_a_bloom_filter
I love it when that happens.
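The linked structure is essentially a direct-mapped cache of recently seen ids: it may forget an id (a false negative) but never falsely reports one as seen, which is the opposite trade-off from a Bloom filter. A rough Python sketch of the same idea:

```python
import hashlib

class OppositeOfABloomFilter:
    """Fixed-size, direct-mapped set of recently seen items.
    May forget an item after a collision evicts it (false negative),
    but never claims an unseen item was seen (no false positives)."""

    def __init__(self, size=1024):
        self.slots = [None] * size

    def contains_and_add(self, item):
        i = int(hashlib.sha1(item.encode()).hexdigest(), 16) % len(self.slots)
        seen = self.slots[i] == item
        self.slots[i] = item  # (re)claim the slot, possibly evicting another id
        return seen

f = OppositeOfABloomFilter(size=1024)
assert f.contains_and_add("msg-1") is False  # first sighting
assert f.contains_and_add("msg-1") is True   # duplicate caught
```

Memory stays bounded regardless of stream length, so this works as a cheap front-line filter; duplicates that slip through after an eviction would still need a durable check (like the RocksDB layer) behind it.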
[deleted]
redmalang - 2 hours ago
Another possible approach:
https://cloud.google.com/blog/big-data/2017/06/how-qubit-ded...
bmsatierf - 5 hours ago
In terms of connectivity, we deal with a similar problem here at
CloudWalk to process payment transactions from POS terminals, where
most of them rely on GPRS connections. Our network issues are nearly
6 times higher (~3.5%) due to GPRS, and we solved the duplication
problem with an approach involving both the client and server
side. Clients always ensure that all the information sent by
the server was successfully received. If something goes wrong,
instead of retrying (sending the payment again), the client sends
just the transaction UUID to the server, and the server responds
with either: A. the corresponding response for the
transaction, or B. not found. In scenario A, the POS terminal
managed to properly send all the information to the server but
failed to receive the response. In scenario B, the POS terminal
didn't even manage to properly send the information to the server,
so the POS can safely retry.
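A toy sketch of that recovery handshake (the function names and in-memory store are illustrative; a real server would persist responses durably):

```python
processed = {}  # transaction UUID -> stored server response

def handle_payment(txn_uuid, payload):
    """Normal path: process at most once, remember the response by UUID."""
    if txn_uuid not in processed:
        processed[txn_uuid] = "approved:" + payload  # side effect happens once
    return processed[txn_uuid]

def handle_status_query(txn_uuid):
    """Recovery path: after a failure, the client sends only the UUID."""
    if txn_uuid in processed:
        return processed[txn_uuid]  # scenario A: resend the stored response
    return "not found"              # scenario B: client may safely retry

handle_payment("txn-1", "charge $10")
assert handle_status_query("txn-1") == "approved:charge $10"  # scenario A
assert handle_status_query("txn-2") == "not found"            # scenario B
```

The key property is that the retry path never re-executes the payment; it only asks whether the first attempt made it through.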
travisjeffery - 5 hours ago
Kafka 0.11 (recently released) has exactly-once semantics and
transactional messages built in.
- Talk from Kafka Summit:
https://www.confluent.io/kafka-summit-nyc17/resource/#exactl...
- Proposal:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+E...
linkmotif - 5 hours ago
It's worth noting that the next major Kafka release (0.11, out
soon) will include exactly-once semantics! With basically no
configuration and no code changes for the user. Perhaps even more
noteworthy is that this feature is built on top of a new transactions
feature [0]. With this release, you'll be able to atomically write
to multiple topics.
[0] https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+E...
mooneater - 5 hours ago
Awesome story. What I would like to hear more about is the people
side: the teams and personalities involved in coming up with this
new system, and the transition.
squiguy7 - 5 hours ago
I wonder how they partition by the "messageId" to ensure that
de-duplication happens on the same worker. I would imagine that
this affects their ability to add more brokers in the future.
Perhaps they expect a 1:1 mapping of RocksDB instance, partition,
and de-duplication worker.
squeaky-clean - 4 hours ago
I'm also curious about this. I'm guessing it's some function on
the id that maps it to a partition. What happens when you add
more consumers? Is there a way to know which partitions include
messageIds that would be put into another partition's ownership
by the change and then move anything from one RocksDB instance to
the other?
bytecodes - 1 hours ago
Kafka does this as part of its design. A topic has a declared
number of partitions (which can't really be changed on the fly,
you choose a theoretical high number and hope it's enough), and
an agreed upon hash algorithm chooses between those partitions
(probably in Java, so hashCode is readily available for
primitives as well as objects). Each partition is really like its
own topic, so you lose in-order messaging for anything not
included in your partition key.
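The scheme described above can be approximated as hash(key) mod partition-count (Kafka's default partitioner actually uses murmur2 on the key bytes; md5 here is just an illustrative stand-in):

```python
import hashlib

NUM_PARTITIONS = 12  # fixed up front; chosen high because it can't easily change later

def partition_for(message_id):
    """Stable mapping: every delivery of the same id lands on the same
    partition, so one consumer thread sees all duplicates of a message."""
    digest = hashlib.md5(message_id.encode()).digest()
    return int.from_bytes(digest[:8], "big") % NUM_PARTITIONS

assert partition_for("msg-123") == partition_for("msg-123")  # deterministic
assert 0 <= partition_for("anything") < NUM_PARTITIONS
```

Because the mapping depends only on the key and the (fixed) partition count, adding consumers reassigns whole partitions rather than re-hashing ids, which is consistent with keeping one RocksDB instance per partition. As the comment notes, the cost is that ordering only holds within a partition key.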
gsmethells - 5 hours ago
Why do I get the feeling this is repeating TCP features at the
message level? There must be a protocol that can hide this
exactly-once requirement away. TCP generally doesn't produce
downloads that are corrupt and fail their checksum test, and the
packets that make up the file are not duplicated.
jkarneges - 4 hours ago
Yes, there is some duplication of TCP capability here. The problem
with relying on TCP for reliability is that its state is in
memory, associated with a particular peer IP address, and
acknowledgements passed back to the sender only indicate that the
receiver has the data in local memory, not that the data has been
processed. A file download over TCP can still fail, for example due
to a network problem. Ensuring reliable delivery requires additional
measures outside of TCP, such as retrying the download on a
new connection. In practice, this means TCP is primarily
useful for providing flow control and offering a streaming
interface (no worry about packet sizes), and less so as a complete
solution for transmission reliability.
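The received-versus-processed distinction can be made concrete with an application-level acknowledgement sent only after the work is done (a toy sketch, not tied to any particular system; the function names are illustrative):

```python
def consume(fetch, process, commit_offset):
    """At-least-once consumption: the offset is committed only after
    processing succeeds, so a crash causes re-delivery instead of loss.
    TCP's own ACKs fired long before this point and prove nothing here."""
    while True:
        offset, message = fetch()
        if message is None:
            break
        process(message)        # e.g. a durable write
        commit_offset(offset)   # the application-level acknowledgement

# toy harness: "process" into a list and track committed offsets
log = [(0, "a"), (1, "b"), (2, None)]
out, committed = [], []
it = iter(log)
consume(lambda: next(it), out.append, committed.append)
assert out == ["a", "b"] and committed == [0, 1]
```

If the process crashes between process() and commit_offset(), the message is delivered again on restart; that duplicate is exactly what the deduplication layer discussed in this thread exists to absorb.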
jmaygarden - 5 hours ago
How would you use TCP sockets to de-duplicate Kafka streams with
a many-to-many communication pattern? Surely there is a valid
scalability reason why AWS IoT only provides "at least once"
guarantees in its MQTT broker even when TCP is the underlying
transport [1].
[1] http://docs.aws.amazon.com/iot/latest/developerguide/protoco...
jkestelyn - 1 hours ago
Relevant to this topic: a description of the exactly-once
implementation in Google Cloud Dataflow, plus what "exactly once"
means in the context of streaming:
https://cloud.google.com/blog/big-data/2017/05/after-lambda-...
(Google Cloud emp speaking)