2019年7月2日 #


SPRING BATCH 讀文件時,是按一行一行來讀取數據,再按CHUNKSIZE提交到REMOTE操作,有時要整合當前行和下幾行,再決定CHUNKSIZE,以便相關的數據能在遠程同一個PROCESSOR中按順序進行處理,因為相關的數據被拆成幾個CHUNK來處理的話,就有可能不按順序來處理。這樣就需要動態調整CHUNKSIZE。



posted @ 2019-07-02 11:13 paulwong 閱讀(11) | 評論 (0)編輯 收藏

2019年6月28日 #


消息隊列之 RabbitMQ

Spring Boot 中使用 RabbitMQ

posted @ 2019-06-28 10:24 paulwong 閱讀(14) | 評論 (0)編輯 收藏

2019年6月27日 #

How to implement JMS ReplyTo using SpringBoot

Request-Response is a message-exchange-pattern. In some cases, a message producer may want the consumers to reply to a message. The JMSReplyTo header indicates which destination, if any, a JMS consumer should reply to. The JMSReplyTo header is set explicitly by the JMS client; its contents will be a javax.jms.Destination object (either Topic or Queue).

In some cases, the JMS client will want the message consumers to reply to a temporary topic or queue set up by the JMS client. When a JMS message consumer receives a message that includes a JMSReplyTo destination, it can reply using that destination. A JMS consumer is not required to send a reply, but in some JMS applications, clients are programmed to do so.

For simplicity, this pattern is typically implemented in a purely synchronous fashion, as in web service calls over HTTP, which holds a connection open and waits until the response is delivered or the timeout period expires. However, request–response may also be implemented asynchronously, with a response being returned at some unknown later time.

For more information, check here.

Now, let’s jump into the code. In Spring, there are 2 ways to implement this (at least I know of).

  1. Using JMSTemplate
  2. Using Spring Integration

For demo purpose, I used ActiveMQ. However, you can implement this in other messaging systems like IBM MQ, Rabbit MQ, Tibco EMS, etc. In this demo, I send an ObjectMessage of type Order and reply with a Shipment object.

