paulwong

2019年7月2日 #

動態改變SPRING BATCH 的 CHUNKSIZE

 在SPRING BATCH REMOTE CHUNKING的模式下:
SPRING BATCH 讀文件時,是按一行一行來讀取數據,再按CHUNKSIZE提交到REMOTE操作,有時要整合當前行和下幾行,再決定CHUNKSIZE,以便相關的數據能在遠程同一個PROCESSOR中按順序進行處理,因為相關的數據被拆成幾個CHUNK來處理的話,就有可能不按順序來處理。這樣就需要動態調整CHUNKSIZE。

參照如下:
https://stackoverflow.com/questions/37390602/spring-batch-custom-completion-policy-for-dynamic-chunk-size

并結合SingleItemPeekableItemReader(裝飾者,允許查看下一條數據,真正的操作委托給代理)。

posted @ 2019-07-02 11:13 paulwong 閱讀(11) | 評論 (0)編輯 收藏

2019年6月28日 #

RABBITMQ資源

消息隊列之 RabbitMQ
https://www.jianshu.com/p/79ca08116d57

Spring Boot 中使用 RabbitMQ
https://juejin.im/post/59f194e06fb9a0451329ec53




posted @ 2019-06-28 10:24 paulwong 閱讀(14) | 評論 (0)編輯 收藏

2019年6月27日 #

How to implement JMS ReplyTo using SpringBoot

Request-Response is a message-exchange-pattern. In some cases, a message producer may want the consumers to reply to a message. The JMSReplyTo header indicates which destination, if any, a JMS consumer should reply to. The JMSReplyTo header is set explicitly by the JMS client; its contents will be a javax.jms.Destination object (either Topic or Queue).

In some cases, the JMS client will want the message consumers to reply to a temporary topic or queue set up by the JMS client. When a JMS message consumer receives a message that includes a JMSReplyTo destination, it can reply using that destination. A JMS consumer is not required to send a reply, but in some JMS applications, clients are programmed to do so.

For simplicity, this pattern is typically implemented in a purely synchronous fashion, as in web service calls over HTTP, which holds a connection open and waits until the response is delivered or the timeout period expires. However, request–response may also be implemented asynchronously, with a response being returned at some unknown later time.

For more information, check here.

Now, let’s jump into the code. In Spring, there are 2 ways to implement this (at least I know of).

  1. Using JMSTemplate
  2. Using Spring Integration

For demo purpose, I used ActiveMQ. However, you can implement this in other messaging systems like IBM MQ, Rabbit MQ, Tibco EMS, etc. In this demo, I send an ObjectMessage of type Order and reply with a Shipment object.

