CONTAINER ID     IMAGE                     COMMAND                  CREATED           STATUS           PORTS                                       NAMES
<zookeeper_id>   bitnami/zookeeper:3.8.2   "/opt/bitnami/script…"   <some_time_ago>   Up <some_time>   2181/tcp                                    zookeeper
<kafka_id>       bitnami/kafka:3.6.1       "/opt/bitnami/script…"   <some_time_ago>   Up <some_time>   0.0.0.0:9092->9092/tcp, :::9092->9092/tcp   kafka
2. 检查 ZooKeeper 日志
查看 ZooKeeper 容器的日志，以确认它已成功启动并正在运行：
docker logs zookeeper
日志中应包含类似以下内容：
INFO Started AdminServer on address 0.0.0.0, port 8080
INFO binding to port 0.0.0.0/0.0.0.0:2181
3. 检查 Kafka 日志
查看 Kafka 容器的日志，以确认它已成功连接到 ZooKeeper 并正在运行：
docker logs kafka
日志中应包含类似以下内容：
INFO [KafkaServer id=1] started (kafka.server.KafkaServer)
INFO [ZooKeeperClient] Connected. (org.apache.zookeeper.ClientCnxn)
/**
 * Redis settings bound from configuration properties under the
 * {@code spring.data.redis} prefix and registered as a Spring component.
 *
 * <p>Lombok's {@code @Data} generates the accessors, {@code equals},
 * {@code hashCode} and {@code toString}; the class declares no logic of its own.
 */
