Slow Consumers

NATS is designed to move messages through the server quickly. As a result, NATS depends on applications to consider and respond to changing message rates. The server will do a bit of impedance matching, but if a client is too slow the server will eventually cut it off by closing the connection. These cut-off connections are called slow consumers.

One way some of the libraries deal with bursty message traffic is to buffer incoming messages for a subscription. So if an application can handle 10 messages per second and sometimes receives 20 messages per second, the library may hold the extra 10 to give the application time to catch up. To the server, the application appears to be handling the messages, so the server considers the connection healthy. If the application falls too far behind, most client libraries will notify it that there is a SlowConsumer error and discard messages.
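
The buffering described above can be sketched with a small in-memory model. This is only an illustration, not the code of any NATS client: the `PendingBuffer` class, its `max_pending` limit, and the `on_slow_consumer` callback are hypothetical names chosen for this example.

```python
from collections import deque


class PendingBuffer:
    """Hypothetical per-subscription buffer; not a real NATS client class."""

    def __init__(self, max_pending, on_slow_consumer):
        self.max_pending = max_pending
        self.on_slow_consumer = on_slow_consumer
        self.pending = deque()
        self.dropped = 0

    def deliver(self, msg):
        """Called when a message arrives from the server."""
        if len(self.pending) >= self.max_pending:
            # The buffer is full: discard the message and tell the application.
            self.dropped += 1
            self.on_slow_consumer(msg)
            return
        self.pending.append(msg)

    def process(self, count):
        """Let the application handle up to `count` buffered messages."""
        handled = []
        while self.pending and len(handled) < count:
            handled.append(self.pending.popleft())
        return handled


errors = []
buf = PendingBuffer(max_pending=12, on_slow_consumer=errors.append)

# A one-second burst: 20 messages arrive while the application handles 10.
handled = []
for step in range(10):
    buf.deliver(f"burst-{2 * step}")
    buf.deliver(f"burst-{2 * step + 1}")
    handled += buf.process(1)
print(len(handled), len(buf.pending), buf.dropped)  # 10 10 0

# A quiet second: nothing arrives, so the application catches up.
handled += buf.process(10)
print(len(buf.pending))  # 0

# Sustained overload: 30 messages arrive and nothing is processed.
for i in range(30):
    buf.deliver(f"flood-{i}")
print(len(buf.pending), buf.dropped, len(errors))  # 12 18 18
```

The burst is absorbed without loss because the buffer has room for the extra 10 messages. Once the overload is continuous, the buffer fills and every further message is dropped and reported.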

Receiving messages from the server and dropping them keeps the connection healthy, but it means the application must tolerate or prevent message loss. There are several common patterns:

  • Use request/reply to throttle the sender and prevent overloading the subscriber

  • Use a queue with multiple subscribers splitting the work

  • Persist messages with something like NATS Streaming
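
As a rough illustration of the first pattern in the list above, the sketch below models request/reply throttling with plain `asyncio`. An in-memory queue stands in for the subscription's pending queue and a future stands in for the reply subject. These stand-ins, and the function names `subscriber` and `throttled_sender`, are assumptions made for this example; a real application would use its client library's request call instead.

```python
import asyncio


async def subscriber(inbox, stats):
    """Slow subscriber: handles one request at a time, then replies."""
    while True:
        payload, reply = await inbox.get()
        # Messages still queued plus the one being handled right now.
        stats["max_pending"] = max(stats["max_pending"], inbox.qsize() + 1)
        await asyncio.sleep(0.01)  # simulate slow processing
        reply.set_result(b"ok")    # the reply releases the sender


async def throttled_sender(inbox, count):
    """Send `count` requests, waiting for each reply before sending the next."""
    loop = asyncio.get_running_loop()
    acked = 0
    for i in range(count):
        reply = loop.create_future()
        inbox.put_nowait((f"msg #{i}".encode(), reply))
        await asyncio.wait_for(reply, timeout=1.0)
        acked += 1
    return acked


async def main():
    inbox = asyncio.Queue()  # stands in for the subscriber's pending queue
    stats = {"max_pending": 0}
    worker = asyncio.create_task(subscriber(inbox, stats))
    acked = await throttled_sender(inbox, 5)
    worker.cancel()
    await asyncio.gather(worker, return_exceptions=True)
    return acked, stats["max_pending"]


acked, max_pending = asyncio.run(main())
print(acked, max_pending)  # 5 1
```

Because the sender waits for each reply, the subscriber never has more than one message outstanding, so it cannot be overrun. The cost is throughput: the sender runs at the subscriber's pace.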

Libraries that cache incoming messages may provide two controls on the incoming queue of pending messages. These are useful if the problem is bursty publishers and not a continuous performance mismatch. Disabling these limits, or setting them to 0, may help find problems, but it is dangerous in production.

Check your library's documentation for the default settings and for support for disabling these limits.

The incoming cache is usually per subscriber, but again, check the specific documentation for your client library.

Limiting Incoming/Pending Messages by Count and Bytes

The first way to limit the incoming queue is by message count. The second is by total size. For example, to limit the incoming cache to 1,000 messages or 5 MB, whichever comes first:

Go:

package main

import (
    "log"

    "github.com/nats-io/nats.go"
)

func main() {
    nc, err := nats.Connect("demo.nats.io")
    if err != nil {
        log.Fatal(err)
    }
    // Close the connection when main returns
    defer nc.Close()

    // Subscribe
    sub1, err := nc.Subscribe("updates", func(m *nats.Msg) {})
    if err != nil {
        log.Fatal(err)
    }

    // Set limits of 1000 messages or 5MB, whichever comes first
    if err := sub1.SetPendingLimits(1000, 5*1024*1024); err != nil {
        log.Fatal(err)
    }

    // Subscribe
    sub2, err := nc.Subscribe("updates", func(m *nats.Msg) {})
    if err != nil {
        log.Fatal(err)
    }

    // Set no limits for this subscription
    if err := sub2.SetPendingLimits(-1, -1); err != nil {
        log.Fatal(err)
    }
}
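
Java:

import io.nats.client.Connection;
import io.nats.client.Dispatcher;
import io.nats.client.Nats;
import io.nats.client.Subscription;

public class PendingLimits {
    public static void main(String[] args) throws Exception {
        Connection nc = Nats.connect("nats://demo.nats.io:4222");

        Dispatcher d = nc.createDispatcher((msg) -> {
            // do something
        });

        d.subscribe("updates");

        // Set limits on a dispatcher
        d.setPendingLimits(1_000, 5 * 1024 * 1024);

        // Subscribe
        Subscription sub = nc.subscribe("updates");

        // Set limits on a subscription
        sub.setPendingLimits(1_000, 5 * 1024 * 1024);

        // Do something

        // Close the connection
        nc.close();
    }
}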

JavaScript:

// Pending limits are not configurable in node-nats.

Python:

import asyncio

from nats.aio.client import Client as NATS

async def main():
    nc = NATS()

    await nc.connect(servers=["nats://demo.nats.io:4222"])

    future = asyncio.Future()

    async def cb(msg):
        # Resolve the future with the first message only
        if not future.done():
            future.set_result(msg)

    # Set limits of 1000 messages or 5MB, whichever comes first
    await nc.subscribe("updates", cb=cb, pending_bytes_limit=5*1024*1024, pending_msgs_limit=1000)

    await nc.close()

asyncio.run(main())

Ruby:

# The Ruby NATS client currently does not have an option to specify a subscriber's pending limits.

TypeScript:

// Pending limits are not configurable in the TypeScript NATS client.

C:

// Message handler: release each message once it has been processed
static void
onMsg(natsConnection *conn, natsSubscription *sub, natsMsg *msg, void *closure)
{
    natsMsg_Destroy(msg);
}

(...)

natsConnection      *conn      = NULL;
natsSubscription    *sub1      = NULL;
natsSubscription    *sub2      = NULL;
natsStatus          s          = NATS_OK;

s = natsConnection_ConnectTo(&conn, NATS_DEFAULT_URL);

// Subscribe
if (s == NATS_OK)
    s = natsConnection_Subscribe(&sub1, conn, "updates", onMsg, NULL);

// Set limits of 1000 messages or 5MB, whichever comes first
if (s == NATS_OK)
    s = natsSubscription_SetPendingLimits(sub1, 1000, 5*1024*1024);

// Subscribe
if (s == NATS_OK)
    s = natsConnection_Subscribe(&sub2, conn, "updates", onMsg, NULL);

// Set no limits for this subscription
if (s == NATS_OK)
    s = natsSubscription_SetPendingLimits(sub2, -1, -1);

(...)

// Destroy objects that were created
natsSubscription_Destroy(sub1);
natsSubscription_Destroy(sub2);
natsConnection_Destroy(conn);
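
To make the "whichever comes first" rule concrete, here is a minimal sketch of how a pair of count and byte limits could be checked for each incoming message. The `PendingLimits` class and its methods are hypothetical and written for this example only. It assumes, as in the calls that pass -1 to disable limits, that a negative value means "no limit"; other libraries may use a different convention.

```python
class PendingLimits:
    """Hypothetical helper tracking pending messages against two limits."""

    def __init__(self, max_msgs, max_bytes):
        # Assumption for this sketch: a negative value disables that limit.
        self.max_msgs = max_msgs
        self.max_bytes = max_bytes
        self.pending_msgs = 0
        self.pending_bytes = 0

    def try_add(self, payload):
        """Account for one incoming message; return False if it must be dropped."""
        new_msgs = self.pending_msgs + 1
        new_bytes = self.pending_bytes + len(payload)
        over_msgs = self.max_msgs >= 0 and new_msgs > self.max_msgs
        over_bytes = self.max_bytes >= 0 and new_bytes > self.max_bytes
        if over_msgs or over_bytes:
            return False
        self.pending_msgs, self.pending_bytes = new_msgs, new_bytes
        return True

    def remove(self, payload):
        """Call once the application has processed a message."""
        self.pending_msgs -= 1
        self.pending_bytes -= len(payload)


# 1,000 messages or 5 MB, whichever comes first.
small = b"x" * 10
limits = PendingLimits(max_msgs=1000, max_bytes=5 * 1024 * 1024)

# Small messages hit the count limit first.
accepted_small = sum(limits.try_add(small) for _ in range(1500))
print(accepted_small)  # 1000

# Large messages hit the byte limit first.
large = b"x" * (1024 * 1024)  # 1 MiB each
limits = PendingLimits(max_msgs=1000, max_bytes=5 * 1024 * 1024)
accepted_large = sum(limits.try_add(large) for _ in range(10))
print(accepted_large)  # 5

# Negative limits disable both checks.
unlimited = PendingLimits(max_msgs=-1, max_bytes=-1)
accepted_unlimited = sum(unlimited.try_add(large) for _ in range(10))
print(accepted_unlimited)  # 10
```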

Detect a Slow Consumer and Check for Dropped Messages

When a slow consumer is detected and messages are about to be dropped, the library may notify the application. This notification may be delivered the same way as other errors, or it may involve a custom callback.

Some libraries, like the Java client, will not send this notification on every dropped message because that could be noisy. Instead, the notification may be sent once each time the subscriber falls behind. Libraries may also provide a way to get a count of dropped messages so that applications can at least detect that a problem is occurring.

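Go:

// logSlowConsumer is an example handler (not part of the original snippet).
func logSlowConsumer(nc *nats.Conn, sub *nats.Subscription, err error) {
    if err == nats.ErrSlowConsumer {
        dropped, _ := sub.Dropped()
        log.Printf("slow consumer on %q: %d messages dropped", sub.Subject, dropped)
    }
}

(...)

// Set the callback that will be invoked when an asynchronous error occurs.
nc, err := nats.Connect("demo.nats.io", nats.ErrorHandler(logSlowConsumer))
if err != nil {
    log.Fatal(err)
}
defer nc.Close()

// Do something with the connection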

Java:

import io.nats.client.Connection;
import io.nats.client.Consumer;
import io.nats.client.ErrorListener;
import io.nats.client.Nats;
import io.nats.client.Options;

class SlowConsumerReporter implements ErrorListener {
    public void errorOccurred(Connection conn, String error) {
    }

    public void exceptionOccurred(Connection conn, Exception exp) {
    }

    // Detect slow consumers
    public void slowConsumerDetected(Connection conn, Consumer consumer) {
        // Get the dropped count
        System.out.println("A slow consumer dropped messages: " + consumer.getDroppedCount());
    }
}

public class SlowConsumerListener {
    public static void main(String[] args) {

        try {
            Options options = new Options.Builder().
                                        server("nats://demo.nats.io:4222").
                                        errorListener(new SlowConsumerReporter()). // Set the listener
                                        build();
            Connection nc = Nats.connect(options);

            // Do something with the connection

            nc.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

JavaScript:

// Slow consumer detection is not configurable in the NATS JavaScript client.

Python:

import asyncio

import nats.aio.errors
from nats.aio.client import Client as NATS

async def main():
    nc = NATS()

    async def error_cb(e):
        if isinstance(e, nats.aio.errors.ErrSlowConsumer):
            print("Slow consumer error, unsubscribing from handling further messages...")
            await nc.unsubscribe(e.sid)

    await nc.connect(
        servers=["nats://demo.nats.io:4222"],
        error_cb=error_cb,
    )

    msgs = []
    # Never completed; waiting on it below only gives the callback time to run
    future = asyncio.Future()

    async def cb(msg):
        print(msg)
        msgs.append(msg)

        if len(msgs) == 3:
            # Head of line blocking on other messages caused
            # by single message processing taking too long...
            await asyncio.sleep(1)

    await nc.subscribe("updates", cb=cb, pending_msgs_limit=5)

    for i in range(0, 10):
        await nc.publish("updates", "msg #{}".format(i).encode())
        await asyncio.sleep(0)

    try:
        await asyncio.wait_for(future, 1)
    except asyncio.TimeoutError:
        pass

    for msg in msgs:
        print("[Received]", msg)

    await nc.close()

asyncio.run(main())

Ruby:

# The Ruby NATS client currently does not have an option to customize slow consumer limits per subscription.

TypeScript:

// Slow consumer detection is not configurable in the NATS TypeScript client.

C:

static void
errorCB(natsConnection *conn, natsSubscription *sub, natsStatus s, void *closure)
{
    // Do something
    printf("Error: %d - %s\n", s, natsStatus_GetText(s));
}

(...)

natsConnection      *conn      = NULL;
natsOptions         *opts      = NULL;
natsStatus          s          = NATS_OK;

// Register the error handler before connecting
s = natsOptions_Create(&opts);
if (s == NATS_OK)
    s = natsOptions_SetErrorHandler(opts, errorCB, NULL);
if (s == NATS_OK)
    s = natsConnection_Connect(&conn, opts);

(...)

// Destroy objects that were created
natsConnection_Destroy(conn);
natsOptions_Destroy(opts);
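
The "notify once, count every drop" behaviour described in this section can be sketched as a small state machine. This is not how any particular client implements it: the `SlowConsumerTracker` class and its `message_dropped` and `caught_up` methods are hypothetical, and how a real library decides that a subscriber has caught up is library-specific.

```python
class SlowConsumerTracker:
    """Hypothetical sketch: notify once per slow episode, count every drop."""

    def __init__(self, on_slow_consumer):
        self.on_slow_consumer = on_slow_consumer
        self.dropped = 0
        self._slow = False  # True while the subscriber is behind

    def message_dropped(self):
        """Record one dropped message."""
        self.dropped += 1
        if not self._slow:
            # First drop of this episode: notify the application once.
            self._slow = True
            self.on_slow_consumer(self)

    def caught_up(self):
        """Call when the pending queue has room again."""
        self._slow = False


notifications = []
tracker = SlowConsumerTracker(lambda t: notifications.append(t.dropped))

for _ in range(5):  # first episode: 5 drops, 1 notification
    tracker.message_dropped()
tracker.caught_up()
for _ in range(3):  # second episode: 3 drops, 1 notification
    tracker.message_dropped()

print(tracker.dropped, notifications)  # 8 [1, 6]
```

The application receives two notifications for eight dropped messages, and the running `dropped` count still lets it see how much was lost.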