Using JMSTemplate

  1. First, we include the required dependencies. Replace the activemq dependency with your messaging system’s jars if not using ActiveMQ

     <dependencies>
         
    <dependency>
             
    <groupId>org.springframework.boot</groupId>
             
    <artifactId>spring-boot-starter-activemq</artifactId>
         
    </dependency>
         
    <dependency>
             
    <groupId>org.apache.activemq.tooling</groupId>
             
    <artifactId>activemq-junit</artifactId>
             
    <version>${activemq.version}</version>
             
    <scope>test</scope>
         
    </dependency>
         
    <dependency>
             
    <groupId>org.springframework.boot</groupId>
             
    <artifactId>spring-boot-starter-test</artifactId>
             
    <scope>test</scope>
         
    </dependency>
     
    </dependencies>
  2. Using the default spring.activemq. properties to configure the application with the ActiveMQ. However, you can do this inside a @Configuration class as well.

     spring:
       activemq:
         broker-url: tcp://localhost:
    61616
         non-blocking-redelivery: true
         packages:
           trust-all: true    
  3. Note in the above configuration spring.activemq.packages.trust-all can be changed to spring.activemq.packages.trusted with the appropriate packages.
  4. Now spring will do it’s magic and inject all the required Beans as usual :) However, in our code, we need to EnableJms

    import org.springframework.context.annotation.Configuration;
     
    import org.springframework.jms.annotation.EnableJms;

     @EnableJms
     @Configuration
     
    public class ActiveMQConfig {

         
    public static final String ORDER_QUEUE = "order-queue";
         
    public static final String ORDER_REPLY_2_QUEUE = "order-reply-2-queue";

     }
  5. First, we will configure the Producer

     @Slf4j
     @Service
     
    public class Producer {

         @Autowired
         JmsMessagingTemplate jmsMessagingTemplate;

         @Autowired
         JmsTemplate jmsTemplate;

         
    public Shipment sendWithReply(Order order) throws JMSException {
             jmsTemplate.setReceiveTimeout(
    1000L);
             jmsMessagingTemplate.setJmsTemplate(jmsTemplate);

             Session session 
    = jmsMessagingTemplate.getConnectionFactory().createConnection()
                     .createSession(
    false, Session.AUTO_ACKNOWLEDGE);

             ObjectMessage objectMessage 
    = session.createObjectMessage(order);

             objectMessage.setJMSCorrelationID(UUID.randomUUID().toString());
             objectMessage.setJMSReplyTo(
    new ActiveMQQueue(ORDER_REPLY_2_QUEUE));
             objectMessage.setJMSCorrelationID(UUID.randomUUID().toString());
             objectMessage.setJMSExpiration(
    1000L);
             objectMessage.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);

             
    return jmsMessagingTemplate.convertSendAndReceive(new ActiveMQQueue(ORDER_QUEUE),
                     objectMessage, Shipment.
    class); //this operation seems to be blocking + sync
         }
     }
  6. Note in the above code that, JmsMessagingTemplate is used instead of JmsTemplatebecause, we are interested in the method convertSendAndReceive. As seen in the method signature, it waits to receive the Shipment object from the consumer.
  7. Next, we can see the Receiver

     @Component
     
    public class Receiver implements SessionAwareMessageListener<Message> {

         @Override
         @JmsListener(destination 
    = ORDER_QUEUE)
         
    public void onMessage(Message message, Session session) throws JMSException {
             Order order 
    = (Order) ((ActiveMQObjectMessage) message).getObject();
             Shipment shipment 
    = new Shipment(order.getId(), UUID.randomUUID().toString());

             
    // done handling the request, now create a response message
             final ObjectMessage responseMessage = new ActiveMQObjectMessage();
             responseMessage.setJMSCorrelationID(message.getJMSCorrelationID());
             responseMessage.setObject(shipment);

             
    // Message sent back to the replyTo address of the income message.
             final MessageProducer producer = session.createProducer(message.getJMSReplyTo());
             producer.send(responseMessage);
         }
     }
  8. Using the javax.jms.Session the javax.jms.MessageProducer is created and used to send the reply message to the JMSReplyTo queue. In real life, this receiver could be a different application altogether.