@Data
@Component
@ConfigurationProperties(prefix = "spring.data.redis")
public class RedisProperties {
    // Redis server host name or address.
    private String host;
    // Redis server port.
    private int port;
    // Password used to authenticate against Redis.
    // NOTE(review): @Data's generated toString() will include this value — confirm it is never logged.
    private String password;
    // Index of the Redis logical database to select.
    private int database;
    // NOTE(review): presumably the lock watchdog timeout handed to a Redis lock client
    // (e.g. Redisson) — units and consumer are not visible here; confirm.
    private int lockWatchdogTimeout;
}
12:17:43:771 INFO 21064 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
12:17:43:808 INFO 21064 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#50f40653:0/SimpleConnection@536d97f8 [delegate=amqp://rabbitmq@127.0.0.1:5672/, localPort= 59641]
消费者收到消息后，会输出类似以下的日志：
12:17:31:074 INFO 8924 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#2a27cb34:0/SimpleConnection@671facee [delegate=amqp://rabbitmq@127.0.0.1:5672/, localPort= 59634]
12:17:31:141 INFO 8924 --- [ main] cn.itcast.mq.ConsumerApplication : Started ConsumerApplication in 1.011 seconds (JVM running for 1.462)
12:17:43:848 INFO 8924 --- [ntContainer#0-1] c.i.mq.listener.SpringRabbitListener : 消费者接收到simple.queue的消息:【hello, spring amqp!】
-- Create the account that Canal uses to connect to MySQL.
-- NOTE(review): the password is the literal 'canal'; replace it outside of local/demo setups.
CREATE USER canal IDENTIFIED BY 'canal';

-- Grant read access plus the replication privileges, on every database and
-- table, for connections from any host.
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';

-- Broader alternative, intentionally left disabled:
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;

-- Must sit on its own line: when it shares a line with the disabled GRANT
-- above, the `--` comment swallows it and it never runs.
FLUSH PRIVILEGES;
-- Handle to the nginx shared dictionary `item_cache`, used as the local cache object.
-- NOTE(review): presumably declared via `lua_shared_dict item_cache ...` in nginx.conf —
-- confirm; if it is not declared this evaluates to nil and later cache calls will fail.
local item_cache = ngx.shared.item_cache;
--- Issue an internal GET subrequest and return its response body.
-- The request is sent to this OpenResty instance under the '/path' prefix
-- (e.g. '/path/item/10001'); per the nginx configuration the prefix is
-- expected to be stripped and the request proxied to the Tomcat application.
-- @param path   URI suffix appended to '/path', e.g. '/item/10001'
-- @param params query arguments forwarded as the subrequest `args`
--               (string, table or nil)
-- @return the subrequest's response body; on failure the current request is
--         terminated with 404 and this function does not return a body
local function read_get(path, params)
    local rsp = ngx.location.capture('/path' .. path, {
        method = ngx.HTTP_GET,
        args = params,
    })
    -- ngx.location.capture reports failures through `status` (it returns a
    -- result table even for 404/5xx), so a nil check alone never fires and an
    -- upstream error page would be handed back as if it were data.
    if not rsp or rsp.status ~= ngx.HTTP_OK then
        -- ngx.log rejects plain tables, so render table args as a query string first.
        local args = type(params) == "table" and ngx.encode_args(params) or tostring(params)
        ngx.log(ngx.ERR, "http not found, path: ", path, ", args: ", args,
            ", status: ", rsp and rsp.status or "nil")
        return ngx.exit(404)
    end
    return rsp.body
end
--- Read a value through the local shared-dict cache, falling back to Tomcat.
-- A cache hit is returned as-is. On a miss the value is fetched with
-- read_get and stored in the local cache before being returned.
-- @param key    cache key
-- @param expire time-to-live, in seconds, for a newly cached value
-- @param path   backend path handed to read_get on a miss
-- @param params query arguments handed to read_get on a miss
-- @return the cached or freshly fetched response body
local function read_data(key, expire, path, params)
    -- query local cache
    local rsp = item_cache:get(key)
    if rsp then
        -- Return hits without re-writing them: calling set() on every read
        -- would restart the TTL each time, so a frequently read key would
        -- never expire and would serve stale data indefinitely.
        return rsp
    end

    -- query tomcat
    -- NOTE(review): logged at ERR as in the original; a routine miss probably
    -- belongs at a lower level — confirm against the configured error_log level.
    ngx.log(ngx.ERR, "local cache miss, try tomcat, key: ", key)
    rsp = read_get(path, params)

    -- write into local cache; a failed write only costs a future cache hit,
    -- so log it and still serve the fetched value
    local ok, err = item_cache:set(key, rsp, expire)
    if not ok then
        ngx.log(ngx.ERR, "failed to write local cache, key: ", key, ", err: ", err)
    end
    return rsp
end
-- Module export table: read_data is the only function exposed to callers.
return {
    read_data = read_data,
}
item.lua 是处理形如 /api/item/10001 的请求的程序，内容如下：
-- Request handler for URIs shaped like /api/item/10001: loads the item and its
-- stock through the cached reader, merges them, and writes the result as JSON.

-- include
local commonUtils = require('common')
local cjson = require("cjson")

--- Decode a JSON document that must be an object.
-- cjson.decode raises on malformed input, so it is wrapped in pcall; on any
-- failure the cause is logged and the request is terminated with 500.
-- @param json raw JSON text
-- @param what short label used in the log message
-- @return the decoded table
local function decode_object(json, what)
    local ok, value = pcall(cjson.decode, json)
    if not ok or type(value) ~= "table" then
        ngx.log(ngx.ERR, "invalid ", what, " json: ", ok and "not an object" or value)
        return ngx.exit(ngx.HTTP_INTERNAL_SERVER_ERROR)
    end
    return value
end

-- get url params 10001 (first capture group of the matching location regex)
local id = ngx.var[1]
if not id or id == "" then
    -- Without this guard the string concatenations below fail on nil with an
    -- opaque "attempt to concatenate" error.
    ngx.log(ngx.ERR, "missing item id in request uri: ", ngx.var.uri)
    return ngx.exit(ngx.HTTP_BAD_REQUEST)
end

-- item details: cached for 1800 s, suitable for data that rarely changes
local itemJson = commonUtils.read_data("item:id:" .. id, 1800, "/item/" .. id, nil)
-- item stock: cached for 4 s, suitable for data that changes often
local stockJson = commonUtils.read_data("item:stock:id:" .. id, 4, "/item/stock/" .. id, nil)

-- json2table
local item = decode_object(itemJson, "item")
local stock = decode_object(stockJson, "stock")

-- combine item and stock
item.stock = stock.stock
item.sold = stock.sold

-- return result
ngx.say(cjson.encode(item))