Using JMSTemplate

  1. First, we include the required dependencies. Replace the activemq dependency with your messaging system’s jars if not using ActiveMQ

  2. Using the default spring.activemq. properties to configure the application with the ActiveMQ. However, you can do this inside a @Configuration class as well.

         broker-url: tcp://localhost:
         non-blocking-redelivery: true
           trust-all: true    
  3. Note in the above configuration can be changed to spring.activemq.packages.trusted with the appropriate packages.
  4. Now spring will do it’s magic and inject all the required Beans as usual :) However, in our code, we need to EnableJms

    import org.springframework.context.annotation.Configuration;
    import org.springframework.jms.annotation.EnableJms;

    public class ActiveMQConfig {

    public static final String ORDER_QUEUE = "order-queue";
    public static final String ORDER_REPLY_2_QUEUE = "order-reply-2-queue";

  5. First, we will configure the Producer

    public class Producer {

         JmsMessagingTemplate jmsMessagingTemplate;

         JmsTemplate jmsTemplate;

    public Shipment sendWithReply(Order order) throws JMSException {

             Session session 
    = jmsMessagingTemplate.getConnectionFactory().createConnection()
    false, Session.AUTO_ACKNOWLEDGE);

             ObjectMessage objectMessage 
    = session.createObjectMessage(order);

    new ActiveMQQueue(ORDER_REPLY_2_QUEUE));

    return jmsMessagingTemplate.convertSendAndReceive(new ActiveMQQueue(ORDER_QUEUE),
                     objectMessage, Shipment.
    class); //this operation seems to be blocking + sync
  6. Note in the above code that, JmsMessagingTemplate is used instead of JmsTemplatebecause, we are interested in the method convertSendAndReceive. As seen in the method signature, it waits to receive the Shipment object from the consumer.
  7. Next, we can see the Receiver

    public class Receiver implements SessionAwareMessageListener<Message> {

    public void onMessage(Message message, Session session) throws JMSException {
             Order order 
    = (Order) ((ActiveMQObjectMessage) message).getObject();
             Shipment shipment 
    = new Shipment(order.getId(), UUID.randomUUID().toString());

    // done handling the request, now create a response message
             final ObjectMessage responseMessage = new ActiveMQObjectMessage();

    // Message sent back to the replyTo address of the income message.
             final MessageProducer producer = session.createProducer(message.getJMSReplyTo());
  8. Using the javax.jms.Session the javax.jms.MessageProducer is created and used to send the reply message to the JMSReplyTo queue. In real life, this receiver could be a different application altogether.

Using Spring Integration

  1. First, we include the required dependencies in addition to the above dependencies

  2. Using the default spring.activemq. properties to configure the application with the ActiveMQ. However, you can do this inside a @Configuration class as well.

    -url: tcp://localhost:61616
    -blocking-redelivery: true
    -all: true   
  3. Note in the above configuration can be changed to spring.activemq.packages.trusted with the appropriate packages.
  4. Next we create the required Beans for the Spring Integration.

    public class ActiveMQConfig {

    public static final String ORDER_QUEUE = "order-queue";
    public static final String ORDER_REPLY_2_QUEUE = "order-reply-2-queue";

    public MessageConverter messageConverter() {
             MappingJackson2MessageConverter converter 
    = new MappingJackson2MessageConverter();
    return converter;

    public MessageChannel requests() {
    return new DirectChannel();

    = "requests")
    public JmsOutboundGateway jmsGateway(ActiveMQConnectionFactory activeMQConnectionFactory) {
             JmsOutboundGateway gateway 
    = new JmsOutboundGateway();
    return gateway;

         Receiver receiver;

    public DefaultMessageListenerContainer responder(ActiveMQConnectionFactory activeMQConnectionFactory) {
             DefaultMessageListenerContainer container 
    = new DefaultMessageListenerContainer();
             MessageListenerAdapter adapter 
    = new MessageListenerAdapter(new Object() {

    public Shipment handleMessage(Order order) {
    return receiver.receiveMessage(order);

    return container;
  5. Next, we will configure the MessagingGateway

     @MessagingGateway(defaultRequestChannel = "requests")
    public interface ClientGateway {
         Shipment sendAndReceive(Order order);
  6. We then Autowire this gateway in our Component class when we want to send and receive the message. A sample is shown below.

    public class Receiver {
    public Shipment receiveMessage(@Payload Order order) {
             Shipment shipment 
    = new Shipment(order.getId(), UUID.randomUUID().toString());
    return shipment;
  7. Next we configure the Componen to process the Order message. After successful execution, this component will send the Shipment message to the JMSReplyTo queue. In real life, this receiver could be a different application altogether.

For those, who just want to clone the code, head out to aniruthmp/jms

Written on June 5, 2018

posted @ 2019-06-27 09:20 paulwong 閱讀(19) | 評論 (0)編輯 收藏

2019年6月26日 #


posted @ 2019-06-26 14:13 paulwong 閱讀(19) | 評論 (0)編輯 收藏

2019年6月24日 #





posted @ 2019-06-24 17:42 paulwong 閱讀(22) | 評論 (0)編輯 收藏

2019年6月20日 #

Multiple MongoDB connectors with Spring Boot

posted @ 2019-06-20 15:12 paulwong 閱讀(19) | 評論 (0)編輯 收藏

2019年6月18日 #

NFS Server 架設

  1. 安裝 nfs-utils 套件。
    [root@kvm5 ~]# yum install -y nfs-utils
  2. 建立 NFS 分享目錄。
    [root@kvm5 ~]# mkdir /public /protected
  3. 修改 NFS 分享目錄的 SELinux 檔案 context。
    [root@kvm5 ~]# semanage fcontext -a -t public_content_t "/public(/.*)?"
    [root@kvm5 ~]# semanage fcontext -a -t public_content_t "/protected(/.*)?"
    [root@kvm5 ~]# restorecon -Rv /public /protected
  4. 考試時不用自行產生 kerberos keytab,只要依照指定的位置下載,存放在目錄 /etc/ 下,且檔名必須為 krb5.keytab。
    [root@kvm5 ~]# wget -O /etc/krb5.keytab 
  5. kerberos keytab 的驗證跟時間有關,server 與 client 都必須校時。
    [root@kvm5 ~]# date
    Sun Jan  7 14:50:04 CST 2018
    [root@kvm5 ~]# chronyc -a makestep
    200 OK
    200 OK
    [root@kvm5 ~]# date
    Mon Nov 20 15:53:22 CST 2017
  6. 在 /protected 下建立次目錄 restricted,並將其擁有者設定為 deyu3,讓 deyu3 可以寫入資料。
    [root@kvm5 ~]# mkdir -p  /protected/restricted 
    [root@kvm5 ~]# chown deyu3 /protected/restricted 
  7. 編輯設定檔 /etc/exports,分享 /protected 及 /public 兩個目錄給網域。
    [root@kvm5 ~]# echo '/protected,sync,sec=krb5p)' > /etc/exports
    [root@kvm5 ~]# echo '/public,sync)' >> /etc/exports
    [root@kvm5 ~]# vim /etc/exports
    [root@kvm5 ~]# cat /etc/exports

  8. NFS 掛載參數說明如下,詳細說明請參考 man 5 nfs 手冊。
    1. rw:read-write,可讀寫的權限;
    2. ro:read-only,唯讀的權限;
    3. sec=mode:安全認證模式;
      1. sec=sys 預設,使用本地 UNIX UIDs 及 GIDs 進行身份認證。
      2. sec=krb5 使用 Kerberos V5 取代本地 UNIX UIDs 及 GIDs 進行身份認證。
      3. sec=krb5i 使用 Kerberos V5 進行身份認證,資料完整性檢查,以防止數據被篡改。
      4. sec=krb5p 使用 Kerberos V5 進行身份認證,資料完整性檢查及 NFS 傳輸加密,以防止數據被篡改,這是最安全的方式。
    4. sync:資料同步寫入到記憶體與硬碟當中;
    [root@kvm5 ~]# man 5 nfs 
  9. 設定使用 4.2 版本,以匯出分享 SELinux context。無適合的版本 client 端掛載時會出現 mount.nfs: Protocol not supported 的訊息。
    [root@kvm5 ~]# vim /etc/sysconfig/nfs  sed -i 's/^\(RPCNFSDARGS=\).*$/\1\"-V 4.2\"/' /etc/sysconfig/nfs 
    [root@kvm5 ~]# grep ^RPCNFSDARGS /etc/sysconfig/nfs  RPCNFSDARGS="-V 4.2" 
  10. 設定開機啟動 nfs 服務,NFS server 端的服務為 nfs-server 及 nfs-secure-server,本版本只要啟動 nfs-server 就同時啟動 nfs-secure-server,而且使用 tab 鍵也不會出現 nfs-secure-server 服務,但有些版本則是兩者分開,必須確認是不是兩種服務都啟動。
    [root@kvm5 ~]# systemctl enable nfs-server.service nfs-secure-server.service 
  11. 啟動 nfs 服務
    [root@kvm5 ~]# systemctl start nfs-server.service nfs-secure-server.service 
  12. 查看目前啟動的 nfs 版本,因 server 指定使用 4.2,若出現 -4.2 表示 nfs server 沒有成功啟動。
    [root@kvm5 ~]# cat /proc/fs/nfsd/versions -2 +3 +4 +4.1 +4.2 
  13. 要確定 nfs-secure-server nfs-server 服務都正常運作。
    [root@kvm5 ~]# systemctl status nfs-secure-server.service nfs-server.service 
    nfs-secure-server.service - Secure NFS Server
    : loaded (/usr/lib/systemd/system/nfs-secure-server.service; enabled)
    : active (running) since Mon 2015-09-21 20:04:10 CST; 8s ago
    : 3075 ExecStart=/usr/sbin/rpc.svcgssd $RPCSVCGSSDARGS (code=exited, status=0/SUCCESS)
     Main PID
    : 3077 (rpc.svcgssd)
    : /system.slice/nfs-secure-server.service
    3077 /usr/sbin/rpc.svcgssd

    21 20:04:10 systemd[1]: Starting Secure NFS Server
    21 20:04:10 systemd[1]: Started Secure NFS Server.

    -server.service - NFS Server
    : loaded (/usr/lib/systemd/system/nfs-server.service; enabled)
    : active (exited) since Mon 2015-09-21 20:04:10 CST; 8s ago
    : 3078 ExecStopPost=/usr/sbin/exportfs -f (code=exited, status=0/SUCCESS)
    : 3076 ExecStop=/usr/sbin/rpc.nfsd 0 (code=exited, status=0/SUCCESS)
    : 3087 ExecStart=/usr/sbin/rpc.nfsd $RPCNFSDARGS $RPCNFSDCOUNT (code=exited, status=0/SUCCESS)
    : 3084 ExecStartPre=/usr/sbin/exportfs -r (code=exited, status=0/SUCCESS)
    : 3083 ExecStartPre=/usr/libexec/nfs-utils/scripts/nfs-server.preconfig (code=exited, status=0/SUCCESS)
     Main PID
    : 3087 (code=exited, status=0/SUCCESS)
    : /system.slice/nfs-server.service

    21 20:04:10 systemd[1]: Starting NFS Server
    21 20:04:10 systemd[1]: Started NFS Server.
  14. 建議不論是否 TAB 有沒有出現提示,都同時啟動這兩個服務。CentOS 安裝版本 nfs-utils-1.3.0-8.el7.x86_64 啟動 nfs-secure-server 出現錯誤訊息,請執行 yum downgrade nfs-utils 換成 nfs-utils-1.3.0-0.el7.x86_64 套件。
    [root@kvm5 ~]# rpm -qa | grep nfs-utils
    @kvm5 ~]# yum downgrade nfs-utils -y
    [root@kvm5 ~]# rpm -qa | grep nfs-utils
  15. 再重新啟動 nfs 服務,並查看是否正常運作。
    [root@kvm5 ~]# systemctl restart nfs-server.service nfs-secure-server.service 
  16. 輸出所有設定的 nfs 分享目錄。
    [root@kvm5 ~]# exportfs -arv

posted @ 2019-06-18 09:08 paulwong 閱讀(24) | 評論 (0)編輯 收藏

2019年6月14日 #

centos7 mysql數據庫安裝和配置

posted @ 2019-06-14 10:24 paulwong 閱讀(20) | 評論 (0)編輯 收藏

2019年6月13日 #


posted @ 2019-06-13 15:22 paulwong 閱讀(26) | 評論 (0)編輯 收藏

2019年6月12日 #

Spring batch 的高級特性--監聽,異常處理,事務

posted @ 2019-06-12 17:03 paulwong 閱讀(23) | 評論 (0)編輯 收藏

僅列出標題  下一頁