Using Spring Integration

  1. First, we include the required dependencies in addition to the above dependencies

     <dependency>
       
    <groupId>org.springframework.integration</groupId>
       
    <artifactId>spring-integration-jms</artifactId>
     
    </dependency>
  2. Using the default spring.activemq. properties to configure the application with the ActiveMQ. However, you can do this inside a @Configuration class as well.

     spring:
       activemq
    :
         broker
    -url: tcp://localhost:61616
         non
    -blocking-redelivery: true
         packages
    :
           trust
    -all: true   
  3. Note in the above configuration spring.activemq.packages.trust-all can be changed to spring.activemq.packages.trusted with the appropriate packages.
  4. Next we create the required Beans for the Spring Integration.

     @EnableIntegration
     @IntegrationComponentScan
     @Configuration
     
    public class ActiveMQConfig {

         
    public static final String ORDER_QUEUE = "order-queue";
         
    public static final String ORDER_REPLY_2_QUEUE = "order-reply-2-queue";

         @Bean
         
    public MessageConverter messageConverter() {
             MappingJackson2MessageConverter converter 
    = new MappingJackson2MessageConverter();
             converter.setTargetType(MessageType.TEXT);
             converter.setTypeIdPropertyName(
    "_type");
             
    return converter;
         }

         @Bean
         
    public MessageChannel requests() {
             
    return new DirectChannel();
         }

         @Bean
         @ServiceActivator(inputChannel 
    = "requests")
         
    public JmsOutboundGateway jmsGateway(ActiveMQConnectionFactory activeMQConnectionFactory) {
             JmsOutboundGateway gateway 
    = new JmsOutboundGateway();
             gateway.setConnectionFactory(activeMQConnectionFactory);
             gateway.setRequestDestinationName(ORDER_QUEUE);
             gateway.setReplyDestinationName(ORDER_REPLY_2_QUEUE);
             gateway.setCorrelationKey(
    "JMSCorrelationID");
             gateway.setSendTimeout(
    100L);
             gateway.setReceiveTimeout(
    100L);
             
    return gateway;
         }

         @Autowired
         Receiver receiver;

         @Bean
         
    public DefaultMessageListenerContainer responder(ActiveMQConnectionFactory activeMQConnectionFactory) {
             DefaultMessageListenerContainer container 
    = new DefaultMessageListenerContainer();
             container.setConnectionFactory(activeMQConnectionFactory);
             container.setDestinationName(ORDER_QUEUE);
             MessageListenerAdapter adapter 
    = new MessageListenerAdapter(new Object() {

                 @SuppressWarnings(
    "unused")
                 
    public Shipment handleMessage(Order order) {
                     
    return receiver.receiveMessage(order);
                 }

             });
             container.setMessageListener(adapter);
             
    return container;
         }
     }
  5. Next, we will configure the MessagingGateway

     @MessagingGateway(defaultRequestChannel = "requests")
     
    public interface ClientGateway {
         Shipment sendAndReceive(Order order);
     }
  6. We then Autowire this gateway in our Component class when we want to send and receive the message. A sample is shown below.

     @Slf4j
     @Component
     
    public class Receiver {
         
    public Shipment receiveMessage(@Payload Order order) {
             Shipment shipment 
    = new Shipment(order.getId(), UUID.randomUUID().toString());
             
    return shipment;
         }
     }
  7. Next we configure the Componen to process the Order message. After successful execution, this component will send the Shipment message to the JMSReplyTo queue. In real life, this receiver could be a different application altogether.

For those, who just want to clone the code, head out to aniruthmp/jms

Written on June 5, 2018
https://aniruthmp.github.io/Spring-JMS-request-response/

posted @ 2019-06-27 09:20 paulwong 閱讀(19) | 評論 (0)編輯 收藏

2019年6月26日 #

ACTIVE MQ高級特性

https://blog.51cto.com/1754966750/category17.html

posted @ 2019-06-26 14:13 paulwong 閱讀(19) | 評論 (0)編輯 收藏

2019年6月24日 #

JMS消息消費的順序性

現有的系統架構都是分布式的。有多個消息的發送者和多個消息的消費者。例如訂單創建消息和訂單支付消息,我們需要保證先消費訂單創建消息,然后消費訂單支付消息。

如何解決MQ消息消費順序問題
https://segmentfault.com/a/1190000014512075

jms-如何保證消息的順序
https://leokongwq.github.io/2017/01/23/jms-message-order.html







posted @ 2019-06-24 17:42 paulwong 閱讀(22) | 評論 (0)編輯 收藏

2019年6月20日 #

Multiple MongoDB connectors with Spring Boot

http://blog.marcosbarbero.com/multiple-mongodb-connectors-in-spring-boot/

posted @ 2019-06-20 15:12 paulwong 閱讀(19) | 評論 (0)編輯 收藏

2019年6月18日 #

NFS Server 架設

  1. 安裝 nfs-utils 套件。
    [root@kvm5 ~]# yum install -y nfs-utils
  2. 建立 NFS 分享目錄。
    [root@kvm5 ~]# mkdir /public /protected
  3. 修改 NFS 分享目錄的 SELinux 檔案 context。
    [root@kvm5 ~]# semanage fcontext -a -t public_content_t "/public(/.*)?"
    [root@kvm5 ~]# semanage fcontext -a -t public_content_t "/protected(/.*)?"
    [root@kvm5 ~]# restorecon -Rv /public /protected
  4. 考試時不用自行產生 kerberos keytab,只要依照指定的位置下載,存放在目錄 /etc/ 下,且檔名必須為 krb5.keytab。
    [root@kvm5 ~]# wget http://deyu.wang/kvm5.keytab -O /etc/krb5.keytab 
  5. kerberos keytab 的驗證跟時間有關,server 與 client 都必須校時。
    [root@kvm5 ~]# date
    Sun Jan  7 14:50:04 CST 2018
    [root@kvm5 ~]# chronyc -a makestep
    200 OK
    200 OK
    [root@kvm5 ~]# date
    Mon Nov 20 15:53:22 CST 2017
  6. 在 /protected 下建立次目錄 restricted,並將其擁有者設定為 deyu3,讓 deyu3 可以寫入資料。
    [root@kvm5 ~]# mkdir -p  /protected/restricted 
    [root@kvm5 ~]# chown deyu3 /protected/restricted 
  7. 編輯設定檔 /etc/exports,分享 /protected 及 /public 兩個目錄給網域 192.168.122.0/24。
    [root@kvm5 ~]# echo '/protected 192.168.122.0/24(rw,sync,sec=krb5p)' > /etc/exports
    [root@kvm5 ~]# echo '/public 192.168.122.0/24(ro,sync)' >> /etc/exports
    [root@kvm5 ~]# vim /etc/exports
    [root@kvm5 ~]# cat /etc/exports
    /protected 192.168.122.0/24(rw,sync,sec=krb5p)
    /public 192.168.122.0/24(ro,sync)

  8. NFS 掛載參數說明如下,詳細說明請參考 man 5 nfs 手冊。
    1. rw:read-write,可讀寫的權限;
    2. ro:read-only,唯讀的權限;
    3. sec=mode:安全認證模式;
      1. sec=sys 預設,使用本地 UNIX UIDs 及 GIDs 進行身份認證。
      2. sec=krb5 使用 Kerberos V5 取代本地 UNIX UIDs 及 GIDs 進行身份認證。
      3. sec=krb5i 使用 Kerberos V5 進行身份認證,資料完整性檢查,以防止數據被篡改。
      4. sec=krb5p 使用 Kerberos V5 進行身份認證,資料完整性檢查及 NFS 傳輸加密,以防止數據被篡改,這是最安全的方式。
    4. sync:資料同步寫入到記憶體與硬碟當中;
    [root@kvm5 ~]# man 5 nfs 
  9. 設定使用 4.2 版本,以匯出分享 SELinux context。無適合的版本 client 端掛載時會出現 mount.nfs: Protocol not supported 的訊息。
    [root@kvm5 ~]# vim /etc/sysconfig/nfs  sed -i 's/^\(RPCNFSDARGS=\).*$/\1\"-V 4.2\"/' /etc/sysconfig/nfs 
    [root@kvm5 ~]# grep ^RPCNFSDARGS /etc/sysconfig/nfs  RPCNFSDARGS="-V 4.2" 
  10. 設定開機啟動 nfs 服務,NFS server 端的服務為 nfs-server 及 nfs-secure-server,本版本只要啟動 nfs-server 就同時啟動 nfs-secure-server,而且使用 tab 鍵也不會出現 nfs-secure-server 服務,但有些版本則是兩者分開,必須確認是不是兩種服務都啟動。
    [root@kvm5 ~]# systemctl enable nfs-server.service nfs-secure-server.service 
  11. 啟動 nfs 服務
    [root@kvm5 ~]# systemctl start nfs-server.service nfs-secure-server.service 
  12. 查看目前啟動的 nfs 版本,因 server 指定使用 4.2,若出現 -4.2 表示 nfs server 沒有成功啟動。
    [root@kvm5 ~]# cat /proc/fs/nfsd/versions -2 +3 +4 +4.1 +4.2 
  13. 要確定 nfs-secure-server nfs-server 服務都正常運作。
    [root@kvm5 ~]# systemctl status nfs-secure-server.service nfs-server.service 
    nfs-secure-server.service - Secure NFS Server
       Loaded
    : loaded (/usr/lib/systemd/system/nfs-secure-server.service; enabled)
       Active
    : active (running) since Mon 2015-09-21 20:04:10 CST; 8s ago
      Process
    : 3075 ExecStart=/usr/sbin/rpc.svcgssd $RPCSVCGSSDARGS (code=exited, status=0/SUCCESS)
     Main PID
    : 3077 (rpc.svcgssd)
       CGroup
    : /system.slice/nfs-secure-server.service
               └─
    3077 /usr/sbin/rpc.svcgssd

    Sep 
    21 20:04:10 kvm5.deyu.wang systemd[1]: Starting Secure NFS Server
    Sep 
    21 20:04:10 kvm5.deyu.wang systemd[1]: Started Secure NFS Server.

    nfs
    -server.service - NFS Server
       Loaded
    : loaded (/usr/lib/systemd/system/nfs-server.service; enabled)
       Active
    : active (exited) since Mon 2015-09-21 20:04:10 CST; 8s ago
      Process
    : 3078 ExecStopPost=/usr/sbin/exportfs -f (code=exited, status=0/SUCCESS)
      Process
    : 3076 ExecStop=/usr/sbin/rpc.nfsd 0 (code=exited, status=0/SUCCESS)
      Process
    : 3087 ExecStart=/usr/sbin/rpc.nfsd $RPCNFSDARGS $RPCNFSDCOUNT (code=exited, status=0/SUCCESS)
      Process
    : 3084 ExecStartPre=/usr/sbin/exportfs -r (code=exited, status=0/SUCCESS)
      Process
    : 3083 ExecStartPre=/usr/libexec/nfs-utils/scripts/nfs-server.preconfig (code=exited, status=0/SUCCESS)
     Main PID
    : 3087 (code=exited, status=0/SUCCESS)
       CGroup
    : /system.slice/nfs-server.service

    Sep 
    21 20:04:10 kvm5.deyu.wang systemd[1]: Starting NFS Server
    Sep 
    21 20:04:10 kvm5.deyu.wang systemd[1]: Started NFS Server.
  14. 建議不論是否 TAB 有沒有出現提示,都同時啟動這兩個服務。CentOS 安裝版本 nfs-utils-1.3.0-8.el7.x86_64 啟動 nfs-secure-server 出現錯誤訊息,請執行 yum downgrade nfs-utils 換成 nfs-utils-1.3.0-0.el7.x86_64 套件。
    [root@kvm5 ~]# rpm -qa | grep nfs-utils
    nfs-utils-1.3.0-8.el7.x86_64
    [root
    @kvm5 ~]# yum downgrade nfs-utils -y
    [root@kvm5 ~]# rpm -qa | grep nfs-utils
    nfs-utils-1.3.0-0.el7.x86_64
  15. 再重新啟動 nfs 服務,並查看是否正常運作。
    [root@kvm5 ~]# systemctl restart nfs-server.service nfs-secure-server.service 
  16. 輸出所有設定的 nfs 分享目錄。
    [root@kvm5 ~]# exportfs -arv
    exporting 192.168.122.0/24:/public
    exporting 
    192.168.122.0/24:/protected

posted @ 2019-06-18 09:08 paulwong 閱讀(24) | 評論 (0)編輯 收藏

2019年6月14日 #

centos7 mysql數據庫安裝和配置


https://www.cnblogs.com/starof/p/4680083.html

posted @ 2019-06-14 10:24 paulwong 閱讀(20) | 評論 (0)編輯 收藏

2019年6月13日 #

SPRING BOOT 打包部署指南


https://segmentfault.com/a/1190000017386408

posted @ 2019-06-13 15:22 paulwong 閱讀(26) | 評論 (0)編輯 收藏

2019年6月12日 #

Spring batch 的高級特性--監聽,異常處理,事務


https://my.oschina.net/u/2600078/blog/909346

posted @ 2019-06-12 17:03 paulwong 閱讀(23) | 評論 (0)編輯 收藏

僅列出標題  下一頁
云南11选5软件